- Desarrollo de la aplicación con Scala+Spark
- Configuración del entorno de Spark+Airflow
- Ejecución de la prueba
Para el desarrollo de la aplicación se ha utilizado la versión 2.12.16 de Scala y la 3.2.1 de Spark, además de las siguientes herramientas:
-
Metadatos en Json
He asumido que los metadatos son generados a partir de un front conocido, por lo que tendríamos acceso a todas las opciones posibles y a la estructura concreta de dataflows, transformaciones, sinks...
-
Modelado de datos
A partir de asunción anterior, he decidido generar un modelo de datos en base a los metadatos, de forma que sea el modelo quien tenga las funcionalidades que correspondan a cada tipo de transformación, validación, etc... Aunque la jerarquía de transformaciones y acciones del ejemplo no es muy extensa, la intención sería utilizar una estructura de ADTs (Algebraic Data Types) para aprovechar la potencia y la fiabilidad que aporta la exhaustividad exigida por el pattern matching.
sealed trait Transformation {...} case class ValidateFields(name: String, input: String, validations: Vector[Validation]) extends Transformation {...} case class AddFields(name: String, input: String, additions: Vector[FieldAddition]) extends Transformation {...}
-
Decodificado de los metadatos
Para el decodificado del json, he preferido utilizar la libreria circe en vez de usar el motor de Spark. Con esta librería se tiene mayor control sobre decodificación, utilizando "cursores" para transformar las estructuras de datos del json en
case class
de Scala:implicit val transformationDecoder: Decoder[Transformation] = (cursor: ACursor) => { val transformationType = cursor.get[String]("type").getOrElse("") //transformation type attribute leads the decoding in Transformation transformationType match { case "validate_fields" => for { name <- cursor.get[String]("name") input <- cursor.downField("params").get[String]("input") validations <- cursor.downField("params").get[Vector[Validation]]("validations") } yield ValidateFields(name, input, validations) case "add_fields" => for { name <- cursor.get[String]("name") input <- cursor.downField("params").get[String]("input") additions <- cursor.downField("params").get[Vector[FieldAddition]]("addFields") } yield AddFields(name, input, additions) } }
-
Añadido de funcionalidades a los modelos
Una vez establecido el modelo, he añadido las funciones necesarias para que cada elemento puede realizar las tareas que secuencian el flujo de datos:
- Extraer datos de diferentes fuentes y ponerlas a disposición del resto de miembros del modelo.
- Aplicar distintas transformaciones a las fuentes que se especifiquen.
- Registrar el resultado de las transformaciones según el formato especificado.
-
Programa principal
Con el modelo completo con todas sus funcionalidades, el flujo principal del programa se ocupa de:
- Leer los metadatos.
- Decodificarlos según el modelo.
- Ejecutar los dataflows.
-
Configuración del entorno (WSL2, Docker) ✅
-
Spark
- Empaquetado del jar: compatibilidad entre versiones Scala-Spark
-
Airflow
- Codificación del DAG
- Ejecución de los operadores
- Uso del SparkSubmitOperator
- Cambio de API según versiones de Airflow
- Configuración de la conexión con Spark
- Selección del Conn type para Spark
- Codificación del DAG
...