apache/amoro

[Bug]: Spark query does not use catalog's built-in ugi

Opened this issue · 2 comments

What happened?

In the process of using Amoro, I found that the rpc response time was unstable when connecting to the router, resulting in slow query of the Amoro table. Therefore, I want to connect namanode directly through amoro to optimize query speed.
During the testing process, it was found that spark queries would experience exceptions. The reason is: when spark creates a task, the client will first register the token with the router. Later, when yarn starts the container, it will get this token to authenticate with hdfs. However, since this token is registered with the router, and after the task starts executing, the amoro file is actually directly connected to the namenode, so the token cannot be found on the namenode, so the authentication fails.
Theoretically, Amoro uses the keytab and principal authentication in the Amoro catalog to read data, and should not use the yarn token.

Affects Versions

master

What engines are you seeing the problem on?

No response

How to reproduce

No response

Relevant log output

2023-12-29 13:01:41 CST TaskSetManager WARN - Lost task 0.0 in stage 0.0 (TID 0) (10.55.156.39 executor 1): com.netease.arctic.shade.org.apache.iceberg.exceptions.RuntimeIOException: Failed to open Parquet file: hdfs://eadhadoop/user/kada/hive_db/kada_ext.db/dim_user_compatible_id/hive/1703800975179_44501/6-B-44501-00152-157-662839286894636421-00001.parquet
	at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.newReader(AdaptHiveReadConf.java:222)
	at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.<init>(AdaptHiveReadConf.java:74)
	at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.init(AdaptHiveParquetReader.java:71)
	at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveParquetReader.iterator(AdaptHiveParquetReader.java:83)
	at com.netease.arctic.io.CloseableIterableWrapper.iterator(CloseableIterableWrapper.java:39)
	at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:152)
	at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.<init>(CloseableIterable.java:143)
	at com.netease.arctic.shade.org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable.iterator(CloseableIterable.java:138)
	at com.netease.arctic.io.reader.AbstractArcticDataReader.readData(AbstractArcticDataReader.java:125)
	at com.netease.arctic.spark.reader.KeyedSparkBatchScan$RowReader.next(KeyedSparkBatchScan.java:246)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1469)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for bdms_hechao: HDFS_DELEGATION_TOKEN owner=bdms_hechao/dev@YOUDAO.163.COM, renewer=yarn, realUser=, issueDate=1703826072124, maxDate=1704430872124, sequenceNumber=942934, masterKeyId=20) can't be found in cache
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1507)
	at org.apache.hadoop.ipc.Client.call(Client.java:1453)
	at org.apache.hadoop.ipc.Client.call(Client.java:1363)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
	at com.sun.proxy.$Proxy20.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy21.getBlockLocations(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:845)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:834)
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:823)
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1062)
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:303)
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:299)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:311)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
	at com.netease.arctic.shade.org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:69)
	at com.netease.arctic.shade.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:773)
	at com.netease.arctic.shade.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:657)
	at com.netease.arctic.shade.org.apache.iceberg.parquet.AdaptHiveReadConf.newReader(AdaptHiveReadConf.java:220)
	... 30 more

Anything else

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Any update on this issue?
Can you help to check if this bug still exists in the master branch? @ShawHee

@zhoujinsong Yes, it still exists in the master branch. I modified my environment and added doAs in SparkBatchScan, and it took effect.