confluentinc/examples

Suggested Enhancements to Python Producer

chuck-confluent opened this issue · 0 comments

I wrote a python producer app that was consistently getting Local: Queue full errors (BufferError) and found this code sample from Magnus (author of librdkafka) helpful:

    for i in range(msg_count):
       while True:
            try:
                producer.produce(topic, value=msg_payload)
                producer.poll(0)
                break
            except BufferError as e:
                print(e, file=sys.stderr)
                producer.poll(1)

The while loop just means that normally a record will be produced and then there's a break to move out of the while loop and back into the for loop, but in case we hit BufferError, we need to retry the produce before we move back to the for loop.

I would also suggest including some performance tuning knobs in the example.

I have a code snippet that might help. My app reads from a CSV file to produce to two different topics. Here is a relevant snippet:

# configure schema registry client and avro serializer
schema_registry_client = SchemaRegistryClient(SCHEMA_REGISTRY_CONFIG)
my_avro_serializer = AvroSerializer(
    schema_registry_client=schema_registry_client,
    schema_str=MY_AVRO_SCHEMA
)
producer_config = KAFKA_CONFIG.copy()
producer_config['value.serializer'] = my_avro_serializer

# Configure other producer properties to tune performance. 
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
producer_config['queue.buffering.max.messages'] = 1000000
producer_config['linger.ms'] = 500
producer_config['batch.size'] = 2000000
producer_config['compression.type'] = 'snappy'

# Create producer and produce data
producer = SerializingProducer(producer_config)
records_produced = 0
with open('./my_data.csv') as data:
    reader = csv.DictReader(data,fieldnames=MY_FIELDNAMES)
    for row in reader:
        # preprocess csv row to use float values instead of scientific notation
        row_with_float_values = {field:float(value) for field, value in row.items()}
        while True:
            try:
                producer.produce(
                        topic=MY_TOPIC,
                        value=row_with_float_values,
                        on_delivery=log_kafka_errors)
                producer.poll(0)
                records_produced += 1
                break
            except BufferError as err:
                print(err, file=sys.stderr)
                print(f"produced {records_produced} records so far", file=sys.stdout)
                producer.flush()
print(f"produced a totoal of {records_produced} records", file=sys.stdout)
producer.flush()

There is some discussion about the extent to which calling poll(0) after each produce affects performance. Magnus says it's negligible but here's someone who doesn't agree.

It made sense to me to do producer.flush() when we hit a BufferError to clear everything out, but Magnus' example does poll(1) to wait for just 1 second to register callbacks for sent messages. I found that to cause me to hit the BufferError more often. To be honest, I don't really have a good sense of how poll works or what I should do with it from reading the docs, so maybe some more comments around how to use poll() and flush() would be good.

For reference, my full working example is here: