Simplify wakunode2 subscriptions model (#662)

This commit is contained in:
Hanno Cornelius 2021-07-13 09:18:51 +02:00 committed by GitHub
parent cf7b8faf27
commit 2869e71865
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 92 additions and 220 deletions

View File

@ -19,7 +19,6 @@ import
filter_api, filter_api,
admin_api, admin_api,
private_api], private_api],
../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_store/[waku_store, waku_store_types], ../../waku/v2/protocol/waku_store/[waku_store, waku_store_types],
../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_swap/waku_swap,
@ -226,8 +225,6 @@ procSuite "Waku v2 JSON-RPC API":
peer = PeerInfo.init(key) peer = PeerInfo.init(key)
node.mountStore(persistMessages = true) node.mountStore(persistMessages = true)
let
subscription = node.wakuStore.subscription()
var listenSwitch = newStandardSwitch(some(key)) var listenSwitch = newStandardSwitch(some(key))
discard waitFor listenSwitch.start() discard waitFor listenSwitch.start()
@ -237,9 +234,6 @@ procSuite "Waku v2 JSON-RPC API":
listenSwitch.mount(node.wakuRelay) listenSwitch.mount(node.wakuRelay)
listenSwitch.mount(node.wakuStore) listenSwitch.mount(node.wakuStore)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions[testCodec] = subscription
# Now prime it with some history before tests # Now prime it with some history before tests
var var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: 0), msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: 0),
@ -254,7 +248,7 @@ procSuite "Waku v2 JSON-RPC API":
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)] WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: 9)]
for wakuMsg in msgList: for wakuMsg in msgList:
waitFor subscriptions.notify(defaultTopic, wakuMsg) waitFor node.wakuStore.handleMessage(defaultTopic, wakuMsg)
let client = newRpcHttpClient() let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort) await client.connect("127.0.0.1", rpcPort)

View File

@ -9,7 +9,6 @@ import
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/multistream, libp2p/multistream,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_filter/waku_filter,
../test_helpers, ./utils ../test_helpers, ./utils
@ -47,19 +46,15 @@ procSuite "Waku Filter":
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard discard
let let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
subscription = proto2.subscription()
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto2) listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get() let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
await subscriptions.notify(defaultTopic, post) await proto2.handleMessage(defaultTopic, post)
check: check:
(await responseRequestIdFuture) == id (await responseRequestIdFuture) == id
@ -96,19 +91,15 @@ procSuite "Waku Filter":
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard discard
let let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle)
subscription = proto2.subscription()
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto2) listenSwitch.mount(proto2)
let id = (await proto.subscribe(rpc)).get() let id = (await proto.subscribe(rpc)).get()
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
await subscriptions.notify(defaultTopic, post) await proto2.handleMessage(defaultTopic, post)
check: check:
# Check that subscription works as expected # Check that subscription works as expected
@ -124,7 +115,7 @@ procSuite "Waku Filter":
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
await subscriptions.notify(defaultTopic, post) await proto2.handleMessage(defaultTopic, post)
check: check:
# Check that unsubscribe works as expected # Check that unsubscribe works as expected

View File

@ -9,7 +9,6 @@ import
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/multistream, libp2p/multistream,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/protocol/message_notifier,
../../waku/v2/protocol/waku_lightpush/waku_lightpush, ../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../test_helpers, ./utils ../test_helpers, ./utils

View File

@ -8,7 +8,7 @@ import
libp2p/stream/[bufferstream, connection], libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/pubsub/rpc/message, libp2p/protocols/pubsub/rpc/message,
../../waku/v2/protocol/[waku_message, message_notifier], ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_manager,
@ -33,18 +33,14 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
await subscriptions.notify("foo", msg) await proto.handleMessage("foo", msg)
await subscriptions.notify("foo", msg2) await proto.handleMessage("foo", msg2)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -77,19 +73,15 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)]) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)])
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
await subscriptions.notify("foo", msg1) await proto.handleMessage("foo", msg1)
await subscriptions.notify("foo", msg2) await proto.handleMessage("foo", msg2)
await subscriptions.notify("foo", msg3) await proto.handleMessage("foo", msg3)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -126,21 +118,17 @@ procSuite "Waku Store":
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
pubsubtopic1 = "queried topic" pubsubtopic1 = "queried topic"
pubsubtopic2 = "non queried topic" pubsubtopic2 = "non queried topic"
subscription: MessageNotificationSubscription = proto.subscription()
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) # this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1), HistoryContentFilter(contentTopic: contentTopic3)], pubsubTopic: pubsubTopic1)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
# publish messages # publish messages
await subscriptions.notify(pubsubtopic1, msg1) await proto.handleMessage(pubsubtopic1, msg1)
await subscriptions.notify(pubsubtopic2, msg2) await proto.handleMessage(pubsubtopic2, msg2)
await subscriptions.notify(pubsubtopic2, msg3) await proto.handleMessage(pubsubtopic2, msg3)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -174,21 +162,17 @@ procSuite "Waku Store":
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
pubsubtopic1 = "queried topic" pubsubtopic1 = "queried topic"
pubsubtopic2 = "non queried topic" pubsubtopic2 = "non queried topic"
subscription: MessageNotificationSubscription = proto.subscription()
# this query targets: pubsubtopic1 # this query targets: pubsubtopic1
rpc = HistoryQuery(pubsubTopic: pubsubTopic1) rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
# publish messages # publish messages
await subscriptions.notify(pubsubtopic2, msg1) await proto.handleMessage(pubsubtopic2, msg1)
await subscriptions.notify(pubsubtopic2, msg2) await proto.handleMessage(pubsubtopic2, msg2)
await subscriptions.notify(pubsubtopic2, msg3) await proto.handleMessage(pubsubtopic2, msg3)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -218,21 +202,17 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
pubsubtopic = "queried topic" pubsubtopic = "queried topic"
subscription: MessageNotificationSubscription = proto.subscription()
# this query targets: pubsubtopic # this query targets: pubsubtopic
rpc = HistoryQuery(pubsubTopic: pubsubtopic) rpc = HistoryQuery(pubsubTopic: pubsubtopic)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
# publish messages # publish messages
await subscriptions.notify(pubsubtopic, msg1) await proto.handleMessage(pubsubtopic, msg1)
await subscriptions.notify(pubsubtopic, msg2) await proto.handleMessage(pubsubtopic, msg2)
await subscriptions.notify(pubsubtopic, msg3) await proto.handleMessage(pubsubtopic, msg3)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -267,19 +247,15 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
subscription = proto.subscription()
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)]) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
await subscriptions.notify("foo", msg) await proto.handleMessage("foo", msg)
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
await subscriptions.notify("foo", msg2) await proto.handleMessage("foo", msg2)
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -341,18 +317,14 @@ procSuite "Waku Store":
let let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) ) rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
for wakuMsg in msgList: for wakuMsg in msgList:
await subscriptions.notify("foo", wakuMsg) await proto.handleMessage("foo", wakuMsg)
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -392,18 +364,14 @@ procSuite "Waku Store":
var listenSwitch = newStandardSwitch(some(key)) var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start() discard await listenSwitch.start()
let let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]() proto.setPeer(listenSwitch.peerInfo)
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
for wakuMsg in msgList: for wakuMsg in msgList:
await subscriptions.notify("foo", wakuMsg) await proto.handleMessage("foo", wakuMsg)
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -443,18 +411,14 @@ procSuite "Waku Store":
var listenSwitch = newStandardSwitch(some(key)) var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start() discard await listenSwitch.start()
let let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]() proto.setPeer(listenSwitch.peerInfo)
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
for wakuMsg in msgList: for wakuMsg in msgList:
await subscriptions.notify("foo", wakuMsg) await proto.handleMessage("foo", wakuMsg)
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()
@ -585,19 +549,15 @@ procSuite "Waku Store":
var listenSwitch = newStandardSwitch(some(key)) var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start() discard await listenSwitch.start()
let let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]() proto.setPeer(listenSwitch.peerInfo)
subscriptions["test"] = subscription
listenSwitch.mount(proto) listenSwitch.mount(proto)
for wakuMsg in msgList: for wakuMsg in msgList:
# the pubsub topic should be DefaultTopic # the pubsub topic should be DefaultTopic
await subscriptions.notify(DefaultTopic, wakuMsg) await proto.handleMessage(DefaultTopic, wakuMsg)
asyncTest "handle temporal history query with a valid time window": asyncTest "handle temporal history query with a valid time window":
var completionFut = newFuture[bool]() var completionFut = newFuture[bool]()

View File

@ -10,7 +10,7 @@ import
libp2p/crypto/[crypto, secp], libp2p/crypto/[crypto, secp],
libp2p/switch, libp2p/switch,
eth/keys, eth/keys,
../../waku/v2/protocol/[waku_message, message_notifier], ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/node/wakunode2, ../../waku/v2/node/wakunode2,
@ -67,7 +67,7 @@ procSuite "Waku SWAP Accounting":
node2.mountSwap() node2.mountSwap()
node2.mountStore(persistMessages = true) node2.mountStore(persistMessages = true)
await node2.subscriptions.notify("/waku/2/default-waku/proto", message) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -117,7 +117,7 @@ procSuite "Waku SWAP Accounting":
node2.mountSwap(swapConfig) node2.mountSwap(swapConfig)
node2.mountStore(persistMessages = true) node2.mountStore(persistMessages = true)
await node2.subscriptions.notify("/waku/2/default-waku/proto", message) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)

View File

@ -14,7 +14,7 @@ import
eth/keys, eth/keys,
../../waku/v2/node/storage/sqlite, ../../waku/v2/node/storage/sqlite,
../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], ../../waku/v2/protocol/[waku_relay, waku_message],
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush, ../../waku/v2/protocol/waku_lightpush/waku_lightpush,
@ -279,7 +279,7 @@ procSuite "WakuNode":
await node2.start() await node2.start()
node2.mountStore(persistMessages = true) node2.mountStore(persistMessages = true)
await node2.subscriptions.notify("/waku/2/default-waku/proto", message) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -290,10 +290,8 @@ procSuite "WakuNode":
response.messages[0] == message response.messages[0] == message
completionFut.complete(true) completionFut.complete(true)
await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler) await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), storeHandler)
check: check:
(await completionFut.withTimeout(5.seconds)) == true (await completionFut.withTimeout(5.seconds)) == true
await node1.stop() await node1.stop()
@ -328,7 +326,7 @@ procSuite "WakuNode":
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
await node2.subscriptions.notify("/waku/2/default-waku/proto", message) await node2.wakuFilter.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -762,7 +760,7 @@ procSuite "WakuNode":
await node2.start() await node2.start()
node2.mountStore(persistMessages = true) node2.mountStore(persistMessages = true)
await node2.subscriptions.notify("/waku/2/default-waku/proto", message) await node2.wakuStore.handleMessage("/waku/2/default-waku/proto", message)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)
@ -802,8 +800,8 @@ procSuite "WakuNode":
await node2.start() await node2.start()
node2.mountStore(persistMessages = true) node2.mountStore(persistMessages = true)
await node2.subscriptions.notify(DefaultTopic, msg1) await node2.wakuStore.handleMessage(DefaultTopic, msg1)
await node2.subscriptions.notify(DefaultTopic, msg2) await node2.wakuStore.handleMessage(DefaultTopic, msg2)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)

View File

@ -15,7 +15,7 @@ import
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
libp2p/builders, libp2p/builders,
../protocol/[waku_relay, waku_message, message_notifier], ../protocol/[waku_relay, waku_message],
../protocol/waku_store/waku_store, ../protocol/waku_store/waku_store,
../protocol/waku_swap/waku_swap, ../protocol/waku_swap/waku_swap,
../protocol/waku_filter/waku_filter, ../protocol/waku_filter/waku_filter,
@ -75,7 +75,6 @@ type
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage # TODO Revist messages field indexing as well as if this should be Message or WakuMessage
messages*: seq[(Topic, WakuMessage)] messages*: seq[(Topic, WakuMessage)]
filters*: Filters filters*: Filters
subscriptions*: MessageNotificationSubscriptions
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
started*: bool # Indicates that node has started listening started*: bool # Indicates that node has started listening
@ -161,7 +160,6 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
switch: switch, switch: switch,
rng: rng, rng: rng,
peerInfo: peerInfo, peerInfo: peerInfo,
subscriptions: newTable[string, MessageNotificationSubscription](),
filters: initTable[string, Filter]() filters: initTable[string, Filter]()
) )
@ -206,7 +204,13 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
let msg = WakuMessage.init(data) let msg = WakuMessage.init(data)
if msg.isOk(): if msg.isOk():
await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node # Notify mounted protocols of new message
if (not node.wakuFilter.isNil):
await node.wakuFilter.handleMessage(topic, msg.value())
if (not node.wakuStore.isNil):
await node.wakuStore.handleMessage(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"]) waku_node_messages.inc(labelValues = ["relay"])
let wakuRelay = node.wakuRelay let wakuRelay = node.wakuRelay
@ -399,7 +403,6 @@ proc mountFilter*(node: WakuNode) =
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler)
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription())
# NOTE: If using the swap protocol, it must be mounted before store. This is # NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol. # because store is using a reference to the swap protocol.
@ -415,14 +418,12 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: boo
if node.wakuSwap.isNil: if node.wakuSwap.isNil:
debug "mounting store without swap" debug "mounting store without swap"
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store) node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages)
else: else:
debug "mounting store with swap" debug "mounting store with swap"
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap) node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages)
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
if persistMessages:
node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription())
when defined(rln): when defined(rln):
proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(string), ethAccountAddress: Option[Address] = none(Address), membershipContractAddress: Option[Address] = none(Address)) {.async.} = proc mountRlnRelay*(node: WakuNode, ethClientAddress: Option[string] = none(string), ethAccountAddress: Option[Address] = none(Address), membershipContractAddress: Option[Address] = none(Address)) {.async.} =

View File

@ -1,65 +0,0 @@
import
std/tables,
chronos,
waku_message
## The Message Notification system is a method to notify various protocols
## running on a node when a new message was received.
#
## Protocols can subscribe to messages of specific topics, then when one is received
## The notification handler function will be called.
##
## This works as follows:
##
## .. code-block::
## var topic = "foo"
##
## proc handle(topic: string, msg: WakuMessage) {.async.} =
## info "new message", msg = msg
##
## MessageNotificationSubscription.init(@[topic], handle)
##
## var subscriptions = newTable[string, MessageNotificationSubscription]()
## subscriptions["identifier"] = subscription
##
## await subscriptions.notify(topic, WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic(1)))
type
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[
void] {.gcsafe, closure.}
MessageNotificationSubscriptionIdentifier* = string
MessageNotificationSubscription* = object
topics*: seq[string] # @TODO TOPIC
handler*: MessageNotificationHandler
MessageNotificationSubscriptions* = TableRef[MessageNotificationSubscriptionIdentifier, MessageNotificationSubscription]
proc subscribe*(subscriptions: MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) =
subscriptions.add(name, subscription)
proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T =
result = T(
topics: topics,
handler: handler
)
proc containsMatch(lhs: seq[string], rhs: seq[string]): bool =
for leftItem in lhs:
if leftItem in rhs:
return true
return false
proc notify*(subscriptions: MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.async, gcsafe.} =
var futures = newSeq[Future[void]]()
for subscription in subscriptions.mvalues:
# @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
if subscription.topics.len > 0 and topic notin subscription.topics:
continue
futures.add(subscription.handler(topic, msg))
await allFutures(futures)

View File

@ -9,7 +9,6 @@ import
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/connection, libp2p/stream/connection,
libp2p/crypto/crypto, libp2p/crypto/crypto,
../message_notifier,
waku_filter_types, waku_filter_types,
../../utils/requests, ../../utils/requests,
../../node/peer_manager/peer_manager ../../node/peer_manager/peer_manager
@ -191,33 +190,30 @@ proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
wf.peerManager.addPeer(peer, WakuFilterCodec) wf.peerManager.addPeer(peer, WakuFilterCodec)
waku_filter_peers.inc() waku_filter_peers.inc()
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
## Returns a Filter for the specific protocol
## This filter can then be used to send messages to subscribers that match conditions.
proc handle(topic: string, msg: WakuMessage) {.async.} =
trace "handle WakuFilter subscription", topic=topic, msg=msg
for subscriber in proto.subscribers: proc handleMessage*(wf: WakuFilter, topic: string, msg: WakuMessage) {.async.} =
if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: # Handle WakuMessage according to filter protocol
trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic trace "handle message in WakuFilter", topic=topic, msg=msg
continue
for filter in subscriber.filter.contentFilters: for subscriber in wf.subscribers:
if msg.contentTopic == filter.contentTopic: if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic:
trace "Found matching contentTopic", filter=filter, msg=msg trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic
let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) continue
let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
if connOpt.isSome: for filter in subscriber.filter.contentFilters:
await connOpt.get().writeLP(push.encode().buffer) if msg.contentTopic == filter.contentTopic:
else: trace "Found matching contentTopic", filter=filter, msg=msg
# @TODO more sophisticated error handling here let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg]))
error "failed to push messages to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure]) let connOpt = await wf.peerManager.dialPeer(subscriber.peer, WakuFilterCodec)
break
MessageNotificationSubscription.init(@[], handle) if connOpt.isSome:
await connOpt.get().writeLP(push.encode().buffer)
else:
# @TODO more sophisticated error handling here
error "failed to push messages to remote peer"
waku_filter_errors.inc(labelValues = [dialFailure])
break
proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} = proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[Option[string]] {.async, gcsafe.} =
let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec)

View File

@ -9,7 +9,6 @@ import
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/connection, libp2p/stream/connection,
libp2p/crypto/crypto, libp2p/crypto/crypto,
../message_notifier,
waku_lightpush_types, waku_lightpush_types,
../../utils/requests, ../../utils/requests,
../../node/peer_manager/peer_manager, ../../node/peer_manager/peer_manager,

View File

@ -12,7 +12,6 @@ import
libp2p/protocols/protocol, libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/connection, libp2p/stream/connection,
../message_notifier,
../../node/storage/message/message_store, ../../node/storage/message/message_store,
../waku_swap/waku_swap, ../waku_swap/waku_swap,
./waku_store_types, ./waku_store_types,
@ -397,13 +396,14 @@ proc init*(ws: WakuStore) {.raises: [Defect, Exception]} =
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T {.raises: [Defect, Exception]} = store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true): T {.raises: [Defect, Exception]} =
debug "init" debug "init"
new result new result
result.rng = rng result.rng = rng
result.peerManager = peerManager result.peerManager = peerManager
result.store = store result.store = store
result.wakuSwap = wakuSwap result.wakuSwap = wakuSwap
result.persistMessages = persistMessages
result.init() result.init()
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
@ -411,25 +411,24 @@ proc setPeer*(ws: WakuStore, peer: PeerInfo) {.raises: [Defect, Exception]} =
ws.peerManager.addPeer(peer, WakuStoreCodec) ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc() waku_store_peers.inc()
proc subscription*(proto: WakuStore): MessageNotificationSubscription = proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
## The filter function returns the pubsub filter for the node. if (not w.persistMessages):
## This is used to pipe messages into the storage, therefore # Store is mounted but new messages should not be stored
## the filter should be used by the component that receives return
## new messages.
proc handle(topic: string, msg: WakuMessage) {.async.} =
debug "subscription handle", topic=topic
let index = msg.computeIndex()
proto.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
waku_store_messages.inc(labelValues = ["stored"])
if proto.store.isNil:
return
let res = proto.store.put(index, msg, topic)
if res.isErr:
warn "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
result = MessageNotificationSubscription.init(@[], handle) # Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg
let index = msg.computeIndex()
w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
waku_store_messages.inc(labelValues = ["stored"])
if w.store.isNil:
return
let res = w.store.put(index, msg, topic)
if res.isErr:
warn "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.

View File

@ -70,3 +70,4 @@ type
messages*: seq[IndexedWakuMessage] messages*: seq[IndexedWakuMessage]
store*: MessageStore store*: MessageStore
wakuSwap*: WakuSwap wakuSwap*: WakuSwap
persistMessages*: bool

View File

@ -30,7 +30,6 @@ import
libp2p/protobuf/minprotobuf, libp2p/protobuf/minprotobuf,
libp2p/stream/connection, libp2p/stream/connection,
../../node/peer_manager/peer_manager, ../../node/peer_manager/peer_manager,
../message_notifier,
./waku_swap_types, ./waku_swap_types,
../../waku/v2/protocol/waku_swap/waku_swap_contracts ../../waku/v2/protocol/waku_swap/waku_swap_contracts