Add PubSubConn.ReceiveContext()
RelicOfTesla opened this issue · 6 comments
-
add "PubSubConn.ReceiveContext" support watch the <-context.Done() .
because "redis.ReceiveContext" is return native redis response. was not call "Subscription.receiveInternal" decode this response. -
and, "ReceiveWithTimeout" is return net.ErrClosed, can not loop call receive
I'm sorry I don't understand what you're trying to say, do you have some code which demonstrates the issue you have?
func run(ctx context.Context){
psc := &redis.PubSubConn{Conn: conn}
err := psc.Subscribe(topicStr)
for {
r: = psc.ReceiveContext(ctx) // add this
err, ok := r.(error)
_ = ok
if errors.Is(v, context.DeadlineExceeded) || errors.Is(v, context.Canceled) {
break; // break loop
}
msg, ok := r.(redis.Message)
// ....
// if not have the psc.ReceiveContext(ctx), and i try "v, err := redis.ReceiveContext(psc.Conn, ctx)"
// the v was can not convert redis.Message, because not decode by Subscription.receiveInternal
// so , please add PubSubConn.ReceiveContext
}
}
func main(){
ctx,cancel := context.WithCancel(context.TODO())
go run(ctx)
time.Sleep(time.Second*10)
cancel()
time.Sleep(time.Second*10)
}
for {
v := psc.ReceiveWithTimeout(time.Second)
fmt.Println(v)
// first print: os.ErrDeadlineExceeded, and this connection was closed!!!!
// second print: net.ErrClosed , can not receive any redis message!!!
}
To confirm my understanding you're looking for a new feature PubSubConn.ReceiveContext(ctx context.Context)
?
You're also saying that psc.ReceiveWithTimeout
returns an different error indicated the connection is closed after a timeout occurs. This is indeed the case, and is by design, and that would also be the same for a Context
method as a timeout is treated as a fatal error, which can be seen here. It's a fatal error as with you can't be sure what state the connection is in, so it must be closed.
These are two questions..... second question it was an unexpected discovery
So to confirm you should be able to use psc.ReceiveContext(...)
to achieve what you want, with the exception that on timeout you're psc will be broken and hence will need to be set back up.
If you're seeing an unexpected result, please provide a working example and the error you are seeing.
I was just looking for this. @stevenh The reason we need a new method is shown by the following:
r := psc.Receive()
fmt.Printf("psc.Receive: %T %v\n", r, r)
// Neither of these work because Conn doesn't have ReceiveContext.
// r, err := psc.ReceiveContext(context.Background())
// r, err := psc.Conn.ReceiveContext(context.Background())
// This works, but it doesn't include the processing
// of receiveInternal:
r, err = redis.ReceiveContext(psc.Conn, context.Background())
if err != nil {
fmt.Printf("ReceiveContext: err %T: %s\n", err, err)
} else {
fmt.Printf("ReceiveContext: %T %v\n", r, r)
}
// So this is preferred:
r = psc.ReceiveContext(context.Background())
fmt.Printf("psc.ReceiveContext: %T %v\n", r, r)
The output from this given three successive publications is:
# This is a normal PubSubConn Receive
psc.Receive: redis.Message {ps:cfg:rmq [111 110 101]}
# This is what we get with redis.ReceiveContext()
ReceiveContext: []interface {} [[109 101 115 115 97 103 101] [112 115 58 99 102 103 58 114 109 113] [111 110 101]]
# This is what we want:
psc.ReceiveContext: redis.Message {ps:cfg:rmq [111 110 101]}
The enhancement is trivial, and is in #603. It's in draft until I can set up an environment where I can extend the test suite and verify it (it does work in my application). done