mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 07:06:42 +00:00
chore(rln-relay): remove websocket from OnchainGroupManager (#2364)
* chore(rln-relay): remove websocket from OnchainGroupManager * fix: swap ws for http
This commit is contained in:
parent
7de91d92bb
commit
efdc5244f6
@ -252,8 +252,8 @@ type
|
||||
name: "rln-relay-id-commitment-key" }: string
|
||||
|
||||
rlnRelayEthClientAddress* {.
|
||||
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/",
|
||||
defaultValue: "ws://localhost:8540/"
|
||||
desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
|
||||
defaultValue: "http://localhost:8540/"
|
||||
name: "rln-relay-eth-client-address" }: string
|
||||
|
||||
rlnRelayEthContractAddress* {.
|
||||
|
@ -61,8 +61,8 @@ type
|
||||
name: "rln-relay-cred-path" }: string
|
||||
|
||||
rlnRelayEthClientAddress* {.
|
||||
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/",
|
||||
defaultValue: "ws://localhost:8540/",
|
||||
desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
|
||||
defaultValue: "http://localhost:8540/",
|
||||
name: "rln-relay-eth-client-address" }: string
|
||||
|
||||
rlnRelayEthContractAddress* {.
|
||||
|
@ -232,9 +232,9 @@ suite "Onchain group manager":
|
||||
|
||||
try:
|
||||
await manager.startGroupSync()
|
||||
except ValueError:
|
||||
except CatchableError:
|
||||
assert true
|
||||
except Exception, CatchableError:
|
||||
except Exception:
|
||||
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
|
||||
|
||||
await manager.stop()
|
||||
@ -330,9 +330,9 @@ suite "Onchain group manager":
|
||||
|
||||
try:
|
||||
await manager.register(dummyCommitment)
|
||||
except ValueError:
|
||||
except CatchableError:
|
||||
assert true
|
||||
except Exception, CatchableError:
|
||||
except Exception:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
|
||||
await manager.stop()
|
||||
@ -399,9 +399,9 @@ suite "Onchain group manager":
|
||||
|
||||
try:
|
||||
await manager.withdraw(idSecretHash)
|
||||
except ValueError:
|
||||
except CatchableError:
|
||||
assert true
|
||||
except Exception, CatchableError:
|
||||
except Exception:
|
||||
assert false, "exception raised: " & getCurrentExceptionMsg()
|
||||
|
||||
await manager.stop()
|
||||
@ -627,7 +627,7 @@ suite "Onchain group manager":
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "isReady should return false if ethRpc is none":
|
||||
var manager = await setup()
|
||||
let manager = await setup()
|
||||
await manager.init()
|
||||
|
||||
manager.ethRpc = none(Web3)
|
||||
@ -644,7 +644,7 @@ suite "Onchain group manager":
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
|
||||
var manager = await setup()
|
||||
let manager = await setup()
|
||||
await manager.init()
|
||||
|
||||
var isReady = true
|
||||
@ -659,14 +659,13 @@ suite "Onchain group manager":
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "isReady should return true if ethRpc is ready":
|
||||
var manager = await setup()
|
||||
let manager = await setup()
|
||||
await manager.init()
|
||||
# node can only be ready after group sync is done
|
||||
try:
|
||||
await manager.startGroupSync()
|
||||
except Exception, CatchableError:
|
||||
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()
|
||||
|
||||
var isReady = false
|
||||
try:
|
||||
isReady = await manager.isReady()
|
||||
|
@ -18,7 +18,7 @@ const
|
||||
MembershipFee* = 1000000000000000.u256
|
||||
# the current implementation of the rln lib supports a circuit for Merkle tree with depth 20
|
||||
MerkleTreeDepth* = 20
|
||||
EthClient* = "ws://127.0.0.1:8540"
|
||||
EthClient* = "http://127.0.0.1:8540"
|
||||
|
||||
const
|
||||
# the size of poseidon hash output in bits
|
||||
|
@ -74,13 +74,17 @@ type
|
||||
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
|
||||
# because the average reorg depth is 1 to 2 blocks.
|
||||
validRootBuffer*: Deque[MerkleNode]
|
||||
# interval loop to shut down gracefully
|
||||
blockFetchingActive*: bool
|
||||
|
||||
const DefaultKeyStorePath* = "rlnKeystore.json"
|
||||
const DefaultKeyStorePassword* = "password"
|
||||
|
||||
const DefaultBlockPollRate* = 6.seconds
|
||||
|
||||
template initializedGuard(g: OnchainGroupManager): untyped =
|
||||
if not g.initialized:
|
||||
raise newException(ValueError, "OnchainGroupManager is not initialized")
|
||||
raise newException(CatchableError, "OnchainGroupManager is not initialized")
|
||||
|
||||
|
||||
proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
|
||||
@ -316,12 +320,15 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable):
|
||||
|
||||
proc getAndHandleEvents(g: OnchainGroupManager,
|
||||
fromBlock: BlockNumber,
|
||||
toBlock: BlockNumber): Future[void] {.async: (raises: [Exception]).} =
|
||||
toBlock: BlockNumber): Future[bool] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
|
||||
let blockTable = await g.getBlockTable(fromBlock, toBlock)
|
||||
await g.handleEvents(blockTable)
|
||||
await g.handleRemovedEvents(blockTable)
|
||||
try:
|
||||
await g.handleEvents(blockTable)
|
||||
await g.handleRemovedEvents(blockTable)
|
||||
except CatchableError:
|
||||
error "failed to handle events", error=getCurrentExceptionMsg()
|
||||
raise newException(ValueError, "failed to handle events")
|
||||
|
||||
g.latestProcessedBlock = toBlock
|
||||
let metadataSetRes = g.setMetadata()
|
||||
@ -330,32 +337,49 @@ proc getAndHandleEvents(g: OnchainGroupManager,
|
||||
warn "failed to persist rln metadata", error=metadataSetRes.error()
|
||||
else:
|
||||
trace "rln metadata persisted", blockNumber = g.latestProcessedBlock
|
||||
|
||||
return true
|
||||
|
||||
proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
|
||||
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
|
||||
let latestBlock = BlockNumber(blockheader.number)
|
||||
trace "block received", blockNumber = latestBlock
|
||||
# get logs from the last block
|
||||
try:
|
||||
# inc by 1 to prevent double processing
|
||||
let fromBlock = g.latestProcessedBlock + 1
|
||||
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
|
||||
except CatchableError:
|
||||
warn "failed to handle log: ", error=getCurrentExceptionMsg()
|
||||
return newHeadCallback
|
||||
proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
|
||||
g.blockFetchingActive = false
|
||||
|
||||
proc newHeadErrCallback(error: CatchableError) =
|
||||
warn "failed to get new head", error=error.msg
|
||||
proc runIntervalLoop() {.async, gcsafe.} =
|
||||
g.blockFetchingActive = true
|
||||
|
||||
while g.blockFetchingActive:
|
||||
var retCb: bool
|
||||
retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
|
||||
await cb()
|
||||
await sleepAsync(interval)
|
||||
|
||||
asyncSpawn runIntervalLoop()
|
||||
|
||||
|
||||
proc getNewBlockCallback(g: OnchainGroupManager): proc =
|
||||
let ethRpc = g.ethRpc.get()
|
||||
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
|
||||
var latestBlock: BlockNumber
|
||||
retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
|
||||
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
|
||||
|
||||
if latestBlock <= g.latestProcessedBlock:
|
||||
return
|
||||
# get logs from the last block
|
||||
# inc by 1 to prevent double processing
|
||||
let fromBlock = g.latestProcessedBlock + 1
|
||||
var handleBlockRes: bool
|
||||
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
|
||||
await g.getAndHandleEvents(fromBlock, latestBlock)
|
||||
return true
|
||||
return wrappedCb
|
||||
|
||||
proc startListeningToEvents(g: OnchainGroupManager):
|
||||
Future[void] {.async: (raises: [Exception]).} =
|
||||
initializedGuard(g)
|
||||
|
||||
let ethRpc = g.ethRpc.get()
|
||||
let newHeadCallback = g.getNewHeadCallback()
|
||||
var blockHeaderSub: Subscription
|
||||
retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"):
|
||||
await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
|
||||
let newBlockCallback = g.getNewBlockCallback()
|
||||
g.runInInterval(newBlockCallback, DefaultBlockPollRate)
|
||||
|
||||
proc startOnchainSync(g: OnchainGroupManager):
|
||||
Future[void] {.async: (raises: [Exception]).} =
|
||||
@ -385,7 +409,9 @@ proc startOnchainSync(g: OnchainGroupManager):
|
||||
|
||||
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
|
||||
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
|
||||
await g.getAndHandleEvents(fromBlock, toBlock)
|
||||
var handleBlockRes: bool
|
||||
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
|
||||
await g.getAndHandleEvents(fromBlock, toBlock)
|
||||
fromBlock = toBlock + 1
|
||||
|
||||
except CatchableError:
|
||||
@ -523,13 +549,15 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
|
||||
error "failed to restart group sync", error = getCurrentExceptionMsg()
|
||||
|
||||
ethRpc.ondisconnect = proc() =
|
||||
asyncCheck onDisconnect()
|
||||
asyncSpawn onDisconnect()
|
||||
|
||||
|
||||
waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
|
||||
g.initialized = true
|
||||
|
||||
method stop*(g: OnchainGroupManager): Future[void] {.async.} =
|
||||
method stop*(g: OnchainGroupManager): Future[void] {.async,gcsafe.} =
|
||||
g.blockFetchingActive = false
|
||||
|
||||
if g.ethRpc.isSome():
|
||||
g.ethRpc.get().ondisconnect = nil
|
||||
await g.ethRpc.get().close()
|
||||
|
@ -21,12 +21,15 @@ template retryWrapper*(res: auto,
|
||||
body: untyped): auto =
|
||||
var retryCount = retryStrategy.retryCount
|
||||
var shouldRetry = retryStrategy.shouldRetry
|
||||
var exceptionMessage = ""
|
||||
|
||||
while shouldRetry and retryCount > 0:
|
||||
try:
|
||||
res = body
|
||||
shouldRetry = false
|
||||
except:
|
||||
retryCount -= 1
|
||||
exceptionMessage = getCurrentExceptionMsg()
|
||||
await sleepAsync(retryStrategy.retryDelay)
|
||||
if shouldRetry:
|
||||
raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg())
|
||||
raise newException(CatchableError, errStr & ": " & exceptionMessage)
|
||||
|
Loading…
x
Reference in New Issue
Block a user