spotify/scio

SMB writes with CharSequence key can't be read with

clairemcginty opened this issue

Starting in 0.14, we discourage SMB reads with a CharSequence key, since there is no default implicit Coder[CharSequence] in scope. Using String as the key type with Avro now works automatically with the key extraction function, etc.
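
One reason a String key is easier to work with is that CharSequence implementations do not define equality against String. The sketch below assumes a hypothetical `Record` stand-in for an Avro record (Avro string fields are often surfaced as a CharSequence such as `org.apache.avro.util.Utf8`) and a hypothetical `extractKey` function that normalizes the field to a `java.lang.String`. It is not the scio key extraction function, only an illustration of the normalization idea.

```scala
object StringKeyExtraction {
  // Hypothetical stand-in for an Avro record whose string fields are CharSequence values.
  final case class Record(fields: Map[String, CharSequence])

  // Hypothetical key-extraction function: converts whatever CharSequence
  // implementation the field holds into a java.lang.String key.
  def extractKey(record: Record, field: String): String =
    record.fields(field).toString

  def main(args: Array[String]): Unit = {
    // StringBuilder is used here as a non-String CharSequence, standing in for Utf8.
    val raw: CharSequence = new java.lang.StringBuilder("user-1")
    println(raw == "user-1") // false: StringBuilder.equals is identity-based
    println(extractKey(Record(Map("id" -> raw)), "id") == "user-1") // true
  }
}
```

Normalizing to String once, at key extraction time, means every downstream comparison and coder lookup sees a single concrete key type.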

However, in some cases the upstream data was written with a CharSequence key type, and reading it fails with the following error:

[info] SortedBucketSource
[info] - should be able to read CharSequence-keyed sources written before 0.14 *** FAILED *** (2 seconds, 577 milliseconds)
[info]   java.lang.NullPointerException: Could not infer coder for key class class java.lang.String
[info]   at org.apache.beam.sdk.extensions.smb.SortedBucketPrimaryKeyedSource.keyTypeCoder(SortedBucketPrimaryKeyedSource.java:88)
[info]   at org.apache.beam.sdk.extensions.smb.SortedBucketSource.getOutputCoder(SortedBucketSource.java:186)
[info]   at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:155)
[info]   at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:135)
[info]   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:557)
[info]   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:489)
[info]   at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
[info]   at org.apache.beam.sdk.extensions.smb.SortedBucketIO$CoGbk.expand(SortedBucketIO.java:191)
[info]   at org.apache.beam.sdk.extensions.smb.SortedBucketIO$CoGbk.expand(SortedBucketIO.java:145)
[info]   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:557)

Repro here: https://github.com/spotify/scio/compare/smb-charsequence-key?expand=1

The only workaround at this point is to keep CharSequence as the read key type and define an implicit Coder[CharSequence], for example:

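import com.spotify.scio.coders.Coder

// Workaround: reuse the String coder for CharSequence keys via an unchecked cast
implicit val coder: Coder[CharSequence] = Coder[String].asInstanceOf[Coder[CharSequence]]

sc
  .sortMergeJoin(
    classOf[CharSequence],
    ...
  )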
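
Note that `asInstanceOf` here only relabels the type: it is safe only if every key value is actually a `java.lang.String` at runtime. The sketch below demonstrates this with a hypothetical `SimpleCoder` trait standing in for a real coder (it is not the scio `Coder` API). Encoding a `String` through the cast coder succeeds, while encoding another CharSequence implementation fails with a `ClassCastException`.

```scala
import java.nio.charset.StandardCharsets

object UncheckedCoderCast {
  // Hypothetical minimal coder interface, standing in for a real Coder type.
  trait SimpleCoder[T] {
    def encode(value: T): Array[Byte]
    def decode(bytes: Array[Byte]): T
  }

  val stringCoder: SimpleCoder[String] = new SimpleCoder[String] {
    def encode(value: String): Array[Byte] = value.getBytes(StandardCharsets.UTF_8)
    def decode(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
  }

  // Same pattern as the workaround: compiles, but performs no runtime conversion.
  val charSequenceCoder: SimpleCoder[CharSequence] =
    stringCoder.asInstanceOf[SimpleCoder[CharSequence]]

  // True if the value can be encoded through the cast coder.
  def encodes(value: CharSequence): Boolean =
    try { charSequenceCoder.encode(value); true }
    catch { case _: ClassCastException => false }
}
```

For example, `UncheckedCoderCast.encodes("abc")` succeeds, while `UncheckedCoderCast.encodes(new java.lang.StringBuilder("abc"))` returns `false`, because the erased bridge method casts its argument to `String`.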

We should relax this check to treat CharSequence/String as interoperable for Avro reads.
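
A minimal sketch of what treating the two types as interoperable could look like, assuming a hypothetical `KeyClassCompat` helper that compares the key class recorded at write time with the key class requested by the reader, and resolves a CharSequence key to String for coder inference. None of these names exist in scio or Beam; this only illustrates the proposed rule.

```scala
object KeyClassCompat {
  // Hypothetical rule: String and CharSequence are interchangeable key types for Avro reads.
  private val stringLike: Set[Class[_]] = Set(classOf[String], classOf[CharSequence])

  // True if data written with writtenKeyClass may be read as readKeyClass.
  def isCompatible(writtenKeyClass: Class[_], readKeyClass: Class[_]): Boolean =
    writtenKeyClass == readKeyClass ||
      (stringLike.contains(writtenKeyClass) && stringLike.contains(readKeyClass))

  // Hypothetical: the class whose coder should be inferred for a requested key class.
  def coderKeyClass(readKeyClass: Class[_]): Class[_] =
    if (readKeyClass == classOf[CharSequence]) classOf[String] else readKeyClass
}
```

Resolving CharSequence to String in one place would let the existing String coder handle both cases, instead of requiring callers to supply a cast coder.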