On this page
rocket_launch
快速开始
关于如何启动和运行pipeline的指南。
安装
go get github.com/symphony09/ograph
演示
func TestDemo(t *testing.T) {
pipeline := ograph.NewPipeline()
n1 := ograph.NewElement("N1").UseFn(func() error {
fmt.Println("N1 running")
return nil
})
n2 := ograph.NewElement("N2").UseFn(func() error {
fmt.Println("N2 running")
return nil
})
pipeline.Register(n1).
Register(n2, ograph.Rely(n1))
if err := pipeline.Run(context.TODO(), nil); err != nil {
t.Error(err)
}
}
如代码的第13到14行所示,有两个节点被注册到了管道中,并且N2依赖于N1。
--- title: Demo pipeline --- flowchart LR A[Start] --> N1 N1((N1)) --> N2((N2)) N2 --> B[End]
输出
N1 running
N2 running
循环执行
--- title: Demo pipeline 2 --- flowchart LR A[Start] --> N1 N1((N1)) --> N2((N2)) N1 -- 3 --> N1 N2 --> B[End]
循环通过 Wrapper 实现, 还可以用来实现条件执行, 超时控制, 错误重试等功能.
选择执行
--- title: Demo pipeline 3 --- flowchart LR subgraph C1 direction LR C1_Start --> N1((N1)) N1 --> C1_End C1_Start --x N2((N2)) N2 --x C1_End end A[Start] --> C1 C1 --> B[End]
选择执行通过 Cluster 实现, 其他还有竞争执行等也是通过 Cluster
实现.
使用节点及工厂
之前的演示展示了最简单的使用方法。进一步地,你可以使用节点或工厂作为管道元素来执行逻辑。
声明一个节点
type Printer struct {
ograph.BaseNode // 继承基础节点。
}
func (printer *Printer) Run(ctx context.Context, state ogcore.State) error {
// 继承基础节点的Name方法。
// 返回在管道元素中设置的名字(例如 NewElement("N1"))。
fmt.Printf("[%s running]\n", printer.Name()) // [N1 running]
// 通过键从状态中获取数据。
fmt.Println(state.Get("key"))
return nil
}
代码涉及使用State
,请参阅State以获取相关文档。
在元素中使用节点
n1 := ograph.NewElement("N1").UseNode(&Printer{})
使用UseFn
和UseNode
本质上是单例模式,建议在OGraph中使用工厂模式。更多详情请参阅Factory。
声明一个工厂
pipeline.RegisterFactory("PrinterFactory", func() ogcore.Node {
return &Printer{}
})
在元素中使用工厂
n1 := ograph.NewElement("N1").UseFactory("PrinterFactory")
n2 := ograph.NewElement("N2").UseFactory("PrinterFactory")
Last updated 09 3月 2025, 23:31 +0800 .