From 08cabab41f2f39bc2a7ada3c208de4976565f3a9 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Thu, 7 Sep 2023 23:23:48 +0700 Subject: [PATCH] fix: and optimising fetching membership events (#706) * fix: and optimising fetching membership events * fix: start from lastProcessedBlock+1 * test: fetching membership logic * refactor: usage of rlnInstance,rootTracker,groupManager rlnInstance, rootTrack were previously created while creating rlnRelay but were assigned to groupManager on Start of rlnRelay. This created unncessary dependency of passing them to static and dynamic group manager. Web3Config uses interface EthClientI for client, so that we can pass mock client for testing MembershipFetcher. * fix: failing test * fix: lint error * fix: account for PR suggestions * fix: failing race test * fix: dont' increase fromBlock on error * nit: fix naming and add comments --- waku/v2/node/wakunode2_rln.go | 22 +- .../rln/group_manager/dynamic/dynamic.go | 38 +-- .../rln/group_manager/dynamic/handler_test.go | 31 ++- .../dynamic/membership_fetcher.go | 227 ++++++++++++++++++ .../dynamic/membership_fetcher.json | 100 ++++++++ .../dynamic/membership_fetcher_test.go | 67 ++++++ .../rln/group_manager/dynamic/metadata.go | 10 - .../group_manager/dynamic/mock_blockchain.go | 81 +++++++ .../rln/group_manager/dynamic/mock_client.go | 118 +++++++++ .../rln/group_manager/dynamic/web3.go | 209 ---------------- .../rln/group_manager/group_manager.go | 20 ++ .../rln/group_manager/static/static.go | 9 +- waku/v2/protocol/rln/onchain_test.go | 47 ++-- waku/v2/protocol/rln/rln_relay_test.go | 38 ++- waku/v2/protocol/rln/waku_rln_relay.go | 61 ++--- waku/v2/protocol/rln/web3/web3.go | 13 +- 16 files changed, 755 insertions(+), 336 deletions(-) create mode 100644 waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go create mode 100644 waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json create mode 100644 waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go create mode 100644 waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go create mode 100644 waku/v2/protocol/rln/group_manager/dynamic/mock_client.go delete mode 100644 waku/v2/protocol/rln/group_manager/dynamic/web3.go diff --git a/waku/v2/node/wakunode2_rln.go b/waku/v2/node/wakunode2_rln.go index 456ce396..7ea5c18d 100644 --- a/waku/v2/node/wakunode2_rln.go +++ b/waku/v2/node/wakunode2_rln.go @@ -10,6 +10,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/waku-org/go-waku/waku/v2/protocol/rln" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static" "github.com/waku-org/go-waku/waku/v2/protocol/rln/keystore" @@ -23,12 +24,17 @@ func (w *WakuNode) RLNRelay() RLNRelay { func (w *WakuNode) setupRLNRelay() error { var err error - var groupManager rln.GroupManager if !w.opts.enableRLN { return nil } + var groupManager group_manager.GroupManager + + rlnInstance, rootTracker, err := rln.GetRLNInstanceAndRootTracker(w.opts.rlnTreePath) + if err != nil { + return err + } if !w.opts.rlnRelayDynamic { w.log.Info("setting up waku-rln-relay in off-chain mode") @@ -38,7 +44,8 @@ func (w *WakuNode) setupRLNRelay() error { return err } - groupManager, err = static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, w.log) + groupManager, err = static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, rlnInstance, + rootTracker, w.log) if err != nil { return err } @@ -57,6 +64,8 @@ func (w *WakuNode) setupRLNRelay() error { appKeystore, w.opts.keystorePassword, w.opts.prometheusReg, + rlnInstance, + rootTracker, w.log, ) if err != nil { @@ -64,10 +73,11 @@ func (w *WakuNode) setupRLNRelay() error { } } - rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.opts.prometheusReg, w.log) - if err != nil { - return err - } + rlnRelay := rln.New(group_manager.Details{ + GroupManager: groupManager, + RootTracker: rootTracker, + RLN: rlnInstance, + }, w.timesource, w.opts.prometheusReg, w.log) w.rlnRelay = rlnRelay diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index 5b88b18a..302115ec 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -4,7 +4,6 @@ import ( "context" "errors" "math/big" - "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -28,28 +27,21 @@ var RLNAppInfo = keystore.AppInfo{ } type DynamicGroupManager struct { - rln *rln.RLN - log *zap.Logger + MembershipFetcher metrics Metrics cancel context.CancelFunc - wg sync.WaitGroup identityCredential *rln.IdentityCredential membershipIndex rln.MembershipIndex - web3Config *web3.Config lastBlockProcessed uint64 - eventHandler RegistrationEventHandler - appKeystore *keystore.AppKeystore keystorePassword string - - rootTracker *group_manager.MerkleRootTracker } -func handler(gm *DynamicGroupManager, events []*contracts.RLNMemberRegistered) error { +func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error { toRemoveTable := om.New() toInsertTable := om.New() @@ -116,18 +108,19 @@ func NewDynamicGroupManager( appKeystore *keystore.AppKeystore, keystorePassword string, reg prometheus.Registerer, + rlnInstance *rln.RLN, + rootTracker *group_manager.MerkleRootTracker, log *zap.Logger, ) (*DynamicGroupManager, error) { log = log.Named("rln-dynamic") + web3Config := web3.NewConfig(ethClientAddr, memContractAddr) return &DynamicGroupManager{ - membershipIndex: membershipIndex, - web3Config: web3.NewConfig(ethClientAddr, memContractAddr), - eventHandler: handler, - appKeystore: appKeystore, - keystorePassword: keystorePassword, - log: log, - metrics: newMetrics(reg), + membershipIndex: membershipIndex, + appKeystore: appKeystore, + keystorePassword: keystorePassword, + MembershipFetcher: NewMembershipFetcher(web3Config, rlnInstance, rootTracker, log), + metrics: newMetrics(reg), }, nil } @@ -139,7 +132,7 @@ func (gm *DynamicGroupManager) memberExists(ctx context.Context, idCommitment rl return gm.web3Config.RLNContract.MemberExists(&bind.CallOpts{Context: ctx}, rln.Bytes32ToBigInt(idCommitment)) } -func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error { +func (gm *DynamicGroupManager) Start(ctx context.Context) error { if gm.cancel != nil { return errors.New("already started") } @@ -154,9 +147,6 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, return err } - gm.rln = rlnInstance - gm.rootTracker = rootTracker - // check if the contract exists by calling a static function _, err = gm.getMembershipFee(ctx) if err != nil { @@ -168,7 +158,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, return err } - if err = gm.HandleGroupUpdates(ctx, gm.eventHandler); err != nil { + if err = gm.MembershipFetcher.HandleGroupUpdates(ctx, gm.handler); err != nil { return err } @@ -278,9 +268,7 @@ func (gm *DynamicGroupManager) Stop() error { return err } - gm.web3Config.ETHClient.Close() - - gm.wg.Wait() + gm.MembershipFetcher.Stop() return nil } diff --git a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go index 1479b495..271e5699 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go @@ -3,7 +3,6 @@ package dynamic import ( "context" "math/big" - "sync" "testing" "github.com/ethereum/go-ethereum/core/types" @@ -35,21 +34,21 @@ func TestHandler(t *testing.T) { rootTracker, err := group_manager.NewMerkleRootTracker(5, rlnInstance) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.TODO()) + _, cancel := context.WithCancel(context.TODO()) defer cancel() - _ = ctx - gm := &DynamicGroupManager{ - rln: rlnInstance, - log: utils.Logger(), + MembershipFetcher: NewMembershipFetcher( + &web3.Config{ + ChainID: big.NewInt(1), + }, + rlnInstance, + rootTracker, + utils.Logger(), + ), cancel: cancel, - wg: sync.WaitGroup{}, - web3Config: &web3.Config{ - ChainID: big.NewInt(1), - }, - rootTracker: rootTracker, - metrics: newMetrics(prometheus.DefaultRegisterer), + + metrics: newMetrics(prometheus.DefaultRegisterer), } root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33} @@ -59,7 +58,7 @@ func TestHandler(t *testing.T) { events := []*contracts.RLNMemberRegistered{eventBuilder(1, false, 0xaaaa, 1)} - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) roots = gm.rootTracker.Roots() @@ -75,7 +74,7 @@ func TestHandler(t *testing.T) { eventBuilder(4, false, 0xeeee, 5), } - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) // Root[1] should become [0] @@ -99,7 +98,7 @@ func TestHandler(t *testing.T) { eventBuilder(3, false, 0xeeee, 5), } - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) roots = gm.rootTracker.Roots() @@ -113,7 +112,7 @@ func TestHandler(t *testing.T) { // Adding multiple events for same block events = []*contracts.RLNMemberRegistered{} - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) roots = gm.rootTracker.Roots() diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go new file mode 100644 index 00000000..e0f3b057 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go @@ -0,0 +1,227 @@ +package dynamic + +import ( + "bytes" + "context" + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/web3" + "github.com/waku-org/go-zerokit-rln/rln" + "go.uber.org/zap" +) + +// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface +type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error + +// for getting membershipRegsitered Events from the eth rpc +type MembershipFetcher struct { + web3Config *web3.Config + rln *rln.RLN + log *zap.Logger + rootTracker *group_manager.MerkleRootTracker + wg sync.WaitGroup +} + +func NewMembershipFetcher(web3Config *web3.Config, rln *rln.RLN, rootTracker *group_manager.MerkleRootTracker, log *zap.Logger) MembershipFetcher { + return MembershipFetcher{ + web3Config: web3Config, + rln: rln, + log: log, + rootTracker: rootTracker, + } +} + +// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract +// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` +// and collects all the events, for every received event, it calls the `handler` +func (mf *MembershipFetcher) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error { + fromBlock := mf.web3Config.RLNContract.DeployedBlockNumber + metadata, err := mf.GetMetadata() + if err != nil { + mf.log.Warn("could not load last processed block from metadata. Starting onchain sync from deployment block", zap.Error(err), zap.Uint64("deploymentBlock", mf.web3Config.RLNContract.DeployedBlockNumber)) + } else { + if mf.web3Config.ChainID.Cmp(metadata.ChainID) != 0 { + return errors.New("persisted data: chain id mismatch") + } + + if !bytes.Equal(mf.web3Config.RegistryContract.Address.Bytes(), metadata.ContractAddress.Bytes()) { + return errors.New("persisted data: contract address mismatch") + } + + fromBlock = metadata.LastProcessedBlock + 1 + mf.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock)) + } + + mf.rootTracker.SetValidRootsPerBlock(metadata.ValidRootsPerBlock) + // + latestBlockNumber, err := mf.latestBlockNumber(ctx) + if err != nil { + return err + } + // + err = mf.loadOldEvents(ctx, fromBlock, latestBlockNumber, handler) + if err != nil { + return err + } + + errCh := make(chan error) + + mf.wg.Add(1) + go mf.watchNewEvents(ctx, latestBlockNumber+1, handler, errCh) // we have already fetched the events for latestBlocNumber in oldEvents + return <-errCh +} + +func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlock uint64, handler RegistrationEventHandler) error { + for ; fromBlock+maxBatchSize < toBlock; fromBlock += maxBatchSize + 1 { // check if the end of the batch is within the toBlock range + events, err := mf.getEvents(ctx, fromBlock, fromBlock+maxBatchSize) + if err != nil { + return err + } + if err := handler(events); err != nil { + return err + } + } + + // + events, err := mf.getEvents(ctx, fromBlock, toBlock) + if err != nil { + return err + } + // process all the fetched events + return handler(events) +} + +func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) { + defer mf.wg.Done() + + // Watch for new events + firstErr := true + headerCh := make(chan *types.Header) + subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { + s, err := mf.web3Config.ETHClient.SubscribeNewHead(ctx, headerCh) + if err != nil { + if err == rpc.ErrNotificationsUnsupported { + err = errors.New("notifications not supported. The node must support websockets") + } + mf.log.Error("subscribing to rln events", zap.Error(err)) + } + if firstErr { // errCh can be closed only once + errCh <- err + close(errCh) + firstErr = false + } + return s, err + }) + + defer subs.Unsubscribe() + defer close(headerCh) + + for { + select { + case h := <-headerCh: + toBlock := h.Number.Uint64() + events, err := mf.getEvents(ctx, fromBlock, toBlock) + if err != nil { + mf.log.Error("obtaining rln events", zap.Error(err)) + } else { + // update the last processed block + fromBlock = toBlock + 1 + } + + err = handler(events) + if err != nil { + mf.log.Error("processing rln log", zap.Error(err)) + } + case <-ctx.Done(): + return + case err := <-subs.Err(): + if err != nil { + mf.log.Error("watching new events", zap.Error(err)) + } + return + } + } +} + +const maxBatchSize = uint64(5000) + +func tooMuchDataRequestedError(err error) bool { + // this error is only infura specific (other providers might have different error messages) + return err.Error() == "query returned more than 10000 results" +} + +func (mf *MembershipFetcher) latestBlockNumber(ctx context.Context) (uint64, error) { + block, err := mf.web3Config.ETHClient.BlockByNumber(ctx, nil) + if err != nil { + return 0, err + } + + return block.Number().Uint64(), nil +} + +func (mf *MembershipFetcher) getEvents(ctx context.Context, fromBlock uint64, toBlock uint64) ([]*contracts.RLNMemberRegistered, error) { + evts, err := mf.fetchEvents(ctx, fromBlock, toBlock) + if err != nil { + if tooMuchDataRequestedError(err) { // divide the range and try again + mid := (fromBlock + toBlock) / 2 + firstHalfEvents, err := mf.getEvents(ctx, fromBlock, mid) + if err != nil { + return nil, err + } + secondHalfEvents, err := mf.getEvents(ctx, mid+1, toBlock) + if err != nil { + return nil, err + } + return append(firstHalfEvents, secondHalfEvents...), nil + } + return nil, err + } + return evts, nil +} + +func (mf *MembershipFetcher) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) { + logIterator, err := mf.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx}) + if err != nil { + return nil, err + } + + var results []*contracts.RLNMemberRegistered + + for { + if !logIterator.Next() { + break + } + + if logIterator.Error() != nil { + return nil, logIterator.Error() + } + + results = append(results, logIterator.Event) + } + + return results, nil +} + +// GetMetadata retrieves metadata from the zerokit's RLN database +func (mf *MembershipFetcher) GetMetadata() (RLNMetadata, error) { + b, err := mf.rln.GetMetadata() + if err != nil { + return RLNMetadata{}, err + } + + return DeserializeMetadata(b) +} + +func (mf *MembershipFetcher) Stop() { + mf.web3Config.ETHClient.Close() + // wait for the watchNewEvents goroutine to finish + mf.wg.Wait() +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json new file mode 100644 index 00000000..fe8e2bb3 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json @@ -0,0 +1,100 @@ +{ + "blocks": { + "5": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:1", + "bigint:1" + ] + } + ], + "5005": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:2", + "bigint:2" + ] + } + ], + "5006": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:3", + "bigint:3" + ] + } + ], + "5007": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:4", + "bigint:4" + ] + } + ], + "10005": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:5", + "bigint:5" + ] + } + ], + "10010": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:6", + "bigint:6" + ] + } + ], + "10011": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:7", + "bigint:7" + ] + } + ], + "10012": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:8", + "bigint:8" + ] + } + ] + } +} \ No newline at end of file diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go new file mode 100644 index 00000000..1cf0017b --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go @@ -0,0 +1,67 @@ +package dynamic + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/web3" + "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-zerokit-rln/rln" +) + +func TestFetchingLogic(t *testing.T) { + client := NewMockClient(t, "membership_fetcher.json") + + rlnContract, err := contracts.NewRLN(common.Address{}, client) + require.NoError(t, err) + rlnInstance, err := rln.NewRLN() + require.NoError(t, err) + rootTracker, err := group_manager.NewMerkleRootTracker(1, rlnInstance) + require.NoError(t, err) + // + mf := MembershipFetcher{ + web3Config: &web3.Config{ + RLNContract: web3.RLNContract{ + RLN: rlnContract, + }, + ETHClient: client, + }, + rln: rlnInstance, + log: utils.Logger(), + rootTracker: rootTracker, + } + + counts := []int{} + mockFn := func(events []*contracts.RLNMemberRegistered) error { + counts = append(counts, len(events)) + return nil + } + // check if more than 10k error is handled or not. + client.SetErrorOnBlock(5007, fmt.Errorf("query returned more than 10000 results"), 2) + // loadOldEvents will check till 10010 + client.SetLatestBlockNumber(10010) + // watchNewEvents will check till 10012 + ctx, cancel := context.WithCancel(context.Background()) + if err := mf.HandleGroupUpdates(ctx, mockFn); err != nil { + t.Fatal(err) + } + go func() { + for { + if client.latestBlockNum.Load() == 10012 { + cancel() + return + } + time.Sleep(time.Second) + } + }() + mf.Stop() + // sleep so that watchNewEvents can finish + // check whether all the events are fetched or not. + require.Equal(t, counts, []int{1, 3, 2, 1, 1}) +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/metadata.go b/waku/v2/protocol/rln/group_manager/dynamic/metadata.go index 0d8e4559..a430a810 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/metadata.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/metadata.go @@ -79,13 +79,3 @@ func (gm *DynamicGroupManager) SetMetadata(meta RLNMetadata) error { b := meta.Serialize() return gm.rln.SetMetadata(b) } - -// GetMetadata retrieves metadata from the zerokit's RLN database -func (gm *DynamicGroupManager) GetMetadata() (RLNMetadata, error) { - b, err := gm.rln.GetMetadata() - if err != nil { - return RLNMetadata{}, err - } - - return DeserializeMetadata(b) -} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go b/waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go new file mode 100644 index 00000000..ba0fb800 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go @@ -0,0 +1,81 @@ +package dynamic + +import ( + "math/big" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" +) + +// MockBlockChain is currently a chain of events for different block numbers +// it is used internal by mock client for returning events for a given block number or range in FilterLog rpc call. +type MockBlockChain struct { + Blocks map[int64]*MockBlock `json:"blocks"` +} + +type MockBlock []MockEvent + +func containsEntry[T common.Hash | common.Address](topics []T, topicA T) bool { + for _, topic := range topics { + if topic == topicA { + return true + } + } + return false +} + +func Topic(topic string) common.Hash { + return crypto.Keccak256Hash([]byte(topic)) +} +func (b MockBlock) getLogs(blockNum uint64, addrs []common.Address, topicA []common.Hash) (txLogs []types.Log) { + for ind, event := range b { + txLog := event.GetLog() + if containsEntry(addrs, txLog.Address) && (len(topicA) == 0 || containsEntry(topicA, txLog.Topics[0])) { + txLog.BlockNumber = blockNum + txLog.Index = uint(ind) + txLogs = append(txLogs, txLog) + } + } + return +} + +type MockEvent struct { + Address common.Address `json:"address"` + Topics []string `json:"topics"` + Txhash common.Hash `json:"txhash"` + Data []string `json:"data"` +} + +func (e MockEvent) GetLog() types.Log { + topics := []common.Hash{Topic(e.Topics[0])} + for _, topic := range e.Topics[1:] { + topics = append(topics, parseData(topic)) + } + // + var data []byte + for _, entry := range e.Data { + data = append(data, parseData(entry).Bytes()...) + } + return types.Log{ + Address: e.Address, + Topics: topics, + TxHash: e.Txhash, + Data: data, + } +} + +func parseData(data string) common.Hash { + splits := strings.Split(data, ":") + switch splits[0] { + case "bigint": + bigInt, ok := new(big.Int).SetString(splits[1], 10) + if !ok { + panic("invalid big int") + } + return common.BytesToHash(bigInt.Bytes()) + default: + panic("invalid data type") + } +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/mock_client.go b/waku/v2/protocol/rln/group_manager/dynamic/mock_client.go new file mode 100644 index 00000000..e63eebd1 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/mock_client.go @@ -0,0 +1,118 @@ +package dynamic + +import ( + "context" + "encoding/json" + "io/ioutil" + "math/big" + "sort" + "sync/atomic" + "testing" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +type ErrCount struct { + err error + count int +} + +type MockClient struct { + ethclient.Client + blockChain MockBlockChain + latestBlockNum atomic.Int64 + errOnBlock map[int64]*ErrCount +} + +func (c *MockClient) SetLatestBlockNumber(num int64) { + c.latestBlockNum.Store(num) +} + +func (c *MockClient) Close() { + +} +func (c *MockClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return types.NewBlock(&types.Header{Number: big.NewInt(c.latestBlockNum.Load())}, nil, nil, nil, nil), nil +} +func NewMockClient(t *testing.T, blockFile string) *MockClient { + blockChain := MockBlockChain{} + data, err := ioutil.ReadFile(blockFile) + if err != nil { + t.Fatal(err) + } + if err := json.Unmarshal(data, &blockChain); err != nil { + t.Fatal(err) + } + return &MockClient{blockChain: blockChain, errOnBlock: map[int64]*ErrCount{}} +} + +func (client *MockClient) SetErrorOnBlock(blockNum int64, err error, count int) { + client.errOnBlock[blockNum] = &ErrCount{err: err, count: count} +} + +func (c *MockClient) getFromAndToRange(query ethereum.FilterQuery) (int64, int64) { + var fromBlock int64 + if query.FromBlock == nil { + fromBlock = 0 + } else { + fromBlock = query.FromBlock.Int64() + } + + var toBlock int64 + if query.ToBlock == nil { + toBlock = 0 + } else { + toBlock = query.ToBlock.Int64() + } + return fromBlock, toBlock +} +func (c *MockClient) FilterLogs(ctx context.Context, query ethereum.FilterQuery) (allTxLogs []types.Log, err error) { + fromBlock, toBlock := c.getFromAndToRange(query) + for block, details := range c.blockChain.Blocks { + if block >= fromBlock && block <= toBlock { + if txLogs := details.getLogs(uint64(block), query.Addresses, query.Topics[0]); len(txLogs) != 0 { + allTxLogs = append(allTxLogs, txLogs...) + } + if errCount, ok := c.errOnBlock[block]; ok && errCount.count != 0 { + errCount.count-- + return nil, errCount.err + } + } + } + sort.Slice(allTxLogs, func(i, j int) bool { + return allTxLogs[i].BlockNumber < allTxLogs[j].BlockNumber || + (allTxLogs[i].BlockNumber == allTxLogs[j].BlockNumber && allTxLogs[i].Index < allTxLogs[j].Index) + }) + return allTxLogs, nil +} + +func (c *MockClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + for { + next := c.latestBlockNum.Load() + 1 + if c.blockChain.Blocks[next] != nil { + ch <- &types.Header{Number: big.NewInt(next)} + c.latestBlockNum.Store(next) + } else { + break + } + } + return testNoopSub{}, nil +} + +type testNoopSub struct { +} + +func (testNoopSub) Unsubscribe() { + +} + +// Err returns the subscription error channel. The error channel receives +// a value if there is an issue with the subscription (e.g. the network connection +// delivering the events has been closed). Only one value will ever be sent. +// The error channel is closed by Unsubscribe. +func (testNoopSub) Err() <-chan error { + ch := make(chan error) + return ch +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/web3.go b/waku/v2/protocol/rln/group_manager/dynamic/web3.go deleted file mode 100644 index a3c7aa32..00000000 --- a/waku/v2/protocol/rln/group_manager/dynamic/web3.go +++ /dev/null @@ -1,209 +0,0 @@ -package dynamic - -import ( - "bytes" - "context" - "errors" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/rpc" - "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" - "go.uber.org/zap" -) - -// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = func(*DynamicGroupManager, []*contracts.RLNMemberRegistered) error - -// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract -// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` -// and collects all the events, for every received event, it calls the `handler` -func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error { - fromBlock := gm.web3Config.RLNContract.DeployedBlockNumber - metadata, err := gm.GetMetadata() - if err != nil { - gm.log.Warn("could not load last processed block from metadata. Starting onchain sync from deployment block", zap.Error(err), zap.Uint64("deploymentBlock", gm.web3Config.RLNContract.DeployedBlockNumber)) - } else { - if gm.web3Config.ChainID.Cmp(metadata.ChainID) != 0 { - return errors.New("persisted data: chain id mismatch") - } - - if !bytes.Equal(gm.web3Config.RegistryContract.Address.Bytes(), metadata.ContractAddress.Bytes()) { - return errors.New("persisted data: contract address mismatch") - } - - fromBlock = metadata.LastProcessedBlock - gm.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock)) - } - - gm.rootTracker.SetValidRootsPerBlock(metadata.ValidRootsPerBlock) - - err = gm.loadOldEvents(ctx, fromBlock, handler) - if err != nil { - return err - } - - errCh := make(chan error) - - gm.wg.Add(1) - go gm.watchNewEvents(ctx, handler, gm.log, errCh) - return <-errCh -} - -func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler) error { - events, err := gm.getEvents(ctx, fromBlock, nil) - if err != nil { - return err - } - return handler(gm, events) -} - -func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { - defer gm.wg.Done() - - // Watch for new events - firstErr := true - headerCh := make(chan *types.Header) - subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { - s, err := gm.web3Config.ETHClient.SubscribeNewHead(ctx, headerCh) - if err != nil { - if err == rpc.ErrNotificationsUnsupported { - err = errors.New("notifications not supported. The node must support websockets") - } - if firstErr { - errCh <- err - } - gm.log.Error("subscribing to rln events", zap.Error(err)) - } - firstErr = false - close(errCh) - return s, err - }) - - defer subs.Unsubscribe() - defer close(headerCh) - - for { - select { - case h := <-headerCh: - blk := h.Number.Uint64() - events, err := gm.getEvents(ctx, blk, &blk) - if err != nil { - gm.log.Error("obtaining rln events", zap.Error(err)) - } - - err = handler(gm, events) - if err != nil { - gm.log.Error("processing rln log", zap.Error(err)) - } - case <-ctx.Done(): - return - case err := <-subs.Err(): - if err != nil { - gm.log.Error("watching new events", zap.Error(err)) - } - return - } - } -} - -const maxBatchSize = uint64(5000) -const additiveFactorMultiplier = 0.10 -const multiplicativeDecreaseDivisor = 2 - -func tooMuchDataRequestedError(err error) bool { - // this error is only infura specific (other providers might have different error messages) - return err.Error() == "query returned more than 10000 results" -} - -func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { - var results []*contracts.RLNMemberRegistered - - // Adapted from prysm logic for fetching historical logs - - toBlock := to - if to == nil { - block, err := gm.web3Config.ETHClient.BlockByNumber(ctx, nil) - if err != nil { - return nil, err - } - - blockNumber := block.Number().Uint64() - toBlock = &blockNumber - } - - if from == *toBlock { // Only loading a single block - return gm.fetchEvents(ctx, from, toBlock) - } - - // Fetching blocks in batches - batchSize := maxBatchSize - additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier) - - currentBlockNum := from - for currentBlockNum < *toBlock { - start := currentBlockNum - end := currentBlockNum + batchSize - if end > *toBlock { - end = *toBlock - } - - gm.log.Info("loading events...", zap.Uint64("fromBlock", start), zap.Uint64("toBlock", end)) - - evts, err := gm.fetchEvents(ctx, start, &end) - if err != nil { - if tooMuchDataRequestedError(err) { - if batchSize == 0 { - return nil, errors.New("batch size is zero") - } - - // multiplicative decrease - batchSize = batchSize / multiplicativeDecreaseDivisor - - gm.log.Warn("too many logs requested!, retrying with a smaller chunk size", zap.Uint64("batchSize", batchSize)) - - continue - } - return nil, err - } - - results = append(results, evts...) - - currentBlockNum = end - - if batchSize < maxBatchSize { - // update the batchSize with additive increase - batchSize = batchSize + additiveFactor - if batchSize > maxBatchSize { - batchSize = maxBatchSize - } - } - } - - return results, nil -} - -func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { - logIterator, err := gm.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx}) - if err != nil { - return nil, err - } - - var results []*contracts.RLNMemberRegistered - - for { - if !logIterator.Next() { - break - } - - if logIterator.Error() != nil { - return nil, logIterator.Error() - } - - results = append(results, logIterator.Event) - } - - return results, nil -} diff --git a/waku/v2/protocol/rln/group_manager/group_manager.go b/waku/v2/protocol/rln/group_manager/group_manager.go index 453d0228..3dbec886 100644 --- a/waku/v2/protocol/rln/group_manager/group_manager.go +++ b/waku/v2/protocol/rln/group_manager/group_manager.go @@ -1 +1,21 @@ package group_manager + +import ( + "context" + + "github.com/waku-org/go-zerokit-rln/rln" +) + +type GroupManager interface { + Start(ctx context.Context) error + IdentityCredentials() (rln.IdentityCredential, error) + MembershipIndex() rln.MembershipIndex + Stop() error +} + +type Details struct { + GroupManager GroupManager + RootTracker *MerkleRootTracker + + RLN *rln.RLN +} diff --git a/waku/v2/protocol/rln/group_manager/static/static.go b/waku/v2/protocol/rln/group_manager/static/static.go index 432a054e..251bf1b4 100644 --- a/waku/v2/protocol/rln/group_manager/static/static.go +++ b/waku/v2/protocol/rln/group_manager/static/static.go @@ -25,6 +25,8 @@ func NewStaticGroupManager( group []rln.IDCommitment, identityCredential rln.IdentityCredential, index rln.MembershipIndex, + rlnInstance *rln.RLN, + rootTracker *group_manager.MerkleRootTracker, log *zap.Logger, ) (*StaticGroupManager, error) { // check the peer's index and the inclusion of user's identity commitment in the group @@ -37,15 +39,14 @@ func NewStaticGroupManager( group: group, identityCredential: &identityCredential, membershipIndex: index, + rln: rlnInstance, + rootTracker: rootTracker, }, nil } -func (gm *StaticGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error { +func (gm *StaticGroupManager) Start(ctx context.Context) error { gm.log.Info("mounting rln-relay in off-chain/static mode") - gm.rln = rlnInstance - gm.rootTracker = rootTracker - // add members to the Merkle tree err := gm.insertMembers(gm.group) diff --git a/waku/v2/protocol/rln/onchain_test.go b/waku/v2/protocol/rln/onchain_test.go index 4b396716..41299a9b 100644 --- a/waku/v2/protocol/rln/onchain_test.go +++ b/waku/v2/protocol/rln/onchain_test.go @@ -149,14 +149,16 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() { membershipIndex := s.register(appKeystore, u1Credentials, s.u1PrivKey) - gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, rlnInstance, rt, utils.Logger()) s.Require().NoError(err) // initialize the WakuRLNRelay rlnRelay := &WakuRLNRelay{ - rootTracker: rt, - groupManager: gm, - RLN: rlnInstance, + Details: group_manager.Details{ + RootTracker: rt, + GroupManager: gm, + RLN: rlnInstance, + }, log: utils.Logger(), nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), } @@ -231,11 +233,17 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() { membershipIndex := s.register(appKeystore, credentials1, s.u1PrivKey) membershipIndex = s.register(appKeystore, credentials2, s.u1PrivKey) + rlnInstance, rootTracker, err := GetRLNInstanceAndRootTracker(s.tmpRLNDBPath()) + s.Require().NoError(err) // mount the rln relay protocol in the on-chain/dynamic mode - gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, rlnInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - rlnRelay, err := New(gm, s.tmpRLNDBPath(), timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + rlnRelay := New(group_manager.Details{ + RLN: rlnInstance, + RootTracker: rootTracker, + GroupManager: gm, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) err = rlnRelay.Start(context.TODO()) @@ -265,11 +273,17 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { membershipGroupIndex := s.register(appKeystore, credentials1, s.u1PrivKey) // mount the rln relay protocol in the on-chain/dynamic mode - gm1, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + rootInstance, rootTracker, err := GetRLNInstanceAndRootTracker(s.tmpRLNDBPath()) + s.Require().NoError(err) + gm1, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, rootInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - rlnRelay1, err := New(gm1, s.tmpRLNDBPath(), timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - s.Require().NoError(err) + rlnRelay1 := New(group_manager.Details{ + GroupManager: gm1, + RootTracker: rootTracker, + RLN: rootInstance, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + err = rlnRelay1.Start(context.TODO()) s.Require().NoError(err) @@ -283,11 +297,16 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { membershipGroupIndex = s.register(appKeystore2, credentials2, s.u2PrivKey) // mount the rln relay protocol in the on-chain/dynamic mode - gm2, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore2, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + rootInstance, rootTracker, err = GetRLNInstanceAndRootTracker(s.tmpRLNDBPath()) + s.Require().NoError(err) + gm2, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore2, keystorePassword, prometheus.DefaultRegisterer, rootInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - rlnRelay2, err := New(gm2, s.tmpRLNDBPath(), timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - s.Require().NoError(err) + rlnRelay2 := New(group_manager.Details{ + GroupManager: gm2, + RootTracker: rootTracker, + RLN: rootInstance, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) err = rlnRelay2.Start(context.TODO()) s.Require().NoError(err) @@ -295,8 +314,8 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { // the two nodes should be registered into the contract // since nodes are spun up sequentially // the first node has index 0 whereas the second node gets index 1 - idx1 := rlnRelay1.groupManager.MembershipIndex() - idx2 := rlnRelay2.groupManager.MembershipIndex() + idx1 := rlnRelay1.GroupManager.MembershipIndex() + idx2 := rlnRelay2.GroupManager.MembershipIndex() s.Require().NoError(err) s.Require().Equal(rln.MembershipIndex(0), idx1) s.Require().Equal(rln.MembershipIndex(1), idx2) diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index bf755303..1f99f12f 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -50,17 +50,24 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { groupIDCommitments = append(groupIDCommitments, c.IDCommitment) } + rlnInstance, rootTracker, err := GetRLNInstanceAndRootTracker("") + s.Require().NoError(err) + // index indicates the position of a membership key pair in the static list of group keys i.e., groupKeyPairs // the corresponding key pair will be used to mount rlnRelay on the current node // index also represents the index of the leaf in the Merkle tree that contains node's commitment key index := r.MembershipIndex(5) + // idCredential := groupKeyPairs[index] - groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) + groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, rlnInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - s.Require().NoError(err) + wakuRLNRelay := New(group_manager.Details{ + GroupManager: groupManager, + RootTracker: rootTracker, + RLN: rlnInstance, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) err = wakuRLNRelay.Start(context.TODO()) s.Require().NoError(err) @@ -85,7 +92,9 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() { rlnRelay := &WakuRLNRelay{ nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), - rootTracker: rootTracker, + Details: group_manager.Details{ + RootTracker: rootTracker, + }, } epoch := r.GetCurrentEpoch() @@ -166,18 +175,21 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { // Create a RLN instance rlnInstance, err := r.NewRLN() s.Require().NoError(err) - - idCredential := groupKeyPairs[index] - groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) - s.Require().NoError(err) - + // rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) s.Require().NoError(err) + // + idCredential := groupKeyPairs[index] + groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, rlnInstance, rootTracker, utils.Logger()) + s.Require().NoError(err) rlnRelay := &WakuRLNRelay{ - groupManager: groupManager, - rootTracker: rootTracker, - RLN: rlnInstance, + Details: group_manager.Details{ + GroupManager: groupManager, + RootTracker: rootTracker, + RLN: rlnInstance, + }, + nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), log: utils.Logger(), metrics: newMetrics(prometheus.DefaultRegisterer), @@ -186,7 +198,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { //get the current epoch time now := time.Now() - err = groupManager.Start(context.Background(), rlnInstance, rootTracker) + err = groupManager.Start(context.Background()) s.Require().NoError(err) // create some messages from the same peer and append rln proof to them, except wm4 diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index 0f6c9345..fab6ae86 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -22,21 +22,11 @@ import ( proto "google.golang.org/protobuf/proto" ) -type GroupManager interface { - Start(ctx context.Context, rln *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error - IdentityCredentials() (rln.IdentityCredential, error) - MembershipIndex() rln.MembershipIndex - Stop() error -} - type WakuRLNRelay struct { timesource timesource.Timesource metrics Metrics - groupManager GroupManager - rootTracker *group_manager.MerkleRootTracker - - RLN *rln.RLN + group_manager.Details // the log of nullifiers and Shamir shares of the past messages grouped per epoch nullifierLogLock sync.RWMutex @@ -47,20 +37,11 @@ type WakuRLNRelay struct { const rlnDefaultTreePath = "./rln_tree.db" -func New( - groupManager GroupManager, - treePath string, - timesource timesource.Timesource, - reg prometheus.Registerer, - log *zap.Logger) (*WakuRLNRelay, error) { - +func GetRLNInstanceAndRootTracker(treePath string) (*rln.RLN, *group_manager.MerkleRootTracker, error) { if treePath == "" { treePath = rlnDefaultTreePath } - metrics := newMetrics(reg) - - start := time.Now() rlnInstance, err := rln.NewWithConfig(rln.DefaultTreeDepth, &rln.TreeConfig{ CacheCapacity: 15000, Mode: rln.HighThroughput, @@ -69,31 +50,35 @@ func New( Path: treePath, }) if err != nil { - return nil, err + return nil, nil, err } - metrics.RecordInstanceCreation(time.Since(start)) rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) if err != nil { - return nil, err + return nil, nil, err } + return rlnInstance, rootTracker, nil +} +func New( + Details group_manager.Details, + timesource timesource.Timesource, + reg prometheus.Registerer, + log *zap.Logger) *WakuRLNRelay { // create the WakuRLNRelay rlnPeer := &WakuRLNRelay{ - RLN: rlnInstance, - groupManager: groupManager, - rootTracker: rootTracker, - metrics: metrics, + Details: Details, + metrics: newMetrics(reg), log: log, timesource: timesource, nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), } - return rlnPeer, nil + return rlnPeer } func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error { - err := rlnRelay.groupManager.Start(ctx, rlnRelay.RLN, rlnRelay.rootTracker) + err := rlnRelay.GroupManager.Start(ctx) if err != nil { return err } @@ -105,7 +90,7 @@ func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error { // Stop will stop any operation or goroutine started while using WakuRLNRelay func (rlnRelay *WakuRLNRelay) Stop() error { - return rlnRelay.groupManager.Stop() + return rlnRelay.GroupManager.Stop() } func (rlnRelay *WakuRLNRelay) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) { @@ -214,7 +199,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime return invalidMessage, nil } - if !(rlnRelay.rootTracker.ContainsRoot(msgProof.MerkleRoot)) { + if !(rlnRelay.RootTracker.ContainsRoot(msgProof.MerkleRoot)) { rlnRelay.log.Debug("invalid message: unexpected root", logging.HexBytes("msgRoot", msg.RateLimitProof.MerkleRoot)) rlnRelay.metrics.RecordInvalidMessage(invalidRoot) return invalidMessage, nil @@ -261,7 +246,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime rlnRelay.log.Debug("message is valid") - rootIndex := rlnRelay.rootTracker.IndexOf(msgProof.MerkleRoot) + rootIndex := rlnRelay.RootTracker.IndexOf(msgProof.MerkleRoot) rlnRelay.metrics.RecordValidMessages(rootIndex) return validMessage, nil @@ -270,7 +255,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime func (rlnRelay *WakuRLNRelay) verifyProof(msg *pb.WakuMessage, proof *rln.RateLimitProof) (bool, error) { contentTopicBytes := []byte(msg.ContentTopic) input := append(msg.Payload, contentTopicBytes...) - return rlnRelay.RLN.Verify(input, *proof, rlnRelay.rootTracker.Roots()...) + return rlnRelay.RLN.Verify(input, *proof, rlnRelay.RootTracker.Roots()...) } func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTime time.Time) error { @@ -351,12 +336,12 @@ func (rlnRelay *WakuRLNRelay) Validator( } func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*pb.RateLimitProof, error) { - identityCredentials, err := rlnRelay.groupManager.IdentityCredentials() + identityCredentials, err := rlnRelay.GroupManager.IdentityCredentials() if err != nil { return nil, err } - membershipIndex := rlnRelay.groupManager.MembershipIndex() + membershipIndex := rlnRelay.GroupManager.MembershipIndex() proof, err := rlnRelay.RLN.GenerateProof(input, identityCredentials, membershipIndex, epoch) if err != nil { @@ -375,9 +360,9 @@ func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*pb. } func (rlnRelay *WakuRLNRelay) IdentityCredential() (rln.IdentityCredential, error) { - return rlnRelay.groupManager.IdentityCredentials() + return rlnRelay.GroupManager.IdentityCredentials() } func (rlnRelay *WakuRLNRelay) MembershipIndex() uint { - return rlnRelay.groupManager.MembershipIndex() + return rlnRelay.GroupManager.MembershipIndex() } diff --git a/waku/v2/protocol/rln/web3/web3.go b/waku/v2/protocol/rln/web3/web3.go index 7f32e911..cdd162a0 100644 --- a/waku/v2/protocol/rln/web3/web3.go +++ b/waku/v2/protocol/rln/web3/web3.go @@ -5,8 +5,10 @@ import ( "errors" "math/big" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" ) @@ -26,12 +28,21 @@ type RLNContract struct { DeployedBlockNumber uint64 } +// EthClient is an interface for the ethclient.Client, so that we can pass mock client for testing +type EthClient interface { + bind.ContractBackend + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) + Close() +} + // Config is a helper struct that contains attributes for interaction with RLN smart contracts type Config struct { configured bool ETHClientAddress string - ETHClient *ethclient.Client + ETHClient EthClient ChainID *big.Int RegistryContract RegistryContract RLNContract RLNContract