bulldog2011/luxun

ProducerSendThread dies

Kimjonghyeon opened this issue · 0 comments

  1. Async producer server
    AsyncProducer.send(...) throws QueueFullException
    -> message send requests are still coming in
  2. The broker server dies
  3. Producer server
    java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset
  4. queue.addAll(events) is executed in the catch (RuntimeException e) block of tryToHandle:
private void tryToHandle(List<QueueItem<T>> events) {
    if (logger.isDebugEnabled()) {
        logger.debug("handling " + events.size() + " events");
    }
    if (events.size() > 0) {
        try {
            this.eventHandler.handle(events, underlyingProducer, serializer);
        } catch (ConnectionRefusedException e) {
            List<QueueItem<T>> remainedItems = new ArrayList<QueueItem<T>>(events);
            while (queue.size() > 0) {
                remainedItems.add(queue.poll());
            }
            if (this.callbackHandler != null) {
                this.callbackHandler.connectionRefused(e.getMessage(), remainedItems);
            }
        } catch (RuntimeException e) {
            logger.error("Error in handling batch of " + events.size() + " events", e);
            queue.addAll(events);
        }
    }
}

The queue is already full, so queue.addAll(events) throws and ProducerSendThread dies.

java.lang.IllegalStateException: Queue full
        at java.util.AbstractQueue.add(AbstractQueue.java:98)
        at java.util.AbstractQueue.addAll(AbstractQueue.java:187)
        at com.leansoft.luxun.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.java:163)
        at com.leansoft.luxun.producer.async.ProducerSendThread.processEvents(ProducerSendThread.java:126)
        at com.leansoft.luxun.producer.async.ProducerSendThread.run(ProducerSendThread.java:63) 
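
The failure in the trace can be reproduced outside luxun. The following is a minimal sketch, assuming the producer queue behaves like a bounded `java.util.concurrent.LinkedBlockingQueue` (the actual queue type used by ProducerSendThread is not shown in this issue). The class name `QueueFullDemo` and its method are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueFullDemo {

    /**
     * Fills a bounded queue, then tries to re-queue a failed batch with addAll.
     * Returns the exception message, or null if addAll unexpectedly succeeded.
     */
    static String requeueIntoFullQueue() {
        // Assumption: capacity 2 stands in for the producer's configured queue size.
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
        queue.add("a");
        queue.add("b"); // the queue is now full

        List<String> failedBatch = Arrays.asList("c", "d");
        try {
            // addAll is inherited from AbstractQueue and calls add() per element;
            // add() throws IllegalStateException when no capacity is left.
            queue.addAll(failedBatch);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(requeueIntoFullQueue()); // prints "Queue full"
    }
}
```

Because nothing in tryToHandle catches this exception, it propagates up through processEvents to run, which matches the frames in the trace above.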

Solution

  1. IllegalStateException occurs
  2. tryToHandle(events) is executed again
  3. A Thrift connection is attempted
  4. The connection attempt times out
  5. ConnectionRefusedException occurs
  6. CallbackHandler.connectionRefused(...) is executed
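
The steps above could be sketched roughly as follows. This is not luxun code: `RetrySketch`, `Handler`, `Callback`, the local `ConnectionRefusedException`, and the `retriesLeft` parameter are all hypothetical stand-ins, and events are plain strings instead of `QueueItem<T>`. The retry limit and the final hand-off to the callback when retries run out are assumptions added so the sketch cannot recurse forever.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class RetrySketch {
    /** Hypothetical stand-in for luxun's ConnectionRefusedException. */
    static class ConnectionRefusedException extends RuntimeException {
        ConnectionRefusedException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the event handler that sends a batch to the broker. */
    interface Handler { void handle(List<String> events); }

    /** Hypothetical stand-in for the callback that receives undelivered items. */
    interface Callback { void connectionRefused(String message, List<String> remaining); }

    private final BlockingQueue<String> queue;
    private final Handler handler;
    private final Callback callback;

    RetrySketch(BlockingQueue<String> queue, Handler handler, Callback callback) {
        this.queue = queue;
        this.handler = handler;
        this.callback = callback;
    }

    void tryToHandle(List<String> events, int retriesLeft) {
        if (events.isEmpty()) {
            return;
        }
        try {
            handler.handle(events);
        } catch (ConnectionRefusedException e) {
            handOff(e.getMessage(), events);
        } catch (RuntimeException e) {
            try {
                // Same re-queue as in the issue. Caveat: on a partially full queue,
                // addAll may already have added a prefix of the batch before it throws.
                queue.addAll(events);
            } catch (IllegalStateException queueFull) {
                if (retriesLeft > 0) {
                    // Steps 1-2: the queue is full, so handle the same batch again.
                    tryToHandle(events, retriesLeft - 1);
                } else {
                    // Assumption: give up and hand the items to the callback.
                    handOff(queueFull.getMessage(), events);
                }
            }
        }
    }

    /** Steps 5-6: pass the failed batch plus everything still queued to the callback. */
    private void handOff(String message, List<String> events) {
        List<String> remaining = new ArrayList<String>(events);
        queue.drainTo(remaining);
        if (callback != null) {
            callback.connectionRefused(message, remaining);
        }
    }

    /** Simulates: connection reset, full queue, retry, then connection refused. */
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        queue.add("queued"); // the queue is full before the batch fails
        final int[] calls = {0};
        final List<String> received = new ArrayList<String>();
        Handler handler = new Handler() {
            public void handle(List<String> events) {
                if (calls[0]++ == 0) {
                    throw new RuntimeException("Connection reset");
                }
                throw new ConnectionRefusedException("connection refused");
            }
        };
        Callback callback = new Callback() {
            public void connectionRefused(String message, List<String> remaining) {
                received.addAll(remaining);
            }
        };
        new RetrySketch(queue, handler, callback).tryToHandle(Arrays.asList("e1", "e2"), 3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [e1, e2, queued]
    }
}
```

In this sketch the sending thread survives the full queue, and the undelivered batch reaches the callback together with the items that were still queued.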