Wrapper
A guide to the wrapper feature.
Wrapper is a very important concept in ograph, which can dynamically add functionality to nodes.
Currently, many important features are implemented through built-in wrappers. For example, loop execution, timeout control, error retry, and so on. You can also customize wrappers to replace the built-in implementations or implement personalized requirements.
The diagram below illustrates how wrappers work.
sequenceDiagram Pipeline->>Wrapper: Scheduling Wrapper-->>Wrapper: Do something Wrapper-->>Inner node: Exec Node.Run Inner node-->>Wrapper: Done Wrapper-->>Wrapper: Do something Wrapper->>Pipeline: Done
If you are still confused, don’t worry, you can continue reading to learn about how to use built-in wrappers and customize wrappers.
Built-in wrapper
ograph includes many useful wrappers built-in. To use them is very simple, here take retry wrapper as an example, explain how to use the built-in wrappers.
For more specific documentation, please refer to the reference.
func TestWrapper_Retry(t *testing.T) {
var failedCnt int
learningToWalk := func() error {
if failedCnt < 3 {
failedCnt++
return errors.New("fallen")
}
fmt.Println("Wow!")
return nil
}
pipeline := ograph.NewPipeline()
baby := ograph.NewElement("Baby").UseFn(learningToWalk).Apply(ogimpl.RetryOp(99))
// equals to Wrap(ogimpl.Retry).Params("MaxRetryTimes", 99)
pipeline.Register(baby)
if err := pipeline.Run(context.TODO(), nil); err != nil {
t.Error(err)
}
}
Output
2025/02/24 23:43:31 WARN retry failed node NodeName=Baby Error=fallen
2025/02/24 23:43:31 WARN retry failed node NodeName=Baby Error=fallen
2025/02/24 23:43:31 WARN retry failed node NodeName=Baby Error=fallen
Wow!
The most important is Apply(ogimpl.RetryOp(99))
, which means adding retry functionality to the node and the maximum number of retries is 99.
This is equivalent to the general write method Wrap(ogimpl.Retry).Params("MaxRetryTimes", 99)
. The former is a more convenient write method.
Custom wrapper
Define a wrapper
type CustomWrapper struct {
ograph.BaseWrapper
}
func (wrapper *CustomWrapper) Run(ctx context.Context, state ogcore.State) error {
fmt.Println("Before node start") // you can do something here
wrapper.Node.Run(ctx, state)
fmt.Println("After node finish") // you can do something here
return nil
}
func NewCustomWrapper() ogcore.Node {
return &CustomWrapper{}
}
Define a wrapper like defining a node, but it must implement the Wrap(node ogcore.Node)
method. The example inherits from ograph.BaseWrapper
, so you don’t need to implement it yourself.
Use in the pipeline
func TestWrapper_Customize(t *testing.T) {
pipeline := ograph.NewPipeline()
pipeline.RegisterFactory("MyWrapper", NewCustomWrapper)
innerNode := ograph.NewElement("InnerNode").
UseFn(func() error {
fmt.Println("Inner node running")
return nil
}).
Wrap("MyWrapper")
pipeline.Register(innerNode)
if err := pipeline.Run(context.TODO(), nil); err != nil {
t.Error(err)
}
}
- Register factory of wrapper.
- Wrap inner node by
Wrap("MyWrapper")
.
Output
Before node start
InnerNode failed.
After node finish
Combine multiple wrappers
You can wrap multiple wrappers around the same node. For example:
ograph.NewElement("InnerNode").
Apply(ogimpl.DelayOp(time.Second), ogimpl.LoopOp(3)).
Wrap(ogimpl.Debug).Wrap("MyWrapper")
Last updated 25 Feb 2025, 00:29 +0800 .