go-waku/waku/v2/protocol/rln/group_manager/dynamic/web3.go

194 lines
5.2 KiB
Go
Raw Normal View History

2023-04-04 21:02:12 +00:00
package dynamic
2022-07-28 14:04:33 +00:00
import (
"context"
"errors"
"time"
2022-07-28 14:04:33 +00:00
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
2022-09-11 21:08:58 +00:00
"github.com/ethereum/go-ethereum/rpc"
"github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts"
2022-07-28 14:04:33 +00:00
"go.uber.org/zap"
)
// RegistrationEventHandler is the callback type used to process membership registration events.
// The types of its inputs match the MemberRegistered event/proc defined in the MembershipContract
// interface: it receives the group manager and a batch of decoded RLNMemberRegistered events, and
// returns an error if the batch could not be processed.
2023-04-12 21:53:23 +00:00
type RegistrationEventHandler = func(*DynamicGroupManager, []*contracts.RLNMemberRegistered) error
2022-07-28 14:04:33 +00:00
// HandleGroupUpdates mounts the supplied handler for the registration events emitting from the membership contract
// It connects to the eth client, subscribes to the `MemberRegistered` event emitted from the `MembershipContract`
// and collects all the events, for every received event, it calls the `handler`
2023-04-04 21:02:12 +00:00
func (gm *DynamicGroupManager) HandleGroupUpdates(ctx context.Context, handler RegistrationEventHandler) error {
2023-04-05 19:44:46 +00:00
err := gm.loadOldEvents(ctx, gm.rlnContract, handler)
2022-08-09 00:02:08 +00:00
if err != nil {
2023-04-04 07:02:52 +00:00
return err
2022-08-09 00:02:08 +00:00
}
2022-07-28 14:04:33 +00:00
errCh := make(chan error)
gm.wg.Add(1)
2023-04-05 19:44:46 +00:00
go gm.watchNewEvents(ctx, gm.rlnContract, handler, gm.log, errCh)
2023-04-04 07:02:52 +00:00
return <-errCh
2022-07-28 14:04:33 +00:00
}
2023-04-04 21:02:12 +00:00
func (gm *DynamicGroupManager) loadOldEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler) error {
fromBlock := uint64(0)
metadata, err := gm.GetMetadata()
if err == nil {
fromBlock = metadata.LastProcessedBlock
gm.log.Info("resuming onchain sync", zap.Uint64("fromBlock", fromBlock))
} else {
gm.log.Warn("could not load last processed block from metadata. Starting onchain sync from scratch", zap.Error(err))
}
events, err := gm.getEvents(ctx, fromBlock, nil)
2022-07-28 14:04:33 +00:00
if err != nil {
return err
}
2023-04-12 21:53:23 +00:00
return handler(gm, events)
2022-07-28 14:04:33 +00:00
}
2023-04-04 21:02:12 +00:00
func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *contracts.RLN, handler RegistrationEventHandler, log *zap.Logger, errCh chan<- error) {
defer gm.wg.Done()
2022-07-28 14:04:33 +00:00
// Watch for new events
2023-04-04 07:02:52 +00:00
firstErr := true
2023-04-12 21:53:23 +00:00
headerCh := make(chan *types.Header)
subs := event.Resubscribe(2*time.Second, func(ctx context.Context) (event.Subscription, error) {
2023-04-12 21:53:23 +00:00
s, err := gm.ethClient.SubscribeNewHead(ctx, headerCh)
if err != nil {
if err == rpc.ErrNotificationsUnsupported {
err = errors.New("notifications not supported. The node must support websockets")
}
2023-04-04 07:02:52 +00:00
if firstErr {
errCh <- err
}
2023-04-04 21:02:12 +00:00
gm.log.Error("subscribing to rln events", zap.Error(err))
2022-09-11 21:08:58 +00:00
}
2023-04-04 07:02:52 +00:00
firstErr = false
close(errCh)
2023-04-12 21:53:23 +00:00
return s, err
})
2022-07-28 14:04:33 +00:00
2023-04-04 07:02:52 +00:00
defer subs.Unsubscribe()
2023-04-12 21:53:23 +00:00
defer close(headerCh)
2022-07-28 14:04:33 +00:00
for {
select {
2023-04-12 21:53:23 +00:00
case h := <-headerCh:
blk := h.Number.Uint64()
events, err := gm.getEvents(ctx, blk, &blk)
if err != nil {
gm.log.Error("obtaining rln events", zap.Error(err))
}
err = handler(gm, events)
if err != nil {
2023-04-04 21:02:12 +00:00
gm.log.Error("processing rln log", zap.Error(err))
}
2023-04-04 21:02:12 +00:00
case <-ctx.Done():
return
2022-07-28 14:04:33 +00:00
case err := <-subs.Err():
2022-08-18 16:27:10 +00:00
if err != nil {
2023-04-04 21:02:12 +00:00
gm.log.Error("watching new events", zap.Error(err))
2022-08-18 16:27:10 +00:00
}
return
2022-07-28 14:04:33 +00:00
}
}
}
2023-04-12 21:53:23 +00:00
2023-04-20 20:15:36 +00:00
// Tuning parameters for the batched event fetching done by getEvents.
const (
	// maxBatchSize is the initial and largest block-range width requested in one query.
	maxBatchSize uint64 = 5000000 // TODO: tune this

	// additiveFactorMultiplier is the fraction of maxBatchSize by which the
	// batch size grows again after a successful query.
	additiveFactorMultiplier = 0.10

	// multiplicativeDecreaseDivisor is the divisor applied to the batch size
	// when the provider reports that too much data was requested.
	multiplicativeDecreaseDivisor = 2
)
// tooMuchDataRequestedError reports whether err is the provider's "too many
// results" error, which signals that the requested block range should be
// reduced. A nil err is never a match.
func tooMuchDataRequestedError(err error) bool {
	if err == nil {
		return false
	}
	// this error is only infura specific (other providers might have different error messages)
	return err.Error() == "query returned more than 10000 results"
}
2023-04-12 21:53:23 +00:00
func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
2023-04-20 20:15:36 +00:00
var results []*contracts.RLNMemberRegistered
// Adapted from prysm logic for fetching historical logs
toBlock := to
if to == nil {
block, err := gm.ethClient.BlockByNumber(ctx, nil)
if err != nil {
return nil, err
}
2023-04-26 15:55:03 +00:00
blockNumber := block.Number().Uint64()
2023-04-20 20:15:36 +00:00
toBlock = &blockNumber
}
if from == *toBlock { // Only loading a single block
return gm.fetchEvents(ctx, from, toBlock)
}
// Fetching blocks in batches
2023-04-20 20:15:36 +00:00
batchSize := maxBatchSize
additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier)
currentBlockNum := from
for currentBlockNum < *toBlock {
start := currentBlockNum
end := currentBlockNum + batchSize
if end > *toBlock {
end = *toBlock
}
evts, err := gm.fetchEvents(ctx, start, &end)
if err != nil {
if tooMuchDataRequestedError(err) {
if batchSize == 0 {
return nil, errors.New("batch size is zero")
}
// multiplicative decrease
batchSize = batchSize / multiplicativeDecreaseDivisor
continue
}
return nil, err
}
results = append(results, evts...)
2023-04-26 15:55:03 +00:00
currentBlockNum = end
2023-04-20 20:15:36 +00:00
if batchSize < maxBatchSize {
// update the batchSize with additive increase
batchSize = batchSize + additiveFactor
if batchSize > maxBatchSize {
batchSize = maxBatchSize
}
}
}
return results, nil
}
func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
2023-04-12 21:53:23 +00:00
logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx})
if err != nil {
return nil, err
}
var results []*contracts.RLNMemberRegistered
for {
if !logIterator.Next() {
break
}
if logIterator.Error() != nil {
return nil, logIterator.Error()
}
results = append(results, logIterator.Event)
}
return results, nil
}