DeltaFIFO
Overview
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.
- The actual data is stored in
items
in the for ofmap[string]Deltas
. - The order is stored in
queue
as[]string
.
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
}
type Deltas []Delta
type DeltaType string
type Delta struct {
Type DeltaType
Object interface{}
}
-
Difference between DeltaFIFO and FIFO (Need to summarize the long explanation later)
-
One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.
-
The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.
- DeltaFIFO solves this use case
- You want to process every object change (delta) at most once.
- When you process an object, you want to see everything that's happened to it since you last processed it.
- You want to process the deletion of some of the objects.
- You might want to periodically reprocess objects.
-
Usage: how DeltaFIFO is used in informer
- Create Indexer
- Create DeltaFIFO
- Call
fifo.Add(xx)
orfifo.Update(xx)
orfifo.Delete(xx)
- Call
fifo.Pop(process)
withprocess
functiontype PopProcessFunc func(interface{}) error
, which converts the object into Deltas and process deltas withprocessDeltas
.func process(obj interface{}) error { // type PopProcessFunc func(interface{}) error if deltas, ok := obj.(cache.Deltas); ok { return processDeltas(deltas) } return errors.New("object given as Process argument is not Deltas") }
processDeltas
updates/add/delete indexer.
For more details, you can check informer