wip: remove more pubsubtopic usage

This commit is contained in:
fryorcraken 2025-06-25 16:04:03 +10:00
parent 15025fe6cc
commit ff2b59b3bc
5 changed files with 62 additions and 41 deletions

View File

@ -521,7 +521,7 @@ proc mountFilterClient*(node: WakuNode) {.async: (raises: []).} =
proc filterSubscribe*(
node: WakuNode,
pubsubTopic: Option[PubsubTopic],
shard: Option[RelayShard],
contentTopics: ContentTopic | seq[ContentTopic],
peer: RemotePeerInfo | string,
): Future[FilterSubscribeResult] {.async: (raises: []).} =
@ -604,6 +604,15 @@ proc filterSubscribe*(
# return the last error or ok
return subRes
# proc filterSubscribe* {.deprecated: "Pass a shard instead of a pubsubtopic".} (
# node: WakuNode,
# pubsubTopic: Option[PubsubTopic],
# contentTopics: ContentTopic | seq[ContentTopic],
# peer: RemotePeerInfo | string,
# ): Future[FilterSubscribeResult] {.async: (raises: []).} =
# echo "TODO"
proc filterUnsubscribe*(
node: WakuNode,
pubsubTopic: Option[PubsubTopic],

View File

@ -59,6 +59,9 @@ proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessag
return ctx.finish() # Computes the hash
proc computeMessageHash*(shard: RelayShard, msg: WakuMessage): WakuMessageHash =
return computeMessageHash(shard.toPubsubTopic(), msg)
proc cmp*(x, y: WakuMessageHash): int =
if x < y:
return -1

View File

@ -45,17 +45,11 @@ proc setSubscriptionTimeout*(wf: WakuFilter, newTimeout: Duration) =
wf.subscriptions.setSubscriptionTimeout(newTimeout)
proc subscribe(
wf: WakuFilter,
peerId: PeerID,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
wf: WakuFilter, peerId: PeerID, shard: RelayShard, contentTopics: seq[ContentTopic]
): Future[FilterSubscribeResult] {.async.} =
# TODO: check if this condition is valid???
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)
if contentTopics.len == 0:
error "contentTopics must be specified", peerId = peerId
return err(FilterSubscribeError.badRequest("contentTopics must be specified"))
if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
@ -65,7 +59,7 @@ proc subscribe(
)
)
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
let filterCriteria = toHashSet(contentTopics.mapIt((shard, it)))
debug "subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria
@ -78,16 +72,11 @@ proc subscribe(
ok()
proc unsubscribe(
wf: WakuFilter,
peerId: PeerID,
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
wf: WakuFilter, peerId: PeerID, shard: RelayShard, contentTopics: seq[ContentTopic]
): FilterSubscribeResult =
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)
if contentTopics.len == 0:
error "contentTopics must be specified", peerId = peerId
return err(FilterSubscribeError.badRequest("contentTopics must be specified"))
if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
@ -97,7 +86,7 @@ proc unsubscribe(
)
)
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
let filterCriteria = toHashSet(contentTopics.mapIt((shard, it)))
debug "unsubscribing peer from filter criteria",
peerId = peerId, filterCriteria = filterCriteria
@ -125,6 +114,17 @@ proc unsubscribeAll(
ok()
proc getShardFromRequest(
request: FilterSubscribeRequest
): Result[RelayShard, FilterSubscribeError] =
if request.pubsubTopic.isNone():
return err(FilterSubscribeError.badRequest("Pubsub topic must be specified"))
else:
let shard = RelayShard.parseStaticSharding(request.pubsubTopic.get()).valueOr:
error "Invalid pubsub topic format", error = error
return err(FilterSubscribeError.badRequest("Invalid pubsub topic format"))
ok(shard)
proc handleSubscribeRequest*(
wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest
): Future[FilterSubscribeResponse] {.async.} =
@ -141,11 +141,19 @@ proc handleSubscribeRequest*(
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
let res = getShardFromRequest(request)
subscribeResult =
await wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
if res.isOk():
await wf.subscribe(peerId, res.value, request.contentTopics)
else:
FilterSubscribeResult.err(res.error)
of FilterSubscribeType.UNSUBSCRIBE:
let res = getShardFromRequest(request)
subscribeResult =
wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
if res.isOk():
wf.unsubscribe(peerId, res.value, request.contentTopics)
else:
FilterSubscribeResult.err(res.error)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = await wf.unsubscribeAll(peerId)
@ -240,41 +248,37 @@ proc maintainSubscriptions*(wf: WakuFilter) {.async.} =
waku_filter_subscriptions.set(wf.subscriptions.peersSubscribed.len.float64)
const MessagePushTimeout = 20.seconds
proc handleMessage*(
wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage
) {.async.} =
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
proc handleMessage*(wf: WakuFilter, shard: RelayShard, message: WakuMessage) {.async.} =
let msgHash = computeMessageHash(shard, message).to0xHex()
debug "handling message", pubsubTopic = pubsubTopic, msg_hash = msgHash
debug "handling message", shard = shard, msg_hash = msgHash
let handleMessageStartTime = Moment.now()
block:
## Find subscribers and push message to them
let subscribedPeers =
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
wf.subscriptions.findSubscribedPeers(shard, message.contentTopic)
if subscribedPeers.len == 0:
error "no subscribed peers found",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
msg_hash = msgHash
shard = shard, contentTopic = message.contentTopic, msg_hash = msgHash
return
let messagePush = MessagePush(pubsubTopic: pubsubTopic, wakuMessage: message)
let messagePush = MessagePush.init(shard, message)
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(
MessagePushTimeout
):
error "timed out pushing message to peers",
pubsubTopic = pubsubTopic,
shard = shard,
contentTopic = message.contentTopic,
msg_hash = msgHash,
numPeers = subscribedPeers.len,
target_peer_ids = subscribedPeers.mapIt(shortLog(it))
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
else:
notice "pushed message succesfully to all subscribers",
pubsubTopic = pubsubTopic,
notice "pushed message successfully to all subscribers",
shard = shard,
contentTopic = message.contentTopic,
msg_hash = msgHash,
numPeers = subscribedPeers.len,

View File

@ -28,6 +28,11 @@ type
# Convenience functions
proc init*(
T: type MessagePush, shard: RelayShard, wakuMessage: WakuMessage
): MessagePush =
MessagePush(wakuMessage: wakuMessage, pubsubTopic: shard.toPubsubTopic())
proc ping*(T: type FilterSubscribeRequest, requestId: string): T =
FilterSubscribeRequest(requestId: requestId, filterSubscribeType: SUBSCRIBER_PING)

View File

@ -19,8 +19,8 @@ const
MessageCacheTTL* = 2.minutes
type
# a single filter criterion is fully defined by a pubsub topic and content topic
FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic]
# a single filter criterion is fully defined by a shard and content topic
FilterCriterion* = tuple[shard: RelayShard, contentTopic: ContentTopic]
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
@ -74,9 +74,9 @@ proc getPeerSubscriptions*(
return subscribedContentTopics
proc findSubscribedPeers*(
s: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
s: FilterSubscriptions, shard: RelayShard, contentTopic: ContentTopic
): seq[PeerID] =
let filterCriterion: FilterCriterion = (pubsubTopic, contentTopic)
let filterCriterion: FilterCriterion = (shard, contentTopic)
var foundPeers: seq[PeerID] = @[]
# only peers subscribed to criteria and with legit subscription is counted