This repository contains implementations for Apache Beam batch pipelines to process private data shares stored in Firestore according to the Exposure Notification Private Analytics protocol. It assumes private data shares uploaded as Firestore documents, as done in the Exposure Notification Express template app. These documents contain encrypted packets using the Prio protocol. The pipeline implementation converts them into the format that downstream Prio data processing servers expect, defined in the Avro schema here.
This implementation make use of Firestore as a scalable NoSQL db for subsequent batching and aggregation. Alternative implementations might operate a custom backend endpoint to accumulate the packets, or use a pubsub mechanism. Since the packets are encrypted on device, the channel over which the packets travel need not be trusted.
This repository also contains the Firebase configuration to secure Firestore with security rules as well as Terraform scripts to bring up required infrastructure.
The project is structured into multiple maven modules to allow incorporation of outside implementations of attestation. Implementations need only depend on the DataShare model module, and a profile can be added to get it included in the pipeline module build. The pipeline pulls available implementations dynamically.
Since there aren't too many individual classes that make up each module, and since they are only meant to be packaged and executed together, we use a single source tree for all modules.
Follow the Getting started with Google Cloud Dataflow page. You will need the following:
-
Set up a Google Cloud project or use an existing one. Then import the Google Cloud project into Firebase.
-
Enable APIs: Container Registry, Cloud Build, Cloud Datastore and Dataflow.
-
Create a storage bucket for your outputs.
-
Create a service account with permissions for Firestore, reading the KMS key, Dataflow, and Cloud Storage.
-
Download the a key for your service account and store as
credentials.json
. Keep those credentials save!
Setting the following environment variables can be handy when working in the
project. Replace values in [...]
.
export PROJECT="[my-google-cloud-ingestion-project-id]"
export GOOGLE_APPLICATION_CREDENTIALS="credentials.json"
export TEMPLATES="gs://[my-cloud-storage-bucket]/templates"
export PHA_OUTPUT="gs://[my-cloud-storage-bucket]/output/pha"
export FACILITATOR_OUTPUT="gs://[my-cloud-storage-bucket]/output/faciliator"
export KEY_RESOURCE_NAME="projects/[some-ingestion-project]/locations/global/keyRings/[some-signature-key-ring]/cryptoKeys/[some-signature-key]/cryptoKeyVersions/1"
To run unit tests:
./mvnw test
Integration tests go against an actual test project and so need an environment variable:
./mvnw verify
There are two pipelines. One reads Prio data shares from Firestore and generates the outputs which the PHA and Facilitator data share processors will consume. The other deletes expired or already processed data shares from Firestore.
They both take as options the window of time to cover, in the form of a start
time and duration. When not supplied, start time for the ingestion pipeline is
calculated based on current time rounding back to previous window of length
duration
. For the deletion pipeline, it goes back two windows to ensure a
safety margin of not deleting unprocessed data shares.
To run the ingestion pipeline locally:
./mvnw compile exec:java \
-Djava.util.logging.config.file=logging.properties \
-Dexec.mainClass=com.google.exposurenotification.privateanalytics.ingestion.pipeline.IngestionPipeline \
-Dexec.args="--keyResourceName=$KEY_RESOURCE_NAME --phaOutput=$PHA_OUTPUT --facilitatorOutput=$FACILITATOR_OUTPUT"
To run the deletion pipeline:
./mvnw compile exec:java \
-Djava.util.logging.config.file=logging.properties \
-Dexec.mainClass=com.google.exposurenotification.privateanalytics.ingestion.pipeline.DeletionPipeline \
-Dexec.args="--project=$PROJECT"
export SERVICE_ACCOUNT_EMAIL=$(egrep -o '[^"]+@[^"]+\.iam\.gserviceaccount\.com' $GOOGLE_APPLICATION_CREDENTIALS)
export BEAM_ARGS=(
"--keyResourceName=$KEY_RESOURCE_NAME"
"--phaOutput=$PHA_OUTPUT"
"--facilitatorOutput=$FACILITATOR_OUTPUT"
"--runner=DataflowRunner"
"--region=us-central1"
"--serviceAccount=$SERVICE_ACCOUNT_EMAIL"
)
./mvnw compile exec:java \
-Dexec.mainClass=com.google.exposurenotification.privateanalytics.ingestion.pipeline.IngestionPipeline \
-Dexec.args="$BEAM_ARGS"
See below on how to generate the flex template.
export SERVICE_ACCOUNT_EMAIL=$(egrep -o '[^"]+@[^"]+\.iam\.gserviceaccount\.com' $GOOGLE_APPLICATION_CREDENTIALS)
gcloud dataflow flex-template run "ingestion-pipeline-$USER-`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "$TEMPLATE_PATH" \
--parameters project="$PROJECT" \
--parameters keyResourceName="$KEY_RESOURCE_NAME" \
--parameters phaOutput="$PHA_OUTPUT" \
--parameters facilitatorOutput="$FACILITATOR_OUTPUT" \
--service-account-email "$SERVICE_ACCOUNT_EMAIL" \
--region "us-central1"
We generate a templated dataflow job that takes all pipeline options as runtime parameters.
To build the launch container we added profiles for the ingestion and deletion pipeline.
To build the ingestion pipeline launch container with setting a git derived version:
./mvnw -Pingestion-container-build -Dcontainer-version=$(git describe --tags --always --dirty=-dirty) \
-Dcontainer_registry_tag_prefix='gcr.io/[YOUR_CLOUD_PROJECT]' package
To build the ingestion pipeline with a custom attestation implementation,
include the additional attestation
profile, which assumes the package is
available in any of your configured maven repositories
(in .m2/settings.xml or local mvn-settings.xml):
./mvnw -Pingestion-container-build,attestation -Dcontainer-version=$(git describe --tags --always --dirty=-dirty) \
-Dcontainer_registry_tag_prefix='gcr.io/[YOUR_CLOUD_PROJECT]' package
To build the deletion pipeline launch container with the setting a git derived version:
./mvnw -Pdeletion-container-build -Dcontainer-version=$(git describe --tags --always --dirty=-dirty) \
-Dcontainer_registry_tag_prefix='gcr.io/[YOUR_CLOUD_PROJECT]' package
Built containers get automatically published to the container_registry_tag_prefix
you set. E.g. for Google container
registry: gcr.io/[YOUR_CLOUD_PROJECT]/ingestion-pipeline:$VERSION
and gcr.io/[YOUR_CLOUD_PROJECT]/deletion-pipeline:$VERSION
respectively.
To generate the Flex Template Metadata files and upload them to GCS run:
The following commands require nodejs json npm install -g json
Use the same container_registry_tag_prefix
as in the builds above.
export VERSION=$(git describe --tags --always --dirty=-dirty)
json -f templates/dataflow-flex-template.json \
-e "this.metadata=`cat templates/dataflow-ingestion-metadata-template.json`" \
-e "this.image='gcr.io/[YOUR_CLOUD_PROJECT]/ingestion-pipeline:$VERSION'" > ingestion-pipeline-$VERSION.json
json -f templates/dataflow-flex-template.json \
-e "this.metadata=`cat templates/dataflow-deletion-metadata-template.json`" \
-e "this.image='gcr.io/[YOUR_CLOUD_PROJECT]/deletion-pipeline:$VERSION'" > deletion-pipeline-$VERSION.json
gsutil cp ingestion-pipeline-$VERSION.json $TEMPLATES
gsutil cp deletion-pipeline-$VERSION.json $TEMPLATES
gsutil -h "Content-Type:application/json" cp templates/scheduler-ingestion-template.tmpl \
$TEMPLATES/scheduler-ingestion-template-$VERSION.tmpl
gsutil -h "Content-Type:application/json" cp templates/scheduler-deletion-template.tmpl \
$TEMPLATES/scheduler-deletion-template-$VERSION.tmpl
unset VERSION
Contributions to this repository are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Apache 2.0 - See LICENSE for more information.
This is not an official Google product