dfdx/Spark.jl

I'm lost for a title! Will update when I get clarity :-(

Closed this issue · 7 comments

Hi,

Rookie with Spark and Spark.jl here. Hope I can bother you with this question.

Using Julia 1.5.0 and Spark.jl 0.5.0.

I'm attempting to run a code sample that I found in #70. When I port it to Python, it runs successfully on PySpark, but in Spark.jl I get the error shown below the code sample.

using Spark

Spark.init()
sc = SparkContext(master="local")
# distribute the input lines as an RDD
text = parallelize(sc, ["hello world", "the world is one", "we are the world"])
# split each line into individual words
words = flat_map(text, s -> [string(word) for word in split(s)])
# pair every word with 1, then sum the 1s per word
words_tuple = cartesian(words, parallelize(sc, [1]))
counts = reduce_by_key(words_tuple, +)
result = collect(counts)
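
For reference, if the job ran to completion, the expected result would be the following (up to ordering, and assuming collect returns (word, count) tuples the way the PySpark port does):

[("world", 3), ("the", 2), ("hello", 1), ("is", 1), ("one", 1), ("we", 1), ("are", 1)]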

Error:

julia> result = collect(counts)
20/08/14 11:28:12 INFO SparkContext: Starting job: collect at JuliaRDD.scala:261
20/08/14 11:28:12 INFO DAGScheduler: Registering RDD 10 (cartesian at JuliaRDD.scala:229) as input to shuffle 1
20/08/14 11:28:12 INFO DAGScheduler: Got job 1 (collect at JuliaRDD.scala:261) with 1 output partitions
20/08/14 11:28:12 INFO DAGScheduler: Final stage: ResultStage 3 (collect at JuliaRDD.scala:261)
20/08/14 11:28:12 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
20/08/14 11:28:12 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 2)
20/08/14 11:28:12 INFO DAGScheduler: Submitting ShuffleMapStage 2 (CartesianRDD[10] at cartesian at JuliaRDD.scala:229), which has no missing parents
20/08/14 11:28:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.8 KiB, free 366.3 MiB)
20/08/14 11:28:12 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.3 KiB, free 366.3 MiB)
20/08/14 11:28:12 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on Dell:5460 (size: 3.3 KiB, free: 366.3 MiB)
20/08/14 11:28:12 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1200
20/08/14 11:28:12 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 2 (CartesianRDD[10] at cartesian at JuliaRDD.scala:229) (first 15 tasks are for partitions Vector(0))
20/08/14 11:28:12 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
20/08/14 11:28:13 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 1, Dell, executor driver, partition 0, PROCESS_LOCAL, 7595 bytes)
20/08/14 11:28:13 INFO Executor: Running task 0.0 in stage 2.0 (TID 1)
ERROR: UndefVarError: Spark not defined
Stacktrace:
 [1] top-level scope at none:1
20/08/14 11:28:15 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 1)
java.lang.RuntimeException: Nonzero exit value: 1
        at scala.sys.package$.error(package.scala:30)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
        at org.apache.spark.api.julia.JuliaRDD$.createWorker(JuliaRDD.scala:84)
        at org.apache.spark.api.julia.AbstractJuliaRDD.compute(JuliaRDD.scala:37)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
20/08/14 11:28:15 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 1, Dell, executor driver): java.lang.RuntimeException: Nonzero exit value: 1
        at scala.sys.package$.error(package.scala:30)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
        at org.apache.spark.api.julia.JuliaRDD$.createWorker(JuliaRDD.scala:84)
        at org.apache.spark.api.julia.AbstractJuliaRDD.compute(JuliaRDD.scala:37)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

20/08/14 11:28:15 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
20/08/14 11:28:15 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
20/08/14 11:28:15 INFO TaskSchedulerImpl: Cancelling stage 2
20/08/14 11:28:15 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage cancelled
20/08/14 11:28:15 INFO DAGScheduler: ShuffleMapStage 2 (cartesian at JuliaRDD.scala:229) failed in 2.209 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 1, Dell, executor driver): java.lang.RuntimeException: Nonzero exit value: 1
        at scala.sys.package$.error(package.scala:30)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
        at org.apache.spark.api.julia.JuliaRDD$.createWorker(JuliaRDD.scala:84)
        at org.apache.spark.api.julia.AbstractJuliaRDD.compute(JuliaRDD.scala:37)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
20/08/14 11:28:15 INFO DAGScheduler: Job 1 failed: collect at JuliaRDD.scala:261, took 2.235898 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 1, Dell, executor driver): java.lang.RuntimeException: Nonzero exit value: 1
        at scala.sys.package$.error(package.scala:30)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
        at org.apache.spark.api.julia.JuliaRDD$.createWorker(JuliaRDD.scala:84)
        at org.apache.spark.api.julia.AbstractJuliaRDD.compute(JuliaRDD.scala:37)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
        at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:361)
        at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:360)
        at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
        at org.apache.spark.api.julia.JuliaPairRDD$.collectToJulia(JuliaRDD.scala:261)
        at org.apache.spark.api.julia.JuliaPairRDD.collectToJulia(JuliaRDD.scala)
Caused by: java.lang.RuntimeException: Nonzero exit value: 1
        at scala.sys.package$.error(package.scala:30)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)
        at scala.sys.process.ProcessBuilderImpl$AbstractBuilder.$bang$bang(ProcessBuilderImpl.scala:108)
        at org.apache.spark.api.julia.JuliaRDD$.createWorker(JuliaRDD.scala:84)
        at org.apache.spark.api.julia.AbstractJuliaRDD.compute(JuliaRDD.scala:37)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
ERROR: JavaCall.JavaCallError("Error calling Java: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 1, Dell, executor driver): java.lang.RuntimeException: Nonzero exit value: 1\r\n\tat scala.sys.package\$.error(package.scala:30)\r\n\tat scala.sys.process.ProcessBuilderImpl\$AbstractBuilder.slurp(ProcessBuilderImpl.scala:138)\r\n\tat scala.sys.process.ProcessBuilderImpl\$AbstractBuilder.\$bang\$bang(ProcessBuilderImpl.scala:108)\r\n\tat org.apache.spark.api.julia.JuliaRDD\$.createWorker(JuliaRDD.scala:84)\r\n\tat org.apache.spark.api.julia.AbstractJuliaRDD.compute(JuliaRDD.scala:37)\r\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)\r\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:313)\r\n\tat org.apache.spark.rdd.CartesianRDD.compute(CartesianRDD.scala:75)\r\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)\r\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:313)\r\n\tat org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\r\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\r\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)\r\n\tat org.apache.spark.scheduler.Task.run(Task.scala:127)\r\n\tat org.apache.spark.executor.Executor\$TaskRunner.\$anonfun\$run\$3(Executor.scala:444)\r\n\tat org.apache.spark.util.Utils\$.tryWithSafeFinally(Utils.scala:1377)\r\n\tat org.apache.spark.executor.Executor\$TaskRunner.run(Executor.scala:447)\r\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\r\n\tat java.util.concurrent.ThreadPoolExecutor\$Worker.run(Unknown Source)\r\n\tat java.lang.Thread.run(Unknown Source)\r\n\nDriver stacktrace:")
Stacktrace:
 [1] geterror(::Bool) at C:\Users\igitur\.julia\packages\JavaCall\JPyF5\src\core.jl:300
 [2] geterror at C:\Users\igitur\.julia\packages\JavaCall\JPyF5\src\core.jl:280 [inlined]
 [3] _jcall(::JavaCall.JavaMetaClass{Symbol("org.apache.spark.api.julia.JuliaPairRDD")}, ::Ptr{Nothing}, ::Ptr{Nothing}, ::Type{T} where T, ::Tuple{DataType}, ::JavaCall.JavaObject{Symbol("org.apache.spark.api.java.JavaPairRDD")}) at C:\Users\igitur\.julia\packages\JavaCall\JPyF5\src\core.jl:253
 [4] jcall(::Type{JavaCall.JavaObject{Symbol("org.apache.spark.api.julia.JuliaPairRDD")}}, ::String, ::Type{T} where T, ::Tuple{DataType}, ::JavaCall.JavaObject{Symbol("org.apache.spark.api.java.JavaPairRDD")}) at C:\Users\igitur\.julia\packages\JavaCall\JPyF5\src\core.jl:146
 [5] collect_internal(::Spark.PipelinedPairRDD, ::Type{T} where T, ::Type{T} where T) at C:\Users\igitur\.julia\packages\Spark\3MVGw\src\rdd.jl:234
 [6] collect(::Spark.PipelinedPairRDD) at C:\Users\igitur\.julia\packages\Spark\3MVGw\src\rdd.jl:282
 [7] top-level scope at REPL[16]:1

Any help would be appreciated.

dfdx commented

Hi,

It looks like an installation issue: maybe the driver knows about Spark.jl but the workers don't, or something like that. How did you install the package? For example, did you just ]add Spark, or did you clone the repo and build it yourself? In what environment (e.g. does the prompt look like (@v1.5) pkg> when you install Spark.jl)?
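
For clarity, the two installation routes I have in mind look roughly like this (the clone-and-build commands are just a sketch):

using Pkg

# Route 1: plain registry install
Pkg.add("Spark")

# Route 2: work from a clone of the repo and run the build step explicitly
Pkg.develop(PackageSpec(url="https://github.com/dfdx/Spark.jl"))
Pkg.build("Spark")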

Thanks for your reply. I've just replicated it on a clean machine.

To answer your questions: to install Spark I just used Pkg.add; I didn't clone the repo. I'm guessing the environment is the v1.5 environment, judging by this output:

julia> import Pkg; Pkg.add("Spark")
Updating registry at `C:\Users\asdf\.julia\registries\General`
Resolving package versions...
No Changes to `C:\Users\asdf\.julia\environments\v1.5\Project.toml`
No Changes to `C:\Users\asdf\.julia\environments\v1.5\Manifest.toml`

Rookie question, but am I right in saying I don't have to install Spark separately by downloading from https://spark.apache.org/downloads.html ? I'm assuming Spark.jl compiles and installs Spark itself. (big assumption!)

And for what it's worth, here is the full output of the script:

julia
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.5.0 (2020-08-01)
 _/ |\__'_|_|_|\__'_|  |  Official https://julialang.org/ release
|__/                   |

julia> using Spark

julia> Spark.init()

julia> sc = SparkContext(master="local")
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/17 16:18:47 INFO SparkContext: Running Spark version 2.4.6
20/08/17 16:18:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/08/17 16:18:47 INFO SparkContext: Submitted application: Julia App on Spark
20/08/17 16:18:47 INFO SecurityManager: Changing view acls to: asdf
20/08/17 16:18:47 INFO SecurityManager: Changing modify acls to: asdf
20/08/17 16:18:47 INFO SecurityManager: Changing view acls groups to:
20/08/17 16:18:47 INFO SecurityManager: Changing modify acls groups to:
20/08/17 16:18:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(asdf); groups with view permissions: Set(); users  with modify permissions: Set(asdf); groups with modify permissions: Set()
20/08/17 16:18:49 INFO Utils: Successfully started service 'sparkDriver' on port 41179.
20/08/17 16:18:49 INFO SparkEnv: Registering MapOutputTracker
20/08/17 16:18:49 INFO SparkEnv: Registering BlockManagerMaster
20/08/17 16:18:49 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/08/17 16:18:49 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/08/17 16:18:49 INFO DiskBlockManager: Created local directory at C:\Users\asdf\AppData\Local\Temp\2\blockmgr-9e08c281-91f6-4e16-9ed9-0a820199f914
20/08/17 16:18:49 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/08/17 16:18:49 INFO SparkEnv: Registering OutputCommitCoordinator
20/08/17 16:18:50 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/08/17 16:18:50 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://mymachine:4040
20/08/17 16:18:50 INFO Executor: Starting executor ID driver on host localhost
20/08/17 16:18:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41194.
20/08/17 16:18:50 INFO NettyBlockTransferService: Server created on mymachine:41194
20/08/17 16:18:50 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/08/17 16:18:50 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, mymachine, 41194, None)
20/08/17 16:18:50 INFO BlockManagerMasterEndpoint: Registering block manager mymachine:41194 with 366.3 MB RAM, BlockManagerId(driver, mymachine, 41194, None)
20/08/17 16:18:50 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, mymachine, 41194, None)
20/08/17 16:18:50 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, mymachine, 41194, None)
20/08/17 16:18:50 INFO SparkContext: Added JAR C:\Users\asdf\.julia\packages\Spark\3MVGw\src\..\jvm\sparkjl\target\sparkjl-0.1.jar at spark://mymachine:41179/jars/sparkjl-0.1.jar with timestamp 1597673930571
SparkContext(local,Julia App on Spark)

julia> text = parallelize(sc, ["hello world", "the world is one", "we are the world"])
JavaRDD()

julia> words = flat_map(text, s -> [string(word) for word in split(s)])
PipelinedRDD(JavaRDD())

julia> words_tuple = cartesian(words, parallelize(sc, [1]))
JavaPairRDD()

julia> counts = reduce_by_key(words_tuple, +)
PipelinedPairRDD(JavaPairRDD())
dfdx commented

Rookie question, but am I right in saying I don't have to install Spark separately by downloading from https://spark.apache.org/downloads.html ? I'm assuming Spark.jl compiles and installs Spark itself. (big assumption!)

Absolutely! The Maven config includes Spark core as a dependency and downloads all the necessary libraries during the Spark.jl build. Moreover, Spark nowadays doesn't depend on Hadoop, so you can skip that download entirely unless you need Hadoop for other reasons.
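
If the build step never ran (or failed silently), re-running it with verbose output should surface the Maven log; a minimal sketch:

using Pkg

# re-runs the Maven build that fetches the Spark jars and prints its output
Pkg.build("Spark"; verbose=true)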

Unfortunately, I don't have a Windows machine and can't reproduce your error locally, so I'll need your help to debug it.

Let's start with a basic check, which will also open up a whole new SQL API for you. Download this file (people.json) to a preferred location and run the following:

using Spark
Spark.init()

sess = SparkSession()
ds = read_json(sess, "/path/to/people.json")   # read JSON file as a Spark Dataset
count(ds)   # count elements in the dataset
write_parquet(ds, "/path/to/people.parquet")   # save the dataset as a Parquet file
close(sess)
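
In case the linked file is unavailable: any small newline-delimited JSON file will do for this check. A hypothetical stand-in could be created like this (the records themselves are made up):

# hypothetical stand-in for people.json: one JSON object per line,
# which is the format Spark's JSON reader expects
open("people.json", "w") do io
    println(io, """{"name":"Alice", "age":30}""")
    println(io, """{"name":"Bob", "age":25}""")
end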

The SQL (a.k.a. Dataset or DataFrame) API has been the default one since Spark 2.0. It also has a much simpler implementation in Spark.jl and should work more reliably. Let's first make sure at least this API works for you.

And just to make sure we are indeed in the v1.5 environment, could you please open the Julia console, type ] and copy the prompt here? Typing ] in the Julia console opens package mode, and the prompt starts with the name of the environment you're in.

The script for people.json worked. I now have a people.parquet folder with files in it, but there were some errors along the way.

For the read_json command, there was this exception:

Exception in thread "main" Error showing value of type Dataset:
ERROR: JavaCall.JavaCallError("Error calling Java: java.lang.IllegalMonitorStateException")
Stacktrace:
 [1] geterror(::Bool) at C:\Users\asdf\.julia\packages\JavaCall\JPyF5\src\core.jl:300
 [2] geterror at C:\Users\asdf\.julia\packages\JavaCall\JPyF5\src\core.jl:280 [inlined]
 [3] _jcall(::JavaCall.JavaObject{Symbol("org.apache.spark.sql.types.StructType")}, ::Ptr{Nothing}, ::Ptr{Nothing}, ::Type{T} where T, ::Tuple{}) at C:\Users\asdf\.julia\packages\JavaCall\JPyF5\src\core.jl:253
 [4] jcall(::JavaCall.JavaObject{Symbol("org.apache.spark.sql.types.StructType")}, ::String, ::Type{T} where T, ::Tuple{}) at C:\Users\asdf\.julia\packages\JavaCall\JPyF5\src\core.jl:157
 [5] schema_string at C:\Users\asdf\.julia\packages\Spark\3MVGw\src\sql.jl:47 [inlined]
 [6] show(::IOContext{REPL.Terminals.TTYTerminal}, ::Dataset) at C:\Users\asdf\.julia\packages\Spark\3MVGw\src\sql.jl:50
 [7] show(::IOContext{REPL.Terminals.TTYTerminal}, ::MIME{Symbol("text/plain")}, ::Dataset) at .\multimedia.jl:47
 [8] display(::REPL.REPLDisplay, ::MIME{Symbol("text/plain")}, ::Any) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\REPL.jl:214
 [9] display(::REPL.REPLDisplay, ::Any) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\REPL.jl:218
 [10] display(::Any) at .\multimedia.jl:328
 [11] #invokelatest#1 at .\essentials.jl:710 [inlined]
 [12] invokelatest at .\essentials.jl:709 [inlined]
 [13] print_response(::IO, ::Any, ::Bool, ::Bool, ::Any) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\REPL.jl:238
 [14] print_response(::REPL.AbstractREPL, ::Any, ::Bool, ::Bool) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\REPL.jl:223
 [15] (::REPL.var"#do_respond#54"{Bool,Bool,REPL.var"#64#73"{REPL.LineEditREPL,REPL.REPLHistoryProvider},REPL.LineEditREPL,REPL.LineEdit.Prompt})(::Any, ::Any, ::Any) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\REPL.jl:822
 [16] #invokelatest#1 at .\essentials.jl:710 [inlined]
 [17] invokelatest at .\essentials.jl:709 [inlined]
 [18] run_interface(::REPL.Terminals.TextTerminal, ::REPL.LineEdit.ModalInterface, ::REPL.LineEdit.MIState) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\LineEdit.jl:2355
 [19] run_frontend(::REPL.LineEditREPL, ::REPL.REPLBackendRef) at D:\buildbot\worker\package_win64\build\usr\share\julia\stdlib\v1.5\REPL\src\REPL.jl:1143
 [20] (::REPL.var"#38#42"{REPL.LineEditREPL,REPL.REPLBackendRef})() at .\task.jl:356

But it seems the dataset loaded correctly, and the next line printed 2, as expected.

Also, when closing the session, some temp files could not be cleaned up:

20/08/19 10:24:52 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\asdf\AppData\Local\Temp\2\spark-6b2e6f97-7481-429a-95dd-9d806ff5164c\userFiles-50deff93-f204-4e32-88a5-beef0dceb4f2
java.io.IOException: Failed to delete: C:\Users\asdf\AppData\Local\Temp\2\spark-6b2e6f97-7481-429a-95dd-9d806ff5164c\userFiles-50deff93-f204-4e32-88a5-beef0dceb4f2\sparkjl-0.1.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:144)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:118)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:128)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:118)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:91)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1062)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:103)
        at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1974)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1973)
        at org.apache.spark.sql.SparkSession.stop(SparkSession.scala:713)
        at org.apache.spark.sql.SparkSession.close(SparkSession.scala:721)
20/08/19 10:24:52 INFO SparkContext: Successfully stopped SparkContext

And the ] prompt:
(@v1.5) pkg>

dfdx commented

The first error might be related to the Julia version. Would it be possible for you to try the same code on Julia 1.0?

The second error is most likely just a consequence of the first one.

OK, I'll try that, as well as trying it on a Linux box, and then we can take it from there. Thanks for your help so far.

dfdx commented

Closing as outdated