mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
chore: Refactor of FilterV2 subscription management with Time-to-live maintenance (#2341)
* Refactor of FilterV2 subscription handling and maintenance with addition subscription time-to-live support. Fixed all tests and reworked where subscription handling changes needed it. Adapted REST API /admin filter subscription retrieve to new filter subscription structure. * Fix tests and PR comments * Added filter v2 subscription timeout tests and fixed * Fix review comments and suggestions. No functional change. * Remove leftover echoes from test_rest_admin * Fix failed legacy filter tests due to separation of mounting the filters. * Small fixes, fix naming typo, removed duplicated checks in test
This commit is contained in:
parent
3d816c0814
commit
c3358409bb
@ -468,6 +468,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
let peerInfo = parsePeerInfo(conf.filternode)
|
let peerInfo = parsePeerInfo(conf.filternode)
|
||||||
if peerInfo.isOk():
|
if peerInfo.isOk():
|
||||||
await node.mountFilter()
|
await node.mountFilter()
|
||||||
|
await node.mountLegacyFilter()
|
||||||
await node.mountFilterClient()
|
await node.mountFilterClient()
|
||||||
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)
|
node.peerManager.addServicePeer(peerInfo.value, WakuLegacyFilterCodec)
|
||||||
|
|
||||||
@ -507,7 +508,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|||||||
echo "A spam message is found and discarded"
|
echo "A spam message is found and discarded"
|
||||||
chat.prompt = false
|
chat.prompt = false
|
||||||
showChatPrompt(chat)
|
showChatPrompt(chat)
|
||||||
|
|
||||||
echo "rln-relay preparation is in progress..."
|
echo "rln-relay preparation is in progress..."
|
||||||
|
|
||||||
let rlnConf = WakuRlnConfig(
|
let rlnConf = WakuRlnConfig(
|
||||||
|
|||||||
@ -288,6 +288,7 @@ when isMainModule:
|
|||||||
|
|
||||||
if conf.filter:
|
if conf.filter:
|
||||||
waitFor mountFilter(bridge.nodev2)
|
waitFor mountFilter(bridge.nodev2)
|
||||||
|
waitFor mountLegacyFilter(bridge.nodev2)
|
||||||
|
|
||||||
if conf.staticnodes.len > 0:
|
if conf.staticnodes.len > 0:
|
||||||
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
|
waitFor connectToNodes(bridge.nodev2, conf.staticnodes)
|
||||||
|
|||||||
@ -254,7 +254,7 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
|
|||||||
)
|
)
|
||||||
|
|
||||||
WakuDiscoveryV5.new(
|
WakuDiscoveryV5.new(
|
||||||
app.rng,
|
app.rng,
|
||||||
discv5Conf,
|
discv5Conf,
|
||||||
some(app.record),
|
some(app.record),
|
||||||
some(app.node.peerManager),
|
some(app.node.peerManager),
|
||||||
@ -326,7 +326,7 @@ proc setupWakuApp*(app: var App): AppResult[void] =
|
|||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc getPorts(listenAddrs: seq[MultiAddress]):
|
proc getPorts(listenAddrs: seq[MultiAddress]):
|
||||||
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
|
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
|
||||||
|
|
||||||
var tcpPort, websocketPort = none(Port)
|
var tcpPort, websocketPort = none(Port)
|
||||||
|
|
||||||
@ -548,7 +548,15 @@ proc setupProtocols(node: WakuNode,
|
|||||||
# Filter setup. NOTE Must be mounted after relay
|
# Filter setup. NOTE Must be mounted after relay
|
||||||
if conf.filter:
|
if conf.filter:
|
||||||
try:
|
try:
|
||||||
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
await mountLegacyFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
||||||
|
except CatchableError:
|
||||||
|
return err("failed to mount waku legacy filter protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
try:
|
||||||
|
await mountFilter(node,
|
||||||
|
subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout),
|
||||||
|
maxFilterPeers = conf.filterMaxPeersToServe,
|
||||||
|
maxFilterCriteriaPerPeer = conf.filterMaxCriteria)
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
@ -724,7 +732,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
|
|||||||
|
|
||||||
let filterCache = MessageCache.init()
|
let filterCache = MessageCache.init()
|
||||||
|
|
||||||
let filterDiscoHandler =
|
let filterDiscoHandler =
|
||||||
if app.wakuDiscv5.isSome():
|
if app.wakuDiscv5.isSome():
|
||||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
|
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Filter))
|
||||||
else: none(DiscoveryHandler)
|
else: none(DiscoveryHandler)
|
||||||
@ -739,7 +747,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
|
|||||||
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"
|
notInstalledTab["filter"] = "/filter endpoints are not available. Please check your configuration: --filternode"
|
||||||
|
|
||||||
## Store REST API
|
## Store REST API
|
||||||
let storeDiscoHandler =
|
let storeDiscoHandler =
|
||||||
if app.wakuDiscv5.isSome():
|
if app.wakuDiscv5.isSome():
|
||||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
|
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Store))
|
||||||
else: none(DiscoveryHandler)
|
else: none(DiscoveryHandler)
|
||||||
@ -749,7 +757,7 @@ proc startRestServer(app: App, address: IpAddress, port: Port, conf: WakuNodeCon
|
|||||||
## Light push API
|
## Light push API
|
||||||
if conf.lightpushnode != "" and
|
if conf.lightpushnode != "" and
|
||||||
app.node.wakuLightpushClient != nil:
|
app.node.wakuLightpushClient != nil:
|
||||||
let lightDiscoHandler =
|
let lightDiscoHandler =
|
||||||
if app.wakuDiscv5.isSome():
|
if app.wakuDiscv5.isSome():
|
||||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
|
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
|
||||||
else: none(DiscoveryHandler)
|
else: none(DiscoveryHandler)
|
||||||
|
|||||||
@ -95,6 +95,7 @@ type
|
|||||||
defaultValue: false,
|
defaultValue: false,
|
||||||
name: "execute" .}: bool
|
name: "execute" .}: bool
|
||||||
|
|
||||||
|
|
||||||
of noCommand:
|
of noCommand:
|
||||||
## Application-level configuration
|
## Application-level configuration
|
||||||
protectedTopics* {.
|
protectedTopics* {.
|
||||||
@ -221,7 +222,7 @@ type
|
|||||||
desc: "Rln relay identity commitment key as a Hex string",
|
desc: "Rln relay identity commitment key as a Hex string",
|
||||||
defaultValue: ""
|
defaultValue: ""
|
||||||
name: "rln-relay-id-commitment-key" }: string
|
name: "rln-relay-id-commitment-key" }: string
|
||||||
|
|
||||||
rlnRelayTreePath* {.
|
rlnRelayTreePath* {.
|
||||||
desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
|
desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
|
||||||
defaultValue: ""
|
defaultValue: ""
|
||||||
@ -304,10 +305,25 @@ type
|
|||||||
name: "filternode" }: string
|
name: "filternode" }: string
|
||||||
|
|
||||||
filterTimeout* {.
|
filterTimeout* {.
|
||||||
desc: "Timeout for filter node in seconds.",
|
desc: "Filter clients will be wiped out if not able to receive push messages within this timeout. In seconds.",
|
||||||
defaultValue: 14400 # 4 hours
|
defaultValue: 14400 # 4 hours
|
||||||
name: "filter-timeout" }: int64
|
name: "filter-timeout" }: int64
|
||||||
|
|
||||||
|
filterSubscriptionTimeout* {.
|
||||||
|
desc: "Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.",
|
||||||
|
defaultValue: 300 # 5 minutes
|
||||||
|
name: "filter-subscription-timeout" }: int64
|
||||||
|
|
||||||
|
filterMaxPeersToServe* {.
|
||||||
|
desc: "Maximum number of peers to serve at a time. Only for v2 filter protocol.",
|
||||||
|
defaultValue: 1000
|
||||||
|
name: "filter-max-peers-to-serve" }: uint32
|
||||||
|
|
||||||
|
filterMaxCriteria* {.
|
||||||
|
desc: "Maximum number of pubsub- and content topic combination per peers at a time. Only for v2 filter protocol.",
|
||||||
|
defaultValue: 1000
|
||||||
|
name: "filter-max-criteria" }: uint32
|
||||||
|
|
||||||
## Lightpush config
|
## Lightpush config
|
||||||
|
|
||||||
lightpush* {.
|
lightpush* {.
|
||||||
|
|||||||
@ -1,9 +1,9 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[
|
std/[
|
||||||
options,
|
options,
|
||||||
tables,
|
tables,
|
||||||
sequtils
|
sequtils
|
||||||
],
|
],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
@ -12,7 +12,7 @@ import
|
|||||||
chronicles,
|
chronicles,
|
||||||
os,
|
os,
|
||||||
libp2p/[
|
libp2p/[
|
||||||
peerstore,
|
peerstore,
|
||||||
crypto/crypto
|
crypto/crypto
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -44,7 +44,7 @@ suite "Waku Filter - End to End":
|
|||||||
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
||||||
var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)]
|
var pushHandlerFuture {.threadvar.}: Future[(string, WakuMessage)]
|
||||||
var messagePushHandler {.threadvar.}: FilterPushHandler
|
var messagePushHandler {.threadvar.}: FilterPushHandler
|
||||||
|
|
||||||
asyncSetup:
|
asyncSetup:
|
||||||
pushHandlerFuture = newFuture[(string, WakuMessage)]()
|
pushHandlerFuture = newFuture[(string, WakuMessage)]()
|
||||||
messagePushHandler = proc(
|
messagePushHandler = proc(
|
||||||
@ -84,8 +84,8 @@ suite "Waku Filter - End to End":
|
|||||||
# Then the subscription is successful
|
# Then the subscription is successful
|
||||||
check:
|
check:
|
||||||
subscribeResponse.isOk()
|
subscribeResponse.isOk()
|
||||||
server.wakuFilter.subscriptions.len == 1
|
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
server.wakuFilter.subscriptions.hasKey(clientPeerId)
|
server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
|
||||||
|
|
||||||
# When sending a message to the subscribed content topic
|
# When sending a message to the subscribed content topic
|
||||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||||
@ -106,7 +106,7 @@ suite "Waku Filter - End to End":
|
|||||||
# Then the unsubscription is successful
|
# Then the unsubscription is successful
|
||||||
check:
|
check:
|
||||||
unsubscribeResponse.isOk()
|
unsubscribeResponse.isOk()
|
||||||
server.wakuFilter.subscriptions.len == 0
|
server.wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||||
|
|
||||||
# When sending a message to the previously subscribed content topic
|
# When sending a message to the previously subscribed content topic
|
||||||
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
|
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
|
||||||
@ -116,7 +116,7 @@ suite "Waku Filter - End to End":
|
|||||||
# Then the message is not pushed to the client
|
# Then the message is not pushed to the client
|
||||||
check:
|
check:
|
||||||
not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||||
|
|
||||||
asyncTest "Client Node can't receive Push from Server Node, via Relay":
|
asyncTest "Client Node can't receive Push from Server Node, via Relay":
|
||||||
# Given the server node has Relay enabled
|
# Given the server node has Relay enabled
|
||||||
await server.mountRelay()
|
await server.mountRelay()
|
||||||
@ -127,7 +127,7 @@ suite "Waku Filter - End to End":
|
|||||||
)
|
)
|
||||||
require:
|
require:
|
||||||
subscribeResponse.isOk()
|
subscribeResponse.isOk()
|
||||||
server.wakuFilter.subscriptions.len == 1
|
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
|
|
||||||
# When a server node gets a Relay message
|
# When a server node gets a Relay message
|
||||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||||
@ -141,7 +141,7 @@ suite "Waku Filter - End to End":
|
|||||||
let
|
let
|
||||||
serverKey = generateSecp256k1Key()
|
serverKey = generateSecp256k1Key()
|
||||||
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
||||||
|
|
||||||
waitFor server.start()
|
waitFor server.start()
|
||||||
waitFor server.mountRelay()
|
waitFor server.mountRelay()
|
||||||
|
|
||||||
@ -162,8 +162,8 @@ suite "Waku Filter - End to End":
|
|||||||
)
|
)
|
||||||
require:
|
require:
|
||||||
subscribeResponse.isOk()
|
subscribeResponse.isOk()
|
||||||
server.wakuFilter.subscriptions.len == 1
|
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
|
|
||||||
# And the client node reboots
|
# And the client node reboots
|
||||||
waitFor client.stop()
|
waitFor client.stop()
|
||||||
waitFor client.start()
|
waitFor client.start()
|
||||||
@ -189,8 +189,8 @@ suite "Waku Filter - End to End":
|
|||||||
)
|
)
|
||||||
require:
|
require:
|
||||||
subscribeResponse.isOk()
|
subscribeResponse.isOk()
|
||||||
server.wakuFilter.subscriptions.len == 1
|
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
|
|
||||||
# And the client node reboots
|
# And the client node reboots
|
||||||
waitFor client.stop()
|
waitFor client.stop()
|
||||||
waitFor client.start()
|
waitFor client.start()
|
||||||
@ -209,7 +209,7 @@ suite "Waku Filter - End to End":
|
|||||||
)
|
)
|
||||||
check:
|
check:
|
||||||
subscribeResponse2.isOk()
|
subscribeResponse2.isOk()
|
||||||
server.wakuFilter.subscriptions.len == 1
|
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
|
|
||||||
# When a message is sent to the subscribed content topic, via Relay
|
# When a message is sent to the subscribed content topic, via Relay
|
||||||
pushHandlerFuture = newPushHandlerFuture()
|
pushHandlerFuture = newPushHandlerFuture()
|
||||||
|
|||||||
@ -53,6 +53,7 @@ procSuite "Peer Manager":
|
|||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
await allFutures(nodes.mapIt(it.mountFilter()))
|
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountLegacyFilter()))
|
||||||
|
|
||||||
# Dial node2 from node1
|
# Dial node2 from node1
|
||||||
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
|
let conn = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
|
||||||
@ -528,6 +529,7 @@ procSuite "Peer Manager":
|
|||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
await allFutures(nodes.mapIt(it.mountFilter()))
|
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountLegacyFilter()))
|
||||||
|
|
||||||
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
@ -579,6 +581,7 @@ procSuite "Peer Manager":
|
|||||||
await allFutures(nodes.mapIt(it.start()))
|
await allFutures(nodes.mapIt(it.start()))
|
||||||
await allFutures(nodes.mapIt(it.mountRelay()))
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
||||||
await allFutures(nodes.mapIt(it.mountFilter()))
|
await allFutures(nodes.mapIt(it.mountFilter()))
|
||||||
|
await allFutures(nodes.mapIt(it.mountLegacyFilter()))
|
||||||
|
|
||||||
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
|||||||
@ -29,6 +29,7 @@ suite "WakuNode - Filter":
|
|||||||
waitFor allFutures(server.start(), client.start())
|
waitFor allFutures(server.start(), client.start())
|
||||||
|
|
||||||
waitFor server.mountFilter()
|
waitFor server.mountFilter()
|
||||||
|
waitFor server.mountLegacyFilter()
|
||||||
waitFor client.mountFilterClient()
|
waitFor client.mountFilterClient()
|
||||||
|
|
||||||
## Given
|
## Given
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@ -13,7 +13,8 @@ import
|
|||||||
../../../waku/waku_filter_v2/subscriptions,
|
../../../waku/waku_filter_v2/subscriptions,
|
||||||
../../../waku/waku_core,
|
../../../waku/waku_core,
|
||||||
../testlib/common,
|
../testlib/common,
|
||||||
../testlib/wakucore
|
../testlib/wakucore,
|
||||||
|
./waku_filter_utils
|
||||||
|
|
||||||
proc newTestWakuFilter(switch: Switch): WakuFilter =
|
proc newTestWakuFilter(switch: Switch): WakuFilter =
|
||||||
let
|
let
|
||||||
@ -38,9 +39,11 @@ proc createRequest(filterSubscribeType: FilterSubscribeType, pubsubTopic = none(
|
|||||||
)
|
)
|
||||||
|
|
||||||
proc getSubscribedContentTopics(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
proc getSubscribedContentTopics(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
||||||
var contentTopics: seq[ContentTopic]
|
var contentTopics: seq[ContentTopic] = @[]
|
||||||
for filterCriterion in wakuFilter.subscriptions[peerId]:
|
let peersCreitera = wakuFilter.subscriptions.getPeerSubscriptions(peerId)
|
||||||
contentTopics.add(filterCriterion[1])
|
|
||||||
|
for filterCriterion in peersCreitera:
|
||||||
|
contentTopics.add(filterCriterion.contentTopic)
|
||||||
|
|
||||||
return contentTopics
|
return contentTopics
|
||||||
|
|
||||||
@ -68,8 +71,8 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 1
|
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
wakuFilter.subscriptions[peerId].len == 1
|
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||||
response.requestId == filterSubscribeRequest.requestId
|
response.requestId == filterSubscribeRequest.requestId
|
||||||
response.statusCode == 200
|
response.statusCode == 200
|
||||||
response.statusDesc.get() == "OK"
|
response.statusDesc.get() == "OK"
|
||||||
@ -79,7 +82,7 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
wakuFilter.subscriptions.subscribedPeerCount() == 0 # peerId is removed from subscriptions
|
||||||
response2.requestId == filterUnsubscribeRequest.requestId
|
response2.requestId == filterUnsubscribeRequest.requestId
|
||||||
response2.statusCode == 200
|
response2.statusCode == 200
|
||||||
response2.statusDesc.get() == "OK"
|
response2.statusDesc.get() == "OK"
|
||||||
@ -105,9 +108,9 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 1
|
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
wakuFilter.subscriptions[peerId].len == 2
|
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
|
||||||
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest.contentTopics
|
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId), filterSubscribeRequest.contentTopics)
|
||||||
response.requestId == filterSubscribeRequest.requestId
|
response.requestId == filterSubscribeRequest.requestId
|
||||||
response.statusCode == 200
|
response.statusCode == 200
|
||||||
response.statusDesc.get() == "OK"
|
response.statusDesc.get() == "OK"
|
||||||
@ -117,7 +120,7 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
wakuFilter.subscriptions.subscribedPeerCount() == 0 # peerId is removed from subscriptions
|
||||||
response2.requestId == filterUnsubscribeAllRequest.requestId
|
response2.requestId == filterUnsubscribeAllRequest.requestId
|
||||||
response2.statusCode == 200
|
response2.statusCode == 200
|
||||||
response2.statusDesc.get() == "OK"
|
response2.statusDesc.get() == "OK"
|
||||||
@ -155,9 +158,9 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 1
|
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
wakuFilter.subscriptions[peerId].len == 1
|
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||||
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest1.contentTopics
|
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId), filterSubscribeRequest1.contentTopics)
|
||||||
response1.requestId == filterSubscribeRequest1.requestId
|
response1.requestId == filterSubscribeRequest1.requestId
|
||||||
response1.statusCode == 200
|
response1.statusCode == 200
|
||||||
response1.statusDesc.get() == "OK"
|
response1.statusDesc.get() == "OK"
|
||||||
@ -167,11 +170,11 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 1
|
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
wakuFilter.subscriptions[peerId].len == 2
|
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 2
|
||||||
wakuFilter.getSubscribedContentTopics(peerId) ==
|
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId),
|
||||||
filterSubscribeRequest1.contentTopics &
|
filterSubscribeRequest1.contentTopics &
|
||||||
filterSubscribeRequest2.contentTopics
|
filterSubscribeRequest2.contentTopics)
|
||||||
response2.requestId == filterSubscribeRequest2.requestId
|
response2.requestId == filterSubscribeRequest2.requestId
|
||||||
response2.statusCode == 200
|
response2.statusCode == 200
|
||||||
response2.statusDesc.get() == "OK"
|
response2.statusDesc.get() == "OK"
|
||||||
@ -181,9 +184,9 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 1
|
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
wakuFilter.subscriptions[peerId].len == 1
|
wakuFilter.subscriptions.peersSubscribed[peerId].criteriaCount == 1
|
||||||
wakuFilter.getSubscribedContentTopics(peerId) == filterSubscribeRequest2.contentTopics
|
unorderedCompare(wakuFilter.getSubscribedContentTopics(peerId), filterSubscribeRequest2.contentTopics)
|
||||||
response3.requestId == filterUnsubscribeRequest1.requestId
|
response3.requestId == filterUnsubscribeRequest1.requestId
|
||||||
response3.statusCode == 200
|
response3.statusCode == 200
|
||||||
response3.statusDesc.get() == "OK"
|
response3.statusDesc.get() == "OK"
|
||||||
@ -193,7 +196,7 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 0 # peerId is removed from subscriptions
|
wakuFilter.subscriptions.subscribedPeerCount() == 0 # peerId is removed from subscriptions
|
||||||
response4.requestId == filterUnsubscribeRequest2.requestId
|
response4.requestId == filterUnsubscribeRequest2.requestId
|
||||||
response4.statusCode == 200
|
response4.statusCode == 200
|
||||||
response4.statusDesc.get() == "OK"
|
response4.statusDesc.get() == "OK"
|
||||||
@ -255,9 +258,9 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
|
|
||||||
# When
|
# When
|
||||||
let
|
let
|
||||||
filterCriteria = toSeq(1 .. MaxCriteriaPerSubscription + 1).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it])))
|
filterCriteria = toSeq(1 .. MaxFilterCriteriaPerPeer).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it])))
|
||||||
|
|
||||||
wakuFilter.subscriptions[peerId] = filterCriteria.toHashSet()
|
discard wakuFilter.subscriptions.addSubscription(peerId, filterCriteria.toHashSet())
|
||||||
|
|
||||||
let
|
let
|
||||||
reqTooManyFilterCriteria = createRequest(
|
reqTooManyFilterCriteria = createRequest(
|
||||||
@ -276,9 +279,11 @@ suite "Waku Filter - handling subscribe requests":
|
|||||||
## Max subscriptions exceeded
|
## Max subscriptions exceeded
|
||||||
|
|
||||||
# When
|
# When
|
||||||
wakuFilter.subscriptions.clear()
|
wakuFilter.subscriptions.removePeer(peerId)
|
||||||
for _ in 1 .. MaxTotalSubscriptions:
|
wakuFilter.subscriptions.cleanUp()
|
||||||
wakuFilter.subscriptions[PeerId.random().get()] = @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet()
|
|
||||||
|
for _ in 1 .. MaxFilterPeers:
|
||||||
|
discard wakuFilter.subscriptions.addSubscription(PeerId.random().get(), @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet())
|
||||||
|
|
||||||
let
|
let
|
||||||
reqTooManySubscriptions = createRequest(
|
reqTooManySubscriptions = createRequest(
|
||||||
@ -443,10 +448,10 @@ suite "Waku Filter - subscription maintenance":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 3
|
wakuFilter.subscriptions.subscribedPeerCount() == 3
|
||||||
wakuFilter.subscriptions.hasKey(peerId1)
|
wakuFilter.subscriptions.isSubscribed(peerId1)
|
||||||
wakuFilter.subscriptions.hasKey(peerId2)
|
wakuFilter.subscriptions.isSubscribed(peerId2)
|
||||||
wakuFilter.subscriptions.hasKey(peerId3)
|
wakuFilter.subscriptions.isSubscribed(peerId3)
|
||||||
|
|
||||||
# When
|
# When
|
||||||
# Maintenance loop should leave all peers in peer store intact
|
# Maintenance loop should leave all peers in peer store intact
|
||||||
@ -454,10 +459,10 @@ suite "Waku Filter - subscription maintenance":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 3
|
wakuFilter.subscriptions.subscribedPeerCount() == 3
|
||||||
wakuFilter.subscriptions.hasKey(peerId1)
|
wakuFilter.subscriptions.isSubscribed(peerId1)
|
||||||
wakuFilter.subscriptions.hasKey(peerId2)
|
wakuFilter.subscriptions.isSubscribed(peerId2)
|
||||||
wakuFilter.subscriptions.hasKey(peerId3)
|
wakuFilter.subscriptions.isSubscribed(peerId3)
|
||||||
|
|
||||||
# When
|
# When
|
||||||
# Remove peerId1 and peerId3 from peer store
|
# Remove peerId1 and peerId3 from peer store
|
||||||
@ -467,8 +472,8 @@ suite "Waku Filter - subscription maintenance":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 1
|
wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||||
wakuFilter.subscriptions.hasKey(peerId2)
|
wakuFilter.subscriptions.isSubscribed(peerId2)
|
||||||
|
|
||||||
# When
|
# When
|
||||||
# Remove peerId2 from peer store
|
# Remove peerId2 from peer store
|
||||||
@ -477,4 +482,4 @@ suite "Waku Filter - subscription maintenance":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
wakuFilter.subscriptions.len == 0
|
wakuFilter.subscriptions.subscribedPeerCount() == 0
|
||||||
|
|||||||
@ -1,17 +1,21 @@
|
|||||||
import
|
import
|
||||||
std/[
|
std/[
|
||||||
options,
|
options,
|
||||||
tables,
|
tables,
|
||||||
sets
|
sets,
|
||||||
|
sequtils,
|
||||||
|
algorithm
|
||||||
],
|
],
|
||||||
chronos,
|
chronos,
|
||||||
chronicles
|
chronicles,
|
||||||
|
os
|
||||||
|
|
||||||
import
|
import
|
||||||
../../../waku/[
|
../../../waku/[
|
||||||
node/peer_manager,
|
node/peer_manager,
|
||||||
waku_filter_v2,
|
waku_filter_v2,
|
||||||
waku_filter_v2/client,
|
waku_filter_v2/client,
|
||||||
|
waku_filter_v2/subscriptions,
|
||||||
waku_core
|
waku_core
|
||||||
],
|
],
|
||||||
../testlib/[
|
../testlib/[
|
||||||
@ -20,10 +24,14 @@ import
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
proc newTestWakuFilter*(switch: Switch): Future[WakuFilter] {.async.} =
|
proc newTestWakuFilter*(switch: Switch,
|
||||||
|
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
|
||||||
|
maxFilterPeers: uint32 = MaxFilterPeers,
|
||||||
|
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer):
|
||||||
|
Future[WakuFilter] {.async.} =
|
||||||
let
|
let
|
||||||
peerManager = PeerManager.new(switch)
|
peerManager = PeerManager.new(switch)
|
||||||
proto = WakuFilter.new(peerManager)
|
proto = WakuFilter.new(peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer)
|
||||||
|
|
||||||
await proto.start()
|
await proto.start()
|
||||||
switch.mount(proto)
|
switch.mount(proto)
|
||||||
@ -41,8 +49,21 @@ proc newTestWakuFilterClient*(switch: Switch): Future[WakuFilterClient] {.async.
|
|||||||
return proto
|
return proto
|
||||||
|
|
||||||
proc getSubscribedContentTopics*(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
proc getSubscribedContentTopics*(wakuFilter: WakuFilter, peerId: PeerId): seq[ContentTopic] =
|
||||||
var contentTopics: seq[ContentTopic]
|
var contentTopics: seq[ContentTopic] = @[]
|
||||||
for filterCriterion in wakuFilter.subscriptions[peerId]:
|
let peersCriteria = wakuFilter.subscriptions.getPeerSubscriptions(peerId)
|
||||||
contentTopics.add(filterCriterion[1])
|
|
||||||
|
for filterCriterion in peersCriteria:
|
||||||
|
contentTopics.add(filterCriterion.contentTopic)
|
||||||
|
|
||||||
return contentTopics
|
return contentTopics
|
||||||
|
|
||||||
|
proc unorderedCompare*[T](a, b: seq[T]): bool =
|
||||||
|
if a == b:
|
||||||
|
return true
|
||||||
|
|
||||||
|
var aSorted = a
|
||||||
|
var bSorted = b
|
||||||
|
aSorted.sort()
|
||||||
|
bSorted.sort()
|
||||||
|
|
||||||
|
return aSorted == bSorted
|
||||||
|
|||||||
@ -206,6 +206,7 @@ procSuite "WakuNode - Store":
|
|||||||
waitFor allFutures(client.start(), server.start(), filterSource.start())
|
waitFor allFutures(client.start(), server.start(), filterSource.start())
|
||||||
|
|
||||||
waitFor filterSource.mountFilter()
|
waitFor filterSource.mountFilter()
|
||||||
|
waitFor filterSource.mountLegacyFilter()
|
||||||
let driver = newSqliteArchiveDriver()
|
let driver = newSqliteArchiveDriver()
|
||||||
|
|
||||||
let mountArchiveRes = server.mountArchive(driver)
|
let mountArchiveRes = server.mountArchive(driver)
|
||||||
|
|||||||
@ -154,6 +154,7 @@ procSuite "Waku v2 JSON-RPC API - Admin":
|
|||||||
await client.connect("127.0.0.1", rpcPort, false)
|
await client.connect("127.0.0.1", rpcPort, false)
|
||||||
|
|
||||||
await node.mountFilter()
|
await node.mountFilter()
|
||||||
|
await node.mountLegacyFilter()
|
||||||
await node.mountFilterClient()
|
await node.mountFilterClient()
|
||||||
let driver: ArchiveDriver = QueueDriver.new()
|
let driver: ArchiveDriver = QueueDriver.new()
|
||||||
let mountArchiveRes = node.mountArchive(driver)
|
let mountArchiveRes = node.mountArchive(driver)
|
||||||
|
|||||||
@ -32,6 +32,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
|
|||||||
await allFutures(node1.start(), node2.start())
|
await allFutures(node1.start(), node2.start())
|
||||||
|
|
||||||
await node1.mountFilter()
|
await node1.mountFilter()
|
||||||
|
await node1.mountLegacyFilter()
|
||||||
await node2.mountFilterClient()
|
await node2.mountFilterClient()
|
||||||
|
|
||||||
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
|
node2.peerManager.addServicePeer(node1.peerInfo.toRemotePeerInfo(), WakuLegacyFilterCodec)
|
||||||
|
|||||||
@ -107,10 +107,11 @@ suite "Waku v2 Rest API - Admin":
|
|||||||
pubsubTopicNode2 = DefaultPubsubTopic
|
pubsubTopicNode2 = DefaultPubsubTopic
|
||||||
pubsubTopicNode3 = PubsubTopic("/waku/2/custom-waku/proto")
|
pubsubTopicNode3 = PubsubTopic("/waku/2/custom-waku/proto")
|
||||||
|
|
||||||
|
## TODO: Note that such checks may depend heavily on the order of the returned data!
|
||||||
expectedFilterData2 = fmt"(peerId: ""{$peerInfo2}"", filterCriteria:" &
|
expectedFilterData2 = fmt"(peerId: ""{$peerInfo2}"", filterCriteria:" &
|
||||||
fmt" @[(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[0]}""), " &
|
fmt" @[(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[1]}""), " &
|
||||||
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[1]}""), " &
|
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[2]}""), " &
|
||||||
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[2]}"")]"
|
fmt"(pubsubTopic: ""{pubsubTopicNode2}"", contentTopic: ""{contentFiltersNode2[0]}"")]"
|
||||||
|
|
||||||
expectedFilterData3 = fmt"(peerId: ""{$peerInfo3}"", filterCriteria:" &
|
expectedFilterData3 = fmt"(peerId: ""{$peerInfo3}"", filterCriteria:" &
|
||||||
fmt" @[(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[0]}""), " &
|
fmt" @[(pubsubTopic: ""{pubsubTopicNode3}"", contentTopic: ""{contentFiltersNode3[0]}""), " &
|
||||||
@ -151,4 +152,4 @@ suite "Waku v2 Rest API - Admin":
|
|||||||
|
|
||||||
check:
|
check:
|
||||||
getRes.status == 400
|
getRes.status == 400
|
||||||
getRes.data == "Error: Filter Protocol is not mounted to the node"
|
getRes.data == "Error: Filter Protocol is not mounted to the node"
|
||||||
|
|||||||
@ -11,7 +11,6 @@ import
|
|||||||
../../waku/waku_core,
|
../../waku/waku_core,
|
||||||
../../waku/waku_node,
|
../../waku/waku_node,
|
||||||
../../waku/node/peer_manager,
|
../../waku/node/peer_manager,
|
||||||
../../waku/waku_filter,
|
|
||||||
../../waku/waku_api/rest/server,
|
../../waku/waku_api/rest/server,
|
||||||
../../waku/waku_api/rest/client,
|
../../waku/waku_api/rest/client,
|
||||||
../../waku/waku_api/rest/responses,
|
../../waku/waku_api/rest/responses,
|
||||||
|
|||||||
@ -50,6 +50,7 @@ proc setupRestFilter(): Future[RestFilterTest] {.async.} =
|
|||||||
await allFutures(result.filterNode.start(), result.clientNode.start())
|
await allFutures(result.filterNode.start(), result.clientNode.start())
|
||||||
|
|
||||||
await result.filterNode.mountFilter()
|
await result.filterNode.mountFilter()
|
||||||
|
await result.filterNode.mountLegacyFilter()
|
||||||
await result.clientNode.mountFilterClient()
|
await result.clientNode.mountFilterClient()
|
||||||
|
|
||||||
result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo()
|
result.clientNode.peerManager.addServicePeer(result.filterNode.peerInfo.toRemotePeerInfo()
|
||||||
@ -174,7 +175,7 @@ suite "Waku v2 Rest API - Filter":
|
|||||||
|
|
||||||
while msg == messages[i]:
|
while msg == messages[i]:
|
||||||
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
|
msg = fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1"))
|
||||||
|
|
||||||
messages.add(msg)
|
messages.add(msg)
|
||||||
|
|
||||||
restFilterTest.messageCache.contentSubscribe(contentTopic)
|
restFilterTest.messageCache.contentSubscribe(contentTopic)
|
||||||
|
|||||||
@ -35,6 +35,7 @@ import
|
|||||||
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
|
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
|
||||||
../waku_filter_v2,
|
../waku_filter_v2,
|
||||||
../waku_filter_v2/client as filter_client,
|
../waku_filter_v2/client as filter_client,
|
||||||
|
../waku_filter_v2/subscriptions as filter_subscriptions,
|
||||||
../waku_lightpush,
|
../waku_lightpush,
|
||||||
../waku_metadata,
|
../waku_metadata,
|
||||||
../waku_lightpush/client as lightpush_client,
|
../waku_lightpush/client as lightpush_client,
|
||||||
@ -67,9 +68,6 @@ const git_version* {.strdefine.} = "n/a"
|
|||||||
# Default clientId
|
# Default clientId
|
||||||
const clientId* = "Nimbus Waku v2 node"
|
const clientId* = "Nimbus Waku v2 node"
|
||||||
|
|
||||||
# Default Waku Filter Timeout
|
|
||||||
const WakuFilterTimeout: Duration = 1.days
|
|
||||||
|
|
||||||
const WakuNodeVersionString* = "version / git commit hash: " & git_version
|
const WakuNodeVersionString* = "version / git commit hash: " & git_version
|
||||||
|
|
||||||
# key and crypto modules different
|
# key and crypto modules different
|
||||||
@ -188,7 +186,7 @@ proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], s
|
|||||||
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
|
proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
|
||||||
if not node.wakuMetadata.isNil():
|
if not node.wakuMetadata.isNil():
|
||||||
return err("Waku metadata already mounted, skipping")
|
return err("Waku metadata already mounted, skipping")
|
||||||
|
|
||||||
let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
|
let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
|
||||||
|
|
||||||
node.wakuMetadata = metadata
|
node.wakuMetadata = metadata
|
||||||
@ -403,18 +401,36 @@ proc mountRelay*(node: WakuNode,
|
|||||||
|
|
||||||
## Waku filter
|
## Waku filter
|
||||||
|
|
||||||
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout)
|
proc mountLegacyFilter*(node: WakuNode, filterTimeout: Duration = WakuLegacyFilterTimeout)
|
||||||
{.async, raises: [Defect, LPError]} =
|
{.async, raises: [Defect, LPError]} =
|
||||||
|
## Mounting legacy filter protocol with separation from new v2 filter protocol for easier removal later
|
||||||
|
## TODO: remove legacy filter protocol
|
||||||
|
|
||||||
|
info "mounting legacy filter protocol"
|
||||||
|
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout)
|
||||||
|
|
||||||
|
if node.started:
|
||||||
|
await node.wakuFilterLegacy.start() #TODO: remove legacy
|
||||||
|
|
||||||
|
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec))
|
||||||
|
|
||||||
|
proc mountFilter*(node: WakuNode,
|
||||||
|
subscriptionTimeout: Duration = filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
|
||||||
|
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
|
||||||
|
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer)
|
||||||
|
{.async, raises: [Defect, LPError]} =
|
||||||
|
## Mounting filter v2 protocol
|
||||||
|
|
||||||
info "mounting filter protocol"
|
info "mounting filter protocol"
|
||||||
node.wakuFilter = WakuFilter.new(node.peerManager)
|
node.wakuFilter = WakuFilter.new(node.peerManager,
|
||||||
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy
|
subscriptionTimeout,
|
||||||
|
maxFilterPeers,
|
||||||
|
maxFilterCriteriaPerPeer)
|
||||||
|
|
||||||
if node.started:
|
if node.started:
|
||||||
await node.wakuFilter.start()
|
await node.wakuFilter.start()
|
||||||
await node.wakuFilterLegacy.start() #TODO: remove legacy
|
|
||||||
|
|
||||||
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
|
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
|
||||||
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuLegacyFilterCodec)) #TODO: remove legacy
|
|
||||||
|
|
||||||
proc filterHandleMessage*(node: WakuNode,
|
proc filterHandleMessage*(node: WakuNode,
|
||||||
pubsubTopic: PubsubTopic,
|
pubsubTopic: PubsubTopic,
|
||||||
|
|||||||
35
waku/utils/tableutils.nim
Normal file
35
waku/utils/tableutils.nim
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import std/tables,
|
||||||
|
stew/objects,
|
||||||
|
stew/templateutils
|
||||||
|
|
||||||
|
template keepItIf*[A, B](tableParam: var Table[A, B], itPredicate: untyped) =
|
||||||
|
bind evalTemplateParamOnce
|
||||||
|
evalTemplateParamOnce(tableParam, t):
|
||||||
|
var itemsToDelete: seq[A]
|
||||||
|
var key {.inject.} : A
|
||||||
|
var val {.inject.} : B
|
||||||
|
|
||||||
|
for k, v in t.mpairs():
|
||||||
|
key = k
|
||||||
|
val = v
|
||||||
|
if not itPredicate:
|
||||||
|
itemsToDelete.add(key)
|
||||||
|
|
||||||
|
for item in itemsToDelete:
|
||||||
|
t.del(item)
|
||||||
|
|
||||||
|
template keepItIf*[A, B](tableParam: var TableRef[A, B], itPredicate: untyped) =
|
||||||
|
bind evalTemplateParamOnce
|
||||||
|
evalTemplateParamOnce(tableParam, t):
|
||||||
|
var itemsToDelete: seq[A]
|
||||||
|
let key {.inject.} : A
|
||||||
|
let val {.inject.} : B
|
||||||
|
|
||||||
|
for k, v in t[].mpairs():
|
||||||
|
key = k
|
||||||
|
val = v
|
||||||
|
if not itPredicate:
|
||||||
|
itemsToDelete.add(key)
|
||||||
|
|
||||||
|
for item in itemsToDelete:
|
||||||
|
t[].del(item)
|
||||||
@ -121,9 +121,10 @@ proc installAdminV1GetFilterSubsHandler(router: var RestRouter, node: WakuNode)
|
|||||||
subscriptions: seq[FilterSubscription] = @[]
|
subscriptions: seq[FilterSubscription] = @[]
|
||||||
filterCriteria: seq[FilterTopic]
|
filterCriteria: seq[FilterTopic]
|
||||||
|
|
||||||
for (peerId, criteria) in node.wakuFilter.subscriptions.pairs():
|
for peerId in node.wakuFilter.subscriptions.peersSubscribed.keys:
|
||||||
filterCriteria = criteria.toSeq().mapIt(FilterTopic(pubsubTopic: it[0],
|
filterCriteria = node.wakuFilter.subscriptions.getPeerSubscriptions(peerId)
|
||||||
contentTopic: it[1]))
|
.mapIt(FilterTopic(pubsubTopic: it[0],
|
||||||
|
contentTopic: it[1]))
|
||||||
|
|
||||||
subscriptions.add(FilterSubscription(peerId: $peerId, filterCriteria: filterCriteria))
|
subscriptions.add(FilterSubscription(peerId: $peerId, filterCriteria: filterCriteria))
|
||||||
|
|
||||||
|
|||||||
@ -22,7 +22,7 @@ logScope:
|
|||||||
const
|
const
|
||||||
WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
WakuLegacyFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
||||||
|
|
||||||
WakuFilterTimeout: Duration = 2.hours
|
WakuLegacyFilterTimeout*: Duration = 2.hours
|
||||||
|
|
||||||
|
|
||||||
type WakuFilterResult*[T] = Result[T, string]
|
type WakuFilterResult*[T] = Result[T, string]
|
||||||
@ -113,7 +113,7 @@ proc initProtocolHandler(wf: WakuFilterLegacy) =
|
|||||||
proc new*(T: type WakuFilterLegacy,
|
proc new*(T: type WakuFilterLegacy,
|
||||||
peerManager: PeerManager,
|
peerManager: PeerManager,
|
||||||
rng: ref rand.HmacDrbgContext,
|
rng: ref rand.HmacDrbgContext,
|
||||||
timeout: Duration = WakuFilterTimeout): T =
|
timeout: Duration = WakuLegacyFilterTimeout): T =
|
||||||
let wf = WakuFilterLegacy(rng: rng,
|
let wf = WakuFilterLegacy(rng: rng,
|
||||||
peerManager: peerManager,
|
peerManager: peerManager,
|
||||||
timeout: timeout)
|
timeout: timeout)
|
||||||
@ -123,7 +123,7 @@ proc new*(T: type WakuFilterLegacy,
|
|||||||
proc init*(T: type WakuFilterLegacy,
|
proc init*(T: type WakuFilterLegacy,
|
||||||
peerManager: PeerManager,
|
peerManager: PeerManager,
|
||||||
rng: ref rand.HmacDrbgContext,
|
rng: ref rand.HmacDrbgContext,
|
||||||
timeout: Duration = WakuFilterTimeout): T {.
|
timeout: Duration = WakuLegacyFilterTimeout): T {.
|
||||||
deprecated: "WakuFilterLegacy.new()' instead".} =
|
deprecated: "WakuFilterLegacy.new()' instead".} =
|
||||||
WakuFilterLegacy.new(peerManager, rng, timeout)
|
WakuFilterLegacy.new(peerManager, rng, timeout)
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,9 @@
|
|||||||
import
|
import
|
||||||
./waku_filter_v2/common,
|
./waku_filter_v2/common,
|
||||||
./waku_filter_v2/protocol
|
./waku_filter_v2/protocol,
|
||||||
|
./waku_filter_v2/subscriptions
|
||||||
|
|
||||||
export
|
export
|
||||||
common,
|
common,
|
||||||
protocol
|
protocol,
|
||||||
|
subscriptions
|
||||||
|
|||||||
@ -25,7 +25,7 @@ logScope:
|
|||||||
topics = "waku filter"
|
topics = "waku filter"
|
||||||
|
|
||||||
const
|
const
|
||||||
MaxContentTopicsPerRequest* = 30
|
MaxContentTopicsPerRequest* = 100
|
||||||
|
|
||||||
type
|
type
|
||||||
WakuFilter* = ref object of LPProtocol
|
WakuFilter* = ref object of LPProtocol
|
||||||
@ -36,82 +36,69 @@ type
|
|||||||
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||||
trace "pinging subscriber", peerId=peerId
|
trace "pinging subscriber", peerId=peerId
|
||||||
|
|
||||||
if peerId notin wf.subscriptions:
|
if not wf.subscriptions.isSubscribed(peerId):
|
||||||
debug "pinging peer has no subscriptions", peerId=peerId
|
debug "pinging peer has no subscriptions", peerId=peerId
|
||||||
return err(FilterSubscribeError.notFound())
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
|
wf.subscriptions.refreshSubscription(peerId)
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
proc subscribe(wf: WakuFilter,
|
||||||
if pubsubTopic.isNone() or contentTopics.len() == 0:
|
peerId: PeerID,
|
||||||
|
pubsubTopic: Option[PubsubTopic],
|
||||||
|
contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||||
|
|
||||||
|
# TODO: check if this condition is valid???
|
||||||
|
if pubsubTopic.isNone() or contentTopics.len == 0:
|
||||||
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
||||||
|
|
||||||
if contentTopics.len() > MaxContentTopicsPerRequest:
|
if contentTopics.len > MaxContentTopicsPerRequest:
|
||||||
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
|
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " &
|
||||||
|
$MaxContentTopicsPerRequest))
|
||||||
|
|
||||||
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||||
|
|
||||||
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||||
|
|
||||||
if peerId in wf.subscriptions:
|
wf.subscriptions.addSubscription(peerId, filterCriteria).isOkOr:
|
||||||
# We already have a subscription for this peer. Try to add the new filter criteria.
|
return err(FilterSubscribeError.serviceUnavailable(error))
|
||||||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
|
||||||
if peerSubscription.len() + filterCriteria.len() > MaxCriteriaPerSubscription:
|
|
||||||
return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria"))
|
|
||||||
|
|
||||||
peerSubscription.incl(filterCriteria)
|
|
||||||
wf.subscriptions[peerId] = peerSubscription
|
|
||||||
else:
|
|
||||||
# We don't have a subscription for this peer yet. Try to add it.
|
|
||||||
if wf.subscriptions.len() >= MaxTotalSubscriptions:
|
|
||||||
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
|
|
||||||
debug "creating new subscription", peerId=peerId
|
|
||||||
wf.subscriptions[peerId] = filterCriteria
|
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
proc unsubscribe(wf: WakuFilter,
|
||||||
if pubsubTopic.isNone() or contentTopics.len() == 0:
|
peerId: PeerID,
|
||||||
|
pubsubTopic: Option[PubsubTopic],
|
||||||
|
contentTopics: seq[ContentTopic]): FilterSubscribeResult =
|
||||||
|
if pubsubTopic.isNone() or contentTopics.len == 0:
|
||||||
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))
|
||||||
|
|
||||||
if contentTopics.len() > MaxContentTopicsPerRequest:
|
if contentTopics.len > MaxContentTopicsPerRequest:
|
||||||
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
|
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))
|
||||||
|
|
||||||
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
|
||||||
|
|
||||||
trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
debug "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
|
||||||
|
|
||||||
if peerId notin wf.subscriptions:
|
wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
|
||||||
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
|
||||||
return err(FilterSubscribeError.notFound())
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
|
|
||||||
|
|
||||||
if not peerSubscription.containsAny(filterCriteria):
|
|
||||||
debug "unsubscribing peer is not subscribed to any of the content topics in this pubsub topic", peerId=peerId, pubsubTopic=pubsubTopic.get(), contentTopics=contentTopics
|
|
||||||
return err(FilterSubscribeError.notFound())
|
|
||||||
|
|
||||||
peerSubscription.excl(filterCriteria)
|
|
||||||
|
|
||||||
if peerSubscription.len() == 0:
|
|
||||||
debug "peer has no more subscriptions, removing subscription", peerId=peerId
|
|
||||||
wf.subscriptions.del(peerId)
|
|
||||||
else:
|
|
||||||
wf.subscriptions[peerId] = peerSubscription
|
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
|
||||||
if peerId notin wf.subscriptions:
|
if not wf.subscriptions.isSubscribed(peerId):
|
||||||
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
debug "unsubscribing peer has no subscriptions", peerId=peerId
|
||||||
return err(FilterSubscribeError.notFound())
|
return err(FilterSubscribeError.notFound())
|
||||||
|
|
||||||
debug "removing peer subscription", peerId=peerId
|
debug "removing peer subscription", peerId=peerId
|
||||||
wf.subscriptions.del(peerId)
|
wf.subscriptions.removePeer(peerId)
|
||||||
|
wf.subscriptions.cleanUp()
|
||||||
|
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubscribeRequest): FilterSubscribeResponse =
|
proc handleSubscribeRequest*(wf: WakuFilter,
|
||||||
|
peerId: PeerId,
|
||||||
|
request: FilterSubscribeRequest): FilterSubscribeResponse =
|
||||||
info "received filter subscribe request", peerId=peerId, request=request
|
info "received filter subscribe request", peerId=peerId, request=request
|
||||||
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
waku_filter_requests.inc(labelValues = [$request.filterSubscribeType])
|
||||||
|
|
||||||
@ -134,7 +121,8 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs
|
|||||||
let
|
let
|
||||||
requestDuration = Moment.now() - requestStartTime
|
requestDuration = Moment.now() - requestStartTime
|
||||||
requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
|
requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
|
||||||
waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType])
|
waku_filter_request_duration_seconds.observe(
|
||||||
|
requestDurationSec, labelValues = [$request.filterSubscribeType])
|
||||||
|
|
||||||
if subscribeResult.isErr():
|
if subscribeResult.isErr():
|
||||||
return FilterSubscribeResponse(
|
return FilterSubscribeResponse(
|
||||||
@ -153,6 +141,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
|
|||||||
trace "no addresses for peer", peer=peer
|
trace "no addresses for peer", peer=peer
|
||||||
return
|
return
|
||||||
|
|
||||||
|
## TODO: Check if dial is necessary always???
|
||||||
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
|
let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec)
|
||||||
if conn.isNone():
|
if conn.isNone():
|
||||||
## We do not remove this peer, but allow the underlying peer manager
|
## We do not remove this peer, but allow the underlying peer manager
|
||||||
@ -163,7 +152,10 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
|
|||||||
await conn.get().writeLp(buffer)
|
await conn.get().writeLp(buffer)
|
||||||
|
|
||||||
proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
|
proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
|
||||||
debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
|
debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic,
|
||||||
|
contentTopic=messagePush.wakuMessage.contentTopic,
|
||||||
|
peers=peers,
|
||||||
|
hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
|
||||||
|
|
||||||
let bufferToPublish = messagePush.encode().buffer
|
let bufferToPublish = messagePush.encode().buffer
|
||||||
|
|
||||||
@ -179,18 +171,18 @@ proc maintainSubscriptions*(wf: WakuFilter) =
|
|||||||
|
|
||||||
## Remove subscriptions for peers that have been removed from peer store
|
## Remove subscriptions for peers that have been removed from peer store
|
||||||
var peersToRemove: seq[PeerId]
|
var peersToRemove: seq[PeerId]
|
||||||
for peerId, peerSubscription in wf.subscriptions.pairs():
|
for peerId in wf.subscriptions.peersSubscribed.keys:
|
||||||
## TODO: currently we only maintain by syncing with peer store. We could
|
|
||||||
## consider other metrics, such as subscription age, activity, etc.
|
|
||||||
if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
|
if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec):
|
||||||
debug "peer has been removed from peer store, removing subscription", peerId=peerId
|
debug "peer has been removed from peer store, removing subscription", peerId=peerId
|
||||||
peersToRemove.add(peerId)
|
peersToRemove.add(peerId)
|
||||||
|
|
||||||
if peersToRemove.len() > 0:
|
if peersToRemove.len > 0:
|
||||||
wf.subscriptions.removePeers(peersToRemove)
|
wf.subscriptions.removePeers(peersToRemove)
|
||||||
|
|
||||||
|
wf.subscriptions.cleanUp()
|
||||||
|
|
||||||
## Periodic report of number of subscriptions
|
## Periodic report of number of subscriptions
|
||||||
waku_filter_subscriptions.set(wf.subscriptions.len().float64)
|
waku_filter_subscriptions.set(wf.subscriptions.peersSubscribed.len.float64)
|
||||||
|
|
||||||
const MessagePushTimeout = 20.seconds
|
const MessagePushTimeout = 20.seconds
|
||||||
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
|
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
|
||||||
@ -201,7 +193,7 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
|
|||||||
block:
|
block:
|
||||||
## Find subscribers and push message to them
|
## Find subscribers and push message to them
|
||||||
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
|
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
|
||||||
if subscribedPeers.len() == 0:
|
if subscribedPeers.len == 0:
|
||||||
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
|
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -210,11 +202,15 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
|
|||||||
wakuMessage: message)
|
wakuMessage: message)
|
||||||
|
|
||||||
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
|
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
|
||||||
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic,
|
||||||
|
contentTopic=message.contentTopic,
|
||||||
|
hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
||||||
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
|
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
|
||||||
else:
|
else:
|
||||||
debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
debug "pushed message succesfully to all subscribers",
|
||||||
|
pubsubTopic=pubsubTopic,
|
||||||
|
contentTopic=message.contentTopic,
|
||||||
|
hash=pubsubTopic.computeMessageHash(message).to0xHex()
|
||||||
|
|
||||||
let
|
let
|
||||||
handleMessageDuration = Moment.now() - handleMessageStartTime
|
handleMessageDuration = Moment.now() - handleMessageStartTime
|
||||||
@ -247,13 +243,21 @@ proc initProtocolHandler(wf: WakuFilter) =
|
|||||||
wf.codec = WakuFilterSubscribeCodec
|
wf.codec = WakuFilterSubscribeCodec
|
||||||
|
|
||||||
proc new*(T: type WakuFilter,
|
proc new*(T: type WakuFilter,
|
||||||
peerManager: PeerManager): T =
|
peerManager: PeerManager,
|
||||||
|
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
|
||||||
|
maxFilterPeers: uint32 = MaxFilterPeers,
|
||||||
|
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): T =
|
||||||
|
|
||||||
let wf = WakuFilter(
|
let wf = WakuFilter(
|
||||||
|
subscriptions: FilterSubscriptions.init(subscriptionTimeout,
|
||||||
|
maxFilterPeers,
|
||||||
|
maxFilterCriteriaPerPeer
|
||||||
|
),
|
||||||
peerManager: peerManager
|
peerManager: peerManager
|
||||||
)
|
)
|
||||||
|
|
||||||
wf.initProtocolHandler()
|
wf.initProtocolHandler()
|
||||||
wf
|
return wf
|
||||||
|
|
||||||
const MaintainSubscriptionsInterval* = 1.minutes
|
const MaintainSubscriptionsInterval* = 1.minutes
|
||||||
|
|
||||||
@ -278,4 +282,5 @@ method stop*(wf: WakuFilter) {.async.} =
|
|||||||
debug "stopping filter protocol"
|
debug "stopping filter protocol"
|
||||||
if not wf.maintenanceTask.isNil():
|
if not wf.maintenanceTask.isNil():
|
||||||
wf.maintenanceTask.clearTimer()
|
wf.maintenanceTask.clearTimer()
|
||||||
|
|
||||||
await procCall LPProtocol(wf).stop()
|
await procCall LPProtocol(wf).stop()
|
||||||
|
|||||||
@ -6,48 +6,163 @@ else:
|
|||||||
import
|
import
|
||||||
std/[sets,tables],
|
std/[sets,tables],
|
||||||
chronicles,
|
chronicles,
|
||||||
libp2p/peerid
|
chronos,
|
||||||
|
libp2p/peerid,
|
||||||
|
stew/shims/sets
|
||||||
import
|
import
|
||||||
../waku_core
|
../waku_core,
|
||||||
|
../utils/tableutils
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "waku filter subscriptions"
|
topics = "waku filter subscriptions"
|
||||||
|
|
||||||
const
|
const
|
||||||
MaxTotalSubscriptions* = 1000 # TODO make configurable
|
MaxFilterPeers* = 1000
|
||||||
MaxCriteriaPerSubscription* = 1000
|
MaxFilterCriteriaPerPeer* = 1000
|
||||||
|
DefaultSubscriptionTimeToLiveSec* = 5.minutes
|
||||||
|
|
||||||
type
|
type
|
||||||
FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic
|
# a single filter criterion is fully defined by a pubsub topic and content topic
|
||||||
|
FilterCriterion* = tuple
|
||||||
|
pubsubTopic: PubsubTopic
|
||||||
|
contentTopic: ContentTopic
|
||||||
|
|
||||||
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
|
FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria
|
||||||
FilterSubscriptions* = Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria
|
|
||||||
|
|
||||||
proc findSubscribedPeers*(subscriptions: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] =
|
SubscribedPeers* = HashSet[PeerID] # a sequence of peer ids
|
||||||
## Find all peers subscribed to a given topic and content topic
|
|
||||||
let filterCriterion = (pubsubTopic, contentTopic)
|
|
||||||
|
|
||||||
var subscribedPeers: seq[PeerID]
|
PeerData* = tuple
|
||||||
|
lastSeen: Moment
|
||||||
|
criteriaCount: uint
|
||||||
|
|
||||||
# TODO: for large maps, this can be optimized using a reverse index
|
FilterSubscriptions* = object
|
||||||
for (peerId, criteria) in subscriptions.pairs():
|
peersSubscribed* : Table[PeerID, PeerData]
|
||||||
if filterCriterion in criteria:
|
subscriptions : Table[FilterCriterion, SubscribedPeers]
|
||||||
subscribedPeers.add(peerId)
|
subscriptionTimeout : Duration
|
||||||
|
maxPeers : uint
|
||||||
|
maxCriteriaPerPeer : uint
|
||||||
|
|
||||||
subscribedPeers
|
proc init*(T: type FilterSubscriptions,
|
||||||
|
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
|
||||||
|
maxFilterPeers: uint32 = MaxFilterPeers,
|
||||||
|
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer): FilterSubscriptions =
|
||||||
|
## Create a new filter subscription object
|
||||||
|
return FilterSubscriptions(
|
||||||
|
peersSubscribed: initTable[PeerID, PeerData](),
|
||||||
|
subscriptions: initTable[FilterCriterion, SubscribedPeers](),
|
||||||
|
subscriptionTimeout: subscriptionTimeout,
|
||||||
|
maxPeers: maxFilterPeers,
|
||||||
|
maxCriteriaPerPeer: maxFilterCriteriaPerPeer
|
||||||
|
)
|
||||||
|
|
||||||
proc removePeer*(subscriptions: var FilterSubscriptions, peerId: PeerID) =
|
proc isSubscribed*(s: var FilterSubscriptions, peerId: PeerID): bool =
|
||||||
|
s.peersSubscribed.withValue(peerId, data):
|
||||||
|
return Moment.now() - data.lastSeen <= s.subscriptionTimeout
|
||||||
|
|
||||||
|
return false
|
||||||
|
|
||||||
|
proc subscribedPeerCount*(s: FilterSubscriptions): uint =
|
||||||
|
return cast[uint](s.peersSubscribed.len)
|
||||||
|
|
||||||
|
proc getPeerSubscriptions*(s: var FilterSubscriptions, peerId: PeerID): seq[FilterCriterion] =
|
||||||
|
## Get all pubsub-content topics a peer is subscribed to
|
||||||
|
var subscribedContentTopics: seq[FilterCriterion] = @[]
|
||||||
|
s.peersSubscribed.withValue(peerId, data):
|
||||||
|
if data.criteriaCount == 0:
|
||||||
|
return subscribedContentTopics
|
||||||
|
|
||||||
|
for filterCriterion, subscribedPeers in s.subscriptions.mpairs:
|
||||||
|
if peerId in subscribedPeers:
|
||||||
|
subscribedContentTopics.add(filterCriterion)
|
||||||
|
|
||||||
|
return subscribedContentTopics
|
||||||
|
|
||||||
|
proc findSubscribedPeers*(s: var FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] =
|
||||||
|
let filterCriterion : FilterCriterion = (pubsubTopic, contentTopic)
|
||||||
|
|
||||||
|
var foundPeers : seq[PeerID] = @[]
|
||||||
|
# only peers subscribed to criteria and with legit subscription is counted
|
||||||
|
s.subscriptions.withValue(filterCriterion, peers):
|
||||||
|
for peer in peers[]:
|
||||||
|
if s.isSubscribed(peer):
|
||||||
|
foundPeers.add(peer)
|
||||||
|
|
||||||
|
return foundPeers
|
||||||
|
|
||||||
|
proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) =
|
||||||
## Remove all subscriptions for a given peer
|
## Remove all subscriptions for a given peer
|
||||||
subscriptions.del(peerId)
|
s.peersSubscribed.del(peerId)
|
||||||
|
|
||||||
proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID]) =
|
proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) =
|
||||||
## Remove all subscriptions for a given list of peers
|
## Remove all subscriptions for a given list of peers
|
||||||
for peerId in peerIds:
|
s.peersSubscribed.keepItIf(key notin peerIds)
|
||||||
subscriptions.removePeer(peerId)
|
|
||||||
|
proc cleanUp*(fs: var FilterSubscriptions) =
|
||||||
|
## Remove all subscriptions for peers that have not been seen for a while
|
||||||
|
let now = Moment.now()
|
||||||
|
fs.peersSubscribed.keepItIf(now - val.lastSeen <= fs.subscriptionTimeout)
|
||||||
|
|
||||||
|
var filtersToRemove: seq[FilterCriterion] = @[]
|
||||||
|
for filterCriterion, subscribedPeers in fs.subscriptions.mpairs:
|
||||||
|
subscribedPeers.keepItIf(fs.isSubscribed(it)==true)
|
||||||
|
|
||||||
|
fs.subscriptions.keepItIf(val.len > 0)
|
||||||
|
|
||||||
|
proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
|
||||||
|
s.peersSubscribed.withValue(peerId, data):
|
||||||
|
data.lastSeen = Moment.now()
|
||||||
|
|
||||||
|
proc addSubscription*(s: var FilterSubscriptions, peerId: PeerID, filterCriteria: FilterCriteria): Result[void, string] =
|
||||||
|
## Add a subscription for a given peer
|
||||||
|
var peerData: ptr PeerData
|
||||||
|
|
||||||
|
s.peersSubscribed.withValue(peerId, data):
|
||||||
|
if data.criteriaCount + cast[uint](filterCriteria.len) > s.maxCriteriaPerPeer:
|
||||||
|
return err("peer has reached maximum number of filter criteria")
|
||||||
|
|
||||||
|
data.lastSeen = Moment.now()
|
||||||
|
peerData = data
|
||||||
|
|
||||||
|
do:
|
||||||
|
## not yet subscribed
|
||||||
|
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
|
||||||
|
return err("node has reached maximum number of subscriptions")
|
||||||
|
|
||||||
|
let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
|
||||||
|
peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))
|
||||||
|
|
||||||
|
for filterCriterion in filterCriteria:
|
||||||
|
var peersOfSub = addr(s.subscriptions.mgetOrPut(filterCriterion, SubscribedPeers()))
|
||||||
|
if peerId notin peersOfSub[]:
|
||||||
|
peersOfSub[].incl(peerId)
|
||||||
|
peerData.criteriaCount += 1
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc removeSubscription*(s: var FilterSubscriptions,
|
||||||
|
peerId: PeerID,
|
||||||
|
filterCriteria: FilterCriteria):
|
||||||
|
Result[void, string] =
|
||||||
|
## Remove a subscription for a given peer
|
||||||
|
|
||||||
|
s.peersSubscribed.withValue(peerId, peerData):
|
||||||
|
peerData.lastSeen = Moment.now()
|
||||||
|
for filterCriterion in filterCriteria:
|
||||||
|
s.subscriptions.withValue(filterCriterion, peers):
|
||||||
|
if peers[].missingOrexcl(peerId) == false:
|
||||||
|
peerData.criteriaCount -= 1
|
||||||
|
|
||||||
|
if peers[].len == 0:
|
||||||
|
s.subscriptions.del(filterCriterion)
|
||||||
|
if peerData.criteriaCount == 0:
|
||||||
|
s.peersSubscribed.del(peerId)
|
||||||
|
do:
|
||||||
|
## Maybe let just run through and log it as a warning
|
||||||
|
return err("Peer was not subscribed to criterion")
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
do:
|
||||||
|
return err("Peer has no subscriptions")
|
||||||
|
|
||||||
|
|
||||||
proc containsAny*(criteria: FilterCriteria, otherCriteria: FilterCriteria): bool =
|
|
||||||
## Check if a given pubsub topic is contained in a set of filter criteria
|
|
||||||
## TODO: Better way to achieve this?
|
|
||||||
for criterion in otherCriteria:
|
|
||||||
if criterion in criteria:
|
|
||||||
return true
|
|
||||||
false
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user