feat: integrate new filter protocol, other improvements (#1637)

This commit is contained in:
Hanno Cornelius 2023-04-11 10:12:54 +02:00 committed by GitHub
parent 1cfb251b65
commit 418efca27f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 497 additions and 279 deletions

View File

@ -32,6 +32,10 @@ import
./v2/waku_relay/test_waku_relay,
./v2/waku_relay/test_wakunode_relay
# Waku filter test suite
import
./v2/waku_filter_v2/test_waku_filter,
./v2/waku_filter_v2/test_waku_filter_protocol
import
# Waku v2 tests

View File

@ -15,20 +15,20 @@ import
./testlib/wakucore
proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} =
proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilterLegacy] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(peerManager, rng, timeout)
proto = WakuFilterLegacy.new(peerManager, rng, timeout)
await proto.start()
switch.mount(proto)
return proto
proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} =
proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClientLegacy] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilterClient.new(peerManager, rng)
proto = WakuFilterClientLegacy.new(peerManager, rng)
await proto.start()
switch.mount(proto)

View File

@ -211,6 +211,9 @@ suite "Waku Filter - end to end":
check:
not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())
asyncTest "subscribe to multiple content topics and unsubscribe all":
# Given
var
@ -373,3 +376,6 @@ suite "Waku Filter - end to end":
check:
pushedMsgPubsubTopic3 == DefaultPubsubTopic
pushedMsg3 == msg3
# Teardown
await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop())

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options,sets,strutils,tables],
std/[options,sequtils,sets,strutils,tables],
testutils/unittests,
chronos,
chronicles,
@ -10,6 +10,7 @@ import
../../../waku/v2/node/peer_manager,
../../../waku/v2/protocol/waku_filter_v2,
../../../waku/v2/protocol/waku_filter_v2/rpc,
../../../waku/v2/protocol/waku_filter_v2/subscriptions,
../../../waku/v2/protocol/waku_message,
../testlib/common,
../testlib/wakucore
@ -197,6 +198,187 @@ suite "Waku Filter - handling subscribe requests":
response4.statusCode == 200
response4.statusDesc.get() == "OK"
asyncTest "subscribe errors":
## Tests most common error paths while subscribing
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
## Incomplete filter criteria
# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic]
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[]
)
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")
response2.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")
## Max content topics per request exceeded
# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(ContentTopic("/waku/2/content-$#/proto" % [$it]))
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics
)
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")
## Max filter criteria exceeded
# When
let
filterCriteria = toSeq(1 .. MaxCriteriaPerSubscription + 1).mapIt((DefaultPubsubTopic, ContentTopic("/waku/2/content-$#/proto" % [$it])))
wakuFilter.subscriptions[peerId] = filterCriteria.toHashSet()
let
reqTooManyFilterCriteria = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
response4 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyFilterCriteria)
# Then
check:
response4.requestId == reqTooManyFilterCriteria.requestId
response4.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response4.statusDesc.get().contains("peer has reached maximum number of filter criteria")
## Max subscriptions exceeded
# When
wakuFilter.subscriptions.clear()
for _ in 1 .. MaxTotalSubscriptions:
wakuFilter.subscriptions[PeerId.random().get()] = @[(DefaultPubsubTopic, DefaultContentTopic)].toHashSet()
let
reqTooManySubscriptions = createRequest(
filterSubscribeType = FilterSubscribeType.SUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
response5 = wakuFilter.handleSubscribeRequest(peerId, reqTooManySubscriptions)
# Then
check:
response5.requestId == reqTooManySubscriptions.requestId
response5.statusCode == FilterSubscribeErrorKind.SERVICE_UNAVAILABLE.uint32
response5.statusDesc.get().contains("node has reached maximum number of subscriptions")
asyncTest "unsubscribe errors":
## Tests most common error paths while unsubscribing
# Given
let
switch = newStandardSwitch()
wakuFilter = newTestWakuFilter(switch)
peerId = PeerId.random().get()
## Incomplete filter criteria
# When
let
reqNoPubsubTopic = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = none(PubsubTopic),
contentTopics = @[DefaultContentTopic]
)
reqNoContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[]
)
response1 = wakuFilter.handleSubscribeRequest(peerId, reqNoPubsubTopic)
response2 = wakuFilter.handleSubscribeRequest(peerId, reqNoContentTopics)
# Then
check:
response1.requestId == reqNoPubsubTopic.requestId
response2.requestId == reqNoContentTopics.requestId
response1.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response2.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response1.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")
response2.statusDesc.get().contains("pubsubTopic and contentTopics must be specified")
## Max content topics per request exceeded
# When
let
contentTopics = toSeq(1 .. MaxContentTopicsPerRequest + 1).mapIt(ContentTopic("/waku/2/content-$#/proto" % [$it]))
reqTooManyContentTopics = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = contentTopics
)
response3 = wakuFilter.handleSubscribeRequest(peerId, reqTooManyContentTopics)
# Then
check:
response3.requestId == reqTooManyContentTopics.requestId
response3.statusCode == FilterSubscribeErrorKind.BAD_REQUEST.uint32
response3.statusDesc.get().contains("exceeds maximum content topics")
## Subscription not found - unsubscribe
# When
let
reqSubscriptionNotFound = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE,
pubsubTopic = some(DefaultPubsubTopic),
contentTopics = @[DefaultContentTopic]
)
response4 = wakuFilter.handleSubscribeRequest(peerId, reqSubscriptionNotFound)
# Then
check:
response4.requestId == reqSubscriptionNotFound.requestId
response4.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response4.statusDesc.get().contains("peer has no subscriptions")
## Subscription not found - unsubscribe all
# When
let
reqUnsubscribeAll = createRequest(
filterSubscribeType = FilterSubscribeType.UNSUBSCRIBE_ALL
)
response5 = wakuFilter.handleSubscribeRequest(peerId, reqUnsubscribeAll)
# Then
check:
response5.requestId == reqUnsubscribeAll.requestId
response5.statusCode == FilterSubscribeErrorKind.NOT_FOUND.uint32
response5.statusDesc.get().contains("peer has no subscriptions")
asyncTest "ping subscriber":
# Given
let

View File

@ -230,7 +230,7 @@ procSuite "WakuNode - Store":
await sleepAsync(100.millis)
# Send filter push message to server from source node
await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message)
await filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message)
# Wait for the server filter to receive the push message
require await filterFut.withTimeout(5.seconds)

View File

@ -58,7 +58,7 @@ procSuite "Waku v2 JSON-RPC API - Filter":
check:
# Light node has not yet subscribed to any filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
node2.wakuFilterClientLegacy.getSubscriptionsCount() == 0
let contentFilters = @[
ContentFilter(contentTopic: DefaultContentTopic),
@ -70,13 +70,13 @@ procSuite "Waku v2 JSON-RPC API - Filter":
check:
response == true
# Light node has successfully subscribed to 4 content topics
node2.wakuFilterClient.getSubscriptionsCount() == 4
node2.wakuFilterClientLegacy.getSubscriptionsCount() == 4
response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic))
check:
response == true
# Light node has successfully unsubscribed from all filters
node2.wakuFilterClient.getSubscriptionsCount() == 0
node2.wakuFilterClientLegacy.getSubscriptionsCount() == 0
## Cleanup
await server.stop()

View File

@ -65,7 +65,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
connected: it.connectedness == Connectedness.Connected))
peers.add(relayPeers)
if not node.wakuFilter.isNil():
if not node.wakuFilterLegacy.isNil():
# Map WakuFilter peers to WakuPeers and add to return list
let filterPeers = node.peerManager.peerStore.peers(WakuFilterCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),

View File

@ -62,7 +62,7 @@ proc startMetricsLog*() =
let pxPeers = collectorAsF64(waku_px_peers)
let lightpushPeers = collectorAsF64(waku_lightpush_peers)
let filterPeers = collectorAsF64(waku_filter_peers)
let filterSubscribers = collectorAsF64(waku_filter_subscribers)
let filterSubscribers = collectorAsF64(waku_legacy_filter_subscribers)
info "Total connections initiated", count = $freshConnCount
info "Total messages", count = totalMessages

View File

@ -29,8 +29,9 @@ import
../protocol/waku_archive,
../protocol/waku_store,
../protocol/waku_store/client as store_client,
../protocol/waku_filter,
../protocol/waku_filter/client as filter_client,
../protocol/waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed
../protocol/waku_filter/client as filter_client, #TODO: support for legacy filter protocol will be removed
../protocol/waku_filter_v2,
../protocol/waku_lightpush,
../protocol/waku_lightpush/client as lightpush_client,
../protocol/waku_enr,
@ -87,8 +88,9 @@ type
wakuArchive*: WakuArchive
wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient
wakuFilter*: WakuFilter
wakuFilterClient*: WakuFilterClient
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed
wakuFilterClientLegacy*: WakuFilterClientLegacy #TODO: support for legacy filter protocol will be removed
when defined(rln):
wakuRlnRelay*: WakuRLNRelay
wakuLightPush*: WakuLightPush
@ -247,6 +249,12 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await node.wakuFilter.handleMessage(topic, msg)
##TODO: Support for legacy filter will be removed
if node.wakuFilterLegacy.isNil():
return
await node.wakuFilterLegacy.handleMessage(topic, msg)
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuArchive.isNil():
return
@ -395,12 +403,15 @@ proc mountRelay*(node: WakuNode,
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} =
info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(node.peerManager, node.rng, filterTimeout)
node.wakuFilter = WakuFilter.new(node.peerManager)
node.wakuFilterLegacy = WakuFilterLegacy.new(node.peerManager, node.rng, filterTimeout) #TODO: remove legacy
if node.started:
await node.wakuFilter.start()
await node.wakuFilterLegacy.start() #TODO: remove legacy
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterSubscribeCodec))
node.switch.mount(node.wakuFilterLegacy, protocolMatcher(WakuFilterCodec)) #TODO: remove legacy
proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}=
if node.wakuFilter.isNil():
@ -408,22 +419,23 @@ proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: Wak
return
await node.wakuFilter.handleMessage(pubsubTopic, message)
await node.wakuFilterLegacy.handleMessage(pubsubTopic, message) #TODO: remove legacy
proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting filter client"
node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng)
node.wakuFilterClientLegacy = WakuFilterClientLegacy.new(node.peerManager, node.rng)
if node.started:
# Node has started already. Let's start filter too.
await node.wakuFilterClient.start()
await node.wakuFilterClientLegacy.start()
node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec))
node.switch.mount(node.wakuFilterClientLegacy, protocolMatcher(WakuFilterCodec))
proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClient.isNil():
if node.wakuFilterClientLegacy.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
@ -440,7 +452,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
handler(pubsubTopic, message)
let subRes = await node.wakuFilterClient.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer)
let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer)
if subRes.isOk():
info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
else:
@ -450,7 +462,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C
proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic],
peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} =
## Unsubscribe from a content filter.
if node.wakuFilterClient.isNil():
if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return
@ -459,7 +471,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer.peerId
let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
let unsubRes = await node.wakuFilterClientLegacy.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer)
if unsubRes.isOk():
info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics
else:
@ -470,7 +482,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics:
proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
if node.wakuFilterClient.isNil():
if node.wakuFilterClientLegacy.isNil():
error "cannot register filter subscription to topic", error="waku filter client is nil"
return
@ -485,7 +497,7 @@ proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Content
proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe,
deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} =
## Unsubscribe from a content filter.
if node.wakuFilterClient.isNil():
if node.wakuFilterClientLegacy.isNil():
error "cannot unregister filter subscription to content", error="waku filter client is nil"
return

View File

@ -71,13 +71,13 @@ proc getSubscriptionsCount(m: SubscriptionManager): int =
type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
type WakuFilterClient* = ref object of LPProtocol
type WakuFilterClientLegacy* = ref object of LPProtocol
rng: ref rand.HmacDrbgContext
peerManager: PeerManager
subManager: SubscriptionManager
proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, rpc: MessagePush) =
proc handleMessagePush(wf: WakuFilterClientLegacy, peerId: PeerId, requestId: string, rpc: MessagePush) =
for msg in rpc.messages:
let
pubsubTopic = Defaultstring # TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation
@ -86,24 +86,24 @@ proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string,
wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg)
proc initProtocolHandler(wf: WakuFilterClient) =
proc initProtocolHandler(wf: WakuFilterClientLegacy) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeReqRes = FilterRPC.decode(buffer)
if decodeReqRes.isErr():
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure])
return
let rpc = decodeReqRes.get()
trace "filter message received"
if rpc.push.isNone():
waku_filter_errors.inc(labelValues = [emptyMessagePushFailure])
waku_legacy_filter_errors.inc(labelValues = [emptyMessagePushFailure])
# TODO: Manage the empty push message error. Perform any action?
return
waku_filter_messages.inc(labelValues = ["MessagePush"])
waku_legacy_filter_messages.inc(labelValues = ["MessagePush"])
let
peerId = conn.peerId
@ -116,11 +116,11 @@ proc initProtocolHandler(wf: WakuFilterClient) =
wf.handler = handle
wf.codec = WakuFilterCodec
proc new*(T: type WakuFilterClient,
proc new*(T: type WakuFilterClientLegacy,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext): T =
let wf = WakuFilterClient(
let wf = WakuFilterClientLegacy(
peerManager: peerManager,
rng: rng,
subManager: SubscriptionManager.init()
@ -129,7 +129,7 @@ proc new*(T: type WakuFilterClient,
wf
proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
proc sendFilterRpc(wf: WakuFilterClientLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
@ -138,7 +138,7 @@ proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeer
await connection.writeLP(rpc.encode().buffer)
return ok()
proc sendFilterRequestRpc(wf: WakuFilterClient,
proc sendFilterRequestRpc(wf: WakuFilterClientLegacy,
pubsubTopic: PubsubTopic,
contentTopics: seq[ContentTopic],
subscribe: bool,
@ -158,13 +158,13 @@ proc sendFilterRequestRpc(wf: WakuFilterClient,
let sendRes = await wf.sendFilterRpc(rpc, peer)
if sendRes.isErr():
waku_filter_errors.inc(labelValues = [sendRes.error])
waku_legacy_filter_errors.inc(labelValues = [sendRes.error])
return err(sendRes.error)
return ok()
proc subscribe*(wf: WakuFilterClient,
proc subscribe*(wf: WakuFilterClientLegacy,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic|seq[ContentTopic],
handler: FilterPushHandler,
@ -184,7 +184,7 @@ proc subscribe*(wf: WakuFilterClient,
return ok()
proc unsubscribe*(wf: WakuFilterClient,
proc unsubscribe*(wf: WakuFilterClientLegacy,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic|seq[ContentTopic],
peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} =
@ -203,8 +203,8 @@ proc unsubscribe*(wf: WakuFilterClient,
return ok()
proc clearSubscriptions*(wf: WakuFilterClient) =
proc clearSubscriptions*(wf: WakuFilterClientLegacy) =
wf.subManager.clear()
proc getSubscriptionsCount*(wf: WakuFilterClient): int =
proc getSubscriptionsCount*(wf: WakuFilterClientLegacy): int =
wf.subManager.getSubscriptionsCount()

View File

@ -1,201 +1,201 @@
import
std/[options, sets, tables, sequtils],
stew/results,
chronicles,
chronos,
metrics,
bearssl/rand,
libp2p/protocols/protocol,
libp2p/crypto/crypto
import
../waku_message,
../../node/peer_manager,
./rpc,
./rpc_codec,
./protocol_metrics
logScope:
topics = "waku filter"
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 2.hours
type WakuFilterResult*[T] = Result[T, string]
## Subscription manager
type Subscription = object
requestId: string
peer: PeerID
pubsubTopic: PubsubTopic
contentTopics: HashSet[ContentTopic]
proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]) =
let subscription = Subscription(
requestId: requestId,
peer: peer,
pubsubTopic: pubsubTopic,
contentTopics: toHashSet(contentTopics)
)
subscriptions.add(subscription)
proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsubscribeTopics: seq[ContentTopic]) =
for sub in subscriptions.mitems:
if sub.peer != peer:
continue
sub.contentTopics.excl(toHashSet(unsubscribeTopics))
# Delete the subscriber if no more content filters left
subscriptions.keepItIf(it.contentTopics.len > 0)
## Protocol
type
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilter* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
subscriptions*: seq[Subscription]
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration
proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) =
let
requestId = rpc.requestId
subscribe = rpc.request.get().subscribe
pubsubTopic = rpc.request.get().pubsubTopic
contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)
if subscribe:
info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics)
else:
info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics
wf.subscriptions.removeSubscription(peerId, contentTopics)
waku_filter_subscribers.set(wf.subscriptions.len.int64)
proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeRpcRes = FilterRPC.decode(buffer)
if decodeRpcRes.isErr():
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
trace "filter message received"
let rpc = decodeRpcRes.get()
## Filter request
# Subscription/unsubscription request
if rpc.request.isNone():
waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure])
# TODO: Manage the empty filter request message error. Perform any action?
return
waku_filter_messages.inc(labelValues = ["FilterRequest"])
wf.handleFilterRequest(conn.peerId, rpc)
wf.handler = handler
wf.codec = WakuFilterCodec
proc new*(T: type WakuFilter,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
timeout: Duration = WakuFilterTimeout): T =
let wf = WakuFilter(rng: rng,
peerManager: peerManager,
timeout: timeout)
wf.initProtocolHandler()
return wf
proc init*(T: type WakuFilter,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
timeout: Duration = WakuFilterTimeout): T {.
deprecated: "WakuFilter.new()' instead".} =
WakuFilter.new(peerManager, rng, timeout)
proc sendFilterRpc(wf: WakuFilter, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
### Send message to subscriptors
proc removePeerFromFailedPeersTable(wf: WakuFilter, subs: seq[Subscription]) =
## Clear the failed peer table if subscriber was able to connect
for sub in subs:
wf.failedPeers.del($sub)
proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defect, KeyError].} =
## If we have already failed to send message to this peer,
## check for elapsed time and if it's been too long, remove the peer.
for sub in subs:
let subKey: string = $(sub)
if not wf.failedPeers.hasKey(subKey):
# add the peer to the failed peers table.
wf.failedPeers[subKey] = Moment.now()
return
let elapsedTime = Moment.now() - wf.failedPeers[subKey]
if elapsedTime > wf.timeout:
wf.failedPeers.del(subKey)
let index = wf.subscriptions.find(sub)
wf.subscriptions.delete(index)
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len
if wf.subscriptions.len <= 0:
return
var failedSubscriptions: seq[Subscription]
var connectedSubscriptions: seq[Subscription]
for sub in wf.subscriptions:
# TODO: Review when pubsubTopic can be empty and if it is a valid case
if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic:
continue
if msg.contentTopic notin sub.contentTopics:
continue
let rpc = FilterRPC(
requestId: sub.requestId,
push: some(MessagePush(messages: @[msg]))
)
let res = await wf.sendFilterRpc(rpc, sub.peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
failedSubscriptions.add(sub)
continue
connectedSubscriptions.add(sub)
wf.removePeerFromFailedPeersTable(connectedSubscriptions)
wf.handleClientError(failedSubscriptions)
import
std/[options, sets, tables, sequtils],
stew/results,
chronicles,
chronos,
metrics,
bearssl/rand,
libp2p/protocols/protocol,
libp2p/crypto/crypto
import
../waku_message,
../../node/peer_manager,
./rpc,
./rpc_codec,
./protocol_metrics
logScope:
topics = "waku filter"
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 2.hours
type WakuFilterResult*[T] = Result[T, string]
## Subscription manager
type Subscription = object
requestId: string
peer: PeerID
pubsubTopic: PubsubTopic
contentTopics: HashSet[ContentTopic]
proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]) =
let subscription = Subscription(
requestId: requestId,
peer: peer,
pubsubTopic: pubsubTopic,
contentTopics: toHashSet(contentTopics)
)
subscriptions.add(subscription)
proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsubscribeTopics: seq[ContentTopic]) =
for sub in subscriptions.mitems:
if sub.peer != peer:
continue
sub.contentTopics.excl(toHashSet(unsubscribeTopics))
# Delete the subscriber if no more content filters left
subscriptions.keepItIf(it.contentTopics.len > 0)
## Protocol
type
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilterLegacy* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
subscriptions*: seq[Subscription]
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration
proc handleFilterRequest(wf: WakuFilterLegacy, peerId: PeerId, rpc: FilterRPC) =
let
requestId = rpc.requestId
subscribe = rpc.request.get().subscribe
pubsubTopic = rpc.request.get().pubsubTopic
contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)
if subscribe:
info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics)
else:
info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics
wf.subscriptions.removeSubscription(peerId, contentTopics)
waku_legacy_filter_subscribers.set(wf.subscriptions.len.int64)
proc initProtocolHandler(wf: WakuFilterLegacy) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let buffer = await conn.readLp(MaxRpcSize.int)
let decodeRpcRes = FilterRPC.decode(buffer)
if decodeRpcRes.isErr():
waku_legacy_filter_errors.inc(labelValues = [decodeRpcFailure])
return
trace "filter message received"
let rpc = decodeRpcRes.get()
## Filter request
# Subscription/unsubscription request
if rpc.request.isNone():
waku_legacy_filter_errors.inc(labelValues = [emptyFilterRequestFailure])
# TODO: Manage the empty filter request message error. Perform any action?
return
waku_legacy_filter_messages.inc(labelValues = ["FilterRequest"])
wf.handleFilterRequest(conn.peerId, rpc)
wf.handler = handler
wf.codec = WakuFilterCodec
proc new*(T: type WakuFilterLegacy,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
timeout: Duration = WakuFilterTimeout): T =
let wf = WakuFilterLegacy(rng: rng,
peerManager: peerManager,
timeout: timeout)
wf.initProtocolHandler()
return wf
proc init*(T: type WakuFilterLegacy,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
timeout: Duration = WakuFilterTimeout): T {.
deprecated: "WakuFilterLegacy.new()' instead".} =
WakuFilterLegacy.new(peerManager, rng, timeout)
proc sendFilterRpc(wf: WakuFilterLegacy, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
### Send message to subscriptors
proc removePeerFromFailedPeersTable(wf: WakuFilterLegacy, subs: seq[Subscription]) =
## Clear the failed peer table if subscriber was able to connect
for sub in subs:
wf.failedPeers.del($sub)
proc handleClientError(wf: WakuFilterLegacy, subs: seq[Subscription]) {.raises: [Defect, KeyError].} =
## If we have already failed to send message to this peer,
## check for elapsed time and if it's been too long, remove the peer.
for sub in subs:
let subKey: string = $(sub)
if not wf.failedPeers.hasKey(subKey):
# add the peer to the failed peers table.
wf.failedPeers[subKey] = Moment.now()
return
let elapsedTime = Moment.now() - wf.failedPeers[subKey]
if elapsedTime > wf.timeout:
wf.failedPeers.del(subKey)
let index = wf.subscriptions.find(sub)
wf.subscriptions.delete(index)
proc handleMessage*(wf: WakuFilterLegacy, pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len
if wf.subscriptions.len <= 0:
return
var failedSubscriptions: seq[Subscription]
var connectedSubscriptions: seq[Subscription]
for sub in wf.subscriptions:
# TODO: Review when pubsubTopic can be empty and if it is a valid case
if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic:
continue
if msg.contentTopic notin sub.contentTopics:
continue
let rpc = FilterRPC(
requestId: sub.requestId,
push: some(MessagePush(messages: @[msg]))
)
let res = await wf.sendFilterRpc(rpc, sub.peer)
if res.isErr():
waku_legacy_filter_errors.inc(labelValues = [res.error()])
failedSubscriptions.add(sub)
continue
connectedSubscriptions.add(sub)
wf.removePeerFromFailedPeersTable(connectedSubscriptions)
wf.handleClientError(failedSubscriptions)

View File

@ -6,9 +6,9 @@ else:
import metrics
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"]
declarePublicGauge waku_legacy_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_legacy_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_legacy_filter_messages, "number of filter messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"

View File

@ -1,27 +1,27 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options
import
../waku_message
type
ContentFilter* = object
contentTopic*: string
FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubsubTopic*: string
subscribe*: bool
MessagePush* = object
messages*: seq[WakuMessage]
FilterRPC* = object
requestId*: string
request*: Option[FilterRequest]
push*: Option[MessagePush]
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/options
import
../waku_message
type
ContentFilter* = object
contentTopic*: string
FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubsubTopic*: string
subscribe*: bool
MessagePush* = object
messages*: seq[WakuMessage]
FilterRPC* = object
requestId*: string
request*: Option[FilterRequest]
push*: Option[MessagePush]

View File

@ -6,7 +6,7 @@ else:
{.push raises: [].}
import
std/[options,sequtils,sets,tables],
std/[options,sequtils,sets,strutils,tables],
chronicles,
chronos,
libp2p/peerid,
@ -24,12 +24,13 @@ logScope:
topics = "waku filter"
const
MaxContentTopicsPerRequest = 30
MaxContentTopicsPerRequest* = 30
type
WakuFilter* = ref object of LPProtocol
subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria
peerManager: PeerManager
maintenanceTask: TimerCallback
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId=peerId
@ -52,6 +53,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
if peerId in wf.subscriptions:
# We already have a subscription for this peer. Try to add the new filter criteria.
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
if peerSubscription.len() + filterCriteria.len() >= MaxCriteriaPerSubscription:
return err(FilterSubscribeError.serviceUnavailable("peer has reached maximum number of filter criteria"))
@ -59,6 +61,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
peerSubscription.incl(filterCriteria)
wf.subscriptions[peerId] = peerSubscription
else:
# We don't have a subscription for this peer yet. Try to add it.
if wf.subscriptions.len() >= MaxTotalSubscriptions:
return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions"))
debug "creating new subscription", peerId=peerId
@ -78,7 +81,7 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
if peerId notin wf.subscriptions:
debug "unsubscibing peer has no subscriptions", peerId=peerId
debug "unsubscribing peer has no subscriptions", peerId=peerId
return err(FilterSubscribeError.notFound())
var peerSubscription = wf.subscriptions.mgetOrPut(peerId, initHashSet[FilterCriterion]())
@ -95,7 +98,7 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
if peerId notin wf.subscriptions:
debug "unsubscibing peer has no subscriptions", peerId=peerId
debug "unsubscribing peer has no subscriptions", peerId=peerId
return err(FilterSubscribeError.notFound())
debug "removing peer subscription", peerId=peerId
@ -169,6 +172,7 @@ proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {
proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions"
## Remove subscriptions for peers that have been removed from peer store
var peersToRemove: seq[PeerId]
for peerId, peerSubscription in wf.subscriptions.pairs():
## TODO: currently we only maintain by syncing with peer store. We could
@ -177,7 +181,11 @@ proc maintainSubscriptions*(wf: WakuFilter) =
debug "peer has been removed from peer store, removing subscription", peerId=peerId
peersToRemove.add(peerId)
wf.subscriptions.removePeers(peersToRemove)
if peersToRemove.len() > 0:
wf.subscriptions.removePeers(peersToRemove)
## Periodic report of number of subscriptions
waku_filter_subscriptions.set(wf.subscriptions.len().float64)
const MessagePushTimeout = 20.seconds
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
@ -208,6 +216,8 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa
proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peerId=conn.peerId
let buf = await conn.readLp(MaxSubscribeSize)
let decodeRes = FilterSubscribeRequest.decode(buf)
@ -220,6 +230,8 @@ proc initProtocolHandler(wf: WakuFilter) =
let response = wf.handleSubscribeRequest(conn.peerId, request)
debug "sending filter subscribe response", peerId=conn.peerId, response=response
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
return
@ -242,9 +254,9 @@ proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) =
var maintainSubs: proc(udata: pointer) {.gcsafe, raises: [Defect].}
maintainSubs = proc(udata: pointer) {.gcsafe.} =
maintainSubscriptions(wf)
discard setTimer(Moment.fromNow(interval), maintainSubs)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
discard setTimer(Moment.fromNow(interval), maintainSubs)
wf.maintenanceTask = setTimer(Moment.fromNow(interval), maintainSubs)
method start*(wf: WakuFilter) {.async.} =
debug "starting filter protocol"
@ -254,4 +266,5 @@ method start*(wf: WakuFilter) {.async.} =
method stop*(wf: WakuFilter) {.async.} =
debug "stopping filter protocol"
wf.maintenanceTask.clearTimer()
await procCall LPProtocol(wf).stop()

View File

@ -9,6 +9,7 @@ export metrics
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"]
declarePublicGauge waku_filter_subscriptions, "number of subscribed filter clients"
declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"]
declarePublicHistogram waku_filter_handle_message_duration_seconds, "duration to push message to filter subscribers"