fix(rln): attempt to re-subscribe if websocket conn fails

This commit is contained in:
Richard Ramos 2022-10-21 14:42:54 -04:00 committed by RichΛrd
parent 0ba2d9ea97
commit 721c27e101
1 changed files with 14 additions and 8 deletions

View File

@ -5,11 +5,13 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"math/big" "math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/go-waku/waku/v2/protocol/rln/contracts" "github.com/status-im/go-waku/waku/v2/protocol/rln/contracts"
r "github.com/status-im/go-zerokit-rln/rln" r "github.com/status-im/go-zerokit-rln/rln"
@ -180,14 +182,18 @@ func (rln *WakuRLNRelay) loadOldEvents(rlnContract *contracts.RLN, handler Regis
func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, doneCh chan struct{}, errCh chan error) { func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, doneCh chan struct{}, errCh chan error) {
// Watch for new events // Watch for new events
logSink := make(chan *contracts.RLNMemberRegistered) logSink := make(chan *contracts.RLNMemberRegistered)
subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: rln.ctx, Start: nil}, logSink)
if err != nil { subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) {
if err == rpc.ErrNotificationsUnsupported { subs, err := rlnContract.WatchMemberRegistered(&bind.WatchOpts{Context: rln.ctx, Start: nil}, logSink)
err = errors.New("notifications not supported. The node must support websockets") if err != nil {
if err == rpc.ErrNotificationsUnsupported {
err = errors.New("notifications not supported. The node must support websockets")
}
errCh <- err
subs.Unsubscribe()
} }
errCh <- err return subs, err
return })
}
defer subs.Unsubscribe() defer subs.Unsubscribe()
close(doneCh) close(doneCh)
@ -195,7 +201,7 @@ func (rln *WakuRLNRelay) watchNewEvents(rlnContract *contracts.RLN, handler Regi
for { for {
select { select {
case evt := <-logSink: case evt := <-logSink:
err = processLogs(evt, handler) err := processLogs(evt, handler)
if err != nil { if err != nil {
rln.log.Error("processing rln log", zap.Error(err)) rln.log.Error("processing rln log", zap.Error(err))
} }