Get a Flink Table `Schema` from an Event Platform stream:
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.table.api.{EnvironmentSettings, FormatDescriptor, Schema, Table, TableDescriptor, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.wikimedia.flink.EventExtensions._
import org.wikimedia.eventutilities.core.event.WikimediaDefaults

// Get a streaming TableEnvironment.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
// BUG FIX: the original discarded the created TableEnvironment;
// it must be bound to `tableEnv`, which is used below.
val tableEnv = TableEnvironment.create(settings)

// A declared Event Platform stream.
val streamName = "mediawiki.page-links-change"

// Get an EventStream object using wikimedia event-utilities.
val eventStream = WikimediaDefaults.EVENT_STREAM_FACTORY.createEventStream(streamName)

// Register a streaming page links change table in tableEnv's catalog
// (flinkRegisterKafkaTable is provided by the EventExtensions import above).
val pageLinksChangeStreamTable = eventStream.flinkRegisterKafkaTable(tableEnv)

// Query it however you like.
// The registered table is addressable by its SQL-safe name
// (stream dots/dashes become underscores: mediawiki_page_links_change).
val result = tableEnv.sqlQuery(
  """
    |SELECT TUMBLE_START(kafka_timestamp, INTERVAL '1' MINUTE), database, COUNT(DISTINCT database)
    |FROM mediawiki_page_links_change
    |GROUP BY TUMBLE(kafka_timestamp, INTERVAL '1' MINUTE), database
    |""".stripMargin
)

// Print the result, or insert it into a Sink connector table.
result.execute().print()
Or, just get the Flink `Schema` builder and use it to create whatever kind of table you like:
// Get a flink Table Schema matching the event stream's latest
// JSONSchema, with a kafka_timestamp as the watermark event time.
// NOTE(review): the original snippet called
// eventStream.flinkRegisterKafkaTable(tableEnv).build() here, which registers
// a Kafka table rather than returning a Schema builder, and left `flinkSchema`
// (used below) undefined. A schema-builder helper from EventExtensions is what
// the surrounding prose describes — verify the exact helper name against the
// eventutilities-flink API.
val flinkSchemaBuilder = eventStream.flinkSchemaBuilder
val flinkSchema: Schema = flinkSchemaBuilder.build()

// Create a Flink streaming Table from the local filesystem.
tableEnv.createTemporaryTable(
  "mediawiki_page_links_change",
  TableDescriptor.forConnector("filesystem")
    .schema(flinkSchema)
    .option("path", "/tmp/page-links.change.json")
    .format(FormatDescriptor.forFormat("json").build())
    .build()
)

// Read the temporary table back as a Table object for further queries.
val pageLinksChangeTable = tableEnv.from("mediawiki_page_links_change")