apache/beam

[RRIO] [Task]: Add withErrorHandler to accept ErrorHandler.BadRecordErrorHandler

damondouglas opened this issue

What needs to happen?

Add a withErrorHandler method to RequestResponseIO that accepts an ErrorHandler.BadRecordErrorHandler and returns a PTransform<PCollection<RequestT>, PCollection<ResponseT>>.

The value of this new method is that the API response PCollection can be chained directly into downstream transforms, which improves ease of use. For example:

try (BadRecordErrorHandler<T> errorHandler = pipeline.registerBadRecordErrorHandler(SomeDLQ.write())) {
    PCollection<ResponseT> results = records.apply(RequestResponseIO.of(...).withErrorHandler(errorHandler));
}
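
To illustrate the intended behavior without depending on Beam, here is a rough, self-contained sketch of the pattern. Everything in it is a hypothetical stand-in: `ErrorHandlerSketch`, `Caller` (playing the role of RequestResponseIO), and `BadRecordSink` (playing the role of the error handler) are not Beam classes, and plain `List`s stand in for PCollections. It only assumes that failed requests should go to the handler while successful responses are returned directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ErrorHandlerSketch {

  /** Hypothetical stand-in for a bad-record handler; NOT Beam's BadRecordErrorHandler. */
  public static final class BadRecordSink<T> implements AutoCloseable {
    private final List<String> badRecords = new ArrayList<>();

    public void add(T record, Exception cause) {
      badRecords.add(record + " -> " + cause.getClass().getSimpleName());
    }

    public List<String> badRecords() {
      return badRecords;
    }

    @Override
    public void close() {
      // A real handler would finalize its dead-letter output here; this one does nothing.
    }
  }

  /** Hypothetical stand-in for RequestResponseIO, using Lists instead of PCollections. */
  public static final class Caller<RequestT, ResponseT> {
    private final Function<RequestT, ResponseT> call;
    private final BadRecordSink<RequestT> sink; // null until withErrorHandler is called

    private Caller(Function<RequestT, ResponseT> call, BadRecordSink<RequestT> sink) {
      this.call = call;
      this.sink = sink;
    }

    public static <RequestT, ResponseT> Caller<RequestT, ResponseT> of(
        Function<RequestT, ResponseT> call) {
      return new Caller<>(call, null);
    }

    /** Returns a copy that routes failed requests to the sink instead of rethrowing. */
    public Caller<RequestT, ResponseT> withErrorHandler(BadRecordSink<RequestT> sink) {
      return new Caller<>(call, sink);
    }

    /** Returns only successful responses, so the result can be chained directly. */
    public List<ResponseT> apply(List<RequestT> requests) {
      List<ResponseT> responses = new ArrayList<>();
      for (RequestT request : requests) {
        try {
          responses.add(call.apply(request));
        } catch (RuntimeException e) {
          if (sink == null) {
            throw e; // no handler registered: surface the failure
          }
          sink.add(request, e);
        }
      }
      return responses;
    }
  }

  public static void main(String[] args) {
    try (BadRecordSink<String> sink = new BadRecordSink<>()) {
      List<Integer> results =
          Caller.<String, Integer>of(Integer::parseInt)
              .withErrorHandler(sink)
              .apply(List.of("1", "x", "3"));
      System.out.println(results);           // successful responses only
      System.out.println(sink.badRecords()); // the request that failed to parse
    }
  }
}
```

The point of the sketch is the return type of `apply`: because failures are diverted to the handler, the caller gets back only responses and can keep chaining, which is what the proposed withErrorHandler would enable for PCollection<ResponseT>.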

Issue Priority

Priority: 2 (default / most normal work should be filed as P2)