Azure/azure-cosmosdb-spark

cannot be applied to (com.microsoft.azure.cosmosdb.spark.config.Config) .options(writeConfigMap)


I am trying to use Databricks to stream from Kafka to Cosmos DB (Cassandra API), but when I run with the following config I get this error:

notebook:39: error: overloaded method value options with alternatives:
  (options: java.util.Map[String,String])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  (options: scala.collection.Map[String,String])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
 cannot be applied to (com.microsoft.azure.cosmosdb.spark.config.Config)
  .options(writeConfigMap)
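For context: both options overloads take a Map[String,String], while Config is the connector's own wrapper type, so neither overload applies. A minimal sketch of a call that type-checks, assuming the settings are kept as a plain Map (same placeholder values as in the code below):

// Sketch: pass the raw Map[String,String] to .options() instead of
// wrapping it in Config(...), which .options() cannot accept.
val writeOptionsMap: Map[String, String] = Map(
  "Endpoint" -> "https://xxxxxxxx.cassandra.cosmosdb.azure.com:10350/",
  "Masterkey" -> "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==",
  "Database" -> "xxxxxxxx",
  "Collection" -> "xxxxxxxxxxx")

// ...
//   .writeStream.format(classOf[CosmosDBSinkProvider].getName)
//   .options(writeOptionsMap) // matches the Map[String,String] overload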

My code looks like this:

val writeConfigMap = Config(Map(
  "Endpoint" -> "https://xxxxxxxx.cassandra.cosmosdb.azure.com:10350/",
  "Masterkey" -> "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==",
  "Database" -> "xxxxxxxx",
  "PreferredRegions" -> "East US2;",
  "Collection" -> "xxxxxxxxxxx",
  "WritingBatchSize" -> "100"))

val jsonSchema = new StructType().add("sku", StringType).add("bin", StringType).add("qty", IntegerType).add("trancode", IntegerType).add("counter", IntegerType)

// Setup connection to Kafka
val streamingInputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", topic)   
  .option("startingOffsets", "earliest")
  .load()
  //.schema(jsonSchema)

val skuDF = streamingInputDF.selectExpr("CAST(value AS STRING)")

val nestedSkuDF = skuDF.select(from_json($"value", jsonSchema).as("sku"))

val flattenSkuDF = nestedSkuDF.selectExpr("sku.sku", "sku.bin", "sku.qty", "sku.trancode", "sku.counter")

flattenSkuDF
  .select("sku", "bin", "qty", "trancode", "counter")
  .writeStream.format(classOf[CosmosDBSinkProvider].getName)
  .outputMode("append")
  .options(writeConfigMap) // <-- error is here
  .option("checkpointLocation", "/tmp/eventhub-progress/meetup-events")
  .start()
  .awaitTermination(10000)

Am I missing something?

Is there any sample that streams from Kafka and writes to Cosmos DB?

This connector only supports the core (SQL) API of Cosmos DB. Can you please try the Cassandra Spark connector instead? https://docs.microsoft.com/en-us/azure/cosmos-db/cassandra-spark-generic
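For reference, the linked doc connects the DataStax Spark Cassandra connector to a Cosmos DB Cassandra API account roughly as follows (host and credentials below are placeholders; port 10350 and SSL are the Cosmos DB defaults):

// Spark Cassandra connector settings for a Cosmos DB Cassandra API account
spark.conf.set("spark.cassandra.connection.host", "xxxxxxxx.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
spark.conf.set("spark.cassandra.auth.username", "xxxxxxxx")   // account name
spark.conf.set("spark.cassandra.auth.password", "xxxxx==")    // account key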

I tried this connector and the following is my code. I am trying to stream data to Cosmos DB Cassandra.

val skuSchema = new StructType()
  .add("sku", "string")
  .add("bin", "string")
  .add("counter", "integer")
  .add("qty", "integer")
  .add("trancode", "integer")
  .add("timestamp", "timestamp")

val configMap = Map(
  "Endpoint" -> "XXXXXXx.cassandra.cosmosdb.azure.com",
  "Masterkey" -> "XXXXXX",
  "Database" -> "test",
  "Collection" -> "sku")
val config = Config(configMap)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime
import scala.concurrent.duration

val jsonSchema = new StructType().add("sku", StringType).add("bin", StringType).add("qty", IntegerType).add("trancode", IntegerType).add("counter", IntegerType)

// Setup connection to Kafka
val streamingInputDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

val skuDF = streamingInputDF.selectExpr("CAST(value AS STRING)")

val nestedSkuDF = skuDF.select(from_json($"value", jsonSchema).as("sku"))

val flattenSkuDF = nestedSkuDF.selectExpr("sku.sku", "sku.bin", "sku.qty", "sku.trancode", "sku.counter")

display(flattenSkuDF)

// Upsert is no different from create
flattenSkuDF
  .writeStream
  .mode("append")
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "sku", "keyspace" -> "test"))
  .start()

I get the following error:

notebook:34: error: value mode is not a member of org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
possible cause: maybe a semicolon is missing before `value mode'?
.mode("append")
^
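A likely fix, sketched under the assumption of Spark 2.4+ (as on recent Databricks runtimes): DataStreamWriter has outputMode rather than mode, and since older connector releases have no structured-streaming sink for the Cassandra format, each micro-batch can be routed through the batch writer via foreachBatch:

// Sketch only: .outputMode replaces the batch-only .mode, and foreachBatch
// (available from Spark 2.4) writes every micro-batch through the batch
// Cassandra path.
import org.apache.spark.sql.DataFrame

flattenSkuDF
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/sku-checkpoint") // hypothetical path
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "sku", "keyspace" -> "test"))
      .mode("append") // valid here: this is the batch DataFrameWriter
      .save()
  }
  .start()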