diff --git a/flake.nix b/flake.nix index ee1a56ab..2c14d5a9 100644 --- a/flake.nix +++ b/flake.nix @@ -28,7 +28,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-NtaKb8b3Pg8iSWZp4x27yFyNlrJVqaChWNyJz3yO59g="; + vendorSha256 = "sha256-17E63+sejYDLkYH43J02cE8K4JjTYoi+kpMyEt+diFU="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index 1e2bb8ca..4309f8c0 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/lib/pq v1.10.4 github.com/waku-org/go-noise v0.0.4 github.com/waku-org/go-zerokit-rln v0.1.12 + github.com/wk8/go-ordered-map v1.0.0 ) require ( diff --git a/go.sum b/go.sum index 50b0745a..4cf9a609 100644 --- a/go.sum +++ b/go.sum @@ -1577,6 +1577,8 @@ github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230331181847-cba74520bae9/go. github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11-0.20200630133818-d5bec3311243/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.11/go.mod h1:83CECat5yLh5zVOf4P1ErAgKA5UDvKtgyUABdr3+MjI= +github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8= +github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk= github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index fd4c75a5..214bf838 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -16,7 +16,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" "github.com/waku-org/go-waku/waku/v2/protocol/rln/keystore" "github.com/waku-org/go-zerokit-rln/rln" - r "github.com/waku-org/go-zerokit-rln/rln" + om "github.com/wk8/go-ordered-map" "go.uber.org/zap" ) @@ -49,7 +49,6 @@ type DynamicGroupManager struct { chainId *big.Int rlnContract *contracts.RLN membershipFee *big.Int - lastIndexLoaded int64 saveKeystore bool keystorePath string @@ -58,6 +57,42 @@ type DynamicGroupManager struct { rootTracker *group_manager.MerkleRootTracker } +func handler(gm *DynamicGroupManager, events []*contracts.RLNMemberRegistered) error { + toRemoveTable := om.New() + toInsertTable := om.New() + for _, event := range events { + if event.Raw.Removed { + var indexes []uint64 + i_idx, ok := toRemoveTable.Get(event.Raw.BlockNumber) + if ok { + indexes = i_idx.([]uint64) + } + indexes = append(indexes, event.Index.Uint64()) + toRemoveTable.Set(event.Raw.BlockNumber, indexes) + } else { + var eventsPerBlock []*contracts.RLNMemberRegistered + i_evt, ok := toInsertTable.Get(event.Raw.BlockNumber) + if ok { + eventsPerBlock = i_evt.([]*contracts.RLNMemberRegistered) + } + eventsPerBlock = append(eventsPerBlock, event) + toInsertTable.Set(event.Raw.BlockNumber, eventsPerBlock) + } + } + + err := gm.RemoveMembers(toRemoveTable) + if err != nil { + return err + } + + err = gm.InsertMembers(toInsertTable) + if err != nil { + return err + } + + return nil +} + type RegistrationHandler = func(tx *types.Transaction) func NewDynamicGroupManager( @@ -89,7 +124,6 @@ func NewDynamicGroupManager( ethClientAddress: ethClientAddr, ethAccountPrivateKey: ethAccountPrivateKey, registrationHandler: registrationHandler, - lastIndexLoaded: -1, saveKeystore: saveKeystore, keystorePath: path, keystorePassword: password, @@ -142,11 +176,6 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, return err } - err = rootTracker.Sync() - if err != nil { - return err - } - if gm.keystorePassword != "" && gm.keystorePath != "" { credentials, err := keystore.GetMembershipCredentials(gm.log, gm.keystorePath, @@ -200,10 +229,6 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, return errors.New("no credentials available") } - handler := func(pubkey r.IDCommitment, index r.MembershipIndex) error { - return gm.InsertMember(pubkey) - } - if err = gm.HandleGroupUpdates(ctx, handler); err != nil { return err } @@ -239,18 +264,38 @@ func (gm *DynamicGroupManager) persistCredentials() error { return nil } -func (gm *DynamicGroupManager) InsertMember(pubkey rln.IDCommitment) error { - gm.log.Debug("a new key is added", zap.Binary("pubkey", pubkey[:])) - // assuming all the members arrive in order - err := gm.rln.InsertMember(pubkey) - if err != nil { - gm.log.Error("inserting member into merkletree", zap.Error(err)) - return err - } +func (gm *DynamicGroupManager) InsertMembers(toInsert *om.OrderedMap) error { + for pair := toInsert.Oldest(); pair != nil; pair = pair.Next() { + events := pair.Value.([]*contracts.RLNMemberRegistered) // TODO: should these be sortered by index? we assume all members arrive in order + for _, evt := range events { + pubkey := rln.Bytes32(evt.Pubkey.Bytes()) + // TODO: should we track indexes to identify missing? + err := gm.rln.InsertMember(pubkey) + if err != nil { + gm.log.Error("inserting member into merkletree", zap.Error(err)) + return err + } + } - err = gm.rootTracker.Sync() - if err != nil { - return err + _, err := gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64)) + if err != nil { + return err + } + } + return nil +} + +func (gm *DynamicGroupManager) RemoveMembers(toRemove *om.OrderedMap) error { + for pair := toRemove.Newest(); pair != nil; pair = pair.Prev() { + memberIndexes := pair.Value.([]uint64) + for _, index := range memberIndexes { + err := gm.rln.DeleteMember(uint(index)) + if err != nil { + gm.log.Error("deleting member", zap.Error(err)) + return err + } + } + gm.rootTracker.Backfill(pair.Key.(uint64)) } return nil diff --git a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go new file mode 100644 index 00000000..e32757a1 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go @@ -0,0 +1,115 @@ +package dynamic + +import ( + "context" + "math/big" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" + "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-zerokit-rln/rln" +) + +func eventBuilder(blockNumber uint64, removed bool, pubkey int64, index int64) *contracts.RLNMemberRegistered { + return &contracts.RLNMemberRegistered{ + Raw: types.Log{ + BlockNumber: blockNumber, + Removed: removed, + }, + Index: big.NewInt(index), + Pubkey: big.NewInt(pubkey), + } +} + +func TestHandler(t *testing.T) { + // Create a RLN instance + rlnInstance, err := rln.NewRLN() + require.NoError(t, err) + + rootTracker, err := group_manager.NewMerkleRootTracker(5, rlnInstance) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + _ = ctx + + gm := &DynamicGroupManager{ + rln: rlnInstance, + log: utils.Logger(), + cancel: cancel, + wg: sync.WaitGroup{}, + rootTracker: rootTracker, + } + + root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33} + roots := gm.rootTracker.Roots() + require.Len(t, roots, 1) + require.Equal(t, roots[0], root0) + + events := []*contracts.RLNMemberRegistered{eventBuilder(1, false, 0xaaaa, 1)} + + err = handler(gm, events) + require.NoError(t, err) + + roots = gm.rootTracker.Roots() + + require.Len(t, roots, 2) + require.Equal(t, roots[0], root0) + require.Equal(t, roots[1], [32]byte{253, 232, 31, 10, 168, 25, 42, 0, 28, 221, 146, 119, 34, 212, 121, 51, 82, 55, 113, 181, 236, 3, 11, 190, 194, 144, 125, 59, 46, 171, 90, 43}) + + events = []*contracts.RLNMemberRegistered{ + eventBuilder(1, false, 0xbbbb, 2), + eventBuilder(2, false, 0xcccc, 3), + eventBuilder(3, false, 0xdddd, 4), + eventBuilder(4, false, 0xeeee, 5), + } + + err = handler(gm, events) + require.NoError(t, err) + + // Root[1] should become [0] + roots = gm.rootTracker.Roots() + require.Len(t, roots, 5) + require.Equal(t, roots[0], [32]byte{253, 232, 31, 10, 168, 25, 42, 0, 28, 221, 146, 119, 34, 212, 121, 51, 82, 55, 113, 181, 236, 3, 11, 190, 194, 144, 125, 59, 46, 171, 90, 43}) + require.Len(t, rootTracker.Buffer(), 1) + require.Equal(t, rootTracker.Buffer()[0], [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33}) + + // We detect a fork + // + // [0] -> [1] -> [2] -> [3] -> [4] Our chain + // \ + // \--> Real chain + // We should restore the valid roots from the buffer at the state the moment the chain forked + // In this case, just adding the original merkle root from empty tree + validRootsBeforeFork := roots[0:3] + events = []*contracts.RLNMemberRegistered{eventBuilder(3, true, 0xdddd, 4)} + + err = handler(gm, events) + require.NoError(t, err) + + roots = gm.rootTracker.Roots() + require.Len(t, roots, 4) + require.Equal(t, roots[0], root0) + require.Equal(t, roots[1], validRootsBeforeFork[0]) + require.Equal(t, roots[2], validRootsBeforeFork[1]) + require.Equal(t, roots[3], validRootsBeforeFork[2]) + require.Len(t, rootTracker.Buffer(), 0) + + // Adding multiple events for same block + events = []*contracts.RLNMemberRegistered{ + eventBuilder(3, false, 0xdddd, 4), + eventBuilder(3, false, 0xeeee, 5), + } + + err = handler(gm, events) + require.NoError(t, err) + + roots = gm.rootTracker.Roots() + require.Len(t, roots, 5) + +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/web3.go b/waku/v2/protocol/rln/group_manager/dynamic/web3.go index fd56ac3a..e15f4598 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/web3.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/web3.go @@ -94,22 +94,7 @@ func (gm *DynamicGroupManager) Register(ctx context.Context) (*r.MembershipIndex } // the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = func(pubkey r.IDCommitment, index r.MembershipIndex) error - -func (gm *DynamicGroupManager) processLogs(evt *contracts.RLNMemberRegistered, handler RegistrationEventHandler) error { - if evt == nil { - return nil - } - - var pubkey r.IDCommitment = r.Bytes32(evt.Pubkey.Bytes()) - - index := evt.Index.Int64() - if index <= gm.lastIndexLoaded { - return nil - } - gm.lastIndexLoaded = index - return handler(pubkey, r.MembershipIndex(uint(evt.Index.Int64()))) -} +type RegistrationEventHandler = func(*DynamicGroupManager, []*contracts.RLNMemberRegistered) error // HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract // It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` @@ -128,36 +113,21 @@ func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler R } func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler) error { - logIterator, err := rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: 0, End: nil, Context: ctx}) + events, err := gm.getEvents(ctx, 0, nil) if err != nil { return err } - for { - if !logIterator.Next() { - break - } - - if logIterator.Error() != nil { - return logIterator.Error() - } - - err = gm.processLogs(logIterator.Event, handler) - if err != nil { - return err - } - } - return nil + return handler(gm, events) } func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { defer gm.wg.Done() // Watch for new events - logSink := make(chan *contracts.RLNMemberRegistered) - firstErr := true + headerCh := make(chan *types.Header) subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { - subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: ctx, Start: nil}, logSink) + s, err := gm.ethClient.SubscribeNewHead(ctx, headerCh) if err != nil { if err == rpc.ErrNotificationsUnsupported { err = errors.New("notifications not supported. The node must support websockets") @@ -169,16 +139,23 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract * } firstErr = false close(errCh) - return subs, err + return s, err }) defer subs.Unsubscribe() - defer close(logSink) + defer close(headerCh) for { select { - case evt := <-logSink: - err := gm.processLogs(evt, handler) + case h := <-headerCh: + blk := h.Number.Uint64() + events, err := gm.getEvents(ctx, blk, &blk) + if err != nil { + gm.log.Error("obtaining rln events", zap.Error(err)) + + } + + err = handler(gm, events) if err != nil { gm.log.Error("processing rln log", zap.Error(err)) } @@ -192,3 +169,26 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract * } } } + +func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { + logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx}) + if err != nil { + return nil, err + } + + var results []*contracts.RLNMemberRegistered + + for { + if !logIterator.Next() { + break + } + + if logIterator.Error() != nil { + return nil, logIterator.Error() + } + + results = append(results, logIterator.Event) + } + + return results, nil +} diff --git a/waku/v2/protocol/rln/group_manager/root_tracker.go b/waku/v2/protocol/rln/group_manager/root_tracker.go index 4d78642c..5fa29573 100644 --- a/waku/v2/protocol/rln/group_manager/root_tracker.go +++ b/waku/v2/protocol/rln/group_manager/root_tracker.go @@ -1,34 +1,133 @@ package group_manager -import "github.com/waku-org/go-zerokit-rln/rln" +import ( + "sync" -type MerkleRootTracker struct { - rln *rln.RLN - acceptableRootWindowSize int - validMerkleRoots []rln.MerkleNode + "github.com/waku-org/go-zerokit-rln/rln" +) + +type RootsPerBlock struct { + root rln.MerkleNode + blockNumber uint64 } -func NewMerkleRootTracker(acceptableRootWindowSize int, rlnInstance *rln.RLN) *MerkleRootTracker { - return &MerkleRootTracker{ +type MerkleRootTracker struct { + sync.RWMutex + + rln *rln.RLN + acceptableRootWindowSize int + validMerkleRoots []RootsPerBlock + merkleRootBuffer []RootsPerBlock +} + +const maxBufferSize = 20 + +func NewMerkleRootTracker(acceptableRootWindowSize int, rlnInstance *rln.RLN) (*MerkleRootTracker, error) { + result := &MerkleRootTracker{ acceptableRootWindowSize: acceptableRootWindowSize, rln: rlnInstance, } -} -func (m *MerkleRootTracker) Sync() error { - root, err := m.rln.GetMerkleRoot() + _, err := result.UpdateLatestRoot(0) if err != nil { - return err + return nil, err } - m.validMerkleRoots = append(m.validMerkleRoots, root) + return result, nil +} + +func (m *MerkleRootTracker) Backfill(fromBlockNumber uint64) { + m.Lock() + defer m.Unlock() + + numBlocks := 0 + for i := len(m.validMerkleRoots) - 1; i >= 0; i-- { + if m.validMerkleRoots[i].blockNumber >= fromBlockNumber { + numBlocks++ + } + } + + if numBlocks == 0 { + return + } + + // Remove last roots + rootsToPop := numBlocks + if len(m.validMerkleRoots) < rootsToPop { + rootsToPop = len(m.validMerkleRoots) + } + m.validMerkleRoots = m.validMerkleRoots[0 : len(m.validMerkleRoots)-rootsToPop] + + if len(m.merkleRootBuffer) == 0 { + return + } + + // Backfill the tree's acceptable roots + rootsToRestore := numBlocks + bufferLen := len(m.merkleRootBuffer) + if bufferLen < rootsToRestore { + rootsToRestore = bufferLen + } + for i := 0; i < rootsToRestore; i++ { + x, newRootBuffer := m.merkleRootBuffer[len(m.merkleRootBuffer)-1], m.merkleRootBuffer[:len(m.merkleRootBuffer)-1] // Pop + m.validMerkleRoots = append([]RootsPerBlock{x}, m.validMerkleRoots...) + m.merkleRootBuffer = newRootBuffer + } +} + +func (m *MerkleRootTracker) UpdateLatestRoot(blockNumber uint64) (rln.MerkleNode, error) { + m.Lock() + defer m.Unlock() + + root, err := m.rln.GetMerkleRoot() + if err != nil { + return [32]byte{}, err + } + + m.pushRoot(blockNumber, root) + + return root, nil +} + +func (m *MerkleRootTracker) pushRoot(blockNumber uint64, root [32]byte) { + m.validMerkleRoots = append(m.validMerkleRoots, RootsPerBlock{ + root: root, + blockNumber: blockNumber, + }) + + // Maintain valid merkle root window if len(m.validMerkleRoots) > m.acceptableRootWindowSize { + m.merkleRootBuffer = append(m.merkleRootBuffer, m.validMerkleRoots[0]) m.validMerkleRoots = m.validMerkleRoots[1:] } - return nil + // Maintain merkle root buffer + if len(m.merkleRootBuffer) > maxBufferSize { + m.merkleRootBuffer = m.merkleRootBuffer[1:] + } + } func (m *MerkleRootTracker) Roots() []rln.MerkleNode { - return m.validMerkleRoots + m.RLock() + defer m.RUnlock() + + result := make([]rln.MerkleNode, len(m.validMerkleRoots)) + for i := range m.validMerkleRoots { + result[i] = m.validMerkleRoots[i].root + } + + return result +} + +func (m *MerkleRootTracker) Buffer() []rln.MerkleNode { + m.RLock() + defer m.RUnlock() + + result := make([]rln.MerkleNode, len(m.merkleRootBuffer)) + for i := range m.merkleRootBuffer { + result[i] = m.merkleRootBuffer[i].root + } + + return result } diff --git a/waku/v2/protocol/rln/group_manager/static/static.go b/waku/v2/protocol/rln/group_manager/static/static.go index fe2b467b..346e2dde 100644 --- a/waku/v2/protocol/rln/group_manager/static/static.go +++ b/waku/v2/protocol/rln/group_manager/static/static.go @@ -45,18 +45,9 @@ func (gm *StaticGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, r gm.rln = rlnInstance gm.rootTracker = rootTracker - err := rootTracker.Sync() - if err != nil { - return err - } - // add members to the Merkle tree - for _, member := range gm.group { - if err := rlnInstance.InsertMember(member); err != nil { - return err - } - - err = rootTracker.Sync() + for i, member := range gm.group { + err := gm.insertMember(member, uint64(i+1)) if err != nil { return err } @@ -67,8 +58,9 @@ func (gm *StaticGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, r return nil } -func (gm *StaticGroupManager) InsertMember(pubkey rln.IDCommitment) error { - gm.log.Debug("a new key is added", zap.Binary("pubkey", pubkey[:])) +func (gm *StaticGroupManager) insertMember(pubkey rln.IDCommitment, index uint64) error { + gm.log.Debug("a new key is added", zap.Binary("pubkey", pubkey[:]), zap.Uint64("index", index)) + // assuming all the members arrive in order err := gm.rln.InsertMember(pubkey) if err != nil { @@ -76,7 +68,7 @@ func (gm *StaticGroupManager) InsertMember(pubkey rln.IDCommitment) error { return err } - err = gm.rootTracker.Sync() + _, err = gm.rootTracker.UpdateLatestRoot(index) if err != nil { return err } diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index f1673937..bf7c47cc 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -78,9 +78,12 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() { rlnInstance, err := r.NewRLN() s.Require().NoError(err) + rootTracker, err := group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance) + s.Require().NoError(err) + rlnRelay := &WakuRLNRelay{ nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), - rootTracker: group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance), + rootTracker: rootTracker, } epoch := r.GetCurrentEpoch() @@ -166,9 +169,12 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) s.Require().NoError(err) + rootTracker, err := group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance) + s.Require().NoError(err) + rlnRelay := &WakuRLNRelay{ groupManager: groupManager, - rootTracker: group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance), + rootTracker: rootTracker, RLN: rlnInstance, nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), log: utils.Logger(), @@ -177,6 +183,9 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { //get the current epoch time now := time.Now() + err = groupManager.Start(context.Background(), rlnInstance, rootTracker) + s.Require().NoError(err) + // create some messages from the same peer and append rln proof to them, except wm4 wm1 := &pb.WakuMessage{Payload: []byte("Valid message")} diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index 18122570..f87b7c27 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -62,11 +62,16 @@ func New( return nil, err } + rootTracker, err := group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance) + if err != nil { + return nil, err + } + // create the WakuRLNRelay rlnPeer := &WakuRLNRelay{ RLN: rlnInstance, groupManager: groupManager, - rootTracker: group_manager.NewMerkleRootTracker(AcceptableRootWindowSize, rlnInstance), + rootTracker: rootTracker, pubsubTopic: pubsubTopic, contentTopic: contentTopic, relay: relay,