Dirty Streams
FluxaORM allows you to define special Redis Streams that hold information about entities that are added, edited or deleted. All you need to do is add a dirty tag to your entity with a stream name and optional attributes.
Defining dirty streams
See the example below:
type UserEntity struct {
    ID    uint32 `orm:"dirty=AllChanges,NewUsers:add,DeletedUsers:delete,EmailAddedOrChanged:add"`
    Name  string
    Email string `orm:"dirty=EmailAddedOrChanged"`
}
type CategoryEntity struct {
    ID uint16 `orm:"dirty=AllChanges,CategoryAddedDeleted:add|delete"`
}
The example above will create five Redis Streams in the entity's Redis cache pool (or in fluxabee.DefaultPoolCode if you don't specify a pool code):
dirty_AllChanges - triggered when UserEntity or CategoryEntity is added, updated or deleted
dirty_NewUsers - triggered when UserEntity is added
dirty_DeletedUsers - triggered when UserEntity is deleted
dirty_EmailAddedOrChanged - triggered when UserEntity is added or its email is changed
dirty_CategoryAddedDeleted - triggered when CategoryEntity is added or deleted
Consuming dirty stream events
To consume events from dirty streams, create a consumer using the fluxaorm.NewDirtyStreamConsumerSingle() or fluxaorm.NewDirtyStreamConsumerMany() function and call its Consume method.
consumer := fluxaorm.NewDirtyStreamConsumerSingle(orm, "AllChanges", func(events []*fluxaorm.DirtyStreamEvent) {
    for _, event := range events {
        event.EntityName // for example "mypkg.UserEntity"
        event.ID         // ID of the added, edited or deleted entity
        event.Operation  // fluxaorm.Insert, fluxaorm.Update or fluxaorm.Delete
        event.Bind       // map of attributes that were changed
    }
})
defer consumer.Cleanup()
consumer.Consume(100, time.Second*5) // wait max 5 seconds for new events and consume max 100 events as consumer "consumer_single"
event.Bind is a map of the attributes that changed. When an entity is added, Bind holds all attributes of the entity. When an entity is deleted, Bind likewise holds all fields of the deleted entity. When an entity is edited, Bind holds only the changed fields.
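To make those three rules concrete, here is a small, self-contained sketch of how a Bind map could be derived for each operation. It is illustrative only; FluxaORM computes Bind internally and the bindFor helper is a hypothetical name:

```go
package main

import "fmt"

// bindFor sketches the Bind rules: all attributes on Insert, all previous
// attributes on Delete, and only the differing attributes on Update.
func bindFor(op string, before, after map[string]any) map[string]any {
	switch op {
	case "Insert":
		return after // new entity: every attribute
	case "Delete":
		return before // deleted entity: every attribute it had
	case "Update":
		changed := map[string]any{}
		for k, v := range after {
			if before[k] != v {
				changed[k] = v // only fields whose value differs
			}
		}
		return changed
	}
	return nil
}

func main() {
	before := map[string]any{"Name": "John", "Email": "old@example.com"}
	after := map[string]any{"Name": "John", "Email": "new@example.com"}
	fmt.Println(bindFor("Update", before, after)) // map[Email:new@example.com]
}
```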
Tips
If your entity uses the Fake Delete feature, then when you mark an entity as fake deleted, event.Operation equals fluxaorm.Delete and event.Bind holds only the changed field, e.g. {"FakeDelete": 13243}.
If one consumer is not enough to keep up with all events, you can create more consumers:
consumer1 := fluxaorm.NewDirtyStreamConsumerMany(orm, "AllChanges", func(events []*fluxaorm.DirtyStreamEvent) {
    // ....
})
defer consumer1.Cleanup()
consumer2 := fluxaorm.NewDirtyStreamConsumerMany(orm, "AllChanges", func(events []*fluxaorm.DirtyStreamEvent) {
    // ....
})
defer consumer2.Cleanup()
go func() {
    consumer1.Consume(100, time.Second*5)
}()
go func() {
    consumer2.Consume(100, time.Second*5)
}()
By default, all events passed to the function above are automatically acknowledged when the function finishes. If you want to acknowledge an event sooner, call event.Ack() in your code:
consumer := fluxaorm.NewDirtyStreamConsumerSingle(orm, "AllChanges", func(events []*fluxaorm.DirtyStreamEvent) {
    for _, event := range events {
        event.Ack() // acknowledge event in the stream immediately
    }
})
Dirty stream statistics
To get statistics for a dirty stream, use the GetStreamStatistics() method of fluxaorm.EventBroker. You must add the dirty_ prefix to the stream name.
stats := ctx.GetEventBroker().GetStreamStatistics("dirty_AllChanges")
stats.Len                // number of events in the stream
stats.OldestEventSeconds // time in seconds since the oldest event
...
Auto-claim old events
When a consumer stops or crashes, events it read but never acknowledged stay pending in the stream. Another consumer can take over such events with AutoClaim:
consumer := fluxaorm.NewDirtyStreamConsumerMany(orm, "AllChanges", func(events []*fluxaorm.DirtyStreamEvent) {
    // ....
})
consumer.AutoClaim(1000, time.Second*5) // auto-claim max 1000 pending events older than 5 seconds