awslabs/amazon-kinesis-client

stopping tomcat container does not kill kinesis consumer connection

prayagupa opened this issue · 5 comments

I have a stream consumer app running inside a Tomcat container.

When I stop the Tomcat container, it is supposed to release the Kinesis stream consumer app's resources as well, but it does not.

Even after running /usr/local/apache-tomcat-8.0.42/bin/shutdown.sh, Tomcat does not stop properly:

[ec2-user@ip-172-21-5-105 ~]$ ps aux | grep tomcat
root     13904  0.3 10.5 4611864 816080 ?      Sl   May08   1:55 /usr/java/jdk1.8.0_60//bin/java -Djava.util.logging.config.file=/usr/local/apache-tomcat-8.5.12/conf/logging.properties -Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager -Djdk.tls.ephemeralDHKeySize=2048 -Djava.protocol.handler.pkgs=org.apache.catalina.webresources -classpath /usr/local/apache-tomcat-8.5.12/bin/bootstrap.jar:/usr/local/apache-tomcat-8.5.12/bin/tomcat-juli.jar -Dcatalina.base=/usr/local/apache-tomcat-8.5.12 -Dcatalina.home=/usr/local/apache-tomcat-8.5.12 -Djava.io.tmpdir=/usr/local/apache-tomcat-8.5.12/temp org.apache.catalina.startup.Bootstrap start
ec2-user 15408  0.0  0.0 112648   964 pts/0    S+   00:14   0:00 grep --color=auto tomcat

This leaves the consumer running: I can see the lease's leaseCounter keep increasing:

{
  "checkpoint": "49572589993860134938465509123149151340420296711845445634",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 394714,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "customerorder-e2e_172.21.5.105",
  "ownerSwitchesSinceCheckpoint": 0
}
{
  "checkpoint": "49572589993860134938465509123149151340420296711845445634",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 394720,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "customerorder-e2e_172.21.5.105",
  "ownerSwitchesSinceCheckpoint": 0
}

I see the following error when stopping the consumer resources.

It shows up on nativeWorker.requestShutdown(). I run nativeWorker in my own thread, created with consumerThread = new Thread(nativeWorker);

{
  "timeMillis": 1494303061730,
  "thread": "localhost-startStop-2",
  "level": "INFO",
  "loggerName": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
  "message": "Shutting down consumer instance customerorder-e2e_172.21.5.105",
  "endOfBatch": false,
  "loggerFqcn": "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId": 177,
  "threadPriority": 5
}
{
  "timeMillis": 1494303061732,
  "thread": "Thread-8",
  "level": "ERROR",
  "loggerName": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
  "message": "Error starting a consumer customerorder-e2e",
  "thrown": {
    "commonElementCount": 0,
    "name": "java.lang.ThreadDeath",
    "extendedStackTrace": [
      {
        "class": "java.lang.Thread",
        "method": "stop",
        "file": "Thread.java",
        "line": 850,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
        "method": "shutdown",
        "file": "KinesisEventStreamConsumer.java",
        "line": 202,
        "exact": false,
        "location": "stream-driver-1.0-SNAPSHOT.jar",
        "version": "?"
      },
      {
        "class": "sun.reflect.NativeMethodAccessorImpl",
        "method": "invoke0",
        "file": "NativeMethodAccessorImpl.java",
        "line": -2,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "sun.reflect.NativeMethodAccessorImpl",
        "method": "invoke",
        "file": "NativeMethodAccessorImpl.java",
        "line": 62,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "sun.reflect.DelegatingMethodAccessorImpl",
        "method": "invoke",
        "file": "DelegatingMethodAccessorImpl.java",
        "line": 43,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.lang.reflect.Method",
        "method": "invoke",
        "file": "Method.java",
        "line": 497,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "org.springframework.beans.factory.support.DisposableBeanAdapter",
        "method": "invokeCustomDestroyMethod",
        "file": "DisposableBeanAdapter.java",
        "line": 364,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DisposableBeanAdapter",
        "method": "destroy",
        "file": "DisposableBeanAdapter.java",
        "line": 287,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultSingletonBeanRegistry",
        "method": "destroyBean",
        "file": "DefaultSingletonBeanRegistry.java",
        "line": 578,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultSingletonBeanRegistry",
        "method": "destroySingleton",
        "file": "DefaultSingletonBeanRegistry.java",
        "line": 554,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultListableBeanFactory",
        "method": "destroySingleton",
        "file": "DefaultListableBeanFactory.java",
        "line": 961,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultSingletonBeanRegistry",
        "method": "destroySingletons",
        "file": "DefaultSingletonBeanRegistry.java",
        "line": 523,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultListableBeanFactory",
        "method": "destroySingletons",
        "file": "DefaultListableBeanFactory.java",
        "line": 968,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.context.support.AbstractApplicationContext",
        "method": "destroyBeans",
        "file": "AbstractApplicationContext.java",
        "line": 1033,
        "exact": false,
        "location": "spring-context-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.context.support.AbstractApplicationContext",
        "method": "doClose",
        "file": "AbstractApplicationContext.java",
        "line": 1009,
        "exact": false,
        "location": "spring-context-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.context.support.AbstractApplicationContext",
        "method": "close",
        "file": "AbstractApplicationContext.java",
        "line": 961,
        "exact": false,
        "location": "spring-context-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.web.context.ContextLoader",
        "method": "closeWebApplicationContext",
        "file": "ContextLoader.java",
        "line": 581,
        "exact": false,
        "location": "spring-web-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.web.context.ContextLoaderListener",
        "method": "contextDestroyed",
        "file": "ContextLoaderListener.java",
        "line": 116,
        "exact": false,
        "location": "spring-web-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.StandardContext",
        "method": "listenerStop",
        "file": "StandardContext.java",
        "line": 4799,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.core.StandardContext",
        "method": "stopInternal",
        "file": "StandardContext.java",
        "line": 5438,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.util.LifecycleBase",
        "method": "stop",
        "file": "LifecycleBase.java",
        "line": 226,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.core.ContainerBase$StopChild",
        "method": "call",
        "file": "ContainerBase.java",
        "line": 1435,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.core.ContainerBase$StopChild",
        "method": "call",
        "file": "ContainerBase.java",
        "line": 1424,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "java.util.concurrent.FutureTask",
        "method": "run",
        "file": "FutureTask.java",
        "line": 266,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1142,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 617,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 745,
        "exact": true,
        "location": "?",
        "version": "1.8.0_60"
      }
    ]
  },
  "endOfBatch": false,
  "loggerFqcn": "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId": 36,
  "threadPriority": 5
}

I wrap nativeWorker in my StreamConsumer#consumeOnce: https://github.com/nihil-os/stream-driver/blob/kinesis-stream/src/main/java/com/eventstream/consumer/kinesis/KinesisEventStreamConsumer.java#L102

For now, my workaround is to stop the Tomcat process with Unix kill.

The KCL isn't really designed to be managed by a container. I suspect the problem is related to KinesisEventStreamConsumer.java:203: requestShutdown() returns a Future that you're not waiting on.

@pfifer you are right. I will definitely try waiting on the Future returned by nativeWorker.requestShutdown().

It might look like:

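            Future<Void> shutdownFuture = nativeConsumer.requestShutdown(); // request shutdown only once
            while (!shutdownFuture.isDone()) { Thread.sleep(100); }         // poll instead of busy-spinning
            consumerThread.stop();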

Thanks for your input.

This is a bug: you shouldn't have to request the value of a future to trigger its completion. Thanks for reporting it.

For the time being, you can do something like:

    nativeConsumer.requestShutdown().get();

or, if you don't want to wait forever:

    nativeConsumer.requestShutdown().get(5, TimeUnit.SECONDS);

with whatever timeout you want to use. You will need to handle the exceptions these methods can throw, though: InterruptedException and ExecutionException for both, plus TimeoutException for the timed variant.
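To make that concrete, here is a minimal sketch of a helper that waits on the shutdown Future with a timeout and handles each checked exception of Future#get. ShutdownAwaiter and awaitShutdown are hypothetical names, not part of the KCL; in the app the argument would be whatever nativeConsumer.requestShutdown() returns, while the demo uses CompletableFuture stand-ins so it runs without AWS.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not KCL API): waits for a shutdown Future such as the one
// returned by Worker#requestShutdown() and handles the checked exceptions of Future#get.
final class ShutdownAwaiter {

    private ShutdownAwaiter() {}

    /** Returns true only if the shutdown completed normally within the timeout. */
    static boolean awaitShutdown(Future<?> shutdownFuture, long timeout, TimeUnit unit) {
        try {
            shutdownFuture.get(timeout, unit);
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling (container) thread can still see it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            // The shutdown itself failed; the cause carries the underlying exception.
            System.err.println("Shutdown failed: " + e.getCause());
            return false;
        } catch (TimeoutException e) {
            // Shutdown is still in progress; the caller decides what to do next.
            System.err.println("Shutdown did not finish within " + timeout + " " + unit);
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for requestShutdown(): one already finished, one that never finishes.
        Future<Void> done = CompletableFuture.completedFuture(null);
        Future<Void> stuck = new CompletableFuture<>();
        System.out.println(awaitShutdown(done, 5, TimeUnit.SECONDS));          // true
        System.out.println(awaitShutdown(stuck, 100, TimeUnit.MILLISECONDS));  // false (times out)
    }
}
```

Returning a boolean lets the caller log or give up on a timeout instead of escalating to Thread.stop().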

com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker has two shutdown methods:

    public Future<Void> requestShutdown() {}
    public void shutdown() {}

I believe requestShutdown was designed for graceful shutdown, as I see shutdownCompleteLatch being used in the implementation.
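The general pattern described here (a shutdown request that returns a Future backed by a completion latch) can be sketched as follows. This is a simplified, hypothetical worker, not the KCL's Worker: LatchBasedWorker, shutdownRequested and shutdownComplete are illustrative names, and the loop body is a placeholder for real record processing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical worker illustrating latch-based graceful shutdown (NOT the KCL implementation).
final class LatchBasedWorker implements Runnable {
    private volatile boolean shutdownRequested = false;
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);
    private final AtomicLong batches = new AtomicLong();

    @Override
    public void run() {
        try {
            while (!shutdownRequested) {
                batches.incrementAndGet(); // placeholder for "fetch and process a batch of records"
                Thread.sleep(10);
            }
            // A real consumer might checkpoint and release its leases at this point.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownComplete.countDown(); // signal that the run loop has exited
        }
    }

    /** Asks the loop to stop and returns a Future that completes once it has exited. */
    Future<Void> requestShutdown() {
        shutdownRequested = true;
        // runAsync waits on the latch in the common pool, so the Future completes
        // on its own, whether or not anyone calls get().
        return CompletableFuture.runAsync(() -> {
            try {
                shutdownComplete.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        });
    }

    long batchesProcessed() {
        return batches.get();
    }

    public static void main(String[] args) throws Exception {
        LatchBasedWorker worker = new LatchBasedWorker();
        Thread consumerThread = new Thread(worker, "consumer");
        consumerThread.start();
        Thread.sleep(50);
        worker.requestShutdown().get(5, TimeUnit.SECONDS); // graceful: wait for the loop to exit
        consumerThread.join(1_000);
        System.out.println("batches=" + worker.batchesProcessed() + ", alive=" + consumerThread.isAlive());
    }
}
```

One caveat of this sketch: if run() is never started, the latch never reaches zero, so callers should always use the timed get.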

But even if I wait on requestShutdown().get() before stopping my wrapper thread around com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker, I get a java.lang.ThreadDeath error (it is thrown from wrapperConsumerThread#stop()). I need to dig more into Java threads.

{
  "commonElementCount": 0,
  "name": "java.lang.ThreadDeath",
  "extendedStackTrace": [
    {
      "class": "java.lang.Thread",
      "method": "stop",
      "file": "Thread.java",
      "line": 850,
      "exact": true,
      "location": "?",
      "version": "1.8.0_111"
    },
    {
      "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
      "method": "shutdown",
      "file": "KinesisEventStreamConsumer.java",
      "line": 210,
      "exact": false,
      "location": "classes/",
      "version": "?"
    },
    {
      "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumerComponentSpecs$$anonfun$1$$anonfun$apply$mcV$sp$2",
      "method": "apply",
      "file": "KinesisEventStreamConsumerComponentSpecs.scala",
      "line": 176,
      "exact": false,
      "location": "test-classes/",
      "version": "?"
    },
    {
      "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumerComponentSpecs$$anonfun$1$$anonfun$apply$mcV$sp$2",
      "method": "apply",
      "file": "KinesisEventStreamConsumerComponentSpecs.scala",
      "line": 129,
      "exact": false,
      "location": "test-classes/",
      "version": "?"
    }
  ]
}

I get the same ThreadDeath error intermittently (not always) with the following very simple thread, and the thread still reports as alive afterwards:

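    import java.lang.Thread.UncaughtExceptionHandler

    val processor = new Thread()

    // Print whatever the thread dies with (ThreadDeath, if stop() reaches it while running)
    processor.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = e.printStackTrace()
    })

    println(processor.isAlive)
    processor.start()

    processor.stop() // deprecated: forces the thread to throw ThreadDeath, without waiting for it to die
    println(processor.isAlive)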

Never mind: I should not be trying to stop consumerThread (bad practice), since the worker already handles its own shutdown. So I now simply call requestShutdown() on the Kinesis worker and wait on the result:

    nativeConsumer.requestShutdown().get(5, TimeUnit.SECONDS);

That way the container shuts down cleanly as well.
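Putting the final approach together, here is a hypothetical wrapper that a container destroy hook could call (the stack trace in this issue shows Spring invoking a custom destroy method named shutdown). It requests shutdown, waits on the returned Future with a timeout, and then joins the thread instead of calling Thread.stop(). ManagedConsumer, ShutdownableWorker and FakeWorker are illustrative names: ShutdownableWorker only mirrors the Runnable shape and the Future<Void> requestShutdown() signature of KCL's Worker, and FakeWorker stands in for the real worker so the sketch runs without AWS.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical wrapper: owns the consumer thread and shuts it down cooperatively.
final class ManagedConsumer {
    private final ShutdownableWorker worker;
    private final Thread consumerThread;
    private final long timeoutMillis;

    ManagedConsumer(ShutdownableWorker worker, long timeoutMillis) {
        this.worker = worker;
        this.timeoutMillis = timeoutMillis;
        this.consumerThread = new Thread(worker, "kinesis-consumer");
    }

    void start() {
        consumerThread.start();
    }

    /** Destroy hook: request shutdown, wait for it, then join. Never calls Thread.stop(). */
    boolean shutdown() {
        try {
            worker.requestShutdown().get(timeoutMillis, TimeUnit.MILLISECONDS);
            // Once the worker has finished, run() returns and the thread exits by itself.
            consumerThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            System.err.println("Worker did not shut down cleanly: " + e);
        }
        // A non-daemon thread still alive here can keep the JVM from exiting.
        return !consumerThread.isAlive();
    }

    public static void main(String[] args) {
        ManagedConsumer consumer = new ManagedConsumer(new FakeWorker(), 5_000);
        consumer.start();
        System.out.println("stopped cleanly: " + consumer.shutdown());
    }
}

// Hypothetical interface mirroring the two Worker members used in this thread.
interface ShutdownableWorker extends Runnable {
    Future<Void> requestShutdown();
}

// Stand-in worker for the demo: "processes records" until shutdown is requested.
final class FakeWorker implements ShutdownableWorker {
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    private final CompletableFuture<Void> stopped = new CompletableFuture<>();

    @Override
    public void run() {
        try {
            stopRequested.await(); // placeholder for the record-processing loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stopped.complete(null);
        }
    }

    @Override
    public Future<Void> requestShutdown() {
        stopRequested.countDown();
        return stopped;
    }
}
```

Joining rather than stopping means a worker that ignores the request shows up as a false return value you can log, instead of a ThreadDeath thrown into code that may be in the middle of a checkpoint.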