ReactiveX/RxGo

How to fan out

imalkov82 opened this issue · 2 comments

I'm trying to fan out the final step of my reactive flow so that it executes in parallel, using DoOnNext.

Running the code below, I expect thirdCounter = 2 and each of "first DoOnNext", "second DoOnNext" and "third DoOnNext" to be printed twice (6 lines in total).

The output is printed as expected, and the Map steps concatenate the strings correctly. However, thirdCounter = 7, so the steps are invoked more often than expected.

What am I missing here?

My code:

// thirdCounter counts how many times the third Map stage in localRun runs;
// that stage increments it with atomic.AddInt32.
var thirdCounter int32
// localRun builds a three-stage Map pipeline over names, attaches three
// DoOnNext callbacks to it, prints the item produced by Last(), and finally
// prints thirdCounter.
//
// Each Map stage asserts its input to string and appends a suffix
// (",one", ",two", ",three"); the third stage also increments thirdCounter.
func localRun(names ...string) {
	// NOTE(review): names is handed to Just as one slice value rather than
	// spread with names... — presumably rxgo emits each element as its own
	// item; confirm against the rxgo documentation.
	observable := rxgo.Just(names)().
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			// Unchecked type assertion: panics if the item is not a string.
			s := i.(string)
			s = fmt.Sprintf("%s,%s", s, "one")
			return s, nil
		}).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			s := i.(string)
			s = fmt.Sprintf("%s,%s", s, "two")
			return s, nil
		}).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			// Count every invocation of this stage.
			atomic.AddInt32(&thirdCounter, 1)
			s := i.(string)
			s = fmt.Sprintf("%s,%s", s, "three")
			return s, nil
		})

	// NOTE(review): observable is consumed four times below (three DoOnNext
	// calls plus Last().Observe()). If each consumer re-runs the whole Map
	// chain — TODO confirm against rxgo's cold/hot observable semantics — the
	// third stage runs once per item per consumer, not once per item.
	//
	// NOTE(review): the value returned by each DoOnNext call is discarded, so
	// nothing here waits for the callbacks; if they run asynchronously (verify),
	// they may still be in flight when thirdCounter is printed.
	observable.DoOnNext(func(i interface{}) {
		fmt.Println("first DoOnNext", i)
	})

	observable.DoOnNext(func(i interface{}) {
		fmt.Println("second DoOnNext", i)
	})

	observable.DoOnNext(func(i interface{}) {
		fmt.Println("third DoOnNext", i)
	})

	// Print the value of every item received from Last().Observe().
	for item := range observable.Last().Observe() {
		fmt.Println(item.V)
	}
	// thirdCounter is read here with a plain (non-atomic) load, while the
	// third Map stage writes it with atomic.AddInt32.
	fmt.Printf("Third Counter = %d\n", thirdCounter)
}
// TestMocktFlow drives localRun with the two inputs "Hello" and "Hi".
// It makes no assertions; the result is inspected via localRun's output.
func TestMocktFlow(t *testing.T) {
	inputs := []string{"Hello", "Hi"}
	localRun(inputs...)
}

@teivah , kind reminder