tensorflow/io

Exception when not fully consuming generator/iterator that feeds into ArrowStreamDataset

kszlim opened this issue · 1 comment

kszlim commented

Repro code:

import tensorflow as tf
import tensorflow_io.arrow as arrow_io
import pyarrow as pa

if __name__ == "__main__":
    data = pa.array(list(range(5000)), type=pa.float32())
    names = ["data"]
    # The generator yields 100 record batches, but only 10 are consumed below.
    batches = (pa.RecordBatch.from_arrays([data], names=names) for _ in range(100))
    ds = arrow_io.ArrowStreamDataset.from_record_batches(
        batches,
        output_types=(tf.float32,),
        batch_size=5000,
        batch_mode="drop_remainder",
    )
    # Stopping early, before the generator is exhausted, triggers the
    # exception in the background run_server thread.
    for sample in ds.take(10):
        print(sample)

Exception in thread Thread-1 (run_server):
Traceback (most recent call last):
  File ".../venv/lib/python3.10/site-packages/tensorflow_io/python/ops/arrow_dataset_ops.py", line 572, in run_server
    writer.write_batch(batch)
  File "pyarrow/ipc.pxi", line 484, in pyarrow.lib._CRecordBatchWriter.write_batch
  File "/usr/lib/python3.10/socket.py", line 723, in write
    return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File ".../venv/lib/python3.10/site-packages/tensorflow_io/python/ops/arrow_dataset_ops.py", line 578, in run_server
    outfile.close()
  File "/usr/lib/python3.10/socket.py", line 723, in write
    return self._sock.send(b)
BrokenPipeError: [Errno 32] Broken pipe
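
For context: the traceback shows that the record batches are streamed by a background run_server thread writing over a local socket, so tearing the dataset down early closes the reading end while that thread is still sending. The same BrokenPipeError can be reproduced with plain stdlib sockets, independent of Arrow or TensorFlow (a minimal sketch of the mechanism, not the library's actual implementation):

```python
import socket
import threading

def stream_batches(sock, n_batches, errors):
    """Stand-in for the run_server thread: push batches into a socket."""
    try:
        for _ in range(n_batches):
            sock.sendall(b"x" * 65536)  # plays the role of writer.write_batch(batch)
    except OSError as exc:  # BrokenPipeError once the reader hangs up
        errors.append(exc)
    finally:
        sock.close()

reader, writer = socket.socketpair()
errors = []
t = threading.Thread(target=stream_batches, args=(writer, 1000, errors))
t.start()

# Consume only a few "batches" and close early -- analogous to ds.take(10)
# against a generator that yields 100 record batches.
for _ in range(10):
    reader.recv(65536)
reader.close()
t.join()

print("writer thread error:", type(errors[0]).__name__ if errors else None)
```

If that reading is right, draining the generator fully (e.g. yielding only as many batches as will actually be consumed) should avoid the error; whether the exception is otherwise harmless for the batches already read is a separate question.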

Anyone run into this?