question about Channel->map->map->ReduceByKey->AddOutput
Closed this issue · 1 comments
////////////////////////Question/////////////////////////////
blocking and can't print "println("reduce:")"
////////////////////////////Source Code///////////////////////
package main
import (
"flag"
"fmt"
"strings"
"strconv"
"sync"
_"os"
_"bufio"
_"io"
"io/ioutil"
"encoding/gob"
_ "github.com/chrislusf/glow/driver"
"github.com/chrislusf/glow/flow"
)
type WordSentence struct {
Word string
LineNumber int
}
type AccessByAgeGroup struct {
Addr string
Info MemInfo
}
type MemInfo struct {
Addr string
Size int
Count int
}
func init() {
gob.Register(MemInfo{})
}
func goStart(wg *sync.WaitGroup, fn func()) {
wg.Add(1)
go func() {
defer wg.Done()
fn()
}()
}
func testWordCount2() {
println("testWordCount2")
flowOut2 := make(chan AccessByAgeGroup)
chIn := make(chan string)
f2 := flow.New()
f2.Channel(chIn).Partition(1).Map(func(line string) MemInfo {
//println(line)
words:=strings.Split(line, ":")
//println(words[0]+" "+words[1])
s, _ := strconv.ParseInt(words[1], 16, 0)
return MemInfo{words[0], int(s), 1}
}).Map(func(ws MemInfo) (string, MemInfo) {
println(ws.Addr)
return ws.Addr, ws
}).ReduceByKey(func(x MemInfo, y MemInfo) (MemInfo) {
println("reduce:")
return MemInfo{x.Addr,x.Size+y.Size,x.Count+y.Count}
}).AddOutput(flowOut2)
flow.Ready()
var wg sync.WaitGroup
goStart(&wg, func() {
f2.Run()
})
goStart(&wg, func() {
for t := range flowOut2 {
fmt.Printf("%s size:%-8d count:%-8d\n",
t.Info.Addr,t.Info.Size,t.Info.Count)
}
})
bytes, err := ioutil.ReadFile("passwd")
if err != nil {
println("Failed to read")
return
}
lines := strings.Split(string(bytes), "\r\n")
for _, line := range lines {
chIn <-line
}
wg.Wait()
}
func main() {
flag.Parse()
testWordCount2()
}
If you ctrl+C the program in standalone mode, you can see these:
step:Input0
output : d0
shard:0 time:2.038281735s processed 2779
step:Map1
input : d0
shard:0 time:2.038321665s processed 2779
output : d1
shard:0 time:2.038380948s processed 2779
step:Map2
input : d1
shard:0 time:2.038388886s processed 2779
output : d2
shard:0 time:2.038408013s processed 2779
step:LocalSort3
input : d2
shard:0 time:2.03841651s processed 2779
output : d3
shard:0 time:2.038403271s processed 0
step:LocalReduceByKey4
input : d3
shard:0 time:2.038411775s processed 0
output : d4
shard:0 time:2.038419295s processed 0
step:MergeSorted5
input : d4
shard:0 time:2.038427534s processed 0
output : d5
shard:0 time:2.038420435s processed 0
step:LocalReduceByKey6
input : d5
shard:0 time:2.038428171s processed 0
output : d6
shard:0 time:2.038496247s processed 0
This shows the "step:LocalSort3" is not started.
Now you can do a "kill -3" for the program:
goroutine 23 [chan receive]:
github.com/chrislusf/glow/flow.(*Dataset).Map.func1(0xc8200a23c0)
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/dataset_map.go:27 +0x9b
github.com/chrislusf/glow/flow.(*Task).RunTask(0xc8200a23c0)
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step_task.go:33 +0x2d
github.com/chrislusf/glow/flow.(*Step).RunStep.func1(0xc820067540, 0x0, 0xc8200a23c0)
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:22 +0x48
created by github.com/chrislusf/glow/flow.(*Step).RunStep
/Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:23 +0xd3
It shows the "step:Map2" is not finished. What it is waiting for?
It shows the "chIn" channel is not closed.