Skip to content

Instantly share code, notes, and snippets.

@cnnrznn
Created May 6, 2020 15:21
Show Gist options
  • Save cnnrznn/fd352fd9aa3ae26440d21675c24c5d47 to your computer and use it in GitHub Desktop.
Save cnnrznn/fd352fd9aa3ae26440d21675c24c5d47 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
"github.com/cnnrznn/gomr"
)
type (
	// Job is the stateless job value passed to gomr.RunLocal in main; the
	// work is done by its Map, Partition and Reduce methods.
	Job struct{}

	// Entry is one parsed input record. The fields appear in the same order
	// as the comma-separated columns Map reads from each record.
	Entry struct {
		phone       string
		countryCode string
		phoneType   string
		carrier     string
		region      string
	}
)
// filter reports whether entry should be passed on to the reducers.
//
// This is the customization hook for the job: edit the body to test the
// entry's columns against your own criteria. As written it accepts every
// entry unconditionally.
func filter(entry Entry) bool {
	const keepEverything = true
	return keepEverything
}
// unique returns the deduplication key for entry. Reduce treats two entries
// as the same row exactly when their keys are equal.
//
// The fields are joined with ',' instead of being concatenated directly.
// Plain concatenation is ambiguous: ("12", "3", ...) and ("1", "23", ...)
// would yield the same key, and one of two distinct rows would be dropped.
// Entries built by Map cannot contain ',' in any field, because each record
// is split on ',', so the joined key identifies them unambiguously.
func unique(entry Entry) string {
	return entry.phone + "," +
		entry.countryCode + "," +
		entry.phoneType + "," +
		entry.carrier + "," +
		entry.region
}
// Map parses each input record into an Entry and forwards the entries
// accepted by filter to out. out is closed once in has been drained.
//
// Each item is asserted to be a string holding one comma-separated record
// with the columns phone, country code, phone type, carrier and region, in
// that order. Presumably it is one line of the file fed in by
// gomr.TextFileParallel; confirm against gomr. Columns past the fifth are
// ignored.
//
// Trailing '\r'/'\n' characters are trimmed so CRLF input does not leak
// into the region column. Empty records are skipped silently. Records with
// fewer than five columns are reported on stderr and skipped, instead of
// panicking with an index-out-of-range error and taking down the whole job.
func (j *Job) Map(in <-chan interface{}, out chan<- interface{}) {
	defer close(out)
	for item := range in {
		line := strings.TrimRight(item.(string), "\r\n")
		if line == "" {
			continue
		}
		cols := strings.Split(line, ",")
		if len(cols) < 5 {
			fmt.Fprintf(os.Stderr, "skipping malformed record %q: want at least 5 comma-separated columns, got %d\n", line, len(cols))
			continue
		}
		entry := Entry{
			phone:       cols[0],
			countryCode: cols[1],
			phoneType:   cols[2],
			carrier:     cols[3],
			region:      cols[4],
		}
		if filter(entry) {
			out <- entry
		}
	}
}
// Partition routes each Entry from in to one of outs, chosen only by the
// entry's phone field. Identical entries have identical phones, so they
// always reach the same reducer; this is what lets each Reduce deduplicate
// independently. wg is marked done when in has been drained.
//
// Numeric phones are partitioned by their integer value. Phones that
// strconv.Atoi rejects (for example "555-1234", or values out of int
// range) are spread using an FNV-1a hash of the raw string, instead of all
// collapsing to key 0 and piling into a single reducer.
func (j *Job) Partition(in <-chan interface{}, outs []chan interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	n := len(outs)
	for item := range in {
		phone := item.(Entry).phone
		key, err := strconv.Atoi(phone)
		if err != nil {
			// 32-bit FNV-1a (offset basis 2166136261, prime 16777619).
			h := uint32(2166136261)
			for i := 0; i < len(phone); i++ {
				h ^= uint32(phone[i])
				h *= 16777619
			}
			key = int(h)
		}
		// Atoi accepts a leading '-', and Go's % keeps the dividend's sign
		// (int(h) can also be negative on 32-bit platforms). Normalize into
		// [0, n) so a phone like "-1" cannot produce a negative index.
		idx := key % n
		if idx < 0 {
			idx += n
		}
		outs[idx] <- item
	}
}
// Reduce receives the entries routed to this partition and emits each
// distinct entry, as identified by unique, to out exactly once: the first
// time it is seen. Later duplicates are discarded. Reduce does not close
// out; it signals completion through wg.
func (j *Job) Reduce(in <-chan interface{}, out chan<- interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	seen := make(map[string]bool)
	for item := range in {
		e := item.(Entry)
		k := unique(e)
		if seen[k] {
			continue
		}
		seen[k] = true
		out <- e
	}
}
// main runs the deduplication job locally over the file named by the first
// command-line argument and prints each entry emitted by the reducers to
// stdout with fmt.Println.
//
// gomr.RunLocal receives 2*NumCPU and NumCPU as its first two arguments.
// These are presumably the mapper and reducer counts; confirm against gomr.
// Output order is whatever order the results arrive on out.
//
// Without a file argument, main prints a usage line to stderr and exits
// with status 2, instead of panicking on os.Args[1].
func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "usage: %s FILE\n", os.Args[0])
		os.Exit(2)
	}

	job := &Job{}
	p := runtime.NumCPU()
	ins, out := gomr.RunLocal(2*p, p, job)
	gomr.TextFileParallel(os.Args[1], ins)
	for item := range out {
		fmt.Println(item)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment