mirror of https://github.com/waku-org/nwaku.git
fix(rln-relay): invalid start index being set results in invalid proofs (#1915)
* fix(rln-relay): invalid proof usage * fix(rln-relay): use startIndex from first event in block * fix: latestIndex set after registerBatch
This commit is contained in:
parent
cf3013962c
commit
b3bb7a1113
|
@ -738,7 +738,7 @@ when defined(rln):
|
|||
let rlnRelayRes = await WakuRlnRelay.new(rlnConf,
|
||||
registrationHandler)
|
||||
if rlnRelayRes.isErr():
|
||||
raise newException(CatchableError, "failed to mount WakuRlnRelay: {rlnRelayRes.error}")
|
||||
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
|
||||
let rlnRelay = rlnRelayRes.get()
|
||||
let validator = generateRlnValidator(rlnRelay, spamHandler)
|
||||
let pb = PubSub(node.wakuRelay)
|
||||
|
|
|
@ -69,40 +69,40 @@ template initializedGuard(g: OnchainGroupManager): untyped =
|
|||
if not g.initialized:
|
||||
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
||||
|
||||
method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
|
||||
initializedGuard(g)
|
||||
|
||||
await g.registerBatch(@[idCommitment])
|
||||
|
||||
method atomicBatch*(g: OnchainGroupManager,
|
||||
start: MembershipIndex,
|
||||
idCommitments = newSeq[IDCommitment](),
|
||||
toRemoveIndices = newSeq[MembershipIndex]()): Future[void] {.async.} =
|
||||
initializedGuard(g)
|
||||
|
||||
let startIndex = g.latestIndex
|
||||
waku_rln_membership_insertion_duration_seconds.nanosecondTime:
|
||||
let operationSuccess = g.rlnInstance.atomicWrite(some(startIndex), idCommitments, toRemoveIndices)
|
||||
let operationSuccess = g.rlnInstance.atomicWrite(some(start), idCommitments, toRemoveIndices)
|
||||
if not operationSuccess:
|
||||
raise newException(ValueError, "atomic batch operation failed")
|
||||
|
||||
if g.registerCb.isSome():
|
||||
var membersSeq = newSeq[Membership]()
|
||||
for i in 0 ..< idCommitments.len():
|
||||
var index = g.latestIndex + MembershipIndex(i)
|
||||
debug "registering member", idCommitment = idCommitments[i], index = index, latestIndex = g.latestIndex
|
||||
var index = start + MembershipIndex(i)
|
||||
debug "registering member", idCommitment = idCommitments[i], index = index
|
||||
let member = Membership(idCommitment: idCommitments[i], index: index)
|
||||
membersSeq.add(member)
|
||||
await g.registerCb.get()(membersSeq)
|
||||
|
||||
g.validRootBuffer = g.slideRootQueue()
|
||||
|
||||
g.latestIndex += MembershipIndex(idCommitments.len())
|
||||
method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
|
||||
initializedGuard(g)
|
||||
|
||||
await g.registerBatch(@[idCommitment])
|
||||
|
||||
|
||||
|
||||
method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
|
||||
initializedGuard(g)
|
||||
|
||||
await g.atomicBatch(idCommitments)
|
||||
await g.atomicBatch(g.latestIndex, idCommitments)
|
||||
g.latestIndex += MembershipIndex(idCommitments.len())
|
||||
|
||||
|
||||
method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} =
|
||||
|
@ -234,7 +234,7 @@ proc getBlockTable(g: OnchainGroupManager,
|
|||
let events = await g.getRawEvents(fromBlock, toBlock)
|
||||
|
||||
if events.len == 0:
|
||||
debug "no events found"
|
||||
trace "no events found"
|
||||
return blockTable
|
||||
|
||||
for event in events:
|
||||
|
@ -255,20 +255,17 @@ proc handleEvents(g: OnchainGroupManager,
|
|||
|
||||
for blockNumber, members in blockTable.pairs():
|
||||
try:
|
||||
await g.atomicBatch(idCommitments = members.mapIt(it[0].idCommitment),
|
||||
toRemoveIndices = members.filterIt(it[1]).mapIt(it[0].index))
|
||||
let startIndex = blockTable[blockNumber].filterIt(not it[1])[0][0].index
|
||||
let removalIndices = members.filterIt(it[1]).mapIt(it[0].index)
|
||||
let idCommitments = members.mapIt(it[0].idCommitment)
|
||||
await g.atomicBatch(start = startIndex,
|
||||
idCommitments = idCommitments,
|
||||
toRemoveIndices = removalIndices)
|
||||
g.latestIndex = startIndex + MembershipIndex(idCommitments.len())
|
||||
except CatchableError:
|
||||
error "failed to insert members into the tree", error=getCurrentExceptionMsg()
|
||||
raise newException(ValueError, "failed to insert members into the tree")
|
||||
trace "new members added to the Merkle tree", commitments=members.mapIt(it[0].idCommitment.inHex())
|
||||
g.latestProcessedBlock = some(blockNumber)
|
||||
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
||||
lastProcessedBlock: blockNumber))
|
||||
if metadataSetRes.isErr():
|
||||
# this is not a fatal error, hence we don't raise an exception
|
||||
warn "failed to persist rln metadata", error=metadataSetRes.error()
|
||||
else:
|
||||
info "rln metadata persisted", lastProcessedBlock = blockNumber
|
||||
|
||||
return
|
||||
|
||||
|
@ -292,10 +289,21 @@ proc getAndHandleEvents(g: OnchainGroupManager,
|
|||
await g.handleEvents(blockTable)
|
||||
await g.handleRemovedEvents(blockTable)
|
||||
|
||||
let latestProcessedBlock = if toBlock.isSome(): toBlock.get()
|
||||
else: fromBlock
|
||||
g.latestProcessedBlock = some(latestProcessedBlock)
|
||||
let metadataSetRes = g.rlnInstance.setMetadata(RlnMetadata(
|
||||
lastProcessedBlock: latestProcessedBlock))
|
||||
if metadataSetRes.isErr():
|
||||
# this is not a fatal error, hence we don't raise an exception
|
||||
warn "failed to persist rln metadata", error=metadataSetRes.error()
|
||||
else:
|
||||
debug "rln metadata persisted", blockNumber = latestProcessedBlock
|
||||
|
||||
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
|
||||
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
|
||||
let latestBlock = blockheader.number.uint
|
||||
debug "block received", blockNumber = latestBlock
|
||||
trace "block received", blockNumber = latestBlock
|
||||
# get logs from the last block
|
||||
try:
|
||||
asyncSpawn g.getAndHandleEvents(latestBlock)
|
||||
|
@ -327,7 +335,7 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
|
|||
|
||||
var fromBlock = if g.latestProcessedBlock.isSome():
|
||||
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
|
||||
g.latestProcessedBlock.get()
|
||||
g.latestProcessedBlock.get() + 1
|
||||
else:
|
||||
info "starting onchain sync from scratch"
|
||||
BlockNumber(0)
|
||||
|
|
|
@ -89,7 +89,7 @@ proc createRLNInstanceLocal(d = MerkleTreeDepth,
|
|||
cache_capacity: 15_000,
|
||||
mode: "high_throughput",
|
||||
compression: false,
|
||||
flush_interval: 12_000,
|
||||
flush_interval: 500,
|
||||
path: if tree_path != "": tree_path else: DefaultRlnTreePath
|
||||
)
|
||||
)
|
||||
|
|
|
@ -95,6 +95,7 @@ method stop*(rlnPeer: WakuRLNRelay) {.async.} =
|
|||
## Throws an error if it cannot stop the rln-relay protocol
|
||||
|
||||
# stop the group sync, and flush data to tree db
|
||||
info "stopping rln-relay"
|
||||
await rlnPeer.groupManager.stop()
|
||||
|
||||
proc hasDuplicate*(rlnPeer: WakuRLNRelay,
|
||||
|
@ -217,7 +218,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay,
|
|||
|
||||
let rootValidationRes = rlnPeer.groupManager.validateRoot(proof.merkleRoot)
|
||||
if not rootValidationRes:
|
||||
debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot, validRoots=rlnPeer.groupManager.validRoots.mapIt(it.inHex())
|
||||
debug "invalid message: provided root does not belong to acceptable window of roots", provided=proof.merkleRoot.inHex(), validRoots=rlnPeer.groupManager.validRoots.mapIt(it.inHex())
|
||||
waku_rln_invalid_messages_total.inc(labelValues=["invalid_root"])
|
||||
return MessageValidationResult.Invalid
|
||||
|
||||
|
|
Loading…
Reference in New Issue