mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
adding a shit ton of logs
This commit is contained in:
parent
8dd31c200b
commit
3d2a54ebe8
@ -118,27 +118,37 @@ method atomicBatch*(
|
||||
rateCommitments = newSeq[RawRateCommitment](),
|
||||
toRemoveIndices = newSeq[MembershipIndex](),
|
||||
): Future[void] {.async: (raises: [Exception]), base.} =
|
||||
echo "-------------- atomicBatch 1"
|
||||
initializedGuard(g)
|
||||
echo "-------------- atomicBatch 2"
|
||||
|
||||
waku_rln_membership_insertion_duration_seconds.nanosecondTime:
|
||||
let operationSuccess =
|
||||
g.rlnInstance.atomicWrite(some(start), rateCommitments, toRemoveIndices)
|
||||
echo "-------------- atomicBatch 3"
|
||||
if not operationSuccess:
|
||||
echo "-------------- atomicBatch "
|
||||
raise newException(CatchableError, "atomic batch operation failed")
|
||||
# TODO: when slashing is enabled, we need to track slashed members
|
||||
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
|
||||
echo "-------------- atomicBatch 5"
|
||||
|
||||
if g.registerCb.isSome():
|
||||
echo "-------------- atomicBatch 6"
|
||||
var membersSeq = newSeq[Membership]()
|
||||
echo "-------------- atomicBatch rateCommitments.len: ", rateCommitments.len
|
||||
for i in 0 ..< rateCommitments.len:
|
||||
var index = start + MembershipIndex(i)
|
||||
debug "registering member to callback",
|
||||
rateCommitment = rateCommitments[i], index = index
|
||||
let member = Membership(rateCommitment: rateCommitments[i], index: index)
|
||||
membersSeq.add(member)
|
||||
echo "-------------- atomicBatch 7"
|
||||
await g.registerCb.get()(membersSeq)
|
||||
echo "-------------- atomicBatch 8"
|
||||
|
||||
g.validRootBuffer = g.slideRootQueue()
|
||||
echo "-------------- atomicBatch 9"
|
||||
|
||||
method register*(
|
||||
g: OnchainGroupManager, rateCommitment: RateCommitment
|
||||
@ -404,6 +414,7 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) =
|
||||
proc getNewBlockCallback(g: OnchainGroupManager): proc =
|
||||
let ethRpc = g.ethRpc.get()
|
||||
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
|
||||
echo "------------ calling wrappedCb"
|
||||
var latestBlock: BlockNumber
|
||||
g.retryWrapper(latestBlock, "Failed to get the latest block number"):
|
||||
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
@ -430,10 +441,14 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc =
|
||||
proc startListeningToEvents(
|
||||
g: OnchainGroupManager
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
echo "---------- startListeningToEvents 1"
|
||||
initializedGuard(g)
|
||||
echo "---------- startListeningToEvents 2"
|
||||
|
||||
let ethRpc = g.ethRpc.get()
|
||||
echo "---------- startListeningToEvents 3"
|
||||
let newBlockCallback = g.getNewBlockCallback()
|
||||
echo "---------- startListeningToEvents 4"
|
||||
g.runInInterval(newBlockCallback, DefaultBlockPollRate)
|
||||
|
||||
proc batchAwaitBlockHandlingFuture(
|
||||
@ -454,6 +469,7 @@ proc startOnchainSync(
|
||||
): Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
|
||||
echo "---------- startOnchainSync 1"
|
||||
let ethRpc = g.ethRpc.get()
|
||||
|
||||
# static block chunk size
|
||||
@ -463,6 +479,7 @@ proc startOnchainSync(
|
||||
# max number of futures to run concurrently
|
||||
let maxFutures = 10
|
||||
|
||||
echo "---------- startOnchainSync 2"
|
||||
var fromBlock: BlockNumber =
|
||||
if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
|
||||
info "syncing from last processed block", blockNumber = g.latestProcessedBlock
|
||||
@ -472,34 +489,47 @@ proc startOnchainSync(
|
||||
blockNumber = g.rlnContractDeployedBlockNumber
|
||||
g.rlnContractDeployedBlockNumber
|
||||
|
||||
echo "---------- startOnchainSync 3"
|
||||
var futs = newSeq[Future[bool]]()
|
||||
var currentLatestBlock: BlockNumber
|
||||
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
|
||||
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
echo "---------- startOnchainSync 4"
|
||||
|
||||
try:
|
||||
# we always want to sync from last processed block => latest
|
||||
# chunk events
|
||||
while true:
|
||||
echo "---------- startOnchainSync fromBlock", fromBlock
|
||||
echo "---------- startOnchainSync currentLatestBlock", currentLatestBlock
|
||||
# if the fromBlock is less than 2k blocks behind the current block
|
||||
# then fetch the new toBlock
|
||||
if fromBlock >= currentLatestBlock:
|
||||
echo "---------- startOnchainSync 5"
|
||||
break
|
||||
|
||||
if fromBlock + blockChunkSize > currentLatestBlock:
|
||||
echo "---------- startOnchainSync 6"
|
||||
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
|
||||
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
|
||||
let toBlock = min(fromBlock + blockChunkSize, currentLatestBlock)
|
||||
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
|
||||
echo "---------- startOnchainSync 7"
|
||||
await sleepAsync(rpcDelay)
|
||||
echo "---------- startOnchainSync 8"
|
||||
futs.add(g.getAndHandleEvents(fromBlock, toBlock))
|
||||
echo "---------- startOnchainSync 9"
|
||||
if futs.len >= maxFutures or toBlock == currentLatestBlock:
|
||||
echo "---------- startOnchainSync 10"
|
||||
await g.batchAwaitBlockHandlingFuture(futs)
|
||||
echo "---------- startOnchainSync 11"
|
||||
g.setMetadata(lastProcessedBlock = some(toBlock)).isOkOr:
|
||||
error "failed to persist rln metadata", error = $error
|
||||
futs = newSeq[Future[bool]]()
|
||||
echo "---------- startOnchainSync 12"
|
||||
fromBlock = toBlock + 1
|
||||
echo "---------- startOnchainSync 13"
|
||||
except CatchableError:
|
||||
raise newException(
|
||||
CatchableError,
|
||||
@ -508,6 +538,7 @@ proc startOnchainSync(
|
||||
|
||||
# listen to blockheaders and contract events
|
||||
try:
|
||||
echo "---------- startOnchainSync "
|
||||
await g.startListeningToEvents()
|
||||
except CatchableError:
|
||||
raise newException(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user