From 24587e57aa6bf20a6c605373e5f64d0d36c38620 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 20 Apr 2023 16:15:36 -0400 Subject: [PATCH] refactor: handle max records exceeded --- .../rln/group_manager/dynamic/web3.go | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/waku/v2/protocol/rln/group_manager/dynamic/web3.go b/waku/v2/protocol/rln/group_manager/dynamic/web3.go index e15f4598..8282b013 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/web3.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/web3.go @@ -170,7 +170,80 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract * } } +const maxBatchSize = uint64(5000000) // TODO: tune this +const additiveFactorMultiplier = 0.10 +const multiplicativeDecreaseDivisor = 2 + +func tooMuchDataRequestedError(err error) bool { + // this error is only infura specific (other providers might have different error messages) + return err.Error() == "query returned more than 10000 results" +} + func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { + var results []*contracts.RLNMemberRegistered + + // Adapted from prysm logic for fetching historical logs + + toBlock := to + if to == nil { + block, err := gm.ethClient.BlockByNumber(ctx, nil) + if err != nil { + return nil, err + } + + blockNumber := block.Number().Uint64() - 50 // Keep a buffer to retrieve latest block + toBlock = &blockNumber + } + + batchSize := maxBatchSize + additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier) + + currentBlockNum := from + for currentBlockNum < *toBlock { + start := currentBlockNum + end := currentBlockNum + batchSize + if end > *toBlock { + end = *toBlock + } + + evts, err := gm.fetchEvents(ctx, start, &end) + if err != nil { + if tooMuchDataRequestedError(err) { + if batchSize == 0 { + return nil, errors.New("batch size is zero") + } + + // multiplicative decrease + batchSize = batchSize / multiplicativeDecreaseDivisor + continue + } + return nil, err + } + + results = append(results, evts...) + + if batchSize < maxBatchSize { + // update the batchSize with additive increase + batchSize = batchSize + additiveFactor + if batchSize > maxBatchSize { + batchSize = maxBatchSize + } + } + } + + if to == nil { + evts, err := gm.fetchEvents(ctx, *toBlock, nil) + if err != nil { + return nil, err + } + + results = append(results, evts...) + } + + return results, nil +} + +func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) { logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx}) if err != nil { return nil, err