cluster
Cluster provides various methods to interact with a cluster. Cluster is initialized and stored in Manager with cluster.New.
Most of the fields in a cluster (scheme, cache, client, apiReader, recorderProvider, etc.) are used to injected to related components (Controller, EventHandlers, Sources, Predicates)
Types
1. Cluster interface
type Cluster interface {
SetFields(interface{}) error
GetConfig() *rest.Config
GetScheme() *runtime.Scheme
GetClient() client.Client
GetFieldIndexer() client.FieldIndexer
GetCache() cache.Cache
GetEventRecorderFor(name string) record.EventRecorder
GetRESTMapper() meta.RESTMapper
GetAPIReader() client.Reader
Start(ctx context.Context) error
}
2. cluster struct
type cluster struct {
config *rest.Config
scheme *runtime.Scheme // scheme is injected into Controllers, EventHandlers, Sources and Predicates.
cache cache.Cache // injected is injected into Sources
client client.Client // client is injected into Controllers (and EventHandlers, Sources and Predicates).
apiReader client.Reader // apiReader is the reader that will make requests to the api server and not the cache.
fieldIndexes client.FieldIndexer
recorderProvider *intrec.Provider // recorderProvider is used to generate event recorders that will be injected into Controllers (and EventHandlers, Sources and Predicates).
mapper meta.RESTMapper // mapper is used to map resources to kind, and map kind and version.
logger logr.Logger
}
New
-
SetOptionDefaults For more details, check below
-
Create a
mapper
mapper, err := options.MapperProvider(config)
-
Create a
cache
withNewCache
(cache.New)cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
For more details, read cache
-
Create
apiReader
apiReader, err := client.New(config, clientOptions)
-
Create a
writeObj
withNewClient
(DefaultNewClient -> NewDelegatingClient)writeObj, err := options.NewClient(cache, config, clientOptions, options.ClientDisableCacheFor...)
if options.NewClient == nil { options.NewClient = DefaultNewClient }
// DefaultNewClient creates the default caching client. func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) { c, err := client.New(config, options) if err != nil { return nil, err } return client.NewDelegatingClient(client.NewDelegatingClientInput{ CacheReader: cache, Client: c, UncachedObjects: uncachedObjects, }) }
&delegatingClient{ scheme: in.Client.Scheme(), mapper: in.Client.RESTMapper(), Reader: &delegatingReader{ CacheReader: in.CacheReader, ClientReader: in.Client, scheme: in.Client.Scheme(), uncachedGVKs: uncachedGVKs, cacheUnstructured: in.CacheUnstructured, }, Writer: in.Client, StatusClient: in.Client, }
-
Create a
recorderProvider
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
- Create cluster
&cluster{ config: config, scheme: options.Scheme, cache: cache, fieldIndexes: cache, client: writeObj, apiReader: apiReader, recorderProvider: recorderProvider, mapper: mapper, logger: options.Logger, }
SetOptionDefaults
name | value | where to use |
---|---|---|
Scheme | scheme.Scheme | |
MapperProvider | func(c *rest.Config) (meta.RESTMapper, error) {return apiutil.NewDynamicRESTMapper(c, nil)} |
|
NewClient | DefaultNewClient | |
NewCache | cache.New | |
newRecorderProvider | intrec.NewProvider | |
makeBroadcaster | func() (record.EventBroadcaster, bool) {return record.NewBroadcaster(), true} |
|
Logger | logf.RuntimeLog.WithName("cluster") |
options.Scheme = scheme.Scheme
(Use the Kubernetes client-go scheme if none is specified)- MapperProvider
options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { return apiutil.NewDynamicRESTMapper(c, nil) }
-
options.NewClient = DefaultNewClient
func DefaultNewClient(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) { c, err := client.New(config, options) if err != nil { return nil, err } return client.NewDelegatingClient(client.NewDelegatingClientInput{ CacheReader: cache, Client: c, UncachedObjects: uncachedObjects, }) }
GetClient()
returnscluster.client
, a delegatingClient by default. For more details aboutdelegatingClient
you can check client -
options.NewCache = cache.New
options.newRecorderProvider = intrec.NewProvider
record.NewBroadcaster()
options.Logger = logf.RuntimeLog.WithName("cluster")
SetFields
func (c *cluster) SetFields(i interface{}) error {
if _, err := inject.ConfigInto(c.config, i); err != nil {
return err
}
if _, err := inject.ClientInto(c.client, i); err != nil {
return err
}
if _, err := inject.APIReaderInto(c.apiReader, i); err != nil {
return err
}
if _, err := inject.SchemeInto(c.scheme, i); err != nil {
return err
}
if _, err := inject.CacheInto(c.cache, i); err != nil {
return err
}
if _, err := inject.MapperInto(c.mapper, i); err != nil {
return err
}
return nil
}
cluster.SetFields
is called in manager.SetFieldscluster.SetFields
injectsConfig
,Client
,APIReader
,Scheme
,Cache
andMapper
into the specifiedi
.-
manager.SetFields's usage:
-
used for reconciler passed via builder in controller
// Inject dependencies into Reconciler if err := mgr.SetFields(options.Reconciler); err != nil { return nil, err }
-
used for runnables added to the Manager with add function
// Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { cm.Lock() defer cm.Unlock() return cm.add(r) } func (cm *controllerManager) add(r Runnable) error { // Set dependencies on the object if err := cm.SetFields(r); err != nil { return err } return cm.runnables.Add(r) }
- Controller is passed in controller.New
-