
Primary language: Scala. License: GNU General Public License v3.0 (GPL-3.0).

Spark Text Pipes

Spark Text Pipes (STP) is a library for text processing and feature creation for machine learning algorithms, based on Spark and Kafka. It can perform semantic matching and search over text fields, and it can retrieve geographical coordinates from purely textual data by querying DBpedia.

Architecture

The main component of the DSE is a long-lived Spark application that runs on a Spark cluster. It is designed to make extensive use of Spark ML pipelines in order to apply sets of transformations to data.

Multiple pipelines can operate over multiple fields of data in the same or different DataFrames, with each transformer adding new columns (i.e. new fields) to the tables as data flows along the pipeline. Transformers can be combined freely to adapt a pipeline's behaviour to the task at hand. Data can be injected into Spark through Kafka. Flexible interfaces and classes provide the logic to always obtain fresh data in our DataFrames.

Transformers

For now, we have engineered transformers with three different purposes:

  • processing natural language text
  • dealing with geographical data
  • calculating scoring metrics or features.

They all share the same interface with spark.ml (DataFrame API). These transformers are meant to be mixed together with other spark.ml transformers and estimators in the same pipeline, hence providing easy access to machine learning techniques.
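As a minimal sketch of how such a pipeline is assembled, the example below chains two stock spark.ml transformers (Tokenizer and StopWordsRemover). They only stand in for STP's own transformers, whose class names are not shown here; the column names ("text", "words", "filtered") and the local SparkSession are assumptions made for illustration.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StopWordsRemover, Tokenizer}
import org.apache.spark.sql.SparkSession

object PipelineSketch {
  // Each stage reads one column and appends a new one to the DataFrame.
  def buildPipeline(): Pipeline = {
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
    new Pipeline().setStages(Array(tokenizer, remover))
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only; on a cluster the master is set at submission time.
    val spark = SparkSession.builder().appName("pipeline-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical input with a single free-text column.
    val df = Seq("The quick brown fox", "jumps over the lazy dog").toDF("text")

    // Neither stage is an estimator, so fit() simply wraps the transformers in a PipelineModel.
    val result = buildPipeline().fit(df).transform(df)
    result.show(truncate = false)

    spark.stop()
  }
}
```

Any other spark.ml transformer or estimator could be appended to the same stage array, which is the property the paragraph above relies on.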

For instance, text transformers can clean up text, convert HTML to text, detect natural languages, filter words, calculate word embeddings, and perform other NLP transformations. Stanford CoreNLP is also supported in our pipelines. Since transformers are run in a distributed environment while also using non-serialisable code (e.g. language-detection libraries), we make extensive use of the @transient lazy val pattern.
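The @transient lazy val pattern can be illustrated without Spark. In the sketch below, HeavyDetector and LanguageTagger are hypothetical names invented for this example (they are not STP classes), and the round trip through Java serialisation only approximates what happens when Spark ships an object to executors.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a third-party helper that does not implement Serializable.
class HeavyDetector {
  def detect(text: String): String = if (text.contains("der")) "de" else "en"
}

// The wrapper is Serializable. The detector is skipped during serialisation
// (@transient) and rebuilt on first use in each JVM (lazy val).
class LanguageTagger extends Serializable {
  @transient private lazy val detector = new HeavyDetector
  def tag(text: String): String = detector.detect(text)
}

object TransientLazyValDemo {
  // Serialise and deserialise a value in memory.
  def roundTrip[T <: Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Without @transient, writeObject would fail with a NotSerializableException once the detector had been initialised; without lazy, the field would be null after deserialisation.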

Word Embeddings

Word embeddings refer to a set of techniques that aim to calculate vector representations of words. They were initially discussed in the research area of distributional semantics and are a fundamental building block of modern NLP. They are calculated from large text corpora, often Wikipedia, with one semantic model per language.

Distances between pairs of vectors can be used directly to express similarities between pairs of documents. Interestingly, recent embeddings can solve many analogy relationships via linear algebra, e.g.

v(king) - v(man) + v(woman) ≈ v(queen)
v(paris) - v(france) + v(germany) ≈ v(berlin)

Conveniently, several research groups at major companies (e.g. Facebook, Google, etc.) distribute pre-trained embeddings under permissive licenses. We use Facebook's MUSE vectors. They are state-of-the-art multilingual word embeddings (fastText embeddings aligned in a common space). This allows us to calculate semantic similarities between vectors of words from different languages.

By aggregating word vectors, it is possible to define representations for arbitrary pieces of text, such as sentences, paragraphs, or entire documents. While a simple average of the vectors of the words constituting a sentence is the most popular approach, we also provide classes that support more advanced techniques, such as the Word Mover's Distance, a specialised version of the Earth Mover's Distance.
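The simple-average approach can be sketched in a few lines of plain Scala. The object name, the in-memory Map used as an embedding dictionary, and the choice to skip unknown tokens are assumptions made for this example; they do not describe STP's actual classes.

```scala
object SentenceEmbedding {
  // Average the vectors of the tokens found in the dictionary.
  // Tokens without a vector are skipped; None is returned if no token has one.
  def average(tokens: Seq[String], embeddings: Map[String, Array[Double]]): Option[Array[Double]] = {
    val vectors = tokens.flatMap(t => embeddings.get(t))
    if (vectors.isEmpty) None
    else {
      val dim = vectors.head.length
      val sum = new Array[Double](dim)
      for (v <- vectors; i <- 0 until dim) sum(i) += v(i)
      Some(sum.map(_ / vectors.size))
    }
  }
}
```

With a toy two-dimensional dictionary, usage might look like this:

```scala
val toy = Map("hello" -> Array(1.0, 0.0), "world" -> Array(0.0, 1.0))
val sentence = SentenceEmbedding.average(Seq("hello", "world", "unknown"), toy)
// sentence holds the vector (0.5, 0.5); "unknown" is ignored
```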

Geographical Data

With DBpedia doing the heavy lifting, we provide classes that find the geographical coordinates of places given their (possibly incomplete) name and, optionally, their country. Names in multiple languages, including Asian languages, are supported.

Ranking Functions

Textual fields similarity

Once semantic vectors are calculated for every text row of two DataFrames A and B, for each row of A, it is possible to rank rows of B according to a similarity metric.
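The ranking step can be sketched locally as follows. Cosine similarity is used here as one common choice of metric; the text above does not say which metric STP applies, so treat it as an assumption. The object name and the (id, vector) representation of rows are also hypothetical. On real DataFrames the same logic would run per row of A against the rows of B.

```scala
object SimilarityRanking {
  // Cosine similarity; returns 0.0 if either vector has zero norm.
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same dimension")
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val norm = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
    if (norm == 0.0) 0.0 else dot / norm
  }

  // For one query vector (a row of A), score every candidate (rows of B)
  // and return them ordered from most to least similar.
  def rank(query: Array[Double], candidates: Seq[(String, Array[Double])]): Seq[(String, Double)] =
    candidates
      .map { case (id, vec) => (id, cosine(query, vec)) }
      .sortBy { case (_, score) => -score }
}
```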

Geographical similarity

Geographical similarities are calculated using the Haversine formula.
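For reference, the Haversine great-circle distance can be computed as in the sketch below. The object and method names are illustrative, and the mean Earth radius of 6371 km is a common approximation rather than a value taken from STP. How STP turns the distance into a similarity score is not specified here, so that step is left out.

```scala
object Haversine {
  // Mean Earth radius in kilometres (a common approximation).
  val EarthRadiusKm = 6371.0

  // Great-circle distance in kilometres between two points given in decimal degrees.
  def distanceKm(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) * math.pow(math.sin(dLon / 2), 2)
    2 * EarthRadiusKm * math.asin(math.sqrt(a))
  }
}
```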

Kafka integration

STP supports reading from and writing to Kafka topics through Spark's Structured Streaming, including Avro-serialized topics (read-only for now). Streaming DataFrames can be written to Parquet files. Logic to get fresh copies of these DataFrames is provided (so that they can be used at the same time).

Usage

Deployment

STP has a few dependencies. Dockerfiles are included under docker, along with a docker-compose.yml in the repository root.

Given a user-defined ROOT_DIR, Docker services share two directories, $ROOT_DIR/data and $ROOT_DIR/apps. While the former hosts data used and produced by STP and other services, the latter holds scripts and jar files. Most Docker containers will mount these two directories.

ROOT_DIR="../" # user-defined root directory
# -p creates missing parent directories and does not fail if a directory already exists
mkdir -p "${ROOT_DIR}/data"
mkdir -p "${ROOT_DIR}/apps/corenlp"
mkdir -p "${ROOT_DIR}/apps/scripts"

# If Stanford CoreNLP is used, download the models
sh scripts/get_stanford_corenlp_models.sh

# How to launch the Spark Shell for development (example)
cp scripts/spark-shell.sh "${ROOT_DIR}/apps/scripts"

# By doing this the spark shell history is preserved after the container is gone
touch "${ROOT_DIR}/data/scala_history"

Additional Service Dependencies

STP has two major dependencies (git clone them in $ROOT_DIR):

  • Words-Embeddings-Dict
    An Akka-based microservice that acts as a dictionary for multi-language word embeddings.
  • Text2Geolocation
    A REST microservice returning latitude and longitude from text.

Both services should be running before running STP.

Building

Install sbt

apt-get update
apt-get install apt-transport-https
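# The Bintray repository (dl.bintray.com/sbt/debian) has been shut down; sbt packages are served from repo.scala-sbt.org
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee -a /etc/apt/sources.list.d/sbt.list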
apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
apt-get update
apt-get install sbt

Building the assembly jar

 sbt 'set test in assembly := {}' assembly
 # If it fails, increase the RAM:
 # sbt -mem 4000 'set test in assembly := {}' assembly
 # you might need to pass a suitable java home (java 8 required) to sbt, e.g.
 # sbt -java-home /usr/lib/jvm/java-8-openjdk-amd64 'set test in assembly := {}' assembly

Running the Spark Shell

# On the host machine
docker exec -it --env COLUMNS=`tput cols` --env LINES=`tput lines` master bash -c "stty cols $COLUMNS rows $LINES && /bin/bash"

# In the Docker container
/apps/scripts/spark-shell.sh

Testing from command line

Inside a docker container with access to all necessary docker container dependencies, assuming your $ROOT_DIR/apps/ is /apps/,

cd /apps/
git clone https://github.com/Haufe-Lexware/spark-text-pipes.git
cd spark-text-pipes

Running all tests

env TESTING="true" DATA_ROOT="/data/" APPS_ROOT="/apps/" sbt -ivy /apps/ivy -mem 20000 test

Running just a subset of the tests

env TESTING="true" DATA_ROOT="/data/" APPS_ROOT="/apps/" sbt -ivy /apps/ivy -mem 8000 "test:testOnly *TopicSource*"

Running tests from IntelliJ IDEA

Set up environment variables

TESTING=true
DATA_ROOT=~/code/data
APPS_ROOT=~/code/apps

DATA_ROOT is the directory where the data is stored.

Please note that both EmbeddingsDict and Text2Geolocation need to be reachable for the tests to work.

Debugging the dependencies graph

sbt dependencyBrowseGraph

Hint: to change the size of the graph, edit the resulting HTML file where it says <svg width=1280 height=1024> and adjust the width and height.

In your application

Define the build.sbt of your Spark/Scala application as follows:

name := "MySparkApp"
version := "0.1"
scalaVersion := "2.11.12"

lazy val root = (project in file(".")).dependsOn(
  RootProject(uri("https://github.com/Haufe-Lexware/spark-text-pipes.git"))
)

// Your apps path (deployment, scripts, etc. must be accessible by Spark)
val appsPath = sys.env.getOrElse("APPS_PATH", "../apps")

lazy val sparkDependencies = {
  val sparkVer = "2.4.4"
  Seq(
    "org.apache.spark" %% "spark-core" % sparkVer,
    "org.apache.spark" %% "spark-sql" % sparkVer,
    "org.apache.spark" %% "spark-avro" % sparkVer,
    "org.apache.spark" %% "spark-streaming" % sparkVer,
    "org.apache.spark" %% "spark-mllib" % sparkVer,
    "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVer
  )
}

// spark
libraryDependencies ++= sparkDependencies.map(_ % "provided")
Test / test / libraryDependencies ++= sparkDependencies

// ALL YOUR APP DEPENDENCIES GO HERE
// libraryDependencies += ...
// libraryDependencies ++= Seq(...

// merge strategies 
assemblyMergeStrategy in assembly := {
  case PathList("reference.conf") => MergeStrategy.concat
  case x if x.contains("EncodingDetector") => MergeStrategy.deduplicate
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.last
}

// final Fat-Jar artifact
assemblyOutputPath in assembly := file(s"$appsPath/${name.value}.jar")

Project.inConfig(Test)(baseAssemblySettings)
assemblyJarName in (Test, assembly) := s"${name.value}-test-${version.value}.jar"

fork in ThisBuild in Test := false

Usage Examples

See com.haufe.umantis.ds.examples.Search.