Spark Point In Polygon

Spark job to perform massive feature Point in Polygon (PiP) operations on a distributed share-nothing cluster.

In this job's context, a feature is defined as having a geometry and a set of attributes. The geometry can be a point, a polygon, or a multi-polygon, and an attribute is a string-encoded value.

The job takes two inputs: a path to a set of files containing point features and a path to a set of files containing polygon features. It iterates through every point feature, identifies which polygon the point falls inside, and then emits a union of user-defined attributes from the point feature and the polygon feature.

In the current implementation, each point feature is read from a field-delimited text file where the x and y coordinates are double-typed fields. Each polygon feature is also read from a field-delimited text file, where the geometry is defined in a WKT-formatted field.

Here is a point feature sample:

111.509,80.9801,7.07783,R14

And here is a polygon feature sample:

0|EC02|ECU-AZU|Azuay|EC|ECU|Ecuador|555881|Province|Provincia|8788.83|3393.37|1|0|MULTIPOLYGON (((-79.373981904060528 -3.3344269140348786, ...

Note that in the above samples, the point fields are separated by a comma, whereas the polygon fields are separated by a pipe (|). The delimiter characters can be defined at execution time.
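
For illustration only (this is not the project's parser), a point line like the sample above could be turned into coordinates plus the attribute values to carry along, roughly as follows; the separator and field indexes mirror the points.sep, points.x, points.y and points.fields settings described in the next section:

// Hypothetical sketch of parsing a delimited point line into a feature.
case class PointFeature(x: Double, y: Double, attributes: Seq[String])

def parsePoint(line: String, sep: Char, xIndex: Int, yIndex: Int, fieldIndexes: Seq[Int]): PointFeature = {
  val tokens = line.split(sep)
  PointFeature(tokens(xIndex).toDouble, tokens(yIndex).toDouble, fieldIndexes.map(tokens(_)))
}

// The point sample above: x=111.509, y=80.9801, carrying the R14 attribute.
parsePoint("111.509,80.9801,7.07783,R14", ',', 0, 1, Seq(3))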

Building the Project

The build process uses Maven:

mvn package

This will create a Spark jar in the target folder, and any runtime jar dependencies will be copied to the target/libs folder.

Configuring the job

The job accepts a command line argument that is the path to a file in properties format. If no argument is given, it tries to load a file named application.properties from the current directory.

Key                 Description                                             Default Value
geometry.precision  the geometry factory precision                          1000000.0
extent.xmin         minimum horizontal bounding box filter                  -180.0
extent.xmax         maximum horizontal bounding box filter                  180.0
extent.ymin         minimum vertical bounding box filter                    -90.0
extent.ymax         maximum vertical bounding box filter                    90.0
points.path         path of the point features                              must be defined
points.sep          the point fields' character separator                   tab
points.x            the index of the horizontal point coordinate            0
points.y            the index of the vertical point coordinate              1
points.fields       comma-separated list of field value indexes to emit     empty
polygons.path       path of the polygon features                            must be defined
polygons.sep        the polygon fields' character separator                 tab
polygons.wkt        the index of the WKT field                              0
output.path         path where to write the emitted fields                  must be defined
output.sep          the output fields' character separator                  tab
reduce.size         the grid size of the spatial join (see below)           1.0

The file can also include Spark configuration properties.
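
For illustration only, a properties file could look like the following; the keys are those listed above, while the values (paths, separators and field indexes, here matching the point and polygon samples shown earlier) are examples rather than defaults:

# Illustrative values only -- adjust paths, separators and indexes to your data
points.path=/tmp/points.csv
points.sep=,
points.x=0
points.y=1
points.fields=3
polygons.path=/tmp/polygons.txt
polygons.sep=|
polygons.wkt=14
output.path=/tmp/output
reduce.size=1.0
# Spark configuration properties can be mixed in as well
spark.app.name=Spark PiP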

Implementation details

Because the data can be massive, it cannot all fit in memory at once to perform the PiP operation or to, for example, build a spatial index of all the polygons for quick lookup. So the input data has to be segmented, and each segment operated on individually. And since these segments share nothing, they can be processed in parallel. In the geospatial domain, segmentation takes the form of subdividing the area of interest into smaller rectangular areas. Rectangles are "nice" because of their straight edges: you can quickly find out whether a point is inside or outside of one, and complex shapes can be covered by a coarse set of edge-matching rectangles. It is that last fact that we use to segment the input points and polygons in such a way that we can perform a massive PiP per segment.
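
The following is only a sketch of that idea (not the project's code), assuming a cell size taken from the reduce.size property: a coordinate maps to a cell by flooring it against the cell size, and a bounding box spans a range of such cells.

// Illustrative sketch only: map coordinates to grid cells of size cellSize
// (the reduce.size property). A cell is identified by (column, row) indexes.
def cell(x: Double, y: Double, cellSize: Double): (Long, Long) =
  (math.floor(x / cellSize).toLong, math.floor(y / cellSize).toLong)

// All the cells overlapped by a bounding box, e.g. a polygon envelope.
def cells(xmin: Double, ymin: Double, xmax: Double, ymax: Double, cellSize: Double): Seq[(Long, Long)] = {
  val (cmin, rmin) = cell(xmin, ymin, cellSize)
  val (cmax, rmax) = cell(xmax, ymax, cellSize)
  for (c <- cmin to cmax; r <- rmin to rmax) yield (c, r)
}

A point maps to exactly one cell, while a polygon's envelope typically maps to several, which is how a polygon such as Polygon B below ends up in cells (1,0) (1,1) (2,0) and (2,1).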

Take for example the below points and polygons in an area of interest.

Feature Polygon A can be segmented into (0,1) (0,2) (0,3) (1,1) (1,2) (1,3) (2,1) (2,2) (2,3).

Feature Polygon B can be segmented into (1,0) (1,1) (2,0) (2,1) because of its bounding box.

Feature Polygon C can be segmented into (3,1) (3,2).

Feature Point A can be segmented into (1,3).

Feature Point B can be segmented into (1,2).

Feature Point C can be segmented into (1,0).

Feature Point D can be segmented into (3,0).

The features are now encapsulated by their segments and are operated on in parallel.

Segment (1,2) has Polygon A and Point B. Since Point B is inside Polygon A, the selected attributes of Point B and the selected attributes of Polygon A will be written to the output file.

Segment (1,3) contains Polygon A and Point A. However, because Point A is not inside Polygon A, nothing is written out.

In this implementation, the rectangle is actually a square, and the size of the square (the segmentation cell) is defined by the reduce.size property. This value is very data-specific, and you might have to run the job multiple times with different values while tracking the execution time to determine the "sweet spot".
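
Within a segment, the actual PiP test can be done with JTS (note the jts-1.13.jar dependency passed to spark-submit below). The following is a minimal sketch of the per-segment join, not the project's implementation, and the tuple shapes are assumptions:

import com.vividsolutions.jts.geom.{Coordinate, Geometry, GeometryFactory}

// Sketch only: within one segment, test every point against every polygon that
// landed in the same segment and emit the union of their selected attributes.
val geometryFactory = new GeometryFactory()

def joinSegment(points: Seq[(Double, Double, Seq[String])],
                polygons: Seq[(Geometry, Seq[String])]): Seq[Seq[String]] =
  for {
    (x, y, pointAttrs) <- points
    point = geometryFactory.createPoint(new Coordinate(x, y))
    (polygon, polygonAttrs) <- polygons
    if polygon.contains(point)
  } yield pointAttrs ++ polygonAttrs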

Running the job

It is assumed that Spark was downloaded and installed in a folder that is hereon referred to as ${SPARK_HOME}.

Unzip data/points.tsv.zip and data/world.tsv.zip into the /tmp folder.

points.tsv contains 1,000,000 randomly generated points, and world.tsv contains the world administrative polygons.

Run In Local Mode

${SPARK_HOME}/bin/spark-submit\
 --driver-java-options "-server -Xms1g -Xmx16g"\
 target/spark-pip-0.1.jar\
 local.properties

The output can be viewed using:

more /tmp/output/part-*

Run on a Hadoop Cluster

For testing purposes, a Hadoop pseudo cluster can be created using Docker with HDFS, YARN and Spark. Check out README.md in the docker folder.

Let ${CODE} be the folder where you cloned the spark-pip project. And let ${DATA} be the folder where you unzipped the content of the data folder.

docker run\
  -it\
  --rm=true\
  --volume=${CODE}:/spark-pip\
  --volume=${DATA}:/data\
  -h boot2docker\
  -p 8088:8088\
  -p 9000:9000\
  -p 50010:50010\
  -p 50070:50070\
  -p 50075:50075\
  mraad/hdfs\
  /etc/bootstrap.sh -bash

Put the supporting data in HDFS:

cd $HADOOP_PREFIX
bin/hdfs dfs -put /data/world.tsv world.tsv
bin/hdfs dfs -put /data/points.tsv points.tsv

Execute the following to perform PiP. This reads the application properties by default from the application.properties file in the current folder.

cd /spark-pip
${SPARK_HOME}/bin/spark-submit\
 --master yarn-client\
 --driver-memory 1g\
 --executor-memory 2g\
 --executor-cores 1\
 --driver-java-options "-server -Xms1g -Xmx16g"\
 --jars target/libs/jts-1.13.jar\
 target/spark-pip-0.1.jar

View Data in HDFS

The output of the previous job resides in HDFS as CSV text files under hdfs://boot2docker:9000/tmp/output/part-*. This form of BigData job output is what I term "GIS Data", and it needs to be visualized. The HDFS configuration in the docker container has the WebHDFS REST API enabled:

<property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
</property>
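
As a quick sanity check, the output can also be fetched over that REST API, assuming the boot2docker hostname resolves on your machine; the part file name here is just a typical example:

curl -s "http://boot2docker:50070/webhdfs/v1/tmp/output?op=LISTSTATUS"
curl -s -L "http://boot2docker:50070/webhdfs/v1/tmp/output/part-00000?op=OPEN"

The OPEN operation redirects to a datanode on port 50075, which is why that port is mapped in the docker run command above and why -L is needed to follow the redirect.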

The WebHDFSToolbox ArcPy toolbox contains the WebHDFSTool tool, which opens and reads the content of the CSV files in HDFS and converts each row into a feature in an in-memory feature class.