Fluent Operator Walkthrough

Prerequisites

To get some hands-on experience with the Fluent Operator, you need a Kubernetes cluster; minikube is used here. You also need a data sink to receive logs. In this walkthrough we set up a Kafka cluster and an Elasticsearch cluster in the same Kubernetes cluster.

# If you already have a K8s cluster, you can skip installing minikube.
# Be aware that the Fluent Bit and Fluentd cases in this walkthrough might not work properly in a KinD cluster.
# A minikube cluster is recommended if you don't have a K8s cluster.
# Set up a minikube cluster on Linux
./create-minikube-cluster.sh
# Set up a minikube cluster on macOS
./create-minikube-cluster-for-mac.sh

# Set up a Kafka cluster in the kafka namespace
./deploy-kafka.sh

# Set up an Elasticsearch cluster in the elastic namespace
# Run 'export INSTALL_HELM=yes' first if helm is not installed
./deploy-es.sh

# Set up Loki
./deploy-loki.sh

Note: On macOS you may have to remove the old minikube links and link the newly installed binary:
brew unlink minikube
brew link minikube
Reference: https://minikube.sigs.k8s.io/docs/start/

For other examples of deploying data sinks (e.g. Loki), have a look at https://github.com/calyptia/fluent-bit-devtools.
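
Before running the scripts, it can help to confirm that the command-line tools this walkthrough relies on are installed. The sketch below is only an illustration: `require_cmds` is a hypothetical helper, and the tool list in the usage comment is an assumption based on the commands that appear in this walkthrough.

```shell
# Report every command from the argument list that is not on PATH.
# Returns 0 when all commands are present, 1 otherwise.
require_cmds() {
  rc_missing=0
  for rc_cmd in "$@"; do
    if ! command -v "$rc_cmd" >/dev/null 2>&1; then
      echo "missing: $rc_cmd" >&2
      rc_missing=1
    fi
  done
  return "$rc_missing"
}

# Usage (assumed tool list; adjust it to your environment):
#   require_cmds kubectl minikube helm jq curl || exit 1
```

Checking up front keeps a missing tool from surfacing halfway through a deployment script.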

Install Fluent Operator

Fluent Operator controls the lifecycle of the Fluent Bit and Fluentd deployments. You can use the following script to launch the Fluent Operator in the fluent namespace:

./deploy-fluent-operator.sh
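
After the script finishes, one way to confirm that the operator's CRDs are registered is to list them by API group. This is a sketch that assumes the CRDs belong to the `fluentbit.fluent.io` and `fluentd.fluent.io` groups used by the manifests in this walkthrough; `fluent_crds` is a hypothetical helper name.

```shell
# Filter the cluster's CRD names down to the two Fluent Operator API groups.
# `kubectl get crd -o name` prints one "<resource-type>/<crd-name>" per line.
fluent_crds() {
  kubectl get crd -o name | grep -E '\.(fluentbit|fluentd)\.fluent\.io$'
}

# Usage:
#   fluent_crds
```

An empty result suggests that the operator installation has not completed yet.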

You can find more details of the Fluent Bit and Fluentd Custom Resource Definitions (CRDs) in the Fluent Operator documentation.

Deploy Fluent Bit and Fluentd

With the Fluent Operator, the configuration of Fluent Bit and Fluentd is defined through CRDs: you can create a Fluent Bit DaemonSet or a Fluentd StatefulSet by declaring a FluentBit or Fluentd Custom Resource (CR).

Deploy Fluent Bit

The FluentBit CR works together with a ClusterFluentBitConfig, and the two should be created together. The following FluentBit CR is just an example. To deploy the actual Fluent Bit DaemonSet, refer to the sections "Using Fluent Bit to collect kubelet logs and output to Elasticsearch" and "Using Fluent Bit to collect K8s application logs and output to Kafka, Elasticsearch and Loki".

apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
  name: fluent-bit
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  image: kubesphere/fluent-bit:v1.8.11
  positionDB:
    hostPath:
      path: /var/lib/fluent-bit/
  resources:
    requests:
      cpu: 10m
      memory: 25Mi
    limits:
      cpu: 500m
      memory: 200Mi
  fluentBitConfigName: fluent-bit-only-config
  tolerations:
    - operator: Exists

Deploy Fluentd

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  name: fluentd
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluentd
spec:
  globalInputs:
  - forward: 
      bind: 0.0.0.0
      port: 24224
  replicas: 1 
  image: kubesphere/fluentd:v1.14.4
  fluentdCfgSelector: 
    matchLabels:
      config.fluentd.fluent.io/enabled: "true"
EOF

Now that we've set up Fluent Bit and Fluentd, let's collect, process, and send logs to various sinks in the next few steps.

Fluent Bit Only mode

Fluent Bit is lightweight, so if you prefer, you can use Fluent Bit alone to process logs.

Using Fluent Bit to collect kubelet logs and output to Elasticsearch

cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
  name: fluent-bit
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  image: kubesphere/fluent-bit:v1.8.11
  positionDB:
    hostPath:
      path: /var/lib/fluent-bit/
  resources:
    requests:
      cpu: 10m
      memory: 25Mi
    limits:
      cpu: 500m
      memory: 200Mi
  fluentBitConfigName: fluent-bit-only-config
  tolerations:
    - operator: Exists
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
  name: fluent-bit-only-config
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  service:
    parsersFile: parsers.conf
  inputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "fluentbit-only"
  filterSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "fluentbit-only"
  outputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "fluentbit-only"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
  name: kubelet
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "fluentbit-only"
spec:
  systemd:
    tag: service.kubelet
    path: /var/log/journal
    db: /fluent-bit/tail/kubelet.db
    dbSync: Normal
    systemdFilter:
      - _SYSTEMD_UNIT=kubelet.service
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
  name: systemd
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "fluentbit-only"
spec:
  match: service.*
  filters:
  - lua:
      script:
        key: systemd.lua
        name: fluent-bit-lua
      call: add_time
      timeAsTable: true
---
apiVersion: v1
data:
  systemd.lua: |
    function add_time(tag, timestamp, record)
      new_record = {}
      timeStr = os.date("!*t", timestamp["sec"])
      t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%09dZ", timeStr["year"], timeStr["month"], timeStr["day"], timeStr["hour"], timeStr["min"], timeStr["sec"], timestamp["nsec"])
      kubernetes = {}
      kubernetes["pod_name"] = record["_HOSTNAME"]
      kubernetes["container_name"] = record["SYSLOG_IDENTIFIER"]
      kubernetes["namespace_name"] = "kube-system"
      new_record["time"] = t
      new_record["log"] = record["MESSAGE"]
      new_record["kubernetes"] = kubernetes
      return 1, timestamp, new_record
    end
kind: ConfigMap
metadata:
  labels:
    app.kubernetes.io/component: operator
    app.kubernetes.io/name: fluent-bit-lua
  name: fluent-bit-lua
  namespace: fluent
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: es
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "fluentbit-only"
spec:
  matchRegex: (?:kube|service)\.(.*)
  es:
    host: elasticsearch-master.elastic.svc
    port: 9200
    generateID: true
    logstashPrefix: fluent-log-fb-only
    logstashFormat: true
    timeKey: "@timestamp"  
EOF

You should see the FluentBit DaemonSet up and running with other CRs:

kubectl -n fluent get daemonset
kubectl -n fluent get fluentbit
kubectl -n fluent get clusterfluentbitconfig
kubectl -n fluent get clusterinput.fluentbit.fluent.io
kubectl -n fluent get clusterfilter.fluentbit.fluent.io
kubectl -n fluent get clusteroutput.fluentbit.fluent.io
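
Rather than polling the list above by hand, you can block until the workload has finished rolling out. The sketch below wraps `kubectl rollout status`; `wait_for_rollout` is a hypothetical helper, and the DaemonSet name `fluent-bit` in the usage comment is an assumption (that the operator names the DaemonSet after the FluentBit CR).

```shell
# Wait until a workload in the fluent namespace has finished rolling out.
# $1 = resource kind, $2 = resource name, $3 = optional timeout (default 120s)
wait_for_rollout() {
  kubectl -n fluent rollout status "$1/$2" --timeout="${3:-120s}"
}

# Usage (resource name is an assumption):
#   wait_for_rollout daemonset fluent-bit
```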

Within a couple of minutes, you can double check the results in the Elasticsearch cluster (remember to set it up first or use whichever data sink you prefer).

To double check the output, refer to the "How to check the configuration and data" section below.
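
As a quick check for this example, you can look for indices that start with the `logstashPrefix` configured above (`fluent-log-fb-only`). The sketch below assumes the Elasticsearch pod is named `elasticsearch-master-0`, as in the commands later in this walkthrough, and that `logstashFormat` produces index names of the form `<prefix>-<date>`; `es_indices_with_prefix` is a hypothetical helper.

```shell
# List Elasticsearch index names that start with "<prefix>-".
# $1 = logstash prefix, e.g. fluent-log-fb-only
es_indices_with_prefix() {
  kubectl -n elastic exec elasticsearch-master-0 -c elasticsearch -- \
    curl -s 'localhost:9200/_cat/indices?h=index' | grep "^$1-"
}

# Usage:
#   es_indices_with_prefix fluent-log-fb-only
```

The function returns a non-zero status when no matching index exists yet, which makes it usable in a retry loop.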

Using Fluent Bit to collect K8s application logs and output to Kafka, Elasticsearch and Loki

This example covers several data sinks. They do not all have to be enabled, but you will get errors if Fluent Bit tries to send to a sink that is not available.

cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
  name: fluent-bit
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  image: kubesphere/fluent-bit:v1.8.11
  positionDB:
    hostPath:
      path: /var/lib/fluent-bit/
  resources:
    requests:
      cpu: 10m
      memory: 25Mi
    limits:
      cpu: 500m
      memory: 200Mi
  fluentBitConfigName: fluent-bit-config
  tolerations:
    - operator: Exists
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
  name: fluent-bit-config
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  service:
    parsersFile: parsers.conf
  inputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "k8s"
  filterSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "k8s"
  outputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "k8s"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
  name: tail
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  tail:
    tag: kube.*
    path: /var/log/containers/*.log
    # Exclude logs from util pod
    excludePath: /var/log/containers/utils_default_utils-*.log
    parser: docker   # In a containerd environment, set this to cri
    refreshIntervalSeconds: 10
    memBufLimit: 5MB
    skipLongLines: true
    db: /fluent-bit/tail/pos.db
    dbSync: Normal
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
  name: kubernetes
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  match: kube.*
  filters:
  - kubernetes:
      kubeURL: https://kubernetes.default.svc:443
      kubeCAFile: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      kubeTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
      labels: false
      annotations: false
  - nest:
      operation: lift
      nestedUnder: kubernetes
      addPrefix: kubernetes_
  - modify:
      rules:
      - remove: stream
      - remove: kubernetes_pod_id
      - remove: kubernetes_host
      - remove: kubernetes_container_hash
  - nest:
      operation: nest
      wildcard:
      - kubernetes_*
      nestUnder: kubernetes
      removePrefix: kubernetes_
EOF
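
The `parser` value of the tail input above depends on the container runtime. The following sketch shows one way to choose it from the runtime version that a node reports. `pick_tail_parser` is a hypothetical helper. The `containerd` mapping follows the comment in the manifest, while treating CRI-O the same way is an assumption.

```shell
# Map a node's containerRuntimeVersion string to a tail parser name.
# $1 = runtime string, e.g. "containerd://1.6.8" or "docker://20.10.12"
pick_tail_parser() {
  case "$1" in
    docker://*)                 echo docker ;;
    containerd://*|cri-o://*)   echo cri ;;   # cri-o mapping is an assumption
    *)                          echo "unknown runtime: $1" >&2; return 1 ;;
  esac
}

# Usage:
#   runtime=$(kubectl get nodes -o jsonpath='{.items[0].status.nodeInfo.containerRuntimeVersion}')
#   pick_tail_parser "$runtime"
```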

Each of the following is a separate output, so add or remove them as necessary:

cat <<EOF | kubectl apply -f -
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: kafka
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  kafka:
    brokers: my-cluster-kafka-brokers.kafka.svc:9092
    topics: fluent-log
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: k8s-app-es
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  es:
    host: elasticsearch-master.elastic.svc
    port: 9200
    generateID: true
    logstashPrefix: fluent-app-log-fb-only
    logstashFormat: true
    timeKey: "@timestamp"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: k8s-app-loki
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  loki:
    host: loki.loki.svc
    labels:
      - job=fluentbit
EOF

We should see the FluentBit DaemonSet up and running with other CRs:

kubectl -n fluent get daemonset
kubectl -n fluent get fluentbit
kubectl -n fluent get clusterfluentbitconfig
kubectl -n fluent get clusterinput.fluentbit.fluent.io
kubectl -n fluent get clusterfilter.fluentbit.fluent.io
kubectl -n fluent get clusteroutput.fluentbit.fluent.io

Within a couple of minutes, you can double check the output in the Kafka cluster (or your preferred data sink - remember to set it up first).

To double check the output, refer to the "How to check the configuration and data" section below.

Fluent Bit + Fluentd mode

With its rich plugins, Fluentd acts as a log aggregation layer and is able to perform more advanced log processing. You can forward logs from Fluent Bit to Fluentd with ease using the Fluent Operator.

Forward logs from Fluent Bit to Fluentd

To forward logs from Fluent Bit to Fluentd, we need to enable the Fluent Bit forward plugin as below:

cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: fluentd
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  forward:
    host: fluentd.fluent.svc
    port: 24224
EOF

Enable Fluentd Forward Input plugin to receive logs from Fluent Bit

Deploy Fluentd with its forward input plugin enabled:

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  name: fluentd
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluentd
spec:
  globalInputs:
  - forward: 
      bind: 0.0.0.0
      port: 24224
  replicas: 1
  image: kubesphere/fluentd:v1.14.6
  fluentdCfgSelector: 
    matchLabels:
      config.fluentd.fluent.io/enabled: "true"
EOF

ClusterFluentdConfig: Fluentd cluster-wide configuration

We can send the logs that Fluentd receives from Fluent Bit to a cluster-wide sink defined by a ClusterOutput.

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
  name: cluster-fluentd-config
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  watchedNamespaces: 
  - kube-system
  - default
  - kafka
  - elastic
  - fluent
  clusterOutputSelector:
    matchLabels:
      output.fluentd.fluent.io/scope: "cluster"
      output.fluentd.fluent.io/enabled: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  name: cluster-fluentd-output-es
  labels:
    output.fluentd.fluent.io/scope: "cluster"
    output.fluentd.fluent.io/enabled: "true"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-cluster-fd
EOF

We should see the Fluentd StatefulSet up and running with other CRs:

kubectl -n fluent get statefulset
kubectl -n fluent get fluentd
kubectl -n fluent get clusterfluentdconfig.fluentd.fluent.io
kubectl -n fluent get clusteroutput.fluentd.fluent.io

FluentdConfig: Fluentd namespace-wide configuration

We can send the logs that Fluentd receives from Fluent Bit to a namespace-wide sink defined by an Output. Only logs from the same namespace as the FluentdConfig will be sent to the Output.

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: namespace-fluentd-config
  namespace: fluent
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  outputSelector:
    matchLabels:
      output.fluentd.fluent.io/scope: "namespace"
      output.fluentd.fluent.io/enabled: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Output
metadata:
  name: namespace-fluentd-output-es
  namespace: fluent
  labels:
    output.fluentd.fluent.io/scope: "namespace"
    output.fluentd.fluent.io/enabled: "true"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-namespace-fd
EOF

We should see the Fluentd CRs created:

kubectl -n fluent get fluentdconfig
kubectl -n fluent get output.fluentd.fluent.io

Use cluster-wide and namespace-wide FluentdConfig together

We can use FluentdConfig and ClusterFluentdConfig together like below. The FluentdConfig will send logs from the fluent namespace to the ClusterOutput. The ClusterFluentdConfig will send logs from any of the watchedNamespaces to the ClusterOutput.

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
  name: cluster-fluentd-config-hybrid
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  watchedNamespaces: 
  - kube-system
  - default
  - kafka
  - elastic
  - fluent
  clusterOutputSelector:
    matchLabels:
      output.fluentd.fluent.io/scope: "hybrid"
      output.fluentd.fluent.io/enabled: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: namespace-fluentd-config-hybrid
  namespace: fluent
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  clusterOutputSelector:
    matchLabels:
      output.fluentd.fluent.io/scope: "hybrid"
      output.fluentd.fluent.io/enabled: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  name: cluster-fluentd-output-es-hybrid
  labels:
    output.fluentd.fluent.io/scope: "hybrid"
    output.fluentd.fluent.io/enabled: "true"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-hybrid-fd
EOF

We should see the Fluentd CRs created:

kubectl -n fluent get clusterfluentdconfig
kubectl -n fluent get fluentdconfig
kubectl -n fluent get clusteroutput.fluentd.fluent.io

Use cluster-wide and namespaced FluentdConfig together in multi-tenant scenarios

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: namespace-fluentd-config-user1
  namespace: fluent
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  outputSelector:
    matchLabels:
      output.fluentd.fluent.io/enabled: "true"
      output.fluentd.fluent.io/user: "user1"
  clusterOutputSelector:
    matchLabels:
      output.fluentd.fluent.io/enabled: "true"
      output.fluentd.fluent.io/user: "user1"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
  name: cluster-fluentd-config-cluster-only
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  watchedNamespaces: 
  - kube-system
  - default
  - kafka
  - elastic
  clusterOutputSelector:
    matchLabels:
      output.fluentd.fluent.io/enabled: "true"
      output.fluentd.fluent.io/scope: "cluster-only"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Output
metadata:
  name: namespace-fluentd-output-user1
  namespace: fluent
  labels:
    output.fluentd.fluent.io/enabled: "true"
    output.fluentd.fluent.io/user: "user1"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-user1-fd
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  name: cluster-fluentd-output-user1
  labels:
    output.fluentd.fluent.io/enabled: "true"
    output.fluentd.fluent.io/user: "user1"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-cluster-user1-fd
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  name: cluster-fluentd-output-cluster-only
  labels:
    output.fluentd.fluent.io/enabled: "true"
    output.fluentd.fluent.io/scope: "cluster-only"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-cluster-only-fd
EOF

We should see the Fluentd CRs created:

kubectl -n fluent get clusterfluentdconfig
kubectl -n fluent get fluentdconfig
kubectl -n fluent get output.fluentd.fluent.io
kubectl -n fluent get clusteroutput.fluentd.fluent.io
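
The manifests above write to three index prefixes: `fluent-log-user1-fd`, `fluent-log-cluster-user1-fd` and `fluent-log-cluster-only-fd`. To see which of them have not produced an index yet, the sketch below compares a list of index names against the expected prefixes. `missing_prefixes` is a hypothetical helper, and the pod name in the usage comment is taken from the Elasticsearch commands used later in this walkthrough.

```shell
# Read index names on stdin (one per line) and print every prefix argument
# for which no "<prefix>-*" index is present.
missing_prefixes() {
  mp_indices=$(cat)
  for mp_prefix in "$@"; do
    printf '%s\n' "$mp_indices" | grep -q "^$mp_prefix-" || echo "$mp_prefix"
  done
}

# Usage:
#   kubectl -n elastic exec elasticsearch-master-0 -c elasticsearch -- \
#     curl -s 'localhost:9200/_cat/indices?h=index' |
#     missing_prefixes fluent-log-user1-fd fluent-log-cluster-user1-fd fluent-log-cluster-only-fd
```

No output means every expected prefix already has at least one index.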

Route logs to different Kafka topics based on namespace

We can use a Fluentd filter to distribute logs to different Kafka topics based on namespace.

kubectl apply -f https://raw.githubusercontent.com/kubesphere-sigs/fluent-operator-walkthrough/master/manifest/route-to-different-kafka-topic.yaml

We should see the Fluentd CRs created:

kubectl -n fluent get clusterfluentdconfig
kubectl -n fluent get clusterfilter.fluentd.fluent.io
kubectl -n fluent get clusteroutput.fluentd.fluent.io

Using buffer for Fluentd output

We can add a buffer to cache logs for the output plugins.

kubectl apply -f https://raw.githubusercontent.com/kubesphere-sigs/fluent-operator-walkthrough/master/manifest/use-buffer-for-fluentd.yaml

We should see the Fluentd CRs created:

kubectl -n fluent get clusterfluentdconfig
kubectl -n fluent get clusterfilter.fluentd.fluent.io
kubectl -n fluent get clusteroutput.fluentd.fluent.io

Fluentd only mode

Use Fluentd to receive logs from HTTP and output to stdout

We can use Fluentd to receive logs via HTTP.

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  name: fluentd-http
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluentd
spec:
  globalInputs:
    - http: 
        bind: 0.0.0.0
        port: 9880
  replicas: 1
  image: kubesphere/fluentd:v1.14.4
  fluentdCfgSelector: 
    matchLabels:
      config.fluentd.fluent.io/mode: "fluentd-only" 
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: fluentd-only-config
  namespace: fluent
  labels:
    config.fluentd.fluent.io/mode: "fluentd-only" 
spec:
  outputSelector:
    matchLabels:
      output.fluentd.fluent.io/mode: "fluentd-only"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Output
metadata:
  name: fluentd-only-stdout
  namespace: fluent
  labels:
    output.fluentd.fluent.io/mode: "fluentd-only"
spec: 
  outputs: 
  - stdout: {}
EOF

We should see the Fluentd CRs created:

kubectl -n fluent get fluentd
kubectl -n fluent get fluentdconfig
kubectl -n fluent get filter.fluentd.fluent.io
kubectl -n fluent get output.fluentd.fluent.io

Then we can send logs to the endpoint by using curl:

curl -X POST -d 'json={"foo":"bar"}' http://fluentd-http.fluent.svc:9880/app.log
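
The service name in the URL above only resolves inside the cluster. From a workstation, one option is to port-forward the service first. The sketch below wraps the same request in a small function. `post_log` is a hypothetical helper, and the port-forward and `logs` commands in the usage comment assume that the operator created a Service and a StatefulSet named `fluentd-http`.

```shell
# POST one JSON record to a Fluentd HTTP input.
# The URL path is used by the HTTP input as the Fluentd tag.
# $1 = base URL, $2 = tag, $3 = JSON record
post_log() {
  curl -sS -X POST --data-urlencode "json=$3" "$1/$2"
}

# Usage from outside the cluster (resource names are assumptions):
#   kubectl -n fluent port-forward svc/fluentd-http 9880:9880 &
#   post_log http://localhost:9880 app.log '{"foo":"bar"}'
#   kubectl -n fluent logs statefulset/fluentd-http
```

Since the example routes everything to a stdout output, the record should show up in the Fluentd pod's logs.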

How to check the configuration and data

  1. See the state of the Fluent Operator:
kubectl get pod -n fluent
  1. See the generated Fluent Bit configuration:
kubectl -n fluent get secrets fluent-bit-config -ojson | jq '.data."fluent-bit.conf"' | awk -F '"' '{printf $2}' | base64 --decode

[Service]
    Parsers_File    parsers.conf
[Input]
    Name    systemd
    Path    /var/log/journal
    DB    /fluent-bit/tail/docker.db
    DB.Sync    Normal
    Tag    service.docker
    Systemd_Filter    _SYSTEMD_UNIT=docker.service
[Input]
    Name    systemd
    Path    /var/log/journal
    DB    /fluent-bit/tail/kubelet.db
    DB.Sync    Normal
    Tag    service.kubelet
    Systemd_Filter    _SYSTEMD_UNIT=kubelet.service
[Input]
    Name    tail
    Path    /var/log/containers/*.log
    Refresh_Interval    10
    Skip_Long_Lines    true
    DB    /fluent-bit/tail/pos.db
    DB.Sync    Normal
    Mem_Buf_Limit    5MB
    Parser    docker
    Tag    kube.*
[Filter]
    Name    lua
    Match    kube.*
    script    /fluent-bit/config/containerd.lua
    call    containerd
    time_as_table    true
[Filter]
    Name    kubernetes
    Match    kube.*
    Kube_URL    https://kubernetes.default.svc:443
    Kube_CA_File    /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File    /var/run/secrets/kubernetes.io/serviceaccount/token
    Labels    false
    Annotations    false
[Filter]
    Name    nest
    Match    kube.*
    Operation    lift
    Nested_under    kubernetes
    Add_prefix    kubernetes_
[Filter]
    Name    modify
    Match    kube.*
    Remove    stream
    Remove    kubernetes_pod_id
    Remove    kubernetes_host
    Remove    kubernetes_container_hash
[Filter]
    Name    nest
    Match    kube.*
    Operation    nest
    Wildcard    kubernetes_*
    Nest_under    kubernetes
    Remove_prefix    kubernetes_
[Filter]
    Name    lua
    Match    service.*
    script    /fluent-bit/config/systemd.lua
    call    add_time
    time_as_table    true
[Output]
    Name    forward
    Match_Regex    (?:kube|service)\.(.*)
    Host    fluentd.fluent.svc
    Port    24224
  1. See the generated Fluentd configuration:
kubectl -n fluent get secrets fluentd-config -ojson | jq '.data."app.conf"' | awk -F '"' '{printf $2}' | base64 --decode 
---
<source>
  @type  forward
  bind  0.0.0.0
  port  24224
</source>
<match **>
  @id  main
  @type  label_router
  <route>
    @label  @48b7cb809bc2361ba336802a95eca0d4
    <match>
      namespaces  kube-system,default
    </match>
  </route>
</match>
<label @48b7cb809bc2361ba336802a95eca0d4>
  <filter **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::fluentd-filter-0
    @type  record_transformer
    enable_ruby  true
    <record>
      kubernetes_ns  ${record["kubernetes"]["namespace_name"]}
    </record>
  </filter>
  <match **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-0
    @type  elasticsearch
    host  elasticsearch-master.elastic.svc
    logstash_format  true
    logstash_prefix  fluent-log
    port  9200
  </match>
  <match **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-kafka-0
    @type  kafka2
    brokers  my-cluster-kafka-brokers.kafka.svc:9092
    topic_key  kubernetes_ns
    use_event_time  true
    <format>
      @type  json
    </format>
  </match>
</label>
  1. If you chose to forward the logs to Elasticsearch in the previous steps, you can query the kubernetes_ns buckets in the Elasticsearch cluster:
kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch --  curl -X GET "localhost:9200/fluent-log*/_search?pretty" -H 'Content-Type: application/json' -d '{
   "size" : 0,
   "aggs" : {
      "kubernetes_ns": {
         "terms" : {
           "field": "kubernetes.namespace_name.keyword"
         }
      }
   }
}'

If you don't use Fluentd to extract the namespace field, you can use a different query.

  1. You can also list the Elasticsearch indices. You will find that a new index has been created by the Fluent Operator.
kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch -- curl 'localhost:9200/_cat/indices?v'
  1. If you chose to forward the logs to Kafka in the previous steps, you can query the Kafka cluster and its topic:
# Start a util pod to connect to Kafka
kubectl run --rm utils -it --image arunvelsriram/utils bash
# Connect to Kafka and read data from a Kafka topic
kafkacat -C -b my-cluster-kafka-brokers.kafka.svc:9092 -t <namespace or fluent-log>
# Exit the util pod
exit

Replace <namespace or fluent-log> with the actual topic name.
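
The two commands that print the generated Fluent Bit and Fluentd configuration can also be written without `jq` and `awk`, using kubectl's JSONPath output. The following is a sketch of an alternative, not a replacement for the commands above; `decode_secret_key` is a hypothetical helper name.

```shell
# Print the decoded value of one key from a Secret in the fluent namespace.
# $1 = Secret name, $2 = key inside .data
decode_secret_key() {
  # Dots inside the key must be escaped for kubectl's JSONPath syntax.
  dsk_key=$(printf '%s' "$2" | sed 's/\./\\./g')
  kubectl -n fluent get secret "$1" -o "jsonpath={.data.$dsk_key}" | base64 --decode
}

# Usage:
#   decode_secret_key fluent-bit-config fluent-bit.conf
#   decode_secret_key fluentd-config app.conf
```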