ReactiveX/RxGo

Add `Subject` to rxgo

TongChia opened this issue · 0 comments

I miss the `Subject` type from RxJS.

It is a bidirectional stream: new items can be emitted through its `next()` method.
https://rxjs.dev/guide/subject
With that, I can share the subject object with other functions so they can emit items to it or observe it.

I tried to simulate this behavior,
but creating one extra channel seems unavoidable:

// Subject is an rxgo.Observable that can also be fed from the outside:
// besides everything an Observable offers, it exposes Next so that any
// holder of the Subject can push a new item into the stream.
type Subject interface {
	rxgo.Observable
	// Next emits the given value to the subject.
	Next(interface{})
}

// SubjectImpl is the channel-backed implementation of Subject returned by
// NewSubject. Values passed to Next travel through channel to a forwarding
// producer, which relays them into the embedded Observable.
type SubjectImpl struct {
	rxgo.Observable
	// channel carries items from Next to the forwarding producer.
	channel chan rxgo.Item
	// done is closed when the forwarding producer returns. It lets Next stop
	// waiting once nobody will ever receive from channel again.
	done chan struct{}
}

// Next wraps i in an rxgo.Item and hands it to the forwarding producer.
//
// It blocks until the producer has accepted the item. If the producer has
// already stopped (for example because its context was cancelled), the item
// is dropped and Next returns immediately instead of panicking or blocking
// forever.
func (subject *SubjectImpl) Next(i interface{}) {
	select {
	case subject.channel <- rxgo.Of(i):
	case <-subject.done:
	}
}

// NewSubject builds a Subject whose Observable side is created with
// rxgo.Create and the supplied options.
//
// The producer forwards every item received from Next to the observable's
// channel until its context is done. The data channel is deliberately never
// closed: the producer is its receiver, and closing it there would make any
// concurrent or later Next call panic with "send on closed channel".
// Termination is signalled through the separate done channel instead.
func NewSubject(opts ...rxgo.Option) Subject {
	ch := make(chan rxgo.Item)
	done := make(chan struct{})
	obs := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		// Tell pending and future Next calls that nobody is listening anymore.
		defer close(done)
		for {
			select {
			case item := <-ch:
				// SendContext gives up when ctx is cancelled, so a stalled
				// downstream cannot pin this goroutine after cancellation.
				if !item.SendContext(ctx, next) {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}}, opts...)

	return &SubjectImpl{
		Observable: obs,
		channel:    ch,
		done:       done,
	}
}