AbsaOSS/pramen

Add support for initialization and finalization hooks

Closed this issue · 0 comments

Background

Currently, pipeline execution is completely controlled by the Pramen orchestrator, which decides which transformations to run and in which order. The only way to initialize the runtime in a custom manner is to include an initialization check in each transformer, which is inconvenient.

Initialization and finalization hooks will help initialize and release resources needed by the pipeline as a whole.

Feature

Add support for initialization and finalization hooks.

Proposed Solution

pramen.hook.startup.class = "com.example.myhook.MyStartupHook"
pramen.hook.shutdown.class = "com.example.myhook.MyShutdownHook"

The hook class should implement Runnable and can optionally take the config as a constructor parameter. Pramen will pass the workflow configuration to the constructor if the hook class declares such a parameter.

import com.typesafe.config.Config

class MyStartupHook(conf: Config) extends Runnable {
  override def run(): Unit = {
    ???
  }
}
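One possible way to support both constructor shapes is reflection: look for a constructor that accepts the config and fall back to a no-argument one. The following is only a sketch of that idea, not Pramen's actual implementation. `HookLoader`, `StandInConfig`, `HookLog`, `ConfAwareHook` and `PlainHook` are hypothetical names, and `StandInConfig` stands in for `com.typesafe.config.Config` so the example has no external dependencies.

```scala
// Stand-in for com.typesafe.config.Config (assumption, keeps the sketch dependency-free).
final case class StandInConfig(values: Map[String, String])

// Hypothetical recorder used only to make the effect of the hooks observable.
object HookLog {
  val events = scala.collection.mutable.ListBuffer.empty[String]
}

// A hook that takes the config as a constructor parameter.
class ConfAwareHook(conf: StandInConfig) extends Runnable {
  override def run(): Unit = {
    HookLog.events += s"conf:${conf.values.getOrElse("env", "?")}"
    ()
  }
}

// A hook with a no-argument constructor.
class PlainHook extends Runnable {
  override def run(): Unit = {
    HookLog.events += "plain"
    ()
  }
}

object HookLoader {
  /** Instantiates `className`, preferring a public constructor that takes
    * `confClass` and falling back to a no-argument constructor. */
  def instantiate[C <: AnyRef](className: String, conf: C, confClass: Class[C]): Runnable = {
    val clazz = Class.forName(className)
    require(classOf[Runnable].isAssignableFrom(clazz), s"$className must implement Runnable")

    val instance = try {
      clazz.getConstructor(confClass).newInstance(conf)
    } catch {
      // Only a missing constructor triggers the fallback; errors thrown by
      // the hook's own constructor are propagated to the caller.
      case _: NoSuchMethodException => clazz.getDeclaredConstructor().newInstance()
    }
    instance.asInstanceOf[Runnable]
  }
}
```

With this shape, `HookLoader.instantiate(className, conf, classOf[StandInConfig])` would be called once for the class named in `pramen.hook.startup.class` and once for the one in `pramen.hook.shutdown.class`.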

You can get the current Spark session inside the method in the usual way. The Spark session is guaranteed to be created at this point.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

The shutdown hook is invoked after all jobs have finished (whether they succeeded or failed), but before the Spark session is stopped and before the notification email is sent.
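The intended ordering can be illustrated with a small, dependency-free sketch. `PipelineLifecycle` and its function parameters are hypothetical placeholders for the orchestrator's real steps. Two choices here are assumptions that the proposal does not specify: jobs are skipped if the startup hook fails, and the notification is sent before the Spark session is stopped.

```scala
import scala.util.control.NonFatal

object PipelineLifecycle {
  /** Runs the lifecycle steps in order and returns the startup or job failure, if any. */
  def run(
      startupHook: Option[Runnable],
      shutdownHook: Option[Runnable],
      runJobs: () => Unit,
      sendNotification: () => Unit,
      stopSpark: () => Unit
  ): Option[Throwable] = {
    var failure: Option[Throwable] = None
    try {
      // The Spark session is assumed to exist already at this point.
      startupHook.foreach(_.run())
      runJobs()
    } catch {
      case NonFatal(ex) => failure = Some(ex)
    } finally {
      // The shutdown hook runs whether the jobs succeeded or failed,
      // before the notification is sent and before Spark is stopped.
      try shutdownHook.foreach(_.run())
      finally {
        try sendNotification()
        finally stopSpark()
      }
    }
    failure
  }
}
```

The nested `finally` blocks mean that a failing shutdown hook does not prevent the notification from being sent or the Spark session from being stopped, although its exception still propagates to the caller afterwards.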