spotify/scio

ParquetAvroTap does not set projection or dataSupplier

clairemcginty opened this issue · 2 comments

ParquetAvroTap does not use the value of params.suffix: https://github.com/spotify/scio/blob/46e72af/scio-parquet/src/main/scala/com/spotify/scio/parquet/avro/ParquetAvroIO.scala#L321-L343

Repro in ParquetAvroIOTest:

  "ParquetAvroTap" should "work with projections" in {
    val schema = new Schema.Parser().parse(
      """
        |{
        |"type":"record",
        |"name":"TestRecordProjection",
        |"fields":[{"name":"int_field","type":["null", "int"]}]}
        |""".stripMargin
    )

    implicit val coder = avroGenericRecordCoder(schema)

    ParquetAvroTap(
      s"${testDir.toPath.resolve("*.parquet")}",
      ParquetAvroIO.ReadParam(identity[GenericRecord], schema)
    ).value.foreach { gr =>
      gr.get("int_field") should not be (null)
      gr.get("string_field") should be (null) // this assertion fails
    }
  }

Not setting dataSupplier can also be problematic for GenericRecord reads -- Parquet-Avro defaults to using a SpecificData model for reads, which will fail if a class matching the schema name exists on the classpath but doesn't implement a no-arg constructor.
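For illustration, here is a minimal sketch of pinning the data model at the parquet-avro level, independent of scio. The object and method names (`GenericReadSketch`, `withGenericData`) are hypothetical and not part of scio; only `AvroReadSupport.setAvroDataSupplier` and `GenericDataSupplier` come from parquet-avro. This is not the actual fix, just the kind of setting the tap would presumably need to apply.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroReadSupport, GenericDataSupplier}

// Hypothetical helper, not part of scio.
object GenericReadSketch {
  // Returns a copy of `base` that asks parquet-avro to use the GenericData
  // model, so reads do not go through the SpecificData default.
  def withGenericData(base: Configuration): Configuration = {
    val conf = new Configuration(base) // copy, so the caller's conf is not mutated
    AvroReadSupport.setAvroDataSupplier(conf, classOf[GenericDataSupplier])
    conf
  }
}
```

With this in place the resulting configuration carries the `AvroReadSupport.AVRO_DATA_SUPPLIER` key, which is the same key the workaround below sets by hand.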

Note: anyone impacted by this issue can work around it by setting the projection and data supplier via Configuration:

val conf = ParquetConfiguration.of(
  AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[GenericDataSupplier] // Set data supplier
)

val projection: Schema = ...
AvroReadSupport.setRequestedProjection(conf, projection) // Set projection

// Use configuration in tap
val tap = ParquetAvroTap(path, ParquetAvroIO.ReadParam(identity[GenericRecord], conf = conf))
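As a rough sketch of the projection half of the workaround, assuming the single-field schema from the repro above is the projection you want: `ProjectionSketch` and `withProjection` are hypothetical names used only for illustration, and the example uses a plain Hadoop `Configuration` rather than scio's `ParquetConfiguration`.

```scala
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroReadSupport

// Hypothetical example, not part of scio.
object ProjectionSketch {
  // Assumption: same single-field schema as the repro in this issue.
  val projection: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"TestRecordProjection",
      |"fields":[{"name":"int_field","type":["null","int"]}]}""".stripMargin
  )

  // Returns a copy of `base` with the requested projection set, so that
  // parquet-avro only materializes the columns named in `projection`.
  def withProjection(base: Configuration): Configuration = {
    val conf = new Configuration(base)
    AvroReadSupport.setRequestedProjection(conf, projection)
    conf
  }
}
```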

@clairemcginty probably you mean params.projection here:

ParquetAvroTap does not use the value of params.suffix

Ooops, yes, I meant projection 🤦‍♀️