diff --git a/waku/v2/node/wakunode2_rln.go b/waku/v2/node/wakunode2_rln.go index 456ce396..7ea5c18d 100644 --- a/waku/v2/node/wakunode2_rln.go +++ b/waku/v2/node/wakunode2_rln.go @@ -10,6 +10,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/waku-org/go-waku/waku/v2/protocol/rln" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static" "github.com/waku-org/go-waku/waku/v2/protocol/rln/keystore" @@ -23,12 +24,17 @@ func (w *WakuNode) RLNRelay() RLNRelay { func (w *WakuNode) setupRLNRelay() error { var err error - var groupManager rln.GroupManager if !w.opts.enableRLN { return nil } + var groupManager group_manager.GroupManager + + rlnInstance, rootTracker, err := rln.GetRLNInstanceAndRootTracker(w.opts.rlnTreePath) + if err != nil { + return err + } if !w.opts.rlnRelayDynamic { w.log.Info("setting up waku-rln-relay in off-chain mode") @@ -38,7 +44,8 @@ func (w *WakuNode) setupRLNRelay() error { return err } - groupManager, err = static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, w.log) + groupManager, err = static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, rlnInstance, + rootTracker, w.log) if err != nil { return err } @@ -57,6 +64,8 @@ func (w *WakuNode) setupRLNRelay() error { appKeystore, w.opts.keystorePassword, w.opts.prometheusReg, + rlnInstance, + rootTracker, w.log, ) if err != nil { @@ -64,10 +73,11 @@ func (w *WakuNode) setupRLNRelay() error { } } - rlnRelay, err := rln.New(groupManager, w.opts.rlnTreePath, w.timesource, w.opts.prometheusReg, w.log) - if err != nil { - return err - } + rlnRelay := rln.New(group_manager.Details{ + GroupManager: groupManager, + RootTracker: rootTracker, + RLN: rlnInstance, + }, w.timesource, w.opts.prometheusReg, w.log) w.rlnRelay = rlnRelay diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go index 5b88b18a..302115ec 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -4,7 +4,6 @@ import ( "context" "errors" "math/big" - "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -28,28 +27,21 @@ var RLNAppInfo = keystore.AppInfo{ } type DynamicGroupManager struct { - rln *rln.RLN - log *zap.Logger + MembershipFetcher metrics Metrics cancel context.CancelFunc - wg sync.WaitGroup identityCredential *rln.IdentityCredential membershipIndex rln.MembershipIndex - web3Config *web3.Config lastBlockProcessed uint64 - eventHandler RegistrationEventHandler - appKeystore *keystore.AppKeystore keystorePassword string - - rootTracker *group_manager.MerkleRootTracker } -func handler(gm *DynamicGroupManager, events []*contracts.RLNMemberRegistered) error { +func (gm *DynamicGroupManager) handler(events []*contracts.RLNMemberRegistered) error { toRemoveTable := om.New() toInsertTable := om.New() @@ -116,18 +108,19 @@ func NewDynamicGroupManager( appKeystore *keystore.AppKeystore, keystorePassword string, reg prometheus.Registerer, + rlnInstance *rln.RLN, + rootTracker *group_manager.MerkleRootTracker, log *zap.Logger, ) (*DynamicGroupManager, error) { log = log.Named("rln-dynamic") + web3Config := web3.NewConfig(ethClientAddr, memContractAddr) return &DynamicGroupManager{ - membershipIndex: membershipIndex, - web3Config: web3.NewConfig(ethClientAddr, memContractAddr), - eventHandler: handler, - appKeystore: appKeystore, - keystorePassword: keystorePassword, - log: log, - metrics: newMetrics(reg), + membershipIndex: membershipIndex, + appKeystore: appKeystore, + keystorePassword: keystorePassword, + MembershipFetcher: NewMembershipFetcher(web3Config, rlnInstance, rootTracker, log), + metrics: newMetrics(reg), }, nil } @@ -139,7 +132,7 @@ func (gm *DynamicGroupManager) memberExists(ctx context.Context, idCommitment rl return gm.web3Config.RLNContract.MemberExists(&bind.CallOpts{Context: ctx}, rln.Bytes32ToBigInt(idCommitment)) } -func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error { +func (gm *DynamicGroupManager) Start(ctx context.Context) error { if gm.cancel != nil { return errors.New("already started") } @@ -154,9 +147,6 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, return err } - gm.rln = rlnInstance - gm.rootTracker = rootTracker - // check if the contract exists by calling a static function _, err = gm.getMembershipFee(ctx) if err != nil { @@ -168,7 +158,7 @@ func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, return err } - if err = gm.HandleGroupUpdates(ctx, gm.eventHandler); err != nil { + if err = gm.MembershipFetcher.HandleGroupUpdates(ctx, gm.handler); err != nil { return err } @@ -278,9 +268,7 @@ func (gm *DynamicGroupManager) Stop() error { return err } - gm.web3Config.ETHClient.Close() - - gm.wg.Wait() + gm.MembershipFetcher.Stop() return nil } diff --git a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go index 1479b495..271e5699 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/handler_test.go @@ -3,7 +3,6 @@ package dynamic import ( "context" "math/big" - "sync" "testing" "github.com/ethereum/go-ethereum/core/types" @@ -35,21 +34,21 @@ func TestHandler(t *testing.T) { rootTracker, err := group_manager.NewMerkleRootTracker(5, rlnInstance) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.TODO()) + _, cancel := context.WithCancel(context.TODO()) defer cancel() - _ = ctx - gm := &DynamicGroupManager{ - rln: rlnInstance, - log: utils.Logger(), + MembershipFetcher: NewMembershipFetcher( + &web3.Config{ + ChainID: big.NewInt(1), + }, + rlnInstance, + rootTracker, + utils.Logger(), + ), cancel: cancel, - wg: sync.WaitGroup{}, - web3Config: &web3.Config{ - ChainID: big.NewInt(1), - }, - rootTracker: rootTracker, - metrics: newMetrics(prometheus.DefaultRegisterer), + + metrics: newMetrics(prometheus.DefaultRegisterer), } root0 := [32]byte{62, 31, 25, 34, 223, 182, 113, 211, 249, 18, 247, 234, 70, 30, 10, 136, 238, 132, 143, 221, 225, 43, 108, 24, 171, 26, 210, 197, 106, 231, 52, 33} @@ -59,7 +58,7 @@ func TestHandler(t *testing.T) { events := []*contracts.RLNMemberRegistered{eventBuilder(1, false, 0xaaaa, 1)} - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) roots = gm.rootTracker.Roots() @@ -75,7 +74,7 @@ func TestHandler(t *testing.T) { eventBuilder(4, false, 0xeeee, 5), } - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) // Root[1] should become [0] @@ -99,7 +98,7 @@ func TestHandler(t *testing.T) { eventBuilder(3, false, 0xeeee, 5), } - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) roots = gm.rootTracker.Roots() @@ -113,7 +112,7 @@ func TestHandler(t *testing.T) { // Adding multiple events for same block events = []*contracts.RLNMemberRegistered{} - err = handler(gm, events) + err = gm.handler(events) require.NoError(t, err) roots = gm.rootTracker.Roots() diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go new file mode 100644 index 00000000..e0f3b057 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go @@ -0,0 +1,227 @@ +package dynamic + +import ( + "bytes" + "context" + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/web3" + "github.com/waku-org/go-zerokit-rln/rln" + "go.uber.org/zap" +) + +// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface +type RegistrationEventHandler = func([]*contracts.RLNMemberRegistered) error + +// for getting membershipRegsitered Events from the eth rpc +type MembershipFetcher struct { + web3Config *web3.Config + rln *rln.RLN + log *zap.Logger + rootTracker *group_manager.MerkleRootTracker + wg sync.WaitGroup +} + +func NewMembershipFetcher(web3Config *web3.Config, rln *rln.RLN, rootTracker *group_manager.MerkleRootTracker, log *zap.Logger) MembershipFetcher { + return MembershipFetcher{ + web3Config: web3Config, + rln: rln, + log: log, + rootTracker: rootTracker, + } +} + +// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract +// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` +// and collects all the events, for every received event, it calls the `handler` +func (mf *MembershipFetcher) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error { + fromBlock := mf.web3Config.RLNContract.DeployedBlockNumber + metadata, err := mf.GetMetadata() + if err != nil { + mf.log.Warn("could not load last processed block from metadata. Starting onchain sync from deployment block", zap.Error(err), zap.Uint64("deploymentBlock", mf.web3Config.RLNContract.DeployedBlockNumber)) + } else { + if mf.web3Config.ChainID.Cmp(metadata.ChainID) != 0 { + return errors.New("persisted data: chain id mismatch") + } + + if !bytes.Equal(mf.web3Config.RegistryContract.Address.Bytes(), metadata.ContractAddress.Bytes()) { + return errors.New("persisted data: contract address mismatch") + } + + fromBlock = metadata.LastProcessedBlock + 1 + mf.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock)) + } + + mf.rootTracker.SetValidRootsPerBlock(metadata.ValidRootsPerBlock) + // + latestBlockNumber, err := mf.latestBlockNumber(ctx) + if err != nil { + return err + } + // + err = mf.loadOldEvents(ctx, fromBlock, latestBlockNumber, handler) + if err != nil { + return err + } + + errCh := make(chan error) + + mf.wg.Add(1) + go mf.watchNewEvents(ctx, latestBlockNumber+1, handler, errCh) // we have already fetched the events for latestBlocNumber in oldEvents + return <-errCh +} + +func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlock uint64, handler RegistrationEventHandler) error { + for ; fromBlock+maxBatchSize < toBlock; fromBlock += maxBatchSize + 1 { // check if the end of the batch is within the toBlock range + events, err := mf.getEvents(ctx, fromBlock, fromBlock+maxBatchSize) + if err != nil { + return err + } + if err := handler(events); err != nil { + return err + } + } + + // + events, err := mf.getEvents(ctx, fromBlock, toBlock) + if err != nil { + return err + } + // process all the fetched events + return handler(events) +} + +func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) { + defer mf.wg.Done() + + // Watch for new events + firstErr := true + headerCh := make(chan *types.Header) + subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { + s, err := mf.web3Config.ETHClient.SubscribeNewHead(ctx, headerCh) + if err != nil { + if err == rpc.ErrNotificationsUnsupported { + err = errors.New("notifications not supported. The node must support websockets") + } + mf.log.Error("subscribing to rln events", zap.Error(err)) + } + if firstErr { // errCh can be closed only once + errCh <- err + close(errCh) + firstErr = false + } + return s, err + }) + + defer subs.Unsubscribe() + defer close(headerCh) + + for { + select { + case h := <-headerCh: + toBlock := h.Number.Uint64() + events, err := mf.getEvents(ctx, fromBlock, toBlock) + if err != nil { + mf.log.Error("obtaining rln events", zap.Error(err)) + } else { + // update the last processed block + fromBlock = toBlock + 1 + } + + err = handler(events) + if err != nil { + mf.log.Error("processing rln log", zap.Error(err)) + } + case <-ctx.Done(): + return + case err := <-subs.Err(): + if err != nil { + mf.log.Error("watching new events", zap.Error(err)) + } + return + } + } +} + +const maxBatchSize = uint64(5000) + +func tooMuchDataRequestedError(err error) bool { + // this error is only infura specific (other providers might have different error messages) + return err.Error() == "query returned more than 10000 results" +} + +func (mf *MembershipFetcher) latestBlockNumber(ctx context.Context) (uint64, error) { + block, err := mf.web3Config.ETHClient.BlockByNumber(ctx, nil) + if err != nil { + return 0, err + } + + return block.Number().Uint64(), nil +} + +func (mf *MembershipFetcher) getEvents(ctx context.Context, fromBlock uint64, toBlock uint64) ([]*contracts.RLNMemberRegistered, error) { + evts, err := mf.fetchEvents(ctx, fromBlock, toBlock) + if err != nil { + if tooMuchDataRequestedError(err) { // divide the range and try again + mid := (fromBlock + toBlock) / 2 + firstHalfEvents, err := mf.getEvents(ctx, fromBlock, mid) + if err != nil { + return nil, err + } + secondHalfEvents, err := mf.getEvents(ctx, mid+1, toBlock) + if err != nil { + return nil, err + } + return append(firstHalfEvents, secondHalfEvents...), nil + } + return nil, err + } + return evts, nil +} + +func (mf *MembershipFetcher) fetchEvents(ctx context.Context, from uint64, to uint64) ([]*contracts.RLNMemberRegistered, error) { + logIterator, err := mf.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: &to, Context: ctx}) + if err != nil { + return nil, err + } + + var results []*contracts.RLNMemberRegistered + + for { + if !logIterator.Next() { + break + } + + if logIterator.Error() != nil { + return nil, logIterator.Error() + } + + results = append(results, logIterator.Event) + } + + return results, nil +} + +// GetMetadata retrieves metadata from the zerokit's RLN database +func (mf *MembershipFetcher) GetMetadata() (RLNMetadata, error) { + b, err := mf.rln.GetMetadata() + if err != nil { + return RLNMetadata{}, err + } + + return DeserializeMetadata(b) +} + +func (mf *MembershipFetcher) Stop() { + mf.web3Config.ETHClient.Close() + // wait for the watchNewEvents goroutine to finish + mf.wg.Wait() +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json new file mode 100644 index 00000000..fe8e2bb3 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.json @@ -0,0 +1,100 @@ +{ + "blocks": { + "5": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:1", + "bigint:1" + ] + } + ], + "5005": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:2", + "bigint:2" + ] + } + ], + "5006": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:3", + "bigint:3" + ] + } + ], + "5007": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:4", + "bigint:4" + ] + } + ], + "10005": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:5", + "bigint:5" + ] + } + ], + "10010": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:6", + "bigint:6" + ] + } + ], + "10011": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:7", + "bigint:7" + ] + } + ], + "10012": [ + { + "address": "0x0000000000000000000000000000000000000000", + "topics": [ + "MemberRegistered(uint256,uint256)" + ], + "data": [ + "bigint:8", + "bigint:8" + ] + } + ] + } +} \ No newline at end of file diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go new file mode 100644 index 00000000..1cf0017b --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher_test.go @@ -0,0 +1,67 @@ +package dynamic + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/web3" + "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-zerokit-rln/rln" +) + +func TestFetchingLogic(t *testing.T) { + client := NewMockClient(t, "membership_fetcher.json") + + rlnContract, err := contracts.NewRLN(common.Address{}, client) + require.NoError(t, err) + rlnInstance, err := rln.NewRLN() + require.NoError(t, err) + rootTracker, err := group_manager.NewMerkleRootTracker(1, rlnInstance) + require.NoError(t, err) + // + mf := MembershipFetcher{ + web3Config: &web3.Config{ + RLNContract: web3.RLNContract{ + RLN: rlnContract, + }, + ETHClient: client, + }, + rln: rlnInstance, + log: utils.Logger(), + rootTracker: rootTracker, + } + + counts := []int{} + mockFn := func(events []*contracts.RLNMemberRegistered) error { + counts = append(counts, len(events)) + return nil + } + // check if more than 10k error is handled or not. + client.SetErrorOnBlock(5007, fmt.Errorf("query returned more than 10000 results"), 2) + // loadOldEvents will check till 10010 + client.SetLatestBlockNumber(10010) + // watchNewEvents will check till 10012 + ctx, cancel := context.WithCancel(context.Background()) + if err := mf.HandleGroupUpdates(ctx, mockFn); err != nil { + t.Fatal(err) + } + go func() { + for { + if client.latestBlockNum.Load() == 10012 { + cancel() + return + } + time.Sleep(time.Second) + } + }() + mf.Stop() + // sleep so that watchNewEvents can finish + // check whether all the events are fetched or not. + require.Equal(t, counts, []int{1, 3, 2, 1, 1}) +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/metadata.go b/waku/v2/protocol/rln/group_manager/dynamic/metadata.go index 0d8e4559..a430a810 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/metadata.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/metadata.go @@ -79,13 +79,3 @@ func (gm *DynamicGroupManager) SetMetadata(meta RLNMetadata) error { b := meta.Serialize() return gm.rln.SetMetadata(b) } - -// GetMetadata retrieves metadata from the zerokit's RLN database -func (gm *DynamicGroupManager) GetMetadata() (RLNMetadata, error) { - b, err := gm.rln.GetMetadata() - if err != nil { - return RLNMetadata{}, err - } - - return DeserializeMetadata(b) -} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go b/waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go new file mode 100644 index 00000000..ba0fb800 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/mock_blockchain.go @@ -0,0 +1,81 @@ +package dynamic + +import ( + "math/big" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" +) + +// MockBlockChain is currently a chain of events for different block numbers +// it is used internal by mock client for returning events for a given block number or range in FilterLog rpc call. +type MockBlockChain struct { + Blocks map[int64]*MockBlock `json:"blocks"` +} + +type MockBlock []MockEvent + +func containsEntry[T common.Hash | common.Address](topics []T, topicA T) bool { + for _, topic := range topics { + if topic == topicA { + return true + } + } + return false +} + +func Topic(topic string) common.Hash { + return crypto.Keccak256Hash([]byte(topic)) +} +func (b MockBlock) getLogs(blockNum uint64, addrs []common.Address, topicA []common.Hash) (txLogs []types.Log) { + for ind, event := range b { + txLog := event.GetLog() + if containsEntry(addrs, txLog.Address) && (len(topicA) == 0 || containsEntry(topicA, txLog.Topics[0])) { + txLog.BlockNumber = blockNum + txLog.Index = uint(ind) + txLogs = append(txLogs, txLog) + } + } + return +} + +type MockEvent struct { + Address common.Address `json:"address"` + Topics []string `json:"topics"` + Txhash common.Hash `json:"txhash"` + Data []string `json:"data"` +} + +func (e MockEvent) GetLog() types.Log { + topics := []common.Hash{Topic(e.Topics[0])} + for _, topic := range e.Topics[1:] { + topics = append(topics, parseData(topic)) + } + // + var data []byte + for _, entry := range e.Data { + data = append(data, parseData(entry).Bytes()...) + } + return types.Log{ + Address: e.Address, + Topics: topics, + TxHash: e.Txhash, + Data: data, + } +} + +func parseData(data string) common.Hash { + splits := strings.Split(data, ":") + switch splits[0] { + case "bigint": + bigInt, ok := new(big.Int).SetString(splits[1], 10) + if !ok { + panic("invalid big int") + } + return common.BytesToHash(bigInt.Bytes()) + default: + panic("invalid data type") + } +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/mock_client.go b/waku/v2/protocol/rln/group_manager/dynamic/mock_client.go new file mode 100644 index 00000000..e63eebd1 --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/mock_client.go @@ -0,0 +1,118 @@ +package dynamic + +import ( + "context" + "encoding/json" + "io/ioutil" + "math/big" + "sort" + "sync/atomic" + "testing" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +type ErrCount struct { + err error + count int +} + +type MockClient struct { + ethclient.Client + blockChain MockBlockChain + latestBlockNum atomic.Int64 + errOnBlock map[int64]*ErrCount +} + +func (c *MockClient) SetLatestBlockNumber(num int64) { + c.latestBlockNum.Store(num) +} + +func (c *MockClient) Close() { + +} +func (c *MockClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return types.NewBlock(&types.Header{Number: big.NewInt(c.latestBlockNum.Load())}, nil, nil, nil, nil), nil +} +func NewMockClient(t *testing.T, blockFile string) *MockClient { + blockChain := MockBlockChain{} + data, err := ioutil.ReadFile(blockFile) + if err != nil { + t.Fatal(err) + } + if err := json.Unmarshal(data, &blockChain); err != nil { + t.Fatal(err) + } + return &MockClient{blockChain: blockChain, errOnBlock: map[int64]*ErrCount{}} +} + +func (client *MockClient) SetErrorOnBlock(blockNum int64, err error, count int) { + client.errOnBlock[blockNum] = &ErrCount{err: err, count: count} +} + +func (c *MockClient) getFromAndToRange(query ethereum.FilterQuery) (int64, int64) { + var fromBlock int64 + if query.FromBlock == nil { + fromBlock = 0 + } else { + fromBlock = query.FromBlock.Int64() + } + + var toBlock int64 + if query.ToBlock == nil { + toBlock = 0 + } else { + toBlock = query.ToBlock.Int64() + } + return fromBlock, toBlock +} +func (c *MockClient) FilterLogs(ctx context.Context, query ethereum.FilterQuery) (allTxLogs []types.Log, err error) { + fromBlock, toBlock := c.getFromAndToRange(query) + for block, details := range c.blockChain.Blocks { + if block >= fromBlock && block <= toBlock { + if txLogs := details.getLogs(uint64(block), query.Addresses, query.Topics[0]); len(txLogs) != 0 { + allTxLogs = append(allTxLogs, txLogs...) + } + if errCount, ok := c.errOnBlock[block]; ok && errCount.count != 0 { + errCount.count-- + return nil, errCount.err + } + } + } + sort.Slice(allTxLogs, func(i, j int) bool { + return allTxLogs[i].BlockNumber < allTxLogs[j].BlockNumber || + (allTxLogs[i].BlockNumber == allTxLogs[j].BlockNumber && allTxLogs[i].Index < allTxLogs[j].Index) + }) + return allTxLogs, nil +} + +func (c *MockClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { + for { + next := c.latestBlockNum.Load() + 1 + if c.blockChain.Blocks[next] != nil { + ch <- &types.Header{Number: big.NewInt(next)} + c.latestBlockNum.Store(next) + } else { + break + } + } + return testNoopSub{}, nil +} + +type testNoopSub struct { +} + +func (testNoopSub) Unsubscribe() { + +} + +// Err returns the subscription error channel. The error channel receives +// a value if there is an issue with the subscription (e.g. the network connection +// delivering the events has been closed). Only one value will ever be sent. +// The error channel is closed by Unsubscribe. +func (testNoopSub) Err() <-chan error { + ch := make(chan error) + return ch +} diff --git a/waku/v2/protocol/rln/group_manager/dynamic/web3.go b/waku/v2/protocol/rln/group_manager/dynamic/web3.go deleted file mode 100644 index a3c7aa32..00000000 --- a/waku/v2/protocol/rln/group_manager/dynamic/web3.go +++ /dev/null @@ -1,209 +0,0 @@ -package dynamic - -import ( - "bytes" - "context" - "errors" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/rpc" - "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" - "go.uber.org/zap" -) - -// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = func(*DynamicGroupManager, []*contracts.RLNMemberRegistered) error - -// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract -// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` -// and collects all the events, for every received event, it calls the `handler` -func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error { - fromBlock := gm.web3Config.RLNContract.DeployedBlockNumber - metadata, err := gm.GetMetadata() - if err != nil { - gm.log.Warn("could not load last processed block from metadata. Starting onchain sync from deployment block", zap.Error(err), zap.Uint64("deploymentBlock", gm.web3Config.RLNContract.DeployedBlockNumber)) - } else { - if gm.web3Config.ChainID.Cmp(metadata.ChainID) != 0 { - return errors.New("persisted data: chain id mismatch") - } - - if !bytes.Equal(gm.web3Config.RegistryContract.Address.Bytes(), metadata.ContractAddress.Bytes()) { - return errors.New("persisted data: contract address mismatch") - } - - fromBlock = metadata.LastProcessedBlock - gm.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock)) - } - - gm.rootTracker.SetValidRootsPerBlock(metadata.ValidRootsPerBlock) - - err = gm.loadOldEvents(ctx, fromBlock, handler) - if err != nil { - return err - } - - errCh := make(chan error) - - gm.wg.Add(1) - go gm.watchNewEvents(ctx, handler, gm.log, errCh) - return <-errCh -} - -func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler) error { - events, err := gm.getEvents(ctx, fromBlock, nil) - if err != nil { - return err - } - return handler(gm, events) -} - -func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { - defer gm.wg.Done() - - // Watch for new events - firstErr := true - headerCh := make(chan *types.Header) - subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { - s, err := gm.web3Config.ETHClient.SubscribeNewHead(ctx, headerCh) - if err != nil { - if err == rpc.ErrNotificationsUnsupported { - err = errors.New("notifications not supported. The node must support websockets") - } - if firstErr { - errCh <- err - } - gm.log.Error("subscribing to rln events", zap.Error(err)) - } - firstErr = false - close(errCh) - return s, err - }) - - defer subs.Unsubscribe() - defer close(headerCh) - - for { - select { - case h := <-headerCh: - blk := h.Number.Uint64() - events, err := gm.getEvents(ctx, blk, &blk) - if err != nil { - gm.log.Error("obtaining rln events", zap.Error(err)) - } - - err = handler(gm, events) - if err != nil { - gm.log.Error("processing rln log", zap.Error(err)) - } - case <-ctx.Done(): - return - case err := <-subs.Err(): - if err != nil { - gm.log.Error("watching new events", zap.Error(err)) - } - return - } - } -} - -const maxBatchSize = uint64(5000) -const additiveFactorMultiplier = 0.10 -const multiplicativeDecreaseDivisor = 2 - -func tooMuchDataRequestedError(err error) bool { - // this error is only infura specific (other providers might have different error messages) - return err.Error() == "query returned more than 10000 results" -} - -func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { - var results []*contracts.RLNMemberRegistered - - // Adapted from prysm logic for fetching historical logs - - toBlock := to - if to == nil { - block, err := gm.web3Config.ETHClient.BlockByNumber(ctx, nil) - if err != nil { - return nil, err - } - - blockNumber := block.Number().Uint64() - toBlock = &blockNumber - } - - if from == *toBlock { // Only loading a single block - return gm.fetchEvents(ctx, from, toBlock) - } - - // Fetching blocks in batches - batchSize := maxBatchSize - additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier) - - currentBlockNum := from - for currentBlockNum < *toBlock { - start := currentBlockNum - end := currentBlockNum + batchSize - if end > *toBlock { - end = *toBlock - } - - gm.log.Info("loading events...", zap.Uint64("fromBlock", start), zap.Uint64("toBlock", end)) - - evts, err := gm.fetchEvents(ctx, start, &end) - if err != nil { - if tooMuchDataRequestedError(err) { - if batchSize == 0 { - return nil, errors.New("batch size is zero") - } - - // multiplicative decrease - batchSize = batchSize / multiplicativeDecreaseDivisor - - gm.log.Warn("too many logs requested!, retrying with a smaller chunk size", zap.Uint64("batchSize", batchSize)) - - continue - } - return nil, err - } - - results = append(results, evts...) - - currentBlockNum = end - - if batchSize < maxBatchSize { - // update the batchSize with additive increase - batchSize = batchSize + additiveFactor - if batchSize > maxBatchSize { - batchSize = maxBatchSize - } - } - } - - return results, nil -} - -func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { - logIterator, err := gm.web3Config.RLNContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx}) - if err != nil { - return nil, err - } - - var results []*contracts.RLNMemberRegistered - - for { - if !logIterator.Next() { - break - } - - if logIterator.Error() != nil { - return nil, logIterator.Error() - } - - results = append(results, logIterator.Event) - } - - return results, nil -} diff --git a/waku/v2/protocol/rln/group_manager/group_manager.go b/waku/v2/protocol/rln/group_manager/group_manager.go index 453d0228..3dbec886 100644 --- a/waku/v2/protocol/rln/group_manager/group_manager.go +++ b/waku/v2/protocol/rln/group_manager/group_manager.go @@ -1 +1,21 @@ package group_manager + +import ( + "context" + + "github.com/waku-org/go-zerokit-rln/rln" +) + +type GroupManager interface { + Start(ctx context.Context) error + IdentityCredentials() (rln.IdentityCredential, error) + MembershipIndex() rln.MembershipIndex + Stop() error +} + +type Details struct { + GroupManager GroupManager + RootTracker *MerkleRootTracker + + RLN *rln.RLN +} diff --git a/waku/v2/protocol/rln/group_manager/static/static.go b/waku/v2/protocol/rln/group_manager/static/static.go index 432a054e..251bf1b4 100644 --- a/waku/v2/protocol/rln/group_manager/static/static.go +++ b/waku/v2/protocol/rln/group_manager/static/static.go @@ -25,6 +25,8 @@ func NewStaticGroupManager( group []rln.IDCommitment, identityCredential rln.IdentityCredential, index rln.MembershipIndex, + rlnInstance *rln.RLN, + rootTracker *group_manager.MerkleRootTracker, log *zap.Logger, ) (*StaticGroupManager, error) { // check the peer's index and the inclusion of user's identity commitment in the group @@ -37,15 +39,14 @@ func NewStaticGroupManager( group: group, identityCredential: &identityCredential, membershipIndex: index, + rln: rlnInstance, + rootTracker: rootTracker, }, nil } -func (gm *StaticGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error { +func (gm *StaticGroupManager) Start(ctx context.Context) error { gm.log.Info("mounting rln-relay in off-chain/static mode") - gm.rln = rlnInstance - gm.rootTracker = rootTracker - // add members to the Merkle tree err := gm.insertMembers(gm.group) diff --git a/waku/v2/protocol/rln/onchain_test.go b/waku/v2/protocol/rln/onchain_test.go index 4b396716..41299a9b 100644 --- a/waku/v2/protocol/rln/onchain_test.go +++ b/waku/v2/protocol/rln/onchain_test.go @@ -149,14 +149,16 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() { membershipIndex := s.register(appKeystore, u1Credentials, s.u1PrivKey) - gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, rlnInstance, rt, utils.Logger()) s.Require().NoError(err) // initialize the WakuRLNRelay rlnRelay := &WakuRLNRelay{ - rootTracker: rt, - groupManager: gm, - RLN: rlnInstance, + Details: group_manager.Details{ + RootTracker: rt, + GroupManager: gm, + RLN: rlnInstance, + }, log: utils.Logger(), nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), } @@ -231,11 +233,17 @@ func (s *WakuRLNRelayDynamicSuite) TestMerkleTreeConstruction() { membershipIndex := s.register(appKeystore, credentials1, s.u1PrivKey) membershipIndex = s.register(appKeystore, credentials2, s.u1PrivKey) + rlnInstance, rootTracker, err := GetRLNInstanceAndRootTracker(s.tmpRLNDBPath()) + s.Require().NoError(err) // mount the rln relay protocol in the on-chain/dynamic mode - gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + gm, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, rlnInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - rlnRelay, err := New(gm, s.tmpRLNDBPath(), timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + rlnRelay := New(group_manager.Details{ + RLN: rlnInstance, + RootTracker: rootTracker, + GroupManager: gm, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) s.Require().NoError(err) err = rlnRelay.Start(context.TODO()) @@ -265,11 +273,17 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { membershipGroupIndex := s.register(appKeystore, credentials1, s.u1PrivKey) // mount the rln relay protocol in the on-chain/dynamic mode - gm1, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + rootInstance, rootTracker, err := GetRLNInstanceAndRootTracker(s.tmpRLNDBPath()) + s.Require().NoError(err) + gm1, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore, keystorePassword, prometheus.DefaultRegisterer, rootInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - rlnRelay1, err := New(gm1, s.tmpRLNDBPath(), timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - s.Require().NoError(err) + rlnRelay1 := New(group_manager.Details{ + GroupManager: gm1, + RootTracker: rootTracker, + RLN: rootInstance, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + err = rlnRelay1.Start(context.TODO()) s.Require().NoError(err) @@ -283,11 +297,16 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { membershipGroupIndex = s.register(appKeystore2, credentials2, s.u2PrivKey) // mount the rln relay protocol in the on-chain/dynamic mode - gm2, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore2, keystorePassword, prometheus.DefaultRegisterer, utils.Logger()) + rootInstance, rootTracker, err = GetRLNInstanceAndRootTracker(s.tmpRLNDBPath()) + s.Require().NoError(err) + gm2, err := dynamic.NewDynamicGroupManager(s.web3Config.ETHClientAddress, s.web3Config.RegistryContract.Address, membershipGroupIndex, appKeystore2, keystorePassword, prometheus.DefaultRegisterer, rootInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - rlnRelay2, err := New(gm2, s.tmpRLNDBPath(), timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - s.Require().NoError(err) + rlnRelay2 := New(group_manager.Details{ + GroupManager: gm2, + RootTracker: rootTracker, + RLN: rootInstance, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) err = rlnRelay2.Start(context.TODO()) s.Require().NoError(err) @@ -295,8 +314,8 @@ func (s *WakuRLNRelayDynamicSuite) TestCorrectRegistrationOfPeers() { // the two nodes should be registered into the contract // since nodes are spun up sequentially // the first node has index 0 whereas the second node gets index 1 - idx1 := rlnRelay1.groupManager.MembershipIndex() - idx2 := rlnRelay2.groupManager.MembershipIndex() + idx1 := rlnRelay1.GroupManager.MembershipIndex() + idx2 := rlnRelay2.GroupManager.MembershipIndex() s.Require().NoError(err) s.Require().Equal(rln.MembershipIndex(0), idx1) s.Require().Equal(rln.MembershipIndex(1), idx2) diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index bf755303..1f99f12f 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -50,17 +50,24 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { groupIDCommitments = append(groupIDCommitments, c.IDCommitment) } + rlnInstance, rootTracker, err := GetRLNInstanceAndRootTracker("") + s.Require().NoError(err) + // index indicates the position of a membership key pair in the static list of group keys i.e., groupKeyPairs // the corresponding key pair will be used to mount rlnRelay on the current node // index also represents the index of the leaf in the Merkle tree that contains node's commitment key index := r.MembershipIndex(5) + // idCredential := groupKeyPairs[index] - groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) + groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, rlnInstance, rootTracker, utils.Logger()) s.Require().NoError(err) - wakuRLNRelay, err := New(groupManager, "", timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - s.Require().NoError(err) + wakuRLNRelay := New(group_manager.Details{ + GroupManager: groupManager, + RootTracker: rootTracker, + RLN: rlnInstance, + }, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) err = wakuRLNRelay.Start(context.TODO()) s.Require().NoError(err) @@ -85,7 +92,9 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() { rlnRelay := &WakuRLNRelay{ nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), - rootTracker: rootTracker, + Details: group_manager.Details{ + RootTracker: rootTracker, + }, } epoch := r.GetCurrentEpoch() @@ -166,18 +175,21 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { // Create a RLN instance rlnInstance, err := r.NewRLN() s.Require().NoError(err) - - idCredential := groupKeyPairs[index] - groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger()) - s.Require().NoError(err) - + // rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) s.Require().NoError(err) + // + idCredential := groupKeyPairs[index] + groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, rlnInstance, rootTracker, utils.Logger()) + s.Require().NoError(err) rlnRelay := &WakuRLNRelay{ - groupManager: groupManager, - rootTracker: rootTracker, - RLN: rlnInstance, + Details: group_manager.Details{ + GroupManager: groupManager, + RootTracker: rootTracker, + RLN: rlnInstance, + }, + nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), log: utils.Logger(), metrics: newMetrics(prometheus.DefaultRegisterer), @@ -186,7 +198,7 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() { //get the current epoch time now := time.Now() - err = groupManager.Start(context.Background(), rlnInstance, rootTracker) + err = groupManager.Start(context.Background()) s.Require().NoError(err) // create some messages from the same peer and append rln proof to them, except wm4 diff --git a/waku/v2/protocol/rln/waku_rln_relay.go b/waku/v2/protocol/rln/waku_rln_relay.go index 0f6c9345..fab6ae86 100644 --- a/waku/v2/protocol/rln/waku_rln_relay.go +++ b/waku/v2/protocol/rln/waku_rln_relay.go @@ -22,21 +22,11 @@ import ( proto "google.golang.org/protobuf/proto" ) -type GroupManager interface { - Start(ctx context.Context, rln *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error - IdentityCredentials() (rln.IdentityCredential, error) - MembershipIndex() rln.MembershipIndex - Stop() error -} - type WakuRLNRelay struct { timesource timesource.Timesource metrics Metrics - groupManager GroupManager - rootTracker *group_manager.MerkleRootTracker - - RLN *rln.RLN + group_manager.Details // the log of nullifiers and Shamir shares of the past messages grouped per epoch nullifierLogLock sync.RWMutex @@ -47,20 +37,11 @@ type WakuRLNRelay struct { const rlnDefaultTreePath = "./rln_tree.db" -func New( - groupManager GroupManager, - treePath string, - timesource timesource.Timesource, - reg prometheus.Registerer, - log *zap.Logger) (*WakuRLNRelay, error) { - +func GetRLNInstanceAndRootTracker(treePath string) (*rln.RLN, *group_manager.MerkleRootTracker, error) { if treePath == "" { treePath = rlnDefaultTreePath } - metrics := newMetrics(reg) - - start := time.Now() rlnInstance, err := rln.NewWithConfig(rln.DefaultTreeDepth, &rln.TreeConfig{ CacheCapacity: 15000, Mode: rln.HighThroughput, @@ -69,31 +50,35 @@ func New( Path: treePath, }) if err != nil { - return nil, err + return nil, nil, err } - metrics.RecordInstanceCreation(time.Since(start)) rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance) if err != nil { - return nil, err + return nil, nil, err } + return rlnInstance, rootTracker, nil +} +func New( + Details group_manager.Details, + timesource timesource.Timesource, + reg prometheus.Registerer, + log *zap.Logger) *WakuRLNRelay { // create the WakuRLNRelay rlnPeer := &WakuRLNRelay{ - RLN: rlnInstance, - groupManager: groupManager, - rootTracker: rootTracker, - metrics: metrics, + Details: Details, + metrics: newMetrics(reg), log: log, timesource: timesource, nullifierLog: make(map[rln.MerkleNode][]rln.ProofMetadata), } - return rlnPeer, nil + return rlnPeer } func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error { - err := rlnRelay.groupManager.Start(ctx, rlnRelay.RLN, rlnRelay.rootTracker) + err := rlnRelay.GroupManager.Start(ctx) if err != nil { return err } @@ -105,7 +90,7 @@ func (rlnRelay *WakuRLNRelay) Start(ctx context.Context) error { // Stop will stop any operation or goroutine started while using WakuRLNRelay func (rlnRelay *WakuRLNRelay) Stop() error { - return rlnRelay.groupManager.Stop() + return rlnRelay.GroupManager.Stop() } func (rlnRelay *WakuRLNRelay) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) { @@ -214,7 +199,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime return invalidMessage, nil } - if !(rlnRelay.rootTracker.ContainsRoot(msgProof.MerkleRoot)) { + if !(rlnRelay.RootTracker.ContainsRoot(msgProof.MerkleRoot)) { rlnRelay.log.Debug("invalid message: unexpected root", logging.HexBytes("msgRoot", msg.RateLimitProof.MerkleRoot)) rlnRelay.metrics.RecordInvalidMessage(invalidRoot) return invalidMessage, nil @@ -261,7 +246,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime rlnRelay.log.Debug("message is valid") - rootIndex := rlnRelay.rootTracker.IndexOf(msgProof.MerkleRoot) + rootIndex := rlnRelay.RootTracker.IndexOf(msgProof.MerkleRoot) rlnRelay.metrics.RecordValidMessages(rootIndex) return validMessage, nil @@ -270,7 +255,7 @@ func (rlnRelay *WakuRLNRelay) ValidateMessage(msg *pb.WakuMessage, optionalTime func (rlnRelay *WakuRLNRelay) verifyProof(msg *pb.WakuMessage, proof *rln.RateLimitProof) (bool, error) { contentTopicBytes := []byte(msg.ContentTopic) input := append(msg.Payload, contentTopicBytes...) - return rlnRelay.RLN.Verify(input, *proof, rlnRelay.rootTracker.Roots()...) + return rlnRelay.RLN.Verify(input, *proof, rlnRelay.RootTracker.Roots()...) } func (rlnRelay *WakuRLNRelay) AppendRLNProof(msg *pb.WakuMessage, senderEpochTime time.Time) error { @@ -351,12 +336,12 @@ func (rlnRelay *WakuRLNRelay) Validator( } func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*pb.RateLimitProof, error) { - identityCredentials, err := rlnRelay.groupManager.IdentityCredentials() + identityCredentials, err := rlnRelay.GroupManager.IdentityCredentials() if err != nil { return nil, err } - membershipIndex := rlnRelay.groupManager.MembershipIndex() + membershipIndex := rlnRelay.GroupManager.MembershipIndex() proof, err := rlnRelay.RLN.GenerateProof(input, identityCredentials, membershipIndex, epoch) if err != nil { @@ -375,9 +360,9 @@ func (rlnRelay *WakuRLNRelay) generateProof(input []byte, epoch rln.Epoch) (*pb. } func (rlnRelay *WakuRLNRelay) IdentityCredential() (rln.IdentityCredential, error) { - return rlnRelay.groupManager.IdentityCredentials() + return rlnRelay.GroupManager.IdentityCredentials() } func (rlnRelay *WakuRLNRelay) MembershipIndex() uint { - return rlnRelay.groupManager.MembershipIndex() + return rlnRelay.GroupManager.MembershipIndex() } diff --git a/waku/v2/protocol/rln/web3/web3.go b/waku/v2/protocol/rln/web3/web3.go index 7f32e911..cdd162a0 100644 --- a/waku/v2/protocol/rln/web3/web3.go +++ b/waku/v2/protocol/rln/web3/web3.go @@ -5,8 +5,10 @@ import ( "errors" "math/big" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" ) @@ -26,12 +28,21 @@ type RLNContract struct { DeployedBlockNumber uint64 } +// EthClient is an interface for the ethclient.Client, so that we can pass mock client for testing +type EthClient interface { + bind.ContractBackend + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) + SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) + Close() +} + // Config is a helper struct that contains attributes for interaction with RLN smart contracts type Config struct { configured bool ETHClientAddress string - ETHClient *ethclient.Client + ETHClient EthClient ChainID *big.Int RegistryContract RegistryContract RLNContract RLNContract