Testing the modifications introduced in #64, the test frequently raises a `java.lang.NullPointerException`
While testing the modifications introduced in this PR, the test frequently raises a `java.lang.NullPointerException`
at
The exception is raised after an unpredictable number of iterations over the input of
The remote callstack reports:
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:996)
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:847)
Originally posted by @mariomastrodicasa in #64 (comment)
The code snippet where the exception is raised is:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // Append callback takes care of the following:
    //  - call interceptors and user callback on completion
    //  - remember partition that is calculated in RecordAccumulator.append
    AppendCallbacks<K, V> appendCallbacks = new AppendCallbacks<K, V>(callback, this.interceptors, record);
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
In particular, the exception is raised on the line `clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);`. At that point the input `ProducerRecord<K, V>` is accessed to get both `topic` and `partition`.
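To illustrate why the callstack points at `doSend` rather than at `waitOnMetadata`, here is a minimal sketch. Java evaluates method arguments in the caller before the callee is invoked, so a null `record` fails on `record.topic()` inside the caller's frame. All class and method names below are hypothetical stand-ins and not the Kafka classes; the sketch only mirrors the shape of the failing line.

```java
class NullRecordSketch {
    // Hypothetical stand-in for ProducerRecord; not the Kafka class.
    static class RecordSketch {
        private final String topic;
        private final Integer partition;

        RecordSketch(String topic, Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        Integer partition() { return partition; }
    }

    // Set to true whenever the callee is actually entered.
    static boolean calleeEntered = false;

    // Hypothetical stand-in for waitOnMetadata: only records that it was entered.
    static String waitOnMetadata(String topic, Integer partition) {
        calleeEntered = true;
        return topic + "-" + partition;
    }

    // Same shape as the failing line: the arguments are evaluated here, in the caller.
    static String doSend(RecordSketch record) {
        return waitOnMetadata(record.topic(), record.partition());
    }

    // Calls doSend with a null record and returns the name of the top stack frame of the NPE.
    static String failingFrame() {
        calleeEntered = false;
        try {
            doSend(null);
            return "no exception";
        } catch (NullPointerException e) {
            return e.getStackTrace()[0].getMethodName();
        }
    }

    public static void main(String[] args) {
        System.out.println("valid record: " + doSend(new RecordSketch("orders", 0)));
        System.out.println("NPE raised in: " + failingFrame());
        System.out.println("callee entered for null record: " + calleeEntered);
    }
}
```

If the reference seen by the JVM is null when `doSend` runs, the failure surfaces on the argument evaluation, which is consistent with `doSend` being the top frame of the reported callstack.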
The `record` object is created inline within the following method:
Future<RecordMetadata> Send(KNetProducerRecord<K, V> record)
{
    ProducerRecord<byte[], byte[]> kRecord = ToProducerRecord(record, _keySerializer, _valueSerializer);
    return Send(kRecord);
}
Since the full call chain is synchronous up to that point and runs within the Java code, the current hypothesis is that the `record` object (`kRecord` in .NET) was collected by the Garbage Collector, because .NET and Java (JVM) share the JNI pointer to it.
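The hypothesis can be pictured with a small deterministic simulation. This is only a sketch under stated assumptions: the handle table, `create`, `release`, and `send` are hypothetical names, and a plain map stands in for the references the JVM holds on behalf of .NET wrappers. It does not reproduce how JNI or KNet actually manage references; it only shows how releasing a handle before the call completes leads to a `NullPointerException` on the Java side.

```java
import java.util.HashMap;
import java.util.Map;

class HandleLifetimeSketch {
    // Hypothetical stand-in for the references kept alive on behalf of .NET wrappers.
    private static final Map<Long, String> liveObjects = new HashMap<>();
    private static long nextHandle = 1;

    // "Creating the record": the caller only receives an opaque handle.
    static long create(String topic) {
        long handle = nextHandle++;
        liveObjects.put(handle, topic);
        return handle;
    }

    // What a wrapper's finalizer would do once the wrapper has been collected.
    static void release(long handle) {
        liveObjects.remove(handle);
    }

    // "JVM side" of a send: resolves the handle, then dereferences the object.
    static int send(long handle) {
        String topic = liveObjects.get(handle); // null if the handle was already released
        return topic.length();                  // throws NullPointerException in that case
    }

    // The wrapper is "collected" before the call completes, so the send fails.
    static boolean failsAfterEarlyRelease() {
        long handle = create("orders");
        release(handle);
        try {
            send(handle);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // The handle stays valid for the whole call and is released only afterwards.
    static boolean succeedsWhileAlive() {
        long handle = create("orders");
        try {
            return send(handle) == "orders".length();
        } finally {
            release(handle);
        }
    }

    public static void main(String[] args) {
        System.out.println("fails after early release: " + failsAfterEarlyRelease());
        System.out.println("succeeds while alive: " + succeedsWhileAlive());
    }
}
```

In the real scenario the early release would depend on garbage collection timing, which would match the exception appearing only after an unpredictable number of iterations. On the .NET side, `GC.KeepAlive` is the usual way to keep an object reachable until a call has returned; whether the KNet fix uses it is not stated in this thread.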
Will be closed with the next release of KNet.
While working on #81, a similar `java.lang.NullPointerException` was raised at another point. The Java callstack is:
org.apache.kafka.common.utils.Time.timer(Time.java:89)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227)
and the invocation point is in `KNetConsumer`:
/// <inheritdoc cref="IKNetConsumer{K, V}.Poll(Duration)"/>
public new KNetConsumerRecords<K, V> Poll(Duration timeout)
{
    var records = IExecute<ConsumerRecords<byte[], byte[]>>("poll", timeout);
    return new KNetConsumerRecords<K, V>(records, _keyDeserializer, _valueDeserializer);
}
The issue may be related to the one reported in masesgroup/KNet#242, so it will be reopened.
Closed with #82, which updates the KNet version and fixes the KEFCore code.