Flink Kubernetes Toolbox contains tools for managing Flink clusters and jobs on Kubernetes:
- Flink Operator: an implementation of the Kubernetes Operator pattern for managing Flink clusters and jobs. The operator uses a Custom Resource Definition to represent a cluster with a single job. It detects changes to the custom resource and modifies the derived resources that constitute the cluster. It takes care of creating savepoints periodically, monitoring the status of the job to detect failures, restarting the job from the latest savepoint if needed, and rescaling the cluster when required.
- Flink Operator CLI: an interface for controlling Flink clusters and jobs from a terminal. It supports commands for creating or deleting clusters, starting or stopping clusters and jobs, rescaling clusters, and getting metrics and other information about clusters and jobs.
Main features:
- Automatic creation of JobManager and TaskManagers using StatefulSets
- Automatic creation of a Service for accessing the JobManager
- Support for batch and stream jobs
- Support for bare cluster or single job cluster
- Support for init containers and side containers for JobManager and TaskManagers
- Support for mounted volumes and persistent volume claims
- Support for environment variables, including variables from ConfigMaps
- Support for resource requirements
- Support for user defined annotations
- Support for user defined container ports
- Support for pull secrets and private registries
- Support for public Flink images or custom images
- Support for a separate bootstrap image (with a single JAR file)
- Configurable task slots and heap memory
- Configurable savepoints location
- Configurable service accounts
- Configurable periodic savepoints
- Automatic detection of failure and configurable job restart
- Automatic scaling via standard autoscaling interface
- Automatic restart of the cluster or job when the specification changes
- Automatic creation of savepoint before stopping cluster or job
- Automatic recovery from latest savepoint when restarting job
- Resource status and printer columns
- Readiness and Liveness probes for JobManager
- CLI interface for operations and monitoring
- Internal metrics compatible with Prometheus
The tools are distributed under the terms of the BSD 3-Clause License.
Copyright (c) 2019, Andrea Medeghini
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the tools nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The operator detects new resources of kind FlinkCluster (primary resources) in a namespace and automatically creates other managed
resources (secondary resources), such as StatefulSet, Service, and BatchJob, based on the specification provided in the custom resource.
The operator persists status information in the resource and performs several tasks automatically, such as creating savepoints
or recreating the cluster when the specification changes.
The possible states of a FlinkCluster resource are represented in this graph:
- UNKNOWN: This is the initial status of the Flink cluster when it is created.
- STARTING: The cluster is starting up. The secondary resources are created, the JAR file is uploaded to Flink, and (optionally) the job is started.
- STOPPING: The cluster is about to stop. The Flink job is canceled (creating a new savepoint) or stopped (without creating a new savepoint), and the secondary resources are stopped or (optionally) deleted.
- SUSPENDED: The cluster has been suspended. The Flink job has been stopped, and the secondary resources have been stopped.
- TERMINATED: The cluster has been terminated. The Flink job has been stopped, and the secondary resources have been deleted.
- RUNNING: The cluster is running. The secondary resources have been created and the job is running. When the cluster is in this state and the job stops running for any reason, the status is automatically changed to FAILED.
- UPDATING: The cluster is updating the secondary resources. A change has been detected in the primary resource specification, and the operator is modifying the secondary resources to reflect that change. The operator might need to destroy and recreate some resources.
- SCALING: The cluster is scaling the secondary resources. A change has been detected in the primary resource specification, and the operator is modifying the secondary resources to reflect that change. The operator might need to destroy and recreate some resources.
- FAILED: The cluster is not running properly. The secondary resources might not work, or the job might have failed (or been canceled manually). When the cluster is in this state but a running job is detected, the status is automatically changed to RUNNING.
- CHECKPOINTING: The cluster is creating a savepoint. The status is automatically changed to RUNNING when the savepoint request has been created.
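The three automatic transitions described above (RUNNING to FAILED when the job stops, FAILED to RUNNING when a running job is detected, CHECKPOINTING to RUNNING once the savepoint request exists) can be summarized as a small lookup. The following is a hypothetical Bash sketch for illustration only: the function `next_status` and its arguments are assumptions, and it does not reflect how the operator is actually implemented.

```shell
# Hypothetical illustration of the automatic status transitions documented
# for a FlinkCluster resource. NOT the operator's actual implementation.
#
# Arguments:
#   $1  current status (e.g. RUNNING, FAILED, CHECKPOINTING)
#   $2  "true" if a running job is detected, otherwise "false"
#   $3  "true" if the savepoint request has been created, otherwise "false"
# Prints the status that the documented rules would produce.
next_status() {
  local status="$1" job_running="$2" savepoint_requested="$3"
  case "$status" in
    RUNNING)
      # A job that stops running for any reason moves the cluster to FAILED.
      if [ "$job_running" = "false" ]; then echo FAILED; else echo RUNNING; fi
      ;;
    FAILED)
      # Detecting a running job moves the cluster back to RUNNING.
      if [ "$job_running" = "true" ]; then echo RUNNING; else echo FAILED; fi
      ;;
    CHECKPOINTING)
      # Once the savepoint request exists, the cluster returns to RUNNING.
      if [ "$savepoint_requested" = "true" ]; then echo RUNNING; else echo CHECKPOINTING; fi
      ;;
    *)
      # Other states are driven by explicit actions (start, stop, update, scale).
      echo "$status"
      ;;
  esac
}
```

For example, `next_status RUNNING false false` prints `FAILED`.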
The operator exposes a REST API on port 4444 by default. The API provides information about the status of the resources, metrics of clusters and jobs, and more:
http://localhost:4444/clusters
http://localhost:4444/cluster/<name>/status
http://localhost:4444/cluster/<name>/job/details
http://localhost:4444/cluster/<name>/job/metrics
http://localhost:4444/cluster/<name>/jobmanager/metrics
http://localhost:4444/cluster/<name>/taskmanagers
http://localhost:4444/cluster/<name>/taskmanagers/<taskmanager>/metrics
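A minimal sketch of a small Bash client for these endpoints, assuming the default port 4444 and, when HTTPS is enabled, the client certificate paths used in this README's curl example. The helper names and the `OPERATOR_HOST`, `OPERATOR_PORT` and `OPERATOR_TLS` variables are hypothetical.

```shell
# Hypothetical helpers for the operator REST API.
# OPERATOR_HOST, OPERATOR_PORT and OPERATOR_TLS are assumed variable names.

# Build an endpoint URL, e.g. operator_url http localhost 4444 cluster/test/status
operator_url() {
  local scheme="$1" host="$2" port="$3" path="$4"
  printf '%s://%s:%s/%s\n' "$scheme" "$host" "$port" "$path"
}

# GET an endpoint. When OPERATOR_TLS=true, use HTTPS and present the client
# certificates generated by secrets.sh.
operator_get() {
  local path="$1"
  local host="${OPERATOR_HOST:-localhost}" port="${OPERATOR_PORT:-4444}"
  if [ "${OPERATOR_TLS:-false}" = "true" ]; then
    curl --silent --fail \
      --cacert secrets/ca_cert.pem \
      --cert secrets/operator-cli_cert.pem \
      --key secrets/operator-cli_key.pem \
      "$(operator_url https "$host" "$port" "$path")"
  else
    curl --silent --fail "$(operator_url http "$host" "$port" "$path")"
  fi
}
```

Usage: `operator_get cluster/test/job/details` or `OPERATOR_TLS=true operator_get clusters`.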
When HTTPS is enabled, you must pass the client SSL certificates with each request (see the instructions for generating SSL certificates):
curl --cacert secrets/ca_cert.pem --cert secrets/operator-cli_cert.pem --key secrets/operator-cli_key.pem https://localhost:4444/cluster/test/status
Execute the script secrets.sh to generate self-signed certificates and keystores to use with the Flink Operator:
./secrets.sh flink-operator key-password keystore-password truststore-password
This command will generate new certificates and keystores in the directory secrets.
The operator exposes metrics to Prometheus on port 8080 by default:
http://localhost:8080/metrics
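When the operator runs inside the cluster, one way to reach this endpoint is to port-forward to it and filter the Prometheus text output. The sketch below assumes the `flink-operator` Deployment in the `flink` namespace used elsewhere in this README; `metric_samples` is a hypothetical helper, and `jvm_` is only an example prefix (the actual metric names exposed by the operator may differ).

```shell
# Hypothetical helper: read Prometheus text format on stdin and print only the
# sample lines (skipping "# HELP"/"# TYPE" comments and blank lines),
# optionally keeping only metrics whose name starts with the given prefix.
metric_samples() {
  awk -v p="${1:-}" 'NF > 0 && !/^#/ && substr($0, 1, length(p)) == p'
}

# Example usage (assumes the Deployment name and namespace from this README):
#   kubectl -n flink port-forward deployment/flink-operator 8080:8080 &
#   curl -s http://localhost:8080/metrics | metric_samples jvm_
```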
Create a namespace. Let's assume the namespace is flink, but you can use any name:
kubectl create namespace flink
Create a secret that contains the keystore and truststore files:
kubectl -n flink create secret generic flink-operator-ssl \
--from-file=keystore.jks=secrets/keystore-operator-api.jks \
--from-file=truststore.jks=secrets/truststore-operator-api.jks \
--from-literal=keystore-secret=keystore-password \
--from-literal=truststore-secret=truststore-password
Install the operator's CRD with the Helm command:
helm install flink-k8s-toolbox-crd helm/flink-k8s-toolbox-crd
Install the operator's resources with SSL enabled:
helm install flink-k8s-toolbox-operator --namespace flink helm/flink-k8s-toolbox-operator --set secretName=flink-operator-ssl
Or, if you prefer, install the operator's resources with SSL disabled:
helm install flink-k8s-toolbox-operator --namespace flink helm/flink-k8s-toolbox-operator
Run the operator with the command:
kubectl -n flink scale deployment flink-operator --replicas=1
Stop the operator with the command:
kubectl -n flink scale deployment flink-operator --replicas=0
Remove the operator's resources with the command:
helm uninstall flink-k8s-toolbox-operator --namespace flink
Remove the operator's CRD with the command:
helm uninstall flink-k8s-toolbox-crd
Remove the secret with the command:
kubectl -n flink delete secret flink-operator-ssl
Remove the namespace with the command:
kubectl delete namespace flink
PLEASE NOTE THAT THE OPERATOR IS STILL IN BETA VERSION AND IT DOESN'T HAVE A STABLE API YET, THEREFORE EACH RELEASE MIGHT INTRODUCE BREAKING CHANGES.
Before upgrading to a new release, you must cancel all jobs, creating a savepoint in a durable storage location (for instance, AWS S3).
Create a copy of your FlinkCluster resources:
kubectl -n flink get fc -o yaml > clusters-backup.yaml
Upgrade the CRD using Helm:
helm upgrade flink-k8s-toolbox-crd helm/flink-k8s-toolbox-crd
Upgrade the operator using Helm:
helm upgrade flink-k8s-toolbox-operator --namespace flink helm/flink-k8s-toolbox-operator --set secretName=flink-operator-ssl
After installing the new version, you can restart the jobs. However, the custom resources might not be compatible with the new CRD. If that is the case, you have to fix the resource specification, and you might have to delete the resource and recreate it.
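The backup and upgrade steps above can be collected into one function. This is a minimal sketch, assuming the namespace `flink`, the release names used in this README, and a secret named `flink-operator-ssl`; the function names and the `DRY_RUN` switch are hypothetical. It does not cancel jobs: that must already have been done, with savepoints in durable storage.

```shell
# Hypothetical wrapper for the upgrade steps described above.
# Set DRY_RUN=true to print the commands instead of executing them.
maybe_run() {
  if [ "${DRY_RUN:-false}" = "true" ]; then
    echo "$*"
  else
    "$@"
  fi
}

upgrade_toolbox() {
  local namespace="${1:-flink}"
  # Back up the FlinkCluster resources before touching the CRD.
  if [ "${DRY_RUN:-false}" = "true" ]; then
    echo "kubectl -n $namespace get fc -o yaml > clusters-backup.yaml"
  else
    kubectl -n "$namespace" get fc -o yaml > clusters-backup.yaml || return 1
  fi
  # Upgrade the CRD first, then the operator.
  maybe_run helm upgrade flink-k8s-toolbox-crd helm/flink-k8s-toolbox-crd || return 1
  maybe_run helm upgrade flink-k8s-toolbox-operator --namespace "$namespace" \
    helm/flink-k8s-toolbox-operator --set secretName=flink-operator-ssl
}
```

Run `DRY_RUN=true upgrade_toolbox flink` first to review the commands.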
The operator's Docker image can be downloaded from Docker Hub:
docker pull nextbreakpoint/flink-k8s-toolbox:1.3.1-beta
Tag and push the image into your private registry if needed:
docker tag nextbreakpoint/flink-k8s-toolbox:1.3.1-beta some-registry/flink-k8s-toolbox:1.3.1-beta
docker login some-registry
docker push some-registry/flink-k8s-toolbox:1.3.1-beta
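The same tag-and-push pattern is used for every image in this README. A minimal sketch, assuming the registry naming shown here (`some-registry/<image>:<tag>`); both function names are hypothetical, and `docker login` must have been run beforehand.

```shell
# Hypothetical helper: compute the private-registry name for an image by
# replacing everything up to the last "/" with the registry name.
registry_image() {
  local registry="$1" image="$2"
  printf '%s/%s\n' "$registry" "${image##*/}"
}

# Tag and push an image into a private registry (run docker login first).
push_to_registry() {
  local registry="$1" image="$2" target
  target="$(registry_image "$registry" "$image")"
  docker tag "$image" "$target" && docker push "$target"
}
```

For example, `push_to_registry some-registry nextbreakpoint/flink-k8s-toolbox:1.3.1-beta` tags and pushes `some-registry/flink-k8s-toolbox:1.3.1-beta`.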
Run the operator using the image on Docker Hub:
kubectl run flink-operator --restart=Never -n flink --image=nextbreakpoint/flink-k8s-toolbox:1.3.1-beta --image-pull-policy=Always \
--overrides='{ "apiVersion": "v1", "metadata": { "labels": { "app": "flink-operator" } }, "spec": { "serviceAccountName": "flink-operator" } }' -- operator run --namespace=flink
Or run the operator using your private registry and pull secrets:
kubectl run flink-operator --restart=Never -n flink --image=some-registry/flink-k8s-toolbox:1.3.1-beta --image-pull-policy=Always \
--overrides='{ "apiVersion": "v1", "metadata": { "labels": { "app": "flink-operator" } }, "spec": { "serviceAccountName": "flink-operator", "imagePullSecrets": [{"name": "your-pull-secrets"}] } }' -- operator run --namespace=flink
Please note that you MUST run only one operator for each namespace to avoid conflicts.
A service account is created when installing the operator Helm chart:
helm install flink-k8s-toolbox-operator --namespace flink helm/flink-k8s-toolbox-operator
Verify that the pod has been created:
kubectl -n flink get pod flink-operator -o yaml
Verify that there are no errors in the logs:
kubectl -n flink logs flink-operator
Check the events in case the pod doesn't start:
kubectl -n flink get events
Stop the operator (when started with kubectl run) with the command:
kubectl -n flink delete pod flink-operator
The Flink Operator requires a Custom Resource Definition:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: flinkclusters.nextbreakpoint.com
spec:
  preserveUnknownFields: false
  group: nextbreakpoint.com
  versions:
    - name: v1
      served: true
      storage: true
  scope: Namespaced
  names:
    plural: flinkclusters
    singular: flinkcluster
    kind: FlinkCluster
    shortNames:
      - fc
[...]
FlinkCluster resources can be created or deleted like any other Kubernetes resource, using kubectl.
The Custom Resource Definition is installed with a separate Helm chart:
helm install flink-k8s-toolbox-crd helm/flink-k8s-toolbox-crd
The complete definition with the validation schema is defined in the Helm template:
https://github.com/nextbreakpoint/flink-k8s-toolbox/blob/master/helm/flink-k8s-toolbox-crd/templates/crd.yaml
Do not delete the CRD unless you want to delete all custom resources depending on it.
When updating the operator, upgrade the CRD with Helm instead of deleting and reinstalling:
helm upgrade flink-k8s-toolbox-crd helm/flink-k8s-toolbox-crd
Make sure the CRD has been installed (see above).
Create a Dockerfile:
FROM nextbreakpoint/flink-k8s-toolbox:1.3.1-beta
COPY flink-jobs.jar /flink-jobs.jar
where flink-jobs.jar contains the code of your Flink jobs.
Create a Docker image:
docker build -t flink-jobs:1 .
Tag and push the image into your registry if needed:
docker tag flink-jobs:1 some-registry/flink-jobs:1
docker login some-registry
docker push some-registry/flink-jobs:1
Pull the Flink image:
docker pull flink:1.9.2
Tag and push the image into your registry if needed:
docker tag flink:1.9.2 some-registry/flink:1.9.2
docker login some-registry
docker push some-registry/flink:1.9.2
Create a FlinkCluster resource file:
cat <<EOF >test.yaml
apiVersion: "nextbreakpoint.com/v1"
kind: FlinkCluster
metadata:
  name: test
spec:
  taskManagers: 1
  runtime:
    pullPolicy: Always
    image: some-registry/flink:1.9.2
  bootstrap:
    executionMode: Stream
    pullPolicy: Always
    image: some-registry/flink-jobs:1
    jarPath: /flink-jobs.jar
    className: com.nextbreakpoint.flink.jobs.stream.TestJob
    arguments:
      - --DEVELOP_MODE
      - disabled
  jobManager:
    serviceMode: NodePort
    annotations:
      managed: "true"
    environment:
      - name: FLINK_GRAPHITE_HOST
        value: graphite.default.svc.cluster.local
    environmentFrom:
      - secretRef:
          name: flink-secrets
    volumeMounts:
      - name: jobmanager
        mountPath: /var/tmp
    extraPorts:
      - name: prometheus
        containerPort: 9999
        protocol: TCP
    persistentVolumeClaimsTemplates:
      - metadata:
          name: jobmanager
        spec:
          storageClassName: hostpath
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 1Gi
  taskManager:
    taskSlots: 1
    annotations:
      managed: "true"
    environment:
      - name: FLINK_GRAPHITE_HOST
        value: graphite.default.svc.cluster.local
    volumeMounts:
      - name: taskmanager
        mountPath: /var/tmp
    extraPorts:
      - name: prometheus
        containerPort: 9999
        protocol: TCP
    persistentVolumeClaimsTemplates:
      - metadata:
          name: taskmanager
        spec:
          storageClassName: hostpath
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 5Gi
  operator:
    # savepointMode can be Automatic or Manual. Default value is Manual
    savepointMode: Automatic
    # savepointInterval in seconds. Required when savepointMode is Automatic
    savepointInterval: 60
    # savepointTargetPath can be any valid Hadoop filesystem (including S3)
    savepointTargetPath: file:///var/tmp/test
    # jobRestartPolicy can be Always or Never. Default value is Never
    jobRestartPolicy: Always
EOF
Create a FlinkCluster resource with the command:
kubectl create -n flink -f test.yaml
You can use any Flink image as long as it implements the standard commands for running the JobManager and TaskManager.
Delete a FlinkCluster with the command:
kubectl delete -n flink -f test.yaml
List custom objects of type FlinkCluster with the command:
kubectl get -n flink flinkclusters
The command should produce an output like:
NAME   CLUSTER-STATUS   TASK-STATUS   TASK-MANAGERS   TASK-SLOTS   ACTIVE-TASK-MANAGERS   TOTAL-TASK-SLOTS   JOB-PARALLELISM   JOB-RESTART   SERVICE-MODE   SAVEPOINT-MODE   SAVEPOINT-PATH                                       SAVEPOINT-AGE   AGE
test   Running          Idle          1               1            1                      1                  1                 Always        NodePort       Manual           file:/var/savepoints/savepoint-e0e430-7a6d1c33dee3   42s             3m55s
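To use one of these printer columns in a script (for example, to wait until CLUSTER-STATUS is Running), the table can be parsed by header name. A minimal sketch; `fc_column` is a hypothetical helper, and because it splits on whitespace it assumes that no cell in the row is empty.

```shell
# Hypothetical helper: read the table printed by "kubectl get flinkclusters"
# on stdin and print the value of one column for one cluster.
fc_column() {
  local cluster="$1" column="$2"
  awk -v name="$cluster" -v col="$column" '
    NR == 1 { for (i = 1; i <= NF; i++) if ($i == col) idx = i; next }
    $1 == name && idx { print $idx }
  '
}

# Example usage:
#   kubectl get -n flink flinkclusters | fc_column test CLUSTER-STATUS
```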
Build the uber JAR file with the command:
./gradlew clean shadowJar
and test the JAR by invoking the CLI:
java -jar build/libs/flink-k8s-toolbox-1.3.1-beta-with-dependencies.jar --help
Please note that Java 8 is required to build the JAR. Define the JAVA_HOME variable to specify the correct JDK:
export JAVA_HOME=/path_to_jdk
./gradlew clean shadowJar
Build a Docker image with the command:
docker build -t flink-k8s-toolbox:1.3.1-beta .
and test the image printing the CLI usage:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta --help
Tag and push the image to your Docker registry if needed:
docker tag flink-k8s-toolbox:1.3.1-beta some-registry/flink-k8s-toolbox:1.3.1-beta
docker login some-registry
docker push some-registry/flink-k8s-toolbox:1.3.1-beta
Run unit tests with the command:
./gradlew clean test
Run integration tests against Docker for Desktop or Minikube with the command:
./gradlew clean integrationTest
You can skip building the Docker images if they already exist:
export SKIP_BUILD_IMAGES=true
./gradlew clean integrationTest
Please note that only Java 8 is supported. Define the JAVA_HOME variable to specify the correct JDK:
export JAVA_HOME=/path_to_jdk
export SKIP_BUILD_IMAGES=true
./gradlew clean integrationTest
The operator automatically creates a savepoint before stopping the cluster. This might happen when a change is applied to the cluster specification, or when the cluster is rescaled or manually stopped. This avoids losing the state of the job: when the operator restarts the cluster, it uses the latest savepoint to recover the state of the job. However, for this feature to work properly, savepoints must be created in a durable storage location such as HDFS or S3. Only a durable location can be used to recover the job after the JobManager and the TaskManagers have been recreated.
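Because a `file://` location such as the one in the example specification does not survive the recreation of the pods, a pre-flight check on `savepointTargetPath` can catch the mistake early. A minimal sketch; the function name is hypothetical, and the list of schemes treated as durable is an assumption (HDFS and S3 are named in this README; which schemes work depends on the filesystems configured in your Flink image).

```shell
# Hypothetical pre-flight check: succeed only if the savepoint path uses a
# scheme assumed to be durable. Extend the list to match your Flink setup.
is_durable_savepoint_path() {
  case "$1" in
    hdfs://*|s3://*|s3a://*|s3p://*) return 0 ;;
    *) return 1 ;;
  esac
}

# Example usage:
#   is_durable_savepoint_path "file:///var/tmp/test" \
#     || echo "warning: savepoints will not survive pod recreation" >&2
```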
Print the CLI usage:
docker run --rm -it nextbreakpoint/flink-k8s-toolbox:1.3.1-beta --help
The output should look like:
Usage: flink-k8s-toolbox [OPTIONS] COMMAND [ARGS]...
Options:
  -h, --help    Show this message and exit
Commands:
  operator      Access operator subcommands
  clusters      Access clusters subcommands
  cluster       Access cluster subcommands
  savepoint     Access savepoint subcommands
  bootstrap     Access bootstrap subcommands
  job           Access job subcommands
  jobmanager    Access JobManager subcommands
  taskmanager   Access TaskManager subcommands
  taskmanagers  Access TaskManagers subcommands
Create a Dockerfile like:
FROM nextbreakpoint/flink-k8s-toolbox:1.3.1-beta
COPY flink-jobs.jar /flink-jobs.jar
where flink-jobs.jar contains the code of your Flink job.
Create a Docker image:
docker build -t flink-jobs:1 .
Tag and push the image into your registry if needed:
docker tag flink-jobs:1 some-registry/flink-jobs:1
docker login some-registry
docker push some-registry/flink-jobs:1
Create a JSON file:
cat <<EOF >test.json
{
  "taskManagers": 1,
  "runtime": {
    "pullPolicy": "Always",
    "image": "some-registry/flink:1.9.2"
  },
  "bootstrap": {
    "executionMode": "Stream",
    "pullPolicy": "Always",
    "image": "some-registry/flink-jobs:1",
    "jarPath": "/flink-jobs.jar",
    "className": "com.nextbreakpoint.flink.jobs.stream.TestJob",
    "arguments": [
      "--DEVELOP_MODE",
      "disabled"
    ]
  },
  "jobManager": {
    "serviceMode": "NodePort",
    "environment": [
      {
        "name": "FLINK_GRAPHITE_HOST",
        "value": "graphite.default.svc.cluster.local"
      }
    ],
    "environmentFrom": [
      {
        "secretRef": {
          "name": "flink-secrets"
        }
      }
    ],
    "volumeMounts": [
      {
        "name": "jobmanager",
        "mountPath": "/var/tmp"
      }
    ],
    "volumes": [
      {
        "name": "config-vol",
        "configMap": {
          "name": "flink-config",
          "defaultMode": 511
        }
      }
    ],
    "persistentVolumeClaimsTemplates": [
      {
        "metadata": {
          "name": "jobmanager"
        },
        "spec": {
          "storageClassName": "hostpath",
          "accessModes": [ "ReadWriteOnce" ],
          "resources": {
            "requests": {
              "storage": "1Gi"
            }
          }
        }
      }
    ]
  },
  "taskManager": {
    "environment": [
      {
        "name": "FLINK_GRAPHITE_HOST",
        "value": "graphite.default.svc.cluster.local"
      }
    ],
    "volumeMounts": [
      {
        "name": "taskmanager",
        "mountPath": "/var/tmp"
      }
    ],
    "volumes": [
      {
        "name": "config-vol",
        "configMap": {
          "name": "flink-config",
          "defaultMode": 511
        }
      }
    ],
    "persistentVolumeClaimsTemplates": [
      {
        "metadata": {
          "name": "taskmanager"
        },
        "spec": {
          "storageClassName": "hostpath",
          "accessModes": [ "ReadWriteOnce" ],
          "resources": {
            "requests": {
              "storage": "5Gi"
            }
          }
        }
      }
    ]
  },
  "operator": {
    "savepointMode": "Automatic",
    "savepointInterval": 60,
    "savepointTargetPath": "file:///var/tmp/test",
    "jobRestartPolicy": "Always"
  }
}
EOF
Create the cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster create --cluster-name=test --cluster-spec=test.json --host=$OPERATOR_HOST --port=4444
Pass the keystore and truststore if SSL is enabled:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster create --cluster-name=test --cluster-spec=test.json --host=$OPERATOR_HOST --port=4444 \
--keystore-path=secrets/keystore-operator-cli.jks --truststore-path=secrets/truststore-operator-cli.jks --keystore-secret=keystore-password --truststore-secret=truststore-password
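Since every CLI command repeats the host, port and (with SSL) keystore options, a small wrapper can append them. A minimal sketch, assuming Bash and the image tag, keystore paths and passwords shown in this README; the function names and the `OPERATOR_PORT`, `OPERATOR_TLS`, `KEYSTORE_SECRET` and `TRUSTSTORE_SECRET` variables are hypothetical.

```shell
# Hypothetical wrapper around the CLI image. build_toolbox_args stores the
# full argument list in the global array TOOLBOX_ARGS.
build_toolbox_args() {
  TOOLBOX_ARGS=("$@" "--host=${OPERATOR_HOST:-localhost}" "--port=${OPERATOR_PORT:-4444}")
  if [ "${OPERATOR_TLS:-false}" = "true" ]; then
    # Same keystore/truststore options as in the SSL example.
    TOOLBOX_ARGS+=(
      "--keystore-path=secrets/keystore-operator-cli.jks"
      "--truststore-path=secrets/truststore-operator-cli.jks"
      "--keystore-secret=${KEYSTORE_SECRET:-keystore-password}"
      "--truststore-secret=${TRUSTSTORE_SECRET:-truststore-password}"
    )
  fi
}

toolbox() {
  build_toolbox_args "$@"
  docker run --rm -it flink-k8s-toolbox:1.3.1-beta "${TOOLBOX_ARGS[@]}"
}
```

Usage: `OPERATOR_TLS=true toolbox cluster status --cluster-name=test`.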
If the operator is exposed on a port of the Docker host:
- set OPERATOR_HOST to localhost on Linux
- set OPERATOR_HOST to host.docker.internal on macOS
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster create --help
List the clusters with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta clusters list --host=$OPERATOR_HOST --port=4444
Get the status of a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster status --cluster-name=test --host=$OPERATOR_HOST --port=4444
Use jq to format the output:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster status --cluster-name=test --host=$OPERATOR_HOST --port=4444 | jq -r
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster status --help
Delete a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster delete --cluster-name=test --host=$OPERATOR_HOST --port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster delete --help
Stop a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster stop --cluster-name=test --host=$OPERATOR_HOST --port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster stop --help
Start a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster start --cluster-name=test --host=$OPERATOR_HOST --port=4444
Show more options with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster start --help
Start a cluster without recovering from the latest savepoint with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster start --cluster-name=test --without-savepoint --host=$OPERATOR_HOST --port=4444
Stop a cluster without creating a savepoint with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster stop --cluster-name=test --without-savepoint --host=$OPERATOR_HOST --port=4444
Trigger a savepoint with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta savepoint trigger --cluster-name=test --host=$OPERATOR_HOST --port=4444
Check the status of the cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster status --cluster-name=test --host=$OPERATOR_HOST --port=4444
Rescale a cluster with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta cluster scale --cluster-name=test --task-managers=4 --host=$OPERATOR_HOST --port=4444
Get the job details with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta job details --cluster-name=test --host=$OPERATOR_HOST --port=4444
Get the job metrics with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta job metrics --cluster-name=test --host=$OPERATOR_HOST --port=4444
List the TaskManagers with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta taskmanagers list --cluster-name=test --host=$OPERATOR_HOST --port=4444
Get the JobManager metrics with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta jobmanager metrics --cluster-name=test --host=$OPERATOR_HOST --port=4444
Get the metrics of a TaskManager with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta taskmanager metrics --cluster-name=test --host=$OPERATOR_HOST --port=4444
You will be asked to provide a TaskManager id, which you can get from the list of TaskManagers.
Get the details of a TaskManager with the command:
docker run --rm -it flink-k8s-toolbox:1.3.1-beta taskmanager details --cluster-name=test --host=$OPERATOR_HOST --port=4444
You will be asked to provide a TaskManager id, which you can get from the list of TaskManagers.
Flink jobs must be packaged in a regular JAR file and uploaded to the JobManager.
Upload a JAR file using the command:
java -jar flink-k8s-toolbox-1.3.1-beta.jar bootstrap run --cluster-name=test --class-name=your-main-class --jar-path=/your-job-jar.jar
When running outside Kubernetes, use the command:
java -jar flink-k8s-toolbox-1.3.1-beta.jar bootstrap run --kube-config=/your-kube-config --cluster-name=test --class-name=your-main-class --jar-path=/your-job-jar.jar
The Flink Operator can be executed as a Docker image or a JAR file, pointing to a local or remote Kubernetes cluster.
Run the operator with a given namespace and Kubernetes config using the JAR file:
java -jar flink-k8s-toolbox-1.3.1-beta.jar operator run --namespace=test --kube-config=$HOME/.kube/config
Run the operator with a given namespace and Kubernetes config using the Docker image:
docker run --rm -it -v ~/.kube/config:/kube/config flink-k8s-toolbox:1.3.1-beta operator run --namespace=test --kube-config=/kube/config
The Flink Operator uses timeouts to recover from anomalies. The timeout has a default value of 300 seconds and can be changed by setting the environment variable TASK_TIMEOUT (number of seconds).
The Flink Operator periodically polls the status of the resources. The polling interval has a default value of 5 seconds and can be changed by setting the environment variable POLLING_INTERVAL (number of seconds).
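A minimal sketch for changing both settings on the operator installed with the Helm chart, assuming the `flink-operator` Deployment in the `flink` namespace (as used by the scale commands in this README). The helper names and the `DRY_RUN` switch are hypothetical. When running the operator as a JAR or Docker image instead, export the variables in the shell or pass them with `docker run -e`.

```shell
# Hypothetical helpers: validate and apply TASK_TIMEOUT and POLLING_INTERVAL
# (both in seconds) to the operator Deployment with "kubectl set env".
is_positive_int() {
  case "$1" in ''|*[!0-9]*) return 1 ;; esac
  [ "$1" -gt 0 ]
}

# Set DRY_RUN=true to print the command instead of running it.
set_operator_timings() {
  local timeout="$1" interval="$2" namespace="${3:-flink}"
  if ! is_positive_int "$timeout" || ! is_positive_int "$interval"; then
    echo "TASK_TIMEOUT and POLLING_INTERVAL must be positive integers" >&2
    return 1
  fi
  local cmd
  cmd=(kubectl -n "$namespace" set env deployment/flink-operator
       "TASK_TIMEOUT=$timeout" "POLLING_INTERVAL=$interval")
  if [ "${DRY_RUN:-false}" = "true" ]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}
```

Note that changing the environment of a Deployment triggers a rollout, so the operator pod is restarted with the new values.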