On this page
article
并行合并排序
样例说明
一个 OGraph 的主要使用场景是并行计算加速。以合并排序为例,常见的合并排序算法没有利用多核 CPU 并行计算。
在合并排序的过程,需要对每个子序列进行排序,如果每个子序列的排序可以同时进行,那么就极大的的缩短计算时间。
在这种情况下,OGraph 可以让计算任务并行化,并且不让代码变成一团乱麻。
样例代码
以下代码实现了并行化的合并排序,代码中首先定义了一个合并函数用于合并两个已经排序完成的序列。
然后定义了一些排序任务,用于对子序列进行排序,并用合并函数合并到最终结果中。还定义了一个检查任务,用于对最终结果进行检验。
最后新建一个pipeline,注册这些任务并执行。
(为了代码简洁,对子序列的排序直接使用了标准库,而不是像常规合并排序一样递归地使用合并函数。)
package main
import (
"context"
"fmt"
"log"
"math/rand"
"slices"
"sync"
"github.com/symphony09/ograph"
)
func merge(left, right []int) []int {
result := make([]int, 0, len(left)+len(right))
for len(left) > 0 || len(right) > 0 {
if len(left) == 0 {
return append(result, right...)
}
if len(right) == 0 {
return append(result, left...)
}
if left[0] < right[0] {
result = append(result, left[0])
left = left[1:]
} else {
result = append(result, right[0])
right = right[1:]
}
}
return result
}
func main() {
size := 1000
randomInts := make([]int, 0, size)
sortedInts := make([]int, 0, size)
mux := &sync.Mutex{}
for i := 0; i < size; i++ {
randomInts = append(randomInts, rand.Intn(10000))
}
sortTasks := make([]*ograph.Element, 0, 10)
for i := 0; i < 10; i++ {
part := randomInts[i*100 : (i+1)*100]
t := ograph.NewElement(fmt.Sprintf("t%d", i)).UseFn(func() error {
slices.Sort(part)
mux.Lock()
sortedInts = merge(sortedInts, part)
mux.Unlock()
return nil
})
sortTasks = append(sortTasks, t)
}
checkTask := ograph.NewElement("check").UseFn(func() error {
if len(sortedInts) != 1000 || !slices.IsSorted(sortedInts) {
return fmt.Errorf("got wrong sort result")
}
return nil
})
p := ograph.NewPipeline()
err := p.Register(checkTask, ograph.Rely(sortTasks...)).Run(context.Background(), nil)
if err != nil {
log.Fatalf("run merge sort pipeline failed, err: %v\n", err)
}
fmt.Println("result:", sortedInts)
}
Last updated 05 5月 2025, 00:20 +0800 .