k3s-io/kine

Any plan to support `ProgressNotify` feature?

qwtsc opened this issue · 11 comments

qwtsc commented

Hi! I was wondering if there are any plans to implement the ProgressNotify feature, mirroring the functionality provided by a real etcd v3 server. Without it, it appears we cannot enable the corresponding Kubernetes watch-list feature gate: we have observed that Custom Resource controllers with the watch-list feature enabled encounter issues during the initialization phase, and the lack of ProgressNotify support could be a contributing factor. Thx.

makes watch server send periodic progress updates every 10 minutes when there is no incoming events.
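
For context, this is the client-side option in question. Below is a minimal sketch of a watcher that asks the server for progress notifications; the endpoint and key prefix are placeholders for this example, not anything kine-specific.

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"}, // placeholder; point at wherever kine/etcd listens
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Ask the server to send periodic progress notifications on an otherwise idle watch.
	wch := cli.Watch(context.Background(), "/registry/", clientv3.WithPrefix(), clientv3.WithProgressNotify())
	for resp := range wch {
		if resp.IsProgressNotify() {
			// No events, just a header carrying the server's current revision.
			fmt.Println("progress notify at revision", resp.Header.Revision)
		}
	}
}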

I hadn't really been tracking that, but it seems easy enough. Is a 10 minute idle keepalive really the only thing that's required to support this?

controllers with the watch-list feature enabled encounter issues during the initialization phase

That doesn't seem like it would be related, as progress notify would only make a difference when there haven't been any events for a period of time - which would not apply to the initialization phase. What specific "issues" are you seeing? Can you provide steps to reproduce?

I will say that the upstream documentation for this client option doesn't seem to be exactly accurate - it says

WithProgressNotify makes watch server send periodic progress updates every 10 minutes when there is no incoming events.

What actually happens is that there is a configurable ticker interval on the etcd side, which defaults to 10 minutes. If a watcher has not received any events since the last time the ticker ticked, a WatchResponse is sent with an empty event list. So it's not exactly 10 minutes per client, it is more like "at least every 10 minutes", as the progress-needed flag is set on new watches when they are first created.

I don't think Kubernetes should care, but it is worth noting as a potential difference in implementation, if we don't do it exactly the same.
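
If we do add it, I'd expect something shaped roughly like the sketch below. This is a hypothetical illustration of the behavior described above, not kine's actual watch code; the watcher type and its fields are made up for the example.

package progress

import (
	"time"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
)

// watcher is a stand-in for one server-side watch stream; the type and its
// fields are hypothetical and do not correspond to kine's internals.
type watcher struct {
	watchID    int64
	idle       bool // true if no events have been sent since the last tick; set when the watch is created
	currentRev func() int64
	send       func(*etcdserverpb.WatchResponse) error
}

// runProgressTicker mimics the etcd behavior described above: on a fixed
// interval (10 minutes by default in etcd), any watcher that has been idle
// since the previous tick receives a WatchResponse with an empty event list,
// carrying only the current revision in its header.
func runProgressTicker(interval time.Duration, watchers func() []*watcher) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		for _, w := range watchers() {
			if w.idle {
				_ = w.send(&etcdserverpb.WatchResponse{
					Header:  &etcdserverpb.ResponseHeader{Revision: w.currentRev()},
					WatchId: w.watchID,
					// Events deliberately left empty: this is a progress notification.
				})
			}
			// Whatever delivers real events is responsible for clearing idle;
			// reset it here so the next tick re-evaluates the watcher.
			w.idle = true
		}
	}
}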

qwtsc commented

What specific "issues" are you seeing?

When I enabled the watch-list feature, the cache reflector in the operator got stuck at startup. Two minutes after the initial hang, the operator restarted due to a cache-sync timeout. The log excerpts below illustrate the issue.

I1205 14:58:57.254258       1 reflector.go:289] Starting reflector *v1beta1.Workflow (10h13m55.864994589s) from pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229
I1205 14:58:57.254282       1 reflector.go:325] Listing and watching *v1beta1.Workflow from pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229
I1205 14:58:57.254326       1 reflector.go:289] Starting reflector *v1beta1.WorkflowTopology (10h22m7.873616144s) from pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229
I1205 14:58:57.254346       1 reflector.go:325] Listing and watching *v1beta1.WorkflowTopology from pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229
I1205 14:58:57.256015       1 reflector.go:289] Starting reflector *v1beta1.DagRun (9h32m52.718727482s) from pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229
I1205 14:58:57.256033       1 reflector.go:325] Listing and watching *v1beta1.DagRun from pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229
I1205 15:00:57.144333       1 shared_informer.go:337] stop requested
I1205 15:00:57.144370       1 shared_informer.go:337] stop requested
I1205 15:00:57.144484       1 shared_informer.go:337] stop requested
I1205 15:00:57.144587       1 trace.go:236] Trace[1701663128]: "Reflector WatchList" name:pkg/mod/k8s.io/client-go@v0.28.3/tools/cache/reflector.go:229 (05-Dec-2023 14:58:57.254) (total time: 119890ms):
Trace[1701663128]: [1m59.890206192s] [1m59.890206192s] END
2023-12-05T15:00:57+08:00	ERROR	Could not wait for Cache to sync	{"controller": "workflowtopology", "controllerGroup": "demo.com", "controllerKind": "WorkflowTopology", "error": "failed to wait for workflowtopology caches to sync: timed out waiting for cache to be synced for Kind *v1beta1.WorkflowTopology"}
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.1
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/internal/controller/controller.go:203
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
I1205 15:00:57.144609       1 shared_informer.go:337] stop requested
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/internal/controller/controller.go:208
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/internal/controller/controller.go:234
sigs.k8s.io/controller-runtime/pkg/manager.(*runnableGroup).reconcile.func1
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/manager/runnable_group.go:223
2023-12-05T15:00:57+08:00	ERROR	Could not wait for Cache to sync	{"controller": "dagrun", "controllerGroup": "demo.com", "controllerKind": "DagRun", "error": "failed to wait for dagrun caches to sync: timed out waiting for cache to be synced for Kind *v1beta1.DagRun"}
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.1
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/internal/controller/controller.go:203
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/internal/controller/controller.go:208
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/internal/controller/controller.go:234
sigs.k8s.io/controller-runtime/pkg/manager.(*runnableGroup).reconcile.func1
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.16.3/pkg/manager/runnable_group.go:223

Can you provide steps to reproduce?

Use any common CR operator against a kine-backed kube-apiserver, and set the environment variable ENABLE_CLIENT_GO_WATCH_LIST_ALPHA to true in the operator.
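
Roughly, the operator side looks like the sketch below (a controller-runtime based setup; as far as I can tell, client-go v0.28 reads the variable when each reflector is constructed, and exporting it in the Deployment spec works just as well).

package main

import (
	"os"

	ctrl "sigs.k8s.io/controller-runtime"
)

func main() {
	// Opt the operator's informers into the alpha WatchList code path.
	// Set it before the manager starts its caches; setting it in the
	// Deployment spec has the same effect.
	os.Setenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA", "true")

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		panic(err)
	}

	// ... register the CRD controllers with mgr as usual ...

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}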

qwtsc commented

I will say that the upstream documentation for this client option doesn't seem to be exactly accurate - it says

WithProgressNotify makes watch server send periodic progress updates every 10 minutes when there is no incoming events.

What actually happens is that there is a configurable ticker interval on the etcd side, which defaults to 10 minutes. If a watcher has not received any events since the last time the ticker ticked, a WatchResponse is sent with an empty event list. So it's not exactly 10 minutes per client, it is more like "at least every 10 minutes", as the progress-needed flag is set on new watches when they are first created.

I don't think Kubernetes should care, but it is worth noting as a potential difference in implementation, if we don't do it exactly the same.

The cause is still speculative at this point, as I am uncertain about the underlying reason, but once your PR is merged I will run another test and report back with the results.

use a common CR operator

Do you have a specific prebuilt example I can use?

qwtsc commented

use a common CR operator

Do you have a specific prebuilt example I can use?

Sorry, I don't have an open-source demo available at the moment. Let me see how I can reproduce this issue in an easy way on GitHub.

qwtsc commented

Try with https://github.com/k3s-io/kine/releases/tag/v0.11.2

Bad news. The issue persists. I will investigate and debug kube-apiserver further to find the root cause.

qwtsc commented

I believe I may have found the actual reason for the problem. This Kubernetes feature depends on the GetCurrentResourceVersionFromStorage function to retrieve the latest resource version for a given resource prefix. The function issues an empty list request with a limit of 1 and uses the ListAccessor interface to extract the resource version from the returned list metadata, as shown below:

func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
	if storage == nil {
		return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
	}
	if newListFunc == nil {
		return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
	}
	emptyList := newListFunc()
	pred := SelectionPredicate{
		Label: labels.Everything(),
		Field: fields.Everything(),
		Limit: 1, // just in case we actually hit something
	}
	err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
	if err != nil {
		return 0, err
	}
	emptyListAccessor, err := meta.ListAccessor(emptyList)
	if err != nil {
		return 0, err
	}
	if emptyListAccessor == nil {
		return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
	}
	currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
	if err != nil {
		return 0, err
	}
	if currentResourceVersion == 0 {
		return 0, fmt.Errorf("the current resource version must be greater than 0")
	}
	return uint64(currentResourceVersion), nil
}

However, it seems that Kine responds with the resourceVersion corresponding to the maxRowId globally, rather than the one associated with the specified prefix. This behavior is illustrated in the following code snippet:

	// NOTE: rev here is the global maxRowId, not the latest revision within the requested prefix
	rev, kvs, err := l.backend.List(ctx, prefix, start, limit, r.Revision)
	if err != nil {
		return nil, err
	}

	logrus.Tracef("LIST key=%s, end=%s, revision=%d, currentRev=%d count=%d, limit=%d", r.Key, r.RangeEnd, r.Revision, rev, len(kvs), r.Limit)
	resp := &RangeResponse{
		Header: txnHeader(rev),
		Count:  int64(len(kvs)),
		Kvs:    kvs,
	}

As a result, Kubernetes discards every bookmark event whose resource version falls below that global maxRowId, as seen in the nonblockingAdd function of the cacheWatcher:

func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
	// if the bookmarkAfterResourceVersion hasn't been seen
	// we will try to deliver a bookmark event every second.
	// the following check will discard a bookmark event
	// if it is < than the bookmarkAfterResourceVersion
	// so that we don't pollute the input channel
	if event.Type == watch.Bookmark && event.ResourceVersion < c.bookmarkAfterResourceVersion {
		return false
	}
	select {
	case c.input <- event:
		c.markBookmarkAfterRvAsReceived(event)
		return true
	default:
		return false
	}
}

This issue likely leads to the incorrect handling of resource versions and could be the root cause of the observed problem.
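
To make the failure mode concrete, here is a toy illustration with made-up numbers (nothing below is taken from the actual code paths):

package main

import "fmt"

func main() {
	// Hypothetical values, purely to illustrate the mismatch described above.
	bookmarkAfterResourceVersion := uint64(5000) // derived from kine's List response (global maxRowId)
	bookmarkEventRV := uint64(1200)              // latest revision of any key within the watched prefix

	// Same comparison as cacheWatcher.nonblockingAdd: bookmarks below
	// bookmarkAfterResourceVersion are dropped, so the reflector never sees
	// the bookmark it is waiting for and the cache sync eventually times out.
	if bookmarkEventRV < bookmarkAfterResourceVersion {
		fmt.Println("bookmark discarded; WatchList initialization stalls")
	}
}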

Kine responds with the resourceVersion corresponding to the maxRowId globally, rather than the one associated with the specified prefix.

Ah, interesting. I believe that should be fixable.

Just to be clear, this is a List against the current resource type prefix - so the etcd client is expecting to see the latest revision of any key in that prefix, NOT the latest revision of the keystore as a whole.

What is the expected behavior if there are no keys in that prefix? I guess I'll have to test.

Moving to a new issue.