XJoin (Cross Join) Operator

OpenShift operator that manages the XJoin pipeline. It currently manages version 1 of XJoin, i.e. it maintains the replication pipeline between HBI and ElasticSearch. Modifications will be necessary to support version 2 (joining data between applications).

An XJoin pipeline is defined by the XJoinPipeline custom resource. It indexes the hosts table of the HBI database into an ElasticSearch index. A Debezium Kafka connector reads from the HBI database's replication slot, and an ElasticSearch connector indexes the hosts. Kafka Connect transformations configured on the ElasticSearch connector prepare the host records for indexing, and an ElasticSearch ingest pipeline transforms the JSON fields on each host before it is indexed.

Architecture

The operator is responsible for:

  • management of an ElasticSearch index, alias, and pipeline
  • management of Debezium (source) and ElasticSearch (sink) connectors in a Kafka Connect cluster (using Strimzi)
  • management of a Kafka Topic in a Kafka cluster (using Strimzi)
  • management of the HBI replication slot. The Debezium connector is expected to manage the slot itself; the operator ensures there are no orphaned replication slots.
  • periodic validation of the indexed data
  • automated recovery (e.g. when the data becomes out of sync)
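
The orphaned-slot check from the list above can be sketched as a set difference: list the slots in PostgreSQL, keep the ones XJoin created, and report any that no current pipeline uses. This Go sketch assumes a hypothetical xjoin_inventory_ slot-name prefix and leaves the SQL (the standard PostgreSQL pg_replication_slots view and pg_drop_replication_slot function) in comments so the logic runs without a database.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// orphanedSlots returns the replication slots that carry the operator's
// naming prefix but are not used by any current pipeline.
// The prefix is a hypothetical naming convention.
func orphanedSlots(existing []string, inUse map[string]bool, prefix string) []string {
	var orphans []string
	for _, slot := range existing {
		// Only consider slots created by this operator; leave other slots alone.
		if strings.HasPrefix(slot, prefix) && !inUse[slot] {
			orphans = append(orphans, slot)
		}
	}
	sort.Strings(orphans)
	return orphans
}

func main() {
	// In a real cluster, existing would come from:
	//   SELECT slot_name FROM pg_replication_slots;
	existing := []string{"xjoin_inventory_1", "xjoin_inventory_2", "other_app_slot"}
	inUse := map[string]bool{"xjoin_inventory_2": true}
	for _, slot := range orphanedSlots(existing, inUse, "xjoin_inventory_") {
		// Each orphan could then be removed with:
		//   SELECT pg_drop_replication_slot('<slot>');
		fmt.Println("orphaned:", slot)
	}
}
```

Filtering on a prefix keeps the operator from touching replication slots that belong to other applications on the same database.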

Implementation

The operator defines two controllers that reconcile an XJoinPipeline:

  • PipelineController which manages all the resources (connectors, elasticsearch resources, topic, replication slots) and handles recovery
  • ValidationController which periodically compares the data in the ElasticSearch index with what is stored in HBI to determine whether the pipeline is valid
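
As a rough illustration of what the ValidationController checks, the sketch below compares host IDs from HBI with document IDs from the ElasticSearch index and reports which are missing or extra. The function and type names are hypothetical, and the real controller may also compare field contents or work on a sample of the data rather than every ID.

```go
package main

import (
	"fmt"
	"sort"
)

// ValidationResult summarizes a comparison between the host IDs stored in
// HBI and the host IDs found in the ElasticSearch index (hypothetical type).
type ValidationResult struct {
	MissingFromIndex []string // present in HBI but not indexed
	ExtraInIndex     []string // indexed but no longer present in HBI
}

// Valid reports whether the index IDs match the HBI IDs exactly.
func (r ValidationResult) Valid() bool {
	return len(r.MissingFromIndex) == 0 && len(r.ExtraInIndex) == 0
}

// compareIDs only compares IDs; it does not compare document contents.
func compareIDs(hbiIDs, esIDs []string) ValidationResult {
	inES := make(map[string]bool, len(esIDs))
	for _, id := range esIDs {
		inES[id] = true
	}
	inHBI := make(map[string]bool, len(hbiIDs))
	var res ValidationResult
	for _, id := range hbiIDs {
		inHBI[id] = true
		if !inES[id] {
			res.MissingFromIndex = append(res.MissingFromIndex, id)
		}
	}
	for _, id := range esIDs {
		if !inHBI[id] {
			res.ExtraInIndex = append(res.ExtraInIndex, id)
		}
	}
	sort.Strings(res.MissingFromIndex)
	sort.Strings(res.ExtraInIndex)
	return res
}

func main() {
	res := compareIDs([]string{"a", "b", "c"}, []string{"b", "c", "d"})
	fmt.Println(res.Valid(), res.MissingFromIndex, res.ExtraInIndex)
}
```

An invalid result is the kind of signal that would trigger the PipelineController's recovery.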

Development

Setting up the development environment using Clowder

  1. Install dependencies

  2. Set up a local minikube environment

  3. Configure minikube to use at least 8 GB of memory and 5 CPUs. This configuration is known to work, although you can try with less.

    minikube config set cpus 5
    minikube config set memory 8000
    minikube config set driver kvm2
    
  4. Start minikube

    minikube start
    
  5. Log in to https://quay.io and https://registry.redhat.io

    • docker login -u=<quay-username> -p=<quay-password> quay.io

    • docker login https://registry.redhat.io

    • On macOS, Docker keeps credentials in a credentials store ("credsStore": "desktop" or "osxkeychain" in .docker/config.json), and credentials stored there are not available for pulling images from private repos. Do the following to place the credentials directly in .docker/config.json instead:

      1. docker logout quay.io
      2. docker logout registry.redhat.io
      3. Remove the "credsStore" entry from .docker/config.json.
      4. docker login -u=<quay-username> -p=<quay-password> quay.io
      5. docker login https://registry.redhat.io
      • NOTE: Manually creating .docker/config.json and adding an "auth" entry containing the base64-encoded username:password does not work.
  6. Do one of the following:

    • Append the following line into /etc/hosts
      127.0.0.1 inventory-db host-inventory-db.test.svc xjoin-elasticsearch-es-default.test.svc connect-connect-api.test.svc kafka-kafka-0.kafka-kafka-brokers.test.svc apicurio apicurio.test.svc .test.svc
      
    • Install and run kubefwd
      sudo -E kubefwd svc -n test --kubeconfig ~/.kube/config -m 8080:8090 -m 8081:8091
      
  7. Run ./dev/setup-clowder.sh

Linting

This project uses golangci-lint.

Forward ports

To access the services within the Kubernetes cluster there is a script to forward ports to each of the useful services:

./dev/forward-ports-clowder.sh

or the ports can be forwarded via the kubefwd utility:

sudo -E kubefwd svc -n test --kubeconfig ~/.kube/config -m 8080:8090 -m 8081:8091

Dev environment credentials

./dev/get_credentials.sh is a helper script to populate a shell's environment with credentials to the databases and Elasticsearch. Use the following command to populate the environment.

source ./dev/get_credentials.sh test

Reset the development environment

When the development environment needs a reset, it is easiest to delete the minikube instance completely and then rerun the setup-clowder.sh script.

minikube delete && minikube start && ./dev/setup-clowder.sh

Running the operator locally

With the cluster set up it is now possible to install manifests and run the operator locally.

  1. Install CRDs

    make install
    
  2. Run the operator

    make run ENABLE_WEBHOOKS=false
    
  3. Finally, create a new pipeline

    kubectl apply -f config/samples/xjoin_v1alpha1_xjoinpipeline.yaml -n test
    

Use make delve to debug the operator. It starts a Delve server process; connect to it with a Delve client.

Running the operator locally via OLM

This is useful when testing deployment related changes. It's a little cumbersome for everyday development because an image needs to be built by app-interface and pushed to the cluster for each change.

  • To deploy the operator locally via OLM, run
./dev/install.operator.locally.sh
  • To uninstall the OLM-deployed operator, run
./dev/uninstall.operator.locally.sh

Running the operator locally via OLM using operator-sdk run bundle

This is more convenient than using the app-interface build because the image is built locally and then pushed to quay.io.

docker login -u=$QUAY_USERNAME -p=$QUAY_PASSWORD quay.io
./dev/install.operator.with.operator.sdk.sh

To uninstall, run ./dev/uninstall.operator.with.operator.sdk.sh.

Running tests

  • The tests require an initialized Kubernetes environment. See Setting up the development environment.
  • Before running the tests, make sure the operator is not already running on your machine or in the cluster.
    kubectl scale --replicas=0 deployments/xjoin-operator-controller-manager -n xjoin-operator-system
    
  • Run the tests with make test.
  • Use make delve-test to run the tests in debug mode, then connect to the test run with a Delve client.
  • The tests take a while to run. To focus on one or a few tests, prefix It with an F, e.g. change It("Creates a connector", ...) to FIt("Creates a connector", ...).
  • When a test run fails unexpectedly, it can leave orphaned projects in Kubernetes. Use ./dev/cleanup.projects.sh to remove them.

Version 2

Local development

See the version 1 development section for details on setting up minikube.

After setting up a Kubernetes environment, the xjoin-operator can be run like this:

make run ENABLE_WEBHOOKS=false

Now that the xjoin-operator is running, it is time to create the k8s custom resources that define how the data is streamed from the database into Elasticsearch. Once the custom resources are defined, the xjoin-operator will start reconciling each resource and create the components necessary to stream the data.

kubectl apply -f config/samples/xjoin_v1alpha1_xjoindatasource.yaml -n test
kubectl apply -f config/samples/xjoin_v1alpha1_xjoinindex.yaml -n test

These commands create two k8s resources (XJoinIndex and XJoinDataSource) which then create many more k8s resources. The xjoin k8s resources are defined in the api/v1alpha1 directory.

Running the tests

The xjoin.v2 tests utilize mocks, so they don't require kubernetes or any other services to run.

make generic-test

Troubleshooting tips

When the data is not syncing, check the following places in this order. When troubleshooting data sync issues, always query the Elasticsearch index directly instead of the GraphQL API.

  1. Verify the Kafka and Kafka Connect pods are running
  2. Check the logs of the xjoin-operator
  3. Look at the status of each of the following resources via kubectl get <resource type> <resource name> -o yaml: XJoinIndex, XJoinIndexPipeline, XJoinDataSource, XJoinDataSourcePipeline, KafkaConnector
  4. Check the logs of Kafka Connect via
    kubectl logs -f -l app.kubernetes.io/instance=connect --all-containers=true
    
  5. Check the logs of Kafka via
    kubectl logs -f -l app.kubernetes.io/instance=kafka --all-containers=true
    
  6. Check the logs of xjoin-core via
    kubectl logs -f -l xjoin.index=xjoin-core-xjoinindexpipeline-hosts --all-containers=true
    
  7. Check if messages are on the datasource and index topics by using kcat to consume each topic from the beginning
    kcat -b localhost:29092 -C -o beginning -f '%h\n%s\n\n\n' -t xjoin.inventory.1663597318878465070.public.hosts
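
To query the Elasticsearch index directly, the _count API gives a quick document count for an index. This Go sketch only builds the request; the base URL (the service name from the /etc/hosts entry with Elasticsearch's default port 9200), the index name, and the credentials are placeholders, and the scheme may need to be https depending on how the cluster is configured.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// countRequest builds a GET request to Elasticsearch's _count API for one
// index, authenticated with basic auth. All arguments are placeholders.
func countRequest(baseURL, index, user, password string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, baseURL+"/"+url.PathEscape(index)+"/_count", nil)
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth(user, password)
	return req, nil
}

func main() {
	// Hypothetical index name and credentials; see ./dev/get_credentials.sh
	// for the real credentials in the dev environment.
	req, err := countRequest("http://xjoin-elasticsearch-es-default.test.svc:9200", "xjoin.inventory.hosts", "elastic", "changeme")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
	// Once ports are forwarded, send it with http.DefaultClient.Do(req).
}
```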
    

xjoin.v2 code walkthrough

The CRDs are contained in the api/v1alpha1 directory.

  • XJoinDataSource (created by: Human): defines a single data source (e.g. a database table). This is created by the user.
  • XJoinDataSourcePipeline (created by: Code): defines the pipeline for a DataSource. Each DataSource can have multiple DataSourcePipelines, e.g. there are two DataSourcePipelines for a single DataSource while the DataSource is being refreshed. Applications continue to use the old DataSourcePipeline until the new one is in sync. At that point, the old DataSourcePipeline is deleted and applications use the new one.
  • XJoinIndex (created by: Human): defines an Index that is composed of one or more DataSources. The Index defines how the data is indexed into Elasticsearch. This is created by the user.
  • XJoinIndexPipeline (created by: Code): defines the pipeline for an Index. As with XJoinDataSourcePipeline, each Index can have multiple IndexPipelines.
  • XJoinIndexValidator (created by: Code): defines the validator for an Index. The validator periodically compares the data in each DataSource with the data in the Index, and it is responsible for updating the status of the Index and of each DataSource used by the Index.

The entrypoint to the reconcile loop for each custom resource is in a separate file at the top level of the controllers directory. For example, the Reconcile method in the xjoindatasource_controller file is the entrypoint for a DataSource.

Almost all the remaining business logic is contained in the controllers directory. The other directories are mostly boilerplate and related to deployments/builds.

Both the DataSourcePipeline and the IndexPipeline manage multiple resources to construct a pipeline for the data to flow through. Throughout the code these resources are referred to as Components, and they are managed by a ComponentManager. More details can be found in controllers/components/README.md.
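
The Component/ComponentManager pattern can be pictured roughly as follows. This is a hypothetical sketch, not the operator's actual types (controllers/components/README.md describes those): each Component knows how to create and delete one resource, and the manager creates components in the order they were added and deletes them in reverse order.

```go
package main

import "fmt"

// Component is a hypothetical interface for one piece of a pipeline,
// e.g. a Kafka topic, a connector, or an Elasticsearch index.
type Component interface {
	Name() string
	Create() error
	Delete() error
}

// ComponentManager is a hypothetical manager that creates components in
// order and deletes them in reverse order.
type ComponentManager struct {
	components []Component
}

func (m *ComponentManager) AddComponent(c Component) {
	m.components = append(m.components, c)
}

func (m *ComponentManager) CreateAll() error {
	for _, c := range m.components {
		if err := c.Create(); err != nil {
			return fmt.Errorf("creating %s: %w", c.Name(), err)
		}
	}
	return nil
}

func (m *ComponentManager) DeleteAll() error {
	for i := len(m.components) - 1; i >= 0; i-- {
		if err := m.components[i].Delete(); err != nil {
			return fmt.Errorf("deleting %s: %w", m.components[i].Name(), err)
		}
	}
	return nil
}

// logComponent records each call so the ordering is visible.
type logComponent struct {
	name string
	log  *[]string
}

func (c logComponent) Name() string { return c.name }

func (c logComponent) Create() error {
	*c.log = append(*c.log, "create "+c.name)
	return nil
}

func (c logComponent) Delete() error {
	*c.log = append(*c.log, "delete "+c.name)
	return nil
}

func main() {
	var log []string
	m := &ComponentManager{}
	for _, n := range []string{"topic", "connector", "index"} {
		m.AddComponent(logComponent{name: n, log: &log})
	}
	if err := m.CreateAll(); err != nil {
		panic(err)
	}
	if err := m.DeleteAll(); err != nil {
		panic(err)
	}
	fmt.Println(log)
}
```

Reverse-order deletion is a design choice of this sketch: it removes a connector before the topic it writes to.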

The code for the top-level resources (XJoinDataSource and XJoinIndex) can be found in the controllers/datasource and controllers/index directories.

There are many different parameters and sources of parameters across the operator. These are handled by the ConfigManager. The parameters for the xjoin.v2 resources are defined in controllers/parameters.
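
A ConfigManager that combines several parameter sources can be sketched as a layered lookup: each parameter starts at its default, and later sources override earlier ones. The parameter names, the sources, and their precedence below are assumptions for illustration; the actual definitions live in controllers/parameters.

```go
package main

import "fmt"

// Parameter is a hypothetical parameter definition with a default value.
type Parameter struct {
	Name    string
	Default string
}

// resolve starts each parameter at its default and lets every later source
// override it. The order of sources (e.g. a ConfigMap, then the custom
// resource spec) is an assumption of this sketch.
func resolve(params []Parameter, sources ...map[string]string) map[string]string {
	out := make(map[string]string, len(params))
	for _, p := range params {
		out[p.Name] = p.Default
		for _, src := range sources {
			if v, ok := src[p.Name]; ok {
				out[p.Name] = v
			}
		}
	}
	return out
}

func main() {
	params := []Parameter{
		{Name: "ElasticSearchURL", Default: "http://localhost:9200"},
		{Name: "KafkaBootstrapURL", Default: "localhost:9092"},
	}
	configMap := map[string]string{"KafkaBootstrapURL": "kafka-kafka-bootstrap:9092"}
	spec := map[string]string{"ElasticSearchURL": "http://es:9200"}
	fmt.Println(resolve(params, configMap, spec))
}
```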

Validation

Each IndexPipeline is continuously validated by an IndexValidator. For a given Index there can be at most two IndexPipelines: one is active while the other is refreshing. The active pipeline is served to users via the GraphQL Gateway. When the refreshing pipeline becomes valid, it replaces the active pipeline. The status fields on each custom resource are used to track the active and refreshing state.

This is the high level flow of what happens when an active IndexPipeline becomes out of sync:

  • The IndexValidator updates the status to invalid for each DataSourcePipeline that is referenced by the IndexPipeline
  • The DataSource (parent of the DataSourcePipeline) watches the DataSourcePipeline. So, when the active DataSourcePipeline's status is set to invalid, the DataSource is reconciled and starts to refresh by creating a new DataSourcePipeline
  • The IndexPipeline also watches the DataSourcePipeline, so it is reconciled and marked invalid
  • The Index is watching the IndexPipeline, so it is reconciled and starts to refresh by creating a new IndexPipeline

This is a summary of the different states an Index can be in:

  • Newly created
    • ActiveVersion: ""
    • ActiveVersionIsValid: false
    • RefreshingVersion: "1234"
  • Data is in sync
    • ActiveVersion: "1234"
    • ActiveVersionIsValid: true
    • RefreshingVersion: ""
  • Data is out of sync, refreshing
    • ActiveVersion: "1234"
    • ActiveVersionIsValid: false
    • RefreshingVersion: "5678"
  • Data is back in sync, refreshing version replaced active version
    • ActiveVersion: "5678"
    • ActiveVersionIsValid: true
    • RefreshingVersion: ""
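
The four states above can be read as transitions on the three status fields. This Go sketch models them in one place with hypothetical method names; in the operator these updates are spread across the reconcile loops of the Index, IndexPipeline, and IndexValidator.

```go
package main

import "fmt"

// IndexStatus mirrors the three status fields listed above.
type IndexStatus struct {
	ActiveVersion        string
	ActiveVersionIsValid bool
	RefreshingVersion    string
}

// startRefresh begins building a new pipeline version. It does nothing if a
// refresh is already running, since at most two pipelines exist at once.
func (s *IndexStatus) startRefresh(version string) {
	if s.RefreshingVersion == "" {
		s.RefreshingVersion = version
	}
}

// markActiveInvalid records that the active pipeline is out of sync and
// starts a refresh with a new version.
func (s *IndexStatus) markActiveInvalid(newVersion string) {
	s.ActiveVersionIsValid = false
	s.startRefresh(newVersion)
}

// refreshingBecameValid promotes the refreshing version to active.
func (s *IndexStatus) refreshingBecameValid() {
	if s.RefreshingVersion == "" {
		return
	}
	s.ActiveVersion = s.RefreshingVersion
	s.ActiveVersionIsValid = true
	s.RefreshingVersion = ""
}

func main() {
	s := &IndexStatus{}
	s.startRefresh("1234")      // newly created
	s.refreshingBecameValid()   // data is in sync
	s.markActiveInvalid("5678") // data is out of sync, refreshing
	s.refreshingBecameValid()   // back in sync, refreshing version replaced active
	fmt.Printf("%+v\n", *s)
}
```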