streamnative/pulsar-spark

[BUG] Pulsar SQL cannot correctly read topic data written by the pulsar-spark connector

frank-dkvan opened this issue · 7 comments

Describe the bug
I wrote the numbers 1 to 10 to a Pulsar topic through the pulsar-spark connector. When I use Pulsar SQL to read those 10 numbers, the query fails with the exception "Query 20190902_101219_00035_9r62s failed: Topic persistent://public/default/topic_test1 does not have a valid schema"

Screenshots

  1. A simple pulsar-spark program that writes the values 1 to 10 to a Pulsar topic
    image

  2. Exception output from the Pulsar SQL CLI when reading the topic data
    image

@frank-dkvan thanks for reporting!
Could you please try reading topic_test1 back with the pulsar-spark connector? Does that work?

val df = spark
  .read
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic_test1")
  .load()

df.printSchema()

Hi @yjshen, yes, reading topic_test1 with the pulsar-spark connector works correctly. Only Pulsar SQL cannot read the same topic.

@frank-dkvan, could you please try other data types, such as int or complex types? I'm afraid Pulsar SQL cannot handle Schema.BYTES correctly.
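One way to try a complex type, as suggested here, is to write a Dataset of a case class instead of a single primitive column. The following is only a minimal sketch. It assumes the connector encodes the non-metadata columns of a multi-column DataFrame as one struct-typed message value. That behavior is not confirmed in this thread. The names `Reading`, `WriteStructToPulsar`, and `topic_test_struct` are hypothetical, and the URLs are the ones used elsewhere in this issue.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type used only for this illustration.
// Defined at top level so Spark can derive an encoder for it.
case class Reading(id: Int, label: String)

object WriteStructToPulsar {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("write-struct-to-pulsar")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Ten rows with two columns, so the value is a struct rather than a bare primitive.
    val ds = spark.createDataset((1 to 10).map(i => Reading(i, s"row-$i")))

    ds.write
      .format("pulsar")
      .option("service.url", "pulsar://localhost:6650")
      .option("admin.url", "http://localhost:8080")
      .option("topic", "topic_test_struct") // hypothetical topic name
      .save()

    spark.stop()
  }
}
```

If Pulsar SQL can query this topic but not topic_test1, that would suggest the failure is tied to the primitive value schema rather than to the connector as a whole.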

@yjshen I cast the column "value" to STRING type, as shown in the screenshot above, but it still doesn't work in Pulsar SQL :-(

@yjshen I also tried converting the column "value" to a JSON string. It still does not work in Pulsar SQL :-( but the pulsar-spark connector can read it correctly.

---------- sample code follows ----------
import spark.implicits._
val ds = spark.createDataset(1 to 10)
ds
  .selectExpr("to_json(struct(*)) AS value", "CAST(value AS STRING) AS __key")
  .write
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "topic_test7")
  .save()

@frank-dkvan thanks for this. I'll check it.

Closing this issue because it is stale.