chore(rln-relay): remove websocket from OnchainGroupManager (#2364)

* chore(rln-relay): remove websocket from OnchainGroupManager

* fix: swap ws for http
This commit is contained in:
Aaryamann Challani 2024-01-23 23:22:45 +05:30 committed by GitHub
parent 105b3c2089
commit 2065f3db3c
5 changed files with 71 additions and 41 deletions

View File

@ -252,8 +252,8 @@ type
name: "rln-relay-id-commitment-key" }: string name: "rln-relay-id-commitment-key" }: string
rlnRelayEthClientAddress* {. rlnRelayEthClientAddress* {.
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/", desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "ws://localhost:8540/" defaultValue: "http://localhost:8540/"
name: "rln-relay-eth-client-address" }: string name: "rln-relay-eth-client-address" }: string
rlnRelayEthContractAddress* {. rlnRelayEthContractAddress* {.

View File

@ -61,8 +61,8 @@ type
name: "rln-relay-cred-path" }: string name: "rln-relay-cred-path" }: string
rlnRelayEthClientAddress* {. rlnRelayEthClientAddress* {.
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/", desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "ws://localhost:8540/", defaultValue: "http://localhost:8540/",
name: "rln-relay-eth-client-address" }: string name: "rln-relay-eth-client-address" }: string
rlnRelayEthContractAddress* {. rlnRelayEthContractAddress* {.

View File

@ -232,9 +232,9 @@ suite "Onchain group manager":
try: try:
await manager.startGroupSync() await manager.startGroupSync()
except ValueError: except CatchableError:
assert true assert true
except Exception, CatchableError: except Exception:
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
await manager.stop() await manager.stop()
@ -330,9 +330,9 @@ suite "Onchain group manager":
try: try:
await manager.register(dummyCommitment) await manager.register(dummyCommitment)
except ValueError: except CatchableError:
assert true assert true
except Exception, CatchableError: except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg() assert false, "exception raised: " & getCurrentExceptionMsg()
await manager.stop() await manager.stop()
@ -399,9 +399,9 @@ suite "Onchain group manager":
try: try:
await manager.withdraw(idSecretHash) await manager.withdraw(idSecretHash)
except ValueError: except CatchableError:
assert true assert true
except Exception, CatchableError: except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg() assert false, "exception raised: " & getCurrentExceptionMsg()
await manager.stop() await manager.stop()
@ -627,7 +627,7 @@ suite "Onchain group manager":
await manager.stop() await manager.stop()
asyncTest "isReady should return false if ethRpc is none": asyncTest "isReady should return false if ethRpc is none":
var manager = await setup() let manager = await setup()
await manager.init() await manager.init()
manager.ethRpc = none(Web3) manager.ethRpc = none(Web3)
@ -644,7 +644,7 @@ suite "Onchain group manager":
await manager.stop() await manager.stop()
asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed": asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
var manager = await setup() let manager = await setup()
await manager.init() await manager.init()
var isReady = true var isReady = true
@ -659,14 +659,13 @@ suite "Onchain group manager":
await manager.stop() await manager.stop()
asyncTest "isReady should return true if ethRpc is ready": asyncTest "isReady should return true if ethRpc is ready":
var manager = await setup() let manager = await setup()
await manager.init() await manager.init()
# node can only be ready after group sync is done # node can only be ready after group sync is done
try: try:
await manager.startGroupSync() await manager.startGroupSync()
except Exception, CatchableError: except Exception, CatchableError:
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg() assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
var isReady = false var isReady = false
try: try:
isReady = await manager.isReady() isReady = await manager.isReady()

View File

@ -74,13 +74,17 @@ type
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this, # in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
# because the average reorg depth is 1 to 2 blocks. # because the average reorg depth is 1 to 2 blocks.
validRootBuffer*: Deque[MerkleNode] validRootBuffer*: Deque[MerkleNode]
# interval loop to shut down gracefully
blockFetchingActive*: bool
const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password" const DefaultKeyStorePassword* = "password"
const DefaultBlockPollRate* = 6.seconds
template initializedGuard(g: OnchainGroupManager): untyped = template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized: if not g.initialized:
raise newException(ValueError, "OnchainGroupManager is not initialized") raise newException(CatchableError, "OnchainGroupManager is not initialized")
proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] = proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
@ -316,12 +320,15 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable):
proc getAndHandleEvents(g: OnchainGroupManager, proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber, fromBlock: BlockNumber,
toBlock: BlockNumber): Future[void] {.async: (raises: [Exception]).} = toBlock: BlockNumber): Future[bool] {.async: (raises: [Exception]).} =
initializedGuard(g) initializedGuard(g)
let blockTable = await g.getBlockTable(fromBlock, toBlock) let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable) try:
await g.handleRemovedEvents(blockTable) await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)
except CatchableError:
error "failed to handle events", error=getCurrentExceptionMsg()
raise newException(ValueError, "failed to handle events")
g.latestProcessedBlock = toBlock g.latestProcessedBlock = toBlock
let metadataSetRes = g.setMetadata() let metadataSetRes = g.setMetadata()
@ -330,32 +337,49 @@ proc getAndHandleEvents(g: OnchainGroupManager,
warn "failed to persist rln metadata", error=metadataSetRes.error() warn "failed to persist rln metadata", error=metadataSetRes.error()
else: else:
trace "rln metadata persisted", blockNumber = g.latestProcessedBlock trace "rln metadata persisted", blockNumber = g.latestProcessedBlock
return true
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = g.blockFetchingActive = false
let latestBlock = BlockNumber(blockheader.number)
trace "block received", blockNumber = latestBlock
# get logs from the last block
try:
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
return newHeadCallback
proc newHeadErrCallback(error: CatchableError) = proc runIntervalLoop() {.async, gcsafe.} =
warn "failed to get new head", error=error.msg g.blockFetchingActive = true
while g.blockFetchingActive:
var retCb: bool
retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
await cb()
await sleepAsync(interval)
asyncSpawn runIntervalLoop()
proc getNewBlockCallback(g: OnchainGroupManager): proc =
let ethRpc = g.ethRpc.get()
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
var latestBlock: BlockNumber
retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if latestBlock <= g.latestProcessedBlock:
return
# get logs from the last block
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
await g.getAndHandleEvents(fromBlock, latestBlock)
return true
return wrappedCb
proc startListeningToEvents(g: OnchainGroupManager): proc startListeningToEvents(g: OnchainGroupManager):
Future[void] {.async: (raises: [Exception]).} = Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g) initializedGuard(g)
let ethRpc = g.ethRpc.get() let ethRpc = g.ethRpc.get()
let newHeadCallback = g.getNewHeadCallback() let newBlockCallback = g.getNewBlockCallback()
var blockHeaderSub: Subscription g.runInInterval(newBlockCallback, DefaultBlockPollRate)
retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"):
await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
proc startOnchainSync(g: OnchainGroupManager): proc startOnchainSync(g: OnchainGroupManager):
Future[void] {.async: (raises: [Exception]).} = Future[void] {.async: (raises: [Exception]).} =
@ -385,7 +409,9 @@ proc startOnchainSync(g: OnchainGroupManager):
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, toBlock) var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1 fromBlock = toBlock + 1
except CatchableError: except CatchableError:
@ -523,13 +549,15 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
error "failed to restart group sync", error = getCurrentExceptionMsg() error "failed to restart group sync", error = getCurrentExceptionMsg()
ethRpc.ondisconnect = proc() = ethRpc.ondisconnect = proc() =
asyncCheck onDisconnect() asyncSpawn onDisconnect()
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true g.initialized = true
method stop*(g: OnchainGroupManager): Future[void] {.async.} = method stop*(g: OnchainGroupManager): Future[void] {.async,gcsafe.} =
g.blockFetchingActive = false
if g.ethRpc.isSome(): if g.ethRpc.isSome():
g.ethRpc.get().ondisconnect = nil g.ethRpc.get().ondisconnect = nil
await g.ethRpc.get().close() await g.ethRpc.get().close()

View File

@ -21,12 +21,15 @@ template retryWrapper*(res: auto,
body: untyped): auto = body: untyped): auto =
var retryCount = retryStrategy.retryCount var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry var shouldRetry = retryStrategy.shouldRetry
var exceptionMessage = ""
while shouldRetry and retryCount > 0: while shouldRetry and retryCount > 0:
try: try:
res = body res = body
shouldRetry = false shouldRetry = false
except: except:
retryCount -= 1 retryCount -= 1
exceptionMessage = getCurrentExceptionMsg()
await sleepAsync(retryStrategy.retryDelay) await sleepAsync(retryStrategy.retryDelay)
if shouldRetry: if shouldRetry:
raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg()) raise newException(CatchableError, errStr & ": " & exceptionMessage)