diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index ac72f3e37..39948913d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -521,7 +521,7 @@ proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} = proc filterSubscribe*( node: WakuNode, - pubsubTopic: Option[PubsubTopic], + shard: Option[RelayShard], contentTopics: ContentTopic | seq[ContentTopic], peer: RemotePeerInfo | string, ): Future[FilterSubscribeResult] {.async: (raises: []).} = @@ -604,6 +604,15 @@ proc filterSubscribe*( # return the last error or ok return subRes +# proc filterSubscribe* {.deprecated: "Pass a shard instead of a pubsubtopic".} ( +# node: WakuNode, +# pubsubTopic: Option[PubsubTopic], +# contentTopics: ContentTopic | seq[ContentTopic], +# peer: RemotePeerInfo | string, +# ): Future[FilterSubscribeResult] {.async: (raises: []).} = + +# echo "TODO" + proc filterUnsubscribe*( node: WakuNode, pubsubTopic: Option[PubsubTopic], diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index 8b99abd7e..4a026b84b 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -59,6 +59,9 @@ proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessag return ctx.finish() # Computes the hash +proc computeMessageHash*(shard: RelayShard, msg: WakuMessage): WakuMessageHash = + return computeMessageHash(shard.toPubsubTopic(), msg) + proc cmp*(x, y: WakuMessageHash): int = if x < y: return -1 diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 80f60fdd3..ffb3d4773 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -45,17 +45,11 @@ proc setSubscriptionTimeout*(wf: WakuFilter, newTimeout: Duration) = wf.subscriptions.setSubscriptionTimeout(newTimeout) proc subscribe( - wf: WakuFilter, - peerId: PeerID, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], + wf: WakuFilter, peerId: PeerID, shard: RelayShard, contentTopics: seq[ContentTopic] ): Future[FilterSubscribeResult] {.async.} = - # TODO: check if this condition is valid??? - if pubsubTopic.isNone() or contentTopics.len == 0: - error "pubsubTopic and contentTopics must be specified", peerId = peerId - return err( - FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified") - ) + if contentTopics.len == 0: + error "contentTopics must be specified", peerId = peerId + return err(FilterSubscribeError.badRequest("contentTopics must be specified")) if contentTopics.len > MaxContentTopicsPerRequest: error "exceeds maximum content topics", peerId = peerId @@ -65,7 +59,7 @@ proc subscribe( ) ) - let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) + let filterCriteria = toHashSet(contentTopics.mapIt((shard, it))) debug "subscribing peer to filter criteria", peerId = peerId, filterCriteria = filterCriteria @@ -78,16 +72,11 @@ proc subscribe( ok() proc unsubscribe( - wf: WakuFilter, - peerId: PeerID, - pubsubTopic: Option[PubsubTopic], - contentTopics: seq[ContentTopic], + wf: WakuFilter, peerId: PeerID, shard: RelayShard, contentTopics: seq[ContentTopic] ): FilterSubscribeResult = - if pubsubTopic.isNone() or contentTopics.len == 0: - error "pubsubTopic and contentTopics must be specified", peerId = peerId - return err( - FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified") - ) + if contentTopics.len == 0: + error "contentTopics must be specified", peerId = peerId + return err(FilterSubscribeError.badRequest("contentTopics must be specified")) if contentTopics.len > MaxContentTopicsPerRequest: error "exceeds maximum content topics", peerId = peerId @@ -97,7 +86,7 @@ proc unsubscribe( ) ) - let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) + let filterCriteria = toHashSet(contentTopics.mapIt((shard, it))) debug "unsubscribing peer from filter criteria", peerId = peerId, filterCriteria = filterCriteria @@ -125,6 +114,17 @@ proc unsubscribeAll( ok() +proc getShardFromRequest( + request: FilterSubscribeRequest +): Result[RelayShard, FilterSubscribeError] = + if request.pubsubTopic.isNone(): + return err(FilterSubscribeError.badRequest("Pubsub topic must be specified")) + else: + let shard = RelayShard.parseStaticSharding(request.pubsubTopic.get()).valueOr: + error "Invalid pubsub topic format", error = error + return err(FilterSubscribeError.badRequest("Invalid pubsub topic format")) + ok(shard) + proc handleSubscribeRequest*( wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest ): Future[FilterSubscribeResponse] {.async.} = @@ -141,11 +141,19 @@ proc handleSubscribeRequest*( of FilterSubscribeType.SUBSCRIBER_PING: subscribeResult = wf.pingSubscriber(peerId) of FilterSubscribeType.SUBSCRIBE: + let res = getShardFromRequest(request) subscribeResult = - await wf.subscribe(peerId, request.pubsubTopic, request.contentTopics) + if res.isOk(): + await wf.subscribe(peerId, res.value, request.contentTopics) + else: + FilterSubscribeResult.err(res.error) of FilterSubscribeType.UNSUBSCRIBE: + let res = getShardFromRequest(request) subscribeResult = - wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics) + if res.isOk(): + wf.unsubscribe(peerId, res.value, request.contentTopics) + else: + FilterSubscribeResult.err(res.error) of FilterSubscribeType.UNSUBSCRIBE_ALL: subscribeResult = await wf.unsubscribeAll(peerId) @@ -240,41 +248,37 @@ proc maintainSubscriptions*(wf: WakuFilter) {.async.} = waku_filter_subscriptions.set(wf.subscriptions.peersSubscribed.len.float64) const MessagePushTimeout = 20.seconds -proc handleMessage*( - wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage -) {.async.} = - let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() +proc handleMessage*(wf: WakuFilter, shard: RelayShard, message: WakuMessage) {.async.} = + let msgHash = computeMessageHash(shard, message).to0xHex() - debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash + debug "handling message", shard = shard, msg_hash = msgHash let handleMessageStartTime = Moment.now() block: ## Find subscribers and push message to them let subscribedPeers = - wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) + wf.subscriptions.findSubscribedPeers(shard, message.contentTopic) if subscribedPeers.len == 0: error "no subscribed peers found", - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - msg_hash = msgHash + shard = shard, contentTopic = message.contentTopic, msg_hash = msgHash return - let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message) + let messagePush = MessagePush.init(shard, message) if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout( MessagePushTimeout ): error "timed out pushing message to peers", - pubsubTopic = pubsubTopic, + shard = shard, contentTopic = message.contentTopic, msg_hash = msgHash, numPeers = subscribedPeers.len, target_peer_ids = subscribedPeers.mapIt(shortLog(it)) waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: - notice "pushed message succesfully to all subscribers", - pubsubTopic = pubsubTopic, + notice "pushed message successfully to all subscribers", + shard = shard, contentTopic = message.contentTopic, msg_hash = msgHash, numPeers = subscribedPeers.len, diff --git a/waku/waku_filter_v2/rpc.nim b/waku/waku_filter_v2/rpc.nim index a81a7bd9a..46e84f27c 100644 --- a/waku/waku_filter_v2/rpc.nim +++ b/waku/waku_filter_v2/rpc.nim @@ -28,6 +28,11 @@ type # Convenience functions +proc init*( + T: type MessagePush, shard: RelayShard, wakuMessage: WakuMessage +): MessagePush = + MessagePush(wakuMessage: wakuMessage, pubsubTopic: shard.toPubsubTopic()) + proc ping*(T: type FilterSubscribeRequest, requestId: string): T = FilterSubscribeRequest(requestId: requestId, filterSubscribeType: SUBSCRIBER_PING) diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index 8d3b8084f..d2c465065 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -19,8 +19,8 @@ const MessageCacheTTL* = 2.minutes type - # a single filter criterion is fully defined by a pubsub topic and content topic - FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic] + # a single filter criterion is fully defined by a shard and content topic + FilterCriterion* = tuple[shard: RelayShard, contentTopic: ContentTopic] FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria @@ -74,9 +74,9 @@ proc getPeerSubscriptions*( return subscribedContentTopics proc findSubscribedPeers*( - s: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic + s: FilterSubscriptions, shard: RelayShard, contentTopic: ContentTopic ): seq[PeerID] = - let filterCriterion: FilterCriterion = (pubsubTopic, contentTopic) + let filterCriterion: FilterCriterion = (shard, contentTopic) var foundPeers: seq[PeerID] = @[] # only peers subscribed to criteria and with legit subscription is counted