diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index b9e9d266..41970984 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -46,13 +46,15 @@ type DynamicGroupManager struct { membershipIndexToLoad *uint } -func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error { +func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered, latestProcessBlock uint64) error { gm.lastBlockProcessedMutex.Lock() defer gm.lastBlockProcessedMutex.Unlock() toRemoveTable := om.New() toInsertTable := om.New() - + if gm.lastBlockProcessed == 0 { + gm.lastBlockProcessed = latestProcessBlock + } lastBlockProcessed := gm.lastBlockProcessed for _, event := range events { if event.Raw.Removed { diff --git a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go index 8474c25d..7a63cfbb 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go @@ -57,7 +57,7 @@ func TestHandler(t *testing.T) { events := []*contracts.RLNMemberRegistered{eventBuilder(1, false, 0xaaaa, 1)} - err = gm.handler(events) + err = gm.handler(events, 1) require.NoError(t, err) roots = gm.rootTracker.Roots() @@ -73,7 +73,7 @@ func TestHandler(t *testing.T) { eventBuilder(4, false, 0xeeee, 5), } - err = gm.handler(events) + err = gm.handler(events, 4) require.NoError(t, err) // Root[1] should become [0] @@ -97,7 +97,7 @@ func TestHandler(t *testing.T) { eventBuilder(3, false, 0xeeee, 5), } - err = gm.handler(events) + err = gm.handler(events, 3) require.NoError(t, err) roots = gm.rootTracker.Roots() @@ -111,7 +111,7 @@ func TestHandler(t *testing.T) { // Adding multiple events for same block events = []*contracts.RLNMemberRegistered{} - err = gm.handler(events) + err = gm.handler(events, 0) require.NoError(t, err) roots = gm.rootTracker.Roots() diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go index 3b99850b..42c3b5f2 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go @@ -20,7 +20,7 @@ import ( ) // RegistrationEventHandler represents the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error +type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered, uint64) error // MembershipFetcher is used for getting membershipRegsitered Events from the eth rpc type MembershipFetcher struct { @@ -94,7 +94,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo t1Since := time.Since(t1) t2 := time.Now() - if err := handler(events); err != nil { + if err := handler(events, fromBlock+maxBatchSize); err != nil { return err } @@ -110,7 +110,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo // process all the fetched events t2 := time.Now() - err = handler(events) + err = handler(events, toBlock) if err != nil { return err } @@ -156,7 +156,7 @@ func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint6 fromBlock = toBlock + 1 } - err = handler(events) + err = handler(events, toBlock) if err != nil { mf.log.Error("processing rln log", zap.Error(err)) } diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go index 4c70fba0..12bcb4f0 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go @@ -38,7 +38,7 @@ func TestFetchingLogic(t *testing.T) { } counts := []int{} - mockFn := func(events []*contracts.RLNMemberRegistered) error { + mockFn := func(events []*contracts.RLNMemberRegistered, latestProcessedBlock uint64) error { counts = append(counts, len(events)) return nil }