ReactiveX/RxGo

How to implement multiple subscribers dynamically?

ormanli opened this issue · 2 comments

Hello,
I want to use RxGo to batch an expensive operation. I did similar stuff before using Multicast Sink from Reactor. I prepared a very simplified version of how I did. Map section defines the costly function.

https://github.com/ormanli/rxgo-batching/blob/2c542f4a31be482b68bb74d035ef12a066857f15/main.go#L45-L119

type record struct {
	ID   uint64 `json:"ID"`
	Name string `json:"Name"`
}

type recordSink struct {
	recordMap map[uint64]record
	input     chan rxgo.Item
	pipeline  rxgo.Observable
	counter   uint64
	mu        sync.RWMutex
}

func (s *recordSink) add(r record) (record, error) {
	id := atomic.AddUint64(&s.counter, 1)

	go func(id uint64, r record) {
		s.input <- rxgo.Of(record{
			ID:   id,
			Name: r.Name,
		})
	}(id, r)

	item, err := s.pipeline.
		Find(func(i interface{}) bool {
			r := i.(record)
			return r.ID == id
		}).Get()

	if err != nil {
		return record{}, err
	}

	return item.V.(record), nil
}

func (s *recordSink) getAll() []record {
	s.mu.RLock()
	defer s.mu.RUnlock()

	records := make([]record, 0, len(s.recordMap))
	for _, v := range s.recordMap {
		records = append(records, v)
	}

	return records
}

func newRecordSink() *recordSink {
	s := &recordSink{
		input:     make(chan rxgo.Item),
		recordMap: make(map[uint64]record),
	}

	s.pipeline = rxgo.FromChannel(s.input).
		BufferWithTimeOrCount(rxgo.WithDuration(time.Millisecond*10), 10).
		Map(func(_ context.Context, i interface{}) (interface{}, error) {
			records := i.([]interface{})

			s.mu.Lock()
			defer s.mu.Unlock()
			for k := range records {
				r := records[k].(record)

				s.recordMap[r.ID] = r
			}

			return records, nil
		}).
		FlatMap(func(item rxgo.Item) rxgo.Observable {
			return rxgo.Just(item.V.([]interface{}))()
		})

	return s
}

When I add a single record, it works without a problem. But if I add multiple records, it only processes one record and stops processing other items. As I understand from documentation, my code can only have one subscriber. Is there any way to have multiple subscribers added and removed dynamically?

There are some test cases I added to verify my code isn't working.
https://github.com/ormanli/rxgo-batching/blob/2c542f4a31be482b68bb74d035ef12a066857f15/sink_test.go#L11-L90

You can have multiple subscribers per observable. See .DoOnNext(...)

Not sure, what you want to do, but your code looks very complicated for doing batch processing. Why not use a simple working queue and have a timer look for new tasks?

timer := rxgo.Interval(rxgo.WithDuration(time.Second / 60))
timerDisposal := timer.DoOnNext(workHandler)

I prefer to keep things as simple as possible.

Hey @ormanli,
It's because there's a race. I'm not exactly sure about what you're trying to achieve but when first is the first item emitted to the input channel, it works. If second is first, it doesn't work as first will be consumed by Find and you're having a goroutine stuck here.