-
A library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. The structure and test tools are mostly copied from CSV Data Source for Spark.
-
This package supports to process format-free XML files in a distributed way, unlike JSON datasource in Spark restricts in-line JSON format.
spark-xml | Spark |
---|---|
0.6.x+ | 2.3.x+ |
0.5.x | 2.2.x - 2.4.x |
0.4.x | 2.0.x - 2.1.x |
0.3.x | 1.x |
You can link against this library in your program at the following coordinates:
groupId: com.databricks
artifactId: spark-xml_2.11
version: 0.9.0
groupId: com.databricks
artifactId: spark-xml_2.12
version: 0.9.0
This package can be added to Spark using the --packages
command line option. For example, to include it when starting the spark shell:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.9.0
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.12:0.9.0
This package allows reading XML files in local or distributed filesystem as Spark DataFrames.
When reading files the API accepts several options:
path
: Location of files. Similar to Spark can accept standard Hadoop globbing expressions.rowTag
: The row tag of your xml files to treat as a row. For example, in this xml<books> <book><book> ...</books>
, the appropriate value would bebook
. Default isROW
.samplingRatio
: Sampling ratio for inferring schema (0.0 ~ 1). Default is 1. Possible types areStructType
,ArrayType
,StringType
,LongType
,DoubleType
,BooleanType
,TimestampType
andNullType
, unless user provides a schema for this.excludeAttribute
: Whether you want to exclude attributes in elements or not. Default is false.treatEmptyValuesAsNulls
: (DEPRECATED: usenullValue
set to""
) Whether you want to treat whitespaces as a null value. Default is falsemode
: The mode for dealing with corrupt records during parsing. Default isPERMISSIVE
.PERMISSIVE
:- When it encounters a corrupted record, it sets all fields to
null
and puts the malformed string into a new field configured bycolumnNameOfCorruptRecord
. - When it encounters a field of the wrong datatype, it sets the offending field to
null
.
- When it encounters a corrupted record, it sets all fields to
DROPMALFORMED
: ignores the whole corrupted records.FAILFAST
: throws an exception when it meets corrupted records.
inferSchema
: iftrue
, attempts to infer an appropriate type for each resulting DataFrame column, like a boolean, numeric or date type. Iffalse
, all resulting columns are of string type. Default istrue
.columnNameOfCorruptRecord
: The name of new field where malformed strings are stored. Default is_corrupt_record
.attributePrefix
: The prefix for attributes so that we can differentiate attributes and elements. This will be the prefix for field names. Default is_
.valueTag
: The tag used for the value when there are attributes in the element having no child. Default is_VALUE
.charset
: Defaults to 'UTF-8' but can be set to other valid charset namesignoreSurroundingSpaces
: Defines whether or not surrounding whitespaces from values being read should be skipped. Default is false.rowValidationXSDPath
: Path to an XSD file that is used to validate the XML for each row individually. Rows that fail to validate are treated like parse errors as above. The XSD does not otherwise affect the schema provided, or inferred. Note that if the same local path is not already also visible on the executors in the cluster, then the XSD and any others it depends on should be added to the Spark executors withSparkContext.addFile
. In this case, to use local XSD/foo/bar.xsd
, calladdFile("/foo/bar.xsd")
and pass just"bar.xsd"
asrowValidationXSDPath
. New in 0.8.0.
When writing files the API accepts several options:
path
: Location to write files.rowTag
: The row tag of your xml files to treat as a row. For example, in this xml<books> <book><book> ...</books>
, the appropriate value would bebook
. Default isROW
.rootTag
: The root tag of your xml files to treat as the root. For example, in this xml<books> <book><book> ...</books>
, the appropriate value would bebooks
. Default isROWS
.nullValue
: The value to writenull
value. Default is stringnull
. When this isnull
, it does not write attributes and elements for fields.attributePrefix
: The prefix for attributes so that we can differentiating attributes and elements. This will be the prefix for field names. Default is_
.valueTag
: The tag used for the value when there are attributes in the element having no child. Default is_VALUE
.compression
: compression codec to use when saving to file. Should be the fully qualified name of a class implementingorg.apache.hadoop.io.compress.CompressionCodec
or one of case-insensitive shorten names (bzip2
,gzip
,lz4
, andsnappy
). Defaults to no compression when a codec is not specified.
Currently it supports the shortened name usage. You can use just xml
instead of com.databricks.spark.xml
.
Per above, the XML for individual rows can be validated against an XSD using rowValidationXSDPath
.
From 0.10 onwards, the utility com.databricks.spark.xml.util.XSDToSchema
can be used to extract a Spark DataFrame
schema from some XSD files. It supports only simple, complex and sequence types, only basic XSD functionality,
and is experimental.
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths
val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
val df = spark.read.schema(schema)....xml(...)
Although primarily used to convert (portions of) large XML documents into a DataFrame
, from version 0.8.0 onwards,
spark-xml
can also parse XML in a string-valued column in an existing DataFrame with from_xml
, in order to add
it as a new column with parsed results as a struct.
import com.databricks.spark.xml.functions.from_xml
import com.databricks.spark.xml.schema_of_xml
import spark.implicits._
val df = ... /// DataFrame with XML in column 'payload'
val payloadSchema = schema_of_xml(df.select("payload").as[String])
val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
- This can convert arrays of strings containing XML to arrays of parsed structs. Use
schema_of_xml_array
instead com.databricks.spark.xml.from_xml_string
is an alternative that operates on a String directly instead of a column, for use in UDFs- If you use
DROPMALFORMED
mode withfrom_xml
, then XML values that do not parse correctly will result in anull
value for the column. No rows will be dropped. - If you use
PERMISSIVE
mode withfrom_xml
et al, which is the default, then the parse mode will actually instead default toDROPMALFORMED
. If however you include a column in the schema forfrom_xml
that matches thecolumnNameOfCorruptRecord
, thenPERMISSIVE
mode will still output malformed records to that column in the resulting struct.
The functions above are exposed in the Scala API only, at the moment, as there is no separate Python package
for spark-xml
. They can be accessed from Pyspark by manually declaring some helper functions that call
into the JVM-based API from Python. Example:
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
def ext_from_xml(xml_column, schema, options={}):
java_column = _to_java_column(xml_column.cast('string'))
java_schema = spark._jsparkSession.parseDataType(schema.json())
scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
java_column, java_schema, scala_map)
return Column(jc)
def ext_schema_of_xml_df(df, options={}):
assert len(df.columns) == 1
scala_options = spark._jvm.PythonUtils.toScalaMap(options)
java_xml_module = getattr(getattr(
spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
return _parse_datatype_json_string(java_schema.json())
Due to the structure differences between DataFrame
and XML, there are some conversion rules from XML data to DataFrame
and from DataFrame
to XML data. Note that handling attributes can be disabled with the option excludeAttribute
.
-
Attributes: Attributes are converted as fields with the heading prefix,
attributePrefix
.<one myOneAttrib="AAAA"> <two>two</two> <three>three</three> </one>
produces a schema below:
root |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
-
Value in an element that has no child elements but attributes: The value is put in a separate field,
valueTag
.<one> <two myTwoAttrib="BBBBB">two</two> <three>three</three> </one>
produces a schema below:
root |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)
-
Element as an array in an array: Writing a XML file from
DataFrame
having a fieldArrayType
with its element asArrayType
would have an additional nested field for the element. This would not happen in reading and writing XML data but writing aDataFrame
read from other sources. Therefore, roundtrip in reading and writing XML files has the same structure but writing aDataFrame
read from other sources is possible to have a different structure.DataFrame
with a schema below:|-- a: array (nullable = true) | |-- element: array (containsNull = true) | | |-- element: string (containsNull = true)
with data below:
+------------------------------------+ | a| +------------------------------------+ |[WrappedArray(aa), WrappedArray(bb)]| +------------------------------------+
produces a XML file below:
<a> <item>aa</item> </a> <a> <item>bb</item> </a>
These examples use a XML file available for download here:
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
XML data source for Spark can infer data types:
CREATE TABLE books
USING com.databricks.spark.xml
OPTIONS (path "books.xml", rowTag "book")
You can also specify column names and types in DDL. In this case, we do not infer schema.
CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
USING com.databricks.spark.xml
OPTIONS (path "books.xml", rowTag "book")
Import com.databricks.spark.xml._
to get implicits that add the .xml(...)
method to DataFrame
.
You can also use .format("xml")
and .load(...)
.
import org.apache.spark.sql.SparkSession
import com.databricks.spark.xml._
val spark = SparkSession.builder.getOrCreate()
val df = spark.read
.option("rowTag", "book")
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
You can manually specify the schema when reading data:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
import com.databricks.spark.xml._
val spark = SparkSession.builder.getOrCreate()
val customSchema = StructType(Array(
StructField("_id", StringType, nullable = true),
StructField("author", StringType, nullable = true),
StructField("description", StringType, nullable = true),
StructField("genre", StringType, nullable = true),
StructField("price", DoubleType, nullable = true),
StructField("publish_date", StringType, nullable = true),
StructField("title", StringType, nullable = true)))
val df = spark.read
.option("rowTag", "book")
.schema(customSchema)
.xml("books.xml")
val selectedData = df.select("author", "_id")
selectedData.write
.option("rootTag", "books")
.option("rowTag", "book")
.xml("newbooks.xml")
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession.builder().getOrCreate();
DataFrame df = spark.read()
.format("xml")
.option("rowTag", "book")
.load("books.xml");
df.select("author", "_id").write()
.format("xml")
.option("rootTag", "books")
.option("rowTag", "book")
.save("newbooks.xml");
You can manually specify schema:
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
SparkSession spark = SparkSession.builder().getOrCreate();
StructType customSchema = new StructType(new StructField[] {
new StructField("_id", DataTypes.StringType, true, Metadata.empty()),
new StructField("author", DataTypes.StringType, true, Metadata.empty()),
new StructField("description", DataTypes.StringType, true, Metadata.empty()),
new StructField("genre", DataTypes.StringType, true, Metadata.empty()),
new StructField("price", DataTypes.DoubleType, true, Metadata.empty()),
new StructField("publish_date", DataTypes.StringType, true, Metadata.empty()),
new StructField("title", DataTypes.StringType, true, Metadata.empty())
});
DataFrame df = spark.read()
.format("xml")
.option("rowTag", "book")
.schema(customSchema)
.load("books.xml");
df.select("author", "_id").write()
.format("xml")
.option("rootTag", "books")
.option("rowTag", "book")
.save("newbooks.xml");
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('xml').options(rowTag='book').load('books.xml')
df.select("author", "_id").write \
.format('xml') \
.options(rowTag='book', rootTag='books') \
.save('newbooks.xml')
You can manually specify schema:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
customSchema = StructType([ \
StructField("_id", StringType(), True), \
StructField("author", StringType(), True), \
StructField("description", StringType(), True), \
StructField("genre", StringType(), True), \
StructField("price", DoubleType(), True), \
StructField("publish_date", StringType(), True), \
StructField("title", StringType(), True)])
df = spark.read \
.format('xml') \
.options(rowTag='book') \
.load('books.xml', schema = customSchema)
df.select("author", "_id").write \
.format('xml') \
.options(rowTag='book', rootTag='books') \
.save('newbooks.xml')
Automatically infer schema (data types)
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.11:0.9.0"))
df <- read.df("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "newbooks.csv", "xml", "overwrite")
You can manually specify schema:
library(SparkR)
sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.11:0.9.0"))
customSchema <- structType(
structField("_id", "string"),
structField("author", "string"),
structField("description", "string"),
structField("genre", "string"),
structField("price", "double"),
structField("publish_date", "string"),
structField("title", "string"))
df <- read.df("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
write.df(df, "newbooks.csv", "xml", "overwrite")
The library contains a Hadoop input format for reading XML files by a start tag and an end tag. This is similar with XmlInputFormat.java in Mahout but supports to read compressed files, different encodings and read elements including attributes, which you may make direct use of as follows:
import com.databricks.spark.xml.XmlInputFormat
import org.apache.spark.SparkContext
import org.apache.hadoop.io.{LongWritable, Text}
val sc: SparkContext = _
// This will detect the tags including attributes
sc.hadoopConfiguration.set(XmlInputFormat.START_TAG_KEY, "<book>")
sc.hadoopConfiguration.set(XmlInputFormat.END_TAG_KEY, "</book>")
val records = sc.newAPIHadoopFile(
"path",
classOf[XmlInputFormat],
classOf[LongWritable],
classOf[Text])
This library is built with SBT. To build a JAR file simply run sbt package
from the project root. The build configuration includes support for both Scala 2.11 and 2.12.
This project was initially created by HyukjinKwon and donated to Databricks.