Add `Subject` to rxgo
TongChia opened this issue · 0 comments
TongChia commented
I miss the `Subject` type
from RxJS.
It is a bidirectional stream: besides being observed, it can emit new items via its next() method.
https://rxjs.dev/guide/subject
With that, I can share the subject object with other functions so that they can emit items into it or observe it.
I tried to simulate this behavior,
but it inevitably requires creating one extra channel:
// Subject is an rxgo.Observable that additionally accepts items pushed into
// it from the outside through Next.
type Subject interface {
	rxgo.Observable

	// Next pushes value into the subject.
	Next(value interface{})
}
// SubjectImpl pairs an rxgo.Observable with the channel its Next method
// sends on.
type SubjectImpl struct {
	// channel receives every item handed to Next.
	channel chan rxgo.Item

	// Observable is embedded so that its methods are promoted onto
	// SubjectImpl.
	rxgo.Observable
}
// Next wraps i in an rxgo.Item using rxgo.Of and hands it to the subject's
// internal channel with SendBlocking.
func (s *SubjectImpl) Next(i interface{}) {
	item := rxgo.Of(i)
	item.SendBlocking(s.channel)
}
// NewSubject builds a Subject backed by an unbuffered rxgo.Item channel.
//
// opts are passed through unchanged to rxgo.Create. The producer handed to
// rxgo.Create forwards every item received from the internal channel to the
// channel it is given (next) until its context is done.
//
// The producer is only a receiver of the internal channel, so it deliberately
// does not close it: Next is the sender, and a Next call racing with (or
// arriving after) cancellation would panic with "send on closed channel".
// Closing here would also panic with "close of closed channel" if the
// producer were ever run more than once.
// NOTE(review): whether rxgo invokes the producer once per Observe call
// should be confirmed against the rxgo version in use.
//
// While no producer is running, nothing receives from the internal channel,
// so a Next call blocks.
func NewSubject(opts ...rxgo.Option) Subject {
	ch := make(chan rxgo.Item)

	forward := func(ctx context.Context, next chan<- rxgo.Item) {
		for {
			select {
			case item := <-ch:
				// Send with the context so the producer cannot hang on a
				// downstream that stopped reading after cancellation.
				if !item.SendContext(ctx, next) {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}

	return &SubjectImpl{
		Observable: rxgo.Create([]rxgo.Producer{forward}, opts...),
		channel:    ch,
	}
}