MapReduce
Description
The following Go code implements a simple MapReduce process that counts the frequency of words in the specified text files and prints the results as a map. It defines two key logical components, a Mapper and a Reducer, and wires them together with a pipeline:
- Mapper:
  - The Run method of the Mapper struct first attempts to open its designated input file (e.g., "input1.txt" or "input2.txt").
  - It reads the file line by line and splits each line into words; each word is treated as a separate data item.
  - For each word, it records a <word, 1> key-value pair in a shared state; in the Map phase every occurrence counts as 1 (the sketch after this list shows the resulting data shapes).
- Reducer:
  - The Run method of the Reducer struct receives the results of all mapping operations (i.e., the outputs of both Mapper instances) and calculates the total frequency of each word across the entire dataset.
  - It merges the per-word counts produced by the different Mapper instances into a single total for each word over the whole set of input files.
- Pipeline Processing:
  - The pipeline.Register() function registers the Mapper and Reducer components. The code creates two mappers (each reading a different input file) and, via ograph.Rely, aggregates their results into one reducer instance for final processing.
  - After the pipeline runs, the program prints each word together with its total occurrence count across all input files.
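Concretely, the shared state evolves in two steps. The shapes below are a sketch; the words and counts are illustrative, not taken from real input files:

// After both mappers have run:
// map_results = map[string][]int{"hello": {1, 1}, "world": {1}}
//
// After the reducer has run:
// reduce_results = map[string]int{"hello": 2, "world": 1}

Note that ograph.Rely(mapper1, mapper2) only guarantees that the reducer runs after both mappers; the mappers themselves may run concurrently, which is why the Mapper merges its output through state.Update (the state applies the change as one step) rather than a separate Get followed by Set.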
Code
package main

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"strings"

	"github.com/symphony09/ograph"
	"github.com/symphony09/ograph/ogcore"
)

type Mapper struct {
	ograph.BaseNode

	InputFileName string
}

func (mapper *Mapper) Run(ctx context.Context, state ogcore.State) error {
	file, err := os.Open(mapper.InputFileName)
	if err != nil {
		return fmt.Errorf("error opening file: %w", err)
	}
	defer file.Close()

	// Use bufio.Scanner to read the file content line by line.
	scanner := bufio.NewScanner(file)

	// In the Map phase, each line of data is split into key-value pairs.
	var mappedResults []string

	for scanner.Scan() {
		line := scanner.Text()
		for _, word := range strings.Fields(line) {
			mappedResults = append(mappedResults, fmt.Sprintf("%s\t%d", word, 1)) // key-value pair: <word, 1>
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error during mapping: %w", err)
	}

	// Merge all key-value pairs into a shared map for the reduce phase.
	// state.Update applies the change in one step, so the two mappers
	// cannot overwrite each other's results when they run concurrently.
	state.Update("map_results", func(val any) any {
		var mapResults map[string][]int

		if val == nil {
			mapResults = make(map[string][]int)
		} else {
			mapResults = val.(map[string][]int)
		}

		for _, m := range mappedResults {
			parts := strings.Split(m, "\t")
			word := parts[0]
			value := 1 // Every <key, value> pair produced in the Map phase has a value of 1.
			mapResults[word] = append(mapResults[word], value)
		}

		return mapResults
	})

	return nil
}

type Reducer struct {
	ograph.BaseNode
}

func (reducer *Reducer) Run(ctx context.Context, state ogcore.State) error {
	results, _ := state.Get("map_results")

	mapResults, ok := results.(map[string][]int)
	if !ok {
		return fmt.Errorf("no map results found in state")
	}

	// In the Reduce phase, sum up all values for the same key.
	reduceResults := make(map[string]int)

	for word, values := range mapResults {
		sum := 0
		for _, value := range values {
			sum += value
		}
		reduceResults[word] = sum
	}

	state.Set("reduce_results", reduceResults)
	return nil
}

func main() {
	pipeline := ograph.NewPipeline()

	mapper1 := ograph.NewElement("M1").UseNode(&Mapper{InputFileName: "input1.txt"})
	mapper2 := ograph.NewElement("M2").UseNode(&Mapper{InputFileName: "input2.txt"})
	reducer := ograph.NewElement("R1").UseNode(&Reducer{})

	pipeline.Register(mapper1).
		Register(mapper2).
		Register(reducer, ograph.Rely(mapper1, mapper2))

	state := ograph.NewState()

	if err := pipeline.Run(context.Background(), state); err != nil {
		fmt.Println(err)
	} else {
		reduceResults, _ := state.Get("reduce_results")
		fmt.Println(reduceResults)
	}
}
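The example expects input1.txt and input2.txt to exist in the working directory. One way to try it out is to create two small sample files first; the contents below are an assumption for illustration, not part of the original example:

package main

import "os"

// Writes two sample input files for the word-count pipeline above.
// The contents are hypothetical; any whitespace-separated text works.
func main() {
	os.WriteFile("input1.txt", []byte("hello world\nhello ograph\n"), 0o644)
	os.WriteFile("input2.txt", []byte("world of mapreduce\n"), 0o644)
}

With these inputs, the pipeline prints map[hello:2 mapreduce:1 of:1 ograph:1 world:2] (fmt prints map keys in sorted order).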