
ADT support for Flink with Shapeless

Primary language: Scala. License: Apache License 2.0 (Apache-2.0).

Flink-Shapeless

Flink-Shapeless replaces the default macro-based implicit provider for TypeInformation[T] in Apache Flink's Scala API with automatic type class derivation based on Shapeless.


Usage

The primary use case of Flink-Shapeless is to let custom implicit TypeInformation instances in scope override the default ones.

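// Import Flink's Scala API as usual
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
// Replace the macro-based TypeInformation provider
import derived.auto._

// Override TypeInformation[String]
// (MyASCIIStringTypeInfo is a placeholder for your own implementation)
implicit val strTypeInfo: TypeInformation[String] = MyASCIIStringTypeInfo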

// Strings below are serialized with ASCII encoding,
// even when nested in tuples, data structures, etc.
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("/path/to/file")
val counts = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty).map(_ -> 1)
  .groupBy(0).sum(1)

Features

Automatic type class derivation has several advantages over the default macro-based approach.

Customizability

Automatic derivation uses a modified version of Scala's implicit resolution mechanism with the lowest priority. It can therefore be overridden for specific types by providing an implicit instance anywhere in scope, including in the type's companion object, as is idiomatic in Scala.

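import org.apache.flink.api.common.typeinfo.TypeInformation

case class Foo(x: Int)
object Foo {
  // MyOptimizedFooTypeInfo is a placeholder for a hand-written instance
  implicit val info: TypeInformation[Foo] =
    MyOptimizedFooTypeInfo
}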

case class Bar(foo: Foo, y: Double)

// All instances below use the optimized version.
implicitly[TypeInformation[Foo]]
implicitly[TypeInformation[List[Foo]]]
implicitly[TypeInformation[(Foo, Long)]]
implicitly[TypeInformation[Bar]]
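The override rule itself can be reproduced in miniature with plain Scala 2.13 implicits, compiled as a single file or script. This is a simplified analogue, not Flink-Shapeless's actual mechanism: `Describe` is a hypothetical type class standing in for TypeInformation, and its catch-all `fallback` stands in for automatic derivation. A concrete instance in `Foo`'s companion beats the generic fallback because Scala's implicit resolution prefers the more specific alternative, and composite instances for lists and pairs pick up whichever instance wins for their element types.

```scala
// Hypothetical type class standing in for TypeInformation (not Flink's API)
trait Describe[A] { def describe: String }

trait LowPriorityDescribe {
  // Catch-all fallback, standing in for automatic derivation.
  // Living in a parent trait, it loses to instances defined in `object Describe`.
  implicit def fallback[A]: Describe[A] = new Describe[A] {
    val describe = "generic"
  }
}

object Describe extends LowPriorityDescribe {
  def apply[A](implicit d: Describe[A]): Describe[A] = d

  // Composite instances reuse whatever instance is resolved for the element types
  implicit def listDescribe[A](implicit a: Describe[A]): Describe[List[A]] =
    new Describe[List[A]] { val describe = s"list(${a.describe})" }

  implicit def pairDescribe[A, B](implicit a: Describe[A], b: Describe[B]): Describe[(A, B)] =
    new Describe[(A, B)] { val describe = s"pair(${a.describe}, ${b.describe})" }
}

final case class Foo(x: Int)
object Foo {
  // Specific instance in the companion: more specific than the generic fallback
  implicit val info: Describe[Foo] = new Describe[Foo] {
    val describe = "optimized Foo"
  }
}
```

Here `Describe[(Foo, Long)]` resolves to `pair(optimized Foo, generic)`. Deriving an instance for a case class such as `Bar` from the instances of its fields additionally requires generic product derivation, which is what Shapeless supplies and which this sketch leaves out.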

Data Type Mappings

Writing a custom serializer from scratch is rarely what you want. More often, you want to map your custom data type to one that already has a serializer. This is where the Inject type class comes in. It is named after the mapping it describes: an injective function from your type into the serializable type, which can be inverted to convert values back. For example, the following definition is enough to provide a TypeInformation instance for a Breeze Vector.

import breeze.linalg.Vector
import scala.reflect.ClassTag

// Vector#toArray requires a ClassTag for the element type
implicit def injectVector[A: ClassTag]: Inject[Vector[A], Array[A]] =
  Inject(_.toArray, Vector(_))
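The idea behind Inject can be sketched in plain Scala 2.13 without Flink or Breeze. All names here are hypothetical: `Codec` stands in for a type class that already has instances (the role TypeInformation plays), `Injection` is an illustrative analogue of Inject rather than Flink-Shapeless's actual definition, and `Vec` is a made-up wrapper around the standard library `Vector`. A single generic rule, `viaInjection`, turns any injection into a codec, so declaring the injection for `Vec` is the only step needed.

```scala
// Hypothetical stand-in for a type class with existing serializers (not Flink's API)
trait Codec[A] {
  def encode(a: A): String
  def decode(s: String): A
}

// Hypothetical analogue of Inject: a mapping into B together with its inverse
final case class Injection[A, B](to: A => B, from: B => A)

object Codec {
  implicit val intCodec: Codec[Int] = new Codec[Int] {
    def encode(a: Int): String = a.toString
    def decode(s: String): Int = s.toInt
  }

  // Comma-separated encoding; assumes element encodings contain no commas
  implicit def listCodec[A](implicit c: Codec[A]): Codec[List[A]] = new Codec[List[A]] {
    def encode(as: List[A]): String = as.map(c.encode).mkString(",")
    def decode(s: String): List[A] =
      if (s.isEmpty) Nil else s.split(",").toList.map(c.decode)
  }

  // Any A with an Injection into some B that has a Codec gets a Codec through B
  implicit def viaInjection[A, B](implicit inj: Injection[A, B], cb: Codec[B]): Codec[A] =
    new Codec[A] {
      def encode(a: A): String = cb.encode(inj.to(a))
      def decode(s: String): A = inj.from(cb.decode(s))
    }
}

// Hypothetical user type with no Codec of its own
final case class Vec[A](elems: Vector[A])
object Vec {
  // The only definition needed: map Vec[A] to List[A] and back
  implicit def injectVec[A]: Injection[Vec[A], List[A]] =
    Injection[Vec[A], List[A]](v => v.elems.toList, xs => Vec(xs.toVector))
}
```

With these definitions, `implicitly[Codec[Vec[Int]]]` encodes `Vec(Vector(1, 2, 3))` as `"1,2,3"` by going through `List[Int]`.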

Recursive ADTs

The default macro-based implementation cannot handle recursive data types or coproducts without falling back to reflection-based serializers such as Kryo. Only product types (tuples and case classes) are handled natively.

Flink-Shapeless extends native Flink support to arbitrary algebraic data types (ADTs), and fails at compile time rather than falling back to runtime reflection. In Scala, ADTs are encoded as sealed traits and case classes.

// Example: Recursive product
case class NTree[+A](v: A, children: List[NTree[A]])

// Example: Recursive coproduct
sealed trait BTree[+A]
case object BLeaf extends BTree[Nothing]
case class BNode[+A](l: BTree[A], v: A, r: BTree[A]) extends BTree[A]
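To see why such types need no reflection, the sketch below hand-writes the kind of binary encoding a derived serializer can produce: a coproduct writes a tag identifying its case followed by that case's fields, and a recursive product writes its fields in order, with the `List` of children length-prefixed. It uses only `java.io` streams and fixes the element type to `Int`. It illustrates the technique only; it is not Flink's `TypeSerializer` API or the exact encoding Flink-Shapeless generates.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

sealed trait BTree[+A]
case object BLeaf extends BTree[Nothing]
final case class BNode[+A](l: BTree[A], v: A, r: BTree[A]) extends BTree[A]

final case class NTree[+A](v: A, children: List[NTree[A]])

// Coproduct: a tag byte selects the case, then that case's fields follow recursively
def writeBTree(t: BTree[Int], out: DataOutputStream): Unit = t match {
  case BLeaf => out.writeByte(0)
  case BNode(l, v, r) =>
    out.writeByte(1)
    writeBTree(l, out)
    out.writeInt(v)
    writeBTree(r, out)
}

def readBTree(in: DataInputStream): BTree[Int] = in.readByte().toInt match {
  case 0 => BLeaf
  case 1 =>
    val l = readBTree(in)
    val v = in.readInt()
    val r = readBTree(in)
    BNode(l, v, r)
  case tag => throw new IllegalArgumentException(s"unknown BTree tag $tag")
}

// Recursive product: fields in declaration order, the List is length-prefixed
def writeNTree(t: NTree[Int], out: DataOutputStream): Unit = {
  out.writeInt(t.v)
  out.writeInt(t.children.length)
  t.children.foreach(writeNTree(_, out))
}

def readNTree(in: DataInputStream): NTree[Int] = {
  val v = in.readInt()
  val n = in.readInt()
  NTree(v, List.fill(n)(readNTree(in)))
}

// Serialize to bytes, then deserialize again
def roundTrip[T](t: T)(write: (T, DataOutputStream) => Unit)(read: DataInputStream => T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  write(t, out)
  out.flush()
  read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
}
```

A derived serializer generalizes this by obtaining the serializer for the element type `A` from an implicit instance instead of hard-coding `writeInt` and `readInt`.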

Benchmarks

Check out the TypeSerializer microbenchmarks, which compare the default (Kryo) serializer with the derived (Shapeless-based) serializer on the NTree and BTree examples above. Flink-Shapeless achieves up to a 10x speedup for NTree and up to a 3x speedup for BTree.

More details about the setup:

  • Single-threaded bulk serialization -> deserialization round trip
  • Random data generated with ScalaCheck
  • Varying number of trees (100-500) and number of nodes per tree (50-100)
  • Run on my development laptop with ScalaMeter

Limitations

There are a few well-known limitations of automatic type class derivation with Shapeless.

  • Long compile times for large case classes and sealed trait hierarchies. Your mileage may vary.
  • Due to SI-7046, older versions of Scala may have problems deriving TypeInformation for coproducts.