From 6747603a73ae8df7c6562fb29c1414e0f01313de Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Tue, 4 Apr 2023 12:32:52 +0530 Subject: [PATCH] refactor: fetch membership events --- waku/node.go | 1 - waku/v2/protocol/rln/rln_relay_builder.go | 5 +-- waku/v2/protocol/rln/web3.go | 44 ++++++++--------------- 3 files changed, 16 insertions(+), 34 deletions(-) diff --git a/waku/node.go b/waku/node.go index d13cc932..f9e36797 100644 --- a/waku/node.go +++ b/waku/node.go @@ -111,7 +111,6 @@ func Execute(options Options) { node.WithHostAddress(hostAddr), node.WithKeepAlive(options.KeepAlive), } - if len(options.AdvertiseAddresses) != 0 { nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...)) } diff --git a/waku/v2/protocol/rln/rln_relay_builder.go b/waku/v2/protocol/rln/rln_relay_builder.go index 20255466..accf6b32 100644 --- a/waku/v2/protocol/rln/rln_relay_builder.go +++ b/waku/v2/protocol/rln/rln_relay_builder.go @@ -153,10 +153,7 @@ func RlnRelayDynamic( return rlnPeer.insertMember(pubkey) } - errChan := make(chan error) - go rlnPeer.HandleGroupUpdates(handler, errChan) - err = <-errChan - if err != nil { + if err = rlnPeer.HandleGroupUpdates(handler); err != nil { return nil, err } diff --git a/waku/v2/protocol/rln/web3.go b/waku/v2/protocol/rln/web3.go index 464a867a..adcabfeb 100644 --- a/waku/v2/protocol/rln/web3.go +++ b/waku/v2/protocol/rln/web3.go @@ -126,39 +126,26 @@ func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler // HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract // It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract` // and collects all the events, for every received event, it calls the `handler` -func (rln *WakuRLNRelay) HandleGroupUpdates(handler RegistrationEventHandler, errChan chan<- error) { - defer close(errChan) - +func (rln *WakuRLNRelay) HandleGroupUpdates(handler RegistrationEventHandler) error { backend, err := ethclient.Dial(rln.ethClientAddress) if err != nil { - errChan <- err - return + return err } rln.ethClient = backend rlnContract, err := contracts.NewRLN(rln.membershipContractAddress, backend) if err != nil { - errChan <- err - return + return err } err = rln.loadOldEvents(rlnContract, handler) if err != nil { - errChan <- err - return + return err } - doneCh := make(chan struct{}) errCh := make(chan error) - go rln.watchNewEvents(rlnContract, handler, rln.log, doneCh, errCh) - - select { - case <-doneCh: - return - case err := <-errCh: - errChan <- err - return - } + go rln.watchNewEvents(rlnContract, handler, rln.log, errCh) + return <-errCh } func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler) error { @@ -183,26 +170,29 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis return nil } -func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, doneCh chan struct{}, errCh chan error) { +func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) { // Watch for new events logSink := make(chan *contracts.RLNMemberRegistered) + firstErr := true subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) { subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: rln.ctx, Start: nil}, logSink) if err != nil { if err == rpc.ErrNotificationsUnsupported { err = errors.New("notifications not supported. The node must support websockets") } - errCh <- err - subs.Unsubscribe() + if firstErr { + errCh <- err + } rln.log.Error("subscribing to rln events", zap.Error(err)) } - + firstErr = false + close(errCh) return subs, err }) - defer subs.Unsubscribe() - close(doneCh) + defer subs.Unsubscribe() + defer close(logSink) for { select { @@ -212,14 +202,10 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi rln.log.Error("processing rln log", zap.Error(err)) } case <-rln.ctx.Done(): - subs.Unsubscribe() - close(logSink) return case err := <-subs.Err(): - close(logSink) if err != nil { rln.log.Error("watching new events", zap.Error(err)) - errCh <- err } return }