tensorflow/ecosystem

Cannot convert field to unsupported data type StructType([StructField("user_flush_num", ArrayType(IntegerType(), True)),StructField("field2", ArrayType(IntegerType(), True))])

jia66wei opened this issue · 2 comments

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, ArrayType,
                               IntegerType, StringType)

ss = SparkSession.builder.appName("123").getOrCreate()
sc = ss.sparkContext  # the original snippet used sc without defining it
struct = StructType([StructField("user_flush_num", ArrayType(IntegerType(), True)),
                     StructField("field2", ArrayType(IntegerType(), True))])
struct1 = StructType([StructField("f1", StringType(), True),
                      StructField("list", ArrayType(struct))])
data1 = sc.parallelize(["a", "b", "c"]).map(
    lambda x: [x, [[[1, 2, 3], [1, 2, 3]], [(1, 2, 3), (1, 2, 3)]]])
df = ss.createDataFrame(data1, struct1)
df.createOrReplaceTempView("table")
res = ss.sql("select * from table")
print(res.take(10))

The output is correct:
[Row(f1='a', list=[Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3]), Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3])]), Row(f1='b', list=[Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3]), Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3])]), Row(f1='c', list=[Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3]), Row(user_flush_num=[1, 2, 3], field2=[1, 2, 3])])]

But when converting the DataFrame to TFRecord, the write fails:

df.write.format("tfrecords").option("recordType", "Example").option(
    "codec", "org.apache.hadoop.io.compress.GzipCodec"
).save(output_root + "/train.tfrecord")

Error:
Caused by: java.lang.RuntimeException: Cannot convert field to unsupported data type StructType(StructField(item_newsid,IntegerType,true))
at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$.org$tensorflow$spark$datasources$tfrecords$serde$DefaultTfRecordRowEncoder$$encodeFeature(DefaultTfRecordRowEncoder.scala:144)
at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$$anonfun$encodeExample$1.apply(DefaultTfRecordRowEncoder.scala:64)
at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$$anonfun$encodeExample$1.apply(DefaultTfRecordRowEncoder.scala:61)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$.encodeExample(DefaultTfRecordRowEncoder.scala:61)
at org.tensorflow.spark.datasources.tfrecords.DefaultSource$$anonfun$3.apply(DefaultSource.scala:61)
at org.tensorflow.spark.datasources.tfrecords.DefaultSource$$anonfun$3.apply(DefaultSource.scala:58)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:151)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$$anonfun$4.apply(SparkHadoopMapReduceWriter.scala:148)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1374)
at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.org$apache$spark$internal$io$SparkHadoopMapReduceWriter$$executeTask(SparkHadoopMapReduceWriter.scala:163)
... 8 more

I need help, thank you!

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/example/example.proto
may help: the TFRecord Example format is limited. Each feature in an Example must be one of the flat types (a oneof over BytesList, FloatList, or Int64List), so a struct nested inside an array, as in this schema, cannot be encoded.
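Since each Example feature can only hold a flat BytesList, FloatList, or Int64List, one workaround is to flatten the nested struct into one flat list per field before writing. Below is a minimal sketch of that flattening in plain Python (no Spark); `flatten_row` is a hypothetical helper, not part of the connector, and concatenating across list elements is an assumption about what the data should look like:

```python
def flatten_row(f1, nested):
    """Flatten [[user_flush_num, field2], ...] pairs into one flat
    int list per field, concatenating across the outer list.
    This mirrors what a Spark map over the DataFrame rows would do
    before writing with the tfrecords data source (assumption)."""
    flat = {"f1": f1, "user_flush_num": [], "field2": []}
    for item in nested:
        flat["user_flush_num"].extend(item[0])  # first struct field
        flat["field2"].extend(item[1])          # second struct field
    return flat

# Same shape as one row of the reproduction data:
row = flatten_row("a", [[[1, 2, 3], [1, 2, 3]], [(1, 2, 3), (1, 2, 3)]])
print(row)
# {'f1': 'a', 'user_flush_num': [1, 2, 3, 1, 2, 3], 'field2': [1, 2, 3, 1, 2, 3]}
```

Each resulting column is then an ArrayType of IntegerType (or a StringType), which the connector can map onto Int64List / BytesList features.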