ListerWatcher
type
Interface
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	Lister
	Watcher
}

// Lister is any object that knows how to perform an initial list.
type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}
ListWatch struct
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}
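The way a struct of function fields can satisfy an interface may be easier to see in isolation. The following is a minimal sketch, not client-go's code: `ListOptions` and `Object` are simplified stand-ins for `metav1.ListOptions` and `runtime.Object`, only the list half is shown, and returning an error for a nil `ListFunc` is a choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// ListOptions and Object are simplified stand-ins for metav1.ListOptions and
// runtime.Object (assumptions that keep this sketch stdlib-only).
type ListOptions struct {
	ResourceVersion string
}

type Object interface{}

// Lister has the same shape as the Lister interface above.
type Lister interface {
	List(options ListOptions) (Object, error)
}

// ListFunc is a function type with the same signature as Lister.List.
type ListFunc func(options ListOptions) (Object, error)

// ListWatch stores the function in a field and exposes it as a method,
// so a plain closure can be used wherever a Lister is expected.
type ListWatch struct {
	ListFunc ListFunc
}

// List delegates to the stored function. The nil check is specific to this sketch.
func (lw *ListWatch) List(options ListOptions) (Object, error) {
	if lw.ListFunc == nil {
		return nil, errors.New("ListFunc must not be nil")
	}
	return lw.ListFunc(options)
}

func main() {
	// The closure below plays the role of a real API call.
	var l Lister = &ListWatch{
		ListFunc: func(options ListOptions) (Object, error) {
			return []string{"pod-a", "pod-b"}, nil
		},
	}
	obj, err := l.List(ListOptions{})
	fmt.Println(obj, err)
}
```

Keeping the behavior in function fields means tests can swap in closures without defining a new type for each case.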
// Interface (watch.Interface, from k8s.io/apimachinery/pkg/watch) can be implemented
// by anything that knows how to watch and report changes.
type Interface interface {
	// Stop stops watching. Will close the channel returned by ResultChan(). Releases
	// any resources used by the watch.
	Stop()
	// ResultChan returns a chan which will receive all the events. If an error occurs
	// or Stop() is called, the implementation will close this channel and
	// release any resources used by the watch.
	ResultChan() <-chan Event
}
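The contract in the comments above (Stop closes the channel returned by ResultChan) can be illustrated with a small in-memory watcher. This is a sketch under assumptions: `Event` is a simplified stand-in for `watch.Event`, and `memWatcher` and `newMemWatcher` are hypothetical names that do not exist in apimachinery.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a simplified stand-in for watch.Event (an assumption of this sketch).
type Event struct {
	Type   string
	Object string
}

// memWatcher is a hypothetical in-memory watcher exposing the same two
// methods as the Interface above.
type memWatcher struct {
	result chan Event
	done   chan struct{}
	once   sync.Once
}

// newMemWatcher starts a goroutine that delivers the given events and then
// waits for Stop before closing the result channel.
func newMemWatcher(events []Event) *memWatcher {
	w := &memWatcher{result: make(chan Event), done: make(chan struct{})}
	go func() {
		// The sending goroutine owns the result channel, so it closes it.
		defer close(w.result)
		for _, e := range events {
			select {
			case w.result <- e:
			case <-w.done:
				return
			}
		}
		<-w.done
	}()
	return w
}

// Stop signals the sender to finish; sync.Once makes repeated calls safe.
func (w *memWatcher) Stop() {
	w.once.Do(func() { close(w.done) })
}

// ResultChan returns the receive-only event channel.
func (w *memWatcher) ResultChan() <-chan Event {
	return w.result
}

func main() {
	w := newMemWatcher([]Event{
		{Type: "ADDED", Object: "nginx"},
		{Type: "MODIFIED", Object: "nginx"},
	})
	for i := 0; i < 2; i++ {
		e := <-w.ResultChan()
		fmt.Println(e.Type, e.Object)
	}
	w.Stop()
	_, ok := <-w.ResultChan()
	fmt.Println("channel open:", ok)
}
```

Consumers only need to watch for the channel closing; they do not need a separate error path to learn that the watch has ended.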
How ListWatch is used
- Created with NewFilteredListWatchFromClient:

    func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
    		optionsModifier(&options)
    		return c.Get().
    			Namespace(namespace).
    			Resource(resource).
    			VersionedParams(&options, metav1.ParameterCodec).
    			Do(context.TODO()).
    			Get()
    	}
    	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
    		options.Watch = true
    		optionsModifier(&options)
    		return c.Get().
    			Namespace(namespace).
    			Resource(resource).
    			VersionedParams(&options, metav1.ParameterCodec).
    			Watch(context.TODO())
    	}
    	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
    }
- Getter can be obtained from a clientset: clientset.CoreV1().RESTClient()

    podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

  NewListWatchFromClient turns the field selector into an optionsModifier and calls NewFilteredListWatchFromClient.
    1. listFunc and watchFunc are set in NewFilteredListWatchFromClient:
        1. c.Get() calls NewRequest().Verb("GET")
        2. rest.NewRequest creates and returns a request.
        3. Request.Watch
            1. get retry func
            2. in a for loop, run retry.Before, client.Do(req), and retry.After
- Then, the ListWatch is passed to an informer:

    indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{...

  The ListerWatcher is used to initialize the store (DeltaFIFO) with List and to keep the store up-to-date with Watch. For more details, you can check informer.
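The list-then-watch pattern described above can be sketched without client-go. Everything in this example is a simplified stand-in chosen for illustration: `listResult`, `Event`, `listerWatcher`, `fakeLW`, and `syncStore` are hypothetical names, and a plain map takes the place of the real store.

```go
package main

import "fmt"

// Event is a simplified stand-in for a watch event.
type Event struct {
	Type            string
	Name            string
	ResourceVersion string
}

// listResult stands in for a list object: items plus the list's resourceVersion.
type listResult struct {
	ResourceVersion string
	Items           []string
}

// listerWatcher is a hypothetical, reduced version of the ListerWatcher idea.
type listerWatcher interface {
	List() listResult
	Watch(resourceVersion string) <-chan Event
}

// syncStore seeds the store from List, then applies watch events starting at
// the list's resourceVersion until the channel closes. It returns the last
// resourceVersion seen, which is where a new watch would resume.
func syncStore(lw listerWatcher, store map[string]string) string {
	list := lw.List()
	for _, name := range list.Items {
		store[name] = list.ResourceVersion
	}
	rv := list.ResourceVersion
	for ev := range lw.Watch(rv) {
		switch ev.Type {
		case "ADDED", "MODIFIED":
			store[ev.Name] = ev.ResourceVersion
		case "DELETED":
			delete(store, ev.Name)
		}
		rv = ev.ResourceVersion
	}
	return rv
}

// fakeLW replays a fixed list and a fixed sequence of events.
type fakeLW struct {
	list   listResult
	events []Event
}

func (f fakeLW) List() listResult { return f.list }

func (f fakeLW) Watch(string) <-chan Event {
	ch := make(chan Event, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	close(ch)
	return ch
}

func main() {
	store := map[string]string{}
	lw := fakeLW{
		list: listResult{ResourceVersion: "100", Items: []string{"a"}},
		events: []Event{
			{Type: "ADDED", Name: "b", ResourceVersion: "101"},
			{Type: "MODIFIED", Name: "a", ResourceVersion: "102"},
			{Type: "DELETED", Name: "b", ResourceVersion: "103"},
		},
	}
	rv := syncStore(lw, store)
	fmt.Println(store, rv)
}
```

Starting the watch from the resourceVersion returned by List is what avoids missing changes that happen between the two calls.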
Example
Code: You can get the events through w.ResultChan(). The example is a simplified version with minimal error handling.
w, err := podListWatcher.Watch(metav1.ListOptions{}) // returns watch.Interface
if err != nil {
	klog.Fatal(err)
}
loop:
for {
	event, ok := <-w.ResultChan()
	if !ok {
		// The channel is closed when the watch ends or Stop() is called.
		break loop
	}
	// Named accessor (not meta) to avoid shadowing the meta package.
	accessor, err := meta.Accessor(event.Object)
	if err != nil {
		continue
	}
	resourceVersion := accessor.GetResourceVersion()
	klog.Infof("event: %s, resourceVersion: %s", event.Type, resourceVersion)
}
Run:
- Start the ListerWatcher for Pods.

    go run main.go
    I0913 07:54:29.394053 92277 main.go:57] resourceVersion: 2728

- Create a Pod

    kubectl run nginx --image=nginx

  You'll see the event logs:

    I0913 07:54:29.394239 92277 main.go:64] items: 1
    I0913 07:54:29.401483 92277 main.go:84] event: ADDED, resourceVersion: 503
    I0913 07:55:20.475959 92277 main.go:84] event: MODIFIED, resourceVersion: 2789
    I0913 07:55:38.769688 92277 main.go:84] event: MODIFIED, resourceVersion: 2812

- Patch the Pod

    kubectl patch pod nginx -p '{"metadata":{"annotations": {"key": "val"}}}' --type=merge

  You'll see MODIFIED in the logs.
- Delete the Pod

    kubectl delete pod nginx

    I0913 08:02:23.486861 92277 main.go:84] event: MODIFIED, resourceVersion: 3294
    I0913 08:02:23.929399 92277 main.go:84] event: MODIFIED, resourceVersion: 3298
    I0913 08:02:24.239499 92277 main.go:84] event: MODIFIED, resourceVersion: 3300
    I0913 08:02:24.244502 92277 main.go:84] event: DELETED, resourceVersion: 3301

Reference: example