awslabs/amazon-kinesis-producer

Orphaned Log-Reader Threads consume 100% of CPU

skidder opened this issue · 1 comment

On several occasions I've seen the KPL fail to shut down cleanly in response to the destroy() method call. This has left orphaned threads that run in a tight loop and consume 100% of the available CPU.

The following chart shows the abrupt spike in CPU usage at 8:43 when the destroy() method was called:
[CPU chart]

I captured the following output from top and noted the PIDs of the offending threads: 9264, 9265

[top output]
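
As a quick cross-check (a standalone snippet of my own, not part of the KPL), the decimal PIDs from top can be converted to hex and matched against the nid fields in the thread dump that follows:

// Standalone snippet: the nid field in a thread dump is the native thread ID
// in hex, so the decimal PIDs reported by top can be matched by converting them.
public class NidCheck {
    public static void main(String[] args) {
        for (int pid : new int[] {9264, 9265}) {
            System.out.printf("PID %d -> nid=0x%x%n", pid, pid);
        }
        // Prints: PID 9264 -> nid=0x2430 and PID 9265 -> nid=0x2431,
        // which match kpl-daemon-0002 and kpl-daemon-0003 in the dump below.
    }
}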

I then ran jstack against the parent Java process and found these two threads, whose hex-encoded nid values correspond to the decimal PIDs noted above:

"kpl-daemon-0003" #15011 daemon prio=5 os_prio=0 tid=0x00007f292f29f000 nid=0x2431 runnable [0x00007f28c0741000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	- locked <0x00000000d68e5df0> (a java.io.InputStreamReader)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	- locked <0x00000000d68e5df0> (a java.io.InputStreamReader)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.amazonaws.services.kinesis.producer.LogInputStreamReader.run(LogInputStreamReader.java:97)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- <0x00000000d68e5ec0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"kpl-daemon-0002" #15010 daemon prio=5 os_prio=0 tid=0x00007f292f29e800 nid=0x2430 runnable [0x00007f28c0943000]
   java.lang.Thread.State: RUNNABLE
	at java.lang.Throwable.fillInStackTrace(Native Method)
	at java.lang.Throwable.fillInStackTrace(Throwable.java:783)
	- locked <0x00000000f5388c50> (a java.io.IOException)
	at java.lang.Throwable.<init>(Throwable.java:265)
	at java.lang.Exception.<init>(Exception.java:66)
	at java.io.IOException.<init>(IOException.java:58)
	at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:336)
	- locked <0x00000000d68f73b0> (a java.lang.UNIXProcess$ProcessPipeInputStream)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	- locked <0x00000000d68f73d8> (a java.io.InputStreamReader)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	- locked <0x00000000d68f73d8> (a java.io.InputStreamReader)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.amazonaws.services.kinesis.producer.LogInputStreamReader.run(LogInputStreamReader.java:97)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- <0x00000000d68f74f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

You can see that the second thread is in the middle of constructing the IOException thrown by BufferedInputStream.getBufIfOpen, i.e. it is going around the exception path of the read loop. It's also worth noting that no other Kinesis-related threads were running.

The CPU is pegged at 100% because the threads spin continually in this loop, which exits only when the running boolean flag is set to false:

while (running) {
    String logLine;
    try {
        logLine = reader.readLine();
        // log.info("Considering({}): {}", streamType, logLine);
        if (logLine == null) {
            continue;
        }
        if (logLine.startsWith("++++")) {
            startRead();
        } else if (logLine.startsWith("----")) {
            finishRead();
        } else if (isReadingRecord) {
            messageData.add(logLine);
        } else {
            logFunction.apply(log, logLine);
        }
    } catch (IOException ioex) {
        if (shuttingDown) {
            //
            // Since the Daemon calls destroy instead of letting the process exit normally
            // the input streams coming from the process will end up truncated.
            // When we know the process is shutting down we can report the exception as info
            //
            if (ioex.getMessage() == null || !ioex.getMessage().contains("Stream closed")) {
                //
                // If the message is "Stream closed" we can safely ignore it. This is probably a bug
                // with the UNIXProcess#ProcessPipeInputStream that it throws the exception. There
                // is no other way to detect the other side of the request being closed.
                //
                log.info("Received IO Exception during shutdown. This can happen, but should indicate "
                        + "that the stream has been closed: {}", ioex.getMessage());
            }
        } else {
            log.error("Caught IO Exception while reading log line", ioex);
        }
    }
}
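
To illustrate one way this loop can spin (a self-contained sketch of my own, not KPL code): once the reader's underlying stream is at end-of-stream or closed, readLine() returns null immediately, the continue sends the loop straight back around, and nothing in the body ever clears the running flag. The repeated-IOException case seen in the second thread dump spins in the same way, with each iteration throwing and catching instead of returning null.

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

public class BusyLoopDemo {
    private static volatile boolean running = true;

    public static void main(String[] args) throws IOException {
        // An empty stream stands in for the dead child process's stdout pipe.
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(new byte[0])));
        long iterations = 0;
        long deadline = System.nanoTime() + 1_000_000_000L; // spin for ~1 second
        while (running && System.nanoTime() < deadline) {
            String logLine = reader.readLine(); // returns null immediately at EOF
            if (logLine == null) {
                iterations++;
                continue; // mirrors the KPL loop: nothing here clears `running`
            }
        }
        System.out.println("readLine() returned null " + iterations + " times in ~1s");
    }
}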

The running flag is set to false only when the shutdown() method is invoked. That call could be skipped if the thread running the native executable is interrupted while blocked in process.waitFor():

int code = process.waitFor();
stdOutReader.shutdown();
stdErrReader.shutdown();
deletePipes();
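
Here is a minimal, self-contained sketch of that failure mode (the FakeReader class and the try/finally arrangement are my own illustration, not the KPL's actual code): if waitFor() is interrupted, any cleanup placed after it never runs, so the readers' running flags stay true; moving the shutdown calls into a finally block is one way to guarantee they execute either way.

public class ShutdownSketch {
    // Stand-in for LogInputStreamReader: shutdown() clears the flag the read loop checks.
    static class FakeReader {
        volatile boolean running = true;
        void shutdown() { running = false; }
    }

    public static void main(String[] args) throws Exception {
        FakeReader stdOutReader = new FakeReader();
        // Assumes a Unix-like system; `sleep` plays the role of the native child process.
        Process process = new ProcessBuilder("sleep", "60").start();

        Thread waiter = new Thread(() -> {
            try {
                process.waitFor();          // the interrupt below lands here
                stdOutReader.shutdown();    // skipped when waitFor() is interrupted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                stdOutReader.shutdown();    // always clears the flag, interrupt or not
            }
        });
        waiter.start();
        Thread.sleep(200);
        waiter.interrupt();                 // simulate the runner thread being interrupted
        waiter.join();
        process.destroy();
        System.out.println("running = " + stdOutReader.running); // prints false
    }
}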

Thanks for catching this.