aliyun/aliyun-emapreduce-datasources

emr-oss gets OSSClient exception

NanXuejiao opened this issue · 4 comments

Describe the bug
Using emr-oss, I hit a java.lang.ClassNotFoundException: com.aliyun.oss.OSSClient.

To Reproduce
Steps to reproduce the behavior:

  1. Add these dependencies:
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-oss</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-common_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <version>3.0.0</version>
</dependency>
  2. Add the OSS config (a variant with the OSS SDK jar on the dependency path is sketched after step 3):
import org.apache.spark.{SparkConf, SparkContext}

trait RunLocally {
  val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.mapreduce.job.run-local", "true")
  conf.set("spark.hadoop.fs.oss.accessKeyId", "my accessKeyId")
  conf.set("spark.hadoop.fs.oss.accessKeySecret", "my accessKeySecret")
  conf.set("spark.hadoop.fs.oss.core.dependency.path", "local:///opt/spark/jars/emr-oss-2.0.0.jar,local:///opt/spark/jars/emr-common_2.11-2.0.0.jar")
  val sc = new SparkContext(conf)

  def getAppName: String
}
  3. Submit OSSSample and observe the error:
object OSSSample extends RunLocally {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(
        """Usage: bin/spark-submit --class OSSSample examples-1.0-SNAPSHOT-shaded.jar <inputPath> <numPartition>
          |
          |Arguments:
          |
          |    inputPath        Input OSS object path, like oss://accessKeyId:accessKeySecret@bucket.endpoint/a/b.txt
          |    numPartitions    the number of RDD partitions.
          |
        """.stripMargin)
      System.exit(1)
    }

    val inputPath = args(0)
    val numPartitions = args(1).toInt
    val ossData = sc.textFile(inputPath, numPartitions)
    println("The top 10 lines are:")
    ossData.top(10).foreach(println)
  }

  override def getAppName: String = "OSS Sample"
}
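
For reference, the stack trace below shows the lookup of com.aliyun.oss.OSSClient failing inside a URLClassLoader created by OSSClientAgent, which suggests the OSS SDK classes have to be reachable from the jars listed in fs.oss.core.dependency.path. Below is a minimal sketch of the same configuration with the SDK jar appended to that list; the SDK jar location is an assumption, and this is not a fix confirmed in this thread.

import org.apache.spark.SparkConf

// Sketch only: the RunLocally settings from above, with the aliyun-sdk-oss jar
// also listed on fs.oss.core.dependency.path. The SDK jar path is hypothetical.
val conf = new SparkConf()
  .setAppName("OSS Sample")
  .setMaster("local[4]")
  .set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  .set("spark.hadoop.mapreduce.job.run-local", "true")
  .set("spark.hadoop.fs.oss.core.dependency.path",
    "local:///opt/spark/jars/emr-oss-2.0.0.jar," +
    "local:///opt/spark/jars/emr-common_2.11-2.0.0.jar," +
    "local:///opt/spark/jars/aliyun-sdk-oss-3.0.0.jar") // hypothetical SDK jar location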

Expected behavior
The OSSSample example should run successfully.

Screenshots
Not applicable.

Desktop (please complete the following information):
Not applicable.

Smartphone (please complete the following information):
Not applicable.
Additional context
Error log:

20/04/23 03:27:16 INFO SparkContext: Created broadcast 0 from textFile at OSSSample.scala:19
The top 10 lines are:
20/04/23 03:27:16 WARN NativeOssFileSystem: com.aliyun.oss.OSSClient
Exception in thread "main" java.io.IOException: java.lang.ClassNotFoundException: com.aliyun.oss.OSSClient
	at com.aliyun.fs.oss.nat.NativeOssFileSystem.initialize(NativeOssFileSystem.java:145)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1436)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1426)
	at org.apache.spark.rdd.RDD$$anonfun$top$1.apply(RDD.scala:1404)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.top(RDD.scala:1403)
	at top.test.logs.OSSSample$.main(OSSSample.scala:21)
	at top.test.logs.OSSSample.main(OSSSample.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.aliyun.oss.OSSClient
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at com.aliyun.fs.oss.utils.OSSClientAgent.<init>(OSSClientAgent.java:86)
	at com.aliyun.fs.oss.nat.JetOssNativeFileSystemStore.initialize(JetOssNativeFileSystemStore.java:140)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy17.initialize(Unknown Source)
	at com.aliyun.fs.oss.nat.NativeOssFileSystem.initialize(NativeOssFileSystem.java:142)
	... 48 more

This looks similar to #156, but I don't use --driver-class-path.
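
For context on that flag, here is a minimal sketch of the generic Spark classpath settings behind it; this is not a fix confirmed in this thread, and the jar path is hypothetical.

import org.apache.spark.SparkConf

// Sketch only, hypothetical jar path. spark.executor.extraClassPath prepends a jar
// to the executor classpath and can be set in code before the SparkContext is created.
// spark.driver.extraClassPath is the config behind --driver-class-path, but it has to
// be supplied when the driver JVM is launched (spark-submit flag or spark-defaults.conf),
// not from application code.
val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/opt/spark/jars/aliyun-sdk-oss-3.0.0.jar")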

Please use the shaded jar located at example/target/shaded/.

close stale issue

@NanXuejiao @uncleGen I ran into the same question.
I want to know where the job actually runs.
conf.set("spark.hadoop.mapreduce.job.run-local", "true") seems to mean it runs locally, while conf.set("spark.hadoop.fs.oss.core.dependency.path", "local:///opt/spark/jars/emr-oss-2.0.0.jar,local:///opt/spark/jars/emr-common_2.11-2.0.0.jar") seems to mean the job runs on the remote k8s cluster.
Thanks.