Error when running benchmark tests
Xoerk opened this issue · 14 comments
Hi,
We would like to try using this connector in our production environment, and for that we started running some benchmark tests. Our Flink app consumes events from Kinesis, where each event is ~2k, then uses the Delta sink to convert them to Parquet and write them into Delta.
Our Parquet schema contains ~40 columns of different data types.
We ran performance tests and noticed failures such as:
2022-06-02 15:11:54 | at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
2022-06-02 15:11:54 | at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:99)
2022-06-02 15:11:54 | at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
2022-06-02 15:11:54 | at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
2022-06-02 15:11:54 | at java.nio.Bits.reserveMemory(Bits.java:695)
2022-06-02 15:11:54 | 2022-06-02 12:11:54,918 WARN org.apache.flink.runtime.taskmanager.Task [] - Sink DeltaAssetPacketSink (1/4)#8 (c1a0c51d8c2274b48092b9c0db8009ee) switched from RUNNING to FAILED with failure cause: java.lang.OutOfMemoryError: Direct buffer memory
And also failures such as:
2022-06-02 12:41:39 | at com.github.mjakubowski84.parquet4s.RowParquetRecord.$anonfun$write$1$adapted(ParquetRecord.scala:167)
2022-06-02 12:41:39 | at com.github.mjakubowski84.parquet4s.RowParquetRecord.$anonfun$write$1(ParquetRecord.scala:173)
2022-06-02 12:41:39 | at com.github.mjakubowski84.parquet4s.MapParquetRecord.write(ParquetRecord.scala:460)
2022-06-02 12:41:39 | at org.apache.parquet.schema.GroupType.getType(GroupType.java:207)
2022-06-02 12:41:39 | at org.apache.parquet.schema.GroupType.getFieldIndex(GroupType.java:175)
2022-06-02 12:41:39 | }
2022-06-02 12:41:39 | }
2022-06-02 12:41:39 | optional binary value (STRING);
2022-06-02 12:41:39 | required binary key (STRING);
2022-06-02 12:41:39 | repeated group key_value {
Our test load consists of sending ~2M events spread across 30 minutes.
Our setup includes:
- Flink 1.14.4
- Scala 2.12
- 2 TaskManagers with 4 GB memory
- Parallelism 2
- Checkpoint every 15 minutes
- Source: AWS Kinesis events
- Sink: S3 as Parquet and Delta log storage
It seems like the Flush of the delta happens only at checkpoint which might cause the out-of-memory problem.
Is there any configuration that could be done on the Delta sink to modify it? Is there any benchmarking test done on this connector?
@Xoerk Could you provide full stack traces, please? Especially the second one, since it does not show what the actual error is. Thanks!
@Xoerk Could you also share whether any partitions were defined for this table (sink configuration)? If yes, roughly how many distinct values are there?
Thanks.
@Xoerk Flyby comment about the checkpointing. Committing newly written parquet files to the Delta table only happens at checkpoint boundaries. This is by design: it is what ensures the exactly-once guarantee when writing data into Delta. However, it won't cause an out-of-memory problem with a large checkpoint interval like 15 minutes, because the data is continuously written and flushed into the underlying parquet files (so it is not buffered in memory) during that interval. It's just that those parquet files are not visible in the Delta table until they are committed at the checkpoint boundary. Hope that makes sense.
> It seems like the Flush of the delta happens only at checkpoint which might cause the out-of-memory problem.
In fact the data is flushed periodically, whenever internal buffers are filled. During checkpoint the files are "finalized" and renamed as described in the diagram here: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/#file-sink.
I guess that the root cause might be the number of writers opened in a task manager. If we assume that:
(1) taskmanager.numberOfTaskSlots=T.
(2) There are N partitions in your Delta table.
(3) Your Flink DataStream IS NOT keyed by partition columns, which means each sink subtask writes to all partitions!
(4) Your objects have C columns.
then on each TaskManager there will be T open io.delta.flink.sink.internal.writer.DeltaWriter instances, T*N open org.apache.parquet.hadoop.ParquetWriter instances and T*N*C open org.apache.parquet.column.impl.ColumnWriterV1 instances. Each writer keeps some in-memory buffer.
This is clearly visible if you analyze the taskmanager's memory dump (you can enable memory dumps by adding env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/some/path/taskmanager.hprof").
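To make the writer counts above concrete, here is a minimal sketch of the arithmetic. The class WriterEstimate and its methods are hypothetical helpers, not part of Flink or the Delta connector, and the values of T and N in main are assumptions picked for illustration (only C of about 40 comes from this issue).

```java
// Hypothetical helper for illustration only; not part of Flink or the Delta connector.
public class WriterEstimate {

    /** DeltaWriter instances per TaskManager: one per task slot (T). */
    public static long deltaWriters(long slots) {
        return slots;
    }

    /** ParquetWriter instances per TaskManager when the stream is not keyed by partition (T*N). */
    public static long parquetWriters(long slots, long partitions) {
        return slots * partitions;
    }

    /** ColumnWriterV1 instances per TaskManager (T*N*C). */
    public static long columnWriters(long slots, long partitions, long columns) {
        return slots * partitions * columns;
    }

    public static void main(String[] args) {
        long t = 2;   // assumed task slots per TaskManager
        long n = 100; // assumed number of distinct partitions
        long c = 40;  // ~40 columns, as reported in this issue

        System.out.println("DeltaWriter:    " + deltaWriters(t));
        System.out.println("ParquetWriter:  " + parquetWriters(t, n));
        System.out.println("ColumnWriterV1: " + columnWriters(t, n, c));
    }
}
```

With these assumed inputs the sketch reports 2 DeltaWriter, 200 ParquetWriter and 8000 ColumnWriterV1 instances per TaskManager, which shows how quickly the number of buffered writers grows with the partition count.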
It is possible that your job is short on memory even before the checkpoint, and the OOM occurs during the checkpoint, when memory usage probably increases temporarily and hits the limit. However, I need to investigate this further.
There are several questions that should help us identify the issue:
- Does the job fail exactly when the checkpoint is triggered?
- Is the delta table partitioned? How many partitions are there?
- What transformations are done in your job? (in particular any keyBy() operations).
When it comes to the second stack trace, I think it is unrelated to the OOM problem and deserves a separate ticket. The following stack trace is thrown when the Delta Flink Connector tries to create a checkpoint (a Delta checkpoint, not a Flink checkpoint) after the first 10 commits.
Caused by: org.apache.parquet.io.InvalidRecordException: map not found in optional group partitionValues (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
at org.apache.parquet.schema.GroupType.getFieldIndex(GroupType.java:175)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:207)
at com.github.mjakubowski84.parquet4s.MapParquetRecord.write(ParquetRecord.scala:460)
at com.github.mjakubowski84.parquet4s.RowParquetRecord.$anonfun$write$1(ParquetRecord.scala:173)
at com.github.mjakubowski84.parquet4s.RowParquetRecord.$anonfun$write$1$adapted(ParquetRecord.scala:167)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at com.github.mjakubowski84.parquet4s.RowParquetRecord.write(ParquetRecord.scala:167)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.$anonfun$write$2(ParquetWriter.scala:175)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.$anonfun$write$2$adapted(ParquetWriter.scala:169)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.write(ParquetWriter.scala:169)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.write(ParquetWriter.scala:162)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.$anonfun$write$1(ParquetWriter.scala:144)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.$anonfun$write$1$adapted(ParquetWriter.scala:143)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.write(ParquetWriter.scala:143)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.write(ParquetWriter.scala:149)
at io.delta.standalone.internal.Checkpoints$.$anonfun$writeCheckpoint$4(Checkpoints.scala:256)
at io.delta.standalone.internal.Checkpoints$.$anonfun$writeCheckpoint$4$adapted(Checkpoints.scala:255)
at scala.collection.immutable.List.foreach(List.scala:388)
at io.delta.standalone.internal.Checkpoints$.writeCheckpoint(Checkpoints.scala:255)
at io.delta.standalone.internal.Checkpoints.checkpoint(Checkpoints.scala:125)
at io.delta.standalone.internal.Checkpoints.checkpoint$(Checkpoints.scala:121)
at io.delta.standalone.internal.DeltaLogImpl.checkpoint(DeltaLogImpl.scala:41)
at io.delta.standalone.internal.OptimisticTransactionImpl.postCommit(OptimisticTransactionImpl.scala:383)
at io.delta.standalone.internal.OptimisticTransactionImpl.commit(OptimisticTransactionImpl.scala:155)
at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.doCommit(DeltaGlobalCommitter.java:319)
at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.commit(DeltaGlobalCommitter.java:222)
at org.apache.flink.streaming.runtime.operators.sink.StreamingGlobalCommitterOperator.commit(StreamingGlobalCommitterOperator.java:83)
at org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterOperator.commitUpTo(AbstractStreamingCommitterOperator.java:154)
at org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterOperator.notifyCheckpointComplete(AbstractStreamingCommitterOperator.java:136)
at org.apache.flink.streaming.runtime.operators.sink.StreamingGlobalCommitterOperator.notifyCheckpointComplete(StreamingGlobalCommitterOperator.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:283)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:990)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:961)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:977)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:302)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
In addition, I prepared a github repo in which I recreated both issues: https://github.com/grzegorz8/issue362
Hi All,
Sorry for the delayed response. @grzegorz8, I saw you managed to reproduce the issue, but just in case, here are the full stack traces of both problems I have:
2022-06-02 12:11:54,945 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - OperationRouterTag (2/2) (1e235bcbf07d0b28c6b80b3a1d707ab9) switched from RUNNING to CANCELING.
2022-06-02 12:11:54,944 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job company-service (00000000000000000000000000000000) switched from state RUNNING to RESTARTING.
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:163) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
at io.delta.flink.sink.internal.writer.DeltaWriter.write(DeltaWriter.java:259) ~[?:?]
at io.delta.flink.sink.internal.writer.DeltaWriterBucket.write(DeltaWriterBucket.java:334) ~[?:?]
at shaded.DeltaBulkPartWriter.write(DeltaBulkPartWriter.java:93) ~[?:?]
at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52) ~[?:?]
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301) ~[?:?]
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128) ~[?:?]
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:70) ~[?:?]
at org.apache.flink.formats.parquet.row.ParquetRowDataBuilder$ParquetWriteSupport.write(ParquetRowDataBuilder.java:88) ~[?:?]
at org.apache.flink.formats.parquet.row.ParquetRowDataWriter.write(ParquetRowDataWriter.java:88) ~[?:?]
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:307) ~[?:?]
at org.apache.parquet.column.impl.ColumnWriteStoreV1.endRecord(ColumnWriteStoreV1.java:27) ~[?:?]
at org.apache.parquet.column.impl.ColumnWriteStoreBase.endRecord(ColumnWriteStoreBase.java:187) ~[?:?]
at org.apache.parquet.column.impl.ColumnWriteStoreBase.sizeCheck(ColumnWriteStoreBase.java:200) ~[?:?]
at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:315) ~[?:?]
at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:53) ~[?:?]
at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:122) ~[?:?]
at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164) ~[?:?]
at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:312) ~[?:?]
at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:421) ~[?:?]
at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:249) ~[?:?]
at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227) ~[?:?]
at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48) ~[?:?]
at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:99) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_332]
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_332]
at java.nio.Bits.reserveMemory(Bits.java:695) ~[?:1.8.0_332]
java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
2022-06-02 12:11:54,936 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink DeltaSink (1/4) (c1a0c51d8c2274b48092b9c0db8009ee) switched from RUNNING to FAILED on company-svc-taskmanager-1-4 @ ip-10-60-11-157.ec2.internal (dataPort=45943).
2022-06-02 12:11:54,932 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Sink DeltaSink (1/4)#8 c1a0c51d8c2274b48092b9c0db8009ee.
2022-06-02 12:11:54,919 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Sink DeltaSink (1/4)#8 (c1a0c51d8c2274b48092b9c0db8009ee)
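The OutOfMemoryError text above names two Flink options that control direct memory. A flink-conf.yaml fragment that raises them could look like the sketch below. The sizes are placeholder assumptions for illustration, not values tested against this job, and raising them only helps if the job genuinely needs more direct memory rather than holding too many open writers.

```yaml
# flink-conf.yaml (sketch; the sizes below are placeholders, not tuned values)

# Direct memory available to user code and its dependencies,
# e.g. the Snappy compressor buffers seen in the stack trace.
taskmanager.memory.task.off-heap.size: 512m

# Direct memory reserved for the Flink framework itself.
taskmanager.memory.framework.off-heap.size: 256m
```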
Second problem:
at com.github.mjakubowski84.parquet4s.MapParquetRecord.write(ParquetRecord.scala:460) ~[?:?]
at org.apache.parquet.schema.GroupType.getType(GroupType.java:207) ~[?:?]
at org.apache.parquet.schema.GroupType.getFieldIndex(GroupType.java:175) ~[?:?]
}
}
optional binary value (STRING);
required binary key (STRING);
repeated group key_value {
org.apache.parquet.io.InvalidRecordException: map not found in optional group partitionValues (MAP) {
2022-06-06 06:31:52,331 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink DeltaAssetPacketSink Global Committer (1/1) (121142e6bc2aef2d84fa8480bc17554f) switched from RUNNING to FAILED on company-svc-taskmanager-2-1 @ ip-10-60-11-144.ec2.internal (dataPort=37505).
2022-06-06 06:31:52,326 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Sink DeltaAssetPacketSink Global Committer (1/1)#18 121142e6bc2aef2d84fa8480bc17554f.
2022-06-06 06:31:52,323 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Sink DeltaAssetPacketSink Global Committer (1/1)#18 (121142e6bc2aef2d84fa8480bc17554f).
at java.lang.Thread.run(Thread.java:750)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$18(StreamTask.java:1406)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$16(StreamTask.java:1374)
at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1426)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:152)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.notifyCheckpointComplete(CommitterOperator.java:95)
at org.apache.flink.streaming.runtime.operators.sink.GlobalStreamingCommitterHandler.notifyCheckpointCompleted(GlobalStreamingCommitterHandler.java:95)
at org.apache.flink.streaming.runtime.operators.sink.AbstractStreamingCommitterHandler.commitUpTo(AbstractStreamingCommitterHandler.java:146)
at org.apache.flink.streaming.runtime.operators.sink.GlobalStreamingCommitterHandler.commit(GlobalStreamingCommitterHandler.java:76)
at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.commit(DeltaGlobalCommitter.java:221)
at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.doCommit(DeltaGlobalCommitter.java:318)
at io.delta.standalone.internal.OptimisticTransactionImpl.commit(OptimisticTransactionImpl.scala:155)
at io.delta.standalone.internal.OptimisticTransactionImpl.postCommit(OptimisticTransactionImpl.scala:383)
at io.delta.standalone.internal.DeltaLogImpl.checkpoint(DeltaLogImpl.scala:41)
at io.delta.standalone.internal.Checkpoints.checkpoint$(Checkpoints.scala:122)
at io.delta.standalone.internal.Checkpoints.checkpoint(Checkpoints.scala:126)
at io.delta.standalone.internal.Checkpoints$.writeCheckpoint(Checkpoints.scala:256)
at scala.collection.immutable.List.foreach(List.scala:388)
at io.delta.standalone.internal.Checkpoints$.$anonfun$writeCheckpoint$4$adapted(Checkpoints.scala:256)
at io.delta.standalone.internal.Checkpoints$.$anonfun$writeCheckpoint$4(Checkpoints.scala:257)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.write(ParquetWriter.scala:149)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.write(ParquetWriter.scala:143)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.$anonfun$write$1$adapted(ParquetWriter.scala:143)
at com.github.mjakubowski84.parquet4s.DefaultParquetWriter.$anonfun$write$1(ParquetWriter.scala:144)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:301)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.write(ParquetWriter.scala:162)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.write(ParquetWriter.scala:169)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.$anonfun$write$2$adapted(ParquetWriter.scala:169)
at com.github.mjakubowski84.parquet4s.ParquetWriteSupport.$anonfun$write$2(ParquetWriter.scala:175)
at com.github.mjakubowski84.parquet4s.RowParquetRecord.write(ParquetRecord.scala:167)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at com.github.mjakubowski84.parquet4s.RowParquetRecord.$anonfun$write$1$adapted(ParquetRecord.scala:167)
at com.github.mjakubowski84.parquet4s.RowParquetRecord.$anonfun$write$1(ParquetRecord.scala:173)
at com.github.mjakubowski84.parquet4s.MapParquetRecord.write(ParquetRecord.scala:460)
at org.apache.parquet.schema.GroupType.getType(GroupType.java:207)
at org.apache.parquet.schema.GroupType.getFieldIndex(GroupType.java:175)
}
}
optional binary value (STRING);
required binary key (STRING);
repeated group key_value {
2022-06-06 06:31:52,323 WARN org.apache.flink.runtime.taskmanager.Task [] - Sink DeltaAssetPacketSink Global Committer (1/1)#18 (121142e6bc2aef2d84fa8480bc17554f) switched from RUNNING to FAILED with failure cause: org.apache.parquet.io.InvalidRecordException: map not found in optional group partitionValues (MAP) {
2022-06-06 06:31:43,686 INFO io.delta.standalone.internal.DeltaLogImpl [] - Updated snapshot to io.delta.standalone.internal.SnapshotImpl@42fb9429
@kristoffSC, to answer your question: we have only 2 partition columns.
@Xoerk Thanks a lot for the stack traces. They are extremely useful, especially the first one.
> for your question we have only 2 partition columns
The crucial aspect is how many partitions there are. In other words, what is the number of distinct values in partition columns.
I also would like you to answer these two more questions which would greatly help me with recreating the problem reliably:
- Does the job fail exactly when the checkpoint is triggered?
- What transformations are done in your job (in particular any keyBy() operations)?
When it comes to the InvalidRecordException, it is a dependency problem. Let me explain in more detail.
com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.resolveSchema is used to resolve the parquet schema from a class definition (AddFile in our case).
private val internalWriter = ParquetWriter.internalWriter(
path = new Path(path),
schema = ParquetSchemaResolver.resolveSchema[T],
options = options
)
ParquetSchemaResolver, in turn, imports org.apache.parquet.schema._ from the parquet-column dependency. Now we have a dependency conflict:
flink-parquet:1.12.0 --> parquet-hadoop:1.11.1 --> parquet-column:1.11.1
delta-standalone:0.4.1 --> parquet-hadoop:1.10.1 --> parquet-column:1.10.1
delta-standalone:0.4.1 --> parquet4s-core_2.12:1.2.1 --> parquet-hadoop:1.10.1 --> parquet-column:1.10.1
In fact parquet-column:1.11.1 is loaded. In consequence its org.apache.parquet.schema.Types.MapBuilder is loaded, which uses:
return builder
.repeatedGroup().addFields(keyType, valueType).named(ConversionPatterns.MAP_REPEATED_NAME)
.named(name);
where
static final String MAP_REPEATED_NAME = "key_value";
while MapBuilder in version 1.10.1 uses:
return builder
.repeatedGroup().addFields(keyType, valueType).named("map")
.named(name);
What is more, parquet4s's com.github.mjakubowski84.parquet4s.MapParquetRecord assumes that the map field is still named "map":
private val MapKeyValueFieldName = "map"
In consequence, the following error is thrown:
Caused by: org.apache.parquet.io.InvalidRecordException: map not found in optional group partitionValues (MAP) {
repeated group key_value {
required binary key (STRING);
optional binary value (STRING);
}
}
MapKeyValueFieldName has been changed in parquet4s v1.8.0.
> Parquet is upgraded to 1.12.0. Please note a change that is not breaking in case of interoperability with older versions of Parquet4S and Spark but might (however shouldn't) be breaking in case of other systems - from now on Map is saved internally using key_value field instead of map.
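The mismatch can be illustrated with a toy model that uses no Parquet classes at all. The class MapFieldLookup below is hypothetical: it only mimics a by-name field lookup inside a group, with the group's field list standing in for the schema produced by each parquet-column version.

```java
import java.util.Arrays;
import java.util.List;

// Toy model for illustration only; it does not use the real Parquet or parquet4s classes.
public class MapFieldLookup {

    /** Mimics a by-name field lookup in a group: returns the field index or throws. */
    public static int fieldIndex(List<String> groupFields, String name) {
        int index = groupFields.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException(name + " not found in group " + groupFields);
        }
        return index;
    }

    public static void main(String[] args) {
        // Name of the repeated group inside a MAP, per parquet-column version (as described above).
        List<String> builtBy1_10_1 = Arrays.asList("map");
        List<String> builtBy1_11_1 = Arrays.asList("key_value");

        // Field name that parquet4s 1.2.1 looks up when writing a map.
        String expectedByParquet4s = "map";

        // Schema and writer agree: the lookup succeeds.
        System.out.println("1.10.1 index: " + fieldIndex(builtBy1_10_1, expectedByParquet4s));

        // Schema built by the newer parquet-column: the lookup fails.
        try {
            fieldIndex(builtBy1_11_1, expectedByParquet4s);
        } catch (IllegalArgumentException e) {
            System.out.println("1.11.1 lookup failed: " + e.getMessage());
        }
    }
}
```

The second lookup fails for the same reason as the real one: the schema was built by one library version while the writer expects the field name used by another.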
At first glance there are two possible solutions:
- Upgrade the parquet4s version.
- Relocate the parquet dependencies from delta-standalone so that they do not conflict with Flink's parquet dependencies. As far as I know, the Flink connector uses flink-parquet dependencies to write the actual data files.
Let me know what you think.
@grzegorz8 - we should be able to upgrade to parquet4s 1.9.4, as mentioned here #303 (comment). That should resolve this issue, correct?
parquet4s 1.9.4 depends on parquet-hadoop 1.12.2.
@scottsand-db At first glance it looks good. I updated the parquet versions as suggested (except that parquet4s:1.9.4 depends on parquet-hadoop:1.12.0), then I ran my test job with the newly created delta-flink and delta-standalone artifacts, and checkpoints were created successfully.
val parquet4sVersion = "1.9.4"
val parquetHadoopVersion = "1.12.0"
I checked it on master and v0.4.1.
-rw-r--r-- 1 grzegorz grzegorz 3,4K cze 15 13:29 00000000000000000021.json
-rw-r--r-- 1 grzegorz grzegorz 26 cze 15 13:29 _last_checkpoint
-rw-r--r-- 1 grzegorz grzegorz 19K cze 15 13:29 00000000000000000020.checkpoint.parquet
-rw-r--r-- 1 grzegorz grzegorz 160 cze 15 13:29 .00000000000000000020.checkpoint.parquet.crc
-rw-r--r-- 1 grzegorz grzegorz 3,4K cze 15 13:29 00000000000000000020.json
@grzegorz8 - awesome. want to make this into a PR?
next up, what do you think about us shading all parquet4s and hadoop-parquet libraries in delta-standalone? that way they are independent of what is available in the execution environment.
> awesome. want to make this into a PR?
@scottsand-db Sure, I'll do it.
> next up, what do you think about us shading all parquet4s and hadoop-parquet libraries in delta-standalone? that way they are independent of what is available in the execution environment.
This is exactly what I proposed as the alternative. Now I think this is the better approach. The final delta-standalone jar will be slightly bigger, but we will be pretty sure there are no conflicts with other libraries.