This commit is contained in:
stubbsta 2025-04-07 08:41:51 +02:00
commit 5dc5b07047
7 changed files with 224 additions and 85 deletions

View File

@ -117,11 +117,21 @@ int waku_relay_get_num_connected_peers(void* ctx,
WakuCallBack callback,
void* userData);
int waku_relay_get_connected_peers(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);
int waku_relay_get_num_peers_in_mesh(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);
int waku_relay_get_peers_in_mesh(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);
int waku_store_query(void* ctx,
const char* jsonQuery,
const char* peerAddr,

View File

@ -429,6 +429,27 @@ proc waku_relay_get_num_connected_peers(
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.NUM_CONNECTED_PEERS, pst),
callback,
userData,
)
proc waku_relay_get_connected_peers(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
@ -450,6 +471,27 @@ proc waku_relay_get_num_peers_in_mesh(
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.NUM_MESH_PEERS, pst),
callback,
userData,
)
proc waku_relay_get_peers_in_mesh(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)

View File

@ -1,4 +1,4 @@
import std/net
import std/[net, sequtils, strutils]
import chronicles, chronos, stew/byteutils, results
import
../../../../../waku/waku_core/message/message,
@ -8,14 +8,17 @@ import
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_relay/protocol,
../../../../../waku/node/peer_manager,
../../../../alloc
type RelayMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
PUBLISH
NUM_CONNECTED_PEERS
LIST_CONNECTED_PEERS
## to return the list of all connected peers to an specific pubsub topic
NUM_MESH_PEERS
LIST_MESH_PEERS
## to return the list of only the peers that conform the mesh for a particular pubsub topic
ADD_PROTECTED_SHARD ## Protects a shard with a public key
@ -122,16 +125,28 @@ proc process*(
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash)
of LIST_CONNECTED_PEERS:
of NUM_CONNECTED_PEERS:
let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
error "NUM_CONNECTED_PEERS failed", error = error
return err($error)
return ok($numConnPeers)
of LIST_MESH_PEERS:
of LIST_CONNECTED_PEERS:
let connPeers = waku.node.wakuRelay.getConnectedPeers($self.pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(connPeers.mapIt($it).join(","))
of NUM_MESH_PEERS:
let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr:
error "LIST_MESH_PEERS failed", error = error
error "NUM_MESH_PEERS failed", error = error
return err($error)
return ok($numPeersInMesh)
of LIST_MESH_PEERS:
let meshPeers = waku.node.wakuRelay.getPeersInMesh($self.pubsubTopic).valueOr:
error "LIST_MESH_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(meshPeers.mapIt($it).join(","))
of ADD_PROTECTED_SHARD:
try:
let relayShard =

View File

@ -344,13 +344,6 @@ hence would have reachability issues.""",
name: "num-shards-in-network"
.}: uint32
pubsubTopics* {.
desc:
"Deprecated. Default pubsub topic to subscribe to. Argument may be repeated.",
defaultValue: @[],
name: "pubsub-topic"
.}: seq[string]
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",

View File

@ -177,31 +177,6 @@ proc new*(
logging.setupLog(confCopy.logLevel, confCopy.logFormat)
# TODO: remove after pubsubtopic config gets removed
var shards = newSeq[uint16]()
if confCopy.pubsubTopics.len > 0:
let shardsRes = topicsToRelayShards(confCopy.pubsubTopics)
if shardsRes.isErr():
error "failed to parse pubsub topic, please format according to static shard specification",
error = shardsRes.error
return err("failed to parse pubsub topic: " & $shardsRes.error)
let shardsOpt = shardsRes.get()
if shardsOpt.isSome():
let relayShards = shardsOpt.get()
if relayShards.clusterId != confCopy.clusterId:
error "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22",
nodeCluster = confCopy.clusterId, pubsubCluster = relayShards.clusterId
return err(
"clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22"
)
for shard in relayShards.shardIds:
shards.add(shard)
confCopy.shards = shards
# Why can't I replace this block with a concise `.valueOr`?
confCopy = block:
let res = applyPresetConfiguration(confCopy)
if res.isErr():

View File

@ -323,21 +323,35 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
proc getDHigh*(T: type WakuRelay): int =
return GossipsubParameters.dHigh
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
proc getPeersInMesh*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
## Returns the list of peerIds in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.
if not w.mesh.hasKey(pubsubTopic):
debug "getNumPeersInMesh - there is no mesh peer for the given pubsub topic",
debug "getPeersInMesh - there is no mesh peer for the given pubsub topic",
pubsubTopic = pubsubTopic
return ok(0)
return ok(newSeq[PeerId]())
let peersRes = catch:
w.mesh[pubsubTopic]
let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return
err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)
return err("getPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)
let peerIds = toSeq(peers).mapIt(it.peerId)
return ok(peerIds)
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
let peers = w.getPeersInMesh(pubsubTopic).valueOr:
return err(
"getNumPeersInMesh - failed retrieving peers in mesh: " & pubsubTopic & ": " &
error
)
return ok(peers.len)
@ -539,22 +553,23 @@ proc publish*(
return ok(relayedPeerCount)
proc getNumConnectedPeers*(
proc getConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
## Returns the number of connected peers and subscribed to the passed pubsub topic.
): Result[seq[PeerId], string] =
## Returns the list of peerIds of connected peers and subscribed to the passed pubsub topic.
## The 'gossipsub' atribute is defined in the GossipSub ref object.
if pubsubTopic == "":
## Return all the connected peers
var numConnPeers = 0
var peerIds = newSeq[PeerId]()
for k, v in w.gossipsub:
numConnPeers.inc(v.len)
return ok(numConnPeers)
peerIds.add(toSeq(v).mapIt(it.peerId))
# alternatively: peerIds &= toSeq(v).mapIt(it.peerId)
return ok(peerIds)
if not w.gossipsub.hasKey(pubsubTopic):
return err(
"getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
"getConnectedPeers - there is no gossipsub peer for the given pubsub topic: " &
pubsubTopic
)
@ -562,8 +577,22 @@ proc getNumConnectedPeers*(
w.gossipsub[pubsubTopic]
let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return
err("getConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg)
let peerIds = toSeq(peers).mapIt(it.peerId)
return ok(peerIds)
proc getNumConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[int, string] =
## Returns the number of connected peers and subscribed to the passed pubsub topic.
## Return all the connected peers
let peers = w.getConnectedPeers(pubsubTopic).valueOr:
return err(
"getNumConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg
"getNumConnectedPeers - failed retrieving peers in mesh: " & pubsubTopic & ": " &
error
)
return ok(peers.len)

View File

@ -29,7 +29,10 @@ export group_manager_base
logScope:
topics = "waku rln_relay onchain_group_manager"
type UInt40* = StUint[40]
type EthereumUInt40* = StUint[40]
type EthereumUInt32* = StUint[32]
type EthereumUInt16* = StUint[16]
# using the when predicate does not work within the contract macro, hence need to dupe
contract(WakuRlnContract):
# this serves as an entrypoint into the rln membership set
@ -47,8 +50,8 @@ contract(WakuRlnContract):
# this constant describes max message limit of rln contract
proc maxMembershipRateLimit(): UInt256 {.view.}
# this function returns the merkleProof for a given index
proc getMerkleProof(index: UInt40): seq[Uint256] {.view.}
# this function returns the Merkle root
# proc getMerkleProof(index: UInt40): seq[Uint256] {.view.}
# # this function returns the Merkle root
proc root(): Uint256 {.view.}
type
@ -65,7 +68,7 @@ type
keystorePassword*: Option[string]
registrationHandler*: Option[RegistrationHandler]
latestProcessedBlock*: BlockNumber
merkleProofCache*: seq[Uint256]
merkleProofCache*: seq[UInt256]
proc setMetadata*(
g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber)
@ -92,46 +95,97 @@ proc setMetadata*(
proc fetchMerkleProofElements*(
g: OnchainGroupManager
): Future[Result[seq[Uint256], string]] {.async.} =
let membershipIndex = g.membershipIndex.get()
debug " ------ Fetching merkle proof", index = membershipIndex
): Future[Result[seq[UInt256], string]] {.async.} =
try:
# First check if the index is valid
# let commitmentIndexInvocation = g.wakuRlnContract.get().commitmentIndex()
# let currentCommitmentIndex = await commitmentIndexInvocation.call()
let membershipIndex = g.membershipIndex.get()
let commitmentIndexInvocation = g.wakuRlnContract.get().nextFreeIndex()
let currentCommitmentIndex = await commitmentIndexInvocation.call()
let membershipIndexUint256 = stuint(membershipIndex, 256)
let index40 = stuint(membershipIndex, 40)
debug " ------ Checking membership index validity",
membershipIndex = membershipIndex,
membershipIndexAsUint256 = membershipIndexUint256.toHex()
# currentCommitmentIndex = currentCommitmentIndex.toHex()
if membershipIndexUint256 >= currentCommitmentIndex:
return err(
"Invalid membership index: " & $membershipIndex &
" is >= current commitment index: " & currentCommitmentIndex.toHex()
)
# Convert to UInt40 for contract call (merkleProofElements takes UInt40)
let indexUint40 = stuint(membershipIndex, 40)
debug " ------ Converting membershipIndex to UInt40",
originalIndex = membershipIndex, asUint40 = indexUint40.toHex()
let methodSig = "getMerkleProof(uint40)"
let methodIdDigest = keccak.keccak256.digest(methodSig)
let methodId = methodIdDigest.data[0 .. 3]
var paddedParam = newSeq[byte](32)
let indexBytes = index40.toBytesBE()
for i in 0 ..< min(indexBytes.len, paddedParam.len):
paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i]
var callData = newSeq[byte]()
for b in methodId:
callData.add(b)
callData.add(paddedParam)
var tx: TransactionArgs
tx.to = Opt.some(fromHex(Address, g.ethContractAddress))
tx.data = Opt.some(callData)
let responseBytes = await g.ethRpc.get().provider.eth_call(tx, "latest")
var merkleProof: seq[UInt256]
for i in 0 .. 19:
let startindex = 32 + (i * 32) # skip initial 32 bytes for the array offset
if startindex + 32 <= responseBytes.len:
let elementbytes = responseBytes[startindex ..< startindex + 32]
merkleProof.add(UInt256.fromBytesBE(elementbytes))
let merkleProofInvocation = g.wakuRlnContract.get().getMerkleProof(indexUint40)
let merkleProof = await merkleProofInvocation.call()
debug "Successfully fetched merkle proof", elementsCount = merkleProof.len
return ok(merkleProof)
except CatchableError:
error "Failed to fetch merkle proof", errMsg = getCurrentExceptionMsg()
error "------ Failed to fetch Merkle proof elements ------",
errMsg = getCurrentExceptionMsg(), index = g.membershipIndex.get()
return err("Failed to fetch Merkle proof elements: " & getCurrentExceptionMsg())
# proc fetchMerkleProofElements*(
# g: OnchainGroupManager
# ): Future[Result[seq[Uint256], string]] {.async.} =
# let membershipIndex = g.membershipIndex.get()
# debug "Fetching merkle proof", index = membershipIndex
# ): Future[Result[seq[UInt256], string]] {.async.} =
# try:
# let index = stuint(membershipIndex, 40)
# let membershipIndex = g.membershipIndex.get()
#
# let merkleProofInvocation = g.wakuRlnContract.get().merkleProofElements(index)
# # First check if the index is valid and within range
# let commitmentIndexInvocation = g.wakuRlnContract.get().commitmentIndex()
# let currentCommitmentIndex = await commitmentIndexInvocation.call()
# let membershipIndexUint256 = stuint(membershipIndex, 256)
# let index40 = stuint(membershipIndex, 40)
#
# debug "------ checking if membership index is validity ------",
# membershipIndex = membershipIndex,
# membershipIndexHEX = membershipIndex.toHex(),
# membershipIndexUint256 = membershipIndexUint256,
# membershipIndexUint256HEX = membershipIndexUint256.toHex(),
# currentCommitmentIndex = currentCommitmentIndex,
# currentCommitmentIndexHEX = currentCommitmentIndex.toHex(),
# index40 = index40,
# index40HEX = index40.toHex()
#
# # Ensure the membershipIndex is less than the total number of commitments
# if membershipIndexUint256 >= currentCommitmentIndex:
# error "Invalid membership index",
# membershipIndex = membershipIndex,
# currentCommitmentIndex = currentCommitmentIndex.toHex()
# return err(
# "Invalid membership index: " & $membershipIndex &
# " is >= current commitment index: " & currentCommitmentIndex.toHex()
# )
#
# let merkleProofInvocation =
# g.wakuRlnContract.get().merkleProofElements(membershipIndexUint256)
# let merkleProof = await merkleProofInvocation.call()
# debug "Successfully fetched merkle proof", elementsCount = merkleProof.len
#
# debug "------ Merkle proof ------", merkleProof = merkleProof
#
# return ok(merkleProof)
# except CatchableError:
# error "Failed to fetch merkle proof", errMsg = getCurrentExceptionMsg()
# error "------ Failed to fetch Merkle proof elements ------",
# errMsg = getCurrentExceptionMsg(), index = g.membershipIndex.get()
# return err("Failed to fetch Merkle proof elements: " & getCurrentExceptionMsg())
proc fetchMerkleRoot*(
g: OnchainGroupManager
@ -320,7 +374,7 @@ proc toArray32*(s: seq[byte]): array[32, byte] =
return output
proc toArray32Seq*(values: seq[UInt256]): seq[array[32, byte]] =
## Converts a sequence of UInt256 to a sequence of 32-byte arrays
## Converts a MerkleProof (array of 20 UInt256 values) to a sequence of 32-byte arrays
result = newSeqOfCap[array[32, byte]](values.len)
for value in values:
result.add(value.toBytesLE())
@ -341,28 +395,49 @@ method generateProof*(
if g.userMessageLimit.isNone():
return err("user message limit is not set")
debug "calling generateProof from generateProof from group_manager onchain",
data = data
debug "------ calling generateProof from generateProof from group_manager onchain ------",
data = data,
membershipIndex = g.membershipIndex.get(),
userMessageLimit = g.userMessageLimit.get()
let externalNullifierRes = poseidon(@[@(epoch), @(rlnIdentifier)])
try:
let rootRes = waitFor g.fetchMerkleRoot()
if rootRes.isErr():
return err("Failed to fetch Merkle root")
return err("Failed to fetch Merkle root: " & rootRes.error)
debug "Merkle root fetched", root = rootRes.get().toHex
except CatchableError:
error "Failed to fetch Merkle root", error = getCurrentExceptionMsg()
return err("Failed to fetch Merkle root: " & getCurrentExceptionMsg())
# Check if contract knows about the member
try:
let idCommitment = g.idCredentials.get().idCommitment.toUInt256()
let memberExistsRes =
waitFor g.wakuRlnContract.get().isInMembershipSet(idCommitment).call()
if memberExistsRes == false:
error "------ Member does not exist in contract ------",
idCommitment = idCommitment.toHex(), membershipIndex = g.membershipIndex.get()
return err("Member ID commitment not found in contract: " & idCommitment.toHex())
debug "------ Member exists in contract ------",
idCommitment = idCommitment.toHex(), membershipIndex = g.membershipIndex.get()
except CatchableError as e:
error "------ Failed to check if member exists ------", error = e.msg
# Continue execution even if this check fails
try:
let proofResult = waitFor g.fetchMerkleProofElements()
if proofResult.isErr():
return err("Failed to fetch Merkle proof" & $proofResult.error)
return err("Failed to fetch Merkle proof: " & proofResult.error)
g.merkleProofCache = proofResult.get()
debug "Merkle proof fetched",
membershipIndex = g.membershipIndex.get(), elementCount = g.merkleProofCache.len
except CatchableError:
error "Failed to fetch merkle proof", error = getCurrentExceptionMsg()
return err("Failed to fetch Merkle proof: " & getCurrentExceptionMsg())
let witness = Witness(
identity_secret: g.idCredentials.get().idSecretHash.toArray32(),