diff --git a/waku/v2/node/wakunode2_rln.go b/waku/v2/node/wakunode2_rln.go index 420836f4..846bae73 100644 --- a/waku/v2/node/wakunode2_rln.go +++ b/waku/v2/node/wakunode2_rln.go @@ -6,9 +6,9 @@ package node import ( "bytes" "context" - "encoding/hex" "errors" "github.com/waku-org/go-waku/waku/v2/protocol/rln" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/dynamic" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static" r "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" @@ -39,33 +39,61 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error { return errors.New("relay protocol does not support the configured pubsub topic") } + var err error + var groupManager rln.GroupManager + if !w.opts.rlnRelayDynamic { w.log.Info("setting up waku-rln-relay in off-chain mode") + // set up rln relay inputs groupKeys, idCredential, err := static.Setup(w.opts.rlnRelayMemIndex) if err != nil { return err } - // rlnrelay in off-chain mode with a static group of user - - groupManager, err := static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, w.log) + groupManager, err = static.NewStaticGroupManager(groupKeys, idCredential, w.opts.rlnRelayMemIndex, w.log) if err != nil { return err } + } else { + w.log.Info("setting up waku-rln-relay in on-chain mode") - rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log) + // check if the peer has provided its rln credentials + var memKeyPair *r.IdentityCredential + if w.opts.rlnRelayIDCommitment != nil && w.opts.rlnRelayIDKey != nil { + memKeyPair = &r.IdentityCredential{ + IDCommitment: *w.opts.rlnRelayIDCommitment, + IDSecretHash: *w.opts.rlnRelayIDKey, + } + } + + groupManager, err = dynamic.NewDynamicGroupManager( + w.opts.rlnETHClientAddress, + w.opts.rlnETHPrivateKey, + w.opts.rlnMembershipContractAddress, + memKeyPair, + w.opts.rlnRelayMemIndex, + w.opts.rlnRegistrationHandler, + w.log, + ) if err != nil { return err } + } - err = rlnRelay.Start(ctx) - if err != nil { - return err - } + rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log) + if err != nil { + return err + } - w.rlnRelay = rlnRelay + err = rlnRelay.Start(ctx) + if err != nil { + return err + } + w.rlnRelay = rlnRelay + + if !w.opts.rlnRelayDynamic { // check the correct construction of the tree by comparing the calculated root against the expected root // no error should happen as it is already captured in the unit tests root, err := rlnRelay.RLN.GetMerkleRoot() @@ -81,26 +109,6 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error { if !bytes.Equal(expectedRoot[:], root[:]) { return errors.New("root mismatch: something went wrong not in Merkle tree construction") } - - w.log.Debug("the calculated root", zap.String("root", hex.EncodeToString(root[:]))) - } else { - w.log.Info("setting up waku-rln-relay in on-chain mode") - - /*// check if the peer has provided its rln credentials - var memKeyPair *r.IdentityCredential - if w.opts.rlnRelayIDCommitment != nil && w.opts.rlnRelayIDKey != nil { - memKeyPair = &r.IdentityCredential{ - IDCommitment: *w.opts.rlnRelayIDCommitment, - IDSecretHash: *w.opts.rlnRelayIDKey, - } - } - - // mount the rln relay protocol in the on-chain/dynamic mode - var err error - w.rlnRelay, err = rln.RlnRelayDynamic(ctx, w.Relay(), w.opts.rlnETHClientAddress, w.opts.rlnETHPrivateKey, w.opts.rlnMembershipContractAddress, memKeyPair, w.opts.rlnRelayMemIndex, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.opts.rlnRegistrationHandler, w.timesource, w.log) - if err != nil { - return err - }*/ } w.log.Info("mounted waku RLN relay", zap.String("pubsubTopic", w.opts.rlnRelayPubsubTopic), zap.String("contentTopic", w.opts.rlnRelayContentTopic)) diff --git a/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go new file mode 100644 index 00000000..eebc5bcd --- /dev/null +++ b/waku/v2/protocol/rln/group_manager/dynamic/dynamic.go @@ -0,0 +1,158 @@ +package dynamic + +import ( + "context" + "crypto/ecdsa" + "errors" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" + "github.com/waku-org/go-zerokit-rln/rln" + r "github.com/waku-org/go-zerokit-rln/rln" + "go.uber.org/zap" +) + +type DynamicGroupManager struct { + rln *rln.RLN + log *zap.Logger + + cancel context.CancelFunc + wg sync.WaitGroup + + identityCredential *rln.IdentityCredential + membershipIndex *rln.MembershipIndex + membershipContractAddress common.Address + ethClientAddress string + ethClient *ethclient.Client + + // ethAccountPrivateKey is required for signing transactions + // TODO may need to erase this ethAccountPrivateKey when is not used + // TODO may need to make ethAccountPrivateKey mandatory + ethAccountPrivateKey *ecdsa.PrivateKey + + registrationHandler RegistrationHandler + lastIndexLoaded int64 + + rootTracker *group_manager.MerkleRootTracker +} + +type RegistrationHandler = func(tx *types.Transaction) + +func NewDynamicGroupManager( + ethClientAddr string, + ethAccountPrivateKey *ecdsa.PrivateKey, + memContractAddr common.Address, + identityCredential *rln.IdentityCredential, + index rln.MembershipIndex, + registrationHandler RegistrationHandler, + log *zap.Logger, +) (*DynamicGroupManager, error) { + return &DynamicGroupManager{ + identityCredential: identityCredential, + membershipIndex: &index, + membershipContractAddress: memContractAddr, + ethClientAddress: ethClientAddr, + ethAccountPrivateKey: ethAccountPrivateKey, + registrationHandler: registrationHandler, + lastIndexLoaded: -1, + }, nil +} + +func (gm *DynamicGroupManager) Start(ctx context.Context, rlnInstance *rln.RLN, rootTracker *group_manager.MerkleRootTracker) error { + if gm.cancel != nil { + return errors.New("already started") + } + + ctx, cancel := context.WithCancel(ctx) + gm.cancel = cancel + + gm.log.Info("mounting rln-relay in on-chain/dynamic mode") + + gm.rln = rlnInstance + gm.rootTracker = rootTracker + + err := rootTracker.Sync() + if err != nil { + return err + } + + // prepare rln membership key pair + if gm.identityCredential == nil && gm.ethAccountPrivateKey != nil { + gm.log.Debug("no rln-relay key is provided, generating one") + identityCredential, err := rlnInstance.MembershipKeyGen() + if err != nil { + return err + } + + gm.identityCredential = identityCredential + + // register the rln-relay peer to the membership contract + membershipIndex, err := gm.Register(ctx) + if err != nil { + return err + } + + gm.membershipIndex = membershipIndex + + gm.log.Info("registered peer into the membership contract") + } + + handler := func(pubkey r.IDCommitment, index r.MembershipIndex) error { + return gm.InsertMember(pubkey) + } + + errChan := make(chan error) + gm.wg.Add(1) + go gm.HandleGroupUpdates(ctx, handler, errChan) + err = <-errChan + if err != nil { + return err + } + + return nil +} + +func (gm *DynamicGroupManager) InsertMember(pubkey rln.IDCommitment) error { + gm.log.Debug("a new key is added", zap.Binary("pubkey", pubkey[:])) + // assuming all the members arrive in order + err := gm.rln.InsertMember(pubkey) + if err != nil { + gm.log.Error("inserting member into merkletree", zap.Error(err)) + return err + } + + err = gm.rootTracker.Sync() + if err != nil { + return err + } + + return nil +} + +func (gm *DynamicGroupManager) IdentityCredentials() (rln.IdentityCredential, error) { + if gm.identityCredential == nil { + return rln.IdentityCredential{}, errors.New("identity credential has not been setup") + } + + return *gm.identityCredential, nil +} + +func (gm *DynamicGroupManager) MembershipIndex() (rln.MembershipIndex, error) { + if gm.membershipIndex == nil { + return 0, errors.New("membership index has not been setup") + } + + return *gm.membershipIndex, nil +} + +func (gm *DynamicGroupManager) Stop() { + if gm.cancel == nil { + return + } + + gm.cancel() + gm.wg.Wait() +} diff --git a/waku/v2/protocol/rln/web3.go b/waku/v2/protocol/rln/group_manager/dynamic/web3.go similarity index 75% rename from waku/v2/protocol/rln/web3.go rename to waku/v2/protocol/rln/group_manager/dynamic/web3.go index adcabfeb..838f2f0c 100644 --- a/waku/v2/protocol/rln/web3.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/web3.go @@ -1,4 +1,4 @@ -package rln +package dynamic import ( "context" @@ -99,15 +99,15 @@ func register(ctx context.Context, idComm r.IDCommitment, ethAccountPrivateKey * // Register registers the public key of the rlnPeer which is rlnPeer.membershipKeyPair.publicKey // into the membership contract whose address is in rlnPeer.membershipContractAddress -func (rln *WakuRLNRelay) Register(ctx context.Context) (*r.MembershipIndex, error) { - pk := rln.membershipKeyPair.IDCommitment - return register(ctx, pk, rln.ethAccountPrivateKey, rln.ethClientAddress, rln.membershipContractAddress, rln.registrationHandler, rln.log) +func (gm *DynamicGroupManager) Register(ctx context.Context) (*r.MembershipIndex, error) { + pk := gm.identityCredential.IDCommitment + return register(ctx, pk, gm.ethAccountPrivateKey, gm.ethClientAddress, gm.membershipContractAddress, gm.registrationHandler, gm.log) } // the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface type RegistrationEventHandler = func(pubkey r.IDCommitment, index r.MembershipIndex) error -func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler RegistrationEventHandler) error { +func (gm *DynamicGroupManager) processLogs(evt *contracts.RLNMemberRegistered, handler RegistrationEventHandler) error { if evt == nil { return nil } @@ -115,41 +115,43 @@ func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler var pubkey r.IDCommitment = r.Bytes32(evt.Pubkey.Bytes()) index := evt.Index.Int64() - if index <= rln.lastIndexLoaded { + if index <= gm.lastIndexLoaded { return nil } - rln.lastIndexLoaded = index + gm.lastIndexLoaded = index return handler(pubkey, r.MembershipIndex(uint(evt.Index.Int64()))) } // HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract // It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` // and collects all the events, for every received event, it calls the `handler` -func (rln *WakuRLNRelay) HandleGroupUpdates(handler RegistrationEventHandler) error { - backend, err := ethclient.Dial(rln.ethClientAddress) +func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error { + defer gm.wg.Done() + + backend, err := ethclient.Dial(gm.ethClientAddress) if err != nil { return err } - rln.ethClient = backend + gm.ethClient = backend - rlnContract, err := contracts.NewRLN(rln.membershipContractAddress, backend) + rlnContract, err := contracts.NewRLN(gm.membershipContractAddress, backend) if err != nil { return err } - err = rln.loadOldEvents(rlnContract, handler) + err = gm.loadOldEvents(ctx, rlnContract, handler) if err != nil { return err } errCh := make(chan error) - go rln.watchNewEvents(rlnContract, handler, rln.log, errCh) + go gm.watchNewEvents(ctx, rlnContract, handler, gm.log, errCh) return <-errCh } -func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler) error { - logIterator, err := rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: 0, End: nil, Context: rln.ctx}) +func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler) error { + logIterator, err := rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: 0, End: nil, Context: ctx}) if err != nil { return err } @@ -162,7 +164,7 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis return logIterator.Error() } - err = rln.processLogs(logIterator.Event, handler) + err = gm.processLogs(logIterator.Event, handler) if err != nil { return err } @@ -170,13 +172,13 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis return nil } -func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { +func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { // Watch for new events logSink := make(chan *contracts.RLNMemberRegistered) firstErr := true subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { - subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: rln.ctx, Start: nil}, logSink) + subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: ctx, Start: nil}, logSink) if err != nil { if err == rpc.ErrNotificationsUnsupported { err = errors.New("notifications not supported. The node must support websockets") @@ -184,7 +186,7 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi if firstErr { errCh <- err } - rln.log.Error("subscribing to rln events", zap.Error(err)) + gm.log.Error("subscribing to rln events", zap.Error(err)) } firstErr = false close(errCh) @@ -197,15 +199,15 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi for { select { case evt := <-logSink: - err := rln.processLogs(evt, handler) + err := gm.processLogs(evt, handler) if err != nil { - rln.log.Error("processing rln log", zap.Error(err)) + gm.log.Error("processing rln log", zap.Error(err)) } - case <-rln.ctx.Done(): + case <-ctx.Done(): return case err := <-subs.Err(): if err != nil { - rln.log.Error("watching new events", zap.Error(err)) + gm.log.Error("watching new events", zap.Error(err)) } return } diff --git a/waku/v2/protocol/rln/rln_relay_builder.go b/waku/v2/protocol/rln/rln_relay_builder.go deleted file mode 100644 index accf6b32..00000000 --- a/waku/v2/protocol/rln/rln_relay_builder.go +++ /dev/null @@ -1,172 +0,0 @@ -package rln - -import ( - "context" - "crypto/ecdsa" - "errors" - - "github.com/ethereum/go-ethereum/common" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" - r "github.com/waku-org/go-zerokit-rln/rln" - "go.uber.org/zap" -) - -func RlnRelayStatic( - ctx context.Context, - relay *relay.WakuRelay, - group []r.IDCommitment, - memKeyPair r.IdentityCredential, - memIndex r.MembershipIndex, - pubsubTopic string, - contentTopic string, - spamHandler SpamHandler, - timesource timesource.Timesource, - log *zap.Logger, -) (*WakuRLNRelay, error) { - log = log.Named("rln-static") - - log.Info("mounting rln-relay in off-chain/static mode") - - // check the peer's index and the inclusion of user's identity commitment in the group - if memKeyPair.IDCommitment != group[int(memIndex)] { - return nil, errors.New("peer's IDCommitment does not match commitment in group") - } - - rlnInstance, err := r.NewRLN() - if err != nil { - return nil, err - } - - // create the WakuRLNRelay - rlnPeer := &WakuRLNRelay{ - ctx: ctx, - membershipKeyPair: &memKeyPair, - membershipIndex: memIndex, - RLN: rlnInstance, - pubsubTopic: pubsubTopic, - contentTopic: contentTopic, - log: log, - timesource: timesource, - nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), - } - - root, err := rlnPeer.RLN.GetMerkleRoot() - if err != nil { - return nil, err - } - - rlnPeer.validMerkleRoots = append(rlnPeer.validMerkleRoots, root) - - // add members to the Merkle tree - for _, member := range group { - if err := rlnPeer.insertMember(member); err != nil { - return nil, err - } - } - - // adds a topic validator for the supplied pubsub topic at the relay protocol - // messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped - // the topic validator checks for the correct non-spamming proof of the message - err = rlnPeer.addValidator(relay, pubsubTopic, contentTopic, spamHandler) - if err != nil { - return nil, err - } - - log.Info("rln relay topic validator mounted", zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", contentTopic)) - - return rlnPeer, nil -} - -func RlnRelayDynamic( - ctx context.Context, - relay *relay.WakuRelay, - ethClientAddr string, - ethAccountPrivateKey *ecdsa.PrivateKey, - memContractAddr common.Address, - memKeyPair *r.IdentityCredential, - memIndex r.MembershipIndex, - pubsubTopic string, - contentTopic string, - spamHandler SpamHandler, - registrationHandler RegistrationHandler, - timesource timesource.Timesource, - log *zap.Logger, -) (*WakuRLNRelay, error) { - log = log.Named("rln-dynamic") - - log.Info("mounting rln-relay in onchain/dynamic mode") - - rlnInstance, err := r.NewRLN() - if err != nil { - return nil, err - } - - // create the WakuRLNRelay - rlnPeer := &WakuRLNRelay{ - ctx: ctx, - membershipIndex: memIndex, - membershipContractAddress: memContractAddr, - ethClientAddress: ethClientAddr, - ethAccountPrivateKey: ethAccountPrivateKey, - RLN: rlnInstance, - pubsubTopic: pubsubTopic, - contentTopic: contentTopic, - log: log, - timesource: timesource, - nullifierLog: make(map[r.Nullifier][]r.ProofMetadata), - registrationHandler: registrationHandler, - lastIndexLoaded: -1, - } - - root, err := rlnPeer.RLN.GetMerkleRoot() - if err != nil { - return nil, err - } - - rlnPeer.validMerkleRoots = append(rlnPeer.validMerkleRoots, root) - - // prepare rln membership key pair - if memKeyPair == nil && ethAccountPrivateKey != nil { - log.Debug("no rln-relay key is provided, generating one") - memKeyPair, err = rlnInstance.MembershipKeyGen() - if err != nil { - return nil, err - } - - rlnPeer.membershipKeyPair = memKeyPair - - // register the rln-relay peer to the membership contract - membershipIndex, err := rlnPeer.Register(ctx) - if err != nil { - return nil, err - } - - rlnPeer.membershipIndex = *membershipIndex - - log.Info("registered peer into the membership contract") - } else if memKeyPair != nil { - rlnPeer.membershipKeyPair = memKeyPair - } - - handler := func(pubkey r.IDCommitment, index r.MembershipIndex) error { - return rlnPeer.insertMember(pubkey) - } - - if err = rlnPeer.HandleGroupUpdates(handler); err != nil { - return nil, err - } - - // adds a topic validator for the supplied pubsub topic at the relay protocol - // messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped - // the topic validator checks for the correct non-spamming proof of the message - err = rlnPeer.addValidator(relay, pubsubTopic, contentTopic, spamHandler) - if err != nil { - return nil, err - } - - log.Info("rln relay topic validator mounted", zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", contentTopic)) - - return rlnPeer, nil - -}