mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
fix(rln-relay): chunk event fetching (#1830)
This commit is contained in:
parent
71382cac9a
commit
5cb440d2b8
@ -62,6 +62,8 @@ type
|
|||||||
const DefaultKeyStorePath* = "rlnKeystore.json"
|
const DefaultKeyStorePath* = "rlnKeystore.json"
|
||||||
const DefaultKeyStorePassword* = "password"
|
const DefaultKeyStorePassword* = "password"
|
||||||
|
|
||||||
|
const BlockChunkSize* = 100'u64
|
||||||
|
|
||||||
template initializedGuard(g: OnchainGroupManager): untyped =
|
template initializedGuard(g: OnchainGroupManager): untyped =
|
||||||
if not g.initialized:
|
if not g.initialized:
|
||||||
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
||||||
@ -316,16 +318,32 @@ proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} =
|
|||||||
proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
||||||
initializedGuard(g)
|
initializedGuard(g)
|
||||||
|
|
||||||
let fromBlock = if g.latestProcessedBlock.isSome():
|
let ethRpc = g.ethRpc.get()
|
||||||
|
|
||||||
|
var fromBlock = if g.latestProcessedBlock.isSome():
|
||||||
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
|
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
|
||||||
g.latestProcessedBlock.get()
|
g.latestProcessedBlock.get()
|
||||||
else:
|
else:
|
||||||
info "starting onchain sync from scratch"
|
info "starting onchain sync from scratch"
|
||||||
|
# chunk size is 1000 blocks
|
||||||
BlockNumber(0)
|
BlockNumber(0)
|
||||||
|
|
||||||
|
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||||
try:
|
try:
|
||||||
# we always want to sync from last processed block => latest
|
# we always want to sync from last processed block => latest
|
||||||
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
|
if fromBlock == BlockNumber(0) or
|
||||||
|
fromBlock + BlockNumber(BlockChunkSize) < latestBlock:
|
||||||
|
# chunk events
|
||||||
|
while true:
|
||||||
|
let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
|
||||||
|
let toBlock = min(fromBlock + BlockNumber(BlockChunkSize), currentLatestBlock)
|
||||||
|
info "chunking events", fromBlock = fromBlock, toBlock = toBlock
|
||||||
|
await g.getAndHandleEvents(fromBlock, some(toBlock))
|
||||||
|
fromBlock = toBlock + 1
|
||||||
|
if fromBlock >= currentLatestBlock:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())
|
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user