mirror of
https://github.com/status-im/go-waku.git
synced 2025-02-11 21:16:58 +00:00
203 lines
5.5 KiB
Go
203 lines
5.5 KiB
Go
package dynamic
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/accounts/abi/bind"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
|
|
type RegistrationEventHandler = func(*DynamicGroupManager, []*contracts.RLNMemberRegistered) error
|
|
|
|
// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract
|
|
// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract`
|
|
// and collects all the events, for every received event, it calls the `handler`
|
|
func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error {
|
|
fromBlock := uint64(0)
|
|
metadata, err := gm.GetMetadata()
|
|
if err != nil {
|
|
gm.log.Warn("could not load last processed block from metadata. Starting onchain sync from scratch", zap.Error(err))
|
|
} else {
|
|
if gm.chainId.Uint64() != metadata.ChainID.Uint64() {
|
|
return errors.New("persisted data: chain id mismatch")
|
|
}
|
|
|
|
if !bytes.Equal(gm.membershipContractAddress[:], metadata.ContractAddress[:]) {
|
|
return errors.New("persisted data: contract address mismatch")
|
|
}
|
|
|
|
fromBlock = metadata.LastProcessedBlock
|
|
gm.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock))
|
|
}
|
|
|
|
err = gm.loadOldEvents(ctx, gm.rlnContract, fromBlock, handler)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errCh := make(chan error)
|
|
|
|
gm.wg.Add(1)
|
|
go gm.watchNewEvents(ctx, gm.rlnContract, handler, gm.log, errCh)
|
|
return <-errCh
|
|
}
|
|
|
|
func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, fromBlock uint64, handler RegistrationEventHandler) error {
|
|
events, err := gm.getEvents(ctx, fromBlock, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return handler(gm, events)
|
|
}
|
|
|
|
func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) {
|
|
defer gm.wg.Done()
|
|
|
|
// Watch for new events
|
|
firstErr := true
|
|
headerCh := make(chan *types.Header)
|
|
subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) {
|
|
s, err := gm.ethClient.SubscribeNewHead(ctx, headerCh)
|
|
if err != nil {
|
|
if err == rpc.ErrNotificationsUnsupported {
|
|
err = errors.New("notifications not supported. The node must support websockets")
|
|
}
|
|
if firstErr {
|
|
errCh <- err
|
|
}
|
|
gm.log.Error("subscribing to rln events", zap.Error(err))
|
|
}
|
|
firstErr = false
|
|
close(errCh)
|
|
return s, err
|
|
})
|
|
|
|
defer subs.Unsubscribe()
|
|
defer close(headerCh)
|
|
|
|
for {
|
|
select {
|
|
case h := <-headerCh:
|
|
blk := h.Number.Uint64()
|
|
events, err := gm.getEvents(ctx, blk, &blk)
|
|
if err != nil {
|
|
gm.log.Error("obtaining rln events", zap.Error(err))
|
|
}
|
|
|
|
err = handler(gm, events)
|
|
if err != nil {
|
|
gm.log.Error("processing rln log", zap.Error(err))
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
case err := <-subs.Err():
|
|
if err != nil {
|
|
gm.log.Error("watching new events", zap.Error(err))
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
const maxBatchSize = uint64(5000)
|
|
const additiveFactorMultiplier = 0.10
|
|
const multiplicativeDecreaseDivisor = 2
|
|
|
|
func tooMuchDataRequestedError(err error) bool {
|
|
// this error is only infura specific (other providers might have different error messages)
|
|
return err.Error() == "query returned more than 10000 results"
|
|
}
|
|
|
|
func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
|
|
var results []*contracts.RLNMemberRegistered
|
|
|
|
// Adapted from prysm logic for fetching historical logs
|
|
|
|
toBlock := to
|
|
if to == nil {
|
|
block, err := gm.ethClient.BlockByNumber(ctx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
blockNumber := block.Number().Uint64()
|
|
toBlock = &blockNumber
|
|
}
|
|
|
|
if from == *toBlock { // Only loading a single block
|
|
return gm.fetchEvents(ctx, from, toBlock)
|
|
}
|
|
|
|
// Fetching blocks in batches
|
|
batchSize := maxBatchSize
|
|
additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier)
|
|
|
|
currentBlockNum := from
|
|
for currentBlockNum < *toBlock {
|
|
start := currentBlockNum
|
|
end := currentBlockNum + batchSize
|
|
if end > *toBlock {
|
|
end = *toBlock
|
|
}
|
|
|
|
evts, err := gm.fetchEvents(ctx, start, &end)
|
|
if err != nil {
|
|
if tooMuchDataRequestedError(err) {
|
|
if batchSize == 0 {
|
|
return nil, errors.New("batch size is zero")
|
|
}
|
|
|
|
// multiplicative decrease
|
|
batchSize = batchSize / multiplicativeDecreaseDivisor
|
|
continue
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
results = append(results, evts...)
|
|
|
|
currentBlockNum = end
|
|
|
|
if batchSize < maxBatchSize {
|
|
// update the batchSize with additive increase
|
|
batchSize = batchSize + additiveFactor
|
|
if batchSize > maxBatchSize {
|
|
batchSize = maxBatchSize
|
|
}
|
|
}
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
|
|
logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var results []*contracts.RLNMemberRegistered
|
|
|
|
for {
|
|
if !logIterator.Next() {
|
|
break
|
|
}
|
|
|
|
if logIterator.Error() != nil {
|
|
return nil, logIterator.Error()
|
|
}
|
|
|
|
results = append(results, logIterator.Event)
|
|
}
|
|
|
|
return results, nil
|
|
}
|