refactor: fetch membership events

This commit is contained in:
harsh-98 2023-04-04 12:32:52 +05:30 committed by RichΛrd
parent 41691a44e5
commit 6747603a73
3 changed files with 16 additions and 34 deletions

View File

@ -111,7 +111,6 @@ func Execute(options Options) {
node.WithHostAddress(hostAddr),
node.WithKeepAlive(options.KeepAlive),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
}

View File

@ -153,10 +153,7 @@ func RlnRelayDynamic(
return rlnPeer.insertMember(pubkey)
}
errChan := make(chan error)
go rlnPeer.HandleGroupUpdates(handler, errChan)
err = <-errChan
if err != nil {
if err = rlnPeer.HandleGroupUpdates(handler); err != nil {
return nil, err
}

View File

@ -126,39 +126,26 @@ func (rln *WakuRLNRelay) processLogs(evt *contracts.RLNMemberRegistered, handler
// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract
// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract`
// and collects all the events, for every received event, it calls the `handler`
func (rln *WakuRLNRelay) HandleGroupUpdates(handler RegistrationEventHandler, errChan chan<- error) {
defer close(errChan)
func (rln *WakuRLNRelay) HandleGroupUpdates(handler RegistrationEventHandler) error {
backend, err := ethclient.Dial(rln.ethClientAddress)
if err != nil {
errChan <- err
return
return err
}
rln.ethClient = backend
rlnContract, err := contracts.NewRLN(rln.membershipContractAddress, backend)
if err != nil {
errChan <- err
return
return err
}
err = rln.loadOldEvents(rlnContract, handler)
if err != nil {
errChan <- err
return
return err
}
doneCh := make(chan struct{})
errCh := make(chan error)
go rln.watchNewEvents(rlnContract, handler, rln.log, doneCh, errCh)
select {
case <-doneCh:
return
case err := <-errCh:
errChan <- err
return
}
go rln.watchNewEvents(rlnContract, handler, rln.log, errCh)
return <-errCh
}
func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler) error {
@ -183,26 +170,29 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis
return nil
}
func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, doneCh chan struct{}, errCh chan error) {
func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) {
// Watch for new events
logSink := make(chan *contracts.RLNMemberRegistered)
firstErr := true
subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) {
subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: rln.ctx, Start: nil}, logSink)
if err != nil {
if err == rpc.ErrNotificationsUnsupported {
err = errors.New("notifications not supported. The node must support websockets")
}
errCh <- err
subs.Unsubscribe()
if firstErr {
errCh <- err
}
rln.log.Error("subscribing to rln events", zap.Error(err))
}
firstErr = false
close(errCh)
return subs, err
})
defer subs.Unsubscribe()
close(doneCh)
defer subs.Unsubscribe()
defer close(logSink)
for {
select {
@ -212,14 +202,10 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi
rln.log.Error("processing rln log", zap.Error(err))
}
case <-rln.ctx.Done():
subs.Unsubscribe()
close(logSink)
return
case err := <-subs.Err():
close(logSink)
if err != nil {
rln.log.Error("watching new events", zap.Error(err))
errCh <- err
}
return
}