dfdx/Spark.jl

Does Spark.jl support writing data to HDFS?

Closed this issue · 8 comments

Hi there, thank you for the great work. I am trying to use Spark.jl to read data from HDFS files and also write results to HDFS. The user guide (http://dfdx.github.io/Spark.jl/index.html) says we could use "text_file" to load data, but does not mention how to write to HDFS. Does Spark.jl currently support writing data to HDFS, something like "saveAsTextFile"? Or any suggestions on how to output RDD objects to HDFS? Thanks.

dfdx commented

Indeed, saveAsTextFile is missing from the API. But it should be relatively easy to add. I'm not at my main laptop right now, but something like this should do the trick:

function save_as_text_file(rdd::RDD, path::AbstractString)
    jcall(rdd.jrdd, "saveAsTextFile", Nothing, (JString,), path)
end

You can add other methods in a similar way using Spark Java docs and JavaCall.jl.
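For instance, a wrapper for saveAsObjectFile (just an illustration: this function doesn't exist in Spark.jl yet, the Julia name is made up here) would look almost identical:

function save_as_object_file(rdd::RDD, path::AbstractString)
    # same pattern: delegate to the JavaRDD method via JavaCall's jcall
    jcall(rdd.jrdd, "saveAsObjectFile", Nothing, (JString,), path)
end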

Note that the RDD API is quite old, so you might also be interested in the SQL API, e.g. methods like read_json(), write_json(), etc.
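A rough sketch of that route, assuming a SQLContext built from the SparkContext and placeholder HDFS paths (check the Spark.jl sources for the exact names and signatures):

sql = SQLContext(sc)
# read JSON records into a DataFrame, then write them back out
df = read_json(sql, "hdfs://.../input.json")
write_json(df, "hdfs://.../output")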

@dfdx Thank you for the reply and suggestions. I will definitely read more about SQL and DataFrames.
I am a beginner in Spark and was trying to play with the word count example using the RDD API in Julia. I tried to add save_as_text_file to Spark.jl but got an empty HDFS output folder. I added the function you wrote above to rdd.jl and exported save_as_text_file from Spark.jl. Did I miss something here?

dfdx commented

Can you post a reproducible snippet? If you don't see any errors during execution, it might be something generic, such as saving an empty RDD or exiting before Spark has had time to finish writing to HDFS.

Also, does it work if you read and write to a local file, for example?

@dfdx, thank you for your help. It seems that Spark does not overwrite the output folder by default. It is working now after removing the old empty folder. Is there a parameter we can set to allow overwriting? Here is the code snippet.

using Spark
filepath_input = "hdfs://..."
filepath_output = "hdfs://..."
Spark.init()
sc = SparkContext(master="local")
text = text_file(sc, filepath_input)
words = flat_map(text, s -> [string(word) for word in split(s)]) 
words_tuple = cartesian(words, parallelize(sc, [1]))
counts = reduce_by_key(words_tuple, +)
save_as_text_file(counts, filepath_output)
close(sc) 
dfdx commented

Yes, it's possible to overwrite the output directory. In the RDD API, it should be enough to set the "spark.hadoop.validateOutputSpecs" property to "false", e.g.:

conf = SparkConf(Dict("spark.hadoop.validateOutputSpecs" => "false"))
sc = SparkContext(master="local", conf=conf)

In the SQL interface there's a special method for it, but we don't have a convenient API for it in Spark.jl, so you'll have to use a chain of jcalls directly on the Java API to get the same behavior. See more details on the solution for the SQL interface here.
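Roughly, the chain of jcalls would look like the sketch below. It assumes your DataFrame wrapper exposes the underlying Java object as df.jdf; the field name and class names should be checked against your Spark.jl version:

JDataFrameWriter = JavaObject{Symbol("org.apache.spark.sql.DataFrameWriter")}
# df.write().mode("overwrite").json(path) via JavaCall
jwriter = jcall(df.jdf, "write", JDataFrameWriter, ())
jwriter = jcall(jwriter, "mode", JDataFrameWriter, (JString,), "overwrite")
jcall(jwriter, "json", Nothing, (JString,), "hdfs://.../output")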

Thanks. Line 4 in the Spark.jl file seems to be a typo? It should be "SparkConf"? After correcting this, HDFS files can be overwritten.

module Spark

export
SparkConfig,
SparkContext,

dfdx commented

Ah, it's interesting that this simple mistake never appeared before! Thanks for noticing, I'll fix the typo after your PR is merged to avoid rebasing on your side.

Should we close this issue now?

Yep, thank you for the help!