Description

The following Go code implements a simple MapReduce process to count the frequency of different words in specified text files and outputs the results in dictionary form. Specifically, it defines two key logical components:

  1. Mapper (Mapper):
    • The Run method within the Mapper struct first attempts to open the designated input file (e.g., “input1.txt” or “input2.txt”).
    • The code reads the file line by line and splits each line into whitespace-separated words. Each word is treated as a separate data item.
    • Each word is stored as a key in the shared pipeline state, and a count of 1 is recorded under that key for every occurrence of the word.
  2. Reducer (Reducer):
    • The Run method within the Reducer struct receives results from all mapping operations (i.e., outputs from multiple Mapper instances) and calculates the total frequency of each word across the entire dataset.
    • It aggregates word frequencies generated by different Mapper instances and merges these results together to get a count for each word in the entire set of input files.
  3. Pipeline Processing:
    • The pipeline.Register() function is used to register the Mapper and Reducer components. The code creates two mappers (each reading a different input file) and one reducer, which is registered as relying on both mappers and aggregates their results for final processing.
    • After running the entire process, the program prints out each word along with its total occurrence count across all input files.

Code

  package main

import (
	"bufio"
	"context"
	"fmt"
	"os"
	"strings"

	"github.com/symphony09/ograph"
	"github.com/symphony09/ograph/ogcore"
)

// Mapper is the map-phase node of the word-count pipeline. It embeds
// ograph.BaseNode and reads its words from a single input file.
type Mapper struct {
	ograph.BaseNode

	// InputFileName is the path of the text file that Run opens and scans.
	InputFileName string
}

// Run implements the map phase for a single input file. It opens
// mapper.InputFileName, splits every line into whitespace-separated words and
// appends one count of 1 per occurrence to that word's entry in the
// map[string][]int kept in state under "map_results".
//
// It returns an error if the file cannot be opened or read, if ctx is done
// while the file is being scanned, or if "map_results" already holds a value
// that is not a map[string][]int.
func (mapper *Mapper) Run(ctx context.Context, state ogcore.State) error {
	const resultsKey = "map_results"

	file, err := os.Open(mapper.InputFileName)
	if err != nil {
		// The *os.PathError returned by os.Open already names the file.
		return fmt.Errorf("opening input file: %w", err)
	}
	defer file.Close()

	// Gather every word before touching the state, so that no file I/O
	// happens inside the update callback.
	var words []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("mapping %q: %w", mapper.InputFileName, err)
		}
		words = append(words, strings.Fields(scanner.Text())...)
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("mapping %q: %w", mapper.InputFileName, err)
	}

	// NOTE(review): updateErr is read right after Update returns, which
	// assumes Update invokes the callback synchronously — confirm against
	// the ogcore.State implementation.
	var updateErr error
	state.Update(resultsKey, func(val any) any {
		mapResults, ok := val.(map[string][]int)
		if !ok && val != nil {
			// Leave a foreign value untouched instead of panicking on it.
			updateErr = fmt.Errorf("state key %q holds %T, want map[string][]int", resultsKey, val)
			return val
		}
		if mapResults == nil {
			// Covers both a missing key and a stored nil map.
			mapResults = make(map[string][]int)
		}

		// In the map phase the value of every <key, value> pair is always 1.
		for _, word := range words {
			mapResults[word] = append(mapResults[word], 1)
		}

		return mapResults
	})

	return updateErr
}

// Reducer is the reduce-phase node of the word-count pipeline. It embeds
// ograph.BaseNode and has no configuration fields of its own.
type Reducer struct {
	ograph.BaseNode
}

// Run implements the reduce phase. It reads the map[string][]int stored in
// state under "map_results", sums the values recorded for each word and
// stores the resulting map[string]int in state under "reduce_results".
//
// It returns an error, rather than panicking, when "map_results" is absent
// or holds a value of another type.
func (reducer *Reducer) Run(ctx context.Context, state ogcore.State) error {
	const (
		inputKey  = "map_results"
		outputKey = "reduce_results"
	)

	// The second result of Get is deliberately ignored: whatever Get yields
	// for a missing key, the checked assertion below rejects anything that is
	// not the expected map type.
	results, _ := state.Get(inputKey)
	mapResults, ok := results.(map[string][]int)
	if !ok {
		return fmt.Errorf("state key %q holds %T, want map[string][]int", inputKey, results)
	}

	// Sum up all values recorded for the same key.
	reduceResults := make(map[string]int, len(mapResults))
	for word, values := range mapResults {
		sum := 0
		for _, value := range values {
			sum += value
		}
		reduceResults[word] = sum
	}

	state.Set(outputKey, reduceResults)
	return nil
}

// main builds a pipeline of two Mapper nodes (reading "input1.txt" and
// "input2.txt") and one Reducer node registered as relying on both mappers,
// runs it, and prints the resulting word counts.
//
// If the pipeline fails, the error is written to standard error and the
// process exits with status 1.
func main() {
	pipeline := ograph.NewPipeline()

	mapper1 := ograph.NewElement("M1").UseNode(&Mapper{InputFileName: "input1.txt"})
	mapper2 := ograph.NewElement("M2").UseNode(&Mapper{InputFileName: "input2.txt"})
	reducer := ograph.NewElement("R1").UseNode(&Reducer{})

	pipeline.Register(mapper1).
		Register(mapper2).
		Register(reducer, ograph.Rely(mapper1, mapper2))

	state := ograph.NewState()

	if err := pipeline.Run(context.Background(), state); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Only the stored value is printed; the second result of Get would just
	// add noise after the map.
	results, _ := state.Get("reduce_results")
	fmt.Println(results)
}
  

Last updated 13 Mar 2025, 00:38 +0800 . history