RedisLabs/spark-redis

Date & Timestamp parsing fails


I'm trying to read a Spark stream with a schema containing two test fields, one of type Date and the other of type Timestamp.

The row that causes problems looks as follows:

xadd events * time "2022-10-14T13:33:15.012345" date "2022-10-14" event click

This exact data parses fine if I read it from a stream of JSON files, without any manual conversion. However, when I read the very same row from Redis streams, I first get a format error:

Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]

That is strange, since the JSON stream parses the same value just fine. However, changing the timestamp to this format causes an even weirder error:

class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
     at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
     at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     ...
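
For reference, the format named in the first message differs from the original value only in using a space instead of the ISO-8601 `T` separator. A minimal Python sketch of the conversion applied before `xadd` (the helper name `to_jdbc_timestamp` is made up for illustration and is not part of spark-redis):

```python
from datetime import datetime


def to_jdbc_timestamp(iso_value: str) -> str:
    """Rewrite an ISO-8601 timestamp as 'yyyy-mm-dd hh:mm:ss.ffffff'.

    Hypothetical helper: it only swaps the 'T' separator for a space and
    keeps microsecond precision; no time zone handling is attempted.
    """
    parsed = datetime.fromisoformat(iso_value)
    return parsed.strftime("%Y-%m-%d %H:%M:%S.%f")


if __name__ == "__main__":
    # The value from the failing row above.
    print(to_jdbc_timestamp("2022-10-14T13:33:15.012345"))
```

With the value rewritten this way the format error goes away, and the cast error above appears instead.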

Removing the timestamp column entirely and trying to parse a simple date 2022-10-14 causes a similar error:

Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast to class java.lang.Integer (java.sql.Date is in module java.sql of loader 'platform'; java.lang.Integer is in module java.base of loader 'bootstrap')
     at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
     at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     ...
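
Both traces fail inside `getLong` / `getInt` on a `GenericInternalRow`. Spark's internal rows hold a `TimestampType` value as a long (microseconds since the Unix epoch) and a `DateType` value as an int (days since the epoch), so the casts suggest that `java.sql.Timestamp` / `java.sql.Date` objects end up in the row where those primitives are expected. The sketch below only illustrates what the internal values for the sample row would look like; the function names are hypothetical and UTC is assumed for the timestamp:

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_internal_date(value: str) -> int:
    """Days since 1970-01-01, the int form used for DateType internally."""
    return (date.fromisoformat(value) - EPOCH_DATE).days


def to_internal_timestamp(value: str) -> int:
    """Microseconds since the epoch, the long form used for TimestampType.

    Assumption: the string carries no zone and is interpreted as UTC.
    """
    parsed = datetime.fromisoformat(value).replace(tzinfo=timezone.utc)
    # Integer division of two timedeltas yields an exact int.
    return (parsed - EPOCH_TS) // timedelta(microseconds=1)


if __name__ == "__main__":
    print(to_internal_date("2022-10-14"))
    print(to_internal_timestamp("2022-10-14 13:33:15.012345"))
```

If that reading is right, the values reach Spark as external Java objects rather than in this primitive form, which would explain why only the Date and Timestamp columns are affected.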

My code doesn't do anything special; it basically looks like this:

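    from pyspark.sql import DataFrame
    from pyspark.sql.types import (DateType, StringType, StructField,
                                   StructType, TimestampType)

    schema = StructType([
        StructField('time', TimestampType()),
        StructField('date', DateType()),
        StructField('event', StringType()),
    ])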

    df = spark.readStream \
        .format('redis') \
        .option('stream.keys', redis_stream) \
        .schema(schema) \
        .load()

    def batch_writer(bdf: DataFrame, batch_id: int) -> None:
        bdf.write.format('delta') \
            .mode('append') \
            .save(table_path)

    query = df.writeStream.format('delta') \
        .foreachBatch(batch_writer) \
        .outputMode('update') \
        .option('checkpointLocation', f'{table_path}/_checkpoints') \
        .start()

    query.awaitTermination()

Any ideas?