DeltaFIFO
Overview
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.
- The actual data is stored in
itemsin the for ofmap[string]Deltas. - The order is stored in
queueas[]string.
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
items map[string]Deltas
queue []string
populated bool
initialPopulationCount int
keyFunc KeyFunc
knownObjects KeyListerGetter
closed bool
emitDeltaTypeReplaced bool
}
type Deltas []Delta
type DeltaType string
type Delta struct {
Type DeltaType
Object interface{}
}
-
Difference between DeltaFIFO and FIFO (Need to summarize the long explanation later)
-
One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.
-
The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.
- DeltaFIFO solves this use case
- You want to process every object change (delta) at most once.
- When you process an object, you want to see everything that's happened to it since you last processed it.
- You want to process the deletion of some of the objects.
- You might want to periodically reprocess objects.
-
Usage: how DeltaFIFO is used in informer
- Create Indexer
- Create DeltaFIFO
- Call
fifo.Add(xx)orfifo.Update(xx)orfifo.Delete(xx) - Call
fifo.Pop(process)withprocessfunctiontype PopProcessFunc func(interface{}) error, which converts the object into Deltas and process deltas withprocessDeltas.func process(obj interface{}) error { // type PopProcessFunc func(interface{}) error if deltas, ok := obj.(cache.Deltas); ok { return processDeltas(deltas) } return errors.New("object given as Process argument is not Deltas") } processDeltasupdates/add/delete indexer.
For more details, you can check informer