/gcs

Proof of concept Event Source for Google Cloud Storage for Knative

Primary language: Go. License: Apache License 2.0 (Apache-2.0).

Knative Google Cloud Storage Source CRD.

Overview

This repository implements an Event Source for Knative Eventing, defined with a CustomResourceDefinition (CRD). This Event Source represents Google Cloud Storage. The point is to demonstrate an Event Source that lives outside the Knative Eventing Sources and can be independently maintained, deployed, and so forth.

This particular example demonstrates how to perform basic operations such as:

  • Create a Cloud Storage Notification when a Google Cloud Storage Source object is created
  • Delete a Notification when that Source is deleted
  • Update a Notification when that Source spec changes

Details

The implementation contacts the Cloud Storage API and creates a Notification as specified in the GCSSource CRD Spec. On success, a Knative Service is created to receive calls from Cloud Storage via GCP Pub Sub; it then forwards them to a Channel or a Knative Service.
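
The create, update, and delete behavior described above can be sketched as a small reconcile function. This is a minimal illustration and not the controller's actual code: NotificationClient, Source, Reconcile, and fakeClient are hypothetical names, the client is an in-memory fake rather than the Cloud Storage API, and modeling an update as delete-then-create is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// NotificationClient is a hypothetical stand-in for the Cloud Storage calls
// a controller would make. It is not the real client library interface.
type NotificationClient interface {
	Create(bucket, topic string) (id string, err error)
	Delete(bucket, id string) error
}

// Source holds an illustrative subset of GCSSource state.
type Source struct {
	Bucket         string // desired bucket (spec.bucket)
	Topic          string // Pub Sub topic used as transport (status.topic)
	NotificationID string // empty until a notification exists (status.notificationID)
	ObservedBucket string // bucket the current notification was created on
	Deleted        bool   // true once the Source is being deleted
}

// Reconcile drives the notification toward the state the Source asks for.
func Reconcile(c NotificationClient, s *Source) error {
	if !s.Deleted && s.Bucket == "" {
		return errors.New("bucket is required")
	}
	// Remove the existing notification when the Source is deleted or its
	// bucket changed (assumption: updates are modeled as delete-then-create).
	if s.NotificationID != "" && (s.Deleted || s.ObservedBucket != s.Bucket) {
		if err := c.Delete(s.ObservedBucket, s.NotificationID); err != nil {
			return err
		}
		s.NotificationID, s.ObservedBucket = "", ""
	}
	if s.Deleted || s.NotificationID != "" {
		return nil // nothing left to create
	}
	id, err := c.Create(s.Bucket, s.Topic)
	if err != nil {
		return err
	}
	s.NotificationID, s.ObservedBucket = id, s.Bucket
	return nil
}

// fakeClient is an in-memory NotificationClient used only for this sketch.
type fakeClient struct {
	next int
	live map[string]string // notification ID -> bucket
}

func (f *fakeClient) Create(bucket, topic string) (string, error) {
	f.next++
	id := fmt.Sprint(f.next)
	f.live[id] = bucket
	return id, nil
}

func (f *fakeClient) Delete(bucket, id string) error {
	if b, ok := f.live[id]; !ok || b != bucket {
		return fmt.Errorf("notification %s not found on bucket %s", id, bucket)
	}
	delete(f.live, id)
	return nil
}

func main() {
	c := &fakeClient{live: map[string]string{}}
	s := &Source{Bucket: "my-test-bucket", Topic: "gcs-example-topic"}
	if err := Reconcile(c, s); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("created notification", s.NotificationID)
	s.Deleted = true
	if err := Reconcile(c, s); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("remaining notifications:", len(c.live))
}
```

Keeping the Cloud Storage calls behind a small interface is one way to exercise this logic without touching a real bucket.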

Purpose

Provide an Event Source that allows subscribing to Cloud Storage Object Notifications and processing them in Knative.

Another purpose is to serve as an example of how to build an Event Source using Warm Image (https://github.com/mattmoor/warm-image) as a starting point.

Prerequisites

  1. Create a Google Cloud project, install the gcloud CLI, and run gcloud auth login. This sample uses a mix of gcloud and kubectl commands. The rest of the sample assumes that you've set the $PROJECT_ID environment variable to your Google Cloud project ID and set that project as the default using gcloud config set project $PROJECT_ID.

  2. Set up Knative Serving

  3. Configure outbound network access

  4. Set up Knative Eventing using the release.yaml file. This example does not require GCP.

  5. Set up the GCP PubSub Source. You only need to complete its Prerequisites; there is no need to deploy anything unless you want to verify that everything is up and running correctly.

  6. Have an existing bucket in GCS (or create a new one) that you have permission to manage. Set an environment variable for it, which is used in the rest of this document:

    export MY_GCS_BUCKET=<YOUR_BUCKET_NAME>

Create a GCP Service Account and a corresponding secret in Kubernetes

  1. Create a GCP Service Account. This sample creates one service account for both registration and receiving messages, but you can also create a separate service account for receiving messages if you want additional privilege separation.

    1. Create a new service account named gcs-source with the following command:

      gcloud iam service-accounts create gcs-source
    2. Give that Service Account the Admin role for storage in your GCP project:

      gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/storage.admin
    3. Give that Service Account the Editor role for pubsub in your GCP project:

      gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/pubsub.editor
    4. Create a namespace where the secret will be created and where the controller will run:

      kubectl create namespace gcssource-system
    5. Give Google Cloud Storage permissions to publish to GCP Pub Sub.

      1. First, find the Service Account that GCS uses to publish to Pub Sub, using either of the two options below:

        1. Use the Cloud Console or the JSON API. Assuming the service account you found was service-XYZ@gs-project-accounts.iam.gserviceaccount.com, you'd run:

        export GCS_SERVICE_ACCOUNT=service-XYZ@gs-project-accounts.iam.gserviceaccount.com

        2. Use curl to fetch the email. This command authenticates with the gcs-source.json key that is downloaded in step 6, so create that key first:

        export GCS_SERVICE_ACCOUNT=`curl -s -X GET -H "Authorization: Bearer \`GOOGLE_APPLICATION_CREDENTIALS=./gcs-source.json gcloud auth application-default print-access-token\`" "https://www.googleapis.com/storage/v1/projects/$PROJECT_ID/serviceAccount" | grep email_address | cut -d '"' -f 4`
      2. Then grant rights to that Service Account to publish to GCP PubSub.

        gcloud projects add-iam-policy-binding $PROJECT_ID \
          --member=serviceAccount:$GCS_SERVICE_ACCOUNT \
          --role roles/pubsub.publisher
    6. Download a new JSON private key for that Service Account. Be sure not to check this key into source control!

      gcloud iam service-accounts keys create gcs-source.json \
       --iam-account=gcs-source@$PROJECT_ID.iam.gserviceaccount.com
    7. Create two secrets on the Kubernetes cluster with the downloaded key:

      # Secret for gcssource-system:
      kubectl --namespace gcssource-system create secret generic gcs-source-key --from-file=key.json=gcs-source.json
      
      # Secret for default:
      kubectl --namespace default create secret generic google-cloud-key --from-file=key.json=gcs-source.json

      gcs-source-key and key.json are pre-configured values in the gcssource-controller Deployment which manages your GCS sources.

      google-cloud-key and key.json are pre-configured values in one-to-one-gcs.yaml.
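
The curl pipeline in step 5 extracts the service account with grep and cut, which depends on the exact layout of the response. As an alternative, here is a minimal Go sketch that parses the same email_address field as JSON. The function and type names are hypothetical, and the sample body is illustrative: it contains only the field that the curl command greps for.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// serviceAccountResponse models only the field the curl pipeline extracts.
type serviceAccountResponse struct {
	EmailAddress string `json:"email_address"`
}

// ParseServiceAccountEmail returns the email_address value from a response
// body, or an error if the body is not JSON or the field is missing.
func ParseServiceAccountEmail(body []byte) (string, error) {
	var resp serviceAccountResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return "", fmt.Errorf("decoding response: %w", err)
	}
	email := strings.TrimSpace(resp.EmailAddress)
	if email == "" {
		return "", errors.New("response has no email_address field")
	}
	return email, nil
}

func main() {
	// Illustrative sample body, not a captured API response.
	sample := []byte(`{"email_address": "service-XYZ@gs-project-accounts.iam.gserviceaccount.com"}`)
	email, err := ParseServiceAccountEmail(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("export GCS_SERVICE_ACCOUNT=" + email)
}
```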

Install Cloud Storage Source

kubectl apply -f https://raw.githubusercontent.com/vaikas-google/gcs/master/release.yaml

Inspect the Cloud Storage Source

First, list the available sources. You might have others available to you, but this is the one we'll be using in this example:

 kubectl get crds -l "eventing.knative.dev/source=true"

You should see something like this:

NAME                                      AGE
gcssources.sources.aikas.org              13d

You can then get more details about it, for example the configuration options available for it:

kubectl get crds gcssources.sources.aikas.org -oyaml

The Spec section is of particular interest, because it lists and describes the configuration parameters and shows which of them are required:

  validation:
    openAPIV3Schema:
      properties:
        apiVersion:
          type: string
        kind:
          type: string
        metadata:
          type: object
        spec:
          properties:
            bucket:
              description: GCS bucket to subscribe to. For example my-test-bucket
              type: string
            gcpCredsSecret:
              description: Optional credential to use for subscribing to the GCP PubSub
                topic. If omitted, uses gcsCredsSecret. Must be a service account
                key in JSON format (see https://cloud.google.com/iam/docs/creating-managing-service-account-keys).
              type: object
            gcsCredsSecret:
              description: Credential to use for creating a GCP notification. Must
                be a service account key in JSON format (see https://cloud.google.com/iam/docs/creating-managing-service-account-keys).
              type: object
            googleCloudProject:
              description: Google Cloud Project ID to create the scheduler job in.
              type: string
            objectNamePrefix:
              description: Optional prefix to only notify when objects match this
                prefix.
              type: string
            payloadFormat:
              description: Optional payload format. Either NONE or JSON_API_V1. If
                omitted, uses JSON_API_V1.
              type: string
            serviceAccountName:
              description: Service Account to run Receive Adapter as. If omitted,
                uses 'default'.
              type: string
            sink:
              description: Where to sink the notificaitons to.
              type: object
          required:
          - gcsCredsSecret
          - googleCloudProject
          - bucket
          - sink
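
To make the schema above easier to read, here is a minimal Go sketch that mirrors the listed spec fields, applies the defaults mentioned in the descriptions, and checks the required fields. The type and method names (GCSSourceSpec, SecretKeyRef, SinkRef, SetDefaults, Validate) are hypothetical and are not taken from the controller's source; where the real controller applies defaults is not stated in this document.

```go
package main

import "fmt"

// SecretKeyRef points at a key inside a Kubernetes secret (name and key).
type SecretKeyRef struct {
	Name string
	Key  string
}

// SinkRef identifies the object that receives the notifications.
type SinkRef struct {
	APIVersion string
	Kind       string
	Name       string
}

// GCSSourceSpec mirrors the properties listed in the CRD schema.
type GCSSourceSpec struct {
	Bucket             string
	GCSCredsSecret     *SecretKeyRef
	GCPCredsSecret     *SecretKeyRef // optional, falls back to GCSCredsSecret
	GoogleCloudProject string
	ObjectNamePrefix   string // optional
	PayloadFormat      string // optional, NONE or JSON_API_V1
	ServiceAccountName string // optional
	Sink               *SinkRef
}

// SetDefaults fills in the defaults described in the schema.
func (s *GCSSourceSpec) SetDefaults() {
	if s.GCPCredsSecret == nil {
		s.GCPCredsSecret = s.GCSCredsSecret
	}
	if s.PayloadFormat == "" {
		s.PayloadFormat = "JSON_API_V1"
	}
	if s.ServiceAccountName == "" {
		s.ServiceAccountName = "default"
	}
}

// Validate returns one message per problem; an empty result means valid.
func (s *GCSSourceSpec) Validate() []string {
	var problems []string
	if s.GCSCredsSecret == nil {
		problems = append(problems, "gcsCredsSecret is required")
	}
	if s.GoogleCloudProject == "" {
		problems = append(problems, "googleCloudProject is required")
	}
	if s.Bucket == "" {
		problems = append(problems, "bucket is required")
	}
	if s.Sink == nil {
		problems = append(problems, "sink is required")
	}
	switch s.PayloadFormat {
	case "", "NONE", "JSON_API_V1":
	default:
		problems = append(problems, "payloadFormat must be NONE or JSON_API_V1")
	}
	return problems
}

func main() {
	spec := GCSSourceSpec{
		Bucket:             "my-test-bucket",
		GoogleCloudProject: "my-project",
		GCSCredsSecret:     &SecretKeyRef{Name: "google-cloud-key", Key: "key.json"},
		Sink:               &SinkRef{APIVersion: "serving.knative.dev/v1alpha1", Kind: "Service", Name: "gcs-message-dumper"},
	}
	spec.SetDefaults()
	fmt.Println("problems:", len(spec.Validate()))
	fmt.Println("payloadFormat:", spec.PayloadFormat)
}
```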

Create a Knative Service that will be invoked for each Cloud Storage object notification

To verify that the Cloud Storage Source is working, we will create a simple Knative Service that dumps incoming messages to its log. The service.yaml file defines this basic service. The image might be different if a new version has been released.

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: gcs-message-dumper
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            image: us.gcr.io/probable-summer-223122/eventdumper-833f921e52f6ce76eb11f89bbfcea1df@sha256:7edb9fc190dcf350f4c49c48d3ff2bf71de836ff3dc32b1d5082fd13f90edee3

Enter the following command to create the service from service.yaml:

kubectl --namespace default apply -f https://raw.githubusercontent.com/vaikas-google/gcs/master/service.yaml

Configure Cloud Storage to send events directly to the service

The simplest way to consume events is to wire the Source directly into the consuming function. The logical picture looks like this:

[Figure: Source Directly To Function]

Wire Cloud Storage events to the function

Create a Cloud Storage Source instance targeting your function with the following:

curl https://raw.githubusercontent.com/vaikas-google/gcs/master/one-to-one-gcs.yaml | \
sed "s/MY_GCP_PROJECT/$PROJECT_ID/g" | sed "s/MY_GCS_BUCKET/$MY_GCS_BUCKET/g" | kubectl apply -f -

Check that the Cloud Storage Source was created

kubectl get gcssources

And you should see something like this:

vaikas@penguin:~/projects/go/src/github.com/vaikas-google/gcs$ kubectl get gcssources
NAME                AGE
notification-test   8d

You can inspect its Status field with:

vaikas@penguin:~/projects/go/src/github.com/vaikas-google/gcs$ kubectl get gcssources -oyaml
apiVersion: v1
items:
- apiVersion: sources.aikas.org/v1alpha1
  kind: GCSSource
  metadata:
    creationTimestamp: 2018-12-27T03:43:26Z
    finalizers:
    - gcs-controller
    generation: 1
    name: notification-test
    namespace: default
    resourceVersion: "12485601"
    selfLink: /apis/sources.aikas.org/v1alpha1/namespaces/default/gcssources/notification-test
    uid: 9176c51c-0989-11e9-8605-42010a8a0205
  spec:
    bucket: vaikas-knative-test-bucket
    gcsCredsSecret:
      key: key.json
      name: google-cloud-key
    googleCloudProject: quantum-reducer-434
    sink:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: message-dumper
  status:
    conditions:
    - lastTransitionTime: 2019-01-09T01:46:54Z
      severity: Error
      status: "True"
      type: GCSReady
    - lastTransitionTime: 2019-01-09T01:46:54Z
      severity: Error
      status: "True"
      type: PubSubSourceReady
    - lastTransitionTime: 2019-01-09T01:46:53Z
      severity: Error
      status: "True"
      type: PubSubTopicReady
    - lastTransitionTime: 2019-01-09T01:46:54Z
      severity: Error
      status: "True"
      type: Ready
    notificationID: "5"
    sinkUri: http://message-dumper.default.svc.cluster.local/
    topic: gcs-67b38ee6-64a4-4867-892d-33ee53ff24d4
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""

We can see that the condition with 'type: Ready' is set to 'status: "True"', indicating that the notification was created correctly. The notificationID field has been filled in, which indicates that the GCS notification was created. sinkUri is the Knative Service to which the events are delivered. topic identifies the GCP Pub Sub topic used as the transport mechanism between GCS and your function.
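
The same check can be done programmatically. The sketch below is a minimal illustration that assumes its input is the JSON for a single source, for example from kubectl get gcssources notification-test -o json. The type and function names are hypothetical; the field names follow the status shown above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// condition models the two condition fields this check needs.
type condition struct {
	Type   string `json:"type"`
	Status string `json:"status"`
}

// gcsSource models only the status fields shown in the output above.
type gcsSource struct {
	Status struct {
		Conditions     []condition `json:"conditions"`
		NotificationID string      `json:"notificationID"`
		SinkURI        string      `json:"sinkUri"`
		Topic          string      `json:"topic"`
	} `json:"status"`
}

// IsReady reports whether the Ready condition has status "True".
// A source without a Ready condition is treated as not ready.
func IsReady(raw []byte) (bool, error) {
	var src gcsSource
	if err := json.Unmarshal(raw, &src); err != nil {
		return false, fmt.Errorf("decoding GCSSource: %w", err)
	}
	for _, c := range src.Status.Conditions {
		if c.Type == "Ready" {
			return c.Status == "True", nil
		}
	}
	return false, nil
}

func main() {
	raw := []byte(`{"status": {"conditions": [
		{"type": "GCSReady", "status": "True"},
		{"type": "Ready", "status": "True"}],
		"notificationID": "5"}}`)
	ready, err := IsReady(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("ready:", ready)
}
```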

(Optional) Check that the Cloud Storage notification was created with gsutil

vaikas@vaikas:~/projects/go/src/github.com$ gsutil notification list gs://$MY_GCS_BUCKET
projects/_/buckets/vaikas-knative-test-bucket/notificationConfigs/5
	Cloud Pub/Sub topic: projects/quantum-reducer-434/topics/gcs-67b38ee6-64a4-4867-892d-33ee53ff24d4

Then upload some file to your bucket to trigger an Object Notification.

Check that Cloud Storage invoked the function

kubectl -l 'serving.knative.dev/service=gcs-message-dumper' logs -c user-container

And you should see an entry like this there

2019/01/09 17:56:01 Received Cloud Event Context as: {CloudEventsVersion:0.1 EventID:303284831868154 EventTime:2019-01-09 17:56:00.16 +0000 UTC EventType:google.pubsub.topic.publish EventTypeVersion: SchemaURL: ContentType:application/json Source://pubsub.googleapis.com/quantum-reducer-434/topics/gcs-e29de50b-e416-44fd-9c28-2ea33764096a Extensions:map[]}
2019/01/09 17:56:01 object notification metadata is: &{Name:dummytextfile Bucket:vaikas-knative-test-bucket Size:37}

The first line shows the Cloud Events Context, and the second line shows the actual Notification details.
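
A consumer that wants the same three fields as the log line above could decode them as follows. This is a minimal sketch under stated assumptions and not the event dumper's actual code: it assumes the payload is a JSON object in the JSON_API_V1 format with name, bucket, and size keys, and that size is encoded as a JSON string. ObjectNotification and DecodeNotification are hypothetical names, and how the payload is wrapped in the incoming event is not covered here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ObjectNotification holds the fields printed in the log line above.
type ObjectNotification struct {
	Name   string `json:"name"`
	Bucket string `json:"bucket"`
	// Assumption: size arrives as a quoted number, hence the ",string" option.
	Size uint64 `json:"size,string"`
}

// DecodeNotification parses a notification payload into an ObjectNotification.
func DecodeNotification(payload []byte) (*ObjectNotification, error) {
	var n ObjectNotification
	if err := json.Unmarshal(payload, &n); err != nil {
		return nil, fmt.Errorf("decoding notification: %w", err)
	}
	return &n, nil
}

func main() {
	// Illustrative payload built from the values in the log line above.
	payload := []byte(`{"name": "dummytextfile", "bucket": "vaikas-knative-test-bucket", "size": "37"}`)
	n, err := DecodeNotification(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("object notification metadata is: %+v\n", n)
}
```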

Configure Cloud Storage to send events to the default Broker

Another option is to use Broker/Trigger to get into the eventing mesh. To do so, update the sink as follows:

  sink:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: Broker
    name: default

Read more about Broker/Trigger.

Uninstall

kubectl delete gcssources notification-test
kubectl delete services.serving gcs-message-dumper

# Remove the IAM bindings before deleting the service account they refer to.
gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
  --role roles/storage.admin

gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
  --role roles/pubsub.editor

gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:$GCS_SERVICE_ACCOUNT \
  --role roles/pubsub.publisher
gcloud iam service-accounts delete gcs-source@$PROJECT_ID.iam.gserviceaccount.com

# Delete both secrets created earlier, each in its own namespace.
kubectl --namespace gcssource-system delete secrets gcs-source-key
kubectl --namespace default delete secrets google-cloud-key