[Bug]: Spark query does not use catalog's built-in ugi
What happened?
While using Amoro, I found that RPC response times were unstable when connecting through the HDFS router, which made queries on Amoro tables slow. Therefore, I want Amoro to connect to the NameNode directly to speed up queries.
During testing, I found that Spark queries fail with an exception. The reason is that when Spark creates a task, the client first registers a delegation token with the router; later, when YARN starts the container, it uses this token to authenticate with HDFS. However, the token is known only to the router, while once the task starts executing Amoro actually reads the files through a direct connection to the NameNode, so the token cannot be found on the NameNode and authentication fails.
In theory, Amoro should read data using the keytab and principal configured in the Amoro catalog, and should not depend on the YARN delegation token.
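For context, the behavior I would expect is roughly the following, a minimal sketch using the standard Hadoop UGI API; the principal, keytab path, and file path below are placeholders, not Amoro's actual configuration:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class CatalogUgiReadSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(conf);

    // Log in with the keytab/principal configured in the Amoro catalog
    // (values below are placeholders).
    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        "amoro/_HOST@EXAMPLE.COM", "/etc/security/keytabs/amoro.keytab");

    // Open the file as the catalog user, so the NameNode authenticates the
    // Kerberos credentials instead of looking up a delegation token that was
    // issued by the router.
    ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
      Path path = new Path("hdfs://namenode:8020/path/to/data.parquet");
      try (FileSystem fs = path.getFileSystem(conf);
           FSDataInputStream in = fs.open(path)) {
        in.read(); // read as the keytab-authenticated catalog user
      }
      return null;
    });
  }
}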
Affects Versions
master
What engines are you seeing the problem on?
No response
How to reproduce
No response
Relevant log output
2023-12-29 13:01:41 CST TaskSetManager WARN - Lost task 0.0 in stage 0.0 (TID 0) (10.55.156.39 executor 1): com.netease.arctic.shade.org.apache.iceberg.exceptions.RuntimeIOException: Failed to open Parquet file: hdfs://eadhadoop/user/kada/hive_db/kada_ext.db/dim_user_compatible_id/hive/1703800975179_44501/6-B-44501-00152-157-662839286894636421-00001.parquet
at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.newReader(AdaptHiveReadConf.java:222)
at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.<init>(AdaptHiveReadConf.java:74)
at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.init(AdaptHiveParquetReader.java:71)
at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:83)
at com.netease.arctic.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:39)
at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:152)
at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:143)
at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable.iterator(CloseableIterable.java:138)
at com.netease.arctic.io.reader.AbstractArcticDataReader.readData(AbstractArcticDataReader.java:125)
at com.netease.arctic.spark.reader.KeyedSparkBatchScan$RowReader.next(KeyedSparkBatchScan.java:246)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1469)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for bdms_hechao: HDFS_DELEGATION_TOKEN owner=bdms_hechao/dev@YOUDAO.163.COM, renewer=yarn, realUser=, issueDate=1703826072124, maxDate=1704430872124, sequenceNumber=942934, masterKeyId=20) can't be found in cache
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507)
at org.apache.hadoop.ipc.Client.call(Client.java:1453)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy20.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy21.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:845)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:834)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:823)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1062)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:303)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:299)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:311)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
at com.netease.arctic.shade.org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
at com.netease.arctic.shade.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:773)
at com.netease.arctic.shade.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.newReader(AdaptHiveReadConf.java:220)
... 30 more
Anything else
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Any update on this issue?
Could you help check whether this bug still exists in the master branch? @ShawHee
@zhoujinsong Yes, it still exists in the master branch. In my environment I worked around it by adding a doAs call in SparkBatchScan, and that resolved the issue.
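For reference, the workaround is conceptually like the sketch below, a standalone iterator wrapper written only for illustration; it is not the actual diff to KeyedSparkBatchScan, and it assumes the catalog's keytab-authenticated UGI is available to the reader:

import java.security.PrivilegedExceptionAction;
import java.util.Iterator;

import org.apache.hadoop.security.UserGroupInformation;

/**
 * Illustrative only: runs hasNext()/next() under the catalog's keytab UGI, so
 * the HDFS calls made while opening Parquet files carry Kerberos credentials
 * rather than the YARN delegation token registered with the router.
 */
final class DoAsIterator<T> implements Iterator<T> {

  private final UserGroupInformation catalogUgi;
  private final Iterator<T> delegate;

  DoAsIterator(UserGroupInformation catalogUgi, Iterator<T> delegate) {
    this.catalogUgi = catalogUgi;
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext() {
    return runAs(delegate::hasNext);
  }

  @Override
  public T next() {
    return runAs(delegate::next);
  }

  private <R> R runAs(PrivilegedExceptionAction<R> action) {
    try {
      // doAs makes the wrapped call execute with the catalog user's credentials.
      return catalogUgi.doAs(action);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}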