apache/pulsar-client-go

add context control for pub cmd and add async method

someview opened this issue · 3 comments

Is your feature request related to a problem? Please describe.
Add context control for the publish command, and add an async method.

  1. In Go, there are many reasons to use a context to control a request, such as cancellation and deadlines.
  2. Go has no keyword like await, but sometimes we want a non-blocking call, e.g. publishing a message without needing to receive its result.

The current implementation:
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
	message proto.Message) (*RPCResult, error) {
	var err error
	var host *url.URL
	var rpcResult *RPCResult
	startTime := time.Now()
	backoff := DefaultBackoff{100 * time.Millisecond}
	// we can retry these requests because this kind of request is
	// not specific to any particular broker
	for time.Since(startTime) < c.requestTimeout {
		host, err = c.serviceNameResolver.ResolveHost()
		if err != nil {
			c.log.WithError(err).Errorf("rpc client failed to resolve host")
			return nil, err
		}
		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
		// success we got a response
		if err == nil {
			break
		}

		retryTime := backoff.Next()
		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
		time.Sleep(retryTime)
	}

	return rpcResult, err
}


func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
	c.metrics.RPCRequestCount.Inc()
	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
	if err != nil {
		return nil, err
	}

	ch := make(chan result, 1)

	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
		ch <- result{&RPCResult{
			Cnx:      cnx,
			Response: response,
		}, err}
	})

	timeoutCh := time.After(c.requestTimeout)
	for {
		select {
		case res := <-ch:
			// Ignoring producer not ready response.
			// Continue to wait for the producer to create successfully
			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
				if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
					timeoutCh = nil
					break
				}
			}
			return res.RPCResult, res.error
		case <-timeoutCh:
			return nil, ErrRequestTimeOut
		}
	}
}

Describe the solution you'd like
The code above is confusing. Retry logic should be separated from a single request, e.g.:

func (c *rpcClient) WithRetry(func()){
}
This would avoid the structure of the code above, which, with the Request method inlined into RequestToAnyBroker, amounts to:
for {
	for {
		select {
		case <-ch1:
		case <-ch2:
		}
	}
}

I have also found other problems in the client code:

func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
	callback func(command *pb.BaseCommand, err error)) {
	c.incomingRequestsWG.Add(1)
	defer c.incomingRequestsWG.Done()

	state := c.getState()
	if state == connectionClosed || state == connectionClosing {
		callback(req, ErrConnectionClosed)

	} else {
		select {
		case <-c.closeCh:
			callback(req, ErrConnectionClosed)

		case c.incomingRequestsCh <- &request{
			id:       &requestID,
			cmd:      req,
			callback: callback,
		}:
		}
	}
}

func (c *connection) closed() bool {
	return connectionClosed == c.getState()
}

The connection state is read from multiple goroutines, so it should be treated as an atomic value.
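
A minimal sketch of what that could look like, assuming the state is currently a plain field. The type and constant names mirror the snippet above, but their definitions here, and the markClosing helper, are hypothetical. It uses atomic.Int32 from sync/atomic, which requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type connectionState int32

const (
	connectionInit connectionState = iota
	connectionReady
	connectionClosing
	connectionClosed
)

// connection keeps its state in an atomic.Int32 so that reads and writes
// from different goroutines do not race.
type connection struct {
	state atomic.Int32
}

func (c *connection) getState() connectionState {
	return connectionState(c.state.Load())
}

func (c *connection) setState(s connectionState) {
	c.state.Store(int32(s))
}

// markClosing moves the connection from ready to closing and reports
// whether this call performed the transition.
func (c *connection) markClosing() bool {
	return c.state.CompareAndSwap(int32(connectionReady), int32(connectionClosing))
}

func main() {
	c := &connection{}
	c.setState(connectionReady)
	fmt.Println(c.markClosing()) // first caller wins the transition
	fmt.Println(c.markClosing()) // state is no longer ready
	fmt.Println(c.getState() == connectionClosing)
}
```

Note that an atomic load alone does not make the check in SendRequest and the later channel send a single atomic step; the select on closeCh still has to cover a close that happens in between.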

The code could use a single chan any instead of multiple select cases. The multiple cases may cause a performance issue, and a single chan any with a type switch should perform better here.

func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is call, p.dataChan will be closed, data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is call, p.dataChan will be closed, cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			p.reconnectToBroker()
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}

I have tested some cases for this (CPU: 3.3 GHz):
even with little concurrency on channel writes and reads, a select case takes 40-50 ns, while an interface type assertion to a struct type takes just 5-10 ns. The grpc-go framework also uses chan any to handle channel state updates.
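
To make the suggestion concrete, here is a sketch of an events loop that reads from one chan any and dispatches with a type switch. The sendRequest type and the returned log of handled events are hypothetical and exist only for illustration; in the real producer the ticker and the connection-closed signal would also have to be delivered through the same channel, or stay as separate select cases.

```go
package main

import "fmt"

// Hypothetical event types; only flushRequest and closeProducer
// correspond to names in the snippet above.
type sendRequest struct{ payload string }
type flushRequest struct{}
type closeProducer struct{}

// runEventsLoop drains a single channel and dispatches on the concrete
// type of each event. It stops on closeProducer or when events is closed,
// and returns a log of what it handled.
func runEventsLoop(events <-chan any) []string {
	var handled []string
	for ev := range events {
		switch v := ev.(type) {
		case *sendRequest:
			handled = append(handled, "send:"+v.payload)
		case *flushRequest:
			handled = append(handled, "flush")
		case *closeProducer:
			handled = append(handled, "close")
			return handled
		}
	}
	return handled
}

func main() {
	events := make(chan any, 3)
	events <- &sendRequest{payload: "hello"}
	events <- &flushRequest{}
	events <- &closeProducer{}
	fmt.Println(runEventsLoop(events))
}
```

A single channel also preserves the order in which sends, flushes and the close were enqueued, which a multi-case select does not guarantee.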

Receive is also confusing to me. Why is the for loop needed? Every select case returns, so the loop body runs at most once.

func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
	for {
		select {
		case <-c.closeCh:
			return nil, newError(ConsumerClosed, "consumer closed")
		case cm, ok := <-c.messageCh:
			if !ok {
				return nil, newError(ConsumerClosed, "consumer closed")
			}
			return cm.Message, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}