mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: Refactor of FilterV2 subscription management with Time-to-live maintenance (#2341)
* Refactor of FilterV2 subscription handling and maintenance with addition subscription time-to-live support. Fixed all tests and reworked where subscription handling changes needed it. Adapted REST API /admin filter subscription retrieve to new filter subscription structure. * Fix tests and PR comments * Added filter v2 subscription timeout tests and fixed * Fix review comments and suggestions. No functional change. * Remove leftover echoes from test_rest_admin * Fix failed legacy filter tests due to separation of mounting the filters. * Small fixes, fix naming typo, removed duplicated checks in test
This commit is contained in:
parent
ce32156a8b
commit
f048babdc4
@ -468,6 +468,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
let peerInfo = parsePeerInfo(conf.filternode)
|
||||
if peerInfo.isOk():
|
||||
await node.mountFilter()
|
||||
await node.mountLegacyFilter()
|
||||
await node.mountFilterClient()
|
||||
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)
|
||||
|
||||
@ -507,7 +508,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
echo "A spam message is found and discarded"
|
||||
chat.prompt = false
|
||||
showChatPrompt(chat)
|
||||
|
||||
|
||||
echo "rln-relay preparation is in progress..."
|
||||
|
||||
let rlnConf = WakuRlnConfig(
|
||||
|
||||
@ -288,6 +288,7 @@ when isMainModule:
|
||||
|
||||
if conf.filter:
|
||||
waitFor mountFilter(bridge.nodev2)
|
||||
waitFor mountLegacyFilter(bridge.nodev2)
|
||||
|
||||
if conf.staticnodes.len > 0:
|
||||
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
|
||||
|
||||
@ -254,7 +254,7 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
|
||||
)
|
||||
|
||||
WakuDiscoveryV5.new(
|
||||
app.rng,
|
||||
app.rng,
|
||||
discv5Conf,
|
||||
some(app.record),
|
||||
some(app.node.peerManager),
|
||||
@ -326,7 +326,7 @@ proc setupWakuApp*(app: var App): AppResult[void] =
|
||||
ok()
|
||||
|
||||
proc getPorts(listenAddrs: seq[MultiAddress]):
|
||||
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
|
||||
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
|
||||
|
||||
var tcpPort, websocketPort = none(Port)
|
||||
|
||||
@ -548,7 +548,15 @@ proc setupProtocols(node: WakuNode,
|
||||
# Filter setup. NOTE Must be mounted after relay
|
||||
if conf.filter:
|
||||
try:
|
||||
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
||||
await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
||||
except CatchableError:
|
||||
return err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
try:
|
||||
await mountFilter(node,
|
||||
subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout),
|
||||
maxFilterPeers = conf.filterMaxPeersToServe,
|
||||
maxFilterCriteriaPerPeer = conf.filterMaxCriteria)
|
||||
except CatchableError:
|
||||
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
||||
|
||||
@ -724,7 +732,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
|
||||
|
||||
let filterCache = MessageCache.init()
|
||||
|
||||
let filterDiscoHandler =
|
||||
let filterDiscoHandler =
|
||||
if app.wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
|
||||
else: none(DiscoveryHandler)
|
||||
@ -739,7 +747,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
|
||||
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"
|
||||
|
||||
## Store REST API
|
||||
let storeDiscoHandler =
|
||||
let storeDiscoHandler =
|
||||
if app.wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
|
||||
else: none(DiscoveryHandler)
|
||||
@ -749,7 +757,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
|
||||
## Light push API
|
||||
if conf.lightpushnode != "" and
|
||||
app.node.wakuLightpushClient != nil:
|
||||
let lightDiscoHandler =
|
||||
let lightDiscoHandler =
|
||||
if app.wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
|
||||
else: none(DiscoveryHandler)
|
||||
|
||||
@ -95,6 +95,7 @@ type
|
||||
defaultValue: false,
|
||||
name: "execute" .}: bool
|
||||
|
||||
|
||||
of noCommand:
|
||||
## Application-level configuration
|
||||
protectedTopics* {.
|
||||
@ -221,7 +222,7 @@ type
|
||||
desc: "Rln relay identity commitment key as a Hex string",
|
||||
defaultValue: ""
|
||||
name: "rln-relay-id-commitment-key" }: string
|
||||
|
||||
|
||||
rlnRelayTreePath* {.
|
||||
desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
|
||||
defaultValue: ""
|
||||
@ -304,10 +305,25 @@ type
|
||||
name: "filternode" }: string
|
||||
|
||||
filterTimeout* {.
|
||||
desc: "Timeout for filter node in seconds.",
|
||||
desc: "Filter clients will be wiped out if not able to receive push messages within this timeout. In seconds.",
|
||||
defaultValue: 14400 # 4 hours
|
||||
name: "filter-timeout" }: int64
|
||||
|
||||
filterSubscriptionTimeout* {.
|
||||
desc: "Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.",
|
||||
defaultValue: 300 # 5 minutes
|
||||
name: "filter-subscription-timeout" }: int64
|
||||
|
||||
filterMaxPeersToServe* {.
|
||||
desc: "Maximum number of peers to serve at a time. Only for v2 filter protocol.",
|
||||
defaultValue: 1000
|
||||
name: "filter-max-peers-to-serve" }: uint32
|
||||
|
||||
filterMaxCriteria* {.
|
||||
desc: "Maximum number of pubsub- and content topic combination per peers at a time. Only for v2 filter protocol.",
|
||||
defaultValue: 1000
|
||||
name: "filter-max-criteria" }: uint32
|
||||
|
||||
## Lightpush config
|
||||
|
||||
lightpush* {.
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
{.used.}
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[
|
||||
options,
|
||||
tables,
|
||||
options,
|
||||
tables,
|
||||
sequtils
|
||||
],
|
||||
stew/shims/net as stewNet,
|
||||
@ -12,7 +12,7 @@ import
|
||||
chronicles,
|
||||
os,
|
||||
libp2p/[
|
||||
peerstore,
|
||||
peerstore,
|
||||
crypto/crypto
|
||||
]
|
||||
|
||||
@ -44,7 +44,7 @@ suite "Waku Filter - End to End":
|
||||
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
||||
var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)]
|
||||
var messagePushHandler {.threadvar.}: FilterPushHandler
|
||||
|
||||
|
||||
asyncSetup:
|
||||
pushHandlerFuture = newFuture[(string, WakuMessage)]()
|
||||
messagePushHandler = proc(
|
||||
@ -84,8 +84,8 @@ suite "Waku Filter - End to End":
|
||||
# Then the subscription is successful
|
||||
check:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
server.wakuFilter.subscriptions.hasKey(clientPeerId)
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
|
||||
|
||||
# When sending a message to the subscribed content topic
|
||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
@ -106,7 +106,7 @@ suite "Waku Filter - End to End":
|
||||
# Then the unsubscription is successful
|
||||
check:
|
||||
unsubscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 0
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
|
||||
# When sending a message to the previously subscribed content topic
|
||||
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
|
||||
@ -116,7 +116,7 @@ suite "Waku Filter - End to End":
|
||||
# Then the message is not pushed to the client
|
||||
check:
|
||||
not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
|
||||
|
||||
asyncTest "Client Node can't receive Push from Server Node, via Relay":
|
||||
# Given the server node has Relay enabled
|
||||
await server.mountRelay()
|
||||
@ -127,7 +127,7 @@ suite "Waku Filter - End to End":
|
||||
)
|
||||
require:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
|
||||
# When a server node gets a Relay message
|
||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
@ -141,7 +141,7 @@ suite "Waku Filter - End to End":
|
||||
let
|
||||
serverKey = generateSecp256k1Key()
|
||||
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
||||
|
||||
|
||||
waitFor server.start()
|
||||
waitFor server.mountRelay()
|
||||
|
||||
@ -162,8 +162,8 @@ suite "Waku Filter - End to End":
|
||||
)
|
||||
require:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
|
||||
# And the client node reboots
|
||||
waitFor client.stop()
|
||||
waitFor client.start()
|
||||
@ -189,8 +189,8 @@ suite "Waku Filter - End to End":
|
||||
)
|
||||
require:
|
||||
subscribeResponse.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
|
||||
# And the client node reboots
|
||||
waitFor client.stop()
|
||||
waitFor client.start()
|
||||
@ -209,7 +209,7 @@ suite "Waku Filter - End to End":
|
||||
)
|
||||
check:
|
||||
subscribeResponse2.isOk()
|
||||
server.wakuFilter.subscriptions.len == 1
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Relay
|
||||
pushHandlerFuture = newPushHandlerFuture()
|
||||
|
||||
@ -53,6 +53,7 @@ procSuite "Peer Manager":
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||
await allFutures(nodes.mapIt(it.mountLegacyFilter()))
|
||||
|
||||
# Dial node2 from node1
|
||||
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
|
||||
@ -528,6 +529,7 @@ procSuite "Peer Manager":
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||
await allFutures(nodes.mapIt(it.mountLegacyFilter()))
|
||||
|
||||
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
@ -579,6 +581,7 @@ procSuite "Peer Manager":
|
||||
await allFutures(nodes.mapIt(it.start()))
|
||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||
await allFutures(nodes.mapIt(it.mountLegacyFilter()))
|
||||
|
||||
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||
|
||||
|
||||
@ -29,6 +29,7 @@ suite "WakuNode - Filter":
|
||||
waitFor allFutures(server.start(), client.start())
|
||||
|
||||
waitFor server.mountFilter()
|
||||
waitFor server.mountLegacyFilter()
|
||||
waitFor client.mountFilterClient()
|
||||
|
||||
## Given
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -13,7 +13,8 @@ import
|
||||
../../../waku/waku_filter_v2/subscriptions,
|
||||
../../../waku/waku_core,
|
||||
../testlib/common,
|
||||
../testlib/wakucore
|
||||
../testlib/wakucore,
|
||||
./waku_filter_utils
|
||||
|
||||
proc newTestWakuFilter(switch: Switch): WakuFilter =
|
||||
let
|
||||
@ -38,9 +39,11 @@ proc createRequest(filterSubscribeType: FilterSubscribeType, pubsubTopic = none(
|
||||
)
|
||||
|
||||
proc getSubscribedContentTopics(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
||||
var contentTopics: seq[ContentTopic]
|
||||
for filterCriterion in wakuFilter.subscriptions[peerId]:
|
||||
contentTopics.add(filterCriterion[1])
|
||||
var contentTopics: seq[ContentTopic] = @[]
|
||||
let peersCreitera = wakuFilter.subscriptions.getPeerSubscriptions(peerId)
|
||||
|
||||
for filterCriterion in peersCreitera:
|
||||
contentTopics.add(filterCriterion.contentTopic)
|
||||
|
||||
return contentTopics
|
||||
|
||||
@ -68,8 +71,8 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 1
|
||||
wakuFilter.subscriptions[peerId].len == 1
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||
response.requestId == filterSubscribeRequest.requestId
|
||||
response.statusCode == 200
|
||||
response.statusDesc.get() == "OK"
|
||||
@ -79,7 +82,7 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0 # peerId is removed from subscriptions
|
||||
response2.requestId == filterUnsubscribeRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
@ -105,9 +108,9 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 1
|
||||
wakuFilter.subscriptions[peerId].len == 2
|
||||
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest.contentTopics
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
|
||||
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId), filterSubscribeRequest.contentTopics)
|
||||
response.requestId == filterSubscribeRequest.requestId
|
||||
response.statusCode == 200
|
||||
response.statusDesc.get() == "OK"
|
||||
@ -117,7 +120,7 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0 # peerId is removed from subscriptions
|
||||
response2.requestId == filterUnsubscribeAllRequest.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
@ -155,9 +158,9 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 1
|
||||
wakuFilter.subscriptions[peerId].len == 1
|
||||
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest1.contentTopics
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId), filterSubscribeRequest1.contentTopics)
|
||||
response1.requestId == filterSubscribeRequest1.requestId
|
||||
response1.statusCode == 200
|
||||
response1.statusDesc.get() == "OK"
|
||||
@ -167,11 +170,11 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 1
|
||||
wakuFilter.subscriptions[peerId].len == 2
|
||||
wakuFilter.getSubscribedContentTopics(peerId) ==
|
||||
filterSubscribeRequest1.contentTopics &
|
||||
filterSubscribeRequest2.contentTopics
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
|
||||
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId),
|
||||
filterSubscribeRequest1.contentTopics &
|
||||
filterSubscribeRequest2.contentTopics)
|
||||
response2.requestId == filterSubscribeRequest2.requestId
|
||||
response2.statusCode == 200
|
||||
response2.statusDesc.get() == "OK"
|
||||
@ -181,9 +184,9 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 1
|
||||
wakuFilter.subscriptions[peerId].len == 1
|
||||
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest2.contentTopics
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId), filterSubscribeRequest2.contentTopics)
|
||||
response3.requestId == filterUnsubscribeRequest1.requestId
|
||||
response3.statusCode == 200
|
||||
response3.statusDesc.get() == "OK"
|
||||
@ -193,7 +196,7 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0 # peerId is removed from subscriptions
|
||||
response4.requestId == filterUnsubscribeRequest2.requestId
|
||||
response4.statusCode == 200
|
||||
response4.statusDesc.get() == "OK"
|
||||
@ -255,9 +258,9 @@ suite "Waku Filter - handling subscribe requests":
|
||||
|
||||
# When
|
||||
let
|
||||
filterCriteria = toSeq(1 .. MaxCriteriaPerSubscription + 1).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it])))
|
||||
filterCriteria = toSeq(1 .. MaxFilterCriteriaPerPeer).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it])))
|
||||
|
||||
wakuFilter.subscriptions[peerId] = filterCriteria.toHashSet()
|
||||
discard wakuFilter.subscriptions.addSubscription(peerId, filterCriteria.toHashSet())
|
||||
|
||||
let
|
||||
reqTooManyFilterCriteria = createRequest(
|
||||
@ -276,9 +279,11 @@ suite "Waku Filter - handling subscribe requests":
|
||||
## Max subscriptions exceeded
|
||||
|
||||
# When
|
||||
wakuFilter.subscriptions.clear()
|
||||
for _ in 1 .. MaxTotalSubscriptions:
|
||||
wakuFilter.subscriptions[PeerId.random().get()] = @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet()
|
||||
wakuFilter.subscriptions.removePeer(peerId)
|
||||
wakuFilter.subscriptions.cleanUp()
|
||||
|
||||
for _ in 1 .. MaxFilterPeers:
|
||||
discard wakuFilter.subscriptions.addSubscription(PeerId.random().get(), @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet())
|
||||
|
||||
let
|
||||
reqTooManySubscriptions = createRequest(
|
||||
@ -443,10 +448,10 @@ suite "Waku Filter - subscription maintenance":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 3
|
||||
wakuFilter.subscriptions.hasKey(peerId1)
|
||||
wakuFilter.subscriptions.hasKey(peerId2)
|
||||
wakuFilter.subscriptions.hasKey(peerId3)
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 3
|
||||
wakuFilter.subscriptions.isSubscribed(peerId1)
|
||||
wakuFilter.subscriptions.isSubscribed(peerId2)
|
||||
wakuFilter.subscriptions.isSubscribed(peerId3)
|
||||
|
||||
# When
|
||||
# Maintenance loop should leave all peers in peer store intact
|
||||
@ -454,10 +459,10 @@ suite "Waku Filter - subscription maintenance":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 3
|
||||
wakuFilter.subscriptions.hasKey(peerId1)
|
||||
wakuFilter.subscriptions.hasKey(peerId2)
|
||||
wakuFilter.subscriptions.hasKey(peerId3)
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 3
|
||||
wakuFilter.subscriptions.isSubscribed(peerId1)
|
||||
wakuFilter.subscriptions.isSubscribed(peerId2)
|
||||
wakuFilter.subscriptions.isSubscribed(peerId3)
|
||||
|
||||
# When
|
||||
# Remove peerId1 and peerId3 from peer store
|
||||
@ -467,8 +472,8 @@ suite "Waku Filter - subscription maintenance":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 1
|
||||
wakuFilter.subscriptions.hasKey(peerId2)
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
wakuFilter.subscriptions.isSubscribed(peerId2)
|
||||
|
||||
# When
|
||||
# Remove peerId2 from peer store
|
||||
@ -477,4 +482,4 @@ suite "Waku Filter - subscription maintenance":
|
||||
|
||||
# Then
|
||||
check:
|
||||
wakuFilter.subscriptions.len == 0
|
||||
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||
|
||||
@ -1,17 +1,21 @@
|
||||
import
|
||||
std/[
|
||||
options,
|
||||
options,
|
||||
tables,
|
||||
sets
|
||||
sets,
|
||||
sequtils,
|
||||
algorithm
|
||||
],
|
||||
chronos,
|
||||
chronicles
|
||||
chronicles,
|
||||
os
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
node/peer_manager,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/subscriptions,
|
||||
waku_core
|
||||
],
|
||||
../testlib/[
|
||||
@ -20,10 +24,14 @@ import
|
||||
]
|
||||
|
||||
|
||||
proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =
|
||||
proc newTestWakuFilter*(switch: Switch,
|
||||
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
|
||||
maxFilterPeers: uint32 = MaxFilterPeers,
|
||||
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer):
|
||||
Future[WakuFilter] {.async.} =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
proto = WakuFilter.new(peerManager)
|
||||
proto = WakuFilter.new(peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer)
|
||||
|
||||
await proto.start()
|
||||
switch.mount(proto)
|
||||
@ -41,8 +49,21 @@ proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.
|
||||
return proto
|
||||
|
||||
proc getSubscribedContentTopics*(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
||||
var contentTopics: seq[ContentTopic]
|
||||
for filterCriterion in wakuFilter.subscriptions[peerId]:
|
||||
contentTopics.add(filterCriterion[1])
|
||||
var contentTopics: seq[ContentTopic] = @[]
|
||||
let peersCriteria = wakuFilter.subscriptions.getPeerSubscriptions(peerId)
|
||||
|
||||
for filterCriterion in peersCriteria:
|
||||
contentTopics.add(filterCriterion.contentTopic)
|
||||
|
||||
return contentTopics
|
||||
|
||||
proc unorderedCompare*[T](a, b: seq[T]): bool =
|
||||
if a == b:
|
||||
return true
|
||||
|
||||
var aSorted = a
|
||||
var bSorted = b
|
||||
aSorted.sort()
|
||||
bSorted.sort()
|
||||
|
||||
return aSorted == bSorted
|
||||
|
||||
@ -206,6 +206,7 @@ procSuite "WakuNode - Store":
|
||||
waitFor allFutures(client.start(), server.start(), filterSource.start())
|
||||
|
||||
waitFor filterSource.mountFilter()
|
||||
waitFor filterSource.mountLegacyFilter()
|
||||
let driver = newSqliteArchiveDriver()
|
||||
|
||||
let mountArchiveRes = server.mountArchive(driver)
|
||||
|
||||
@ -154,6 +154,7 @@ procSuite "Waku v2 JSON-RPC API - Admin":
|
||||
await client.connect("127.0.0.1", rpcPort, false)
|
||||
|
||||
await node.mountFilter()
|
||||
await node.mountLegacyFilter()
|
||||
await node.mountFilterClient()
|
||||
let driver: ArchiveDriver = QueueDriver.new()
|
||||
let mountArchiveRes = node.mountArchive(driver)
|
||||
|
||||
@ -32,6 +32,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
|
||||
await allFutures(node1.start(), node2.start())
|
||||
|
||||
await node1.mountFilter()
|
||||
await node1.mountLegacyFilter()
|
||||
await node2.mountFilterClient()
|
||||
|
||||
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
|
||||
|
||||
@ -107,10 +107,11 @@ suite "Waku v2 Rest API - Admin":
|
||||
pubsubTopicNode2 = DefaultPubsubTopic
|
||||
pubsubTopicNode3 = PubsubTopic("/waku/2/custom-waku/proto")
|
||||
|
||||
## TODO: Note that such checks may depend heavily on the order of the returned data!
|
||||
expectedFilterData2 = fmt"(peerId: ""{$peerInfo2}"", filterCriteria:" &
|
||||
fmt" @[(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[0]}""), " &
|
||||
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[1]}""), " &
|
||||
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[2]}"")]"
|
||||
fmt" @[(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[1]}""), " &
|
||||
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[2]}""), " &
|
||||
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[0]}"")]"
|
||||
|
||||
expectedFilterData3 = fmt"(peerId: ""{$peerInfo3}"", filterCriteria:" &
|
||||
fmt" @[(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[0]}""), " &
|
||||
@ -151,4 +152,4 @@ suite "Waku v2 Rest API - Admin":
|
||||
|
||||
check:
|
||||
getRes.status == 400
|
||||
getRes.data == "Error: Filter Protocol is not mounted to the node"
|
||||
getRes.data == "Error: Filter Protocol is not mounted to the node"
|
||||
|
||||
@ -11,7 +11,6 @@ import
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_node,
|
||||
../../waku/node/peer_manager,
|
||||
../../waku/waku_filter,
|
||||
../../waku/waku_api/rest/server,
|
||||
../../waku/waku_api/rest/client,
|
||||
../../waku/waku_api/rest/responses,
|
||||
|
||||
@ -50,6 +50,7 @@ proc setupRestFilter(): Future[RestFilterTest] {.async.} =
|
||||
await allFutures(result.filterNode.start(), result.clientNode.start())
|
||||
|
||||
await result.filterNode.mountFilter()
|
||||
await result.filterNode.mountLegacyFilter()
|
||||
await result.clientNode.mountFilterClient()
|
||||
|
||||
result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo()
|
||||
@ -174,7 +175,7 @@ suite "Waku v2 Rest API - Filter":
|
||||
|
||||
while msg == messages[i]:
|
||||
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
|
||||
|
||||
|
||||
messages.add(msg)
|
||||
|
||||
restFilterTest.messageCache.contentSubscribe(contentTopic)
|
||||
|
||||
@ -35,6 +35,7 @@ import
|
||||
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
|
||||
../waku_filter_v2,
|
||||
../waku_filter_v2/client as filter_client,
|
||||
../waku_filter_v2/subscriptions as filter_subscriptions,
|
||||
../waku_lightpush,
|
||||
../waku_metadata,
|
||||
../waku_lightpush/client as lightpush_client,
|
||||
@ -67,9 +68,6 @@ const git_version* {.strdefine.} = "n/a"
|
||||
# Default clientId
|
||||
const clientId* = "Nimbus Waku v2 node"
|
||||
|
||||
# Default Waku Filter Timeout
|
||||
const WakuFilterTimeout: Duration = 1.days
|
||||
|
||||
const WakuNodeVersionString* = "version / git commit hash: " & git_version
|
||||
|
||||
# key and crypto modules different
|
||||
@ -188,7 +186,7 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s
|
||||
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
|
||||
if not node.wakuMetadata.isNil():
|
||||
return err("Waku metadata already mounted, skipping")
|
||||
|
||||
|
||||
let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
|
||||
|
||||
node.wakuMetadata = metadata
|
||||
@ -403,18 +401,36 @@ proc mountRelay*(node: WakuNode,
|
||||
|
||||
## Waku filter
|
||||
|
||||
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout)
|
||||
proc mountLegacyFilter*(node: WakuNode, filterTimeout: Duration = WakuLegacyFilterTimeout)
|
||||
{.async, raises: [Defect, LPError]} =
|
||||
## Mounting legacy filter protocol with separation from new v2 filter protocol for easier removal later
|
||||
## TODO: remove legacy filter protocol
|
||||
|
||||
info "mounting legacy filter protocol"
|
||||
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout)
|
||||
|
||||
if node.started:
|
||||
await node.wakuFilterLegacy.start() #TODO: remove legacy
|
||||
|
||||
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec))
|
||||
|
||||
proc mountFilter*(node: WakuNode,
|
||||
subscriptionTimeout: Duration = filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
|
||||
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
|
||||
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer)
|
||||
{.async, raises: [Defect, LPError]} =
|
||||
## Mounting filter v2 protocol
|
||||
|
||||
info "mounting filter protocol"
|
||||
node.wakuFilter = WakuFilter.new(node.peerManager)
|
||||
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy
|
||||
node.wakuFilter = WakuFilter.new(node.peerManager,
|
||||
subscriptionTimeout,
|
||||
maxFilterPeers,
|
||||
maxFilterCriteriaPerPeer)
|
||||
|
||||
if node.started:
|
||||
await node.wakuFilter.start()
|
||||
await node.wakuFilterLegacy.start() #TODO: remove legacy
|
||||
|
||||
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
|
||||
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) #TODO: remove legacy
|
||||
|
||||
proc filterHandleMessage*(node: WakuNode,
|
||||
pubsubTopic: PubsubTopic,
|
||||
|
||||
35
waku/utils/tableutils.nim
Normal file
35
waku/utils/tableutils.nim
Normal file
@ -0,0 +1,35 @@
|
||||
import std/tables,
|
||||
stew/objects,
|
||||
stew/templateutils
|
||||
|
||||
template keepItIf*[A, B](tableParam: var Table[A, B], itPredicate: untyped) =
|
||||
bind evalTemplateParamOnce
|
||||
evalTemplateParamOnce(tableParam, t):
|
||||
var itemsToDelete: seq[A]
|
||||
var key {.inject.} : A
|
||||
var val {.inject.} : B
|
||||
|
||||
for k, v in t.mpairs():
|
||||
key = k
|
||||
val = v
|
||||
if not itPredicate:
|
||||
itemsToDelete.add(key)
|
||||
|
||||
for item in itemsToDelete:
|
||||
t.del(item)
|
||||
|
||||
template keepItIf*[A, B](tableParam: var TableRef[A, B], itPredicate: untyped) =
|
||||
bind evalTemplateParamOnce
|
||||
evalTemplateParamOnce(tableParam, t):
|
||||
var itemsToDelete: seq[A]
|
||||
let key {.inject.} : A
|
||||
let val {.inject.} : B
|
||||
|
||||
for k, v in t[].mpairs():
|
||||
key = k
|
||||
val = v
|
||||
if not itPredicate:
|
||||
itemsToDelete.add(key)
|
||||
|
||||
for item in itemsToDelete:
|
||||
t[].del(item)
|
||||
@ -121,9 +121,10 @@ proc installAdminV1GetFilterSubsHandler(router: var RestRouter, node: WakuNode)
|
||||
subscriptions: seq[FilterSubscription] = @[]
|
||||
filterCriteria: seq[FilterTopic]
|
||||
|
||||
for (peerId, criteria) in node.wakuFilter.subscriptions.pairs():
|
||||
filterCriteria = criteria.toSeq().mapIt(FilterTopic(pubsubTopic: it[0],
|
||||
contentTopic: it[1]))
|
||||
for peerId in node.wakuFilter.subscriptions.peersSubscribed.keys:
|
||||
filterCriteria = node.wakuFilter.subscriptions.getPeerSubscriptions(peerId)
|
||||
.mapIt(FilterTopic(pubsubTopic: it[0],
|
||||
contentTopic: it[1]))
|
||||
|
||||
subscriptions.add(FilterSubscription(peerId: $peerId, filterCriteria: filterCriteria))
|
||||
|
||||
|
||||
@ -22,7 +22,7 @@ logScope:
|
||||
const
|
||||
WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
||||
|
||||
WakuFilterTimeout: Duration = 2.hours
|
||||
WakuLegacyFilterTimeout*: Duration = 2.hours
|
||||
|
||||
|
||||
type WakuFilterResult*[T] = Result[T, string]
|
||||
@ -113,7 +113,7 @@ proc initProtocolHandler(wf: WakuFilterLegacy) =
|
||||
proc new*(T: type WakuFilterLegacy,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
timeout: Duration = WakuFilterTimeout): T =
|
||||
timeout: Duration = WakuLegacyFilterTimeout): T =
|
||||
let wf = WakuFilterLegacy(rng: rng,
|
||||
peerManager: peerManager,
|
||||
timeout: timeout)
|
||||
@ -123,7 +123,7 @@ proc new*(T: type WakuFilterLegacy,
|
||||
proc init*(T: type WakuFilterLegacy,
|
||||
peerManager: PeerManager,
|
||||
rng: ref rand.HmacDrbgContext,
|
||||
timeout: Duration = WakuFilterTimeout): T {.
|
||||
timeout: Duration = WakuLegacyFilterTimeout): T {.
|
||||
deprecated: "WakuFilterLegacy.new()' instead".} =
|
||||
WakuFilterLegacy.new(peerManager, rng, timeout)
|
||||
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
import
|
||||
./waku_filter_v2/common,
|
||||
./waku_filter_v2/protocol
|
||||
./waku_filter_v2/protocol,
|
||||
./waku_filter_v2/subscriptions
|
||||
|
||||
export
|
||||
common,
|
||||
protocol
|
||||
protocol,
|
||||
subscriptions
|
||||
|
||||
@ -25,7 +25,7 @@ logScope:
|
||||
topics = "waku filter"
|
||||
|
||||
const
|
||||
MaxContentTopicsPerRequest* = 30
|
||||
MaxContentTopicsPerRequest* = 100
|
||||
|
||||
type
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
@ -36,82 +36,69 @@ type
|
||||
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||
trace "pinging subscriber", peerId=peerId
|
||||
|
||||
if peerId notin wf.subscriptions:
|
||||
if not wf.subscriptions.isSubscribed(peerId):
|
||||
debug "pinging peer has no subscriptions", peerId=peerId
|
||||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
wf.subscriptions.refreshSubscription(peerId)
|
||||
|
||||
ok()
|
||||
|
||||
proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||
if pubsubTopic.isNone() or contentTopics.len() == 0:
|
||||
proc subscribe(wf: WakuFilter,
|
||||
peerId: PeerID,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||
|
||||
# TODO: check if this condition is valid???
|
||||
if pubsubTopic.isNone() or contentTopics.len == 0:
|
||||
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
||||
|
||||
if contentTopics.len() > MaxContentTopicsPerRequest:
|
||||
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
|
||||
if contentTopics.len > MaxContentTopicsPerRequest:
|
||||
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " &
|
||||
$MaxContentTopicsPerRequest))
|
||||
|
||||
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||
|
||||
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||
|
||||
if peerId in wf.subscriptions:
|
||||
# We already have a subscription for this peer. Try to add the new filter criteria.
|
||||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
||||
if peerSubscription.len() + filterCriteria.len() > MaxCriteriaPerSubscription:
|
||||
return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria"))
|
||||
|
||||
peerSubscription.incl(filterCriteria)
|
||||
wf.subscriptions[peerId] = peerSubscription
|
||||
else:
|
||||
# We don't have a subscription for this peer yet. Try to add it.
|
||||
if wf.subscriptions.len() >= MaxTotalSubscriptions:
|
||||
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
|
||||
debug "creating new subscription", peerId=peerId
|
||||
wf.subscriptions[peerId] = filterCriteria
|
||||
wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr:
|
||||
return err(FilterSubscribeError.serviceUnavailable(error))
|
||||
|
||||
ok()
|
||||
|
||||
proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||
if pubsubTopic.isNone() or contentTopics.len() == 0:
|
||||
proc unsubscribe(wf: WakuFilter,
|
||||
peerId: PeerID,
|
||||
pubsubTopic: Option[PubsubTopic],
|
||||
contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||
if pubsubTopic.isNone() or contentTopics.len == 0:
|
||||
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
||||
|
||||
if contentTopics.len() > MaxContentTopicsPerRequest:
|
||||
if contentTopics.len > MaxContentTopicsPerRequest:
|
||||
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
|
||||
|
||||
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||
|
||||
trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||
debug "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||
|
||||
if peerId notin wf.subscriptions:
|
||||
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
||||
wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
|
||||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
||||
|
||||
if not peerSubscription.containsAny(filterCriteria):
|
||||
debug "unsubscribing peer is not subscribed to any of the content topics in this pubsub topic", peerId=peerId, pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
|
||||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
peerSubscription.excl(filterCriteria)
|
||||
|
||||
if peerSubscription.len() == 0:
|
||||
debug "peer has no more subscriptions, removing subscription", peerId=peerId
|
||||
wf.subscriptions.del(peerId)
|
||||
else:
|
||||
wf.subscriptions[peerId] = peerSubscription
|
||||
|
||||
ok()
|
||||
|
||||
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||
if peerId notin wf.subscriptions:
|
||||
if not wf.subscriptions.isSubscribed(peerId):
|
||||
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
||||
return err(FilterSubscribeError.notFound())
|
||||
|
||||
debug "removing peer subscription", peerId=peerId
|
||||
wf.subscriptions.del(peerId)
|
||||
wf.subscriptions.removePeer(peerId)
|
||||
wf.subscriptions.cleanUp()
|
||||
|
||||
ok()
|
||||
|
||||
proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse =
|
||||
proc handleSubscribeRequest*(wf: WakuFilter,
|
||||
peerId: PeerId,
|
||||
request: FilterSubscribeRequest): FilterSubscribeResponse =
|
||||
info "received filter subscribe request", peerId=peerId, request=request
|
||||
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
||||
|
||||
@ -134,7 +121,8 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs
|
||||
let
|
||||
requestDuration = Moment.now() - requestStartTime
|
||||
requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
|
||||
waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType])
|
||||
waku_filter_request_duration_seconds.observe(
|
||||
requestDurationSec, labelValues = [$request.filterSubscribeType])
|
||||
|
||||
if subscribeResult.isErr():
|
||||
return FilterSubscribeResponse(
|
||||
@ -153,6 +141,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
|
||||
trace "no addresses for peer", peer=peer
|
||||
return
|
||||
|
||||
## TODO: Check if dial is necessary always???
|
||||
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
|
||||
if conn.isNone():
|
||||
## We do not remove this peer, but allow the underlying peer manager
|
||||
@ -163,7 +152,10 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
|
||||
await conn.get().writeLp(buffer)
|
||||
|
||||
proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
|
||||
debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
|
||||
debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic,
|
||||
contentTopic=messagePush.wakuMessage.contentTopic,
|
||||
peers=peers,
|
||||
hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
|
||||
|
||||
let bufferToPublish = messagePush.encode().buffer
|
||||
|
||||
@ -179,18 +171,18 @@ proc maintainSubscriptions*(wf: WakuFilter) =
|
||||
|
||||
## Remove subscriptions for peers that have been removed from peer store
|
||||
var peersToRemove: seq[PeerId]
|
||||
for peerId, peerSubscription in wf.subscriptions.pairs():
|
||||
## TODO: currently we only maintain by syncing with peer store. We could
|
||||
## consider other metrics, such as subscription age, activity, etc.
|
||||
for peerId in wf.subscriptions.peersSubscribed.keys:
|
||||
if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
|
||||
debug "peer has been removed from peer store, removing subscription", peerId=peerId
|
||||
peersToRemove.add(peerId)
|
||||
|
||||
if peersToRemove.len() > 0:
|
||||
if peersToRemove.len > 0:
|
||||
wf.subscriptions.removePeers(peersToRemove)
|
||||
|
||||
wf.subscriptions.cleanUp()
|
||||
|
||||
## Periodic report of number of subscriptions
|
||||
waku_filter_subscriptions.set(wf.subscriptions.len().float64)
|
||||
waku_filter_subscriptions.set(wf.subscriptions.peersSubscribed.len.float64)
|
||||
|
||||
const MessagePushTimeout = 20.seconds
|
||||
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
|
||||
@ -201,7 +193,7 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
|
||||
block:
|
||||
## Find subscribers and push message to them
|
||||
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
|
||||
if subscribedPeers.len() == 0:
|
||||
if subscribedPeers.len == 0:
|
||||
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
|
||||
return
|
||||
|
||||
@ -210,11 +202,15 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
|
||||
wakuMessage: message)
|
||||
|
||||
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
|
||||
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic,
|
||||
contentTopic=message.contentTopic,
|
||||
hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
|
||||
else:
|
||||
debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
|
||||
debug "pushed message succesfully to all subscribers",
|
||||
pubsubTopic=pubsubTopic,
|
||||
contentTopic=message.contentTopic,
|
||||
hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
||||
|
||||
let
|
||||
handleMessageDuration = Moment.now() - handleMessageStartTime
|
||||
@ -247,13 +243,21 @@ proc initProtocolHandler(wf: WakuFilter) =
|
||||
wf.codec = WakuFilterSubscribeCodec
|
||||
|
||||
proc new*(T: type WakuFilter,
|
||||
peerManager: PeerManager): T =
|
||||
peerManager: PeerManager,
|
||||
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
|
||||
maxFilterPeers: uint32 = MaxFilterPeers,
|
||||
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): T =
|
||||
|
||||
let wf = WakuFilter(
|
||||
subscriptions: FilterSubscriptions.init(subscriptionTimeout,
|
||||
maxFilterPeers,
|
||||
maxFilterCriteriaPerPeer
|
||||
),
|
||||
peerManager: peerManager
|
||||
)
|
||||
|
||||
wf.initProtocolHandler()
|
||||
wf
|
||||
return wf
|
||||
|
||||
const MaintainSubscriptionsInterval* = 1.minutes
|
||||
|
||||
@ -278,4 +282,5 @@ method stop*(wf: WakuFilter) {.async.} =
|
||||
debug "stopping filter protocol"
|
||||
if not wf.maintenanceTask.isNil():
|
||||
wf.maintenanceTask.clearTimer()
|
||||
|
||||
await procCall LPProtocol(wf).stop()
|
||||
|
||||
@ -6,48 +6,163 @@ else:
|
||||
import
|
||||
std/[sets,tables],
|
||||
chronicles,
|
||||
libp2p/peerid
|
||||
chronos,
|
||||
libp2p/peerid,
|
||||
stew/shims/sets
|
||||
import
|
||||
../waku_core
|
||||
../waku_core,
|
||||
../utils/tableutils
|
||||
|
||||
logScope:
|
||||
topics = "waku filter subscriptions"
|
||||
|
||||
const
|
||||
MaxTotalSubscriptions* = 1000 # TODO make configurable
|
||||
MaxCriteriaPerSubscription* = 1000
|
||||
MaxFilterPeers* = 1000
|
||||
MaxFilterCriteriaPerPeer* = 1000
|
||||
DefaultSubscriptionTimeToLiveSec* = 5.minutes
|
||||
|
||||
type
|
||||
FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic
|
||||
# a single filter criterion is fully defined by a pubsub topic and content topic
|
||||
FilterCriterion* = tuple
|
||||
pubsubTopic: PubsubTopic
|
||||
contentTopic: ContentTopic
|
||||
|
||||
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
|
||||
FilterSubscriptions* = Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria
|
||||
|
||||
proc findSubscribedPeers*(subscriptions: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] =
|
||||
## Find all peers subscribed to a given topic and content topic
|
||||
let filterCriterion = (pubsubTopic, contentTopic)
|
||||
SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids
|
||||
|
||||
var subscribedPeers: seq[PeerID]
|
||||
PeerData* = tuple
|
||||
lastSeen: Moment
|
||||
criteriaCount: uint
|
||||
|
||||
# TODO: for large maps, this can be optimized using a reverse index
|
||||
for (peerId, criteria) in subscriptions.pairs():
|
||||
if filterCriterion in criteria:
|
||||
subscribedPeers.add(peerId)
|
||||
FilterSubscriptions* = object
|
||||
peersSubscribed* : Table[PeerID, PeerData]
|
||||
subscriptions : Table[FilterCriterion, SubscribedPeers]
|
||||
subscriptionTimeout : Duration
|
||||
maxPeers : uint
|
||||
maxCriteriaPerPeer : uint
|
||||
|
||||
subscribedPeers
|
||||
proc init*(T: type FilterSubscriptions,
|
||||
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
|
||||
maxFilterPeers: uint32 = MaxFilterPeers,
|
||||
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): FilterSubscriptions =
|
||||
## Create a new filter subscription object
|
||||
return FilterSubscriptions(
|
||||
peersSubscribed: initTable[PeerID, PeerData](),
|
||||
subscriptions: initTable[FilterCriterion, SubscribedPeers](),
|
||||
subscriptionTimeout: subscriptionTimeout,
|
||||
maxPeers: maxFilterPeers,
|
||||
maxCriteriaPerPeer: maxFilterCriteriaPerPeer
|
||||
)
|
||||
|
||||
proc removePeer*(subscriptions: var FilterSubscriptions, peerId: PeerID) =
|
||||
proc isSubscribed*(s: var FilterSubscriptions, peerId: PeerID): bool =
|
||||
s.peersSubscribed.withValue(peerId, data):
|
||||
return Moment.now() - data.lastSeen <= s.subscriptionTimeout
|
||||
|
||||
return false
|
||||
|
||||
proc subscribedPeerCount*(s: FilterSubscriptions): uint =
|
||||
return cast[uint](s.peersSubscribed.len)
|
||||
|
||||
proc getPeerSubscriptions*(s: var FilterSubscriptions, peerId: PeerID): seq[FilterCriterion] =
|
||||
## Get all pubsub-content topics a peer is subscribed to
|
||||
var subscribedContentTopics: seq[FilterCriterion] = @[]
|
||||
s.peersSubscribed.withValue(peerId, data):
|
||||
if data.criteriaCount == 0:
|
||||
return subscribedContentTopics
|
||||
|
||||
for filterCriterion, subscribedPeers in s.subscriptions.mpairs:
|
||||
if peerId in subscribedPeers:
|
||||
subscribedContentTopics.add(filterCriterion)
|
||||
|
||||
return subscribedContentTopics
|
||||
|
||||
proc findSubscribedPeers*(s: var FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] =
|
||||
let filterCriterion : FilterCriterion = (pubsubTopic, contentTopic)
|
||||
|
||||
var foundPeers : seq[PeerID] = @[]
|
||||
# only peers subscribed to criteria and with legit subscription is counted
|
||||
s.subscriptions.withValue(filterCriterion, peers):
|
||||
for peer in peers[]:
|
||||
if s.isSubscribed(peer):
|
||||
foundPeers.add(peer)
|
||||
|
||||
return foundPeers
|
||||
|
||||
proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) =
|
||||
## Remove all subscriptions for a given peer
|
||||
subscriptions.del(peerId)
|
||||
s.peersSubscribed.del(peerId)
|
||||
|
||||
proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID]) =
|
||||
proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) =
|
||||
## Remove all subscriptions for a given list of peers
|
||||
for peerId in peerIds:
|
||||
subscriptions.removePeer(peerId)
|
||||
s.peersSubscribed.keepItIf(key notin peerIds)
|
||||
|
||||
proc cleanUp*(fs: var FilterSubscriptions) =
|
||||
## Remove all subscriptions for peers that have not been seen for a while
|
||||
let now = Moment.now()
|
||||
fs.peersSubscribed.keepItIf(now - val.lastSeen <= fs.subscriptionTimeout)
|
||||
|
||||
var filtersToRemove: seq[FilterCriterion] = @[]
|
||||
for filterCriterion, subscribedPeers in fs.subscriptions.mpairs:
|
||||
subscribedPeers.keepItIf(fs.isSubscribed(it)==true)
|
||||
|
||||
fs.subscriptions.keepItIf(val.len > 0)
|
||||
|
||||
proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
|
||||
s.peersSubscribed.withValue(peerId, data):
|
||||
data.lastSeen = Moment.now()
|
||||
|
||||
proc addSubscription*(s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria): Result[void, string] =
|
||||
## Add a subscription for a given peer
|
||||
var peerData: ptr PeerData
|
||||
|
||||
s.peersSubscribed.withValue(peerId, data):
|
||||
if data.criteriaCount + cast[uint](filterCriteria.len) > s.maxCriteriaPerPeer:
|
||||
return err("peer has reached maximum number of filter criteria")
|
||||
|
||||
data.lastSeen = Moment.now()
|
||||
peerData = data
|
||||
|
||||
do:
|
||||
## not yet subscribed
|
||||
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
|
||||
return err("node has reached maximum number of subscriptions")
|
||||
|
||||
let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
|
||||
peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))
|
||||
|
||||
for filterCriterion in filterCriteria:
|
||||
var peersOfSub = addr(s.subscriptions.mgetOrPut(filterCriterion, SubscribedPeers()))
|
||||
if peerId notin peersOfSub[]:
|
||||
peersOfSub[].incl(peerId)
|
||||
peerData.criteriaCount += 1
|
||||
|
||||
return ok()
|
||||
|
||||
proc removeSubscription*(s: var FilterSubscriptions,
|
||||
peerId: PeerID,
|
||||
filterCriteria: FilterCriteria):
|
||||
Result[void, string] =
|
||||
## Remove a subscription for a given peer
|
||||
|
||||
s.peersSubscribed.withValue(peerId, peerData):
|
||||
peerData.lastSeen = Moment.now()
|
||||
for filterCriterion in filterCriteria:
|
||||
s.subscriptions.withValue(filterCriterion, peers):
|
||||
if peers[].missingOrexcl(peerId) == false:
|
||||
peerData.criteriaCount -= 1
|
||||
|
||||
if peers[].len == 0:
|
||||
s.subscriptions.del(filterCriterion)
|
||||
if peerData.criteriaCount == 0:
|
||||
s.peersSubscribed.del(peerId)
|
||||
do:
|
||||
## Maybe let just run through and log it as a warning
|
||||
return err("Peer was not subscribed to criterion")
|
||||
|
||||
return ok()
|
||||
|
||||
do:
|
||||
return err("Peer has no subscriptions")
|
||||
|
||||
|
||||
proc containsAny*(criteria: FilterCriteria, otherCriteria: FilterCriteria): bool =
|
||||
## Check if a given pubsub topic is contained in a set of filter criteria
|
||||
## TODO: Better way to achieve this?
|
||||
for criterion in otherCriteria:
|
||||
if criterion in criteria:
|
||||
return true
|
||||
false
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user