# BigSolr

BigSolr aims to provide a comprehensive implementation of Solr connectors for Apache Hadoop, Apache Spark, and other big data technologies built on Hadoop and Spark.

## Features

  • Provides custom Hadoop APIs to access Solr servers
  • Allows Apache Spark to read and write data with Solr servers through the Hadoop APIs
  • Integration with Cascading/Scalding, Pig, Hive, etc. (planned, not yet supported)

## Requirements

  • Apache Spark 1.1 or higher (1.2 is recommended)
  • Apache Solr 4.10.x

## How to Build

The following Maven command creates bigsolr-0.1.jar in the target directory.

```
$ mvn clean compile package
```

## Prerequisites

Before running the Spark program, Solr has to be up and running either in StandAlone or SolrCloud mode.

```
# StandAlone/HttpServer mode
$ bin/solr start

# SolrCloud (cloud) mode
$ bin/solr -e cloud
```

For the following examples, create a collection/core (index table) named "collection1" once the Solr server is up and running.


## Using Spark Shell

<p><i>Note: check which Hadoop version your Spark distribution was built for. For Hadoop 2.3/2.4 or higher distributions, follow the instructions for the new Hadoop API. If your Spark distribution targets Hadoop 1.x, follow the instructions for the old Hadoop API below.</i></p>

$ spark-shell --jars target/bigsolr-0.1.jar


### Reading with New Hadoop API (mapreduce)

scala> import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.io.NullWritable

scala> import org.bigsolr.hadoop.SolrInputFormat

scala> import org.bigsolr.hadoop.SolrRecord


For SolrCloud mode

scala> val serverMode: String = "cloud"

scala> val serverUrl: String = "localhost:9983"

For StandAlone/HttpServer mode

scala> val serverMode: String = "standalone"

scala> val serverUrl: String = "http://localhost:8983/solr"


scala> val collection: String = "collection1"

scala> val fields: String = "id,description"

scala> val queryStr: String = "description:*"

scala> var conf = new Configuration()

scala> conf.set("solr.query", queryStr)

scala> conf.set("solr.server.url", serverUrl)

scala> conf.set("solr.server.mode", serverMode)

scala> conf.set("solr.server.collection", collection)

scala> conf.set("solr.server.fields", fields)

scala> val rdds = sc.newAPIHadoopRDD(conf, classOf[SolrInputFormat], classOf[NullWritable], classOf[SolrRecord]).map { case (key, value) => { value.getSolrDocument() } }

scala> rdds.count

scala> rdds.first

scala> rdds.first.getFieldValue("id")

scala> rdds.first.getFieldValue("description")

scala> rdds.first.getFieldValuesMap()


### Indexing with New Hadoop API (mapreduce)

```
scala> import org.bigsolr.hadoop.SolrOutputFormat
scala> import org.bigsolr.hadoop.SolrInputRecord

scala> import org.apache.hadoop.io.MapWritable
scala> import org.apache.hadoop.io.NullWritable
scala> import org.apache.hadoop.mapreduce.Job // New Hadoop API
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.Text

scala> var conf = new Configuration()
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)
```

```
// Example with MapWritable
scala> val m1 = Map("id" -> "1", "description" -> "apple orange New York", "author" -> "John")
scala> val m2 = Map("id" -> "2", "description" -> "apple peach San Diego", "author" -> "Kevin")
scala> val m3 = Map("id" -> "3", "description" -> "Apple tomato San Francisco", "author" -> "Nick")
scala> val l1 = List[Map[String, String]](m1, m2, m3)
scala> val rdds1 = sc.parallelize(l1)

// Convert each Map into a (NullWritable, MapWritable) pair
scala> val rdds1a = rdds1.map(e => {
         val record = new MapWritable()
         val id = e.getOrElse("id", "")
         val description = e.getOrElse("description", "")
         val author = e.getOrElse("author", "")
         record.put(new Text("id"), new Text(id))
         record.put(new Text("description"), new Text(description))
         record.put(new Text("author"), new Text(author))
         (NullWritable.get, record)
       })

// Index with MapWritable
scala> rdds1a.saveAsNewAPIHadoopFile(
         "-", // this path parameter is ignored
         classOf[NullWritable],
         classOf[MapWritable],
         classOf[SolrOutputFormat],
         conf
       )
```

```
// Example with SolrInputRecord (wrapper for SolrInputDocument)
scala> val m4 = Map("id" -> "4", "description" -> "orange lake Florida", "author" -> "Emily")
scala> val m5 = Map("id" -> "5", "description" -> "cherry forest Vermont", "author" -> "Kate")
scala> val m6 = Map("id" -> "6", "description" -> "strawberry beach California", "author" -> "Monica")
scala> val l2 = List[Map[String, String]](m4, m5, m6)
scala> val rdds2 = sc.parallelize(l2)

// Convert each Map into a (NullWritable, SolrInputRecord) pair
scala> val rdds2a = rdds2.map(e => {
         val record = new SolrInputRecord()
         val id = e.getOrElse("id", "")
         val description = e.getOrElse("description", "")
         val author = e.getOrElse("author", "")
         record.setField("id", id)
         record.setField("description", description)
         record.setField("author", author)
         (NullWritable.get, record)
       })

// Index with SolrInputRecord
scala> rdds2a.saveAsNewAPIHadoopFile(
         "-", // this path parameter is ignored
         classOf[NullWritable],
         classOf[SolrInputRecord],
         classOf[SolrOutputFormat],
         conf
       )
```



### Reading with Old Hadoop API (mapred)

scala> import org.apache.hadoop.mapred.JobConf

scala> import org.apache.hadoop.io.NullWritable

scala> import org.bigsolr.hadoop.SolrInputFormat

scala> import org.bigsolr.hadoop.SolrRecord


For SolrCloud mode

scala> val serverMode: String = "cloud"

scala> val serverUrl: String = "localhost:9983"

For StandAlone/HttpServer mode

scala> val serverMode: String = "standalone"

scala> val serverUrl: String = "http://localhost:8983/solr"


scala> val collection: String = "collection1"

scala> val fields: String = "id,description"

scala> val queryStr: String = "description:*"

scala> var conf = new JobConf(sc.hadoopConfiguration)

scala> conf.set("solr.query", queryStr)

scala> conf.set("solr.server.url", serverUrl)

scala> conf.set("solr.server.mode", serverMode)

scala> conf.set("solr.server.collection", collection)

scala> conf.set("solr.server.fields", fields)

scala> val rdds = sc.hadoopRDD(conf, classOf[SolrInputFormat], classOf[NullWritable], classOf[SolrRecord]).map { case (key, value) => { value.getSolrDocument() } }

scala> rdds.count

scala> rdds.first

scala> rdds.first.getFieldValue("id")

scala> rdds.first.getFieldValue("description")

scala> rdds.first.getFieldValuesMap()


### Indexing with Old Hadoop API (mapred)

```
scala> import org.bigsolr.hadoop.SolrOutputFormat
scala> import org.bigsolr.hadoop.SolrInputRecord

scala> import org.apache.hadoop.io.MapWritable
scala> import org.apache.hadoop.io.NullWritable
scala> import org.apache.hadoop.mapred.JobConf // Old Hadoop API
scala> import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.io.Text

scala> var conf = new JobConf(sc.hadoopConfiguration)
scala> conf.set("solr.server.url", serverUrl)
scala> conf.set("solr.server.mode", serverMode)
scala> conf.set("solr.server.collection", collection)
scala> conf.set("solr.server.fields", fields)
```

```
scala> val m1 = Map("id" -> "1", "description" -> "apple orange New York", "author" -> "John")
scala> val m2 = Map("id" -> "2", "description" -> "apple peach San Diego", "author" -> "Kevin")
scala> val m3 = Map("id" -> "3", "description" -> "Apple tomato San Francisco", "author" -> "Nick")
scala> val l1 = List[Map[String, String]](m1, m2, m3)
scala> val rdds1 = sc.parallelize(l1)

// Example with MapWritable
scala> val rdds1a = rdds1.map(e => {
         val record = new MapWritable()
         val id = e.getOrElse("id", "")
         val description = e.getOrElse("description", "")
         val author = e.getOrElse("author", "")
         record.put(new Text("id"), new Text(id))
         record.put(new Text("description"), new Text(description))
         record.put(new Text("author"), new Text(author))
         (NullWritable.get, record)
       })

// Index with MapWritable
scala> rdds1a.saveAsHadoopFile(
         "-", // no path; this argument is ignored
         classOf[NullWritable],
         classOf[MapWritable],
         classOf[SolrOutputFormat],
         conf,
         None
       )
```

```
scala> val m4 = Map("id" -> "4", "description" -> "orange lake Florida", "author" -> "Emily")
scala> val m5 = Map("id" -> "5", "description" -> "cherry forest Vermont", "author" -> "Kate")
scala> val m6 = Map("id" -> "6", "description" -> "strawberry beach California", "author" -> "Monica")
scala> val l2 = List[Map[String, String]](m4, m5, m6)
scala> val rdds2 = sc.parallelize(l2)

// Example with SolrInputRecord (wrapper for SolrInputDocument)
scala> val rdds2a = rdds2.map(e => {
         val record = new SolrInputRecord()
         val id = e.getOrElse("id", "")
         val description = e.getOrElse("description", "")
         val author = e.getOrElse("author", "")
         record.setField("id", id)
         record.setField("description", description)
         record.setField("author", author)
         (NullWritable.get, record)
       })

// Index with SolrInputRecord
scala> rdds2a.saveAsHadoopFile(
         "-", // no path; this argument is ignored
         classOf[NullWritable],
         classOf[SolrInputRecord],
         classOf[SolrOutputFormat],
         conf,
         None
       )
```


## SparkSQL

### Scala API

$ spark-shell --jars target/bigsolr-0.1.jar


scala> import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)

scala> import org.bigsolr.spark.solr._

scala> val rdds = sqlContext.query("id:*", "http://localhost:8983/solr", "standalone", "collection1", "id,description")

scala> rdds.count

scala> rdds.first


### SQL API

$ spark-sql --jars target/bigsolr-0.1.jar


spark-sql> CREATE TEMPORARY TABLE solr
> USING org.bigsolr.spark
> OPTIONS (query "id:*", serverUrl "http://localhost:8983/solr", serverMode "standalone", collection "collection1", fields "id,description");

spark-sql> select description from solr;


## Using PySpark Shell
<p><i>Note: check which Hadoop version your Spark distribution was built for. For Hadoop 2.3/2.4 or higher distributions, follow the instructions for the new Hadoop API. If your Spark distribution targets Hadoop 1.x, follow the instructions for the old Hadoop API below.</i></p>

$ pyspark --jars target/bigsolr-0.1.jar
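
The rule in the note above can be written down as a small function. This is only a sketch: `hadoop_api_for` is a hypothetical helper (not part of BigSolr), and versions the note does not mention, such as 2.0 to 2.2, are deliberately left undecided.

```python
def hadoop_api_for(version):
    """Return "mapreduce" (new API), "mapred" (old API), or None.

    Hypothetical helper that encodes only the rule stated in the note:
    Hadoop 2.3 or higher -> new API, Hadoop 1.x -> old API.
    """
    parts = version.split(".")
    major, minor = int(parts[0]), int(parts[1])
    if (major, minor) >= (2, 3):
        return "mapreduce"
    if major == 1:
        return "mapred"
    return None  # the note gives no guidance for this version


print(hadoop_api_for("2.4.0"))  # mapreduce
print(hadoop_api_for("1.2.1"))  # mapred
```

The function expects a version string of the form `major.minor[.patch]`; how that string is obtained depends on your Spark distribution.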


### Reading from Solr with PySpark

For SolrCloud mode

conf = {"solr.server.url":"localhost:9983", "solr.server.mode":"cloud", "solr.server.collection":"collection1", "solr.query":"id:*", "solr.server.fields":"id,description"}

For StandAlone/HttpServer mode

conf = {"solr.server.url":"http://localhost:8983/solr", "solr.server.mode":"standalone", "solr.server.collection":"collection1", "solr.query":"id:*", "solr.server.fields":"id,description"}
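
The two dictionaries above differ only in the server URL and mode, so one function can produce both. The sketch below is illustrative: `solr_conf` is a hypothetical helper, not part of BigSolr, and only the `solr.*` key names are taken from the examples in this README.

```python
def solr_conf(server_url, mode, collection, fields, query=None):
    """Build the configuration dict used in the examples above.

    Hypothetical helper; the key names are copied from this README.
    """
    if mode not in ("cloud", "standalone"):
        raise ValueError("mode must be 'cloud' or 'standalone'")
    conf = {
        "solr.server.url": server_url,
        "solr.server.mode": mode,
        "solr.server.collection": collection,
        "solr.server.fields": ",".join(fields),
    }
    if query is not None:  # the indexing examples in this README omit the query
        conf["solr.query"] = query
    return conf


cloud_conf = solr_conf("localhost:9983", "cloud", "collection1",
                       ["id", "description"], query="id:*")
standalone_conf = solr_conf("http://localhost:8983/solr", "standalone",
                            "collection1", ["id", "description"], query="id:*")
```

Either result can be passed as `conf=` in the same way as the hand-written dictionaries.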


rdds = sc.hadoopRDD("org.bigsolr.hadoop.SolrInputFormat", "org.apache.hadoop.io.NullWritable", "org.bigsolr.hadoop.SolrRecord", conf=conf)

rdds.count()

```
import json

results = rdds.collect()
for r in results:
    print(json.loads(r[1])["id"])
    print(json.loads(r[1])["description"])
```
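
The loop above treats the second element of each record as a JSON string. Assuming that holds, a minimal sketch for turning collected records into plain dictionaries could look like this; `records_to_dicts` and the sample data are hypothetical and stand in for the output of `rdds.collect()`.

```python
import json


def records_to_dicts(records, fields):
    """Parse (key, json_string) pairs and keep only the requested fields."""
    docs = []
    for _key, value in records:
        doc = json.loads(value)
        # .get() yields None for a missing field instead of raising KeyError
        docs.append({name: doc.get(name) for name in fields})
    return docs


# Made-up sample standing in for rdds.collect()
sample = [
    (None, '{"id": "1", "description": "apple orange New York"}'),
    (None, '{"id": "2", "description": "apple peach San Diego"}'),
]
for doc in records_to_dicts(sample, ["id", "description"]):
    print(doc["id"], doc["description"])
```

Parsing each record once, rather than once per field as in the loop, avoids repeated `json.loads` calls on large result sets.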


### Indexing (Saving) RDDs in Solr with PySpark

For SolrCloud mode

conf = {"solr.server.url":"localhost:9983", "solr.server.mode":"cloud", "solr.server.collection":"collection1", "solr.server.fields":"id,description"}

For StandAlone/HttpServer mode

conf = {"solr.server.url":"http://localhost:8983/solr", "solr.server.mode":"standalone", "solr.server.collection":"collection1", "solr.server.fields":"id,description"}


```
m1 = (None, {"id": "1", "description": "apple orange New York", "author": "John"})
m2 = (None, {"id": "2", "description": "apple peach San Diego", "author": "Kevin"})
data = [m1, m2]
rdds = sc.parallelize(data)
```

rdds.saveAsHadoopFile("-", "org.bigsolr.hadoop.SolrOutputFormat", "org.apache.hadoop.io.NullWritable", "org.apache.hadoop.io.MapWritable", conf=conf)
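
Each element saved above is a `(None, dict)` pair with string values. The following sketch prepares such pairs from plain dictionaries. It assumes every document must carry an `id` field, as all examples here do; `to_solr_pairs` is a hypothetical helper, not a BigSolr API.

```python
def to_solr_pairs(docs):
    """Wrap each document dict as a (None, dict) pair with string values."""
    pairs = []
    for doc in docs:
        if "id" not in doc:  # assumption: every document needs an id
            raise ValueError("document is missing an 'id' field: %r" % (doc,))
        # Convert values to strings, matching the all-string examples above
        pairs.append((None, {key: str(value) for key, value in doc.items()}))
    return pairs


data = to_solr_pairs([
    {"id": 1, "description": "apple orange New York", "author": "John"},
    {"id": 2, "description": "apple peach San Diego", "author": "Kevin"},
])
# In the PySpark shell, continue with: rdds = sc.parallelize(data)
```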




## License
This software is available under the [Apache License, Version 2.0](LICENSE.txt).


## Reporting Bugs
Please report bugs and feature requests through GitHub.