How to implement multiple subscribers dynamically?
ormanli opened this issue · 2 comments
Hello,
I want to use RxGo to batch an expensive operation. I did similar stuff before using Multicast Sink from Reactor. I prepared a very simplified version of how I did. Map section defines the costly function.
type record struct {
ID uint64 `json:"ID"`
Name string `json:"Name"`
}
type recordSink struct {
recordMap map[uint64]record
input chan rxgo.Item
pipeline rxgo.Observable
counter uint64
mu sync.RWMutex
}
func (s *recordSink) add(r record) (record, error) {
id := atomic.AddUint64(&s.counter, 1)
go func(id uint64, r record) {
s.input <- rxgo.Of(record{
ID: id,
Name: r.Name,
})
}(id, r)
item, err := s.pipeline.
Find(func(i interface{}) bool {
r := i.(record)
return r.ID == id
}).Get()
if err != nil {
return record{}, err
}
return item.V.(record), nil
}
func (s *recordSink) getAll() []record {
s.mu.RLock()
defer s.mu.RUnlock()
records := make([]record, 0, len(s.recordMap))
for _, v := range s.recordMap {
records = append(records, v)
}
return records
}
func newRecordSink() *recordSink {
s := &recordSink{
input: make(chan rxgo.Item),
recordMap: make(map[uint64]record),
}
s.pipeline = rxgo.FromChannel(s.input).
BufferWithTimeOrCount(rxgo.WithDuration(time.Millisecond*10), 10).
Map(func(_ context.Context, i interface{}) (interface{}, error) {
records := i.([]interface{})
s.mu.Lock()
defer s.mu.Unlock()
for k := range records {
r := records[k].(record)
s.recordMap[r.ID] = r
}
return records, nil
}).
FlatMap(func(item rxgo.Item) rxgo.Observable {
return rxgo.Just(item.V.([]interface{}))()
})
return s
}
When I add a single record, it works without a problem. But if I add multiple records, it only processes one record and stops processing other items. As I understand from documentation, my code can only have one subscriber. Is there any way to have multiple subscribers added and removed dynamically?
There are some test cases I added to verify my code isn't working.
https://github.com/ormanli/rxgo-batching/blob/2c542f4a31be482b68bb74d035ef12a066857f15/sink_test.go#L11-L90
You can have multiple subscribers per observable. See .DoOnNext(...)
Not sure, what you want to do, but your code looks very complicated for doing batch processing. Why not use a simple working queue and have a timer look for new tasks?
timer := rxgo.Interval(rxgo.WithDuration(time.Second / 60))
timerDisposal := timer.DoOnNext(workHandler)
I prefer to keep things as simple as possible.