Install

  go get github.com/symphony09/ograph
  

Demo

func TestDemo(t *testing.T) {
	pipeline := ograph.NewPipeline()

	n1 := ograph.NewElement("N1").UseFn(func() error {
		fmt.Println("N1 running")
		return nil
	})
	n2 := ograph.NewElement("N2").UseFn(func() error {
		fmt.Println("N2 running")
		return nil
	})

	pipeline.Register(n1).
		Register(n2, ograph.Rely(n1))

	if err := pipeline.Run(context.TODO(), nil); err != nil {
		t.Error(err)
	}
}

As shown in lines 13 to 14 of the code, two nodes are registered in the pipeline, and N2 is dependent on N1.

---
title: Demo pipeline
---

flowchart LR
	A[Start] --> N1
    N1((N1)) --> N2((N2))
	N2 --> B[End]

Output

  N1 running
N2 running
  

Loop

---
title: Demo pipeline 2
---

flowchart LR
	A[Start] --> N1
    N1((N1)) --> N2((N2))
	N1 -- 3 --> N1
	N2 --> B[End]

The loop is executed through the Wrapper, which implements functions such as conditional execution, timeout control, and error retry.

Choose

---
title: Demo pipeline 3
---

flowchart LR
	subgraph C1
	direction LR

	C1_Start --> N1((N1))
	N1 --> C1_End

	C1_Start --x N2((N2))
	N2 --x C1_End
    end

	A[Start] --> C1
	C1 --> B[End]

This function is implemented through Cluster, and other functions such as race execution are also implemented through Cluster.

Use Node & Factory

The previous demo showed the simplest usage method. Further, you can use nodes or factories as pipeline elements to execute logic.

Declare a node

  type Printer struct {
	ograph.BaseNode //  Inherit the base node.
}

func (printer *Printer) Run(ctx context.Context, state ogcore.State) error {
	// Inherit the Name method from the base node.
	// Return the name set in the pipeline element (NewElement("N1")).
	fmt.Printf("[%s running]\n", printer.Name()) // [N1 running]

	// Get data from state by key.
	fmt.Println(state.Get("key"))
	return nil
}
  

The code involves the use of State, please refer to State for related documentation.

Use node in element

  n1 := ograph.NewElement("N1").UseNode(&Printer{})
  

Using UseFn and UseNode is essentially a singleton pattern, and it is recommended to use the factory pattern in OGraph. More details is here.

Declare a factory

  pipeline.RegisterFactory("PrinterFactory", func() ogcore.Node {
		return &Printer{}
	})
  

Use factory in element

  n1 := ograph.NewElement("N1").UseFactory("PrinterFactory")
n2 := ograph.NewElement("N2").UseFactory("PrinterFactory")
  

Last updated 03 Mar 2025, 00:07 +0800 . history