Flink K8S Toolbox contains tools for managing Flink clusters and jobs on Kubernetes:
- Flink Operator
- Flink Operator CLI
The Flink Operator is an implementation of the Kubernetes Operator pattern for managing Flink clusters and jobs. The operator uses a Custom Resource to represent a Flink cluster with a single Flink job. The operator detects changes to the resource and modifies the Flink cluster and job accordingly. The operator takes care of creating savepoints periodically and restarting the job from the latest savepoint if needed.
The Flink Operator CLI provides an interface for controlling Flink clusters and jobs from a terminal. It supports commands for creating or deleting clusters, starting or stopping clusters and jobs, and getting metrics and other information about clusters and jobs.
The tools are distributed under the terms of the BSD 3-Clause License.
Copyright (c) 2019, Andrea Medeghini
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the tools nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The operator detects new resources of kind FlinkCluster (primary resources) in a namespace and automatically creates other managed
resources (secondary resources), such as StatefulSet, Service, and Job, based on the configuration provided in the primary resource.
The operator persists some status as annotations on the primary resource, and performs several tasks automatically,
like creating savepoints periodically or recreating the cluster when the primary resource is modified.
The possible states of a FlinkCluster resource are represented in this graph:
- UNKNOWN: This is the initial status of the Flink cluster when it is created.
- STARTING: The cluster is starting up. The secondary resources are created, the Flink job JAR is uploaded to Flink, and the job is optionally started.
- STOPPING: The cluster is going to stop. The Flink job is canceled (creating a new savepoint) or stopped (without creating a new savepoint), and the secondary resources are optionally deleted.
- SUSPENDED: The cluster has been suspended. The Flink job has been stopped and the secondary resources have been stopped.
- TERMINATED: The cluster has been terminated. The Flink job has been stopped and the secondary resources have been deleted.
- RUNNING: The cluster is running. The secondary resources have been created and the Flink job is running. When the cluster is in this state and the job stops running for any reason, the status automatically changes to FAILED.
- FAILED: The cluster has failed. The secondary resources might not work or the job might have failed (or been canceled manually). When the cluster is in this state but a running job is detected, the status automatically changes to RUNNING.
- CHECKPOINTING: The cluster is creating a savepoint. The status automatically changes to RUNNING when the savepoint is completed, or to FAILED when the savepoint is not completed within the expected time.
Each arrow in this graph represents a specific sequence of tasks which are executed in order to transition from one status to another. Each task in the sequence is processed according to this graph:
- EXECUTING: This is the initial status of a task. The task is executing some operation against Kubernetes resources or the Flink server. The task might repeat the operation several times before succeeding, or time out and generate a failure.
- AWAITING: The task executed some operation and is now awaiting results. The task might check for results several times before succeeding, or time out and generate a failure.
- IDLE: The task found the expected results and is now completed. The task is idle, but it keeps checking for new tasks to execute and can eventually schedule a new task.
- FAILED: The task didn't complete and some errors occurred. The cluster status is changed to FAILED and the DO_NOTHING task is scheduled for execution.
The possible tasks which are executed to transition from one status to another are:
- INITIALISE_CLUSTER: Initialise the primary resource and change the cluster status to STARTING.
- CLUSTER_RUNNING: Detect changes in the primary resource and restart the cluster if needed. Change the cluster status to FAILED if the job stops running. Periodically trigger a new savepoint.
- CLUSTER_HALTED: Detect changes in the primary resource and restart the cluster if needed. Change the cluster status to RUNNING if there is a running job.
- STARTING_CLUSTER: Set the cluster status to STARTING.
- STOPPING_CLUSTER: Set the cluster status to STOPPING.
- CHECKPOINTING_CLUSTER: Set the cluster status to CHECKPOINTING.
- CREATE_RESOURCES: Create the secondary resources and wait until they reach the expected status.
- DELETE_RESOURCES: Delete the secondary resources and wait until they reach the expected status.
- DELETE_UPLOAD_JOB: Remove the batch job previously used to upload the JAR file.
- TERMINATE_PODS: Terminate the pods by scaling down resources (set replicas to 0).
- RESTART_PODS: Restart the pods by scaling up resources (set replicas to the expected value).
- UPLOAD_JAR: Schedule the batch job which uploads the JAR file to Flink.
- CANCEL_JOB: Cancel the job, creating a new savepoint, and wait until the savepoint is completed.
- START_JOB: Start the job using the configuration from the primary resource. Restart the job from a savepoint when the savepoint path is available in the primary resource.
- STOP_JOB: Cancel the job without creating a savepoint.
- CREATE_SAVEPOINT: Trigger a new savepoint and wait until the savepoint is completed.
- ERASE_SAVEPOINT: Remove the savepoint from the primary resource.
The operator has a REST API which is exposed on port 4444 by default. The API provides information about the status of the resources, metrics of the clusters and jobs, and more:
http://localhost:4444/cluster/<name>/status
http://localhost:4444/cluster/<name>/job/details
http://localhost:4444/cluster/<name>/job/metrics
http://localhost:4444/cluster/<name>/jobmanager/metrics
http://localhost:4444/cluster/<name>/taskmanagers
http://localhost:4444/cluster/<name>/taskmanagers/<taskmanager>/metrics
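If HTTPS is not enabled, you can reach the API from your machine with a port forward (this sketch assumes the operator runs as the deployment flink-operator in the namespace flink, as created below):
kubectl port-forward -n flink deployment/flink-operator 4444:4444
curl http://localhost:4444/cluster/test/status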
Please note that you must use SSL certificates when HTTPS is enabled (see instructions for generating SSL certificates):
curl --cacert secrets/ca_cert.pem --cert secrets/operator-cli_cert.pem --key secrets/operator-cli_key.pem https://localhost:4444/cluster/test/status
Execute the script secrets.sh to generate self-signed certificates and keystores required to run the Flink Operator.
./secrets.sh flink-operator key-password keystore-password truststore-password
This command will generate new certificates and keystores in the directory secrets.
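The directory should contain the files referenced in this document, such as ca_cert.pem, operator-cli_cert.pem, operator-cli_key.pem, keystore-operator-api.jks, and truststore-operator-api.jks. You can verify with command:
ls secrets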
The operator exposes metrics to Prometheus on port 8080 by default. The metrics are served over plain HTTP:
http://localhost:8080/metrics
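A minimal Prometheus scrape configuration might look like this (the target address is an assumption and depends on how the operator is exposed in your cluster):
scrape_configs:
  - job_name: 'flink-operator'
    static_configs:
    - targets: ['flink-operator:8080']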
Create a namespace with command:
kubectl create namespace flink
Create a secret which contains the keystore and the truststore files:
kubectl create secret generic flink-operator-ssl -n flink \
--from-file=keystore.jks=secrets/keystore-operator-api.jks \
--from-file=truststore.jks=secrets/truststore-operator-api.jks \
--from-literal=keystore-secret=keystore-password \
--from-literal=truststore-secret=truststore-password
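Verify that the secret has been created with command:
kubectl describe secret -n flink flink-operator-ssl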
Install the operator's global resources with command:
helm install --name flink-k8s-toolbox-global helm/flink-k8s-toolbox-global
Install the operator's namespace resources with command:
helm install --name flink-k8s-toolbox-services --namespace flink helm/flink-k8s-toolbox-services --set ssl.secretName=flink-operator-ssl
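Verify that the operator's deployment has been created with command:
kubectl get deployment -n flink flink-operator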
Run the operator with command:
kubectl scale deployment -n flink flink-operator --replicas=1
Stop the operator with command:
kubectl scale deployment -n flink flink-operator --replicas=0
Remove the operator's namespace resources with command:
helm delete --purge flink-k8s-toolbox-services
Remove the operator's global resources with command:
helm delete --purge flink-k8s-toolbox-global
Remove secret with command:
kubectl delete secret -n flink flink-operator-ssl
Remove namespace with command:
kubectl delete namespace flink
The Docker image can be downloaded from Docker Hub:
docker pull nextbreakpoint/flink-k8s-toolbox:1.1.6-beta
Tag and push the image to your registry if needed:
docker tag nextbreakpoint/flink-k8s-toolbox:1.1.6-beta some-registry/flink-k8s-toolbox:1.1.6-beta
docker login some-registry
docker push some-registry/flink-k8s-toolbox:1.1.6-beta
Run the operator using the image on Docker Hub:
kubectl run flink-operator --restart=Never -n flink --image=nextbreakpoint/flink-k8s-toolbox:1.1.6-beta \
--overrides='{ "apiVersion": "v1", "metadata": { "labels": { "app": "flink-operator" } }, "spec": { "serviceAccountName": "flink-operator", "imagePullPolicy": "Always" } }' -- operator run --namespace=flink
Or run the operator using your own registry and pull secrets:
kubectl run flink-operator --restart=Never -n flink --image=some-registry/flink-k8s-toolbox:1.1.6-beta \
--overrides='{ "apiVersion": "v1", "metadata": { "labels": { "app": "flink-operator" } }, "spec": { "serviceAccountName": "flink-operator", "imagePullPolicy": "Always", "imagePullSecrets": [{"name": "your-pull-secrets"}] } }' -- operator run --namespace=flink
Please note that you MUST run only one operator for each namespace to avoid conflicts.
Verify that the pod has been created:
kubectl get pod -n flink flink-operator -o yaml
Verify that there are no errors in the logs:
kubectl logs -n flink flink-operator
Check the system events if the pod doesn't start:
kubectl get events -n flink
Flink Operator requires a Custom Resource Definition:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: flinkclusters.nextbreakpoint.com
spec:
  group: nextbreakpoint.com
  version: "v1"
  scope: Namespaced
  names:
    plural: flinkclusters
    singular: flinkcluster
    kind: FlinkCluster
    shortNames:
    - fc
FlinkCluster resources can be created or deleted as any other resource in Kubernetes using the kubectl command.
The Custom Resource Definition is installed with the Helm chart.
Make sure the CRD has been installed (see above).
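Verify that the CRD has been installed with command:
kubectl get crd flinkclusters.nextbreakpoint.com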
Create a Dockerfile like:
FROM nextbreakpoint/flink-k8s-toolbox:1.1.6-beta
COPY flink-jobs.jar /flink-jobs.jar
where flink-jobs.jar contains the code of your Flink jobs.
Create a Docker image:
docker build -t flink-jobs:1 .
Tag and push the image to your registry if needed:
docker tag flink-jobs:1 some-registry/flink-jobs:1
docker login some-registry
docker push some-registry/flink-jobs:1
Create a ConfigMap file:
cat <<EOF >config-map.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
data:
  core-site.xml: |
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
      </property>
      <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.EnvironmentVariableCredentialsProvider</value>
      </property>
    </configuration>
EOF
Create a ConfigMap resource with command:
kubectl create -n flink -f config-map.yaml
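Verify that the ConfigMap has been created with command:
kubectl get configmap -n flink flink-config -o yaml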
Create a FlinkCluster file:
cat <<EOF >flink-cluster-test.yaml
apiVersion: "nextbreakpoint.com/v1"
kind: FlinkCluster
metadata:
name: test
spec:
flinkImage:
pullPolicy: Never
flinkImage: flink:1.7.2
flinkJob:
image: flink-jobs:1
jarPath: /flink-jobs.jar
className: com.nextbreakpoint.flink.jobs.TestJob
parallelism: 1
arguments:
- --BUCKET_BASE_PATH
- file:///var/tmp
jobManager:
serviceMode: NodePort
environment:
- name: FLINK_GRAPHITE_HOST
value: graphite.default.svc.cluster.local
environmentFrom:
- secretRef:
name: flink-secrets
volumeMounts:
- name: config-vol
mountPath: /hadoop/etc/core-site.xml
subPath: core-site.xml
- name: config-vol
mountPath: /docker-entrypoint.sh
subPath: docker-entrypoint.sh
- name: config-vol
mountPath: /opt/flink/conf/flink-conf.yaml
subPath: flink-conf.yaml
- name: jobmanager
mountPath: /var/tmp
volumes:
- name: config-vol
configMap:
name: flink-config
defaultMode: 0777
persistentVolumeClaimsTemplates:
- metadata:
name: jobmanager
spec:
storageClassName: hostpath
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
taskManager:
environment:
- name: FLINK_GRAPHITE_HOST
value: graphite.default.svc.cluster.local
volumeMounts:
- name: config-vol
mountPath: /hadoop/etc/core-site.xml
subPath: core-site.xml
- name: config-vol
mountPath: /docker-entrypoint.sh
subPath: docker-entrypoint.sh
- name: config-vol
mountPath: /opt/flink/conf/flink-conf.yaml
subPath: flink-conf.yaml
- name: taskmanager
mountPath: /var/tmp
volumes:
- name: config-vol
configMap:
name: flink-config
defaultMode: 0777
persistentVolumeClaimsTemplates:
- metadata:
name: taskmanager
spec:
storageClassName: hostpath
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
flinkOperator:
savepointMode: AUTOMATIC
savepointInterval: 60
savepointTargetPath: file:///var/tmp/test
EOF
Create a FlinkCluster resource with command:
kubectl create -n flink -f flink-cluster-test.yaml
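After a while the operator should create the secondary resources (StatefulSet, Service, and Job). You can verify with command:
kubectl get -n flink statefulsets,services,jobs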
Please note that you can use any image of Flink as long as the image uses the standard commands to run the JobManager and the TaskManager.
Delete the custom object with command:
kubectl delete -n flink -f flink-cluster-test.yaml
List all custom objects with command:
kubectl get -n flink flinkclusters
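The operator persists some status as annotations on the primary resource (as described above), which can be inspected with command:
kubectl get -n flink flinkcluster test -o jsonpath='{.metadata.annotations}'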
Build an uber JAR file with command:
./gradlew clean shadowJar
and test the JAR printing the CLI usage:
java -jar modules/cli/build/libs/flink-k8s-toolbox-1.1.6-beta-with-dependencies.jar --help
Build a Docker image with command:
docker build -t flink-k8s-toolbox:1.1.6-beta .
and test the image printing the CLI usage:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta --help
Tag and push the image to your Docker registry if needed:
docker tag flink-k8s-toolbox:1.1.6-beta some-registry/flink-k8s-toolbox:1.1.6-beta
docker login some-registry
docker push some-registry/flink-k8s-toolbox:1.1.6-beta
Print the CLI usage:
docker run --rm -it nextbreakpoint/flink-k8s-toolbox:1.1.6-beta --help
The output should look like:
Usage: flink-k8s-toolbox [OPTIONS] COMMAND [ARGS]...

Options:
  -h, --help  Show this message and exit

Commands:
  operator      Access operator subcommands
  cluster       Access cluster subcommands
  savepoint     Access savepoint subcommands
  upload        Access upload subcommands
  job           Access job subcommands
  jobmanager    Access JobManager subcommands
  taskmanager   Access TaskManager subcommands
  taskmanagers  Access TaskManagers subcommands
Create a Dockerfile like:
FROM nextbreakpoint/flink-k8s-toolbox:1.1.6-beta
COPY flink-jobs.jar /flink-jobs.jar
where flink-jobs.jar contains the code of your Flink jobs.
Create a Docker image:
docker build -t flink-jobs:1 .
Tag and push the image to your registry if needed:
docker tag flink-jobs:1 some-registry/flink-jobs:1
docker login some-registry
docker push some-registry/flink-jobs:1
Create a JSON file:
cat <<EOF >flink-cluster-test.json
{
  "flinkImage": {
    "pullPolicy": "Never",
    "flinkImage": "flink:1.7.2"
  },
  "flinkJob": {
    "image": "flink-jobs:1",
    "jarPath": "/flink-jobs.jar",
    "className": "com.nextbreakpoint.flink.jobs.TestJob",
    "parallelism": 1,
    "arguments": [
      "--BUCKET_BASE_PATH",
      "file:///var/tmp"
    ]
  },
  "jobManager": {
    "serviceMode": "NodePort",
    "environment": [
      {
        "name": "FLINK_GRAPHITE_HOST",
        "value": "graphite.default.svc.cluster.local"
      }
    ],
    "environmentFrom": [
      {
        "secretRef": {
          "name": "flink-secrets"
        }
      }
    ],
    "volumeMounts": [
      {
        "name": "config-vol",
        "mountPath": "/hadoop/etc/core-site.xml",
        "subPath": "core-site.xml"
      },
      {
        "name": "config-vol",
        "mountPath": "/docker-entrypoint.sh",
        "subPath": "docker-entrypoint.sh"
      },
      {
        "name": "config-vol",
        "mountPath": "/opt/flink/conf/flink-conf.yaml",
        "subPath": "flink-conf.yaml"
      },
      {
        "name": "jobmanager",
        "mountPath": "/var/tmp"
      }
    ],
    "volumes": [
      {
        "name": "config-vol",
        "configMap": {
          "name": "flink-config",
          "defaultMode": "511"
        }
      }
    ],
    "persistentVolumeClaimsTemplates": [
      {
        "metadata": {
          "name": "jobmanager"
        },
        "spec": {
          "storageClassName": "hostpath",
          "accessModes": [ "ReadWriteOnce" ],
          "resources": {
            "requests": {
              "storage": "1Gi"
            }
          }
        }
      }
    ]
  },
  "taskManager": {
    "environment": [
      {
        "name": "FLINK_GRAPHITE_HOST",
        "value": "graphite.default.svc.cluster.local"
      }
    ],
    "volumeMounts": [
      {
        "name": "config-vol",
        "mountPath": "/hadoop/etc/core-site.xml",
        "subPath": "core-site.xml"
      },
      {
        "name": "config-vol",
        "mountPath": "/docker-entrypoint.sh",
        "subPath": "docker-entrypoint.sh"
      },
      {
        "name": "config-vol",
        "mountPath": "/opt/flink/conf/flink-conf.yaml",
        "subPath": "flink-conf.yaml"
      },
      {
        "name": "taskmanager",
        "mountPath": "/var/tmp"
      }
    ],
    "volumes": [
      {
        "name": "config-vol",
        "configMap": {
          "name": "flink-config",
          "defaultMode": "511"
        }
      }
    ],
    "persistentVolumeClaimsTemplates": [
      {
        "metadata": {
          "name": "taskmanager"
        },
        "spec": {
          "storageClassName": "hostpath",
          "accessModes": [ "ReadWriteOnce" ],
          "resources": {
            "requests": {
              "storage": "5Gi"
            }
          }
        }
      }
    ]
  },
  "flinkOperator": {
    "savepointMode": "AUTOMATIC",
    "savepointInterval": 60,
    "savepointTargetPath": "file:///var/tmp/test"
  }
}
EOF
Create a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
create \
--cluster-name=test \
--cluster-spec=flink-cluster-test.json \
--host=flink-operator \
--port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta cluster create --help
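Please note that these examples assume the commands run where the hostname flink-operator resolves to the operator, for instance from a pod in the same namespace. From outside the cluster you can use a port forward as shown earlier and change the --host option accordingly.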
Get the status of a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
status \
--cluster-name=test \
--host=flink-operator \
--port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta cluster status --help
Delete a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
delete \
--cluster-name=test \
--host=flink-operator \
--port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta cluster delete --help
Stop a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
stop \
--cluster-name=test \
--host=flink-operator \
--port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta cluster stop --help
Start a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
start \
--cluster-name=test \
--host=flink-operator \
--port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta cluster start --help
Start a cluster without restoring from a savepoint with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
start \
--cluster-name=test \
--without-savepoint \
--host=flink-operator \
--port=4444
Stop a cluster without creating a savepoint with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
stop \
--cluster-name=test \
--without-savepoint \
--host=flink-operator \
--port=4444
Check the status of the cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.1.6-beta \
cluster \
status \
--cluster-name=test \
--host=flink-operator \
--port=4444
Flink jobs must be packaged in a regular JAR file and uploaded to the JobManager.
Upload a JAR file using the command:
java -jar flink-k8s-toolbox-1.1.6-beta.jar upload jar --cluster-name=test --class-name=your-main-class --jar-path=/your-job-jar.jar
When running outside Kubernetes, use the command:
java -jar flink-k8s-toolbox-1.1.6-beta.jar upload jar --kube-config=/your-kube-config.conf --cluster-name=test --class-name=your-main-class --jar-path=/your-job-jar.jar
The Flink Operator can be executed as a Docker image or a JAR file, pointing to a local or remote Kubernetes cluster.
Run the operator with a given namespace and Kubernetes config using the JAR file:
java -jar flink-k8s-toolbox-1.1.6-beta.jar operator run --namespace=test --kube-config=/path/admin.conf
Run the operator with a given namespace and Kubernetes config using the Docker image:
docker run --rm -it -v /path/admin.conf:/admin.conf flink-k8s-toolbox:1.1.6-beta operator run --namespace=test --kube-config=/admin.conf