mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-15 01:14:56 +00:00
deploy: a6bb3f65e119f8e8f8c85f8d6fe5a28466e4a51a
This commit is contained in:
parent
84ababcd94
commit
74bd1fbd38
@ -3,6 +3,7 @@
|
||||
## Next version
|
||||
|
||||
- Refactor: Split out `waku_types` types into right place; create utils folder.
|
||||
- Refactor: Replace sequence of ContentTopics in ContentFilter with a single ContentTopic.
|
||||
- Docs: Add information on how to query Status test fleet for node addresses; how to view logs and how to update submodules.
|
||||
- PubSub topic `subscribe` and `unsubscribe` no longer returns a future (removed `async` designation)
|
||||
- Added a peer manager for `relay`, `filter`, `store` and `swap` peers.
|
||||
|
@ -318,7 +318,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||
info "Hit filter handler"
|
||||
|
||||
await node.subscribe(
|
||||
FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[DefaultContentTopic])], pubSubTopic: DefaultTopic, subscribe: true),
|
||||
FilterRequest(contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
|
||||
filterHandler
|
||||
)
|
||||
|
||||
|
@ -291,8 +291,11 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
# Light node has not yet subscribed to any filters
|
||||
node.filters.len() == 0
|
||||
|
||||
let contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic, ContentTopic("2")]),
|
||||
ContentFilter(contentTopics: @[ContentTopic("3"), ContentTopic("4")])]
|
||||
let contentFilters = @[ContentFilter(contentTopic: defaultContentTopic),
|
||||
ContentFilter(contentTopic: ContentTopic("2")),
|
||||
ContentFilter(contentTopic: ContentTopic("3")),
|
||||
ContentFilter(contentTopic: ContentTopic("4")),
|
||||
]
|
||||
var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic))
|
||||
|
||||
check:
|
||||
@ -330,7 +333,7 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
|
||||
# First ensure subscription exists
|
||||
|
||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic])], topic = some(defaultTopic))
|
||||
let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: defaultContentTopic)], topic = some(defaultTopic))
|
||||
check:
|
||||
sub
|
||||
|
||||
@ -730,4 +733,4 @@ procSuite "Waku v2 JSON-RPC API":
|
||||
server3.close()
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
await node3.stop()
|
@ -1,155 +1,155 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, tables, sets],
|
||||
testutils/unittests, chronos, chronicles,
|
||||
libp2p/switch,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multistream,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/protocol/message_notifier,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../test_helpers, ./utils
|
||||
|
||||
procSuite "Waku Filter":
|
||||
|
||||
asyncTest "handle filter":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
var responseRequestIdFuture = newFuture[string]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
check:
|
||||
msg.messages.len() == 1
|
||||
msg.messages[0] == post
|
||||
responseRequestIdFuture.complete(requestId)
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||
subscription = proto2.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
let id = (await proto.subscribe(rpc)).get()
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
await subscriptions.notify(defaultTopic, post)
|
||||
|
||||
check:
|
||||
(await responseRequestIdFuture) == id
|
||||
|
||||
asyncTest "Can subscribe and unsubscribe from content filter":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
var responseCompletionFuture = newFuture[bool]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
check:
|
||||
msg.messages.len() == 1
|
||||
msg.messages[0] == post
|
||||
responseCompletionFuture.complete(true)
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||
subscription = proto2.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
let id = (await proto.subscribe(rpc)).get()
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
await subscriptions.notify(defaultTopic, post)
|
||||
|
||||
check:
|
||||
# Check that subscription works as expected
|
||||
(await responseCompletionFuture.withTimeout(3.seconds)) == true
|
||||
|
||||
# Reset to test unsubscribe
|
||||
responseCompletionFuture = newFuture[bool]()
|
||||
|
||||
let
|
||||
rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: false)
|
||||
|
||||
await proto.unsubscribe(rpcU)
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
await subscriptions.notify(defaultTopic, post)
|
||||
|
||||
check:
|
||||
# Check that unsubscribe works as expected
|
||||
(await responseCompletionFuture.withTimeout(5.seconds)) == false
|
||||
|
||||
asyncTest "handle filter subscribe failures":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var responseRequestIdFuture = newFuture[string]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
|
||||
let idOpt = (await proto.subscribe(rpc))
|
||||
|
||||
check:
|
||||
idOpt.isNone
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, tables, sets],
|
||||
testutils/unittests, chronos, chronicles,
|
||||
libp2p/switch,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/[bufferstream, connection],
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/multistream,
|
||||
../../waku/v2/node/peer_manager/peer_manager,
|
||||
../../waku/v2/protocol/message_notifier,
|
||||
../../waku/v2/protocol/waku_filter/waku_filter,
|
||||
../test_helpers, ./utils
|
||||
|
||||
procSuite "Waku Filter":
|
||||
|
||||
asyncTest "handle filter":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
var responseRequestIdFuture = newFuture[string]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
check:
|
||||
msg.messages.len() == 1
|
||||
msg.messages[0] == post
|
||||
responseRequestIdFuture.complete(requestId)
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||
subscription = proto2.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
let id = (await proto.subscribe(rpc)).get()
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
await subscriptions.notify(defaultTopic, post)
|
||||
|
||||
check:
|
||||
(await responseRequestIdFuture) == id
|
||||
|
||||
asyncTest "Can subscribe and unsubscribe from content filter":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
peer = PeerInfo.init(key)
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var listenSwitch = newStandardSwitch(some(key))
|
||||
discard await listenSwitch.start()
|
||||
|
||||
var responseCompletionFuture = newFuture[bool]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
check:
|
||||
msg.messages.len() == 1
|
||||
msg.messages[0] == post
|
||||
responseCompletionFuture.complete(true)
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
proto.setPeer(listenSwitch.peerInfo)
|
||||
|
||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||
subscription = proto2.subscription()
|
||||
|
||||
var subscriptions = newTable[string, MessageNotificationSubscription]()
|
||||
subscriptions["test"] = subscription
|
||||
listenSwitch.mount(proto2)
|
||||
|
||||
let id = (await proto.subscribe(rpc)).get()
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
await subscriptions.notify(defaultTopic, post)
|
||||
|
||||
check:
|
||||
# Check that subscription works as expected
|
||||
(await responseCompletionFuture.withTimeout(3.seconds)) == true
|
||||
|
||||
# Reset to test unsubscribe
|
||||
responseCompletionFuture = newFuture[bool]()
|
||||
|
||||
let
|
||||
rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: false)
|
||||
|
||||
await proto.unsubscribe(rpcU)
|
||||
|
||||
await sleepAsync(2.seconds)
|
||||
|
||||
await subscriptions.notify(defaultTopic, post)
|
||||
|
||||
check:
|
||||
# Check that unsubscribe works as expected
|
||||
(await responseCompletionFuture.withTimeout(5.seconds)) == false
|
||||
|
||||
asyncTest "handle filter subscribe failures":
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
|
||||
let
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
var dialSwitch = newStandardSwitch()
|
||||
discard await dialSwitch.start()
|
||||
|
||||
var responseRequestIdFuture = newFuture[string]()
|
||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
||||
discard
|
||||
|
||||
let
|
||||
proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle)
|
||||
rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], pubSubTopic: defaultTopic, subscribe: true)
|
||||
|
||||
dialSwitch.mount(proto)
|
||||
|
||||
let idOpt = (await proto.subscribe(rpc))
|
||||
|
||||
check:
|
||||
idOpt.isNone
|
||||
|
@ -30,7 +30,7 @@ procSuite "WakuNode":
|
||||
Port(60000))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true)
|
||||
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
|
||||
message = WakuMessage(payload: "hello world".toBytes(),
|
||||
contentTopic: contentTopic)
|
||||
|
||||
@ -82,7 +82,7 @@ procSuite "WakuNode":
|
||||
Port(60002))
|
||||
pubSubTopic = "chat"
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true)
|
||||
filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
|
||||
message = WakuMessage(payload: "hello world".toBytes(),
|
||||
contentTopic: contentTopic)
|
||||
|
||||
@ -149,8 +149,8 @@ procSuite "WakuNode":
|
||||
otherPayload = @[byte 9]
|
||||
defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic)
|
||||
otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic)
|
||||
defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[defaultContentTopic])], subscribe: true)
|
||||
otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[otherContentTopic])], subscribe: true)
|
||||
defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: defaultContentTopic)], subscribe: true)
|
||||
otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: otherContentTopic)], subscribe: true)
|
||||
|
||||
await node1.start()
|
||||
node1.mountRelay()
|
||||
@ -221,7 +221,7 @@ procSuite "WakuNode":
|
||||
contentTopic = "defaultCT"
|
||||
payload = @[byte 1]
|
||||
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
||||
filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true)
|
||||
filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true)
|
||||
|
||||
await node1.start()
|
||||
node1.mountRelay()
|
||||
@ -322,7 +322,7 @@ procSuite "WakuNode":
|
||||
msg == message
|
||||
completionFut.complete(true)
|
||||
|
||||
await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true), handler)
|
||||
await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), handler)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
@ -679,4 +679,4 @@ procSuite "WakuNode":
|
||||
# (await completionFutLightPush.withTimeout(5.seconds)) == true
|
||||
# await node1.stop()
|
||||
# await node2.stop()
|
||||
# await node3.stop()
|
||||
# await node3.stop()
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az193-336:
|
||||
# Libtool was configured on host fv-az190-951:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -192,4 +192,4 @@ proc completeCmdArg*(T: type Port, val: TaintedString): seq[string] =
|
||||
func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress =
|
||||
# TODO: How should we select between IPv4 and IPv6
|
||||
# Maybe there should be a config option for this.
|
||||
(static ValidIpAddress.init("0.0.0.0"))
|
||||
(static ValidIpAddress.init("0.0.0.0"))
|
@ -1,93 +1,93 @@
|
||||
{.push raises: [Exception, Defect].}
|
||||
|
||||
import
|
||||
std/[tables,sequtils],
|
||||
json_rpc/rpcserver,
|
||||
eth/[common, rlp, keys, p2p],
|
||||
../../protocol/waku_filter/waku_filter_types,
|
||||
../wakunode2,
|
||||
./jsonrpc_types
|
||||
|
||||
export jsonrpc_types
|
||||
|
||||
logScope:
|
||||
topics = "filter api"
|
||||
|
||||
const futTimeout* = 5.seconds # Max time to wait for futures
|
||||
const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable
|
||||
|
||||
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) =
|
||||
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||
# Add message to current cache
|
||||
trace "WakuMessage received", msg=msg
|
||||
|
||||
# Make a copy of msgs for this topic to modify
|
||||
var msgs = messageCache.getOrDefault(msg.contentTopic, @[])
|
||||
|
||||
if msgs.len >= maxCache:
|
||||
# Message cache on this topic exceeds maximum. Delete oldest.
|
||||
# @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed.
|
||||
msgs.delete(0,0)
|
||||
msgs.add(msg)
|
||||
|
||||
# Replace indexed entry with copy
|
||||
# @TODO max number of content topics could be limited in node
|
||||
messageCache[msg.contentTopic] = msgs
|
||||
|
||||
## Filter API version 1 definitions
|
||||
|
||||
rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]:
|
||||
## Returns all WakuMessages received on a content topic since the
|
||||
## last time this method was called
|
||||
## @TODO ability to specify a return message limit
|
||||
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
|
||||
|
||||
if messageCache.hasKey(contentTopic):
|
||||
let msgs = messageCache[contentTopic]
|
||||
# Clear cache before next call
|
||||
messageCache[contentTopic] = @[]
|
||||
return msgs
|
||||
else:
|
||||
# Not subscribed to this content topic
|
||||
raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic)
|
||||
|
||||
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||
## Subscribes a node to a list of content filters
|
||||
debug "post_waku_v2_filter_v1_subscription"
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO use default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
|
||||
|
||||
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
|
||||
# Successfully subscribed to all content filters
|
||||
|
||||
for cTopic in concat(contentFilters.mapIt(it.contentTopics)):
|
||||
# Create message cache for each subscribed content topic
|
||||
messageCache[cTopic] = @[]
|
||||
|
||||
return true
|
||||
else:
|
||||
# Failed to subscribe to one or more content filters
|
||||
raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq))
|
||||
|
||||
rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||
## Unsubscribes a node from a list of content filters
|
||||
debug "delete_waku_v2_filter_v1_subscription"
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO consider using default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
|
||||
|
||||
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
|
||||
# Successfully unsubscribed from all content filters
|
||||
|
||||
for cTopic in concat(contentFilters.mapIt(it.contentTopics)):
|
||||
# Remove message cache for each unsubscribed content topic
|
||||
messageCache.del(cTopic)
|
||||
|
||||
return true
|
||||
else:
|
||||
# Failed to unsubscribe from one or more content filters
|
||||
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
|
||||
{.push raises: [Exception, Defect].}
|
||||
|
||||
import
|
||||
std/[tables,sequtils],
|
||||
json_rpc/rpcserver,
|
||||
eth/[common, rlp, keys, p2p],
|
||||
../../protocol/waku_filter/waku_filter_types,
|
||||
../wakunode2,
|
||||
./jsonrpc_types
|
||||
|
||||
export jsonrpc_types
|
||||
|
||||
logScope:
|
||||
topics = "filter api"
|
||||
|
||||
const futTimeout* = 5.seconds # Max time to wait for futures
|
||||
const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable
|
||||
|
||||
proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) =
|
||||
|
||||
proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||
# Add message to current cache
|
||||
trace "WakuMessage received", msg=msg
|
||||
|
||||
# Make a copy of msgs for this topic to modify
|
||||
var msgs = messageCache.getOrDefault(msg.contentTopic, @[])
|
||||
|
||||
if msgs.len >= maxCache:
|
||||
# Message cache on this topic exceeds maximum. Delete oldest.
|
||||
# @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed.
|
||||
msgs.delete(0,0)
|
||||
msgs.add(msg)
|
||||
|
||||
# Replace indexed entry with copy
|
||||
# @TODO max number of content topics could be limited in node
|
||||
messageCache[msg.contentTopic] = msgs
|
||||
|
||||
## Filter API version 1 definitions
|
||||
|
||||
rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]:
|
||||
## Returns all WakuMessages received on a content topic since the
|
||||
## last time this method was called
|
||||
## @TODO ability to specify a return message limit
|
||||
debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic
|
||||
|
||||
if messageCache.hasKey(contentTopic):
|
||||
let msgs = messageCache[contentTopic]
|
||||
# Clear cache before next call
|
||||
messageCache[contentTopic] = @[]
|
||||
return msgs
|
||||
else:
|
||||
# Not subscribed to this content topic
|
||||
raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic)
|
||||
|
||||
rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||
## Subscribes a node to a list of content filters
|
||||
debug "post_waku_v2_filter_v1_subscription"
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO use default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true)
|
||||
|
||||
if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)):
|
||||
# Successfully subscribed to all content filters
|
||||
|
||||
for cTopic in contentFilters.mapIt(it.contentTopic):
|
||||
# Create message cache for each subscribed content topic
|
||||
messageCache[cTopic] = @[]
|
||||
|
||||
return true
|
||||
else:
|
||||
# Failed to subscribe to one or more content filters
|
||||
raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq))
|
||||
|
||||
rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool:
|
||||
## Unsubscribes a node from a list of content filters
|
||||
debug "delete_waku_v2_filter_v1_subscription"
|
||||
|
||||
# Construct a filter request
|
||||
# @TODO consider using default PubSub topic if undefined
|
||||
let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false)
|
||||
|
||||
if (await node.unsubscribe(fReq).withTimeout(futTimeout)):
|
||||
# Successfully unsubscribed from all content filters
|
||||
|
||||
for cTopic in contentFilters.mapIt(it.contentTopic):
|
||||
# Remove message cache for each unsubscribed content topic
|
||||
messageCache.del(cTopic)
|
||||
|
||||
return true
|
||||
else:
|
||||
# Failed to unsubscribe from one or more content filters
|
||||
raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq))
|
||||
|
@ -32,4 +32,4 @@ proc get_waku_v2_private_v1_symmetric_messages(topic: string, symkey: string): s
|
||||
# Asymmetric
|
||||
proc get_waku_v2_private_v1_asymmetric_keypair(): WakuKeyPair
|
||||
proc post_waku_v2_private_v1_asymmetric_message(topic: string, message: WakuRelayMessage, publicKey: string): bool
|
||||
proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage]
|
||||
proc get_waku_v2_private_v1_asymmetric_messages(topic: string, privateKey: string): seq[WakuRelayMessage]
|
@ -81,21 +81,17 @@ func asEthKey*(key: PrivateKey): keys.PrivateKey =
|
||||
|
||||
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
|
||||
# Flatten all unsubscribe topics into single seq
|
||||
var unsubscribeTopics: seq[ContentTopic]
|
||||
for cf in contentFilters:
|
||||
unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics)
|
||||
let unsubscribeTopics = contentFilters.mapIt(it.contentTopic)
|
||||
|
||||
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
|
||||
|
||||
var rIdToRemove: seq[string] = @[]
|
||||
for rId, f in filters.mpairs:
|
||||
# Iterate filter entries to remove matching content topics
|
||||
for cf in f.contentFilters.mitems:
|
||||
# Iterate content filters in filter entry
|
||||
cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
|
||||
|
||||
# make sure we delete the content filter
|
||||
# if no more topics are left
|
||||
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0)
|
||||
f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
|
||||
|
||||
if f.contentFilters.len == 0:
|
||||
rIdToRemove.add(rId)
|
||||
@ -744,4 +740,4 @@ when isMainModule:
|
||||
|
||||
c_signal(SIGTERM, handleSigterm)
|
||||
|
||||
runForever()
|
||||
runForever()
|
@ -1,268 +1,258 @@
|
||||
import
|
||||
std/[tables, sequtils, options],
|
||||
bearssl,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
libp2p/protocols/pubsub/pubsubpeer,
|
||||
libp2p/protocols/pubsub/floodsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto,
|
||||
../message_notifier,
|
||||
waku_filter_types,
|
||||
../../utils/requests,
|
||||
../../node/peer_manager/peer_manager
|
||||
|
||||
# NOTE This is just a start, the design of this protocol isn't done yet. It
|
||||
# should be direct payload exchange (a la req-resp), not be coupled with the
|
||||
# relay protocol.
|
||||
|
||||
export waku_filter_types
|
||||
|
||||
declarePublicGauge waku_filter_peers, "number of filter peers"
|
||||
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
|
||||
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
|
||||
|
||||
logScope:
|
||||
topics = "wakufilter"
|
||||
|
||||
const
|
||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
for key in filters.keys:
|
||||
let filter = filters[key]
|
||||
# We do this because the key for the filter is set to the requestId received from the filter protocol.
|
||||
# This means we do not need to check the content filter explicitly as all MessagePushs already contain
|
||||
# the requestId of the coresponding filter.
|
||||
if requestId != "" and requestId == key:
|
||||
filter.handler(msg)
|
||||
continue
|
||||
|
||||
# TODO: In case of no topics we should either trigger here for all messages,
|
||||
# or we should not allow such filter to exist in the first place.
|
||||
for contentFilter in filter.contentFilters:
|
||||
if contentFilter.contentTopics.len > 0:
|
||||
if msg.contentTopic in contentFilter.contentTopics:
|
||||
filter.handler(msg)
|
||||
break
|
||||
|
||||
proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) =
|
||||
# Flatten all unsubscribe topics into single seq
|
||||
var unsubscribeTopics: seq[ContentTopic]
|
||||
for cf in request.contentFilters:
|
||||
unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics)
|
||||
|
||||
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
|
||||
|
||||
for subscriber in subscribers.mitems:
|
||||
if subscriber.peer.peerId != peerId: continue
|
||||
|
||||
# Iterate through subscriber entries matching peer ID to remove matching content topics
|
||||
for cf in subscriber.filter.contentFilters.mitems:
|
||||
# Iterate content filters in filter entry
|
||||
cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
|
||||
|
||||
# make sure we delete the content filter
|
||||
# if no more topics are left
|
||||
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0)
|
||||
|
||||
# make sure we delete the subscriber
|
||||
# if no more content filters left
|
||||
subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0)
|
||||
|
||||
debug "subscribers modified", subscribers=subscribers
|
||||
# @TODO: metrics?
|
||||
|
||||
proc encode*(filter: ContentFilter): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for contentTopic in filter.contentTopics:
|
||||
result.write(1, contentTopic)
|
||||
|
||||
proc encode*(rpc: FilterRequest): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
result.write(1, uint64(rpc.subscribe))
|
||||
|
||||
result.write(2, rpc.pubSubTopic)
|
||||
|
||||
for filter in rpc.contentFilters:
|
||||
result.write(3, filter.encode())
|
||||
|
||||
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var contentTopics: seq[ContentTopic]
|
||||
discard ? pb.getRepeatedField(1, contentTopics)
|
||||
|
||||
ok(ContentFilter(contentTopics: contentTopics))
|
||||
|
||||
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var subflag: uint64
|
||||
if ? pb.getField(1, subflag):
|
||||
rpc.subscribe = bool(subflag)
|
||||
|
||||
discard ? pb.getField(2, rpc.pubSubTopic)
|
||||
|
||||
var buffs: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(3, buffs)
|
||||
|
||||
for buf in buffs:
|
||||
rpc.contentFilters.add(? ContentFilter.init(buf))
|
||||
|
||||
ok(rpc)
|
||||
|
||||
proc encode*(push: MessagePush): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for push in push.messages:
|
||||
result.write(1, push.encode())
|
||||
|
||||
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
|
||||
var push = MessagePush()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var messages: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(1, messages)
|
||||
|
||||
for buf in messages:
|
||||
push.messages.add(? WakuMessage.init(buf))
|
||||
|
||||
ok(push)
|
||||
|
||||
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = FilterRPC()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
discard ? pb.getField(1, rpc.requestId)
|
||||
|
||||
var requestBuffer: seq[byte]
|
||||
discard ? pb.getField(2, requestBuffer)
|
||||
|
||||
rpc.request = ? FilterRequest.init(requestBuffer)
|
||||
|
||||
var pushBuffer: seq[byte]
|
||||
discard ? pb.getField(3, pushBuffer)
|
||||
|
||||
rpc.push = ? MessagePush.init(pushBuffer)
|
||||
|
||||
ok(rpc)
|
||||
|
||||
proc encode*(rpc: FilterRPC): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
result.write(1, rpc.requestId)
|
||||
result.write(2, rpc.request.encode())
|
||||
result.write(3, rpc.push.encode())
|
||||
|
||||
method init*(wf: WakuFilter) =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var message = await conn.readLp(64*1024)
|
||||
var res = FilterRPC.init(message)
|
||||
if res.isErr:
|
||||
error "failed to decode rpc"
|
||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
info "filter message received"
|
||||
|
||||
let value = res.value
|
||||
if value.push != MessagePush():
|
||||
wf.pushHandler(value.requestId, value.push)
|
||||
if value.request != FilterRequest():
|
||||
if value.request.subscribe:
|
||||
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
|
||||
else:
|
||||
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
|
||||
|
||||
waku_filter_subscribers.set(wf.subscribers.len.int64)
|
||||
|
||||
wf.handler = handle
|
||||
wf.codec = WakuFilterCodec
|
||||
|
||||
proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
|
||||
new result
|
||||
result.rng = crypto.newRng()
|
||||
result.peerManager = peerManager
|
||||
result.pushHandler = handler
|
||||
result.init()
|
||||
|
||||
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
|
||||
wf.peerManager.addPeer(peer, WakuFilterCodec)
|
||||
waku_filter_peers.inc()
|
||||
|
||||
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||
## Returns a Filter for the specific protocol
|
||||
## This filter can then be used to send messages to subscribers that match conditions.
|
||||
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
||||
trace "handle WakuFilter subscription", topic=topic, msg=msg
|
||||
|
||||
for subscriber in proto.subscribers:
|
||||
if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
|
||||
trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
|
||||
continue
|
||||
|
||||
for filter in subscriber.filter.contentFilters:
|
||||
if msg.contentTopic in filter.contentTopics:
|
||||
trace "Found matching contentTopic", filter=filter, msg=msg
|
||||
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
|
||||
|
||||
let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
await connOpt.get().writeLP(push.encode().buffer)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to push messages to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
break
|
||||
|
||||
MessageNotificationSubscription.init(@[], handle)
|
||||
|
||||
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
|
||||
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
|
||||
|
||||
if peerOpt.isSome:
|
||||
let peer = peerOpt.get()
|
||||
|
||||
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
# This is the only successful path to subscription
|
||||
let id = generateRequestId(wf.rng)
|
||||
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
return some(id)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
return none(string)
|
||||
|
||||
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
|
||||
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
|
||||
let
|
||||
id = generateRequestId(wf.rng)
|
||||
peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
|
||||
|
||||
if peerOpt.isSome:
|
||||
# @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers
|
||||
let peer = peerOpt.get()
|
||||
|
||||
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
import
|
||||
std/[tables, sequtils, options],
|
||||
bearssl,
|
||||
chronos, chronicles, metrics, stew/results,
|
||||
libp2p/protocols/pubsub/pubsubpeer,
|
||||
libp2p/protocols/pubsub/floodsub,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/protocols/protocol,
|
||||
libp2p/protobuf/minprotobuf,
|
||||
libp2p/stream/connection,
|
||||
libp2p/crypto/crypto,
|
||||
../message_notifier,
|
||||
waku_filter_types,
|
||||
../../utils/requests,
|
||||
../../node/peer_manager/peer_manager
|
||||
|
||||
# NOTE This is just a start, the design of this protocol isn't done yet. It
|
||||
# should be direct payload exchange (a la req-resp), not be coupled with the
|
||||
# relay protocol.
|
||||
|
||||
export waku_filter_types
|
||||
|
||||
declarePublicGauge waku_filter_peers, "number of filter peers"
|
||||
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
|
||||
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
|
||||
|
||||
logScope:
|
||||
topics = "wakufilter"
|
||||
|
||||
const
|
||||
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
|
||||
|
||||
# Error types (metric label values)
|
||||
const
|
||||
dialFailure = "dial_failure"
|
||||
decodeRpcFailure = "decode_rpc_failure"
|
||||
|
||||
proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") =
|
||||
for key in filters.keys:
|
||||
let filter = filters[key]
|
||||
# We do this because the key for the filter is set to the requestId received from the filter protocol.
|
||||
# This means we do not need to check the content filter explicitly as all MessagePushs already contain
|
||||
# the requestId of the coresponding filter.
|
||||
if requestId != "" and requestId == key:
|
||||
filter.handler(msg)
|
||||
continue
|
||||
|
||||
# TODO: In case of no topics we should either trigger here for all messages,
|
||||
# or we should not allow such filter to exist in the first place.
|
||||
for contentFilter in filter.contentFilters:
|
||||
if msg.contentTopic == contentFilter.contentTopic:
|
||||
filter.handler(msg)
|
||||
break
|
||||
|
||||
proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) =
|
||||
# Flatten all unsubscribe topics into single seq
|
||||
let unsubscribeTopics = request.contentFilters.mapIt(it.contentTopic)
|
||||
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
|
||||
|
||||
for subscriber in subscribers.mitems:
|
||||
if subscriber.peer.peerId != peerId: continue
|
||||
|
||||
# make sure we delete the content filter
|
||||
# if no more topics are left
|
||||
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics)
|
||||
|
||||
# make sure we delete the subscriber
|
||||
# if no more content filters left
|
||||
subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0)
|
||||
|
||||
debug "subscribers modified", subscribers=subscribers
|
||||
# @TODO: metrics?
|
||||
|
||||
proc encode*(filter: ContentFilter): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
result.write(1, filter.contentTopic)
|
||||
|
||||
proc encode*(rpc: FilterRequest): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
result.write(1, uint64(rpc.subscribe))
|
||||
|
||||
result.write(2, rpc.pubSubTopic)
|
||||
|
||||
for filter in rpc.contentFilters:
|
||||
result.write(3, filter.encode())
|
||||
|
||||
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var contentTopic: ContentTopic
|
||||
discard ? pb.getField(1, contentTopic)
|
||||
|
||||
ok(ContentFilter(contentTopic: contentTopic))
|
||||
|
||||
proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var subflag: uint64
|
||||
if ? pb.getField(1, subflag):
|
||||
rpc.subscribe = bool(subflag)
|
||||
|
||||
discard ? pb.getField(2, rpc.pubSubTopic)
|
||||
|
||||
var buffs: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(3, buffs)
|
||||
|
||||
for buf in buffs:
|
||||
rpc.contentFilters.add(? ContentFilter.init(buf))
|
||||
|
||||
ok(rpc)
|
||||
|
||||
proc encode*(push: MessagePush): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
for push in push.messages:
|
||||
result.write(1, push.encode())
|
||||
|
||||
proc init*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
|
||||
var push = MessagePush()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
var messages: seq[seq[byte]]
|
||||
discard ? pb.getRepeatedField(1, messages)
|
||||
|
||||
for buf in messages:
|
||||
push.messages.add(? WakuMessage.init(buf))
|
||||
|
||||
ok(push)
|
||||
|
||||
proc init*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
|
||||
var rpc = FilterRPC()
|
||||
let pb = initProtoBuffer(buffer)
|
||||
|
||||
discard ? pb.getField(1, rpc.requestId)
|
||||
|
||||
var requestBuffer: seq[byte]
|
||||
discard ? pb.getField(2, requestBuffer)
|
||||
|
||||
rpc.request = ? FilterRequest.init(requestBuffer)
|
||||
|
||||
var pushBuffer: seq[byte]
|
||||
discard ? pb.getField(3, pushBuffer)
|
||||
|
||||
rpc.push = ? MessagePush.init(pushBuffer)
|
||||
|
||||
ok(rpc)
|
||||
|
||||
proc encode*(rpc: FilterRPC): ProtoBuffer =
|
||||
result = initProtoBuffer()
|
||||
|
||||
result.write(1, rpc.requestId)
|
||||
result.write(2, rpc.request.encode())
|
||||
result.write(3, rpc.push.encode())
|
||||
|
||||
method init*(wf: WakuFilter) =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
var message = await conn.readLp(64*1024)
|
||||
var res = FilterRPC.init(message)
|
||||
if res.isErr:
|
||||
error "failed to decode rpc"
|
||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||
return
|
||||
|
||||
info "filter message received"
|
||||
|
||||
let value = res.value
|
||||
if value.push != MessagePush():
|
||||
wf.pushHandler(value.requestId, value.push)
|
||||
if value.request != FilterRequest():
|
||||
if value.request.subscribe:
|
||||
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
|
||||
else:
|
||||
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
|
||||
|
||||
waku_filter_subscribers.set(wf.subscribers.len.int64)
|
||||
|
||||
wf.handler = handle
|
||||
wf.codec = WakuFilterCodec
|
||||
|
||||
proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref BrHmacDrbgContext, handler: MessagePushHandler): T =
|
||||
new result
|
||||
result.rng = crypto.newRng()
|
||||
result.peerManager = peerManager
|
||||
result.pushHandler = handler
|
||||
result.init()
|
||||
|
||||
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
|
||||
wf.peerManager.addPeer(peer, WakuFilterCodec)
|
||||
waku_filter_peers.inc()
|
||||
|
||||
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
|
||||
## Returns a Filter for the specific protocol
|
||||
## This filter can then be used to send messages to subscribers that match conditions.
|
||||
proc handle(topic: string, msg: WakuMessage) {.async.} =
|
||||
trace "handle WakuFilter subscription", topic=topic, msg=msg
|
||||
|
||||
for subscriber in proto.subscribers:
|
||||
if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
|
||||
trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
|
||||
continue
|
||||
|
||||
for filter in subscriber.filter.contentFilters:
|
||||
if msg.contentTopic == filter.contentTopic:
|
||||
trace "Found matching contentTopic", filter=filter, msg=msg
|
||||
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
|
||||
|
||||
let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
await connOpt.get().writeLP(push.encode().buffer)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to push messages to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
break
|
||||
|
||||
MessageNotificationSubscription.init(@[], handle)
|
||||
|
||||
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
|
||||
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
|
||||
|
||||
if peerOpt.isSome:
|
||||
let peer = peerOpt.get()
|
||||
|
||||
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
# This is the only successful path to subscription
|
||||
let id = generateRequestId(wf.rng)
|
||||
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
return some(id)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
return none(string)
|
||||
|
||||
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
|
||||
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
|
||||
let
|
||||
id = generateRequestId(wf.rng)
|
||||
peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)
|
||||
|
||||
if peerOpt.isSome:
|
||||
# @TODO: if there are more than one WakuFilter peer, WakuFilter should unsubscribe from all peers
|
||||
let peer = peerOpt.get()
|
||||
|
||||
let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec)
|
||||
|
||||
if connOpt.isSome:
|
||||
await connOpt.get().writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
|
||||
else:
|
||||
# @TODO more sophisticated error handling here
|
||||
error "failed to connect to remote peer"
|
||||
waku_filter_errors.inc(labelValues = [dialFailure])
|
||||
|
@ -1,48 +1,48 @@
|
||||
import
|
||||
std/[tables],
|
||||
bearssl,
|
||||
libp2p/peerinfo,
|
||||
libp2p/protocols/protocol,
|
||||
../../node/peer_manager/peer_manager,
|
||||
../waku_message
|
||||
|
||||
export waku_message
|
||||
|
||||
type
|
||||
ContentFilter* = object
|
||||
contentTopics*: seq[ContentTopic]
|
||||
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
|
||||
|
||||
Filter* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
handler*: ContentFilterHandler
|
||||
|
||||
# @TODO MAYBE MORE INFO?
|
||||
Filters* = Table[string, Filter]
|
||||
|
||||
FilterRequest* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
pubSubTopic*: string
|
||||
subscribe*: bool
|
||||
|
||||
MessagePush* = object
|
||||
messages*: seq[WakuMessage]
|
||||
|
||||
FilterRPC* = object
|
||||
requestId*: string
|
||||
request*: FilterRequest
|
||||
push*: MessagePush
|
||||
|
||||
Subscriber* = object
|
||||
peer*: PeerInfo
|
||||
requestId*: string
|
||||
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||
|
||||
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
|
||||
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
subscribers*: seq[Subscriber]
|
||||
pushHandler*: MessagePushHandler
|
||||
import
|
||||
std/[tables],
|
||||
bearssl,
|
||||
libp2p/peerinfo,
|
||||
libp2p/protocols/protocol,
|
||||
../../node/peer_manager/peer_manager,
|
||||
../waku_message
|
||||
|
||||
export waku_message
|
||||
|
||||
type
|
||||
ContentFilter* = object
|
||||
contentTopic*: ContentTopic
|
||||
|
||||
ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.}
|
||||
|
||||
Filter* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
handler*: ContentFilterHandler
|
||||
|
||||
# @TODO MAYBE MORE INFO?
|
||||
Filters* = Table[string, Filter]
|
||||
|
||||
FilterRequest* = object
|
||||
contentFilters*: seq[ContentFilter]
|
||||
pubSubTopic*: string
|
||||
subscribe*: bool
|
||||
|
||||
MessagePush* = object
|
||||
messages*: seq[WakuMessage]
|
||||
|
||||
FilterRPC* = object
|
||||
requestId*: string
|
||||
request*: FilterRequest
|
||||
push*: MessagePush
|
||||
|
||||
Subscriber* = object
|
||||
peer*: PeerInfo
|
||||
requestId*: string
|
||||
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||
|
||||
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
|
||||
|
||||
WakuFilter* = ref object of LPProtocol
|
||||
rng*: ref BrHmacDrbgContext
|
||||
peerManager*: PeerManager
|
||||
subscribers*: seq[Subscriber]
|
||||
pushHandler*: MessagePushHandler
|
||||
|
Loading…
x
Reference in New Issue
Block a user