Date & Timestamp parsing fails
johanhenriksson commented
I'm trying to read from a Spark stream with a schema consisting of two test fields, one of type Date and the other of type Timestamp.
The row that causes problems looks as follows:
xadd events * time "2022-10-14T13:33:15.012345" date "2022-10-14" event click
This exact data parses totally fine if I read it from a stream of JSON files, without any manual conversion. However, when I read the very same row from Redis Streams, I first get a format error:
Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
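For reference, the JSON read that parses this data without issues is basically just this (the path is a placeholder; it uses the same schema shown further down):

# Each JSON file contains records like:
# {"time": "2022-10-14T13:33:15.012345", "date": "2022-10-14", "event": "click"}
json_df = spark.readStream \
    .format('json') \
    .schema(schema) \
    .load('/path/to/json/events/')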
Which is strange, since the JSON stream parses just fine. However, changing the timestamp to that format causes an even weirder error:
class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
...
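For reference, the modified row that triggers this looks roughly like the original one, with the timestamp rewritten into the space-separated format from the error message:
xadd events * time "2022-10-14 13:33:15.012345" date "2022-10-14" event click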
Removing the timestamp column entirely and trying to parse a simple date, 2022-10-14, causes a similar error:
Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast to class java.lang.Integer (java.sql.Date is in module java.sql of loader 'platform'; java.lang.Integer is in module java.base of loader 'bootstrap')
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
...
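The date-only variant was just the same setup with the time field dropped, roughly:

schema = StructType([
    StructField('date', DateType()),
    StructField('event', StringType()),
])
# with rows like: xadd events * date "2022-10-14" event click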
My code doesn't do anything special; it basically looks like this:
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, TimestampType, DateType, StringType

schema = StructType([
    StructField('time', TimestampType()),
    StructField('date', DateType()),
    StructField('event', StringType()),
])

# Read from the Redis stream with the schema above
df = spark.readStream \
    .format('redis') \
    .option('stream.keys', redis_stream) \
    .schema(schema) \
    .load()

# Append each micro-batch to a Delta table
def batch_writer(bdf: DataFrame, batch_id: int) -> None:
    bdf.write.format('delta') \
        .mode('append') \
        .save(table_path)

query = df.writeStream.format('delta') \
    .foreachBatch(batch_writer) \
    .outputMode('update') \
    .option('checkpointLocation', f'{table_path}/_checkpoints') \
    .start()

query.awaitTermination()
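A possible workaround (untested sketch, not what I'm after long-term) would be to declare both fields as StringType and convert them in the batch writer with to_timestamp / to_date:

from pyspark.sql import functions as F

string_schema = StructType([
    StructField('time', StringType()),
    StructField('date', StringType()),
    StructField('event', StringType()),
])

def batch_writer(bdf: DataFrame, batch_id: int) -> None:
    # Convert the raw string fields into proper timestamp/date columns before writing
    converted = bdf \
        .withColumn('time', F.to_timestamp('time')) \
        .withColumn('date', F.to_date('date'))
    converted.write.format('delta') \
        .mode('append') \
        .save(table_path)

But ideally the connector would just parse these types directly.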
Any ideas?