dfdx/Spark.jl

Issue with cache

Closed this issue · 2 comments

aviks commented

Using cache seems to result in an assertion failure.

cc: @dfdx is this expected?

julia> sc = SparkContext(master="local");

julia> rdd = parallelize(sc, 1:10)
JavaRDD()

julia> map(rdd, x->x+1)
PipelinedRDD(JavaRDD())

julia> rdd2 = map(rdd, x->x+1)
PipelinedRDD(JavaRDD())

julia> rdd3 = cache(rdd2)
JavaRDD()

julia> rdd4 = map(rdd3, x->x^2)
PipelinedRDD(JavaRDD())

julia> collect(rdd4)
17/06/02 00:40:45 INFO SparkContext: Starting job: collect at JuliaRDD.scala:253
17/06/02 00:40:45 INFO DAGScheduler: Got job 0 (collect at JuliaRDD.scala:253) with 1 output partitions
17/06/02 00:40:45 INFO DAGScheduler: Final stage: ResultStage 0 (collect at JuliaRDD.scala:253)
17/06/02 00:40:45 INFO DAGScheduler: Parents of final stage: List()
17/06/02 00:40:45 INFO DAGScheduler: Missing parents: List()
17/06/02 00:40:45 INFO DAGScheduler: Submitting ResultStage 0 (JuliaRDD[3] at RDD at JuliaRDD.scala:20), which has no missing parents
17/06/02 00:40:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.7 KB, free 3.0 GB)
17/06/02 00:40:45 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1662.0 B, free 3.0 GB)
17/06/02 00:40:45 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.73:64258 (size: 1662.0 B, free: 3.0 GB)
17/06/02 00:40:45 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
17/06/02 00:40:45 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (JuliaRDD[3] at RDD at JuliaRDD.scala:20)
17/06/02 00:40:45 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/06/02 00:40:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 6190 bytes)
17/06/02 00:40:45 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/06/02 00:40:45 INFO Executor: Fetching spark://192.168.1.73:64257/jars/sparkjl-0.1.jar with timestamp 1496360349493
17/06/02 00:40:46 INFO TransportClientFactory: Successfully created connection to /192.168.1.73:64257 after 46 ms (0 ms spent in bootstraps)
17/06/02 00:40:46 INFO Utils: Fetching spark://192.168.1.73:64257/jars/sparkjl-0.1.jar to /private/var/folders/bd/7r22fdls1l14b1zxz1_46hb80000gn/T/spark-967754ef-4ad2-4672-84d3-be234f27004b/userFiles-81323b85-3016-42cc-985d-193decf6b7b4/fetchFileTemp3531040707804664902.tmp
17/06/02 00:40:46 INFO Executor: Adding file:/private/var/folders/bd/7r22fdls1l14b1zxz1_46hb80000gn/T/spark-967754ef-4ad2-4672-84d3-be234f27004b/userFiles-81323b85-3016-42cc-985d-193decf6b7b4/sparkjl-0.1.jar to class loader
17/06/02 00:40:46 INFO JuliaRDD: Worker creation started! for partition: 0 at 1496360446184
Loaded /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/server/libjvm.dylib
17/06/02 00:40:48 INFO JuliaRDD: EPAM Worker connected in 2691 ms  for partition: 0
17/06/02 00:40:48 INFO JuliaRDD: Worker creation started! for partition: 0 at 1496360448881
Loaded /Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/server/libjvm.dylib
17/06/02 00:40:51 INFO JuliaRDD: EPAM Worker connected in 2571 ms  for partition: 0
17/06/02 00:40:51 INFO OutputThread: EPAM - writeIteratorToStream took 1 ms.
INFO: Julia: starting partition id: 0 - at 2017-06-02T00:40:51.573
INFO: Julia: exiting normally. partition id: 0 - at 2017-06-02T00:40:52.545
17/06/02 00:40:52 INFO MemoryStore: Block rdd_2_0 stored as values in memory (estimated size 296.0 B, free 3.0 GB)
17/06/02 00:40:52 INFO BlockManagerInfo: Added rdd_2_0 in memory on 192.168.1.73:64258 (size: 296.0 B, free: 3.0 GB)
17/06/02 00:40:52 ERROR Utils: Uncaught exception in thread stdout writer for julia
java.lang.AssertionError: assertion failed: Task -1024 release lock on block rdd_2_0 more times than it acquired it
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:298)
	at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:654)
	at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:458)
	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.julia.OutputThread.writeIteratorToStream(OutputThread.scala:101)
	at org.apache.spark.api.julia.OutputThread$$anonfun$run$1.apply(OutputThread.scala:45)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
	at org.apache.spark.api.julia.OutputThread.run(OutputThread.scala:33)
Exception in thread "stdout writer for julia" java.lang.AssertionError: assertion failed: Task -1024 release lock on block rdd_2_0 more times than it acquired it
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:298)
	at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:654)
	at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:458)
	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.julia.OutputThread.writeIteratorToStream(OutputThread.scala:101)
	at org.apache.spark.api.julia.OutputThread$$anonfun$run$1.apply(OutputThread.scala:45)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
	at org.apache.spark.api.julia.OutputThread.run(OutputThread.scala:33)
INFO: Julia: starting partition id: 0 - at 2017-06-02T00:40:52.576
INFO: Spark.func
it = Task (runnable) @0x0000000110d2a410
it = Base.Generator{Task,Base.Serializer.__deserialized_types__.##5#6}(Base.Serializer.__deserialized_types__.#5,Task (runnable) @0x0000000110d2a410)
dfdx commented

Hmm, this code works for me on Julia 0.5.0 and latest master of Spark.jl:

julia> using Spark
Loaded /usr/lib/jvm/java-8-oracle/jre/lib/amd64/server/libjvm.so

julia> sc = SparkContext(master="local");
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/06/02 22:29:10 INFO SparkContext: Running Spark version 1.5.2
...
julia> rdd = parallelize(sc, 1:10)
PipelinedRDD(JavaRDD())

julia> map(rdd, x->x+1)
PipelinedRDD(JavaRDD())

julia> rdd2 = map(rdd, x->x+1)
PipelinedRDD(JavaRDD())

julia> rdd3 = Spark.cache(rdd2)
PipelinedRDD(JavaRDD())

julia> rdd4 = map(rdd3, x->x^2)
PipelinedRDD(JavaRDD())

julia> collect(rdd4)
17/06/02 22:30:00 INFO SparkContext: Starting job: first at <unknown>:0
17/06/02 22:30:00 INFO DAGScheduler: Got job 0 (first at <unknown>:0) with 1 output partitions
17/06/02 22:30:00 INFO DAGScheduler: Final stage: ResultStage 0(first at <unknown>:0)
17/06/02 22:30:00 INFO DAGScheduler: Parents of final stage: List()
17/06/02 22:30:00 INFO DAGScheduler: Missing parents: List()
17/06/02 22:30:00 INFO DAGScheduler: Submitting ResultStage 0 (JuliaRDD[5] at RDD at JuliaRDD.scala:15), which has no missing parents
17/06/02 22:30:00 INFO MemoryStore: ensureFreeSpace(2992) called with curMem=0, maxMem=515553361
17/06/02 22:30:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.9 KB, free 491.7 MB)
17/06/02 22:30:00 INFO MemoryStore: ensureFreeSpace(1685) called with curMem=2992, maxMem=515553361
17/06/02 22:30:00 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1685.0 B, free 491.7 MB)
17/06/02 22:30:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35197 (size: 1685.0 B, free: 491.7 MB)
17/06/02 22:30:00 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861
17/06/02 22:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (JuliaRDD[5] at RDD at JuliaRDD.scala:15)
17/06/02 22:30:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
17/06/02 22:30:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2308 bytes)
17/06/02 22:30:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/06/02 22:30:00 INFO Executor: Fetching http://192.168.100.5:44519/jars/sparkjl-0.1.jar with timestamp 1496431751420
17/06/02 22:30:00 INFO Utils: Fetching http://192.168.100.5:44519/jars/sparkjl-0.1.jar to /tmp/spark-af4d4b7f-b4c8-41b4-a911-9bdcc269fa30/userFiles-83eba5c4-23dd-4a3c-8257-43376249b2b9/fetchFileTemp9088193860230804638.tmp
17/06/02 22:30:00 INFO Executor: Adding file:/tmp/spark-af4d4b7f-b4c8-41b4-a911-9bdcc269fa30/userFiles-83eba5c4-23dd-4a3c-8257-43376249b2b9/sparkjl-0.1.jar to class loader
Loaded /usr/lib/jvm/java-8-oracle/jre/lib/amd64/server/libjvm.so
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
======= file 'appveyor.yml' ========
environment:
  HADOOP_HOME: C:\projects\hadoop
  matrix:
  - JULIA_URL: "https://julialang-s3.julialang.org/bin/winnt/x64/0.5/julia-0.5-latest-win64.exe"
    JAVA_HOME: C:\Program Files\Java\jdk1.7.0\  

notifications:
  - provider: Email
    on_build_success: false
    on_build_failure: false
    on_build_status_changed: false

os: Windows Server 2012

install:
  - ps: "[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.SecurityProtocolType]::Tls12"
# Download most recent Julia Windows binary
  - ps: (new-object net.webclient).DownloadFile(
        $env:JULIA_URL,
        "C:\projects\julia-binary.exe")
        
#install hadoop winutils
  - cmd: md C:\projects\hadoop
  - cmd: md C:\projects\hadoop\bin
  - ps: (new-object System.Net.WebClient).DownloadFile(
        "https://github.com/steveloughran/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true", 
        "C:\projects\hadoop\bin\winutils.exe")

# Run installer silently, output to C:\projects\julia
  - C:\projects\julia-binary.exe /S /D=C:\projects\julia

build_script:
# Need to convert from shallow to complete for Pkg.clone to work
  - IF EXIST .git\shallow (git fetch --unshallow)
  - C:\projects\julia\bin\julia -e "versioninfo();
      Pkg.clone(pwd(), \"Spark\"); Pkg.build(\"Spark\")"

test_script:
  - C:\projects\julia\bin\julia --check-bounds=yes -e "Pkg.test(\"Spark\")"


!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
17/06/02 22:30:01 INFO CacheManager: Partition rdd_0_0 not found, computing it
17/06/02 22:30:01 INFO MemoryStore: ensureFreeSpace(296) called with curMem=4677, maxMem=515553361
17/06/02 22:30:01 INFO MemoryStore: Block rdd_0_0 stored as values in memory (estimated size 296.0 B, free 491.7 MB)
17/06/02 22:30:01 INFO BlockManagerInfo: Added rdd_0_0 in memory on localhost:35197 (size: 296.0 B, free: 491.7 MB)
17/06/02 22:30:03 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1538 bytes result sent to driver
17/06/02 22:30:03 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3229 ms on localhost (1/1)
17/06/02 22:30:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/06/02 22:30:03 INFO DAGScheduler: ResultStage 0 (first at <unknown>:0) finished in 3.239 s
17/06/02 22:30:03 INFO DAGScheduler: Job 0 finished: first at <unknown>:0, took 3.367172 s
17/06/02 22:30:03 INFO SparkContext: Starting job: collect at JuliaRDD.scala:44
17/06/02 22:30:03 INFO DAGScheduler: Got job 1 (collect at JuliaRDD.scala:44) with 1 output partitions
17/06/02 22:30:03 INFO DAGScheduler: Final stage: ResultStage 1(collect at JuliaRDD.scala:44)
17/06/02 22:30:03 INFO DAGScheduler: Parents of final stage: List()
17/06/02 22:30:03 INFO DAGScheduler: Missing parents: List()
17/06/02 22:30:03 INFO DAGScheduler: Submitting ResultStage 1 (JuliaRDD[4] at RDD at JuliaRDD.scala:15), which has no missing parents
17/06/02 22:30:03 INFO MemoryStore: ensureFreeSpace(2840) called with curMem=4973, maxMem=515553361
17/06/02 22:30:03 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.8 KB, free 491.7 MB)
17/06/02 22:30:03 INFO MemoryStore: ensureFreeSpace(1639) called with curMem=7813, maxMem=515553361
17/06/02 22:30:03 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1639.0 B, free 491.7 MB)
17/06/02 22:30:03 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:35197 (size: 1639.0 B, free: 491.7 MB)
17/06/02 22:30:03 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
17/06/02 22:30:03 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (JuliaRDD[4] at RDD at JuliaRDD.scala:15)
17/06/02 22:30:03 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/06/02 22:30:03 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 2308 bytes)
17/06/02 22:30:03 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
Loaded /usr/lib/jvm/java-8-oracle/jre/lib/amd64/server/libjvm.so
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
======= file 'appveyor.yml' ========
... (same appveyor.yml contents as above, printed again for this task) ...
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
17/06/02 22:30:05 INFO BlockManager: Found block rdd_0_0 locally
17/06/02 22:30:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2221 bytes result sent to driver
17/06/02 22:30:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2722 ms on localhost (1/1)
17/06/02 22:30:06 INFO DAGScheduler: ResultStage 1 (collect at JuliaRDD.scala:44) finished in 2.723 s
17/06/02 22:30:06 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
17/06/02 22:30:06 INFO DAGScheduler: Job 1 finished: collect at JuliaRDD.scala:44, took 2.731046 s
10-element Array{Int64,1}:
   4
   9
  16
  25
  36
  49
  64
  81
 100
 121

Not sure where the lines about appveyor.yml come from, and I used Spark.cache() because it turns out not to be exported, but the rest seems to work perfectly well for me. Are you using a custom version of Spark.jl or custom Spark binaries?
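A minimal sketch of the two usual ways to call an unexported function in Julia, using only the API shown above:

# qualify the call with the module name
rdd3 = Spark.cache(rdd2)

# or bring the unexported binding into scope explicitly
import Spark: cache
rdd3 = cache(rdd2)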

aviks commented

Yeah, I was using Spark 2.1.0. Probably due to this: https://issues.apache.org/jira/browse/SPARK-18406
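
For anyone else hitting this: the assertion matches SPARK-18406 (a race between end-of-task cleanup and the completion iterator's read-lock release), which the JIRA reports as fixed in later 2.0.x/2.1.x releases, so upgrading Spark should resolve it. Until then, a hypothetical workaround for small datasets is to skip cache() and materialize the intermediate RDD on the driver instead, using only the functions shown in the transcripts above; note this trades executor-side caching for a round-trip through the driver, so it only makes sense when the intermediate data is small:

# hypothetical workaround sketch (assumes the data fits on the driver)
rdd2 = map(rdd, x -> x + 1)
vals = collect(rdd2)              # materialize on the driver instead of caching
rdd3 = parallelize(sc, vals)      # rebuild an RDD from the collected values
rdd4 = map(rdd3, x -> x^2)
collect(rdd4)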