From 00a3812b9153c5c3ce261735ab468d1841582bda Mon Sep 17 00:00:00 2001
From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com>
Date: Fri, 31 Mar 2023 19:15:04 +0530
Subject: [PATCH] chore(rln-relay): gracefully handle chain forks (#1623)

* chore(rln-relay): gracefully handle chain forks
* fix(rln-relay): better root windowing
* fix(rln-relay): better future generation for test
* fix(rln-relay): reduced width
* fix: better naming of futs, collision free
---
 .../test_rln_group_manager_onchain.nim        | 62 +++++++++++++++++--
 .../group_manager/group_manager_base.nim      | 29 +++++----
 .../group_manager/on_chain/group_manager.nim  | 45 ++++++++++++--
 .../group_manager/static/group_manager.nim    |  6 +-
 .../protocol/waku_rln_relay/rln/wrappers.nim  |  7 +++
 5 files changed, 125 insertions(+), 24 deletions(-)

diff --git a/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim
index 73c8d844a..3c817bde7 100644
--- a/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim
+++ b/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim
@@ -6,7 +6,7 @@ else:
   {.push raises: [].}
 
 import
-  std/[options, osproc, streams, strutils],
+  std/[options, osproc, streams, strutils, tables],
   stew/[results, byteutils],
   stew/shims/net as stewNet,
   testutils/unittests,
@@ -246,7 +246,8 @@ suite "Onchain group manager":
 
   asyncTest "startGroupSync: should fetch history correctly":
     let manager = await setup()
-    let credentials = generateCredentials(manager.rlnInstance, 5)
+    const credentialCount = 6
+    let credentials = generateCredentials(manager.rlnInstance, credentialCount)
     await manager.init()
 
     let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
@@ -254,9 +255,11 @@ suite "Onchain group manager":
       merkleRootBeforeRes.isOk()
     let merkleRootBefore = merkleRootBeforeRes.get()
 
-    var futures = [newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void]()]
-
-    proc generateCallback(futs: array[0..4, Future[system.void]], credentials: seq[IdentityCredential]): OnRegisterCallback =
+    type TestGroupSyncFuts = array[0..credentialCount - 1, Future[void]]
+    var futures: TestGroupSyncFuts
+    for i in 0 ..< futures.len():
+      futures[i] = newFuture[void]()
+    proc generateCallback(futs: TestGroupSyncFuts, credentials: seq[IdentityCredential]): OnRegisterCallback =
       var futureIndex = 0
       proc callback(registrations: seq[Membership]): Future[void] {.async.} =
         if registrations.len == 1 and
@@ -281,6 +284,7 @@ suite "Onchain group manager":
 
     check:
       merkleRootBefore != merkleRootAfter
+      manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize
 
   asyncTest "register: should guard against uninitialized state":
     let manager = await setup()
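A minimal sketch of the root-window arithmetic behind the new check above, with ints standing in for Merkle roots; the window size of 5 is assumed to match AcceptableRootWindowSize, and `window`/`buffer` are stand-ins for the patch's deques, not its actual types:

  import std/deques

  const windowSize = 5            # assumed value of AcceptableRootWindowSize

  var window = initDeque[int]()   # stand-in for manager.validRoots
  var buffer = initDeque[int]()   # stand-in for manager.validRootBuffer

  for root in 1 .. 6:             # credentialCount = 6 registrations
    if window.len == windowSize:
      buffer.addLast(window.popFirst())  # the overflowed root is buffered
    window.addLast(root)

  assert window.len == windowSize
  assert buffer.len == 6 - windowSize    # == credentialCount - AcceptableRootWindowSize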
@@ -477,6 +481,54 @@ suite "Onchain group manager":
     check:
       verifiedRes.get() == false
 
+  asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
+    let manager = await setup()
+    const credentialCount = 6
+    let credentials = generateCredentials(manager.rlnInstance, credentialCount)
+    await manager.init()
+
+    type TestBackfillFuts = array[0..credentialCount - 1, Future[void]]
+    var futures: TestBackfillFuts
+    for i in 0 ..< futures.len():
+      futures[i] = newFuture[void]()
+
+    proc generateCallback(futs: TestBackfillFuts, credentials: seq[IdentityCredential]): OnRegisterCallback =
+      var futureIndex = 0
+      proc callback(registrations: seq[Membership]): Future[void] {.async.} =
+        if registrations.len == 1 and
+           registrations[0].idCommitment == credentials[futureIndex].idCommitment and
+           registrations[0].index == MembershipIndex(futureIndex + 1):
+          futs[futureIndex].complete()
+          futureIndex += 1
+      return callback
+
+    manager.onRegister(generateCallback(futures, credentials))
+    await manager.startGroupSync()
+
+    for i in 0 ..< credentials.len():
+      await manager.register(credentials[i])
+
+    await allFutures(futures)
+
+    # At this point, we should have a full root queue (5 roots) and a partial buffer of 1 root
+    require:
+      manager.validRoots.len() == credentialCount - 1
+      manager.validRootBuffer.len() == 1
+
+    # We can now simulate a chain reorg by calling backfillRootQueue
+    var blockTable = default(BlockTable)
+    blockTable[1.uint] = @[Membership(idCommitment: credentials[4].idCommitment, index: 4.uint)]
+
+    let expectedLastRoot = manager.validRootBuffer[0]
+    await manager.backfillRootQueue(blockTable)
+
+    # We should now have 5 roots in the queue, and no partial buffer
+    check:
+      manager.validRoots.len() == credentialCount - 1
+      manager.validRootBuffer.len() == 0
+      manager.validRoots[credentialCount - 2] == expectedLastRoot
+
 
 ################################
 ## Terminating/removing Ganache
 ################################
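The state transition this test asserts, sketched with ints standing in for Merkle roots (the values are illustrative, not the patch's actual roots): one reorged block drops the newest valid root and restores the buffered pre-overflow root, which then sits at the end of the window.

  import std/[deques, sequtils]

  var validRoots = [2, 3, 4, 5, 6].toDeque   # window after 6 registrations (root 1 overflowed)
  var rootBuffer = [1].toDeque               # the single buffered root

  # one reorged block: drop the newest valid root, backfill from the buffer
  discard validRoots.popLast()
  validRoots.addLast(rootBuffer.popLast())

  assert validRoots.toSeq == @[2, 3, 4, 5, 1]  # buffered root is now the last valid root
  assert rootBuffer.len == 0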
diff --git a/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim b/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim
index 80a043d62..ac06baecf 100644
--- a/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim
+++ b/waku/v2/protocol/waku_rln_relay/group_manager/group_manager_base.nim
@@ -1,5 +1,6 @@
 import
   ../protocol_types,
+  ../constants,
   ../rln
 
 import
   options,
@@ -87,18 +88,18 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH
 method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
   g.withdrawCb = some(cb)
 
-# Acceptable roots for merkle root validation of incoming messages
-const AcceptableRootWindowSize* = 5
-
-proc updateValidRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): void =
+proc slideRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): seq[MerkleNode] =
   ## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached
-  let overflowCount = rootQueue.len() - AcceptableRootWindowSize
-  if overflowCount >= 0:
-    # Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`)
-    for i in 0..overflowCount:
-      rootQueue.popFirst()
+  let overflowCount = rootQueue.len() - AcceptableRootWindowSize + 1
+  var overflowedRoots = newSeq[MerkleNode]()
+  if overflowCount > 0:
+    # Delete the oldest `overflowCount` roots in the deque (index 0 ..< `overflowCount`),
+    # insert them into the overflowedRoots seq and return it
+    for i in 0 ..< overflowCount:
+      overflowedRoots.add(rootQueue.popFirst())
   # Push the next root into the queue
   rootQueue.addLast(root)
+  return overflowedRoots
 
 method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} =
   ## returns the index of the root in the merkle tree.
@@ -112,12 +113,18 @@ method validateRoot*(g: GroupManager, root: MerkleNode): bool {.base,gcsafe,rais
       return true
   return false
 
-template updateValidRootQueue*(g: GroupManager) =
+template slideRootQueue*(g: GroupManager): untyped =
   let rootRes = g.rlnInstance.getMerkleRoot()
   if rootRes.isErr():
     raise newException(ValueError, "failed to get merkle root")
   let rootAfterUpdate = rootRes.get()
-  updateValidRootQueue(g.validRoots, rootAfterUpdate)
+
+  var rootBuffer: Deque[MerkleNode]
+  let overflowedRoots = slideRootQueue(g.validRoots, rootAfterUpdate)
+  if overflowedRoots.len > 0:
+    for root in overflowedRoots:
+      discard rootBuffer.slideRootQueue(root)
+  rootBuffer
 
 method verifyProof*(g: GroupManager,
                     input: openArray[byte],
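For reference, the new windowing logic in isolation; a self-contained sketch with MerkleNode modelled as int, and the window size assumed to be 5 (the value of AcceptableRootWindowSize, now imported from ../constants):

  import std/deques

  const AcceptableRootWindowSize = 5  # assumed; defined in ../constants in the patch
  type MerkleNode = int               # stand-in for the real 32-byte root

  proc slideRootQueue(rootQueue: var Deque[MerkleNode], root: MerkleNode): seq[MerkleNode] =
    ## slides the window: pushes `root`, returns any roots that overflowed
    let overflowCount = rootQueue.len() - AcceptableRootWindowSize + 1
    var overflowedRoots = newSeq[MerkleNode]()
    if overflowCount > 0:
      for i in 0 ..< overflowCount:
        overflowedRoots.add(rootQueue.popFirst())
    rootQueue.addLast(root)
    return overflowedRoots

  var validRoots = initDeque[MerkleNode]()
  for r in 1 .. 6:
    let overflowed = validRoots.slideRootQueue(r)
    if r <= 5:
      assert overflowed.len == 0   # window not yet full, nothing slides out
    else:
      assert overflowed == @[1]    # the oldest root slides out into the caller's buffer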
"no events found" @@ -200,12 +222,23 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[ for event in events: let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint + let removed = event["removed"].getBool() let parsedEventRes = parseEvent(MemberRegistered, event) if parsedEventRes.isErr(): error "failed to parse the MemberRegistered event", error=parsedEventRes.error() raise newException(ValueError, "failed to parse the MemberRegistered event") let parsedEvent = parsedEventRes.get() + if removed: + # remove the registration from the tree, per block + warn "member removed from the tree as per canonical chain", index=parsedEvent.index + if toRemoveBlockTable.hasKey(blockNumber): + toRemoveBlockTable[blockNumber].add(parsedEvent) + else: + toRemoveBlockTable[blockNumber] = @[parsedEvent] + + await g.backfillRootQueue(toRemoveBlockTable) + if blockTable.hasKey(blockNumber): blockTable[blockNumber].add(parsedEvent) else: @@ -221,8 +254,8 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu let startingIndex = members[0].index try: await g.registerBatch(members.mapIt(it.idCommitment)) - except: - error "failed to insert members into the tree" + except CatchableError: + error "failed to insert members into the tree", error=getCurrentExceptionMsg() raise newException(ValueError, "failed to insert members into the tree") trace "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex let lastIndex = startingIndex + members.len.uint - 1 @@ -235,7 +268,9 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu return -proc getEventsAndSeedIntoTree*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = +proc getEventsAndSeedIntoTree*(g: OnchainGroupManager, + fromBlock: BlockNumber, + toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = initializedGuard(g) let events = await g.getEvents(fromBlock, toBlock) diff --git a/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim b/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim index 213f89bd3..b70f9f600 100644 --- a/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/v2/protocol/waku_rln_relay/group_manager/static/group_manager.nim @@ -34,7 +34,7 @@ method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} = if not membersInserted: raise newException(ValueError, "Failed to insert members into the merkle tree") - g.updateValidRootQueue() + discard g.slideRootQueue() g.latestIndex += MembershipIndex(idCommitments.len() - 1) @@ -56,7 +56,7 @@ method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void if not memberInserted: raise newException(ValueError, "Failed to insert member into the merkle tree") - g.updateValidRootQueue() + discard g.slideRootQueue() g.latestIndex += 1 @@ -77,7 +77,7 @@ method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i))) await g.registerCb.get()(memberSeq) - g.updateValidRootQueue() + discard g.slideRootQueue() g.latestIndex += MembershipIndex(idCommitments.len() - 1) diff --git a/waku/v2/protocol/waku_rln_relay/rln/wrappers.nim b/waku/v2/protocol/waku_rln_relay/rln/wrappers.nim index 6a2b940ef..d5698fc2d 100644 --- 
diff --git a/waku/v2/protocol/waku_rln_relay/rln/wrappers.nim b/waku/v2/protocol/waku_rln_relay/rln/wrappers.nim
index 6a2b940ef..d5698fc2d 100644
--- a/waku/v2/protocol/waku_rln_relay/rln/wrappers.nim
+++ b/waku/v2/protocol/waku_rln_relay/rln/wrappers.nim
@@ -252,6 +252,13 @@ proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool =
   let deletion_success = delete_member(rlnInstance, index)
   return deletion_success
 
+proc removeMembers*(rlnInstance: ptr RLN, indices: seq[MembershipIndex]): bool =
+  for index in indices:
+    let deletion_success = delete_member(rlnInstance, index)
+    if not deletion_success:
+      return false
+  return true
+
 proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult =
   # read the Merkle Tree root after insertion
   var
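The batch wrapper stops at the first failed deletion, which is what lets backfillRootQueue treat any false return as an inconsistent tree. A toy stand-in illustrating that contract, with the zerokit delete_member FFI call mocked over a seq (names here are hypothetical, not part of the patch):

  type MembershipIndex = uint

  proc deleteMemberMock(tree: var seq[int], index: MembershipIndex): bool =
    # stand-in for the zerokit delete_member FFI call
    if index.int >= tree.len:
      return false
    tree[index.int] = 0  # reset the leaf
    return true

  proc removeMembersMock(tree: var seq[int], indices: seq[MembershipIndex]): bool =
    # mirrors removeMembers: fail fast on the first unsuccessful deletion
    for index in indices:
      if not tree.deleteMemberMock(index):
        return false
    return true

  var leaves = @[10, 20, 30]
  assert leaves.removeMembersMock(@[MembershipIndex(1), MembershipIndex(2)])
  assert not leaves.removeMembersMock(@[MembershipIndex(9)])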