delta-io/connectors

Support for other formats than Flink's Table API RowData format

BjarkeTornager opened this issue · 4 comments

Are there plans to support formats other than RowData in the Flink Delta connector, for both writing and reading? I am new to the Flink Table API, but there seems to be no easy way to convert objects, e.g. Avro records (SpecificRecord or GenericRecord), to RowData (or GenericRowData), nor to generate the RowType to supply as the schema. So, if I am not mistaken, I would need to implement my own custom converter (e.g. from Avro to RowData) to be able to use the Flink Delta connector?

tdas commented

Yes, you will need to write a converter from Avro to RowData (and the matching RowType). We could even consider contributing it to the delta-flink connector as a utility method.

@tdas thanks for your reply. That makes sense. I think such a utility would help adoption and integration with Flink. My use case involves using Flink with Delta to write and read streaming Avro records to and from storage as Parquet, and I think that is quite a common use case.

It seems like a generic way of doing this would be:

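```scala
import org.apache.flink.formats.avro.AvroToRowDataConverters
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.data.RowData
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils

// ...

val stream: DataStream[<MySpecificRecordClass>] = ...

// Derive the Flink RowType from the Avro schema of the generated class
val rowType: RowType = LogicalTypeUtils.toRowType(
  AvroSchemaConverter.convertToDataType(<MySpecificRecordClass>.SCHEMA$.toString).getLogicalType)

// The converter is Serializable, so the map closure below can capture it
val converter: AvroToRowDataConverters.AvroToRowDataConverter =
  AvroToRowDataConverters.createRowConverter(rowType)

// Pass RowData type information explicitly; otherwise the Scala API falls back to a generic (Kryo) type
val streamRowData: DataStream[RowData] =
  stream.map(record => converter.convert(record).asInstanceOf[RowData])(InternalTypeInfo.of(rowType))

env.execute("avro to RowData conversion")
```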

But maybe there is a better way of doing the conversion?
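A hedged sketch of the utility method tdas suggests, covering both directions the question asks about: Avro to RowData for writing, and RowData back to an Avro GenericRecord for reading. It assumes flink-avro and flink-table-common are on the classpath. `AvroRowDataUtils`, `AvroRowDataDemo`, and the `User` schema are hypothetical names, not part of the delta-flink connector; `AvroToRowDataConverters`, `RowDataToAvroConverters`, and `AvroSchemaConverter` are existing flink-avro classes, though their overloads vary somewhat between Flink versions.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.formats.avro.{AvroToRowDataConverters, RowDataToAvroConverters}
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter
import org.apache.flink.table.data.RowData
import org.apache.flink.table.types.logical.RowType
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils

// Hypothetical helper object (not an existing delta-flink API) built on flink-avro converters.
object AvroRowDataUtils {
  // Derive the Flink RowType from an Avro record schema.
  def rowTypeOf(schema: Schema): RowType =
    LogicalTypeUtils.toRowType(
      AvroSchemaConverter.convertToDataType(schema.toString).getLogicalType)

  // Write direction: Avro record (GenericRecord or SpecificRecord) -> RowData.
  def toRowData(rowType: RowType): AnyRef => RowData = {
    val converter = AvroToRowDataConverters.createRowConverter(rowType)
    record => converter.convert(record).asInstanceOf[RowData]
  }

  // Read direction: RowData -> Avro GenericRecord conforming to the given schema.
  def toGenericRecord(rowType: RowType, schema: Schema): RowData => GenericRecord = {
    val converter = RowDataToAvroConverters.createConverter(rowType)
    row => converter.convert(schema, row).asInstanceOf[GenericRecord]
  }
}

// Round trip of a single record using an illustrative, hypothetical "User" schema.
object AvroRowDataDemo {
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "User", "fields": [
      |  {"name": "name", "type": "string"},
      |  {"name": "age", "type": "int"}
      |]}""".stripMargin)

  def roundTrip(): (RowData, GenericRecord) = {
    val rowType = AvroRowDataUtils.rowTypeOf(schema)
    val input = new GenericData.Record(schema)
    input.put("name", "alice")
    input.put("age", 30)
    val row = AvroRowDataUtils.toRowData(rowType)(input)
    val back = AvroRowDataUtils.toGenericRecord(rowType, schema)(row)
    (row, back)
  }

  def main(args: Array[String]): Unit = {
    val (row, back) = roundTrip()
    println(s"RowData: name=${row.getString(0)}, age=${row.getInt(1)}")
    println(s"GenericRecord: $back")
  }
}
```

In a DataStream job the write function can be used inside `stream.map(...)` in the same way as the converter in the snippet above, and a generated SpecificRecord class exposes its schema through its static `getClassSchema()` method. Returning plain Scala functions keeps the helper independent of any particular stream API; whether they serialize cleanly into a Flink operator depends on what they capture (the read function captures the Avro `Schema`).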

This repo has been deprecated and the code has been moved to the connectors module in https://github.com/delta-io/delta. Please create the issue in the https://github.com/delta-io/delta repository instead. See #556 for details.