spoved/kube-client.cr

List or watch custom CRDs

Closed this issue · 11 comments

I'm trying to implement a K8S operator using this library. I'm running into an issue when I try to fetch or watch a CRD.

Steps to reproduce

The final code can be found here.


File structure :

./
├── crds.yaml
├── lib/
│  ├── ...
├── LICENSE
├── README.md
├── shard.lock
├── shard.yml
├── spec/
│  ├── memcached-operator_spec.cr
│  └── spec_helper.cr
├── src/
│  ├── crds/
│  │  └── com/
│  │     └── example/
│  │        └── cache/
│  │           └── v1alpha1/
│  │              ├── memcached.cr
│  │              ├── memcached_list.cr
│  │              ├── memcached_spec.cr
│  │              └── memcached_status.cr
│  └── memcached-operator.cr

The source code (./src/memcached-operator.cr) :

require "kube-client/v1.21"
require "./crds/com/example/cache/v1alpha1/*"
require "log"

module Memcached::Operator
  VERSION = "0.1.0"

  # Initialize the connection with a default client.
  client = Kube::Client.autoconfig

  # Create a channel to watch all pods within the `default` namespace.
  #
  # In fact, the channel will be used as loop to fetch information directly from
  # the Kubernetes API. It's lighweight as possible for the API Server.
  channel = client.api("cache.example.com/v1alpha1").resource("memcacheds").watch(namespace: "default")

  # While the channel is not closed (potentially if the API server restarts ?),
  # loop and iterate on every new entry.
  until channel.closed?
    event = channel.receive

    # If the event is not a pod, we don't want to parse it.
    unless event.is_a? K8S::Kubernetes::WatchEvent(K8S::Com::Example::Cache::V1alpha1::Memcached)
      # If there's an API error, just log it and continue the loop.
      if event.is_a? Kube::Error::API || event.is_a? Kube::Error::UndefinedResource
        Log.for("reconcile").error &.emit(event.to_s)
      end

      next
    end

    # Simply log the pod spec and status.
    Log.for("reconcile").info { "namespace=#{event.object.metadata!.namespace} pod=#{event.object.metadata!.name}" }
  end
end

The CRD (YAML format) (./crds.yaml) :

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.3.0
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"apiextensions.k8s.io/v1beta1","kind":"CustomResourceDefinition","metadata":{"annotations":{"controller-gen.kubebuilder.io/version":"v0.3.0"},"creationTimestamp":null,"name":"memcacheds.cache.example.com"},"spec":{"group":"cache.example.com","names":{"kind":"Memcached","listKind":"MemcachedList","plural":"memcacheds","singular":"memcached"},"scope":"Namespaced","subresources":{"status":{}},"validation":{"openAPIV3Schema":{"description":"Memcached is the Schema for the memcacheds API","properties":{"apiVersion":{"description":"APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources","type":"string"},"kind":{"description":"Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds","type":"string"},"metadata":{"type":"object"},"spec":{"description":"MemcachedSpec defines the desired state of Memcached","properties":{"size":{"format":"int32","type":"integer"}},"required":["size"],"type":"object"},"status":{"description":"MemcachedStatus defines the observed state of Memcached","properties":{"nodes":{"items":{"type":"string"},"type":"array"}},"required":["nodes"],"type":"object"}},"type":"object"}},"version":"v1alpha1","versions":[{"name":"v1alpha1","served":true,"storage":true}]},"status":{"acceptedNames":{"kind":"","plural":""},"conditions":[],"storedVersions":[]}}
  creationTimestamp: "2022-05-05T14:41:44Z"
  generation: 1
  name: memcacheds.cache.example.com
  resourceVersion: "12544"
  uid: 71715b95-3572-47d3-8409-a49329642720
spec:
  conversion:
    strategy: None
  group: cache.example.com
  names:
    kind: Memcached
    listKind: MemcachedList
    plural: memcacheds
    singular: memcached
  preserveUnknownFields: true
  scope: Namespaced
  versions:
  - name: v1alpha1
    schema:
      openAPIV3Schema:
        description: Memcached is the Schema for the memcacheds API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource
              this object represents. Servers may infer this from the endpoint the
              client submits requests to. Cannot be updated. In CamelCase. More
              info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: MemcachedSpec defines the desired state of Memcached
            properties:
              size:
                format: int32
                type: integer
            required:
            - size
            type: object
          status:
            description: MemcachedStatus defines the observed state of Memcached
            properties:
              nodes:
                items:
                  type: string
                type: array
            required:
            - nodes
            type: object
        type: object
    served: true
    storage: true
    subresources:
      status: {}
status:
  acceptedNames:
    kind: Memcached
    listKind: MemcachedList
    plural: memcacheds
    singular: memcached
  conditions:
  - lastTransitionTime: "2022-05-05T14:41:44Z"
    message: 'spec.preserveUnknownFields: Invalid value: true: must be false'
    reason: Violations
    status: "True"
    type: NonStructuralSchema
  - lastTransitionTime: "2022-05-05T14:41:44Z"
    message: no conflicts found
    reason: NoConflicts
    status: "True"
    type: NamesAccepted
  - lastTransitionTime: "2022-05-05T14:41:44Z"
    message: the initial names have been accepted
    reason: InitialNamesAccepted
    status: "True"
    type: Established
  storedVersions:
  - v1alpha1

The command used to generate the Crystal object from the Yaml CRD :

mkdir -p src/crds
crystal run ./lib/k8s/bin/gen_crd.cr -- ./crds.yaml ./src/crds

During this step, I needed to remove a few lines to make the objects works :

-require "../../../../apimachinery/apis/meta/v1/object_meta" # This file doesn't exist.
require "./memcached_spec"
require "./memcached_status"

Output log

2022-05-05T15:21:02.853023Z   INFO - kube.transport: Using config with server=https://127.0.0.1:35155
2022-05-05T15:21:02.859181Z   INFO - kube.transport: GET /apis/cache.example.com/v1alpha1 => HTTP OK: K8S::Apimachinery::Apis::Meta::V1::APIResourceList(@__object__=#<K8S::Internals::GenericObject:0x7f167add5200 @__hash__={"kind" => "APIResourceList", "apiVersion" => "v1", "groupVersion" => "cache.example.com/v1alpha1", "resources" => [{"name" => "memcacheds", "singularName" => "memcached", "namespaced" => true, "kind" => "Memcached", "verbs" => ["delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"], "storageVersionHash" => "345Up0t4GjI="}, {"name" => "memcacheds/status", "singularName" => "", "namespaced" => true, "kind" => "Memcached", "verbs" => ["get", "patch", "update"]}]}>) in 00:00:00.004777762s
2022-05-05T15:21:02.859233Z   WARN - Unknown api resource: "/v1/Memcached" -- group: "", version: "v1", kind: "Memcached"
2022-05-05T15:21:08.998127Z   WARN - Kube::ResourceClient(K8S::Kubernetes::Resource: Watching {"watch" => "true"}

Did I missed something? I'm not sure to understand why there's an unknown api resource, because the API version is already defined directly into the .api method. Also, the groupVersion is also specified into the API.

In fact, it doesn't seem to work when the apiVersion is not ending with v1. For example, when I try with the following snippet, I got the same unknown API resource output :

channel = client.api("storage.k8s.io/v1beta1").resource("csistoragecapacities").watch()

By the way, no matter I call watch or not, the issue seems to be raised during the .resource call.

It seems to work anyway with:

require "kube-client/v1.21"
require "./crds/com/example/cache/v1alpha1/*"
require "log"

Log.setup(:info)

module Memcached::Operator
  VERSION = "0.1.0"

  # Initialize the connection with a default client.
  client = Kube::Client.autoconfig
  client.apis(prefetch_resources: true)

  # Create a channel to watch all pods within the `default` namespace.
  #
  # In fact, the channel will be used as loop to fetch information directly from
  # the Kubernetes API. It's lighweight as possible for the API Server.
  channel = client.api("cache.example.com/v1alpha1").resource("memcacheds").watch(namespace: "default")

  # While the channel is not closed (potentially if the API server restarts ?),
  # loop and iterate on every new entry.
  until channel.closed?
    event = channel.receive

    # If the event is not a pod, we don't want to parse it.
    unless event.is_a? K8S::Kubernetes::WatchEvent(K8S::Kubernetes::Resource) && event.object.is_a? K8S::Com::Example::Cache::V1alpha1::Memcached
      # If there's an API error, just log it and continue the loop.
      if event.is_a? Kube::Error::API || event.is_a? Kube::Error::UndefinedResource
        Log.for("reconcile").error &.emit(event.to_s)
      end

      next
    end

    # `kube-client` doesn't handle properly CRD, so we have to cast the object
    # into the desired one.
    resource = event.object.as(K8S::Com::Example::Cache::V1alpha1::Memcached)

    # Simply log the CRD spec and status.
    Log.for("reconcile").info { "status=#{event.type} namespace=#{resource.metadata!.namespace} name=#{resource.metadata!.name} size=#{resource.spec.try &.size}" }
  end
end

But, there's something wrong in the log output:

2022-05-09T08:39:31.307902Z   INFO - kube.transport: GET /apis/cache.example.com/v1alpha1 => HTTP OK: K8S::Apimachinery::Apis::Meta::V1::APIResourceList(@__object__=#<K8S::Internals::GenericObject:0x7fd249996120 @__hash__={"kind" => "APIResourceList", "apiVersion" => "v1", "groupVersion" => "cache.example.com/v1alpha1", "resources" => [{"name" => "memcacheds", "singularName" => "memcached", "namespaced" => true, "kind" => "Memcached", "verbs" => ["delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"], "storageVersionHash" => "345Up0t4GjI="}, {"name" => "memcacheds/status", "singularName" => "", "namespaced" => true, "kind" => "Memcached", "verbs" => ["get", "patch", "update"]}]}>) in 00:00:00.000570465s
2022-05-09T08:39:31.307938Z   WARN - Unknown api resource: "/v1/Memcached" -- group: "", version: "v1", kind: "Memcached"
2022-05-09T08:39:31.307987Z   WARN - Kube::ResourceClient(K8S::Kubernetes::Resource: Watching {"watch" => "true"}
2022-05-09T08:39:31.308908Z   INFO - reconcile: status=ADDED namespace=default name=memcached-sample size=2

The unknown API resource should not appear, or maybe a request is not correctly done.

Hey! thanks for the update. I have started focusing more on CRDs last week and there is a generator which will create the Crystal classes for you. Its in the k8s.cr shard, but you should be able to call it from your root dir.

# First dump the CRDs to a file
kubectl get crds -o yaml > crds.yaml

# Then generate the resources
./lib/k8s/bin/gen_crd.cr -- ./crds.yaml <output directory>

I've also started working on a kube-sdk which will eventually implement base operators and CRD generation.

ok, so further review, it looks like you did use the generator. Nice!

So let me give this a try myself and see what i come up with.

I do get the following error from your CRD on k8s versions 1.21-1.23 :

The CustomResourceDefinition "memcacheds.cache.example.com" is invalid: spec.preserveUnknownFields: Invalid value: true: cannot set to true, set x-preserve-unknown-fields to true in spec.versions[*].schema instead

Just an FYI. shouldn't stop me from testing

Pretty nice you work on kube-sdk for operators, I'm really interested about it!

There are 3 things that didn't work as expected :

  • there's a Unknown api resource: "/v1/Memcached" -- group: "", version: "v1", kind: "Memcached" warn log when the API version is not v1
  • when loading multiple CRDs instances (from APIResourceList), we can't check from event.is_a? K8S::Kubernetes::WatchEvent(My::CRD), but instead event.is_a? K8S::Kubernetes::WatchEvent(K8S::Kubernetes::Resource) && event.object.is_a? My::CRD
  • the generated CRD from ./lib/k8s/bin/gen_crd.cr add a require "../../../../apimachinery/apis/meta/v1/object_meta" which is not found in the current workdir

Anyway, there's trick to bypass those things, but it's interesting to note them in the backlog. By the way, FIY, I'm testing with K8S API 1.21. I don't have your error from my command line... 😅

Yes, ive narrowed this down to a bug in the api client when it makes a new resource client. Somewhere the api client is for v1 and not cache.example.com/v1alpha1 it cannot find the Memcached in the v1 resource list (as to be expected). Its still somehow able to work, but heres an easier way to show the issue:

client = Kube::Client.autoconfig
client.apis(prefetch_resources: true)
client.client_for_resource(K8S::Com::Example::Cache::V1alpha1::Memcached)

This fetches the client for the specific resource you want. It soon explodes because the default client is v1. This led to me to the issue where you are getting K8S::Kubernetes::WatchEvent(K8S::Kubernetes::Resource) instead of the resource you want.

The last one with the CRD paths is due to the generator code. i should be able to exclude it for CRD generation.

I agree with all of these being issues i want to resolve. I want the types return to match the expectations and to limit the amount of work arounds needed. Majority of these are type issues. As i did source most of this from the ruby client, which is typeless. Thanks for finding them!

@cmizzi i also "fixed" the import issue with CRD gen in the underlying lib.

Nice, thank you. Next step is to build an operator now 😎

@cmizzi feel free to contribute to the https://github.com/spoved/kube-sdk.cr if you want to abstract some stuff.

Closing this as its now in the v0.4.3 release.

Also fixed the docs page: https://spoved.github.io/kube-client.cr/

I will. I'm just starting with Crystal with a real project, I don't understand everything yet. But be sure you'll see me again.

Thank you 😊