feat(rln-relay): close db connection appropriately (#1858)

This commit is contained in:
Aaryamann Challani 2023-07-27 17:21:21 +05:30 committed by GitHub
parent fdd7eaae18
commit 0132640308
7 changed files with 47 additions and 5 deletions

2
vendor/zerokit vendored

@ -1 +1 @@
Subproject commit c6b7a8c0a401dc9a3f5b0511ebfb8727fc19b53a Subproject commit 9d4ed68450e20626081e24490ba320278f6d3f7b

View File

@ -828,4 +828,8 @@ proc stop*(node: WakuNode) {.async.} =
await node.switch.stop() await node.switch.stop()
node.peerManager.stop() node.peerManager.stop()
when defined(rln):
if not node.wakuRlnRelay.isNil():
await node.wakuRlnRelay.stop()
node.started = false node.started = false

View File

@ -89,6 +89,9 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH
method atomicBatch*(g: GroupManager, idCommitments: seq[IDCommitment], toRemoveIndices: seq[MembershipIndex]): Future[void] {.base,gcsafe.} = method atomicBatch*(g: GroupManager, idCommitments: seq[IDCommitment], toRemoveIndices: seq[MembershipIndex]): Future[void] {.base,gcsafe.} =
raise newException(CatchableError, "atomicBatch proc for " & $g.type & " is not implemented yet") raise newException(CatchableError, "atomicBatch proc for " & $g.type & " is not implemented yet")
method stop*(g: GroupManager): Future[void] {.base,gcsafe.} =
raise newException(CatchableError, "stop proc for " & $g.type & " is not implemented yet")
# This proc is used to set a callback that will be called when an identity commitment is withdrawn # This proc is used to set a callback that will be called when an identity commitment is withdrawn
# The callback may be called multiple times, and should be used to for any post processing # The callback may be called multiple times, and should be used to for any post processing
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} = method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =

View File

@ -62,7 +62,8 @@ type
const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password" const DefaultKeyStorePassword* = "password"
const BlockChunkSize* = 100'u64 const DecayFactor* = 1.2
const DefaultChunkSize* = 1000
template initializedGuard(g: OnchainGroupManager): untyped = template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized: if not g.initialized:
@ -320,28 +321,33 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
let ethRpc = g.ethRpc.get() let ethRpc = g.ethRpc.get()
# the block chunk size decays exponentially with the number of blocks
# the minimum chunk size is 100
var blockChunkSize = 1_000_000
var fromBlock = if g.latestProcessedBlock.isSome(): var fromBlock = if g.latestProcessedBlock.isSome():
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get() info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get()
g.latestProcessedBlock.get() g.latestProcessedBlock.get()
else: else:
info "starting onchain sync from scratch" info "starting onchain sync from scratch"
# chunk size is 1000 blocks
BlockNumber(0) BlockNumber(0)
let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
try: try:
# we always want to sync from last processed block => latest # we always want to sync from last processed block => latest
if fromBlock == BlockNumber(0) or if fromBlock == BlockNumber(0) or
fromBlock + BlockNumber(BlockChunkSize) < latestBlock: fromBlock + BlockNumber(blockChunkSize) < latestBlock:
# chunk events # chunk events
while true: while true:
let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
let toBlock = min(fromBlock + BlockNumber(BlockChunkSize), currentLatestBlock) let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
info "chunking events", fromBlock = fromBlock, toBlock = toBlock info "chunking events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, some(toBlock)) await g.getAndHandleEvents(fromBlock, some(toBlock))
fromBlock = toBlock + 1 fromBlock = toBlock + 1
if fromBlock >= currentLatestBlock: if fromBlock >= currentLatestBlock:
break break
let newChunkSize = float(blockChunkSize) / DecayFactor
blockChunkSize = max(int(newChunkSize), DefaultChunkSize)
else: else:
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0))) await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
except CatchableError: except CatchableError:
@ -496,3 +502,13 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
error "failed to restart group sync", error = getCurrentExceptionMsg() error "failed to restart group sync", error = getCurrentExceptionMsg()
g.initialized = true g.initialized = true
method stop*(g: OnchainGroupManager): Future[void] {.async.} =
if g.ethRpc.isSome():
g.ethRpc.get().ondisconnect = nil
await g.ethRpc.get().close()
let flushed = g.rlnInstance.flush()
if not flushed:
error "failed to flush to the tree db"
g.initialized = false

View File

@ -102,3 +102,10 @@ method onRegister*(g: StaticGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
method onWithdraw*(g: StaticGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = method onWithdraw*(g: StaticGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
g.withdrawCb = some(cb) g.withdrawCb = some(cb)
method stop*(g: StaticGroupManager): Future[void] =
initializedGuard(g)
# No-op
var retFut = newFuture[void]("StaticGroupManager.stop")
retFut.complete()
return retFut

View File

@ -188,3 +188,8 @@ proc get_metadata*(ctx: ptr RLN, output_buffer: ptr Buffer): bool {.importc: "ge
## gets the metadata stored by ctx and populates the passed pointer output_buffer with it ## gets the metadata stored by ctx and populates the passed pointer output_buffer with it
## the output_buffer holds the metadata as a byte seq ## the output_buffer holds the metadata as a byte seq
## the return bool value indicates the success or failure of the operation ## the return bool value indicates the success or failure of the operation
proc flush*(ctx: ptr RLN): bool {.importc: "flush".}
## flushes the write buffer to the database
## the return bool value indicates the success or failure of the operation
## This allows more robust and graceful handling of the database connection

View File

@ -90,6 +90,13 @@ type WakuRLNRelay* = ref object of RootObj
groupManager*: GroupManager groupManager*: GroupManager
messageBucket*: Option[TokenBucket] messageBucket*: Option[TokenBucket]
method stop*(rlnPeer: WakuRLNRelay) {.async.} =
## stops the rln-relay protocol
## Throws an error if it cannot stop the rln-relay protocol
# stop the group sync, and flush data to tree db
await rlnPeer.groupManager.stop()
proc hasDuplicate*(rlnPeer: WakuRLNRelay, proc hasDuplicate*(rlnPeer: WakuRLNRelay,
proofMetadata: ProofMetadata): RlnRelayResult[bool] = proofMetadata: ProofMetadata): RlnRelayResult[bool] =
## returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same ## returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same