diff --git a/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim index d291c57f3..f34e91319 100644 --- a/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/v2/protocol/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -60,7 +60,7 @@ type const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" -template initializedGuard*(g: OnchainGroupManager): untyped = +template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(ValueError, "OnchainGroupManager is not initialized") @@ -156,7 +156,7 @@ method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): # TODO: after slashing is enabled on the contract -proc parseEvent*(event: type MemberRegistered, +proc parseEvent(event: type MemberRegistered, log: JsonNode): GroupManagerResult[Membership] = ## parses the `data` parameter of the `MemberRegistered` event `log` ## returns an error if it cannot parse the `data` parameter @@ -196,7 +196,16 @@ proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[ # add the backfilled root g.validRoots.addLast(g.validRootBuffer.popLast()) -proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} = +proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership) = + if blockTable.hasKeyOrPut(blockNumber, @[member]): + try: + blockTable[blockNumber].add(member) + except KeyError: # qed + error "could not insert member into block table", blockNumber=blockNumber, member=member + +proc getRawEvents(g: OnchainGroupManager, + fromBlock: BlockNumber, + toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} = initializedGuard(g) let ethRpc = g.ethRpc.get() @@ -212,13 +221,24 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[ else: normalizedToBlock = fromBlock + let events = await rlnContract.getJsonLogs(MemberRegistered, + fromBlock = some(fromBlock.blockId()), + toBlock = some(normalizedToBlock.blockId())) + return events + +proc getBlockTables(g: OnchainGroupManager, + fromBlock: BlockNumber, + toBlock: Option[BlockNumber] = none(BlockNumber)): Future[(BlockTable, BlockTable)] {.async.} = + initializedGuard(g) + var blockTable = default(BlockTable) var toRemoveBlockTable = default(BlockTable) - let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId())) + let events = await g.getRawEvents(fromBlock, toBlock) + if events.len == 0: debug "no events found" - return blockTable + return (blockTable, toRemoveBlockTable) for event in events: let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint @@ -232,21 +252,13 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[ if removed: # remove the registration from the tree, per block warn "member removed from the tree as per canonical chain", index=parsedEvent.index - if toRemoveBlockTable.hasKey(blockNumber): - toRemoveBlockTable[blockNumber].add(parsedEvent) - else: - toRemoveBlockTable[blockNumber] = @[parsedEvent] - - await g.backfillRootQueue(toRemoveBlockTable) - - if blockTable.hasKey(blockNumber): - blockTable[blockNumber].add(parsedEvent) + toRemoveBlockTable.insert(blockNumber, parsedEvent) else: - blockTable[blockNumber] = @[parsedEvent] + blockTable.insert(blockNumber, parsedEvent) - return blockTable + return (blockTable, toRemoveBlockTable) -proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} = +proc handleValidEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} = initializedGuard(g) for blockNumber, members in blockTable.pairs(): @@ -268,22 +280,28 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu return -proc getEventsAndSeedIntoTree*(g: OnchainGroupManager, - fromBlock: BlockNumber, - toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = +proc handleRemovedEvents(g: OnchainGroupManager, toRemoveBlockTable: BlockTable): Future[void] {.async.} = initializedGuard(g) - let events = await g.getEvents(fromBlock, toBlock) - await g.seedBlockTableIntoTree(events) + await g.backfillRootQueue(toRemoveBlockTable) + +proc getAndHandleEvents(g: OnchainGroupManager, + fromBlock: BlockNumber, + toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = + initializedGuard(g) + + let (validEvents, removedEvents) = await g.getBlockTables(fromBlock, toBlock) + await g.handleRemovedEvents(removedEvents) + await g.handleValidEvents(validEvents) return -proc getNewHeadCallback*(g: OnchainGroupManager): BlockHeaderHandler = +proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = let latestBlock = blockheader.number.uint debug "block received", blockNumber = latestBlock # get logs from the last block try: - asyncSpawn g.getEventsAndSeedIntoTree(latestBlock) + asyncSpawn g.getAndHandleEvents(latestBlock) except CatchableError: warn "failed to handle log: ", error=getCurrentExceptionMsg() return newHeadCallback @@ -291,7 +309,7 @@ proc getNewHeadCallback*(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadErrCallback(error: CatchableError) = warn "failed to get new head", error=error.msg -proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} = +proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} = initializedGuard(g) let ethRpc = g.ethRpc.get() @@ -301,11 +319,11 @@ proc startListeningToEvents*(g: OnchainGroupManager): Future[void] {.async.} = except CatchableError: raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg()) -proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} = +proc startOnchainSync(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNumber(0)): Future[void] {.async.} = initializedGuard(g) try: - await g.getEventsAndSeedIntoTree(fromBlock, some(fromBlock)) + await g.getAndHandleEvents(fromBlock, some(fromBlock)) except CatchableError: raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg()) @@ -315,7 +333,7 @@ proc startOnchainSync*(g: OnchainGroupManager, fromBlock: BlockNumber = BlockNum except CatchableError: raise newException(ValueError, "failed to start listening to events: " & getCurrentExceptionMsg()) -proc persistCredentials*(g: OnchainGroupManager): GroupManagerResult[void] = +proc persistCredentials(g: OnchainGroupManager): GroupManagerResult[void] = if not g.saveKeystore: return ok() if g.idCredentials.isNone():