mirror of https://github.com/status-im/go-waku.git
fix: presist lastProcessedBlock correctly when no rln events are processed (#981)
This commit is contained in:
parent
ec468e0a26
commit
6141f94b40
|
@ -46,13 +46,15 @@ type DynamicGroupManager struct {
|
|||
membershipIndexToLoad *uint
|
||||
}
|
||||
|
||||
func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error {
|
||||
func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered, latestProcessBlock uint64) error {
|
||||
gm.lastBlockProcessedMutex.Lock()
|
||||
defer gm.lastBlockProcessedMutex.Unlock()
|
||||
|
||||
toRemoveTable := om.New()
|
||||
toInsertTable := om.New()
|
||||
|
||||
if gm.lastBlockProcessed == 0 {
|
||||
gm.lastBlockProcessed = latestProcessBlock
|
||||
}
|
||||
lastBlockProcessed := gm.lastBlockProcessed
|
||||
for _, event := range events {
|
||||
if event.Raw.Removed {
|
||||
|
|
|
@ -57,7 +57,7 @@ func TestHandler(t *testing.T) {
|
|||
|
||||
events := []*contracts.RLNMemberRegistered{eventBuilder(1, false, 0xaaaa, 1)}
|
||||
|
||||
err = gm.handler(events)
|
||||
err = gm.handler(events, 1)
|
||||
require.NoError(t, err)
|
||||
|
||||
roots = gm.rootTracker.Roots()
|
||||
|
@ -73,7 +73,7 @@ func TestHandler(t *testing.T) {
|
|||
eventBuilder(4, false, 0xeeee, 5),
|
||||
}
|
||||
|
||||
err = gm.handler(events)
|
||||
err = gm.handler(events, 4)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Root[1] should become [0]
|
||||
|
@ -97,7 +97,7 @@ func TestHandler(t *testing.T) {
|
|||
eventBuilder(3, false, 0xeeee, 5),
|
||||
}
|
||||
|
||||
err = gm.handler(events)
|
||||
err = gm.handler(events, 3)
|
||||
require.NoError(t, err)
|
||||
|
||||
roots = gm.rootTracker.Roots()
|
||||
|
@ -111,7 +111,7 @@ func TestHandler(t *testing.T) {
|
|||
// Adding multiple events for same block
|
||||
events = []*contracts.RLNMemberRegistered{}
|
||||
|
||||
err = gm.handler(events)
|
||||
err = gm.handler(events, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
roots = gm.rootTracker.Roots()
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
)
|
||||
|
||||
// RegistrationEventHandler represents the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
|
||||
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error
|
||||
type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered, uint64) error
|
||||
|
||||
// MembershipFetcher is used for getting membershipRegsitered Events from the eth rpc
|
||||
type MembershipFetcher struct {
|
||||
|
@ -94,7 +94,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo
|
|||
t1Since := time.Since(t1)
|
||||
|
||||
t2 := time.Now()
|
||||
if err := handler(events); err != nil {
|
||||
if err := handler(events, fromBlock+maxBatchSize); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -110,7 +110,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo
|
|||
|
||||
// process all the fetched events
|
||||
t2 := time.Now()
|
||||
err = handler(events)
|
||||
err = handler(events, toBlock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint6
|
|||
fromBlock = toBlock + 1
|
||||
}
|
||||
|
||||
err = handler(events)
|
||||
err = handler(events, toBlock)
|
||||
if err != nil {
|
||||
mf.log.Error("processing rln log", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ func TestFetchingLogic(t *testing.T) {
|
|||
}
|
||||
|
||||
counts := []int{}
|
||||
mockFn := func(events []*contracts.RLNMemberRegistered) error {
|
||||
mockFn := func(events []*contracts.RLNMemberRegistered, latestProcessedBlock uint64) error {
|
||||
counts = append(counts, len(events))
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue