If the producer and contract schemas drift, your pipeline doesn't even compile. This article proves that claim with Scala 3 macros (compile-time evidence via quotes reflection; Mirrors optional) and Spark 3.5's structural schema checks as a runtime pin.
A tiny but complete proof-of-concept:
- Policies describe how two schemas should match (exact, by position, by name and order, backward/forward compatible, etc.).
- A macro computes a deep structural shape of your case classes and proves at compile time that the producer type conforms to the target contract under a selected policy.
- At runtime, we mirror the policy with Spark’s built-in schema comparators for extra safety.
If the proof cannot be derived, your code fails to compile. No surprises at midnight.
Schema changes are the sneakiest failures in data systems.
Here, the compiler enforces your intent: if `Out` no longer conforms to `Contract` under a policy `P`, compilation aborts with a readable diff.
At runtime, Spark's schema comparators add a second seatbelt. (Apache Spark)
Data shape drift is subtle (nullability, reordering, nested options, case changes, maps/arrays). This article pushes those checks to the compiler. You get fast feedback, explicit diffs, and documented intent via policy types.
- Policies as types - `SchemaPolicy` encodes the comparison semantics (Exact, Ordered, ByPosition, Backward/Forward, Full) as singleton types (see the sketch after this list).
- Macro shape - the macro in `ContractsCore` inspects your case classes via Scala 3 quotes reflection (`quotes.reflect`), builds a normalized structural `TypeShape`, and computes a diff; if the diff is non-empty, compilation fails. Mirrors are not required here; this POC uses `inline` + `${ ... }` + `TypeRepr` directly. (Scala Documentation)
- Compile-time fuse - code that wires a sink must provide `SchemaConforms[Out, Contract, P]`. If it can't be summoned, the pipeline won't compile.
- Runtime pin (Spark) - we mirror the policy with Spark's built-in schema comparators:
  - unordered, case-insensitive, ignore nullability -> `DataType.equalsIgnoreCaseAndNullability`
  - by position -> `DataType.equalsStructurally`
  - ordered by name (case-sensitive/insensitive) -> `DataType.equalsStructurallyByName` with the chosen resolver. (Apache Spark)
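For orientation, here is a hedged sketch of what "policies as types" can look like: singleton objects whose `.type` is used as the policy type parameter. The names follow the article's usage; the POC's actual definitions may differ.

```scala
// Illustrative only: singleton policy members, so that SchemaPolicy.Exact.type,
// SchemaPolicy.Backward.type, etc. can serve as phantom type parameters
// selecting the comparison semantics at compile time.
object SchemaPolicy:
  sealed trait Policy
  case object Exact            extends Policy
  case object ExactUnorderedCI extends Policy
  case object ExactByPosition  extends Policy
  case object ExactOrdered     extends Policy
  case object ExactOrderedCI   extends Policy
  case object Backward         extends Policy
  case object Forward          extends Policy
  case object Full             extends Policy
```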
- Scala 3.3.x
- Spark 3.5.x (`spark-sql`); Scala 3 consumes the 2.13 artifacts (see the build fragment below)
- JVM 11+
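A hedged build.sbt fragment matching these prerequisites (version numbers are illustrative, not necessarily the POC's exact ones):

```scala
// build.sbt (sketch): a Scala 3 build depending on the Scala 2.13 Spark artifacts
scalaVersion := "3.3.3"

libraryDependencies += ("org.apache.spark" %% "spark-sql" % "3.5.1")
  .cross(CrossVersion.for3Use2_13) // resolve spark-sql_2.13 from a Scala 3 build
```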
- Quotes-first: macros are structured around `inline`, splices (`${ ... }`), and the `quotes.reflect` API. We use `inline given derived[...] = ${ ... }` and traverse `TypeRepr` to compute deep shapes and diffs, emitting precise compile-time errors via `report.errorAndAbort` (a minimal sketch follows this list).
- Mirrors optional: Scala 3 introduces compiler-derived `Mirror`s for ADTs that enable higher-level generic derivation. This POC does not rely on `Mirror.Of`; the reflection is explicit for control and clarity. You can layer Mirror-based derivation on top later if desired.
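To make the quotes-first pattern concrete, here is a minimal, hedged sketch of the mechanism (not the POC's actual `ContractsCore`): an `inline given` whose implementation walks the `TypeRepr`s of both case classes and calls `report.errorAndAbort` with a diff when they disagree. The `fieldsOf` helper and the trivial "Exact" check are illustrative assumptions.

```scala
import scala.quoted.*

// Sketch of an evidence type; the POC's real SchemaConforms lives in CompileTime.
trait SchemaConforms[Out, Contract, P]

object SchemaConforms:
  inline given derived[Out, Contract, P]: SchemaConforms[Out, Contract, P] =
    ${ derivedImpl[Out, Contract, P] }

  private def derivedImpl[Out: Type, Contract: Type, P: Type](using Quotes): Expr[SchemaConforms[Out, Contract, P]] =
    import quotes.reflect.*

    // Illustrative helper: (name, type) pairs of a case class's fields.
    def fieldsOf(tpe: TypeRepr): List[(String, TypeRepr)] =
      tpe.typeSymbol.caseFields.map(f => f.name -> tpe.memberType(f))

    val out      = fieldsOf(TypeRepr.of[Out])
    val contract = fieldsOf(TypeRepr.of[Contract])

    // Toy "Exact" policy: same names, same order, =:= types.
    // The real POC normalizes nested shapes and dispatches on the policy type P.
    val missing = ("<missing>", TypeRepr.of[Nothing])
    val diff = contract.zipAll(out, missing, missing).collect {
      case ((cn, ct), (on, ot)) if cn != on || !(ot =:= ct) =>
        s"$cn: expected ${ct.show}, got $on: ${ot.show}"
    }

    if diff.nonEmpty then
      report.errorAndAbort(
        s"${TypeRepr.of[Out].show} does not conform to ${TypeRepr.of[Contract].show}:\n  ${diff.mkString("\n  ")}")
    '{ new SchemaConforms[Out, Contract, P] {} }
```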
```scala
import ctdc.ContractsCore.{SchemaPolicy, CompileTime}
import CompileTime.SchemaConforms

final case class ContractUser(id: Long, email: String, age: Option[Int] = None)
final case class OutExact_Same(id: Long, email: String, age: Option[Int])

// If fields/types drift, this line fails at compile time with a diff:
val ev: SchemaConforms[OutExact_Same, ContractUser, SchemaPolicy.Exact.type] = summon

// Or use the ergonomic inline helper:
import CompileTime.conforms
val ev2 = conforms[OutExact_Same, ContractUser, SchemaPolicy.Exact.type]
```

```scala
import ctdc.ContractsCore.SchemaPolicy
import ctdc.SparkCore.*
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.*
import java.nio.file.Files

object Demo:

  final case class CustomerContract(id: Long, email: String, age: Option[Int] = None)
  final case class CustomerProducer(id: Long, email: String, age: Option[Int], segment: String)
  final case class CustomerNext(id: Long, email: String, age: Option[Int])

  @main def run(): Unit =
    given spark: SparkSession =
      SparkSession.builder().appName("ctdc").master("local[*]").getOrCreate()
    try
      // 1> Make CSV in a temp dir (no external files)
      import spark.implicits.*
      val header = "id,email,age,segment"
      val rows   = Seq("1,a@b.com,21,S", "2,b@c.com,,L")
      val inDir  = Files.createTempDirectory("ctdc_in").toUri.toString
      (header +: rows).toDS.coalesce(1).write.text(inDir) // write CSV as text

      // Reading CSV with an explicit schema is first-class Spark: schema(...) + load(...)
      // (same API pattern for csv/json/parquet).

      // 2> Build & run pipelines
      val src  = TypedSource[CustomerProducer]("csv", inDir, Map("header" -> "true"))
      val sink = TypedSink[CustomerContract](Files.createTempDirectory("ctdc_out").toUri.toString)

      // A> No transform - compile-time fuse: Producer ⟶ Contract under Backward
      val outA =
        PipelineBuilder[CustomerContract]("A")
          .addSource(src)
          .noTransform
          .addSink[CustomerContract, SchemaPolicy.Backward.type](sink) // compile-time evidence required here
          .build
          .apply(spark)

      // B> Transform to a declared Next, then require Next ⟶ Contract under Exact
      val dropExtras: DataFrame => DataFrame = _.select($"id", $"email", $"age")
      val outB =
        PipelineBuilder[CustomerContract]("B")
          .addSource(src)
          .transformAs[CustomerNext]("drop segment")(dropExtras)
          .addSink[CustomerContract, SchemaPolicy.Exact.type](sink)
          .build
          .apply(spark)

      println(s"A: ${outA.count()} rows")
      println(s"B: ${outB.count()} rows")
    finally spark.stop()
```

Why no toDF()? Creating the CSV via a `Dataset[String]` avoids case-class encoders and keeps the example dependency-free. If you prefer `Seq[CaseClass].toDF`, see "Encoders (Scala 3)" below. `toDF`/`toDS` require `import spark.implicits._` from that SparkSession. (Apache Spark)
Policy-to-comparator mapping (a hedged runtime-pin sketch follows this list):
- `Exact` / `ExactUnorderedCI` -> `DataType.equalsIgnoreCaseAndNullability`
- `ExactByPosition` -> `DataType.equalsStructurally`
- `ExactOrdered` (case-sensitive) / `ExactOrderedCI` (case-insensitive) -> `DataType.equalsStructurallyByName`
- `Backward` / `Forward` -> subset rules are enforced at compile time; the runtime pin still uses a tolerant comparator to catch accidental drift.
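Here is a hedged sketch of what such a runtime pin can look like, using only the public comparators (`equalsStructurally`, `equalsStructurallyByName`). The `PinPolicy` enum and `assertConforms` are illustrative names, not the POC's `SparkCore` API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, StructType}

// Illustrative stand-in for the POC's policies; only public Spark comparators are used.
enum PinPolicy:
  case ByPosition, OrderedByName, OrderedByNameCI

def assertConforms(df: DataFrame, expected: StructType, policy: PinPolicy): Unit =
  val ok = policy match
    case PinPolicy.ByPosition =>
      // same shape, field names ignored
      DataType.equalsStructurally(df.schema, expected, ignoreNullability = true)
    case PinPolicy.OrderedByName =>
      // same field names, in order, case-sensitive resolver
      DataType.equalsStructurallyByName(df.schema, expected, (a: String, b: String) => a == b)
    case PinPolicy.OrderedByNameCI =>
      // same field names, in order, case-insensitive resolver
      DataType.equalsStructurallyByName(df.schema, expected, (a: String, b: String) => a.equalsIgnoreCase(b))
  require(ok,
    s"Schema drift under $policy:\nactual:\n${df.schema.treeString}expected:\n${expected.treeString}")
```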
- Primitives (`Int`, `Long`, `Double`, `Boolean`, `String`, Java time/sql basics)
- `Option[T]` (nullable)
- `List`/`Seq`/`Vector`/`Array`/`Set[T]` (elements nullable)
- `Map[K, V]` with atomic keys (`String`, `Int`, `Long`, `Short`, `Byte`, `Boolean`)
- Nested case classes

(These align naturally with Spark's `StructType`, `ArrayType`, and `MapType`; a hedged mapping example follows.) (ibiblio.uib.no)
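To see that alignment, here is a hedged, hand-written example of a nested case class and the kind of Spark schema it corresponds to. `Orders` and `LineItem` are made-up types, and the nullability flags shown are illustrative; the exact flags depend on how shapes are normalized.

```scala
import org.apache.spark.sql.types.*

final case class LineItem(sku: String, qty: Int)
final case class Orders(
    id: Long,
    tags: List[String],
    items: Seq[LineItem],
    attrs: Map[String, Int],
    note: Option[String])

// Roughly the StructType such a product maps to:
val ordersSchema: StructType = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("tags", ArrayType(StringType), nullable = true),
  StructField("items", ArrayType(StructType(Seq(
    StructField("sku", StringType, nullable = true),
    StructField("qty", IntegerType, nullable = false)))), nullable = true),
  StructField("attrs", MapType(StringType, IntegerType), nullable = true),
  StructField("note", StringType, nullable = true) // Option[String] => nullable String column
))
```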
Spark's product encoders historically rely on Scala 2 runtime reflection (`TypeTag`). In Scala 3 you'll see a "missing TypeTag" error if you call `Seq[CaseClass].toDF()` without extra help. Two options (an explicit-schema read sketch follows this list):

- Add a Scala 3 encoders library:
  `libraryDependencies += "io.github.vincenzobaz" %% "spark-scala3-encoders" % "0.3.2"`
  and `import scala3encoders.given` next to `import spark.implicits.*`. (Scaladex)
- Stay DataFrame-only for inputs (as in the example): write CSV/JSON strings and read with an explicit schema via `DataFrameReader.schema(..).load(..)`. (Apache Spark)
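For the second option, a minimal sketch of the explicit-schema read using the standard `DataFrameReader` API; the field names echo the demo, and `readCustomers` is a placeholder helper:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.*

def readCustomers(spark: SparkSession, path: String): DataFrame =
  val customerSchema = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("email", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true),
    StructField("segment", StringType, nullable = true)
  ))
  spark.read
    .schema(customerSchema)   // explicit schema: no inference, no TypeTag needed
    .option("header", "true")
    .format("csv")
    .load(path)               // same schema(...) + load(...) pattern works for json/parquet
```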
- The compile-time proof relies on Scala 3 quotes reflection (`TypeRepr`, `AppliedType`, `=:=`, `<:<`), the official metaprogramming API. Mirrors are optional for this approach and currently unused in the POC.
- The runtime validations reuse Spark's documented structural comparators exactly, matching our policies 1-to-1.
- Context parameters (`using`/`given`) make compile-time evidence explicit and ergonomic (see the sketch below).
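As a hedged illustration of that last point, the wiring site can simply demand the evidence as a context parameter. `SinkRef` and `wireSink` are invented for this sketch, not the POC's API:

```scala
// If no SchemaConforms[Out, Contract, P] can be summoned, this call site does not compile.
final case class SinkRef[Contract](path: String)

def wireSink[Out, Contract, P](sink: SinkRef[Contract])(using SchemaConforms[Out, Contract, P]): SinkRef[Contract] =
  sink // the evidence matters only at compile time; nothing is done with it at runtime
```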
- Rock the JVM: Scala Macros & Metaprogramming course. (Rock the JVM)
- Scala 3 macros & reflection (`quotes`, `reflect`, `TypeRepr`), and macro best practices. (Scala Documentation)
- Spark structural comparators on `DataType`. (Apache Spark)
- CSV read/write and explicit schemas. (Apache Spark)
- `toDF`/`toDS` via `import spark.implicits._`. (Apache Spark)
- Scala 3 encoders for Spark (community). (Scaladex)
TL;DR Compile-time evidence + policy types make schema intent explicit and enforceable. Spark's comparators keep you safe at runtime. If schemas drift, your job doesn’t ship.