Spark2Elasticsearch

Spark Library for Bulk Loading into Elasticsearch

Primary language: Scala. License: Apache License 2.0 (Apache-2.0).

Requirements

Spark2Elasticsearch supports Spark 1.4 and above.

Spark2Elasticsearch Version | Elasticsearch Version
2.0.X | 2.0.X
2.1.X | 2.1.X

Downloads

SBT

libraryDependencies += "com.github.jparkie" %% "spark2elasticsearch" % "2.0.2"

Or:

libraryDependencies += "com.github.jparkie" %% "spark2elasticsearch" % "2.1.2"

Add the following resolver if needed:

resolvers += "Sonatype OSS Releases" at "https://oss.sonatype.org/content/repositories/releases"
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"

Maven

<dependency>
  <groupId>com.github.jparkie</groupId>
  <artifactId>spark2elasticsearch_2.10</artifactId>
  <version>x.y.z-SNAPSHOT</version>
</dependency>

It is planned for Spark2Elasticsearch to be available on the following:

Features

  • Utilizes Elasticsearch Java API with a TransportClient to bulk load data from a DataFrame into Elasticsearch.

Usage

Bulk Loading into Elasticsearch

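// Spark classes used below.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Import the following to have access to the `bulkLoadToEs()` function.
import com.github.jparkie.spark.elasticsearch.sql._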

val sparkConf = new SparkConf()
val sc = SparkContext.getOrCreate(sparkConf)
val sqlContext = SQLContext.getOrCreate(sc)

val df = sqlContext.read.parquet("<PATH>")

// Specify the `index` and the `type` to write.
df.bulkLoadToEs(
  esIndex = "twitter",
  esType = "tweets"
)

For more, refer to SparkEsDataFrameFunctions.scala.

Configurations

When passing configurations through spark-submit, prefix each property name with "spark." (for example, es.nodes becomes spark.es.nodes).
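The prefix rule can be illustrated with a small helper that turns unprefixed property names into spark-submit `--conf` arguments. This is only a sketch: `SparkSubmitConf` and `toConfArgs` are hypothetical names used for illustration and are not part of Spark2Elasticsearch, and the property values shown are placeholders.

```scala
object SparkSubmitConf {
  /** Prefixes each property name with "spark." and renders it as a
    * `--conf key=value` argument pair for spark-submit.
    * Hypothetical helper, not part of the library.
    */
  def toConfArgs(props: Seq[(String, String)]): Seq[String] =
    props.flatMap { case (name, value) =>
      Seq("--conf", s"spark.$name=$value")
    }
}

// Placeholder values; the property names come from the tables in this section.
val confArgs = SparkSubmitConf.toConfArgs(Seq(
  "es.nodes" -> "localhost:9300",
  "es.batch.size.entries" -> "500"
))
```

The resulting sequence can be appended to a spark-submit command line ahead of the application JAR.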

SparkEsMapperConf

For more, refer to SparkEsMapperConf.scala.

Property Name | Default | Description
es.mapping.id | None | The document field/property name containing the document id.
es.mapping.parent | None | The document field/property name containing the document parent. To specify a constant, use the <CONSTANT> format.
es.mapping.version | None | The document field/property name containing the document version. To specify a constant, use the <CONSTANT> format.
es.mapping.version.type | None | Indicates the type of versioning used (see http://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs-index_.html#_version_types). If es.mapping.version is undefined (default), its value is unspecified. If es.mapping.version is specified, its value becomes external.
es.mapping.routing | None | The document field/property name containing the document routing. To specify a constant, use the <CONSTANT> format.
es.mapping.ttl | None | The document field/property name containing the document time-to-live. To specify a constant, use the <CONSTANT> format.
es.mapping.timestamp | None | The document field/property name containing the document timestamp. To specify a constant, use the <CONSTANT> format.
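The defaulting rule described for es.mapping.version.type can be sketched as a small pure function. This is an illustration, not the library's code: `VersionTypeSketch` and `effectiveVersionType` are hypothetical names, and the assumption that an explicitly set es.mapping.version.type takes precedence over the default is not confirmed by the table.

```scala
object VersionTypeSketch {
  /** Resolves the version type from the two mapper properties.
    *
    * @param mappingVersion     value of es.mapping.version, if set
    * @param mappingVersionType value of es.mapping.version.type, if set
    * @return the explicit type if given (assumption), otherwise "external"
    *         when a version is specified, otherwise None (unspecified)
    */
  def effectiveVersionType(
      mappingVersion: Option[String],
      mappingVersionType: Option[String]): Option[String] =
    mappingVersionType.orElse(mappingVersion.map(_ => "external"))
}
```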

SparkEsTransportClientConf

For more, refer to SparkEsTransportClientConf.scala.

Property Name | Default | Description
es.nodes | Required | The minimum set of hosts to connect to when establishing a client. Entries are comma-separated; within each entry, the host and port are separated by a colon.
es.port | 9300 | The port to connect to when establishing a client.
es.cluster.name | None | The name of the Elasticsearch cluster to connect to.
es.client.transport.sniff | None | If set to true, the client will discover other IP addresses to connect to.
es.client.transport.ignore_cluster_name | None | Set to true to ignore cluster name validation of connected nodes.
es.client.transport.ping_timeout | 5s | The time to wait for a ping response from a node.
es.client.transport.nodes_sampler_interval | 5s | How often to sample / ping the nodes listed and connected.
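To make the es.nodes format concrete, the following sketch parses a comma-separated list of host and port entries. It is an illustration rather than the library's parser: `EsNodesSketch`, `HostPort`, and `parse` are hypothetical names, and falling back to the es.port default (9300) when an entry has no port is an assumption. IPv6 literals are not handled.

```scala
object EsNodesSketch {
  final case class HostPort(host: String, port: Int)

  /** Parses a value such as "10.0.0.1:9300,10.0.0.2:9301".
    * Entries without a port use `defaultPort` (assumed to mirror es.port).
    */
  def parse(nodes: String, defaultPort: Int = 9300): Seq[HostPort] =
    nodes.split(",").toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { entry =>
        entry.split(":", 2) match {
          case Array(host, port) => HostPort(host, port.toInt)
          case Array(host)       => HostPort(host, defaultPort)
        }
      }
}

// Placeholder addresses for illustration.
val parsedNodes = EsNodesSketch.parse("10.0.0.1:9300, 10.0.0.2:9301")
```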

SparkEsWriteConf

For more, refer to SparkEsWriteConf.scala.

Property Name | Default | Description
es.batch.size.entries | 1000 | The number of IndexRequests to batch in one request.
es.batch.size.bytes | 5 | The maximum size in MB of a batch.
es.batch.concurrent.request | 1 | The number of concurrent requests in flight.
es.batch.flush.timeout | 10 | The maximum time in seconds to wait while closing a BulkProcessor.
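The two batch size settings can be read as thresholds on a pending batch. The sketch below assumes a batch is sent as soon as either threshold is reached, which is how Elasticsearch's BulkProcessor treats its action-count and size limits. Whether Spark2Elasticsearch passes these settings straight through is an assumption; `BatchSketch`, `WriteConf`, and `shouldFlush` are hypothetical names.

```scala
object BatchSketch {
  /** Defaults taken from the table above. */
  final case class WriteConf(
      batchSizeEntries: Int = 1000, // es.batch.size.entries
      batchSizeMb: Int = 5          // es.batch.size.bytes, expressed in MB
  )

  /** True once the pending batch has reached either threshold. */
  def shouldFlush(
      pendingEntries: Int,
      pendingBytes: Long,
      conf: WriteConf = WriteConf()): Boolean =
    pendingEntries >= conf.batchSizeEntries ||
      pendingBytes >= conf.batchSizeMb.toLong * 1024L * 1024L
}
```

With the defaults, a batch of small documents is sent after 1000 entries, while a batch of large documents is sent earlier, once it reaches 5 MB.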

Documentation

Scaladocs are currently unavailable.