This repository implements an Event Source for Knative Eventing defined with a CustomResourceDefinition (CRD). This Event Source represents Google Cloud Storage. The point is to demonstrate an Event Source that does not live in the Knative Eventing Sources repository and can be independently maintained, deployed, and so forth.
This particular example demonstrates how to perform basic operations such as:
- Create a Cloud Storage Notification when a Google Cloud Storage object is created
- Delete a Notification when that Source is deleted
- Update a Notification when that Source spec changes
The actual implementation contacts the Cloud Storage API and creates a Notification as specified in the GCSSource CRD spec. Upon success, a Knative service is created to receive calls from Cloud Storage via GCP Pub/Sub, and it then forwards them to the Channel or a Knative Service.
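For orientation, a `GCSSource` resource looks roughly like the following sketch; the project ID, bucket, and names here are placeholders, and the authoritative list of fields is the CRD spec shown later in this document:

```yaml
apiVersion: sources.aikas.org/v1alpha1
kind: GCSSource
metadata:
  name: notification-test
spec:
  # Secret holding a GCP service account key, used to create the notification
  gcsCredsSecret:
    name: google-cloud-key
    key: key.json
  googleCloudProject: MY_GCP_PROJECT   # placeholder
  bucket: MY_GCS_BUCKET                # placeholder
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: gcs-message-dumper
```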
The purpose is to provide an Event Source that allows subscribing to Cloud Storage Object Notifications and processing them in Knative. It also serves as an example of how to build an Event Source using a [Warm Image](https://github.com/mattmoor/warm-image) as a starting point.
1. Create a Google Cloud project and install the `gcloud` CLI and run `gcloud auth login`. This sample will use a mix of `gcloud` and `kubectl` commands. The rest of the sample assumes that you've set the `$PROJECT_ID` environment variable to your Google Cloud project ID, and also set your project ID as the default using `gcloud config set project $PROJECT_ID`.

1. Setup Knative Serving.

1. Configure outbound network access.

1. Setup Knative Eventing using the `release.yaml` file. This example does not require GCP.

1. Setup the GCP PubSub Source. You only need to complete the Prerequisites; there is no need to deploy anything unless you want to make sure that everything is up and running correctly.
1. Have an existing bucket in GCS (or create a new one) that you have permissions to manage. Let's set up an environment variable for it, which we'll use in the rest of this document:

   ```shell
   export MY_GCS_BUCKET=<YOUR_BUCKET_NAME>
   ```
1. Create a GCP Service Account. This sample creates one service account for both registration and receiving messages, but you can also create a separate service account for receiving messages if you want additional privilege separation.

   1. Create a new service account named `gcs-source` with the following command:

      ```shell
      gcloud iam service-accounts create gcs-source
      ```

   1. Give that Service Account the Admin role for storage in your GCP project:

      ```shell
      gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/storage.admin
      ```

   1. Give that Service Account the Editor role for Pub/Sub in your GCP project:

      ```shell
      gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
        --role roles/pubsub.editor
      ```

   1. Create the namespace where the secret will be created and where our controller will run:

      ```shell
      kubectl create namespace gcssource-system
      ```
1. Give Google Cloud Storage permissions to publish to GCP Pub/Sub.

   1. First find the Service Account that GCS uses to publish to Pub/Sub (either using the UI, or using `curl` as shown below):

      - Use the Cloud Console or the JSON API. Assuming the service account you found above was `service-XYZ@gs-project-accounts.iam.gserviceaccount.com`, you'd do:

        ```shell
        export GCS_SERVICE_ACCOUNT=service-XYZ@gs-project-accounts.iam.gserviceaccount.com
        ```

      - Or use `curl` to fetch the email:

        ```shell
        export GCS_SERVICE_ACCOUNT=`curl -s -X GET -H "Authorization: Bearer \`GOOGLE_APPLICATION_CREDENTIALS=./gcs-source.json gcloud auth application-default print-access-token\`" "https://www.googleapis.com/storage/v1/projects/$PROJECT_ID/serviceAccount" | grep email_address | cut -d '"' -f 4`
        ```

   1. Then grant rights to that Service Account to publish to GCP Pub/Sub:

      ```shell
      gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member=serviceAccount:$GCS_SERVICE_ACCOUNT \
        --role roles/pubsub.publisher
      ```
1. Download a new JSON private key for that Service Account. Be sure not to check this key into source control!

   ```shell
   gcloud iam service-accounts keys create gcs-source.json \
     --iam-account=gcs-source@$PROJECT_ID.iam.gserviceaccount.com
   ```

1. Create two secrets on the Kubernetes cluster with the downloaded key (a quick way to sanity-check this setup is shown after this list):

   ```shell
   # Secret for gcssource-system:
   kubectl --namespace gcssource-system create secret generic gcs-source-key --from-file=key.json=gcs-source.json

   # Secret for default:
   kubectl --namespace default create secret generic google-cloud-key --from-file=key.json=gcs-source.json
   ```

   `gcs-source-key` and `key.json` are pre-configured values in the `gcssource-controller` Deployment which manages your GCS sources. `google-cloud-key` and `key.json` are pre-configured values in `one-to-one-gcs.yaml`.
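Before moving on, you can sanity-check the setup so far. A sketch of read-only checks, using the names created in the steps above:

```shell
# The service account should exist.
gcloud iam service-accounts describe gcs-source@$PROJECT_ID.iam.gserviceaccount.com

# Both secrets should exist in their respective namespaces.
kubectl --namespace gcssource-system get secret gcs-source-key
kubectl --namespace default get secret google-cloud-key
```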
Install the GCS Source controller:

```shell
kubectl apply -f https://raw.githubusercontent.com/vaikas-google/gcs/master/release.yaml
```
First list the available sources. You might have others available to you, but this is the one we'll be using in this example:

```shell
kubectl get crds -l "eventing.knative.dev/source=true"
```

You should see something like this:

```
NAME                           AGE
gcssources.sources.aikas.org   13d
```

You can then get more details about it, for example the available configuration options:

```shell
kubectl get crds gcssources.sources.aikas.org -oyaml
```

In particular the Spec section is of interest, because it shows the configuration parameters, describes them, and lists which parameters are required:
```yaml
validation:
  openAPIV3Schema:
    properties:
      apiVersion:
        type: string
      kind:
        type: string
      metadata:
        type: object
      spec:
        properties:
          bucket:
            description: GCS bucket to subscribe to. For example my-test-bucket
            type: string
          gcpCredsSecret:
            description: Optional credential to use for subscribing to the GCP PubSub
              topic. If omitted, uses gcsCredsSecret. Must be a service account
              key in JSON format (see https://cloud.google.com/iam/docs/creating-managing-service-account-keys).
            type: object
          gcsCredsSecret:
            description: Credential to use for creating a GCP notification. Must
              be a service account key in JSON format (see https://cloud.google.com/iam/docs/creating-managing-service-account-keys).
            type: object
          googleCloudProject:
            description: Google Cloud Project ID to create the notification in.
            type: string
          objectNamePrefix:
            description: Optional prefix to only notify when objects match this
              prefix.
            type: string
          payloadFormat:
            description: Optional payload format. Either NONE or JSON_API_V1. If
              omitted, uses JSON_API_V1.
            type: string
          serviceAccountName:
            description: Service Account to run Receive Adapter as. If omitted,
              uses 'default'.
            type: string
          sink:
            description: Where to sink the notifications to.
            type: object
        required:
        - gcsCredsSecret
        - googleCloudProject
        - bucket
        - sink
```
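If you only want the configurable parameters, a `jsonpath` query such as the following can pull out just the spec properties (a sketch; the path assumes the CRD layout shown above):

```shell
kubectl get crds gcssources.sources.aikas.org \
  -o jsonpath='{.spec.validation.openAPIV3Schema.properties.spec.properties}'
```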
To verify that the Cloud Storage source is working, we will create a simple Knative Service that dumps incoming messages to its log. The `service.yaml` file defines this basic service. The image might be different if a new version has been released.
```yaml
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: gcs-message-dumper
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            image: us.gcr.io/probable-summer-223122/eventdumper-833f921e52f6ce76eb11f89bbfcea1df@sha256:7edb9fc190dcf350f4c49c48d3ff2bf71de836ff3dc32b1d5082fd13f90edee3
```
Enter the following command to create the service from `service.yaml`:

```shell
kubectl --namespace default apply -f https://raw.githubusercontent.com/vaikas-google/gcs/master/service.yaml
```
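To check that the Service came up, you can list it (the exact output columns vary by Knative version):

```shell
kubectl --namespace default get services.serving.knative.dev gcs-message-dumper
```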
The simplest way to consume events is to wire the Source directly into the consuming function.
Create a Cloud Storage instance targeting your function with the following:

```shell
curl https://raw.githubusercontent.com/vaikas-google/gcs/master/one-to-one-gcs.yaml | \
  sed "s/MY_GCP_PROJECT/$PROJECT_ID/g" | sed "s/MY_GCS_BUCKET/$MY_GCS_BUCKET/g" | kubectl apply -f -
```
To see the source you created:

```shell
kubectl get gcssources
```

And you should see something like this:

```
vaikas@penguin:~/projects/go/src/github.com/vaikas-google/gcs$ kubectl get gcssources
NAME                AGE
notification-test   8d
```
And you can inspect its Status field via:

```shell
vaikas@penguin:~/projects/go/src/github.com/vaikas-google/gcs$ kubectl get gcssources -oyaml
```

```yaml
apiVersion: v1
items:
- apiVersion: sources.aikas.org/v1alpha1
  kind: GCSSource
  metadata:
    creationTimestamp: 2018-12-27T03:43:26Z
    finalizers:
    - gcs-controller
    generation: 1
    name: notification-test
    namespace: default
    resourceVersion: "12485601"
    selfLink: /apis/sources.aikas.org/v1alpha1/namespaces/default/gcssources/notification-test
    uid: 9176c51c-0989-11e9-8605-42010a8a0205
  spec:
    bucket: vaikas-knative-test-bucket
    gcsCredsSecret:
      key: key.json
      name: google-cloud-key
    googleCloudProject: quantum-reducer-434
    sink:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: message-dumper
  status:
    conditions:
    - lastTransitionTime: 2019-01-09T01:46:54Z
      severity: Error
      status: "True"
      type: GCSReady
    - lastTransitionTime: 2019-01-09T01:46:54Z
      severity: Error
      status: "True"
      type: PubSubSourceReady
    - lastTransitionTime: 2019-01-09T01:46:53Z
      severity: Error
      status: "True"
      type: PubSubTopicReady
    - lastTransitionTime: 2019-01-09T01:46:54Z
      severity: Error
      status: "True"
      type: Ready
    notificationID: "5"
    sinkUri: http://message-dumper.default.svc.cluster.local/
    topic: gcs-67b38ee6-64a4-4867-892d-33ee53ff24d4
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
```
We can see that the Condition `type: Ready` is set to `status: "True"`, indicating that the notification was correctly created. The `notificationID` has been filled in, confirming the GCS notification exists. `sinkUri` is the Knative Service where the events are being delivered, and `topic` identifies the GCP Pub/Sub topic that we are using as a transport mechanism between GCS and your function.
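To pull out just those status fields, you can use a `jsonpath` query like this (assuming the resource name `notification-test` from above):

```shell
kubectl get gcssources notification-test \
  -o jsonpath='notificationID: {.status.notificationID}{"\n"}sinkUri: {.status.sinkUri}{"\n"}topic: {.status.topic}{"\n"}'
```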
You can also verify with `gsutil` that the notification was created on the bucket:

```shell
vaikas@vaikas:~/projects/go/src/github.com$ gsutil notification list gs://$MY_GCS_BUCKET
projects/_/buckets/vaikas-knative-test-bucket/notificationConfigs/5
	Cloud Pub/Sub topic: projects/quantum-reducer-434/topics/gcs-67b38ee6-64a4-4867-892d-33ee53ff24d4
```
Then upload a file to your bucket to trigger an Object Notification, as shown in the sketch below.
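A minimal sketch, assuming `gsutil` is installed; the file name and contents are arbitrary:

```shell
# Creating an object in the bucket triggers the notification.
echo "hello" > dummytextfile
gsutil cp dummytextfile gs://$MY_GCS_BUCKET
```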
Then view the logs of the event dumper:

```shell
kubectl -l 'serving.knative.dev/service=gcs-message-dumper' logs -c user-container
```

And you should see an entry like this:
```
2019/01/09 17:56:01 Received Cloud Event Context as: {CloudEventsVersion:0.1 EventID:303284831868154 EventTime:2019-01-09 17:56:00.16 +0000 UTC EventType:google.pubsub.topic.publish EventTypeVersion: SchemaURL: ContentType:application/json Source://pubsub.googleapis.com/quantum-reducer-434/topics/gcs-e29de50b-e416-44fd-9c28-2ea33764096a Extensions:map[]}
2019/01/09 17:56:01 object notification metadata is: &{Name:dummytextfile Bucket:vaikas-knative-test-bucket Size:37}
```
The headers displayed are the Cloud Events context, and the last line shows the actual notification details.
Another option is to use Broker/Trigger to get events into the eventing mesh; just update the sink like so:

```yaml
sink:
  apiVersion: eventing.knative.dev/v1alpha1
  kind: Broker
  name: default
```
Read more about Broker/Trigger.
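To route events from the Broker to a function, you would then create a Trigger. A minimal sketch, assuming the v1alpha1 Eventing API used elsewhere in this document and the `gcs-message-dumper` Service from above (the Trigger name here is arbitrary):

```yaml
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: gcs-trigger
  namespace: default
spec:
  # The Broker the sink above points at.
  broker: default
  # Deliver matching events to the message dumper Service.
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: gcs-message-dumper
```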
To clean up, delete the Source and the event dumper Service:

```shell
kubectl delete gcssources notification-test
kubectl delete services.serving gcs-message-dumper
```

Remove the IAM policy bindings and then delete the Service Account:

```shell
gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
  --role roles/storage.admin
gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:gcs-source@$PROJECT_ID.iam.gserviceaccount.com \
  --role roles/pubsub.editor
gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member=serviceAccount:$GCS_SERVICE_ACCOUNT \
  --role roles/pubsub.publisher
gcloud iam service-accounts delete gcs-source@$PROJECT_ID.iam.gserviceaccount.com
```

Finally, delete the secrets from their respective namespaces:

```shell
kubectl --namespace gcssource-system delete secret gcs-source-key
kubectl --namespace default delete secret google-cloud-key
```
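To confirm the notification was removed from the bucket, list the notification configs again; the list should now come back empty:

```shell
gsutil notification list gs://$MY_GCS_BUCKET
```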