refactor(waku-filter): waku filter protocol code reorganisation

This commit is contained in:
Lorenzo Delgado 2022-08-12 12:15:51 +02:00 committed by Lorenzo Delgado
parent caa47896c1
commit c964aea885
20 changed files with 783 additions and 644 deletions

View File

@@ -23,7 +23,7 @@ import
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/utils/time

View File

@@ -17,7 +17,7 @@ import
../../waku/v1/protocol/waku_protocol,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_filter,
../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/utils/peers,
../test_helpers

View File

@@ -2,286 +2,316 @@
import
std/[options, tables, sets],
testutils/unittests, chronos, chronicles,
testutils/unittests,
chronos,
chronicles,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/multistream
import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/waku_filter/waku_filter,
../test_helpers, ./utils
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_filter,
../test_helpers,
./utils
const
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
const dummyHandler = proc(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard
proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get())
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
return newStandardSwitch(some(peerKey), addrs=peerAddr)
# TODO: Extend test coverage
procSuite "Waku Filter":
asyncTest "handle filter":
const defaultTopic = "/waku/2/default-waku/proto"
asyncTest "should forward messages to client after subscribed":
## Setup
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch()
serverSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler)
serverSwitch.mount(serverProto)
# Client
let handlerFuture = newFuture[(string, MessagePush)]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete((requestId, push))
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
clientSwitch.mount(clientProto)
var dialSwitch = newStandardSwitch()
await dialSwitch.start()
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
## When
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
require resSubscription.isOk()
var responseRequestIdFuture = newFuture[string]()
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseRequestIdFuture.complete(requestId)
await sleepAsync(5.milliseconds)
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
await serverProto.handleMessage(DefaultPubsubTopic, message)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
## Then
let subscriptionRequestId = resSubscription.get()
let (requestId, push) = await handlerFuture
check:
(await responseRequestIdFuture) == id
asyncTest "Can subscribe and unsubscribe from content filter":
const defaultTopic = "/waku/2/default-waku/proto"
requestId == subscriptionRequestId
push.messages == @[message]
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "should not forward messages to client after unsuscribed":
## Setup
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch()
serverSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler)
serverSwitch.mount(serverProto)
# Client
var handlerFuture = newFuture[void]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete()
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
clientSwitch.mount(clientProto)
var dialSwitch = newStandardSwitch()
await dialSwitch.start()
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
## Given
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
require resSubscription.isOk()
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
await sleepAsync(5.milliseconds)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
await serverProto.handleMessage(DefaultPubsubTopic, message)
let handlerWasCalledAfterSubscription = await handlerFuture.withTimeout(1.seconds)
require handlerWasCalledAfterSubscription
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
# Reset to test unsubscribe
responseCompletionFuture = newFuture[bool]()
handlerFuture = newFuture[void]()
let
rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: false)
let resUnsubscription = await clientProto.unsubscribe(DefaultPubsubTopic, @[DefaultContentTopic])
require resUnsubscription.isOk()
await proto.unsubscribe(rpcU)
await sleepAsync(5.milliseconds)
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
await serverProto.handleMessage(DefaultPubsubTopic, message)
## Then
let handlerWasCalledAfterUnsubscription = await handlerFuture.withTimeout(1.seconds)
check:
# Check that unsubscribe works as expected
(await responseCompletionFuture.withTimeout(5.seconds)) == false
not handlerWasCalledAfterUnsubscription
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "handle filter subscribe failures":
const defaultTopic = "/waku/2/default-waku/proto"
asyncTest "subscription should fail if no filter peer is provided":
## Setup
let clientSwitch = newTestSwitch()
await clientSwitch.start()
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
## Given
let clientProto = WakuFilter.init(PeerManager.new(clientSwitch), crypto.newRng(), dummyHandler)
clientSwitch.mount(clientProto)
var dialSwitch = newStandardSwitch()
await dialSwitch.start()
var responseRequestIdFuture = newFuture[string]()
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
let idOpt = (await proto.subscribe(rpc))
## When
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
## Then
check:
idOpt.isNone
resSubscription.isErr()
resSubscription.error() == "peer_not_found_failure"
asyncTest "Handle failed clients":
const defaultTopic = "/waku/2/default-waku/proto"
asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed":
## Setup
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch()
serverSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=1.seconds)
serverSwitch.mount(serverProto)
# Client
var handlerFuture = newFuture[void]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete()
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.new(key)
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
clientSwitch.mount(clientProto)
var dialSwitch = newStandardSwitch()
await dialSwitch.start()
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
## When
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
check resSubscription.isOk()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
await sleepAsync(5.milliseconds)
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
await serverProto.handleMessage(DefaultPubsubTopic, message)
let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds)
require handlerShouldHaveBeenCalled
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
# Stop client node to test timeout unsubscription
await clientSwitch.stop()
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
await sleepAsync(5.milliseconds)
# Stop switch to test unsubscribe
discard dialSwitch.stop()
await sleepAsync(2.seconds)
# First failure should not remove the subscription
await serverProto.handleMessage(DefaultPubsubTopic, message)
let
subscriptionsBeforeTimeout = serverProto.subscriptions.len()
failedPeersBeforeTimeout = serverProto.failedPeers.len()
#First failure should not remove the subscription
await proto2.handleMessage(defaultTopic, post)
await sleepAsync(2000.millis)
check:
proto2.subscribers.len() == 1
#Second failure should remove the subscription
await proto2.handleMessage(defaultTopic, post)
check:
proto2.subscribers.len() == 0
asyncTest "Handles failed clients coming back up":
const defaultTopic = "/waku/2/default-waku/proto"
let
dialKey = PrivateKey.random(ECDSA, rng[]).get()
listenKey = PrivateKey.random(ECDSA, rng[]).get()
contentTopic = ContentTopic("/waku/2/default-content/proto")
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch(privKey = some(dialKey), addrs = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").tryGet())
await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(listenKey))
await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
let
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds)
listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds)
await proto2.handleMessage(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
responseCompletionFuture = newFuture[bool]()
# Stop switch to test unsubscribe
await dialSwitch.stop()
# Wait for peer connection failure timeout to elapse
await sleepAsync(1.seconds)
#First failure should add to failure list
await proto2.handleMessage(defaultTopic, post)
check:
proto2.failedPeers.len() == 1
# Start switch with same key as before
var dialSwitch2 = newStandardSwitch(some(dialKey), addrs = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").tryGet())
await dialSwitch2.start()
dialSwitch2.mount(proto)
#Second failure should remove the subscription
await proto2.handleMessage(defaultTopic, post)
await serverProto.handleMessage(DefaultPubsubTopic, message)
let
subscriptionsAfterTimeout = serverProto.subscriptions.len()
failedPeersAfterTimeout = serverProto.failedPeers.len()
## Then
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
subscriptionsBeforeTimeout == 1
failedPeersBeforeTimeout == 1
subscriptionsAfterTimeout == 0
failedPeersAfterTimeout == 0
check:
proto2.failedPeers.len() == 0
## Cleanup
await serverSwitch.stop()
asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses":
## Setup
let
clientKey = PrivateKey.random(ECDSA, rng[]).get()
clientAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").get()
await dialSwitch2.stop()
await listenSwitch.stop()
let rng = crypto.newRng()
let
clientSwitch = newTestSwitch(some(clientKey), some(clientAddress))
serverSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
# Server
let
serverPeerManager = PeerManager.new(serverSwitch)
serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=2.seconds)
serverSwitch.mount(serverProto)
# Client
var handlerFuture = newFuture[void]()
proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
handlerFuture.complete()
let
clientPeerManager = PeerManager.new(clientSwitch)
clientProto = WakuFilter.init(clientPeerManager, rng, handler)
clientSwitch.mount(clientProto)
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
## When
let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic])
check resSubscription.isOk()
await sleepAsync(5.milliseconds)
await serverProto.handleMessage(DefaultPubsubTopic, message)
handlerFuture = newFuture[void]()
let
subscriptionsBeforeFailure = serverProto.subscriptions.len()
failedPeersBeforeFailure = serverProto.failedPeers.len()
# Stop switch to test unsubscribe
await clientSwitch.stop()
await sleepAsync(5.milliseconds)
# First failure should add to failure list
await serverProto.handleMessage(DefaultPubsubTopic, message)
handlerFuture = newFuture[void]()
let
subscriptionsAfterFailure = serverProto.subscriptions.len()
failedPeersAfterFailure = serverProto.failedPeers.len()
await sleepAsync(250.milliseconds)
# Start switch with same key as before
var clientSwitch2 = newTestSwitch(some(clientKey), some(clientAddress))
await clientSwitch2.start()
clientSwitch2.mount(clientProto)
# If a push succeeds after a failure, the peer should be removed from the failed peers list
await serverProto.handleMessage(DefaultPubsubTopic, message)
let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds)
let
subscriptionsAfterSuccessfulConnection = serverProto.subscriptions.len()
failedPeersAfterSuccessfulConnection = serverProto.failedPeers.len()
## Then
check:
handlerShouldHaveBeenCalled
check:
subscriptionsBeforeFailure == 1
subscriptionsAfterFailure == 1
subscriptionsAfterSuccessfulConnection == 1
check:
failedPeersBeforeFailure == 0
failedPeersAfterFailure == 1
failedPeersAfterSuccessfulConnection == 0
## Cleanup
await allFutures(clientSwitch2.stop(), serverSwitch.stop())

View File

@@ -490,7 +490,7 @@ procSuite "Waku Store - fault tolerant store":
]
for msg in msgList:
await proto.handleMessage(DEFAULT_PUBSUB_TOPIC, msg)
await proto.handleMessage(DefaultPubsubTopic, msg)
let (listenSwitch2, dialSwitch2, proto2) = await newTestWakuStore()
let msgList2 = @[
@@ -505,7 +505,7 @@ procSuite "Waku Store - fault tolerant store":
]
for msg in msgList2:
await proto2.handleMessage(DEFAULT_PUBSUB_TOPIC, msg)
await proto2.handleMessage(DefaultPubsubTopic, msg)
asyncTest "handle temporal history query with a valid time window":

View File

@@ -18,7 +18,7 @@ import
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers,

View File

@@ -6,7 +6,7 @@ import
json_rpc/[rpcclient, rpcserver],
libp2p/protobuf/minprotobuf
import
../protocol/waku_filter/waku_filter_types,
../protocol/waku_filter,
../protocol/waku_store,
../protocol/waku_message,
../utils/time,

View File

@@ -8,7 +8,7 @@ import
../wakunode2,
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_filter,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings

View File

@@ -8,7 +8,7 @@ import
../wakunode2,
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_filter,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings

View File

@@ -8,7 +8,7 @@ import
../wakunode2,
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_filter,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings

View File

@@ -7,7 +7,7 @@ import
../wakunode2,
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_filter,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings

View File

@@ -8,7 +8,7 @@ import
../wakunode2,
../waku_payload,
../jsonrpc/jsonrpc_types,
../../protocol/waku_filter/waku_filter_types,
../../protocol/waku_filter,
../../protocol/waku_store,
../../../v1/node/rpc/hexstrings

View File

@@ -19,7 +19,7 @@ import
../protocol/[waku_relay, waku_message],
../protocol/waku_store,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_filter,
../protocol/waku_lightpush,
../protocol/waku_rln_relay/[waku_rln_relay_types],
../utils/[peers, requests, wakuswitch, wakuenr],
@@ -67,30 +67,6 @@ proc protocolMatcher(codec: string): Matcher =
return match
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
# Flatten all unsubscribe topics into single seq
let unsubscribeTopics = contentFilters.mapIt(it.contentTopic)
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
var rIdToRemove: seq[string] = @[]
for rId, f in filters.mpairs:
# Iterate filter entries to remove matching content topics
# make sure we delete the content filter
# if no more topics are left
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
if f.contentFilters.len == 0:
rIdToRemove.add(rId)
# make sure we delete the filter entry
# if no more content filters left
for rId in rIdToRemove:
filters.del(rId)
debug "filters modified", filters=filters
proc updateSwitchPeerInfo(node: WakuNode) =
## TODO: remove this when supported upstream
##
@@ -217,7 +193,7 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
switch: switch,
rng: rng,
enr: enr,
filters: initTable[string, Filter](),
filters: Filters.init(),
announcedAddresses: announcedAddresses
)
@@ -279,18 +255,20 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
var id = generateRequestId(node.rng)
if node.wakuFilter.isNil == false:
let idOpt = await node.wakuFilter.subscribe(request)
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics)
if idOpt.isSome():
# Subscribed successfully.
id = idOpt.get()
if resSubscription.isOk():
id = resSubscription.get()
else:
# Failed to subscribe
error "remote subscription to filter failed", filter = request
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
# Register handler for filter, whether remote subscription succeeded or not
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler, pubSubTopic: request.pubSubTopic)
node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
@@ -333,7 +311,10 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
info "unsubscribe content", filter=request
await node.wakuFilter.unsubscribe(request)
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics)
node.filters.removeContentFilters(request.contentFilters)
waku_node_filters.set(node.filters.len.int64)
@@ -420,10 +401,9 @@ proc info*(node: WakuNode): WakuInfo =
let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
return wakuInfo
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} =
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, LPError]} =
info "mounting filter"
proc filterHandler(requestId: string, msg: MessagePush)
{.async, gcsafe, raises: [Defect, KeyError].} =
proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} =
info "push received"
for message in msg.messages:
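At the node level the public subscribe/unsubscribe API is unchanged by this refactoring; the sketch below shows how it is driven after the change, assuming `node` is a started WakuNode and `filterPeer` is a hypothetical RemotePeerInfo of a filter service node (both assumptions, not part of this diff):
# Inside an async proc, with `node` already created and started:
node.mountFilter()
node.wakuFilter.setPeer(filterPeer)   # filterPeer: RemotePeerInfo (assumed)
proc handler(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} =
  discard  # called for every pushed message matching the content filter
let request = FilterRequest(
  pubSubTopic: "/waku/2/default-waku/proto",
  contentFilters: @[ContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto"))],
  subscribe: true
)
# Registers the handler in node.filters and subscribes on the remote filter peer
await node.subscribe(request, handler)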

View File

@@ -5,7 +5,7 @@ import
../protocol/waku_relay,
../protocol/waku_store,
../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter,
../protocol/waku_filter,
../protocol/waku_lightpush,
../protocol/waku_rln_relay/waku_rln_relay_types,
./peer_manager/peer_manager,

View File

@@ -0,0 +1,13 @@
{.push raises: [Defect].}
import
./waku_filter/rpc,
./waku_filter/rpc_codec,
./waku_filter/protocol,
./waku_filter/client
export
rpc,
rpc_codec,
protocol,
client
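With this re-export module in place, downstream code depends on a single import path instead of reaching into the submodules directly. A minimal before/after sketch, using the same relative paths as the import hunks above:
# Before this commit: submodules imported directly
# import ../../waku/v2/protocol/waku_filter/waku_filter
# import ../../waku/v2/protocol/waku_filter/waku_filter_types
# After this commit: one entry point that re-exports rpc, rpc_codec, protocol and client
import ../../waku/v2/protocol/waku_filter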

View File

@@ -0,0 +1,69 @@
{.push raises: [Defect].}
import
std/[tables, sequtils],
chronicles
import
../waku_message,
./rpc
type
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].}
Filter* = object
pubSubTopic*: string
contentFilters*: seq[ContentFilter]
handler*: ContentFilterHandler
Filters* = Table[string, Filter]
proc init*(T: type Filters): T =
initTable[string, Filter]()
proc addContentFilters*(filters: var Filters, requestId: string, pubsubTopic: string, contentFilters: seq[ContentFilter], handler: ContentFilterHandler) {.gcsafe.}=
filters[requestId] = Filter(
pubSubTopic: pubsubTopic,
contentFilters: contentFilters,
handler: handler
)
proc removeContentFilters*(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
# Flatten all unsubscribe topics into single seq
let unsubscribeTopics = contentFilters.mapIt(it.contentTopic)
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
var rIdToRemove: seq[string] = @[]
for rId, f in filters.mpairs:
# Iterate filter entries to remove matching content topics
# make sure we delete the content filter
# if no more topics are left
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
if f.contentFilters.len == 0:
rIdToRemove.add(rId)
# make sure we delete the filter entry
# if no more content filters left
for rId in rIdToRemove:
filters.del(rId)
debug "filters modified", filters=filters
proc notify*(filters: Filters, msg: WakuMessage, requestId: string) =
for key, filter in filters.pairs:
# We do this because the key for the filter is set to the requestId received from the filter protocol.
# This means we do not need to check the content filter explicitly, as every MessagePush already contains
# the requestId of the corresponding filter.
if requestId != "" and requestId == key:
filter.handler(msg)
continue
# TODO: In case of no topics we should either trigger here for all messages,
# or we should not allow such filter to exist in the first place.
for contentFilter in filter.contentFilters:
if msg.contentTopic == contentFilter.contentTopic:
filter.handler(msg)
break
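A short usage sketch of the Filters registry defined above, assuming this module and waku_message are imported; the request id is a placeholder for the value that WakuFilter.subscribe() would return:
proc handler(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} =
  discard  # application-specific handling of a pushed message goes here
var filters = Filters.init()
# Register a filter keyed by the (placeholder) request id
filters.addContentFilters(
  "deadbeef",                            # requestId (placeholder)
  "/waku/2/default-waku/proto",          # pubsub topic
  @[ContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto"))],
  handler
)
# Dispatch an incoming message: a matching requestId triggers the handler directly,
# while an empty requestId falls back to content-topic matching
let msg = WakuMessage(payload: @[byte 1, 2, 3],
                      contentTopic: ContentTopic("/waku/2/default-content/proto"))
filters.notify(msg, requestId = "deadbeef")
# Remove the content filter again; entries left without content filters are deleted
filters.removeContentFilters(@[ContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto"))])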

View File

@@ -0,0 +1,287 @@
import
std/[options, sets, tables, sequtils],
stew/results,
chronicles,
chronos,
metrics,
bearssl,
libp2p/protocols/protocol,
libp2p/crypto/crypto
import
../waku_message,
../../node/peer_manager/peer_manager,
../../utils/requests,
./rpc,
./rpc_codec
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"]
logScope:
topics = "wakufilter"
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety: currently we never
# push more than 1 message at a time.
MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 2.hours
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
peerNotFoundFailure = "peer_not_found_failure"
type Subscription = object
requestId: string
peer: PeerID
pubsubTopic: string
contentTopics: HashSet[ContentTopic]
proc addSubscription(subscriptions: var seq[Subscription], peer: PeerID, requestId: string, pubsubTopic: string, contentTopics: seq[ContentTopic]) =
let subscription = Subscription(
requestId: requestId,
peer: peer,
pubsubTopic: pubsubTopic,
contentTopics: toHashSet(contentTopics)
)
subscriptions.add(subscription)
proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsubscribeTopics: seq[ContentTopic]) =
for sub in subscriptions.mitems:
if sub.peer != peer:
continue
sub.contentTopics.excl(toHashSet(unsubscribeTopics))
# Delete the subscriber if no more content filters left
subscriptions.keepItIf(it.contentTopics.len > 0)
type
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilterResult*[T] = Result[T, string]
WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
pushHandler*: MessagePushHandler
subscriptions*: seq[Subscription]
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration
proc init(wf: WakuFilter) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let message = await conn.readLp(MaxRpcSize.int)
let res = FilterRPC.init(message)
if res.isErr():
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
trace "filter message received"
let rpc = res.get()
## Filter request
# We are receiving a subscription/unsubscription request
if rpc.request != FilterRequest():
waku_filter_messages.inc(labelValues = ["FilterRequest"])
let
requestId = rpc.requestId
subscribe = rpc.request.subscribe
pubsubTopic = rpc.request.pubsubTopic
contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic)
if subscribe:
info "added filter subscritpiton", peerId=conn.peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
wf.subscriptions.addSubscription(conn.peerId, requestId, pubsubTopic, contentTopics)
else:
info "removed filter subscritpiton", peerId=conn.peerId, contentTopics=contentTopics
wf.subscriptions.removeSubscription(conn.peerId, contentTopics)
waku_filter_subscribers.set(wf.subscriptions.len.int64)
## Push message
# We are receiving messages from the peer that we subscribed to
if rpc.push != MessagePush():
waku_filter_messages.inc(labelValues = ["MessagePush"])
let
requestId = rpc.requestId
push = rpc.push
info "received filter message push", peerId=conn.peerId
await wf.pushHandler(requestId, push)
wf.handler = handle
wf.codec = WakuFilterCodec
proc init*(T: type WakuFilter,
peerManager: PeerManager,
rng: ref BrHmacDrbgContext,
handler: MessagePushHandler,
timeout: Duration = WakuFilterTimeout): T =
let wf = WakuFilter(rng: rng,
peerManager: peerManager,
pushHandler: handler,
timeout: timeout)
wf.init()
return wf
proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) =
wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc()
proc sendFilterRpcToPeer(wf: WakuFilter, rpc: FilterRPC, peer: PeerId): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
proc sendFilterRpcToRemotePeer(wf: WakuFilter, rpc: FilterRPC, peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}=
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isNone():
return err(dialFailure)
let connection = connOpt.get()
await connection.writeLP(rpc.encode().buffer)
return ok()
### Send message to subscribers
proc removePeerFromFailedPeersTable(wf: WakuFilter, subs: seq[Subscription]) =
## Remove a subscriber from the failed peers table once it is reachable again
for sub in subs:
wf.failedPeers.del($sub)
proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defect, KeyError].} =
## If we have already failed to send a message to this peer, check the elapsed
## time and, if it exceeds the timeout, remove the peer's subscription.
for sub in subs:
let subKey: string = $(sub)
if not wf.failedPeers.hasKey(subKey):
# add the peer to the failed peers table.
wf.failedPeers[subKey] = Moment.now()
return
let elapsedTime = Moment.now() - wf.failedPeers[subKey]
if elapsedTime > wf.timeout:
wf.failedPeers.del(subKey)
let index = wf.subscriptions.find(sub)
wf.subscriptions.delete(index)
proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.async.} =
if wf.subscriptions.len <= 0:
return
var failedSubscriptions: seq[Subscription]
var connectedSubscriptions: seq[Subscription]
for sub in wf.subscriptions:
# TODO: Review when pubsubTopic can be empty and if it is a valid case
if sub.pubSubTopic != "" and sub.pubSubTopic != pubsubTopic:
continue
if msg.contentTopic notin sub.contentTopics:
continue
let rpc = FilterRPC(
requestId: sub.requestId,
push: MessagePush(messages: @[msg])
)
let res = await wf.sendFilterRpcToPeer(rpc, sub.peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
failedSubscriptions.add(sub)
continue
connectedSubscriptions.add(sub)
wf.removePeerFromFailedPeersTable(connectedSubscriptions)
wf.handleClientError(failedSubscriptions)
### Send subscription/unsubscription
proc subscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[string]] {.async, gcsafe.} =
let id = generateRequestId(wf.rng)
let rpc = FilterRPC(
requestId: id,
request: FilterRequest(
subscribe: true,
pubSubTopic: pubsubTopic,
contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it))
)
)
let res = await wf.sendFilterRpcToRemotePeer(rpc, peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
return err(res.error())
return ok(id)
proc subscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[string]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
waku_filter_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wf.subscribe(pubsubTopic, contentTopics, peerOpt.get())
proc unsubscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.} =
let id = generateRequestId(wf.rng)
let rpc = FilterRPC(
requestId: id,
request: FilterRequest(
subscribe: false,
pubSubTopic: pubsubTopic,
contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it))
)
)
let res = await wf.sendFilterRpcToRemotePeer(rpc, peer)
if res.isErr():
waku_filter_errors.inc(labelValues = [res.error()])
return err(res.error())
return ok()
proc unsubscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[void]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isNone():
waku_filter_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return await wf.unsubscribe(pubsubTopic, contentTopics, peerOpt.get())
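Client-side wiring of the protocol above, condensed from the tests earlier in this commit; clientSwitch and serverSwitch are assumed to be already started Switch instances, and the calls run inside an async proc:
proc pushHandler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} =
  discard  # hand push.messages over to the application
let wakuFilter = WakuFilter.init(PeerManager.new(clientSwitch), crypto.newRng(), pushHandler)
clientSwitch.mount(wakuFilter)
wakuFilter.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
# subscribe/unsubscribe now return a Result instead of an Option/void
let subRes = await wakuFilter.subscribe("/waku/2/default-waku/proto",
                                        @[ContentTopic("/waku/2/default-content/proto")])
if subRes.isErr():
  error "filter subscription failed", err=subRes.error()
else:
  info "filter subscription active", requestId=subRes.get()
# Later, to stop receiving pushes for these content topics:
discard await wakuFilter.unsubscribe("/waku/2/default-waku/proto",
                                     @[ContentTopic("/waku/2/default-content/proto")])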

View File

@@ -0,0 +1,18 @@
import ../waku_message
type
ContentFilter* = object
contentTopic*: ContentTopic
FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubSubTopic*: string
subscribe*: bool
MessagePush* = object
messages*: seq[WakuMessage]
FilterRPC* = object
requestId*: string
request*: FilterRequest
push*: MessagePush

View File

@@ -0,0 +1,109 @@
{.push raises: [Defect].}
import
libp2p/protobuf/minprotobuf,
libp2p/varint
import
../waku_message,
../../utils/protobuf,
./rpc
proc encode*(filter: ContentFilter): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, filter.contentTopic)
output.finish3()
return output
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var contentTopic: ContentTopic
discard ?pb.getField(1, contentTopic)
return ok(ContentFilter(contentTopic: contentTopic))
proc encode*(rpc: FilterRequest): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, uint64(rpc.subscribe))
output.write3(2, rpc.pubSubTopic)
for filter in rpc.contentFilters:
output.write3(3, filter.encode())
output.finish3()
return output
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
var subflag: uint64
if ?pb.getField(1, subflag):
rpc.subscribe = bool(subflag)
var pubSubTopic: string
discard ?pb.getField(2, pubSubTopic)
rpc.pubSubTopic = pubSubTopic
var buffs: seq[seq[byte]]
discard ?pb.getRepeatedField(3, buffs)
for buf in buffs:
rpc.contentFilters.add(?ContentFilter.init(buf))
return ok(rpc)
proc encode*(push: MessagePush): ProtoBuffer =
var output = initProtoBuffer()
for push in push.messages:
output.write3(1, push.encode())
output.finish3()
return output
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var push = MessagePush()
var messages: seq[seq[byte]]
discard ?pb.getRepeatedField(1, messages)
for buf in messages:
push.messages.add(?WakuMessage.init(buf))
return ok(push)
proc encode*(rpc: FilterRPC): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.requestId)
output.write3(2, rpc.request.encode())
output.write3(3, rpc.push.encode())
output.finish3()
return output
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = FilterRPC()
var requestId: string
discard ?pb.getField(1, requestId)
rpc.requestId = requestId
var requestBuffer: seq[byte]
discard ?pb.getField(2, requestBuffer)
rpc.request = ?FilterRequest.init(requestBuffer)
var pushBuffer: seq[byte]
discard ?pb.getField(3, pushBuffer)
rpc.push = ?MessagePush.init(pushBuffer)
return ok(rpc)
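A round-trip sketch for the codec above, using only the types introduced in this commit; the request id is a placeholder for a value from generateRequestId():
let rpc = FilterRPC(
  requestId: "abcd1234",   # placeholder
  request: FilterRequest(
    subscribe: true,
    pubSubTopic: "/waku/2/default-waku/proto",
    contentFilters: @[ContentFilter(contentTopic: ContentTopic("/waku/2/default-content/proto"))]
  )
)
let wire = rpc.encode().buffer       # seq[byte], written to the stream with writeLP() in protocol.nim
let decoded = FilterRPC.init(wire)   # ProtoResult[FilterRPC]
assert decoded.isOk()
assert decoded.get().requestId == rpc.requestId
assert decoded.get().request.contentFilters.len == 1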

View File

@@ -1,308 +0,0 @@
{.push raises: [Defect].}
import
std/[tables, sequtils, options],
bearssl,
chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
libp2p/crypto/crypto,
waku_filter_types,
../../utils/requests,
../../utils/protobuf,
../../node/peer_manager/peer_manager
# NOTE This is just a start, the design of this protocol isn't done yet. It
# should be direct payload exchange (a la req-resp), not be coupled with the
# relay protocol.
export waku_filter_types
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"]
logScope:
topics = "wakufilter"
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
WakuFilterTimeout: Duration = 1.days
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") {.raises: [Defect, KeyError]} =
for key in filters.keys:
let filter = filters[key]
# We do this because the key for the filter is set to the requestId received from the filter protocol.
# This means we do not need to check the content filter explicitly, as every MessagePush already contains
# the requestId of the corresponding filter.
if requestId != "" and requestId == key:
filter.handler(msg)
continue
# TODO: In case of no topics we should either trigger here for all messages,
# or we should not allow such filter to exist in the first place.
for contentFilter in filter.contentFilters:
if msg.contentTopic == contentFilter.contentTopic:
filter.handler(msg)
break
proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) =
# Flatten all unsubscribe topics into single seq
let unsubscribeTopics = request.contentFilters.mapIt(it.contentTopic)
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
for subscriber in subscribers.mitems:
if subscriber.peer != peerId: continue
# make sure we delete the content filter
# if no more topics are left
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
# make sure we delete the subscriber
# if no more content filters left
subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0)
debug "subscribers modified", subscribers=subscribers
# @TODO: metrics?
proc encode*(filter: ContentFilter): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, filter.contentTopic)
output.finish3()
return output
proc encode*(rpc: FilterRequest): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, uint64(rpc.subscribe))
output.write3(2, rpc.pubSubTopic)
for filter in rpc.contentFilters:
output.write3(3, filter.encode())
output.finish3()
return output
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var contentTopic: ContentTopic
discard ? pb.getField(1, contentTopic)
return ok(ContentFilter(contentTopic: contentTopic))
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
let pb = initProtoBuffer(buffer)
var subflag: uint64
if ? pb.getField(1, subflag):
rpc.subscribe = bool(subflag)
discard ? pb.getField(2, rpc.pubSubTopic)
var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(3, buffs)
for buf in buffs:
rpc.contentFilters.add(? ContentFilter.init(buf))
return ok(rpc)
proc encode*(push: MessagePush): ProtoBuffer =
var output = initProtoBuffer()
for push in push.messages:
output.write3(1, push.encode())
output.finish3()
return output
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
var push = MessagePush()
let pb = initProtoBuffer(buffer)
var messages: seq[seq[byte]]
discard ? pb.getRepeatedField(1, messages)
for buf in messages:
push.messages.add(? WakuMessage.init(buf))
return ok(push)
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, rpc.requestId)
var requestBuffer: seq[byte]
discard ? pb.getField(2, requestBuffer)
rpc.request = ? FilterRequest.init(requestBuffer)
var pushBuffer: seq[byte]
discard ? pb.getField(3, pushBuffer)
rpc.push = ? MessagePush.init(pushBuffer)
return ok(rpc)
proc encode*(rpc: FilterRPC): ProtoBuffer =
var output = initProtoBuffer()
output.write3(1, rpc.requestId)
output.write3(2, rpc.request.encode())
output.write3(3, rpc.push.encode())
output.finish3()
return output
method init*(wf: WakuFilter) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var message = await conn.readLp(MaxRpcSize.int)
var res = FilterRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
info "filter message received"
let value = res.value
if value.push != MessagePush():
waku_filter_messages.inc(labelValues = ["MessagePush"])
await wf.pushHandler(value.requestId, value.push)
if value.request != FilterRequest():
waku_filter_messages.inc(labelValues = ["FilterRequest"])
if value.request.subscribe:
wf.subscribers.add(Subscriber(peer: conn.peerId, requestId: value.requestId, filter: value.request))
else:
wf.subscribers.unsubscribeFilters(value.request, conn.peerId)
waku_filter_subscribers.set(wf.subscribers.len.int64)
wf.handler = handle
wf.codec = WakuFilterCodec
proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler,timeout: Duration = WakuFilterTimeout): T =
let rng = crypto.newRng()
var wf = WakuFilter(rng: rng,
peerManager: peerManager,
pushHandler: handler,
timeout: timeout)
wf.init()
return wf
proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) =
wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc()
#clear the failed peer table if subscriber was able to connect.
proc handleClientSuccess(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} =
for subscriber in subscribers:
var subKey: string = $(subscriber)
if wf.failedPeers.hasKey(subKey):
wf.failedPeers.del(subKey)
# If we have already failed to send message to this peer,
# check for elapsed time and if it's been too long, remove the peer.
proc handleClientError(wf: WakuFilter, subscribers: seq[Subscriber]){.raises: [Defect, KeyError].} =
for subscriber in subscribers:
var subKey: string = $(subscriber)
if wf.failedPeers.hasKey(subKey):
var elapsedTime = Moment.now() - wf.failedPeers[subKey]
if(elapsedTime > wf.timeout):
trace "Remove peer if timeout has reached for", peer=subscriber
var index = wf.subscribers.find(subscriber)
wf.subscribers.delete(index)
wf.failedPeers.del(subKey)
else:
# add the peer to the failed peers table.
wf.failedPeers[subKey] = Moment.now()
return
proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} =
# Handle WakuMessage according to filter protocol
trace "handle message in WakuFilter", topic=topic, msg=msg
var handleMessageFailed = false
var failedSubscriber: seq[Subscriber]
var connectedSubscribers: seq[Subscriber]
for subscriber in wf.subscribers:
if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
continue
for filter in subscriber.filter.contentFilters:
if msg.contentTopic == filter.contentTopic:
trace "Found matching contentTopic", filter=filter, msg=msg
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
if connOpt.isSome:
await connOpt.get().writeLP(push.encode().buffer)
connectedSubscribers.add(subscriber)
else:
# @TODO more sophisticated error handling here
handleMessageFailed = true
failedSubscriber.add(subscriber)
error "failed to push messages to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])
break
handleClientSuccess(wf, connectedSubscribers)
if handleMessageFailed:
handleClientError(wf, failedSubscriber)
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isSome:
let peer = peerOpt.get()
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isSome:
# This is the only successful path to subscription
let id = generateRequestId(wf.rng)
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
return some(id)
else:
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])
return none(string)
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
let
id = generateRequestId(wf.rng)
peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
if peerOpt.isSome:
# @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers
let peer = peerOpt.get()
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
if connOpt.isSome:
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
else:
# @TODO more sophisticated error handling here
error "failed to connect to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])

View File

@@ -1,59 +0,0 @@
import
std/[tables],
chronos,
bearssl,
libp2p/protocols/protocol,
../../node/peer_manager/peer_manager,
../waku_message
export waku_message
const
# We add a 64kB safety buffer for protocol overhead.
# 10x-multiplier also for safety: currently we never
# push more than 1 message at a time.
MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024
type
PubSubTopic* = string
ContentFilter* = object
contentTopic*: ContentTopic
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].}
Filter* = object
contentFilters*: seq[ContentFilter]
pubSubTopic*: PubSubTopic
handler*: ContentFilterHandler
# @TODO MAYBE MORE INFO?
Filters* = Table[string, Filter]
FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubSubTopic*: PubSubTopic
subscribe*: bool
MessagePush* = object
messages*: seq[WakuMessage]
FilterRPC* = object
requestId*: string
request*: FilterRequest
push*: MessagePush
Subscriber* = object
peer*: PeerID
requestId*: string
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext
peerManager*: PeerManager
subscribers*: seq[Subscriber]
pushHandler*: MessagePushHandler
failedPeers*: Table[string, chronos.Moment]
timeout*: chronos.Duration