diff --git a/library/libwaku.h b/library/libwaku.h index bd9b6bfed..d49a40076 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -117,11 +117,21 @@ int waku_relay_get_num_connected_peers(void* ctx, WakuCallBack callback, void* userData); +int waku_relay_get_connected_peers(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); + int waku_relay_get_num_peers_in_mesh(void* ctx, const char* pubSubTopic, WakuCallBack callback, void* userData); +int waku_relay_get_peers_in_mesh(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); + int waku_store_query(void* ctx, const char* jsonQuery, const char* peerAddr, diff --git a/library/libwaku.nim b/library/libwaku.nim index 258ac27b2..f7c14c061 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -429,6 +429,27 @@ proc waku_relay_get_num_connected_peers( initializeLibrary() checkLibwakuParams(ctx, callback, userData) + let pst = pubSubTopic.alloc() + defer: + deallocShared(pst) + + handleRequest( + ctx, + RequestType.RELAY, + RelayRequest.createShared(RelayMsgType.NUM_CONNECTED_PEERS, pst), + callback, + userData, + ) + +proc waku_relay_get_connected_peers( + ctx: ptr WakuContext, + pubSubTopic: cstring, + callback: WakuCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibwakuParams(ctx, callback, userData) + let pst = pubSubTopic.alloc() defer: deallocShared(pst) @@ -450,6 +471,27 @@ proc waku_relay_get_num_peers_in_mesh( initializeLibrary() checkLibwakuParams(ctx, callback, userData) + let pst = pubSubTopic.alloc() + defer: + deallocShared(pst) + + handleRequest( + ctx, + RequestType.RELAY, + RelayRequest.createShared(RelayMsgType.NUM_MESH_PEERS, pst), + callback, + userData, + ) + +proc waku_relay_get_peers_in_mesh( + ctx: ptr WakuContext, + pubSubTopic: cstring, + callback: WakuCallBack, + userData: pointer, +): cint {.dynlib, exportc.} = + initializeLibrary() + checkLibwakuParams(ctx, callback, userData) + let pst = pubSubTopic.alloc() defer: deallocShared(pst) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index 232630591..97f01488a 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -1,4 +1,4 @@ -import std/net +import std/[net, sequtils, strutils] import chronicles, chronos, stew/byteutils, results import ../../../../../waku/waku_core/message/message, @@ -8,14 +8,17 @@ import ../../../../../waku/waku_core/time, # Timestamp ../../../../../waku/waku_core/topics/pubsub_topic, ../../../../../waku/waku_relay/protocol, + ../../../../../waku/node/peer_manager, ../../../../alloc type RelayMsgType* = enum SUBSCRIBE UNSUBSCRIBE PUBLISH + NUM_CONNECTED_PEERS LIST_CONNECTED_PEERS ## to return the list of all connected peers to an specific pubsub topic + NUM_MESH_PEERS LIST_MESH_PEERS ## to return the list of only the peers that conform the mesh for a particular pubsub topic ADD_PROTECTED_SHARD ## Protects a shard with a public key @@ -122,16 +125,28 @@ proc process*( let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex return ok(msgHash) - of LIST_CONNECTED_PEERS: + of NUM_CONNECTED_PEERS: let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr: - error "LIST_CONNECTED_PEERS failed", error = error + error "NUM_CONNECTED_PEERS failed", error = error return err($error) return ok($numConnPeers) - of LIST_MESH_PEERS: + of LIST_CONNECTED_PEERS: + let connPeers = waku.node.wakuRelay.getConnectedPeers($self.pubsubTopic).valueOr: + error "LIST_CONNECTED_PEERS failed", error = error + return err($error) + ## returns a comma-separated string of peerIDs + return ok(connPeers.mapIt($it).join(",")) + of NUM_MESH_PEERS: let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr: - error "LIST_MESH_PEERS failed", error = error + error "NUM_MESH_PEERS failed", error = error return err($error) return ok($numPeersInMesh) + of LIST_MESH_PEERS: + let meshPeers = waku.node.wakuRelay.getPeersInMesh($self.pubsubTopic).valueOr: + error "LIST_MESH_PEERS failed", error = error + return err($error) + ## returns a comma-separated string of peerIDs + return ok(meshPeers.mapIt($it).join(",")) of ADD_PROTECTED_SHARD: try: let relayShard = diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 100d1b644..41fc25582 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -344,13 +344,6 @@ hence would have reachability issues.""", name: "num-shards-in-network" .}: uint32 - pubsubTopics* {. - desc: - "Deprecated. Default pubsub topic to subscribe to. Argument may be repeated.", - defaultValue: @[], - name: "pubsub-topic" - .}: seq[string] - shards* {. desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 854df8dde..91f3cee2e 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -177,31 +177,6 @@ proc new*( logging.setupLog(confCopy.logLevel, confCopy.logFormat) - # TODO: remove after pubsubtopic config gets removed - var shards = newSeq[uint16]() - if confCopy.pubsubTopics.len > 0: - let shardsRes = topicsToRelayShards(confCopy.pubsubTopics) - if shardsRes.isErr(): - error "failed to parse pubsub topic, please format according to static shard specification", - error = shardsRes.error - return err("failed to parse pubsub topic: " & $shardsRes.error) - - let shardsOpt = shardsRes.get() - - if shardsOpt.isSome(): - let relayShards = shardsOpt.get() - if relayShards.clusterId != confCopy.clusterId: - error "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22", - nodeCluster = confCopy.clusterId, pubsubCluster = relayShards.clusterId - return err( - "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22" - ) - - for shard in relayShards.shardIds: - shards.add(shard) - confCopy.shards = shards - - # Why can't I replace this block with a concise `.valueOr`? confCopy = block: let res = applyPresetConfiguration(confCopy) if res.isErr(): diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 0222db0d1..126ff608c 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -323,21 +323,35 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = proc getDHigh*(T: type WakuRelay): int = return GossipsubParameters.dHigh -proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] = - ## Returns the number of peers in a mesh defined by the passed pubsub topic. +proc getPeersInMesh*( + w: WakuRelay, pubsubTopic: PubsubTopic +): Result[seq[PeerId], string] = + ## Returns the list of peerIds in a mesh defined by the passed pubsub topic. ## The 'mesh' atribute is defined in the GossipSub ref object. if not w.mesh.hasKey(pubsubTopic): - debug "getNumPeersInMesh - there is no mesh peer for the given pubsub topic", + debug "getPeersInMesh - there is no mesh peer for the given pubsub topic", pubsubTopic = pubsubTopic - return ok(0) + return ok(newSeq[PeerId]()) let peersRes = catch: w.mesh[pubsubTopic] let peers: HashSet[PubSubPeer] = peersRes.valueOr: - return - err("getNumPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg) + return err("getPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg) + + let peerIds = toSeq(peers).mapIt(it.peerId) + + return ok(peerIds) + +proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] = + ## Returns the number of peers in a mesh defined by the passed pubsub topic. + + let peers = w.getPeersInMesh(pubsubTopic).valueOr: + return err( + "getNumPeersInMesh - failed retrieving peers in mesh: " & pubsubTopic & ": " & + error + ) return ok(peers.len) @@ -539,22 +553,23 @@ proc publish*( return ok(relayedPeerCount) -proc getNumConnectedPeers*( +proc getConnectedPeers*( w: WakuRelay, pubsubTopic: PubsubTopic -): Result[int, string] = - ## Returns the number of connected peers and subscribed to the passed pubsub topic. +): Result[seq[PeerId], string] = + ## Returns the list of peerIds of connected peers and subscribed to the passed pubsub topic. ## The 'gossipsub' atribute is defined in the GossipSub ref object. if pubsubTopic == "": ## Return all the connected peers - var numConnPeers = 0 + var peerIds = newSeq[PeerId]() for k, v in w.gossipsub: - numConnPeers.inc(v.len) - return ok(numConnPeers) + peerIds.add(toSeq(v).mapIt(it.peerId)) + # alternatively: peerIds &= toSeq(v).mapIt(it.peerId) + return ok(peerIds) if not w.gossipsub.hasKey(pubsubTopic): return err( - "getNumConnectedPeers - there is no gossipsub peer for the given pubsub topic: " & + "getConnectedPeers - there is no gossipsub peer for the given pubsub topic: " & pubsubTopic ) @@ -562,8 +577,22 @@ proc getNumConnectedPeers*( w.gossipsub[pubsubTopic] let peers: HashSet[PubSubPeer] = peersRes.valueOr: + return + err("getConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg) + + let peerIds = toSeq(peers).mapIt(it.peerId) + return ok(peerIds) + +proc getNumConnectedPeers*( + w: WakuRelay, pubsubTopic: PubsubTopic +): Result[int, string] = + ## Returns the number of connected peers and subscribed to the passed pubsub topic. + + ## Return all the connected peers + let peers = w.getConnectedPeers(pubsubTopic).valueOr: return err( - "getNumConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg + "getNumConnectedPeers - failed retrieving peers in mesh: " & pubsubTopic & ": " & + error ) return ok(peers.len) diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index 7831918c2..98af95a1f 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -29,7 +29,10 @@ export group_manager_base logScope: topics = "waku rln_relay onchain_group_manager" -type UInt40* = StUint[40] +type EthereumUInt40* = StUint[40] +type EthereumUInt32* = StUint[32] +type EthereumUInt16* = StUint[16] + # using the when predicate does not work within the contract macro, hence need to dupe contract(WakuRlnContract): # this serves as an entrypoint into the rln membership set @@ -47,8 +50,8 @@ contract(WakuRlnContract): # this constant describes max message limit of rln contract proc maxMembershipRateLimit(): UInt256 {.view.} # this function returns the merkleProof for a given index - proc getMerkleProof(index: UInt40): seq[Uint256] {.view.} - # this function returns the Merkle root + # proc getMerkleProof(index: UInt40): seq[Uint256] {.view.} + # # this function returns the Merkle root proc root(): Uint256 {.view.} type @@ -65,7 +68,7 @@ type keystorePassword*: Option[string] registrationHandler*: Option[RegistrationHandler] latestProcessedBlock*: BlockNumber - merkleProofCache*: seq[Uint256] + merkleProofCache*: seq[UInt256] proc setMetadata*( g: OnchainGroupManager, lastProcessedBlock = none(BlockNumber) @@ -92,46 +95,97 @@ proc setMetadata*( proc fetchMerkleProofElements*( g: OnchainGroupManager -): Future[Result[seq[Uint256], string]] {.async.} = - let membershipIndex = g.membershipIndex.get() - debug " ------ Fetching merkle proof", index = membershipIndex +): Future[Result[seq[UInt256], string]] {.async.} = try: - # First check if the index is valid - # let commitmentIndexInvocation = g.wakuRlnContract.get().commitmentIndex() - # let currentCommitmentIndex = await commitmentIndexInvocation.call() + let membershipIndex = g.membershipIndex.get() + let commitmentIndexInvocation = g.wakuRlnContract.get().nextFreeIndex() + let currentCommitmentIndex = await commitmentIndexInvocation.call() let membershipIndexUint256 = stuint(membershipIndex, 256) + let index40 = stuint(membershipIndex, 40) - debug " ------ Checking membership index validity", - membershipIndex = membershipIndex, - membershipIndexAsUint256 = membershipIndexUint256.toHex() - # currentCommitmentIndex = currentCommitmentIndex.toHex() + if membershipIndexUint256 >= currentCommitmentIndex: + return err( + "Invalid membership index: " & $membershipIndex & + " is >= current commitment index: " & currentCommitmentIndex.toHex() + ) - # Convert to UInt40 for contract call (merkleProofElements takes UInt40) - let indexUint40 = stuint(membershipIndex, 40) - debug " ------ Converting membershipIndex to UInt40", - originalIndex = membershipIndex, asUint40 = indexUint40.toHex() + let methodSig = "getMerkleProof(uint40)" + let methodIdDigest = keccak.keccak256.digest(methodSig) + let methodId = methodIdDigest.data[0 .. 3] + + var paddedParam = newSeq[byte](32) + let indexBytes = index40.toBytesBE() + for i in 0 ..< min(indexBytes.len, paddedParam.len): + paddedParam[paddedParam.len - indexBytes.len + i] = indexBytes[i] + + var callData = newSeq[byte]() + for b in methodId: + callData.add(b) + callData.add(paddedParam) + + var tx: TransactionArgs + tx.to = Opt.some(fromHex(Address, g.ethContractAddress)) + tx.data = Opt.some(callData) + + let responseBytes = await g.ethRpc.get().provider.eth_call(tx, "latest") + + var merkleProof: seq[UInt256] + + for i in 0 .. 19: + let startindex = 32 + (i * 32) # skip initial 32 bytes for the array offset + if startindex + 32 <= responseBytes.len: + let elementbytes = responseBytes[startindex ..< startindex + 32] + merkleProof.add(UInt256.fromBytesBE(elementbytes)) - let merkleProofInvocation = g.wakuRlnContract.get().getMerkleProof(indexUint40) - let merkleProof = await merkleProofInvocation.call() - debug "Successfully fetched merkle proof", elementsCount = merkleProof.len return ok(merkleProof) except CatchableError: - error "Failed to fetch merkle proof", errMsg = getCurrentExceptionMsg() + error "------ Failed to fetch Merkle proof elements ------", + errMsg = getCurrentExceptionMsg(), index = g.membershipIndex.get() + return err("Failed to fetch Merkle proof elements: " & getCurrentExceptionMsg()) # proc fetchMerkleProofElements*( # g: OnchainGroupManager -# ): Future[Result[seq[Uint256], string]] {.async.} = -# let membershipIndex = g.membershipIndex.get() -# debug "Fetching merkle proof", index = membershipIndex +# ): Future[Result[seq[UInt256], string]] {.async.} = # try: -# let index = stuint(membershipIndex, 40) +# let membershipIndex = g.membershipIndex.get() # -# let merkleProofInvocation = g.wakuRlnContract.get().merkleProofElements(index) +# # First check if the index is valid and within range +# let commitmentIndexInvocation = g.wakuRlnContract.get().commitmentIndex() +# let currentCommitmentIndex = await commitmentIndexInvocation.call() +# let membershipIndexUint256 = stuint(membershipIndex, 256) +# let index40 = stuint(membershipIndex, 40) +# +# debug "------ checking if membership index is validity ------", +# membershipIndex = membershipIndex, +# membershipIndexHEX = membershipIndex.toHex(), +# membershipIndexUint256 = membershipIndexUint256, +# membershipIndexUint256HEX = membershipIndexUint256.toHex(), +# currentCommitmentIndex = currentCommitmentIndex, +# currentCommitmentIndexHEX = currentCommitmentIndex.toHex(), +# index40 = index40, +# index40HEX = index40.toHex() +# +# # Ensure the membershipIndex is less than the total number of commitments +# if membershipIndexUint256 >= currentCommitmentIndex: +# error "Invalid membership index", +# membershipIndex = membershipIndex, +# currentCommitmentIndex = currentCommitmentIndex.toHex() +# return err( +# "Invalid membership index: " & $membershipIndex & +# " is >= current commitment index: " & currentCommitmentIndex.toHex() +# ) +# +# let merkleProofInvocation = +# g.wakuRlnContract.get().merkleProofElements(membershipIndexUint256) # let merkleProof = await merkleProofInvocation.call() -# debug "Successfully fetched merkle proof", elementsCount = merkleProof.len +# +# debug "------ Merkle proof ------", merkleProof = merkleProof +# # return ok(merkleProof) # except CatchableError: -# error "Failed to fetch merkle proof", errMsg = getCurrentExceptionMsg() +# error "------ Failed to fetch Merkle proof elements ------", +# errMsg = getCurrentExceptionMsg(), index = g.membershipIndex.get() +# return err("Failed to fetch Merkle proof elements: " & getCurrentExceptionMsg()) proc fetchMerkleRoot*( g: OnchainGroupManager @@ -320,7 +374,7 @@ proc toArray32*(s: seq[byte]): array[32, byte] = return output proc toArray32Seq*(values: seq[UInt256]): seq[array[32, byte]] = - ## Converts a sequence of UInt256 to a sequence of 32-byte arrays + ## Converts a MerkleProof (array of 20 UInt256 values) to a sequence of 32-byte arrays result = newSeqOfCap[array[32, byte]](values.len) for value in values: result.add(value.toBytesLE()) @@ -341,28 +395,49 @@ method generateProof*( if g.userMessageLimit.isNone(): return err("user message limit is not set") - debug "calling generateProof from generateProof from group_manager onchain", - data = data + debug "------ calling generateProof from generateProof from group_manager onchain ------", + data = data, + membershipIndex = g.membershipIndex.get(), + userMessageLimit = g.userMessageLimit.get() let externalNullifierRes = poseidon(@[@(epoch), @(rlnIdentifier)]) try: let rootRes = waitFor g.fetchMerkleRoot() if rootRes.isErr(): - return err("Failed to fetch Merkle root") + return err("Failed to fetch Merkle root: " & rootRes.error) debug "Merkle root fetched", root = rootRes.get().toHex except CatchableError: error "Failed to fetch Merkle root", error = getCurrentExceptionMsg() + return err("Failed to fetch Merkle root: " & getCurrentExceptionMsg()) + + # Check if contract knows about the member + try: + let idCommitment = g.idCredentials.get().idCommitment.toUInt256() + let memberExistsRes = + waitFor g.wakuRlnContract.get().isInMembershipSet(idCommitment).call() + + if memberExistsRes == false: + error "------ Member does not exist in contract ------", + idCommitment = idCommitment.toHex(), membershipIndex = g.membershipIndex.get() + return err("Member ID commitment not found in contract: " & idCommitment.toHex()) + + debug "------ Member exists in contract ------", + idCommitment = idCommitment.toHex(), membershipIndex = g.membershipIndex.get() + except CatchableError as e: + error "------ Failed to check if member exists ------", error = e.msg + # Continue execution even if this check fails try: let proofResult = waitFor g.fetchMerkleProofElements() if proofResult.isErr(): - return err("Failed to fetch Merkle proof" & $proofResult.error) + return err("Failed to fetch Merkle proof: " & proofResult.error) g.merkleProofCache = proofResult.get() debug "Merkle proof fetched", membershipIndex = g.membershipIndex.get(), elementCount = g.merkleProofCache.len except CatchableError: error "Failed to fetch merkle proof", error = getCurrentExceptionMsg() + return err("Failed to fetch Merkle proof: " & getCurrentExceptionMsg()) let witness = Witness( identity_secret: g.idCredentials.get().idSecretHash.toArray32(),