tarantool/go-tarantool

Support watchers (subscription to server events)

Closed this issue · 6 comments

Protocol: tarantool/doc#2408.
net.box API (as a reference API): tarantool/doc#2409.

API proposal

The Watcher interface will be part of the Connector interface (this simplifies the code and allows mocking in tests):

type Watcher interface {
    Watch(c chan<- *WatchEvent, key string) error
    Unwatch(c chan<- *WatchEvent, key string) error
}

The event type in the main package:

type WatchEvent struct {
    Value interface{}
}

The event type in the connection_pool package:

// PoolWatchEvent extends the WatchEvent type. It can be used in code that knows about
// the ConnectionPool, but it is not necessary for other code.
type PoolWatchEvent struct {
    WatchEvent
    Conn *Connection
}

The Watch/Unwatch calls for Connector/Connection/ConnectionPool/ConnectionMulti:

// Watch creates a watch for the key. All incoming events for this key will be relayed to c.
// Sending an event to c never blocks: the caller must ensure that c has sufficient buffer space to keep
// up with the expected event rate, or some events may be missed.
func Watch(c chan<- *WatchEvent, key string) error
// Unwatch removes a watch for the key.
func Unwatch(c chan<- *WatchEvent, key string) error
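
The non-blocking delivery described in the Watch comment can be sketched with a select/default send. This is only an illustration under assumptions: relay is a hypothetical helper that is not part of go-tarantool, and WatchEvent is redeclared here so the snippet is self-contained.

```go
package main

// WatchEvent mirrors the event type from the proposal above.
type WatchEvent struct {
	Value interface{}
}

// relay is a hypothetical helper (not part of go-tarantool). It tries to
// deliver ev to c without blocking and reports whether the event was
// delivered. A false result means the buffer of c was full and the event
// was dropped.
func relay(c chan<- *WatchEvent, ev *WatchEvent) bool {
	select {
	case c <- ev:
		return true
	default:
		return false
	}
}
```

With a channel of capacity 1, a second relay call made before the receiver drains the channel returns false, which is exactly the "some events may be missed" case.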

An example:

go func(done <-chan struct{}) {
    events := make(chan *tarantool.WatchEvent, 128)
    defer close(events)

    // conn is an instance of Connector.
    if err := conn.Watch(events, key); err != nil {
        fmt.Println("Failed to watch:", err)
        return
    }
    defer conn.Unwatch(events, key)

    for {
        select {
        // done is created and initialized by the caller; alternatively,
        // check Connection.Opts.Notify for the Closed event.
        case <-done:
            return
        case e := <-events:
            fmt.Println("Got value", e.Value)
            // Note: e is a concrete *tarantool.WatchEvent here, so this type
            // assertion compiles only if the channel element is an interface type.
            if pe, ok := e.(*connection_pool.PoolWatchEvent); ok {
                fmt.Println("Connected to:", pe.Conn.Addr())
            }
        }
    }
}(done)
R-omk commented

the caller must ensure that c has sufficient buffer space

In addition to that, I would prefer that the last event is always kept, independently of the channel.

R-omk commented

Since one key can be subscribed only once, I think it is necessary to separate the fact of subscription from recipients. Register recipients separately from the subscription object.
Additionally, you can make a method that returns the latest event, without having to subscribe to the entire event stream.
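
The "latest event" idea from this comment could look roughly like the sketch below. All names (lastEvents, store, Latest) are hypothetical and are not part of the proposed API; the snippet only shows how the most recent event per key can be kept independently of any subscriber channel.

```go
package main

import "sync"

// WatchEvent mirrors the event type from the proposal above.
type WatchEvent struct {
	Value interface{}
}

// lastEvents is a hypothetical holder that remembers the most recent event
// for each key, regardless of whether any channel received it.
type lastEvents struct {
	mu   sync.RWMutex
	last map[string]*WatchEvent
}

func newLastEvents() *lastEvents {
	return &lastEvents{last: make(map[string]*WatchEvent)}
}

// store records ev as the latest event for key, replacing any older one.
func (l *lastEvents) store(key string, ev *WatchEvent) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.last[key] = ev
}

// Latest returns the most recent event for key, or false if none was seen.
func (l *lastEvents) Latest(key string) (*WatchEvent, bool) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	ev, ok := l.last[key]
	return ev, ok
}
```

A connector could call store for every incoming event before relaying it to channels, so Latest stays correct even when a slow channel misses events.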

Thank you for the feedback!

In addition to that, I would prefer that the last event is always kept, independently of the channel.

Could you provide a use case, please? I did not quite understand when it would be useful.

Since one key can be subscribed only once, I think it is necessary to separate the fact of subscription from recipients.

The idea includes subscribing multiple channels to one key. So the following code will be valid with the proposed API:

conn.Watch(channel1, "foo")
conn.Watch(channel2, "foo")
// After the "foo" update, the event will be sent to channel1 and channel2.

I think that should work, or am I missing something?
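
One way the multi-channel behavior could be implemented on the client side is a per-key list of channels, with a single server-side subscription per key implied. The sketch below is an assumption-laden illustration: registry and dispatch are hypothetical names, and error handling is omitted.

```go
package main

import "sync"

// WatchEvent mirrors the event type from the proposal above.
type WatchEvent struct {
	Value interface{}
}

// registry is a hypothetical client-side table of channels per key.
type registry struct {
	mu   sync.Mutex
	subs map[string][]chan<- *WatchEvent
}

func newRegistry() *registry {
	return &registry{subs: make(map[string][]chan<- *WatchEvent)}
}

// Watch registers c as a recipient of events for key.
func (r *registry) Watch(c chan<- *WatchEvent, key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs[key] = append(r.subs[key], c)
}

// Unwatch removes c from the recipients of key, if it is registered.
func (r *registry) Unwatch(c chan<- *WatchEvent, key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	chans := r.subs[key]
	for i, ch := range chans {
		if ch == c {
			r.subs[key] = append(chans[:i], chans[i+1:]...)
			break
		}
	}
	if len(r.subs[key]) == 0 {
		delete(r.subs, key)
	}
}

// dispatch sends ev to every channel registered for key without blocking;
// a channel whose buffer is full misses the event.
func (r *registry) dispatch(key string, ev *WatchEvent) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, ch := range r.subs[key] {
		select {
		case ch <- ev:
		default:
		}
	}
}
```

In this sketch the separation R-omk asks for is internal: the key is the subscription, and the channels are its recipients.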

R-omk commented

It's best to make the behavior as similar as possible to the native implementation in net.box:

https://github.com/tarantool/tarantool/blob/05cce2862787d0155f1393ecd2de74a4c7e017eb/src/box/lua/net_box.lua#L571-L713

Events may be skipped there, but the last one will always be delivered.
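
The "skips are possible, but the last event always arrives" behavior can be approximated in Go with a channel of capacity 1 whose stale value is replaced by the newest one. This is a sketch of that general technique, not a port of the net.box code; offerLatest is a hypothetical name, and the sketch assumes a single producer goroutine per channel.

```go
package main

// WatchEvent mirrors the event type from the proposal above.
type WatchEvent struct {
	Value interface{}
}

// offerLatest puts ev into c, which is assumed to have capacity 1. If c
// already holds an undelivered event, that stale event is discarded so the
// receiver always observes the newest one. The function never blocks on a
// slow receiver. It assumes that only one goroutine calls it for a given c.
func offerLatest(c chan *WatchEvent, ev *WatchEvent) {
	for {
		select {
		case c <- ev:
			return
		default:
		}
		// The buffer is full: drop the stale event (unless the receiver
		// took it in the meantime) and retry the send.
		select {
		case <-c:
		default:
		}
	}
}
```

A receiver that reads slowly sees fewer events than were produced, but the value it eventually reads is the most recent one.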

Thank you. I'll think about it a bit more and propose another API in a week.