mirror of https://github.com/waku-org/nwaku.git
chore(rln-relay): gracefully handle chain forks (#1623)
* chore(rln-relay): gracefully handle chain forks * fix(rln-relay): better root windowing * fix(rln-relay): better future generation for test * fix(rln-relay): reduced width * fix: better naming of futs, collision free
This commit is contained in:
parent
11ff93c2cf
commit
00a3812b91
|
@ -6,7 +6,7 @@ else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, osproc, streams, strutils],
|
std/[options, osproc, streams, strutils, tables],
|
||||||
stew/[results, byteutils],
|
stew/[results, byteutils],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
|
@ -246,7 +246,8 @@ suite "Onchain group manager":
|
||||||
|
|
||||||
asyncTest "startGroupSync: should fetch history correctly":
|
asyncTest "startGroupSync: should fetch history correctly":
|
||||||
let manager = await setup()
|
let manager = await setup()
|
||||||
let credentials = generateCredentials(manager.rlnInstance, 5)
|
const credentialCount = 6
|
||||||
|
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
|
||||||
await manager.init()
|
await manager.init()
|
||||||
|
|
||||||
let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
|
let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
|
||||||
|
@ -254,9 +255,11 @@ suite "Onchain group manager":
|
||||||
merkleRootBeforeRes.isOk()
|
merkleRootBeforeRes.isOk()
|
||||||
let merkleRootBefore = merkleRootBeforeRes.get()
|
let merkleRootBefore = merkleRootBeforeRes.get()
|
||||||
|
|
||||||
var futures = [newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void]()]
|
type TestGroupSyncFuts = array[0..credentialCount - 1, Future[void]]
|
||||||
|
var futures: TestGroupSyncFuts
|
||||||
proc generateCallback(futs: array[0..4, Future[system.void]], credentials: seq[IdentityCredential]): OnRegisterCallback =
|
for i in 0 ..< futures.len():
|
||||||
|
futures[i] = newFuture[void]()
|
||||||
|
proc generateCallback(futs: TestGroupSyncFuts, credentials: seq[IdentityCredential]): OnRegisterCallback =
|
||||||
var futureIndex = 0
|
var futureIndex = 0
|
||||||
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
|
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
|
||||||
if registrations.len == 1 and
|
if registrations.len == 1 and
|
||||||
|
@ -281,6 +284,7 @@ suite "Onchain group manager":
|
||||||
|
|
||||||
check:
|
check:
|
||||||
merkleRootBefore != merkleRootAfter
|
merkleRootBefore != merkleRootAfter
|
||||||
|
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize
|
||||||
|
|
||||||
asyncTest "register: should guard against uninitialized state":
|
asyncTest "register: should guard against uninitialized state":
|
||||||
let manager = await setup()
|
let manager = await setup()
|
||||||
|
@ -477,6 +481,54 @@ suite "Onchain group manager":
|
||||||
check:
|
check:
|
||||||
verifiedRes.get() == false
|
verifiedRes.get() == false
|
||||||
|
|
||||||
|
asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
|
||||||
|
let manager = await setup()
|
||||||
|
const credentialCount = 6
|
||||||
|
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
|
||||||
|
await manager.init()
|
||||||
|
|
||||||
|
type TestBackfillFuts = array[0..credentialCount - 1, Future[void]]
|
||||||
|
var futures: TestBackfillFuts
|
||||||
|
for i in 0 ..< futures.len():
|
||||||
|
futures[i] = newFuture[void]()
|
||||||
|
|
||||||
|
proc generateCallback(futs: TestBackfillFuts, credentials: seq[IdentityCredential]): OnRegisterCallback =
|
||||||
|
var futureIndex = 0
|
||||||
|
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
|
||||||
|
if registrations.len == 1 and
|
||||||
|
registrations[0].idCommitment == credentials[futureIndex].idCommitment and
|
||||||
|
registrations[0].index == MembershipIndex(futureIndex + 1):
|
||||||
|
futs[futureIndex].complete()
|
||||||
|
futureIndex += 1
|
||||||
|
return callback
|
||||||
|
|
||||||
|
manager.onRegister(generateCallback(futures, credentials))
|
||||||
|
await manager.startGroupSync()
|
||||||
|
|
||||||
|
for i in 0 ..< credentials.len():
|
||||||
|
await manager.register(credentials[i])
|
||||||
|
|
||||||
|
await allFutures(futures)
|
||||||
|
|
||||||
|
# At this point, we should have a full root queue, 5 roots, and partial buffer of 1 root
|
||||||
|
require:
|
||||||
|
manager.validRoots.len() == credentialCount - 1
|
||||||
|
manager.validRootBuffer.len() == 1
|
||||||
|
|
||||||
|
# We can now simulate a chain reorg by calling backfillRootQueue
|
||||||
|
var blockTable = default(BlockTable)
|
||||||
|
blockTable[1.uint] = @[Membership(idCommitment: credentials[4].idCommitment, index: 4.uint)]
|
||||||
|
|
||||||
|
let expectedLastRoot = manager.validRootBuffer[0]
|
||||||
|
await manager.backfillRootQueue(blockTable)
|
||||||
|
|
||||||
|
# We should now have 5 roots in the queue, and no partial buffer
|
||||||
|
check:
|
||||||
|
manager.validRoots.len() == credentialCount - 1
|
||||||
|
manager.validRootBuffer.len() == 0
|
||||||
|
manager.validRoots[credentialCount - 2] == expectedLastRoot
|
||||||
|
|
||||||
|
|
||||||
################################
|
################################
|
||||||
## Terminating/removing Ganache
|
## Terminating/removing Ganache
|
||||||
################################
|
################################
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import
|
import
|
||||||
../protocol_types,
|
../protocol_types,
|
||||||
|
../constants,
|
||||||
../rln
|
../rln
|
||||||
import
|
import
|
||||||
options,
|
options,
|
||||||
|
@ -87,18 +88,18 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH
|
||||||
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
|
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
|
||||||
g.withdrawCb = some(cb)
|
g.withdrawCb = some(cb)
|
||||||
|
|
||||||
# Acceptable roots for merkle root validation of incoming messages
|
proc slideRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): seq[MerkleNode] =
|
||||||
const AcceptableRootWindowSize* = 5
|
|
||||||
|
|
||||||
proc updateValidRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): void =
|
|
||||||
## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached
|
## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached
|
||||||
let overflowCount = rootQueue.len() - AcceptableRootWindowSize
|
let overflowCount = rootQueue.len() - AcceptableRootWindowSize + 1
|
||||||
if overflowCount >= 0:
|
var overflowedRoots = newSeq[MerkleNode]()
|
||||||
# Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`)
|
if overflowCount > 0:
|
||||||
for i in 0..overflowCount:
|
# Delete the oldest `overflowCount` roots in the deque (index 0..`overflowCount`)
|
||||||
rootQueue.popFirst()
|
# insert into overflowedRoots seq and return
|
||||||
|
for i in 0 ..< overflowCount:
|
||||||
|
overFlowedRoots.add(rootQueue.popFirst())
|
||||||
# Push the next root into the queue
|
# Push the next root into the queue
|
||||||
rootQueue.addLast(root)
|
rootQueue.addLast(root)
|
||||||
|
return overFlowedRoots
|
||||||
|
|
||||||
method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} =
|
method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} =
|
||||||
## returns the index of the root in the merkle tree.
|
## returns the index of the root in the merkle tree.
|
||||||
|
@ -112,12 +113,18 @@ method validateRoot*(g: GroupManager, root: MerkleNode): bool {.base,gcsafe,rais
|
||||||
return true
|
return true
|
||||||
return false
|
return false
|
||||||
|
|
||||||
template updateValidRootQueue*(g: GroupManager) =
|
template slideRootQueue*(g: GroupManager): untyped =
|
||||||
let rootRes = g.rlnInstance.getMerkleRoot()
|
let rootRes = g.rlnInstance.getMerkleRoot()
|
||||||
if rootRes.isErr():
|
if rootRes.isErr():
|
||||||
raise newException(ValueError, "failed to get merkle root")
|
raise newException(ValueError, "failed to get merkle root")
|
||||||
let rootAfterUpdate = rootRes.get()
|
let rootAfterUpdate = rootRes.get()
|
||||||
updateValidRootQueue(g.validRoots, rootAfterUpdate)
|
|
||||||
|
var rootBuffer: Deque[MerkleNode]
|
||||||
|
let overflowedRoots = slideRootQueue(g.validRoots, rootAfterUpdate)
|
||||||
|
if overflowedRoots.len > 0:
|
||||||
|
for root in overflowedRoots:
|
||||||
|
discard rootBuffer.slideRootQueue(root)
|
||||||
|
rootBuffer
|
||||||
|
|
||||||
method verifyProof*(g: GroupManager,
|
method verifyProof*(g: GroupManager,
|
||||||
input: openArray[byte],
|
input: openArray[byte],
|
||||||
|
|
|
@ -52,6 +52,10 @@ type
|
||||||
keystorePassword*: Option[string]
|
keystorePassword*: Option[string]
|
||||||
saveKeystore*: bool
|
saveKeystore*: bool
|
||||||
registrationHandler*: Option[RegistrationHandler]
|
registrationHandler*: Option[RegistrationHandler]
|
||||||
|
# this buffer exists to backfill appropriate roots for the merkle tree,
|
||||||
|
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
|
||||||
|
# because the average reorg depth is 1 to 2 blocks.
|
||||||
|
validRootBuffer*: Deque[MerkleNode]
|
||||||
|
|
||||||
const DefaultKeyStorePath* = "rlnKeystore.json"
|
const DefaultKeyStorePath* = "rlnKeystore.json"
|
||||||
const DefaultKeyStorePassword* = "password"
|
const DefaultKeyStorePassword* = "password"
|
||||||
|
@ -70,7 +74,7 @@ method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[voi
|
||||||
if g.registerCb.isSome():
|
if g.registerCb.isSome():
|
||||||
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])
|
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])
|
||||||
|
|
||||||
g.updateValidRootQueue()
|
g.validRootBuffer = g.slideRootQueue()
|
||||||
|
|
||||||
g.latestIndex += 1
|
g.latestIndex += 1
|
||||||
|
|
||||||
|
@ -92,7 +96,7 @@ method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]):
|
||||||
membersSeq.add(member)
|
membersSeq.add(member)
|
||||||
await g.registerCb.get()(membersSeq)
|
await g.registerCb.get()(membersSeq)
|
||||||
|
|
||||||
g.updateValidRootQueue()
|
g.validRootBuffer = g.slideRootQueue()
|
||||||
|
|
||||||
g.latestIndex += MembershipIndex(idCommitments.len())
|
g.latestIndex += MembershipIndex(idCommitments.len())
|
||||||
|
|
||||||
|
@ -176,6 +180,22 @@ proc parseEvent*(event: type MemberRegistered,
|
||||||
|
|
||||||
type BlockTable* = OrderedTable[BlockNumber, seq[Membership]]
|
type BlockTable* = OrderedTable[BlockNumber, seq[Membership]]
|
||||||
|
|
||||||
|
proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
|
||||||
|
if blocktable.len() > 0:
|
||||||
|
for blockNumber, members in blocktable.pairs():
|
||||||
|
let deletionSuccess = g.rlnInstance.removeMembers(members.mapIt(it.index))
|
||||||
|
debug "deleting members to reconcile state"
|
||||||
|
if not deletionSuccess:
|
||||||
|
error "failed to delete members from the tree", success=deletionSuccess
|
||||||
|
raise newException(ValueError, "failed to delete member from the tree, tree is inconsistent")
|
||||||
|
# backfill the tree's acceptable roots
|
||||||
|
for i in 0..blocktable.len()-1:
|
||||||
|
# remove the last root
|
||||||
|
g.validRoots.popLast()
|
||||||
|
for i in 0..blockTable.len()-1:
|
||||||
|
# add the backfilled root
|
||||||
|
g.validRoots.addLast(g.validRootBuffer.popLast())
|
||||||
|
|
||||||
proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
|
proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
|
||||||
initializedGuard(g)
|
initializedGuard(g)
|
||||||
|
|
||||||
|
@ -193,6 +213,8 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
|
||||||
normalizedToBlock = fromBlock
|
normalizedToBlock = fromBlock
|
||||||
|
|
||||||
var blockTable = default(BlockTable)
|
var blockTable = default(BlockTable)
|
||||||
|
var toRemoveBlockTable = default(BlockTable)
|
||||||
|
|
||||||
let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId()))
|
let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId()))
|
||||||
if events.len == 0:
|
if events.len == 0:
|
||||||
debug "no events found"
|
debug "no events found"
|
||||||
|
@ -200,12 +222,23 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
|
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
|
||||||
|
let removed = event["removed"].getBool()
|
||||||
let parsedEventRes = parseEvent(MemberRegistered, event)
|
let parsedEventRes = parseEvent(MemberRegistered, event)
|
||||||
if parsedEventRes.isErr():
|
if parsedEventRes.isErr():
|
||||||
error "failed to parse the MemberRegistered event", error=parsedEventRes.error()
|
error "failed to parse the MemberRegistered event", error=parsedEventRes.error()
|
||||||
raise newException(ValueError, "failed to parse the MemberRegistered event")
|
raise newException(ValueError, "failed to parse the MemberRegistered event")
|
||||||
let parsedEvent = parsedEventRes.get()
|
let parsedEvent = parsedEventRes.get()
|
||||||
|
|
||||||
|
if removed:
|
||||||
|
# remove the registration from the tree, per block
|
||||||
|
warn "member removed from the tree as per canonical chain", index=parsedEvent.index
|
||||||
|
if toRemoveBlockTable.hasKey(blockNumber):
|
||||||
|
toRemoveBlockTable[blockNumber].add(parsedEvent)
|
||||||
|
else:
|
||||||
|
toRemoveBlockTable[blockNumber] = @[parsedEvent]
|
||||||
|
|
||||||
|
await g.backfillRootQueue(toRemoveBlockTable)
|
||||||
|
|
||||||
if blockTable.hasKey(blockNumber):
|
if blockTable.hasKey(blockNumber):
|
||||||
blockTable[blockNumber].add(parsedEvent)
|
blockTable[blockNumber].add(parsedEvent)
|
||||||
else:
|
else:
|
||||||
|
@ -221,8 +254,8 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu
|
||||||
let startingIndex = members[0].index
|
let startingIndex = members[0].index
|
||||||
try:
|
try:
|
||||||
await g.registerBatch(members.mapIt(it.idCommitment))
|
await g.registerBatch(members.mapIt(it.idCommitment))
|
||||||
except:
|
except CatchableError:
|
||||||
error "failed to insert members into the tree"
|
error "failed to insert members into the tree", error=getCurrentExceptionMsg()
|
||||||
raise newException(ValueError, "failed to insert members into the tree")
|
raise newException(ValueError, "failed to insert members into the tree")
|
||||||
trace "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex
|
trace "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex
|
||||||
let lastIndex = startingIndex + members.len.uint - 1
|
let lastIndex = startingIndex + members.len.uint - 1
|
||||||
|
@ -235,7 +268,9 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
proc getEventsAndSeedIntoTree*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
|
proc getEventsAndSeedIntoTree*(g: OnchainGroupManager,
|
||||||
|
fromBlock: BlockNumber,
|
||||||
|
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
|
||||||
initializedGuard(g)
|
initializedGuard(g)
|
||||||
|
|
||||||
let events = await g.getEvents(fromBlock, toBlock)
|
let events = await g.getEvents(fromBlock, toBlock)
|
||||||
|
|
|
@ -34,7 +34,7 @@ method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} =
|
||||||
if not membersInserted:
|
if not membersInserted:
|
||||||
raise newException(ValueError, "Failed to insert members into the merkle tree")
|
raise newException(ValueError, "Failed to insert members into the merkle tree")
|
||||||
|
|
||||||
g.updateValidRootQueue()
|
discard g.slideRootQueue()
|
||||||
|
|
||||||
g.latestIndex += MembershipIndex(idCommitments.len() - 1)
|
g.latestIndex += MembershipIndex(idCommitments.len() - 1)
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void
|
||||||
if not memberInserted:
|
if not memberInserted:
|
||||||
raise newException(ValueError, "Failed to insert member into the merkle tree")
|
raise newException(ValueError, "Failed to insert member into the merkle tree")
|
||||||
|
|
||||||
g.updateValidRootQueue()
|
discard g.slideRootQueue()
|
||||||
|
|
||||||
g.latestIndex += 1
|
g.latestIndex += 1
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]):
|
||||||
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i)))
|
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i)))
|
||||||
await g.registerCb.get()(memberSeq)
|
await g.registerCb.get()(memberSeq)
|
||||||
|
|
||||||
g.updateValidRootQueue()
|
discard g.slideRootQueue()
|
||||||
|
|
||||||
g.latestIndex += MembershipIndex(idCommitments.len() - 1)
|
g.latestIndex += MembershipIndex(idCommitments.len() - 1)
|
||||||
|
|
||||||
|
|
|
@ -252,6 +252,13 @@ proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool =
|
||||||
let deletion_success = delete_member(rlnInstance, index)
|
let deletion_success = delete_member(rlnInstance, index)
|
||||||
return deletion_success
|
return deletion_success
|
||||||
|
|
||||||
|
proc removeMembers*(rlnInstance: ptr RLN, indices: seq[MembershipIndex]): bool =
|
||||||
|
for index in indices:
|
||||||
|
let deletion_success = delete_member(rlnInstance, index)
|
||||||
|
if not deletion_success:
|
||||||
|
return false
|
||||||
|
return true
|
||||||
|
|
||||||
proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult =
|
proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult =
|
||||||
# read the Merkle Tree root after insertion
|
# read the Merkle Tree root after insertion
|
||||||
var
|
var
|
||||||
|
|
Loading…
Reference in New Issue