delta-io/connectors

Flink-delta incompatibility with AWS Serverless Runtime (Amazon Kinesis Data Analytics) - Flink 1.13

artesdi opened this issue · 2 comments

I'm trying to deploy the flink-delta connector on the AWS runtime to sink data to S3 as Parquet.
Everything works on my local machine, using a local environment:
StreamExecutionEnvironment.createLocalEnvironment();

But it fails on the AWS serverless runtime (Amazon Kinesis Data Analytics, Flink 1.13) with an IllegalAccessError:
DeltaBulkPartWriter tried to access OutputStreamBasedPartFileWriter.<init>

AWS Flink Runtime: Apache Flink 1.13

Application dependencies:

flinkVersion = '1.13.6'
flinkParquetVersion = '1.8.1'
flinkDeltaVersion = '0.4.1'
deltaStandaloneVersion = '0.4.1'
kinesisAnalyticsRuntimeVersion = '1.2.0'
kinesisAnalyticsFlinkVersion = '2.0.0'
hadoopVersion = '3.1.0'

Is there a chance to test it with flink-delta connector 0.5.0?

As of today only 0.4.1 is available on Maven, although README.md says that 0.5.0 should already be there:
https://mvnrepository.com/artifact/io.delta/delta-flink

Exception

java.lang.IllegalAccessError: class org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkPartWriter tried to access method 'void org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.<init>(java.lang.Object, org.apache.flink.core.fs.RecoverableFsDataOutputStream, long)' 

Stacktrace

java.lang.IllegalAccessError: class org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkPartWriter tried to access method 'void org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.<init>(java.lang.Object, org.apache.flink.core.fs.RecoverableFsDataOutputStream, long)' (org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkPartWriter is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @46c59c8f; org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter is in unnamed module of loader 'app')
	at org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkPartWriter.<init>(DeltaBulkPartWriter.java:77)
	at org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter.openNew(DeltaBulkBucketWriter.java:83)
	at org.apache.flink.streaming.api.functions.sink.filesystem.DeltaBulkBucketWriter.openNew(DeltaBulkBucketWriter.java:41)
	at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90)
	at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
	at io.delta.flink.sink.internal.writer.DeltaWriterBucket.rollPartFile(DeltaWriterBucket.java:252)
	at io.delta.flink.sink.internal.writer.DeltaWriterBucket.write(DeltaWriterBucket.java:331)
	at io.delta.flink.sink.internal.writer.DeltaWriter.write(DeltaWriter.java:259)
	at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.processElement(AbstractSinkWriterOperator.java:80)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
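The key detail is at the end of the first stack trace line: DeltaBulkPartWriter was defined by Flink's ChildFirstClassLoader (the job jar), while OutputStreamBasedPartFileWriter was defined by the 'app' loader (the Flink runtime). Per the JVM specification, a runtime package is identified by its package name together with its defining class loader, and non-public members such as this constructor are only reachable from within the same runtime package. So two classes with the same package name but different loaders cannot see each other's package-private members. Locally, both classes come from the same loader, which would explain why the job works there. The sketch below models that rule with standard reflection calls; the class and method names are illustrative, not part of Flink or Delta.

```java
/**
 * Illustrative sketch of the JVM "runtime package" rule behind the
 * IllegalAccessError: same package name AND same defining class loader.
 * Package-private members are only accessible within one runtime package.
 */
public class RuntimePackageCheck {

    /** Returns true if both classes belong to the same runtime package. */
    static boolean sameRuntimePackage(Class<?> a, Class<?> b) {
        // Class.getPackageName() is available since Java 9.
        boolean sameName = a.getPackageName().equals(b.getPackageName());
        // Loaders are compared by identity; the bootstrap loader is null.
        boolean sameLoader = a.getClassLoader() == b.getClassLoader();
        return sameName && sameLoader;
    }

    /** Describes which loader defined a class, for debugging. */
    static String describe(Class<?> c) {
        ClassLoader loader = c.getClassLoader();
        return c.getName() + " <- " + (loader == null ? "bootstrap" : loader.toString());
    }

    public static void main(String[] args) {
        // Same package name (java.lang), same (bootstrap) loader.
        System.out.println(sameRuntimePackage(String.class, Integer.class));
        // Same loader, different package names.
        System.out.println(sameRuntimePackage(String.class, java.util.List.class));
        // Prints the application class loader that defined this class.
        System.out.println(describe(RuntimePackageCheck.class));
    }
}
```

Applied to the two classes in the error message, the package names match but the loaders do not, so the check would return false, which is consistent with the IllegalAccessError.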

Sorry, the README is wrong: 0.5.0 has not been released yet (but it is coming soon).

Closed as a known issue: shading (relocating) the org.apache.flink.streaming.api.functions.sink.filesystem package helped.
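A plausible reading of why shading helps, assuming the relocated copies of that package's classes are bundled into the job jar (the thread does not show the actual build configuration): relocation rewrites the package name of DeltaBulkPartWriter, of the bundled OutputStreamBasedPartFileWriter, and of every bytecode reference between them to a new prefix. Both then live in one package that only the job jar provides, so both are defined by the same ChildFirstClassLoader and the package-private constructor is reachable again. Real tools (for example the Maven Shade plugin's relocation rules or the Gradle Shadow plugin's relocate) rewrite bytecode; the toy below only models the name mapping, and the "shaded." prefix is a hypothetical choice.

```java
import java.util.List;

/**
 * Toy model of the class-name mapping performed by a shading relocation rule.
 * Real shading tools also rewrite every bytecode reference; this does not.
 */
public class RelocationSketch {

    /** Rewrites className if it lies in fromPackage or one of its subpackages. */
    static String relocate(String className, String fromPackage, String toPackage) {
        String prefix = fromPackage + ".";
        if (className.startsWith(prefix)) {
            return toPackage + "." + className.substring(prefix.length());
        }
        return className; // classes outside the pattern keep their names
    }

    public static void main(String[] args) {
        String from = "org.apache.flink.streaming.api.functions.sink.filesystem";
        String to = "shaded." + from; // hypothetical relocation target
        List<String> classes = List.of(
                from + ".DeltaBulkPartWriter",
                from + ".OutputStreamBasedPartFileWriter",
                "io.delta.flink.sink.internal.writer.DeltaWriterBucket");
        for (String c : classes) {
            System.out.println(c + " -> " + relocate(c, from, to));
        }
    }
}
```

Both writer classes map into the same shaded package, while DeltaWriterBucket, which lies outside the pattern, keeps its name.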