feat(store): ability to decouple store from relay (#937)

This commit is contained in:
Lorenzo Delgado 2022-05-16 12:51:10 +02:00 committed by Lorenzo Delgado
parent 668a01f1f2
commit 32d230b474
5 changed files with 183 additions and 117 deletions

View File

@ -30,7 +30,7 @@ procSuite "Waku Filter":
await listenSwitch.start()
var responseRequestIdFuture = newFuture[string]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
@ -43,7 +43,7 @@ procSuite "Waku Filter":
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
@ -75,7 +75,7 @@ procSuite "Waku Filter":
await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
@ -88,7 +88,7 @@ procSuite "Waku Filter":
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
@ -131,7 +131,7 @@ procSuite "Waku Filter":
await dialSwitch.start()
var responseRequestIdFuture = newFuture[string]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let
@ -161,7 +161,7 @@ procSuite "Waku Filter":
await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
@ -174,7 +174,7 @@ procSuite "Waku Filter":
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds)
@ -225,7 +225,7 @@ procSuite "Waku Filter":
await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
@ -238,7 +238,7 @@ procSuite "Waku Filter":
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} =
discard
let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds)

View File

@ -349,6 +349,65 @@ procSuite "WakuNode":
await node1.stop()
await node2.stop()
asyncTest "Store protocol returns expected message when relay is disabled and filter enabled":
# See nwaku issue #937: 'Store: ability to decouple store from relay'
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
pubSubTopic = "/waku/2/default-waku/proto"
contentTopic = ContentTopic("/waku/2/default-content/proto")
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
filterComplFut = newFuture[bool]()
storeComplFut = newFuture[bool]()
await node1.start()
node1.mountStore(persistMessages = true)
node1.mountFilter()
await node2.start()
node2.mountStore(persistMessages = true)
node2.mountFilter()
node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo())
proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} =
check:
msg == message
filterComplFut.complete(true)
await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler)
await sleepAsync(2000.millis)
# Send filter push message to node2
await node1.wakuFilter.handleMessage(pubSubTopic, message)
await sleepAsync(2000.millis)
# Wait for the node2 filter to receive the push message
check:
(await filterComplFut.withTimeout(5.seconds)) == true
proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len == 1
response.messages[0] == message
storeComplFut.complete(true)
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler)
check:
(await storeComplFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Messages are correctly relayed":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]

View File

@ -324,7 +324,7 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
# Register handler for filter, whether remote subscription succeeded or not
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler, pubSubTopic: request.pubSubTopic)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
@ -457,12 +457,16 @@ proc info*(node: WakuNode): WakuInfo =
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} =
info "mounting filter"
proc filterHandler(requestId: string, msg: MessagePush)
{.gcsafe, raises: [Defect, KeyError].} =
{.async, gcsafe, raises: [Defect, KeyError].} =
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
if not node.wakuStore.isNil and (requestId in node.filters):
let pubSubTopic = node.filters[requestId].pubSubTopic
await node.wakuStore.handleMessage(pubSubTopic, message)
waku_node_messages.inc(labelValues = ["filter"])
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)

View File

@ -179,7 +179,7 @@ method init*(wf: WakuFilter) =
let value = res.value
if value.push != MessagePush():
waku_filter_messages.inc(labelValues = ["MessagePush"])
wf.pushHandler(value.requestId, value.push)
await wf.pushHandler(value.requestId, value.push)
if value.request != FilterRequest():
waku_filter_messages.inc(labelValues = ["FilterRequest"])
if value.request.subscribe:

View File

@ -15,6 +15,8 @@ const
MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024
type
PubSubTopic* = string
ContentFilter* = object
contentTopic*: ContentTopic
@ -22,6 +24,7 @@ type
Filter* = object
contentFilters*: seq[ContentFilter]
pubSubTopic*: PubSubTopic
handler*: ContentFilterHandler
# @TODO MAYBE MORE INFO?
@ -29,7 +32,7 @@ type
FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubSubTopic*: string
pubSubTopic*: PubSubTopic
subscribe*: bool
MessagePush* = object
@ -45,7 +48,7 @@ type
requestId*: string
filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN?
MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.}
MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.}
WakuFilter* = ref object of LPProtocol
rng*: ref BrHmacDrbgContext