diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 648ae11cd..df8da2b95 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -21,7 +21,7 @@ procSuite "Waku Filter": proto = WakuFilter.init() subscription = proto.subscription() - var subscriptions = initTable[string, MessageNotificationSubscription]() + var subscriptions = newTable[string, MessageNotificationSubscription]() subscriptions["test"] = subscription let diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index a7c30ebb2..f2d3a5270 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -21,41 +21,28 @@ procSuite "Waku Store": proto = WakuStore.init() subscription = proto.subscription() - var subscriptions = initTable[string, MessageNotificationSubscription]() + var subscriptions = newTable[string, MessageNotificationSubscription]() subscriptions["test"] = subscription let - peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic") msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2") - subscriptions.notify("foo", msg) - subscriptions.notify("foo", msg2) + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() - let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() - let remotePeerInfo = PeerInfo.init( - remoteSecKey, - [ma], - ["/test/proto1/1.0.0", "/test/proto2/1.0.0"] - ) + var listenSwitch = newStandardSwitch(some(key)) + listenSwitch.mount(proto) + discard await listenSwitch.start() - var serverFut: Future[void] - let msListen = newMultistream() + await subscriptions.notify("foo", msg) + await subscriptions.notify("foo", msg2) - msListen.addHandler(WakuStoreCodec, proto) - proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = - await msListen.handle(conn) - - var transport1 = TcpTransport.init() - serverFut = await transport1.listen(ma, connHandler) - - let msDial = newMultistream() - let transport2: TcpTransport = TcpTransport.init() - let conn = await transport2.dial(transport1.ma) + let conn = await dialSwitch.dial(listenSwitch.peerInfo.peerId, listenSwitch.peerInfo.addrs, WakuStoreCodec) var rpc = HistoryQuery(uuid: "1234", topics: @["topic"]) - discard await msDial.select(conn, WakuStoreCodec) await conn.writeLP(rpc.encode().buffer) var message = await conn.readLp(64*1024) diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index bf93a8a81..8feb99321 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -33,9 +33,9 @@ type payload*: seq[byte] contentTopic*: string - MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.} + MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.} - MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription] + MessageNotificationSubscriptions* = TableRef[string, MessageNotificationSubscription] MessageNotificationSubscription* = object topics*: seq[string] # @TODO TOPIC diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 6d3d62752..9b3d2efe4 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -89,7 +89,12 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, # This gets messy with: .PubSub switch.mount(wakuRelay) - result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay) + result = WakuNode( + switch: switch, + peerInfo: peerInfo, + wakuRelay: wakuRelay, + subscriptions: newTable[string, MessageNotificationSubscription]() + ) for topic in topics: proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -114,7 +119,7 @@ proc start*(node: WakuNode) {.async.} = proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) if msg.isOk(): - node.subscriptions.notify(topic, msg.value()) + await node.subscriptions.notify(topic, msg.value()) await node.wakuRelay.subscribe("waku", relayHandler) diff --git a/waku/protocol/v2/message_notifier.nim b/waku/protocol/v2/message_notifier.nim index 983a08f75..125956391 100644 --- a/waku/protocol/v2/message_notifier.nim +++ b/waku/protocol/v2/message_notifier.nim @@ -1,5 +1,6 @@ import std/tables, + chronos, ./../../node/v2/waku_types # The Message Notification system is a method to notify various protocols @@ -7,7 +8,7 @@ import # # Protocols can subscribe to messages of specific topics, then when one is received # The notification handler function will be called. -proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = +proc subscribe*(subscriptions: MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = subscriptions.add(name, subscription) proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T = @@ -23,10 +24,14 @@ proc containsMatch(lhs: seq[string], rhs: seq[string]): bool = return false -proc notify*(subscriptions: var MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.gcsafe.} = +proc notify*(subscriptions: MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.async, gcsafe.} = + var futures = newSeq[Future[void]]() + for subscription in subscriptions.mvalues: # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES if subscription.topics.len > 0 and topic notin subscription.topics: continue - subscription.handler(topic, msg) + futures.add(subscription.handler(topic, msg)) + + await allFutures(futures) diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index 1fca4b046..9e941ec3b 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -115,14 +115,18 @@ proc init*(T: type WakuFilter): T = proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol ## This filter can then be used to send messages to subscribers that match conditions. - proc handle(topic: string, msg: WakuMessage) = + proc handle(topic: string, msg: WakuMessage) {.async.} = + var futures = newSeq[Future[void]]() + for subscriber in proto.subscribers: if subscriber.filter.topic != topic: continue for filter in subscriber.filter.contentFilter: if msg.contentTopic in filter.topics: - discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer) + futures.add(subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer)) break + await allFutures(futures) + MessageNotificationSubscription.init(@[], handle) diff --git a/waku/protocol/v2/waku_store.nim b/waku/protocol/v2/waku_store.nim index f439a1981..fe131af73 100644 --- a/waku/protocol/v2/waku_store.nim +++ b/waku/protocol/v2/waku_store.nim @@ -97,7 +97,7 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## This is used to pipe messages into the storage, therefore ## the filter should be used by the component that receives ## new messages. - proc handle(topic: string, msg: WakuMessage) = + proc handle(topic: string, msg: WakuMessage) {.async.} = proto.messages.add(msg) MessageNotificationSubscription.init(@[], handle)