Qdrant Databricks Spark sink fails with "UNKNOWN: HTTP status code 464"
jwaro opened this issue · 5 comments
I'm working with Spark DataFrames in a Databricks notebook, and I'm looking for an easy way to write from the notebook to a Qdrant collection without having to collect the DataFrame to local disk, so I'm using the connector detailed here.
- My use case is the single named vector upload.
- I have `io.qdrant:spark:2.2.0` installed on my cluster.
- I have my `api_key` and `qdrant_url` appropriately set.
I'm not really sure how to investigate this further, and I haven't found any other reports of it. I'd love some help understanding how to troubleshoot this!
The error is raised on the following save line:

```python
df.write.format("io.qdrant.spark.Qdrant").options(**options).mode("append").save()
```
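For reference, here is a minimal sketch of how my options are constructed. The option keys follow the connector's docs for the single named vector case as I understand them; the host, collection, and column names below are placeholders, not my real values.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical source table of embeddings, standing in for my real DataFrame.
df = spark.table("my_embeddings")

options = {
    # Placeholder host. My real URL did not use port 6334, which turns out to matter (see below).
    "qdrant_url": "https://xyz-example.cloud.qdrant.io:6333",
    "api_key": "<QDRANT_API_KEY>",
    "collection_name": "my_collection",  # hypothetical collection name
    "vector_fields": "embedding",        # DataFrame column holding the vector values
    "vector_names": "dense",             # name of the vector in the Qdrant collection
    "schema": df.schema.json(),
}
```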
Error message:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 29.0 failed 4 times, most recent failure: Lost task 5.3 in stage 29.0 (TID 205) (10.139.64.6 executor 7): java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 464
```
Full error dump:

```
Py4JJavaError: An error occurred while calling o548.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 29.0 failed 4 times, most recent failure: Lost task 5.3 in stage 29.0 (TID 205) (10.139.64.6 executor 7): java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 464
invalid content-type: null
trailers: Metadata(:status=464,server=awselb/2.0,date=Fri, 28 Jun 2024 03:09:07 GMT,content-length=0)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:93)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:91)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:91)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:91)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:58)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:25)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:559)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:505)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1743)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:552)
at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:482)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:557)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:445)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:900)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1709)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:903)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:798)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 464
invalid content-type: null
trailers: Metadata(:status=464,server=awselb/2.0,date=Fri, 28 Jun 2024 03:09:07 GMT,content-length=0)
at com.shaded.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:592)
at com.shaded.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:571)
at com.shaded.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:91)
at io.qdrant.spark.QdrantGrpc.upsert(QdrantGrpc.java:60)
at io.qdrant.spark.QdrantDataWriter.write(QdrantDataWriter.java:84)
... 33 more
Caused by: io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 464
invalid content-type: null
trailers: Metadata(:status=464,server=awselb/2.0,date=Fri, 28 Jun 2024 03:09:07 GMT,content-length=0)
at io.grpc.Status.asRuntimeException(Status.java:537)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
... 3 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3681)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3603)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3590)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3590)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1535)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1535)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1535)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3838)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3826)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1259)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1247)
at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2966)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2949)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:442)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:416)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:276)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:394)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:393)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:276)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.$anonfun$result$1(V2CommandExec.scala:47)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:47)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:45)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:54)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:286)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:286)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:298)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:528)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:225)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:154)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:477)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:285)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:280)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:265)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:441)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:265)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:265)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:217)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:350)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:956)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:341)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
```
For reference, I found this bug reported against the go-client. Changing the port in the `qdrant_url` to 6334 appears to be working.
Please update the documentation to call out the appropriate port for this scenario!
Hello. Can you confirm the URL value from your options? The port should be 6334. The connector uses Qdrant's gRPC API:
https://qdrant.tech/documentation/interfaces/#grpc-interface
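Concretely, with a hypothetical Qdrant Cloud host, the URL option should look roughly like this:

```python
options = {
    "qdrant_url": "https://xyz-example.cloud.qdrant.io:6334",  # 6334 is Qdrant's gRPC port; 6333 serves the REST API
    # ...remaining options unchanged
}
```

For what it's worth, AWS load balancers return 464 when the incoming request protocol (here gRPC over HTTP/2) is incompatible with the target's protocol version, which lines up with the `server=awselb/2.0` trailer in your dump.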
Also, we have a Databricks example at https://qdrant.tech/documentation/examples/databricks/ that might come in handy.
Yes, the 6334 port worked. Thank you for pointing me to the interfaces page.
And yes, that was the example I had followed. Just stepping through it, I wasn't aware of the need to use port 6334. Maybe a note or link about the different ports could be added to the example page.