chrislusf/glow

question about Channel->map->map->ReduceByKey->AddOutput

Closed this issue · 1 comments

////////////////////////Question/////////////////////////////
blocking and can't print "println("reduce:")"

////////////////////////////Source Code///////////////////////

package main

import (
"flag"
"fmt"
"strings"
"strconv"
"sync"
_"os"
_"bufio"
_"io"
"io/ioutil"
"encoding/gob"
_ "github.com/chrislusf/glow/driver"
"github.com/chrislusf/glow/flow"
)

type WordSentence struct {
Word string
LineNumber int
}

type AccessByAgeGroup struct {
Addr string
Info MemInfo
}

type MemInfo struct {
Addr string
Size int
Count int
}

func init() {
gob.Register(MemInfo{})
}

func goStart(wg *sync.WaitGroup, fn func()) {
wg.Add(1)
go func() {
defer wg.Done()
fn()
}()
}

func testWordCount2() {

println("testWordCount2")
flowOut2 := make(chan AccessByAgeGroup)
chIn     := make(chan string)
f2       := flow.New()

f2.Channel(chIn).Partition(1).Map(func(line string) MemInfo {
    //println(line)
    words:=strings.Split(line, ":")
    //println(words[0]+" "+words[1])
    s, _ := strconv.ParseInt(words[1], 16, 0)
    return MemInfo{words[0], int(s), 1}
}).Map(func(ws MemInfo) (string, MemInfo) {
    println(ws.Addr)
    return ws.Addr, ws
}).ReduceByKey(func(x MemInfo, y MemInfo) (MemInfo) {
    println("reduce:")
    return MemInfo{x.Addr,x.Size+y.Size,x.Count+y.Count}
}).AddOutput(flowOut2)

flow.Ready()

var wg sync.WaitGroup

goStart(&wg, func() {
    f2.Run()
})

goStart(&wg, func() {
    for t := range flowOut2 {
        fmt.Printf("%s size:%-8d count:%-8d\n",
            t.Info.Addr,t.Info.Size,t.Info.Count)
    }
})

bytes, err := ioutil.ReadFile("passwd")
if err != nil {
    println("Failed to read")
    return
}

lines := strings.Split(string(bytes), "\r\n")
for _, line := range lines {
    chIn <-line
}

wg.Wait()

}

func main() {
flag.Parse()
testWordCount2()
}

If you ctrl+C the program in standalone mode, you can see these:

step:Input0
  output : d0
     shard:0 time:2.038281735s processed 2779
step:Map1
  input  : d0
     shard:0 time:2.038321665s processed 2779
  output : d1
     shard:0 time:2.038380948s processed 2779
step:Map2
  input  : d1
     shard:0 time:2.038388886s processed 2779
  output : d2
     shard:0 time:2.038408013s processed 2779
step:LocalSort3
  input  : d2
     shard:0 time:2.03841651s processed 2779
  output : d3
     shard:0 time:2.038403271s processed 0
step:LocalReduceByKey4
  input  : d3
     shard:0 time:2.038411775s processed 0
  output : d4
     shard:0 time:2.038419295s processed 0
step:MergeSorted5
  input  : d4
     shard:0 time:2.038427534s processed 0
  output : d5
     shard:0 time:2.038420435s processed 0
step:LocalReduceByKey6
  input  : d5
     shard:0 time:2.038428171s processed 0
  output : d6
     shard:0 time:2.038496247s processed 0

This shows the "step:LocalSort3" is not started.

Now you can do a "kill -3" for the program:

goroutine 23 [chan receive]:
github.com/chrislusf/glow/flow.(*Dataset).Map.func1(0xc8200a23c0)
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/dataset_map.go:27 +0x9b
github.com/chrislusf/glow/flow.(*Task).RunTask(0xc8200a23c0)
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step_task.go:33 +0x2d
github.com/chrislusf/glow/flow.(*Step).RunStep.func1(0xc820067540, 0x0, 0xc8200a23c0)
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:22 +0x48
created by github.com/chrislusf/glow/flow.(*Step).RunStep
    /Users/chrislu/dev/gopath/src/github.com/chrislusf/glow/flow/step.go:23 +0xd3

It shows the "step:Map2" is not finished. What it is waiting for?

It shows the "chIn" channel is not closed.