java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider
davidscottturner opened this issue · 4 comments
Hi,
I am fairly new to Dataflow and am trying to upload a streaming job template. I am having problems executing a Dataflow template that uses RuntimeValueProvider.
NOTE - This all worked with hardcoded values as a test
My code looks as follows:
Options:
public interface OrderPipelineOptions : PipelineOptions, StreamingOptions {

    @get:Description("Kafka Bootstrap Servers")
    @get:Default.String("127.0.0.1:9092")
    var bootstrapServers: ValueProvider<String>

    @get:Description("Kafka Topic")
    @get:Default.String("order")
    var inputTopic: ValueProvider<String>

    @get:Description("Kafka Group Id")
    @get:Default.String("local")
    var consumerGroupId: ValueProvider<String>
}
Job:
object OrderKafkaToElasticsearchPipeline {

    @JvmStatic
    fun main(args: Array<String>) {
        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OrderPipelineOptions::class.java)
        val pipeline = Pipeline.create(options)

        pipeline
            /*
             * Step #1: Read messages in from Kafka
             */
            .apply("Read from Kafka",
                KafkaIO.read<String, String>()
                    .withBootstrapServers(options.bootstrapServers.get())
                    .withTopic(options.inputTopic.get())
                    .withKeyDeserializer(StringDeserializer::class.java)
                    .withValueDeserializer(StringDeserializer::class.java)
                    .commitOffsetsInFinalize()
                    .withoutMetadata())
            /*
             * Step #2: Get the values from the stream after 1 second
             */
            .apply("Get Order Events from stream", Values.create<String>())
            .apply("Wait a bit", Window.into<String>(FixedWindows.of(org.joda.time.Duration.standardSeconds(1))))
            .apply("Deserialize", ParDo.of(ExtractOrderDataFn()))
            .apply("Serialize", ParDo.of(ExtractJsonFromOrderEventFn()))
            .apply(
                "Send to elastic search",
                ElasticsearchIO.write()
                    .withIdFn(ElasticsearchIO.Write.FieldValueExtractFn { input ->
                        input.path("id").asText()
                    })
                    .withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration.create(arrayOf("<elastichost>"), "test", "doc")
                            .withUsername("<user>")
                            .withPassword("<password>")
                    )
            )

        pipeline.run() //.waitUntilFinish()
    }
}
This seems correct according to the Dataflow documentation. I'm also trying Kotlin for this. It all ran with hard-coded values before, so I know the execution code itself works fine.
I then build the project and execute it with Gradle:
build.gradle.kts:
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    // Apply the Kotlin JVM plugin to add support for Kotlin.
    kotlin("jvm") version "1.3.70"
    java
    id("com.github.johnrengelman.shadow") version "5.2.0"
}

repositories {
    // Use jcenter for resolving dependencies.
    // You can declare any Maven/Ivy/file repository here.
    jcenter()
}

java {
    sourceCompatibility = JavaVersion.VERSION_1_8
    targetCompatibility = JavaVersion.VERSION_1_8
}

dependencies {
    implementation(platform(kotlin("bom")))
    implementation(kotlin("stdlib-jdk8"))
    implementation("org.slf4j:slf4j-simple:1.7.30")
    implementation("org.apache.beam:beam-sdks-java-core:2.19.0")
    implementation("org.apache.beam:beam-runners-direct-java:2.19.0")
    implementation("org.apache.beam:beam-runners-google-cloud-dataflow-java:2.19.0")
    implementation("org.apache.beam:beam-sdks-java-io-kafka:2.19.0")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.10.3")
    implementation("org.apache.beam:beam-sdks-java-io-elasticsearch:2.19.0")
    implementation("org.apache.kafka:kafka-clients:2.4.1")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.10.3")
    implementation("org.apache.commons:commons-lang3:3.9")
    implementation("commons-cli:commons-cli:1.4")
}

tasks.withType<ShadowJar>() {
    manifest {
        attributes["Main-Class"] = "com.example.OrderKafkaToElasticsearchPipeline"
    }
    mergeServiceFiles()
}

tasks.register("execute", JavaExec::class.java) {
    main = if (!project.hasProperty("mainClass")) System.getProperty("mainClass") else "NULL"
    classpath(sourceSets["main"].runtimeClasspath)
    args(System.getProperty("exec.args").split("\\s".toRegex()))
}
I then run gradle clean build followed by:
gradle execute -DmainClass="com.example.orders.OrderKafkaToElasticsearchPipeline" -Dexec.args="--runner=DataflowRunner --templateLocation=gs://<PATH>/OrderKafkaToElasticsearchPipeline --project=<PROJECT> --tempLocation=gs://<PATH>/temp" -Pdataflow-runner --info
Before trying to use the RuntimeValueProvider this all worked fine and the template was uploaded.
Now that I'm trying to get the Kafka configuration passed in, I get the following error:
Exception in thread "main" java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=bootstrapServers, default=127.0.0.1:9092}
at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:250)
at com.imburse.beams.orders.OrderKafkaToElasticsearchPipeline.main(OrderKafkaToElasticsearchPipeline.kt:99)
Any help or suggestions as to where I am going wrong would be very welcome.
Calling #get on the ValueProvider resolves the value, so if you do this at pipeline construction time, as you're doing above, you get these errors.
Only transforms / IO connectors whose public API takes in a ValueProvider support having those parameters supplied at template execution time.
See https://stackoverflow.com/questions/43992120/valueprovider-issue for more details.
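The template-safe pattern is to hand the ValueProvider itself to your DoFn (or to a transform whose API accepts one) and only call get() from a runtime context such as @ProcessElement. A minimal sketch of that pattern, using a made-up PrefixFn / prefix rather than anything from your pipeline:

import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.DoFn

// Sketch only: the DoFn stores the provider and defers get() to runtime.
class PrefixFn(private val prefix: ValueProvider<String>) : DoFn<String, String>() {
    @ProcessElement
    fun processElement(ctx: ProcessContext) {
        // get() runs on the worker while the job executes, so the value supplied
        // when the template is launched is available here.
        ctx.output(prefix.get() + ctx.element())
    }
}

The ValueProvider fields from your options interface can be passed straight into such a DoFn at construction time without being resolved.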
Thanks for the answer, and I see you closed it. I guess your answer is fairly unhelpful due to the fact that the KafkaIO I'm using is the Apache Beam KafkaIO (https://github.com/apache/beam/tree/master/sdks/java/io/kafka). Are you saying that because its API doesn't support a ValueProvider I am unable to overcome this issue? Do I log an issue there?
https://issues.apache.org/jira/browse/BEAM-3925 already exists.
You cannot overcome this issue unless you implement support in KafkaIO for ValueProviders.
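Roughly, implementing that support means the read transform has to accept the ValueProviders in its builder and only resolve them from runtime code. A sketch of the shape this takes, with invented names (HypotheticalKafkaRead, ReadFn) rather than the actual KafkaIO API in 2.19.0:

import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.PBegin
import org.apache.beam.sdk.values.PCollection

// Hypothetical builder-style transform: it stores the providers and never calls
// get() while the pipeline graph is built, so it is safe to stage as a template.
class HypotheticalKafkaRead(
    private val bootstrapServers: ValueProvider<String>,
    private val topic: ValueProvider<String>
) : PTransform<PBegin, PCollection<String>>() {

    override fun expand(input: PBegin): PCollection<String> =
        // Construction time: the providers are only passed along, never resolved.
        input.apply(Create.of("placeholder"))
            .apply(ParDo.of(ReadFn(bootstrapServers, topic)))

    private class ReadFn(
        private val bootstrapServers: ValueProvider<String>,
        private val topic: ValueProvider<String>
    ) : DoFn<String, String>() {
        @ProcessElement
        fun processElement(ctx: ProcessContext) {
            // Runtime context: get() is legal here, so a real implementation could
            // open its Kafka consumer with the resolved values at this point.
            ctx.output("would connect to ${bootstrapServers.get()} / topic ${topic.get()}")
        }
    }
}

Until KafkaIO itself exposes something along these lines, the .get() calls in your main method will keep failing at template-construction time.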
Thanks Luke, appreciate the help on this.