This project includes tools for reading data from Solr as a Spark RDD and indexing objects from Spark into Solr using SolrJ.
cd $SPARK_HOME
./bin/spark-shell --packages "com.lucidworks.spark:spark-solr:3.0.0-alpha"
or
./bin/spark-shell --jars spark-solr-3.0.0-alpha.jar
val options = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
  .options(options)
  .load
import com.lucidworks.spark.rdd.SolrRDD
val solrRDD = new SolrRDD(zkHost, collectionName, sc)
SolrRDD is an RDD of SolrDocument
The released jar files (1.1.2, 2.0.0, etc.) can be downloaded from the Maven Central repository. Maven Central also holds the shaded, sources, and javadoc jars for each release.
<dependency>
<groupId>com.lucidworks.spark</groupId>
<artifactId>spark-solr</artifactId>
<version>3.0.0-alpha</version>
</dependency>
Snapshots of spark-solr are built for every commit on the master branch. The snapshots can be accessed from OSS Sonatype.
mvn clean package -DskipTests
This will build 2 jars in the target directory:
- spark-solr-${VERSION}.jar
- spark-solr-${VERSION}-shaded.jar
${VERSION} will be something like 2.1.0-SNAPSHOT for development builds.
The first .jar is what you’d want to use if you were using spark-solr in your own project. The second is what you’d use to submit one of the included example apps to Spark.
- Send objects from Spark (Streaming or DataFrames) into Solr.
- Read the results from a Solr query as a Spark RDD or DataFrame.
- Stream documents from Solr using the /export handler (only works for exporting fields that have docValues enabled).
- Read large result sets from Solr using cursors or the /export handler.
- Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.
Cursors are used by default to pull documents out of Solr. By default, the number of tasks allocated will be the number of shards available for the collection.
If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. A good candidate for the split field is the _version_ field that is attached to every document by the shard leader during indexing. See the splits option to enable and configure intra-shard splitting.
Cursors won’t work if the index changes while the query is running. Constrain your query to a static index by passing additional Solr parameters through the solr.params option.
If the fields that are being queried have docValues enabled, then the Streaming API can be used to pull documents from Solr in a true streaming fashion. This method is 8-10x faster than cursors. The request_handler option allows you to enable the Streaming API when reading a DataFrame.
Objects can be sent to Solr via Spark Streaming or DataFrames. The schema is inferred from the DataFrame, and any fields that do not exist in the Solr schema will be added via the Schema API. See ManagedIndexSchemaFactory.
See Index parameters for configuration and tuning.
The Solr DataSource supports a number of optional parameters that allow you to optimize performance when reading data from Solr. The only required parameters for the DataSource are zkhost and collection.
Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:
Usage: option("query","body_t:solr")
Default: *:*
If you don’t specify the "query" option, then all rows are read using the "match all documents" query (*:*).
You can use the fields option to specify a subset of fields to retrieve for each document in your results:
Usage: option("fields","id,author_s,favorited_b,…")
By default, all fields for each document are pulled back from Solr.
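The query and fields options can be combined in the options map that is passed to the DataFrame reader. The following is a minimal sketch, not a definitive recipe: the collection name "articles" and the ZooKeeper connect string "localhost:9983" are placeholder assumptions, and the field names are the ones used in the usage lines above.

```scala
// Options for reading only documents that mention "solr",
// returning three fields per document.
// "articles" and "localhost:9983" are placeholder values (assumptions).
val readOptions = Map(
  "collection" -> "articles",
  "zkhost"     -> "localhost:9983",
  "query"      -> "body_t:solr",
  "fields"     -> "id,author_s,favorited_b"
)

// In spark-shell, pass the map to the Solr DataSource:
//   val df = spark.read.format("solr").options(readOptions).load
```

Restricting both the rows (query) and the columns (fields) keeps the amount of data pulled from Solr into Spark as small as the job allows.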
You can use the rows option to specify the number of rows to retrieve from Solr per request. Behind the scenes, the implementation uses either deep paging cursors or Streaming API and response streaming, so it is usually safe to specify a large number of rows.
To be clear, this is not the maximum number of rows to read from Solr. All matching rows on the backend are read. The rows parameter is the page size.
By default, the implementation uses 1000 rows, but if your documents are small, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM’s garbage collector.
Usage: option("rows","10000")
Default: 1000
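Because rows is a page size and not a limit, its main effect is on the number of round trips to Solr. The sketch below is plain arithmetic under a simplifying assumption (all matching documents are paged through one cursor); in practice each shard is paged separately, so treat the result as a rough estimate. The helper name requestsNeeded is hypothetical and not part of spark-solr.

```scala
// Rough number of paged requests needed to read every matching document,
// given the page size set by the "rows" option.
// requestsNeeded is a hypothetical helper for illustration only.
def requestsNeeded(matchingDocs: Long, rows: Int): Long = {
  require(rows > 0, "rows must be positive")
  (matchingDocs + rows - 1) / rows // ceiling division
}

val matching = 1000000L
println(requestsNeeded(matching, 1000))  // default page size: 1000 requests
println(requestsNeeded(matching, 10000)) // larger pages: 100 requests
```

A larger page size cuts the number of requests, at the cost of bigger responses that Solr has to build in memory.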
The request_handler option sets the Solr request handler for queries. It can be used to export results from Solr via the /export handler, which streams data out of Solr. See Exporting Result Sets for more information.
The /export handler needs fields to be explicitly specified. Please use the fields option or specify the fields in the query.
Usage: option("request_handler", "/export")
Default: /select
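Since the /export handler needs an explicit field list, it can help to build the two options together. The following sketch assumes placeholder collection and ZooKeeper values; exportOptions is a hypothetical helper written for this example, not a spark-solr function.

```scala
// Build read options for the /export handler and fail fast if no
// fields are listed, since /export needs them spelled out.
// exportOptions is a hypothetical helper, not part of spark-solr.
def exportOptions(base: Map[String, String], fields: Seq[String]): Map[String, String] = {
  require(fields.nonEmpty, "/export requires an explicit field list")
  base ++ Map(
    "request_handler" -> "/export",
    "fields"          -> fields.mkString(",")
  )
}

// Placeholder connection settings (assumptions).
val exportBase = Map("collection" -> "articles", "zkhost" -> "localhost:9983")
val exportOpts = exportOptions(exportBase, Seq("id", "author_s"))

// In spark-shell:
//   val df = spark.read.format("solr").options(exportOpts).load
```

Remember that the listed fields must have docValues enabled for /export to return them.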
If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. The sub range splitting enables faster fetching from Solr by increasing the number of tasks that read from Solr. This should only be used if there are enough computing resources in the Spark cluster.
Shard splitting is disabled by default.
The splits option enables shard splitting on the default field _version_.
Usage: option("splits", "true")
Default: false
The above option is equivalent to option("split_field", "_version_").
The field to split on can be changed using the split_field option.
Usage: option("split_field", "id")
Default: _version_
Behind the scenes, the DataSource implementation tries to split the shard into evenly sized splits using filter queries. You can also split on a string-based keyword field but it should have sufficient variance in the values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.
Keep in mind that splits_per_shard is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.
Usage: option("splits_per_shard", "30")
Default: 20
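To get a feel for how the split options affect read parallelism, the sketch below builds the options and estimates the resulting task count. It assumes one Spark task per split per shard, which is a simplification: splits_per_shard is only a hint, so the real number can differ. The collection name, ZooKeeper string, shard count, and the estimatedTasks helper are all illustrative assumptions.

```scala
// Options that enable intra-shard splitting on the default _version_ field
// and request about 30 splits per shard.
val splitOptions = Map(
  "collection"       -> "articles",       // placeholder (assumption)
  "zkhost"           -> "localhost:9983", // placeholder (assumption)
  "splits"           -> "true",
  "splits_per_shard" -> "30"
)

// Back-of-the-envelope read parallelism: one task per split per shard.
// The split calculator treats splits_per_shard as a hint, so the real
// number may differ slightly. estimatedTasks is a hypothetical helper.
def estimatedTasks(numShards: Int, splitsPerShard: Int): Int =
  numShards * splitsPerShard

// A 4-shard collection with the options above: about 120 tasks.
println(estimatedTasks(numShards = 4, splitsPerShard = splitOptions("splits_per_shard").toInt))

// In spark-shell:
//   val df = spark.read.format("solr").options(splitOptions).load
```

If the estimate is well above the number of executor slots in the cluster, a lower splits_per_shard value is probably enough.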
The flatten_multivalued option is enabled by default and flattens multi-valued fields from Solr.
Usage: option("flatten_multivalued", "false")
Default: true
If specified, the soft_commit_secs option will be set via SolrConfig API during indexing.
Usage: option("soft_commit_secs", "10")
Default: None
The batch_size option determines the number of documents that are sent to Solr in a single HTTP call during indexing. Set this option higher if the docs are small and memory is available.
Usage: option("batch_size", "10000")
Default: 500
If the documents are missing the unique key (derived from the Solr schema), then the gen_uniq_key option will generate a unique value for each document before indexing to Solr. Instead of this option, the UUIDUpdateProcessorFactory can be used to generate UUID values for documents that are missing the unique key field.
Usage: option("gen_uniq_key", "true")
Default: false
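The three indexing options above can be set together when writing a DataFrame to Solr. This is a sketch under assumptions: the collection and ZooKeeper values are placeholders, and the commented write call follows the standard Spark DataFrameWriter pattern with the "solr" format rather than anything stated in this section.

```scala
// Index-time options: soft commit every 10 seconds, send 10000 docs per
// HTTP call, and generate a unique key for docs that lack one.
// "articles" and "localhost:9983" are placeholder values (assumptions).
val writeOptions = Map(
  "collection"       -> "articles",
  "zkhost"           -> "localhost:9983",
  "soft_commit_secs" -> "10",
  "batch_size"       -> "10000",
  "gen_uniq_key"     -> "true"
)

// In spark-shell, with an existing DataFrame named df (assumed usage):
//   df.write.format("solr")
//     .options(writeOptions)
//     .mode(org.apache.spark.sql.SaveMode.Overwrite)
//     .save
```

A batch_size this large suits small documents; for large documents the default of 500 is the safer starting point.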
The sample_seed option allows you to read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct option to control the sample size.
Usage: option("sample_seed", "5150")
Default: None
The solr.params option can be used to specify any arbitrary Solr parameters in the form of a Solr query.
Usage: option("solr.params", "fq=userId:[10 TO 1000]&sort=userId desc")
Set the partition_by option to time in order to query multiple time series collections that are partitioned according to some time period.
Usage: option("partition_by", "time")
Default: none
The time_period option is of the form X DAYS/HOURS/MINUTES. This should be the time period with which the partitions are created.
Usage: option("time_period", "1MINUTES")
Default: 1DAYS
The datetime_pattern can be inferred from time_period, but this option can be used to specify it explicitly.
Usage: option("datetime_pattern", "yyyy_MM_dd_HH_mm")
Default: yyyy_MM_dd
The time_stamp_field_name option specifies the field name in the indexed documents where the timestamp is found.
Usage: option("time_stamp_field_name", "ts")
Default: timestamp_tdt
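The four time-partitioning options are meant to be used together. The sketch below collects them in one options map and, as an aid to choosing a datetime_pattern, shows what the pattern yields for a sample timestamp. It assumes the pattern uses standard Java date-format letters; how spark-solr combines the formatted value with the collection name is not covered here. The base collection name "events" and the ZooKeeper string are placeholders.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Options for querying time series collections partitioned per minute.
// "events" and "localhost:9983" are placeholder values (assumptions).
val timeOptions = Map(
  "collection"            -> "events",
  "zkhost"                -> "localhost:9983",
  "partition_by"          -> "time",
  "time_period"           -> "1MINUTES",
  "datetime_pattern"      -> "yyyy_MM_dd_HH_mm",
  "time_stamp_field_name" -> "ts"
)

// What the pattern produces for one sample timestamp (2016-05-03 14:07).
val suffix = DateTimeFormatter
  .ofPattern(timeOptions("datetime_pattern"))
  .format(LocalDateTime.of(2016, 5, 3, 14, 7))
println(suffix) // 2016_05_03_14_07

// In spark-shell:
//   val df = spark.read.format("solr").options(timeOptions).load
```

The pattern has minute resolution here because time_period is 1MINUTES; with the default 1DAYS period, the default yyyy_MM_dd pattern is sufficient.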
Solr can provide the number of matching documents nearly instantly, so why is calling count on a DataFrame backed by a Solr query so slow? The reason is that Spark reads all rows before performing any operations on a DataFrame. So when you ask Spark SQL to count the rows in a DataFrame, spark-solr has to read all matching documents from Solr and then count the rows in the RDD.
If you’re just exploring a Solr collection from Spark and need to know the number of matching rows for a query, you can use the SolrQuerySupport.getNumDocsFromSolr utility function.
The rows option sets the page size, but all matching rows are read from Solr for every query. So if you set rows to a small value such as 10 and your query matches many documents in Solr, Spark still reads them all, 10 docs per request.
Use the sample_seed option to limit the size of the results returned from Solr.
The com.lucidworks.spark.SparkApp class provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.
To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you’re developing.
Implement the com.lucidworks.spark.SparkApp$RDDProcessor interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).
Extend the com.lucidworks.spark.SparkApp$StreamProcessor abstract class to build a Spark streaming application.
See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor for examples of how to write a StreamProcessor.
For background on Solr security, see: https://cwiki.apache.org/confluence/display/solr/Security.
The SparkApp framework allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option.
For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS configuration file named jaas-client.conf that sets the location of your Kerberos keytab file, such as:
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/keytabs/solr.keytab"
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr";
};
To use this configuration to authenticate to Solr, pass the path to the jaas-client.conf file created above using the -solrJaasAuthConfig option, such as:
spark-submit --master yarn \
  --class com.lucidworks.spark.SparkApp \
  $SPARK_SOLR_PROJECT/target/spark-solr-${VERSION}-shaded.jar \
  hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
  -hdfsPath /user/spark/testdata/syn_sample_50k \
  -solrJaasAuthConfig=/path/to/jaas-client.conf