add context control for pub cmd and add async method
someview opened this issue · 3 comments
Is your feature request related to a problem? Please describe.
Add context control for the pub command and add an async method.
- In Go, there are many reasons to use a context to control a request.
- Go has no keyword like `await`, but sometimes we want a call to be non-blocking, e.g. when we publish a message without needing to receive its result.
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
	message proto.Message) (*RPCResult, error) {
	var err error
	var host *url.URL
	var rpcResult *RPCResult
	startTime := time.Now()
	backoff := DefaultBackoff{100 * time.Millisecond}
	// we can retry these requests because this kind of request is
	// not specific to any particular broker
	for time.Since(startTime) < c.requestTimeout {
		host, err = c.serviceNameResolver.ResolveHost()
		if err != nil {
			c.log.WithError(err).Errorf("rpc client failed to resolve host")
			return nil, err
		}
		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
		// success we got a response
		if err == nil {
			break
		}
		retryTime := backoff.Next()
		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
		time.Sleep(retryTime)
	}
	return rpcResult, err
}

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
	c.metrics.RPCRequestCount.Inc()
	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
	if err != nil {
		return nil, err
	}
	ch := make(chan result, 1)
	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
		ch <- result{&RPCResult{
			Cnx:      cnx,
			Response: response,
		}, err}
	})
	timeoutCh := time.After(c.requestTimeout)
	for {
		select {
		case res := <-ch:
			// Ignoring producer not ready response.
			// Continue to wait for the producer to create successfully
			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
				if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
					timeoutCh = nil
					break
				}
			}
			return res.RPCResult, res.error
		case <-timeoutCh:
			return nil, ErrRequestTimeOut
		}
	}
}
Describe the solution you'd like
The code above is confusing. Retry logic should be separated from a single request, e.g.:
func (c *rpcClient) WithRetry(fn func()) {
}
This would avoid the structure above, which is what you get once the Request method is inlined into RequestToAnyBroker:
for {
	for {
		select {
		case <-ch1:
		case <-ch2:
		}
	}
}
I have found other mistakes in the client code:
func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
	callback func(command *pb.BaseCommand, err error)) {
	c.incomingRequestsWG.Add(1)
	defer c.incomingRequestsWG.Done()
	state := c.getState()
	if state == connectionClosed || state == connectionClosing {
		callback(req, ErrConnectionClosed)
	} else {
		select {
		case <-c.closeCh:
			callback(req, ErrConnectionClosed)
		case c.incomingRequestsCh <- &request{
			id:       &requestID,
			cmd:      req,
			callback: callback,
		}:
		}
	}
}

func (c *connection) closed() bool {
	return connectionClosed == c.getState()
}
The state should be treated as an atomic value.
The code could use a single chan any instead of multiple select cases. Multiple cases may cause a performance issue, and a single chan any with a type switch would perform better here.
func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is call, p.dataChan will be closed, data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is call, p.dataChan will be closed, cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			p.reconnectToBroker()
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}
I have tested some cases for this (CPU: 3.3 GHz): even with little concurrency on channel writes and reads, a select case takes 40-50 ns, whereas an interface type assertion to a struct type takes just 5-10 ns. The grpc-go framework also uses chan any to handle channel state updates.
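As an illustration of the suggestion, the sketch below funnels all events through one `chan any` and dispatches with a type switch. The event types and the string log are hypothetical stand-ins for the producer's real requests and handlers, and the sketch makes no claim about the measured numbers.

```go
package main

import "fmt"

// Hypothetical event types standing in for the producer's requests.
type sendRequest struct{ payload string }
type flushRequest struct{}
type closeProducer struct{}

// runEventsLoop drains a single channel and dispatches on the dynamic type
// of each event. It returns a log of what it handled so the behavior is
// easy to inspect.
func runEventsLoop(events <-chan any) []string {
	var handled []string
	for ev := range events {
		switch v := ev.(type) {
		case *sendRequest:
			handled = append(handled, "send:"+v.payload)
		case *flushRequest:
			handled = append(handled, "flush")
		case *closeProducer:
			handled = append(handled, "close")
			return handled
		}
	}
	return handled
}

func main() {
	events := make(chan any, 3)
	events <- &sendRequest{payload: "a"}
	events <- &flushRequest{}
	events <- &closeProducer{}
	fmt.Println(runEventsLoop(events))
}
```

One trade-off to keep in mind: a ticker's `C` channel cannot be merged this way without either keeping its own select case or adding a goroutine that forwards ticks into the shared channel.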
The Receive method is pretty confusing to me. Why is the for loop needed?
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
	for {
		select {
		case <-c.closeCh:
			return nil, newError(ConsumerClosed, "consumer closed")
		case cm, ok := <-c.messageCh:
			if !ok {
				return nil, newError(ConsumerClosed, "consumer closed")
			}
			return cm.Message, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}