GCS Source connector: Reached the end of stream with xx bytes left to read
Closed this issue · 2 comments
What version of the Stream Reactor are you reporting this issue for?
Release 8.1.4
Are you running the correct version of Kafka/Confluent for the Stream Reactor release?
I am running on Aiven Apache Kafka 3.8.0. My Kafka Connect is deployed using Strimzi on Kubernetes.
Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?
Yes, I am using GCS (Google Cloud Storage) as the data source and Kafka as the sink.
Have you read the docs?
Yes, I have read the documentation.
What is the expected behaviour?
I expect the connector to transfer Parquet files from GCS to a Kafka topic.
What was observed?
I encountered the following error:
java.io.EOFException: Reached the end of stream with 8861 bytes left to read
What is your Connect cluster configuration (connect-avro-distributed.properties)?
group.id: test-cluster
auto.create.topics.enable: true
offset.storage.topic: test-cluster-offsets
config.storage.topic: test-cluster-configs
status.storage.topic: test-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
publication.autocreate.mode: "filtered"
config.providers: env
config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
What is your connector properties configuration (my-connector.properties)?
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: eth-etl-gcs-source-parquet-connector0
  labels:
    strimzi.io/cluster: lambda-kafka-connect
spec:
  class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
  tasksMax: 1
  config:
    connect.gcpstorage.kcql: "insert into eth-etl-tokens-from-parquet select * from data-lambda-ethereum-etl-tokens:tokens_parquet2 BATCH=10 STOREAS `Parquet` LIMIT 100;"
    topics: "eth-etl-tokens-from-parquet"
    connect.gcpstorage.gcp.auth.mode: "File"
    connect.gcpstorage.gcp.file: "${env:GOOGLE_APPLICATION_CREDENTIALS}"
    connect.gcpstorage.gcp.project.id: "p2p-data-lambda"
    connect.gcpstorage.error.policy: "THROW"
    connect.gcpstorage.http.socket.timeout: 300000
    connect.gcpstorage.source.extension.includes: "parquet"
    connect.gcpstorage.source.partition.search.continuous: true
    connect.gcpstorage.source.partition.search.interval: 300000
    connect.gcpstorage.source.partition.search.recurse.levels: 0
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    log4j.logger.io.lenses.streamreactor.connect: "DEBUG"
    log4j.logger.org.apache.parquet: "DEBUG"
    log4j.logger.org.apache.hadoop: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common.formats.reader: "DEBUG"
    log4j.logger.com.google.cloud: "DEBUG"
    log4j.logger.com.google.auth: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerTask: "DEBUG"
Please provide full log files (redact any sensitive information)
java.io.EOFException: Reached the end of stream with 8861 bytes left to read
at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:126)
at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
at io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream.readFully(ParquetSeekableInputStream.scala:79)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:2165)
at org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1199)
at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1101)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1051)
at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1296)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:245)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:140)
at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetReaderIteratorAdaptor.<init>(ParquetReaderIteratorAdaptor.scala:25)
at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.<init>(ParquetStreamReader.scala:35)
at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader$.apply(ParquetStreamReader.scala:76)
at io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection$.toStreamReader(FormatSelection.scala:196)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
at scala.util.Either.flatMap(Either.scala:360)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
at traverse @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
at map @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
at map @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.poll(CloudSourceTaskState.scala:35)
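For context, the exception comes from a readFully-style copy loop: Parquet keeps reading until the buffer for the requested row-group range is full, and raises an EOFException if the underlying stream signals end-of-file first. A minimal sketch of that contract (an illustration of the pattern only, not the library source):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class ReadFullySketch {
    // Sketch of the readFully contract: either the whole buffer is filled, or an
    // EOFException reports how many bytes were still expected, as in the error above.
    static void readFully(InputStream in, byte[] buf) throws IOException {
        int off = 0;
        while (off < buf.length) {
            int n = in.read(buf, off, buf.length - off);
            if (n < 0) {
                throw new EOFException("Reached the end of stream with "
                        + (buf.length - off) + " bytes left to read");
            }
            off += n;
        }
    }
}

In other words, the connector's seekable stream returned fewer bytes than the row group's recorded length, which is consistent with the 8861 bytes reported above.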
Parquet example:
❯ parquet meta ~/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet
File path: /Users/alinaglumova/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet
Created by: parquet-cpp-arrow version 13.0.0
Properties: (none)
Schema:
message schema {
required binary address (STRING);
optional binary symbol (STRING);
optional binary name (STRING);
optional binary decimals (STRING);
optional binary total_supply (STRING);
required int64 block_timestamp (TIMESTAMP(MICROS,false));
required int64 block_number;
required binary block_hash (STRING);
}
Row group 0: count: 1068 159.64 B records start: 4 total(compressed): 166.502 kB total(uncompressed): 166.502 kB
--------------------------------------------------------------------------------
type encodings count avg size nulls min / max
address BINARY _ _ R 1068 47.51 B 0 "0x005c97569a24303e9ba6de6..." / "0xffffe5b9cb42b4996997c92..."
symbol BINARY _ _ R 1068 6.21 B 10 "" / "��"
name BINARY _ _ R 1068 10.27 B 10 "" / "����������"
decimals BINARY _ _ R 1068 0.47 B 65 "0" / "9"
total_supply BINARY _ _ R 1068 6.24 B 9 "0" / "9999999999999999999900000..."
block_timestamp INT64 _ _ R 1068 9.32 B 0 "2024-02-08T15:50:47.000000" / "2024-09-11T06:57:23.000000"
block_number INT64 _ _ R 1068 9.32 B 0 "19184445" / "20725728"
block_hash BINARY _ _ R 1068 70.31 B 0 "0x0002376d87ff1bbe5310679..." / "0xffae2542617a1ee9204fb27..."
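For comparison, the same file reads to completion locally. A minimal sketch of such a check (assuming parquet-avro and the Hadoop client are on the classpath; the class name and the use of the downloaded file path as args[0] are purely illustrative):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class LocalParquetReadCheck {
    public static void main(String[] args) throws Exception {
        // Read every record from the downloaded file; an intact file finishes without EOFException.
        try (ParquetReader<GenericRecord> reader =
                AvroParquetReader.<GenericRecord>builder(new Path(args[0])).build()) {
            long count = 0;
            while (reader.read() != null) {
                count++;
            }
            System.out.println("Read " + count + " records");
        }
    }
}

If this local read finishes without an EOFException, it suggests the file itself is intact and the failure is in how the connector streams it from GCS.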
Hi there,
We've released a new version today, v8.1.10. We made some major changes to the parquet handling in order to resolve this issue with large parquet files.
https://github.com/lensesio/stream-reactor/releases/tag/8.1.10
If this is still useful to you, would you please give it a go and report back?
Kind regards
David.