InfluxCommunity/influxdb3-python

write: on_completed for batch callback

powersj opened this issue · 1 comments

Use Case

If we are going to provide a batching capability by default, we also need to provide a way to determine the overall result. To track how many successes and failures occurred during a batch write, can we have an on_completed callback function? There was something similar in the v2 client.

Expected behavior

Ideally we would be able to report how many successes or errors occurred during a batch, such that a user could do:

client.write(records)
client.write(records)

and get back a message during the on_completed callback

failed to write 0 of 10 batches
failed to write 1 of 10 batches

Note how the number of batches is unique to each call. Would something like this be possible?

Actual behavior

You can currently add a similar message to each success and error callback, but you cannot track the total across all batches.

Additional info

No response

@powersj, if we need total statistics for written data, we can utilize the existing callback functionality. Here’s an example:

from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError, InfluxDBClient3, WritePrecision

class BatchingCallback(object):
    def __init__(self):
        self.success_count = 0
        self.error_count = 0

    def success(self, conf, data: str):
        self.success_count += 1
        print(self)

    def error(self, conf, data: str, exception: InfluxDBError):
        self.error_count += 1
        print(self)

    def retry(self, conf, data: str, exception: InfluxDBError):
        print(f"Retryable error occurred for batch: {conf}, data: {data}, retry: {exception}")

    def __str__(self):
        return f"Success: {self.success_count}, Error: {self.error_count}, Total: {self.success_count + self.error_count}"

callback = BatchingCallback()

write_options = write_client_options(success_callback=callback.success,
                                     error_callback=callback.error,
                                     retry_callback=callback.retry,
                                     write_options=WriteOptions(batch_size=2))

with InfluxDBClient3(host="http://localhost:8086",
                     token="my-token",
                     org="my-org",
                     database="my-bucket",
                     write_client_options=write_options,
                     debug=False) as client:
    # Write first data batch
    records_1 = [
        "mem,host=host1 used_percent=23.43234543 1718621150",
        "mem,host=host2 used_percent=23.43234543 1718621150",
        "mem,host=host3 used_percent=23.43234543 1718621150"
    ]
    client.write(records_1, write_precision=WritePrecision.S)

    # Write second data batch
    records_2 = [
        "mem,host=host1 used_percent=23.43234543 1718621155",
        "mem,host=host2 used_percent=23.43234543 1718621155",
        "mem,host=host3 used_percent=23.43234543 1718621155"
    ]
    client.write(records_2, write_precision=WritePrecision.S)

    # Write error batch
    records_3 = ["mem,not_valid=true 1718621155"]
    client.write(records_3, write_precision=WritePrecision.S)

This implementation prints the running totals of written and failed batches each time a success or error callback fires, that is, once per batch sent to InfluxDB.
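To get closer to the single "failed to write N of M batches" message requested in the issue, the counters can be read once at the end instead of printed from every callback. The following is a minimal sketch, not part of influxdb3-python: `BatchTotals` and `summary()` are hypothetical names, and the lock is only a precaution in case the client invokes callbacks from a background thread. The callbacks are simulated by hand here so the snippet runs without a server.

```python
import threading


class BatchTotals:
    """Thread-safe success/error counter with a one-line summary.

    Hypothetical helper for illustration; not part of influxdb3-python.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.success_count = 0
        self.error_count = 0

    def success(self, conf, data):
        # Same positional signature as the success callback above.
        with self._lock:
            self.success_count += 1

    def error(self, conf, data, exception):
        # Same positional signature as the error callback above.
        with self._lock:
            self.error_count += 1

    def summary(self):
        # Read both counters under the lock so the totals are consistent.
        with self._lock:
            total = self.success_count + self.error_count
            return f"failed to write {self.error_count} of {total} batches"


totals = BatchTotals()
# Simulate what the client would do: nine successful batches, one failure.
for _ in range(9):
    totals.success(conf=None, data="mem,host=host1 used_percent=23.4")
totals.error(conf=None, data="mem,not_valid=true", exception=ValueError("bad line"))
print(totals.summary())  # prints: failed to write 1 of 10 batches
```

With the real client, `totals.success` and `totals.error` would be passed to `write_client_options` as in the example above, and `totals.summary()` printed after the `with` block exits, assuming pending batches are flushed when the client closes. Note that this yields a cumulative total across all `write` calls, not the per-call count the issue asks for.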