mirror of https://github.com/status-im/go-waku.git
refactor: handle max records exceeded
This commit is contained in:
parent
966cbba4c4
commit
24587e57aa
|
@ -170,7 +170,80 @@ func (gm *DynamicGroupManager) watchNewEvents(ctx context.Context, rlnContract *
|
|||
}
|
||||
}
|
||||
|
||||
const maxBatchSize = uint64(5000000) // TODO: tune this
|
||||
const additiveFactorMultiplier = 0.10
|
||||
const multiplicativeDecreaseDivisor = 2
|
||||
|
||||
func tooMuchDataRequestedError(err error) bool {
|
||||
// this error is only infura specific (other providers might have different error messages)
|
||||
return err.Error() == "query returned more than 10000 results"
|
||||
}
|
||||
|
||||
func (gm *DynamicGroupManager) getEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
|
||||
var results []*contracts.RLNMemberRegistered
|
||||
|
||||
// Adapted from prysm logic for fetching historical logs
|
||||
|
||||
toBlock := to
|
||||
if to == nil {
|
||||
block, err := gm.ethClient.BlockByNumber(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blockNumber := block.Number().Uint64() - 50 // Keep a buffer to retrieve latest block
|
||||
toBlock = &blockNumber
|
||||
}
|
||||
|
||||
batchSize := maxBatchSize
|
||||
additiveFactor := uint64(float64(batchSize) * additiveFactorMultiplier)
|
||||
|
||||
currentBlockNum := from
|
||||
for currentBlockNum < *toBlock {
|
||||
start := currentBlockNum
|
||||
end := currentBlockNum + batchSize
|
||||
if end > *toBlock {
|
||||
end = *toBlock
|
||||
}
|
||||
|
||||
evts, err := gm.fetchEvents(ctx, start, &end)
|
||||
if err != nil {
|
||||
if tooMuchDataRequestedError(err) {
|
||||
if batchSize == 0 {
|
||||
return nil, errors.New("batch size is zero")
|
||||
}
|
||||
|
||||
// multiplicative decrease
|
||||
batchSize = batchSize / multiplicativeDecreaseDivisor
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
results = append(results, evts...)
|
||||
|
||||
if batchSize < maxBatchSize {
|
||||
// update the batchSize with additive increase
|
||||
batchSize = batchSize + additiveFactor
|
||||
if batchSize > maxBatchSize {
|
||||
batchSize = maxBatchSize
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if to == nil {
|
||||
evts, err := gm.fetchEvents(ctx, *toBlock, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
results = append(results, evts...)
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (gm *DynamicGroupManager) fetchEvents(ctx context.Context, from uint64, to *uint64) ([]*contracts.RLNMemberRegistered, error) {
|
||||
logIterator, err := gm.rlnContract.FilterMemberRegistered(&bind.FilterOpts{Start: from, End: to, Context: ctx})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Reference in New Issue