masesgroup/KEFCore

Testing the modifications introduced by #64, the test often raises a `java.lang.NullPointerException`


While testing the modifications introduced by this PR, the test often raises a `java.lang.NullPointerException` at:

    var future = _kafkaProducer?.Send(new KNetProducerRecord<TKey, KNetEntityTypeData<TKey>>(record.AssociatedTopicName, 0, record.Key, record.Value!));

The exception is raised after an unpredictable number of iterations over the input of:

    public IEnumerable<Future<RecordMetadata>> Commit(IEnumerable<IKafkaRowBag> records)

The remote (JVM) call stack reports:

    org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:996)
    org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:847)

Originally posted by @mariomastrodicasa in #64 (comment)

The code in `KafkaProducer.doSend` where the exception is raised is:

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        // Append callback takes care of the following:
        //  - call interceptors and user callback on completion
        //  - remember partition that is calculated in RecordAccumulator.append
        AppendCallbacks<K, V> appendCallbacks = new AppendCallbacks<K, V>(callback, this.interceptors, record);

        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            long nowMs = time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            // ... (rest of doSend omitted)

in particular on the line `clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);`. At that point the input `ProducerRecord<K, V>` is dereferenced to read both its topic and its partition, so the exception is consistent with the `record` reference no longer being valid there.

The record object is created inline in the following method and passed straight to `Send`:

    Future<RecordMetadata> Send(KNetProducerRecord<K, V> record)
    {
        ProducerRecord<byte[], byte[]> kRecord = ToProducerRecord(record, _keySerializer, _valueSerializer);
        return Send(kRecord);
    }

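The whole call chain up to that point is synchronous and runs inside the Java code. The current hypothesis is that the record object (`kRecord` in .NET) is nevertheless reclaimed by the .NET garbage collector during the call: .NET and Java (JVM) share the same JNI reference, so once the .NET wrapper is collected the reference seen by the JVM is no longer valid. Because `kRecord` is not used again after it is passed to `Send`, the .NET runtime may treat it as unreachable before `Send` returns.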
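The hypothesis above can be illustrated with a small Java model. This is a sketch under stated assumptions, not the actual KNet or JCOBridge mechanism: `GLOBAL_REFS` stands in for the JNI global-reference table, `RecordWrapper` for the managed object that owns a reference (like `kRecord`), and `send` for `KafkaProducer.doSend`; all three names are hypothetical. A `java.lang.ref.Cleaner` releases the handle once the wrapper becomes unreachable, mirroring a .NET finalizer, and `Reference.reachabilityFence` (Java 9+) keeps the wrapper reachable until the call returns, the role `GC.KeepAlive` plays in .NET. Since GC timing cannot be reproduced deterministically, the failure path calls `close()` explicitly to simulate a premature release.

```java
import java.lang.ref.Cleaner;
import java.lang.ref.Reference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Main {
    // Hypothetical stand-in for the JNI global-reference table: the JVM side
    // resolves a handle to the real object; a released handle resolves to null.
    static final Map<Long, String> GLOBAL_REFS = new ConcurrentHashMap<>();
    static final Cleaner CLEANER = Cleaner.create();
    static long nextHandle = 1;

    // Hypothetical managed-side wrapper owning one handle, like kRecord in .NET.
    static final class RecordWrapper implements AutoCloseable {
        final long handle;
        private final Cleaner.Cleanable cleanable;

        RecordWrapper(String topic) {
            long h = nextHandle++;
            handle = h;
            GLOBAL_REFS.put(h, topic);
            // Released automatically once the wrapper is unreachable (like a finalizer).
            // The action captures only the handle, never `this`.
            cleanable = CLEANER.register(this, () -> GLOBAL_REFS.remove(h));
        }

        @Override
        public void close() {
            cleanable.clean(); // runs the release at most once
        }
    }

    // Stand-in for KafkaProducer.doSend: it receives only the handle and
    // dereferences the record to read its topic.
    static int send(long handle) {
        String topic = GLOBAL_REFS.get(handle);
        return topic.length(); // NullPointerException if the handle was released
    }

    // Passing wrapper.handle alone does not keep the wrapper alive; the fence
    // keeps it strongly reachable until send() has returned.
    static int sendSafely(RecordWrapper wrapper) {
        try {
            return send(wrapper.handle);
        } finally {
            Reference.reachabilityFence(wrapper);
        }
    }

    public static void main(String[] args) {
        try (RecordWrapper wrapper = new RecordWrapper("orders")) {
            System.out.println(sendSafely(wrapper)); // prints 6
        }

        // Failure mode: the handle is released before the callee reads it,
        // which is what a premature collection of the wrapper amounts to.
        RecordWrapper early = new RecordWrapper("orders");
        long handle = early.handle;
        early.close();
        try {
            send(handle);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as seen in doSend");
        }
    }
}
```

The design point is that the callee only ever sees the raw handle, so nothing in the call itself extends the owner's lifetime; the explicit fence after the call is what does.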

This will be closed with the next release of KNet.

While working on #81, a similar `java.lang.NullPointerException` was raised at a different point. The Java call stack is:

    org.apache.kafka.common.utils.Time.timer(Time.java:89)
    org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227)

and the invocation point is in KNetConsumer:

        /// <inheritdoc cref="IKNetConsumer{K, V}.Poll(Duration)"/>
        public new KNetConsumerRecords<K, V> Poll(Duration timeout)
        {
            var records = IExecute<ConsumerRecords<byte[], byte[]>>("poll", timeout);
            return new KNetConsumerRecords<K, V>(records, _keyDeserializer, _valueDeserializer);
        }

The issue appears to be related to the one reported in masesgroup/KNet#242, so this issue will be reopened.

Closed with #82, which updates the KNet version and fixes the KEFCore code.