mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-28 07:35:56 +00:00
deploy: 32d230b4746f638efa811e710d6418720a4bca04
This commit is contained in:
parent
e272af30d4
commit
99e72299f3
3
.gitignore
vendored
3
.gitignore
vendored
@ -34,3 +34,6 @@ rln
|
|||||||
package-lock.json
|
package-lock.json
|
||||||
node_modules/
|
node_modules/
|
||||||
/.update.timestamp
|
/.update.timestamp
|
||||||
|
|
||||||
|
# Ignore Jetbrains IDE files
|
||||||
|
.idea/
|
@ -30,7 +30,7 @@ procSuite "Waku Filter":
|
|||||||
await listenSwitch.start()
|
await listenSwitch.start()
|
||||||
|
|
||||||
var responseRequestIdFuture = newFuture[string]()
|
var responseRequestIdFuture = newFuture[string]()
|
||||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
check:
|
check:
|
||||||
msg.messages.len() == 1
|
msg.messages.len() == 1
|
||||||
msg.messages[0] == post
|
msg.messages[0] == post
|
||||||
@ -43,7 +43,7 @@ procSuite "Waku Filter":
|
|||||||
dialSwitch.mount(proto)
|
dialSwitch.mount(proto)
|
||||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||||
@ -75,7 +75,7 @@ procSuite "Waku Filter":
|
|||||||
await listenSwitch.start()
|
await listenSwitch.start()
|
||||||
|
|
||||||
var responseCompletionFuture = newFuture[bool]()
|
var responseCompletionFuture = newFuture[bool]()
|
||||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
check:
|
check:
|
||||||
msg.messages.len() == 1
|
msg.messages.len() == 1
|
||||||
msg.messages[0] == post
|
msg.messages[0] == post
|
||||||
@ -88,7 +88,7 @@ procSuite "Waku Filter":
|
|||||||
dialSwitch.mount(proto)
|
dialSwitch.mount(proto)
|
||||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
|
||||||
@ -131,7 +131,7 @@ procSuite "Waku Filter":
|
|||||||
await dialSwitch.start()
|
await dialSwitch.start()
|
||||||
|
|
||||||
var responseRequestIdFuture = newFuture[string]()
|
var responseRequestIdFuture = newFuture[string]()
|
||||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let
|
let
|
||||||
@ -161,7 +161,7 @@ procSuite "Waku Filter":
|
|||||||
await listenSwitch.start()
|
await listenSwitch.start()
|
||||||
|
|
||||||
var responseCompletionFuture = newFuture[bool]()
|
var responseCompletionFuture = newFuture[bool]()
|
||||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
check:
|
check:
|
||||||
msg.messages.len() == 1
|
msg.messages.len() == 1
|
||||||
msg.messages[0] == post
|
msg.messages[0] == post
|
||||||
@ -174,7 +174,7 @@ procSuite "Waku Filter":
|
|||||||
dialSwitch.mount(proto)
|
dialSwitch.mount(proto)
|
||||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds)
|
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds)
|
||||||
@ -225,7 +225,7 @@ procSuite "Waku Filter":
|
|||||||
await listenSwitch.start()
|
await listenSwitch.start()
|
||||||
|
|
||||||
var responseCompletionFuture = newFuture[bool]()
|
var responseCompletionFuture = newFuture[bool]()
|
||||||
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
check:
|
check:
|
||||||
msg.messages.len() == 1
|
msg.messages.len() == 1
|
||||||
msg.messages[0] == post
|
msg.messages[0] == post
|
||||||
@ -238,7 +238,7 @@ procSuite "Waku Filter":
|
|||||||
dialSwitch.mount(proto)
|
dialSwitch.mount(proto)
|
||||||
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
|
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
|
||||||
discard
|
discard
|
||||||
|
|
||||||
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds)
|
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds)
|
||||||
|
@ -349,6 +349,65 @@ procSuite "WakuNode":
|
|||||||
await node1.stop()
|
await node1.stop()
|
||||||
await node2.stop()
|
await node2.stop()
|
||||||
|
|
||||||
|
asyncTest "Store protocol returns expected message when relay is disabled and filter enabled":
|
||||||
|
# See nwaku issue #937: 'Store: ability to decouple store from relay'
|
||||||
|
|
||||||
|
let
|
||||||
|
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||||
|
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||||
|
|
||||||
|
pubSubTopic = "/waku/2/default-waku/proto"
|
||||||
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||||
|
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||||
|
|
||||||
|
filterComplFut = newFuture[bool]()
|
||||||
|
storeComplFut = newFuture[bool]()
|
||||||
|
|
||||||
|
await node1.start()
|
||||||
|
node1.mountStore(persistMessages = true)
|
||||||
|
node1.mountFilter()
|
||||||
|
|
||||||
|
await node2.start()
|
||||||
|
node2.mountStore(persistMessages = true)
|
||||||
|
node2.mountFilter()
|
||||||
|
|
||||||
|
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
|
||||||
|
|
||||||
|
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
|
||||||
|
check:
|
||||||
|
msg == message
|
||||||
|
filterComplFut.complete(true)
|
||||||
|
|
||||||
|
await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler)
|
||||||
|
|
||||||
|
await sleepAsync(2000.millis)
|
||||||
|
|
||||||
|
# Send filter push message to node2
|
||||||
|
await node1.wakuFilter.handleMessage(pubSubTopic, message)
|
||||||
|
|
||||||
|
await sleepAsync(2000.millis)
|
||||||
|
|
||||||
|
# Wait for the node2 filter to receive the push message
|
||||||
|
check:
|
||||||
|
(await filterComplFut.withTimeout(5.seconds)) == true
|
||||||
|
|
||||||
|
proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} =
|
||||||
|
check:
|
||||||
|
response.messages.len == 1
|
||||||
|
response.messages[0] == message
|
||||||
|
storeComplFut.complete(true)
|
||||||
|
|
||||||
|
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler)
|
||||||
|
|
||||||
|
check:
|
||||||
|
(await storeComplFut.withTimeout(5.seconds)) == true
|
||||||
|
|
||||||
|
await node1.stop()
|
||||||
|
await node2.stop()
|
||||||
|
|
||||||
asyncTest "Messages are correctly relayed":
|
asyncTest "Messages are correctly relayed":
|
||||||
let
|
let
|
||||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# libtool - Provide generalized library-building support services.
|
# libtool - Provide generalized library-building support services.
|
||||||
# Generated automatically by config.status (libbacktrace) version-unused
|
# Generated automatically by config.status (libbacktrace) version-unused
|
||||||
# Libtool was configured on host fv-az191-65:
|
# Libtool was configured on host fv-az135-74:
|
||||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||||
#
|
#
|
||||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||||
|
@ -324,7 +324,7 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
|
|||||||
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
|
||||||
|
|
||||||
# Register handler for filter, whether remote subscription succeeded or not
|
# Register handler for filter, whether remote subscription succeeded or not
|
||||||
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
|
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler, pubSubTopic: request.pubSubTopic)
|
||||||
waku_node_filters.set(node.filters.len.int64)
|
waku_node_filters.set(node.filters.len.int64)
|
||||||
|
|
||||||
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
|
||||||
@ -457,12 +457,16 @@ proc info*(node: WakuNode): WakuInfo =
|
|||||||
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} =
|
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} =
|
||||||
info "mounting filter"
|
info "mounting filter"
|
||||||
proc filterHandler(requestId: string, msg: MessagePush)
|
proc filterHandler(requestId: string, msg: MessagePush)
|
||||||
{.gcsafe, raises: [Defect, KeyError].} =
|
{.async, gcsafe, raises: [Defect, KeyError].} =
|
||||||
|
|
||||||
info "push received"
|
info "push received"
|
||||||
for message in msg.messages:
|
for message in msg.messages:
|
||||||
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
|
||||||
|
|
||||||
|
if not node.wakuStore.isNil and (requestId in node.filters):
|
||||||
|
let pubSubTopic = node.filters[requestId].pubSubTopic
|
||||||
|
await node.wakuStore.handleMessage(pubSubTopic, message)
|
||||||
|
|
||||||
waku_node_messages.inc(labelValues = ["filter"])
|
waku_node_messages.inc(labelValues = ["filter"])
|
||||||
|
|
||||||
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
|
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
|
||||||
|
@ -179,7 +179,7 @@ method init*(wf: WakuFilter) =
|
|||||||
let value = res.value
|
let value = res.value
|
||||||
if value.push != MessagePush():
|
if value.push != MessagePush():
|
||||||
waku_filter_messages.inc(labelValues = ["MessagePush"])
|
waku_filter_messages.inc(labelValues = ["MessagePush"])
|
||||||
wf.pushHandler(value.requestId, value.push)
|
await wf.pushHandler(value.requestId, value.push)
|
||||||
if value.request != FilterRequest():
|
if value.request != FilterRequest():
|
||||||
waku_filter_messages.inc(labelValues = ["FilterRequest"])
|
waku_filter_messages.inc(labelValues = ["FilterRequest"])
|
||||||
if value.request.subscribe:
|
if value.request.subscribe:
|
||||||
|
@ -15,6 +15,8 @@ const
|
|||||||
MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024
|
MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024
|
||||||
|
|
||||||
type
|
type
|
||||||
|
PubSubTopic* = string
|
||||||
|
|
||||||
ContentFilter* = object
|
ContentFilter* = object
|
||||||
contentTopic*: ContentTopic
|
contentTopic*: ContentTopic
|
||||||
|
|
||||||
@ -22,6 +24,7 @@ type
|
|||||||
|
|
||||||
Filter* = object
|
Filter* = object
|
||||||
contentFilters*: seq[ContentFilter]
|
contentFilters*: seq[ContentFilter]
|
||||||
|
pubSubTopic*: PubSubTopic
|
||||||
handler*: ContentFilterHandler
|
handler*: ContentFilterHandler
|
||||||
|
|
||||||
# @TODO MAYBE MORE INFO?
|
# @TODO MAYBE MORE INFO?
|
||||||
@ -29,7 +32,7 @@ type
|
|||||||
|
|
||||||
FilterRequest* = object
|
FilterRequest* = object
|
||||||
contentFilters*: seq[ContentFilter]
|
contentFilters*: seq[ContentFilter]
|
||||||
pubSubTopic*: string
|
pubSubTopic*: PubSubTopic
|
||||||
subscribe*: bool
|
subscribe*: bool
|
||||||
|
|
||||||
MessagePush* = object
|
MessagePush* = object
|
||||||
@ -45,7 +48,7 @@ type
|
|||||||
requestId*: string
|
requestId*: string
|
||||||
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||||
|
|
||||||
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
|
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
|
||||||
|
|
||||||
WakuFilter* = ref object of LPProtocol
|
WakuFilter* = ref object of LPProtocol
|
||||||
rng*: ref BrHmacDrbgContext
|
rng*: ref BrHmacDrbgContext
|
||||||
|
Loading…
x
Reference in New Issue
Block a user