SETL requirement failed Exception
LiuxyEric opened this issue · 2 comments
Hi, I am new to SETL and I am trying to build my own application on top of it. However, when I ran my application, it failed with a "requirement failed" exception.
In my application, I have two stages.
- Stage one contains three factories which produce DataFrame
- Stage two contains one factory and it needs all three output DataFrames from stage one's factories
I used @Delivery(producer = classOf[FactoryName]) in Stage two's factory to get the three outputs from Stage one's factories:
@Delivery(producer = classOf[ClassName1])
val df1 = spark.emptyDataFrame
@Delivery(producer = classOf[ClassName2])
val df2 = spark.emptyDataFrame
@Delivery(producer = classOf[ClassName3])
val df3 = spark.emptyDataFrame
I can't figure out what the problem is. Could you please help me out?
Hey, sorry, I didn't see this issue. Did you finally solve it?
Hi @LiuxyEric, I could not reproduce your issue. Did you add your second Stage before adding your first Stage? Here's what I tried.
Declaring three Factories for the first Stage:
class Factory1 extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  override def read(): Factory1.this.type = this
  override def process(): Factory1.this.type = this
  override def write(): Factory1.this.type = this
  override def get(): DataFrame = List("1").toDF("id")
}

class Factory2 extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  override def read(): Factory2.this.type = this
  override def process(): Factory2.this.type = this
  override def write(): Factory2.this.type = this
  override def get(): DataFrame = List("2").toDF("id")
}

class Factory3 extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  override def read(): Factory3.this.type = this
  override def process(): Factory3.this.type = this
  override def write(): Factory3.this.type = this
  override def get(): DataFrame = List("3").toDF("id")
}
Declaring a Factory that ingests the results of the three previous Factories for the second Stage:
class FinalFactory extends Factory[Unit] with HasSparkSession {

  @Delivery(producer = classOf[Factory1])
  val resultOne: DataFrame = spark.emptyDataFrame

  @Delivery(producer = classOf[Factory2])
  val resultTwo: DataFrame = spark.emptyDataFrame

  @Delivery(producer = classOf[Factory3])
  val resultThree: DataFrame = spark.emptyDataFrame

  override def read(): FinalFactory.this.type = {
    resultOne.show(false)
    resultTwo.show(false)
    resultThree.show(false)
    this
  }

  override def process(): FinalFactory.this.type = this
  override def write(): FinalFactory.this.type = this
  override def get(): Unit = {}
}
My main function:
val setl: Setl = Setl.builder()
  .withDefaultConfigLoader()
  .getOrCreate()

val stageOne = new Stage()
stageOne.addFactory[Factory1]()
stageOne.addFactory[Factory2]()
stageOne.addFactory[Factory3]()

setl
  .newPipeline()
  .addStage(stageOne) // note that stageOne is added before the Stage containing FinalFactory
  .addStage[FinalFactory]()
  .run()
Output:
+---+
|id |
+---+
|1 |
+---+
+---+
|id |
+---+
|2 |
+---+
+---+
|id |
+---+
|3 |
+---+
You can see that the second Stage correctly ingested the results of the first Stage's Factories.
For other people that might encounter the same issue and are looking for an answer: the requirement failed exception is thrown when the Pipeline expects some deliverables but cannot find them. This check was added in v0.4.3. In the current v1.0.0-SNAPSHOT, we added more explicit exception messages detailing the missing delivery, which should help in fixing the error.
So if you are using SETL v0.4.3 or later, make sure all the Deliveries expected by your Factories are available in your Pipeline, or an exception will be thrown.
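As an illustration only (a sketch reusing the Factories defined above, not code from the original report): if the Stage order were reversed so that the consuming Stage is added before the producing Stage, the Deliveries declared in FinalFactory would have no producer that has run yet, and the Pipeline would be expected to fail with the requirement failed exception.

val setl: Setl = Setl.builder()
  .withDefaultConfigLoader()
  .getOrCreate()

val stageOne = new Stage()
stageOne.addFactory[Factory1]()
stageOne.addFactory[Factory2]()
stageOne.addFactory[Factory3]()

setl
  .newPipeline()
  .addStage[FinalFactory]() // consuming Stage added first: its @Delivery inputs have not been produced yet
  .addStage(stageOne)       // producing Stage comes too late
  .run()                    // expected to fail with the requirement failed exception

Swapping the two addStage calls back, as in the working example above, makes the Deliveries available before FinalFactory runs.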