fix/message-notifier-async (#169)

* made async

* fixes

* unused

* eol
This commit is contained in:
Dean Eigenmann 2020-09-17 22:10:41 +02:00 committed by GitHub
parent 69e4ff8991
commit 6b866490ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 36 additions and 35 deletions

View File

@ -21,7 +21,7 @@ procSuite "Waku Filter":
proto = WakuFilter.init() proto = WakuFilter.init()
subscription = proto.subscription() subscription = proto.subscription()
var subscriptions = initTable[string, MessageNotificationSubscription]() var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription subscriptions["test"] = subscription
let let

View File

@ -21,41 +21,28 @@ procSuite "Waku Store":
proto = WakuStore.init() proto = WakuStore.init()
subscription = proto.subscription() subscription = proto.subscription()
var subscriptions = initTable[string, MessageNotificationSubscription]() var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription subscriptions["test"] = subscription
let let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic") msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic")
msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2") msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: "topic2")
subscriptions.notify("foo", msg) var dialSwitch = newStandardSwitch()
subscriptions.notify("foo", msg2) discard await dialSwitch.start()
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var listenSwitch = newStandardSwitch(some(key))
let remoteSecKey = PrivateKey.random(ECDSA, rng[]).get() listenSwitch.mount(proto)
let remotePeerInfo = PeerInfo.init( discard await listenSwitch.start()
remoteSecKey,
[ma],
["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
)
var serverFut: Future[void] await subscriptions.notify("foo", msg)
let msListen = newMultistream() await subscriptions.notify("foo", msg2)
msListen.addHandler(WakuStoreCodec, proto) let conn = await dialSwitch.dial(listenSwitch.peerInfo.peerId, listenSwitch.peerInfo.addrs, WakuStoreCodec)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn)
var transport1 = TcpTransport.init()
serverFut = await transport1.listen(ma, connHandler)
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
var rpc = HistoryQuery(uuid: "1234", topics: @["topic"]) var rpc = HistoryQuery(uuid: "1234", topics: @["topic"])
discard await msDial.select(conn, WakuStoreCodec)
await conn.writeLP(rpc.encode().buffer) await conn.writeLP(rpc.encode().buffer)
var message = await conn.readLp(64*1024) var message = await conn.readLp(64*1024)

View File

@ -33,9 +33,9 @@ type
payload*: seq[byte] payload*: seq[byte]
contentTopic*: string contentTopic*: string
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.} MessageNotificationHandler* = proc(topic: string, msg: WakuMessage): Future[void] {.gcsafe, closure.}
MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription] MessageNotificationSubscriptions* = TableRef[string, MessageNotificationSubscription]
MessageNotificationSubscription* = object MessageNotificationSubscription* = object
topics*: seq[string] # @TODO TOPIC topics*: seq[string] # @TODO TOPIC

View File

@ -89,7 +89,12 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
# This gets messy with: .PubSub # This gets messy with: .PubSub
switch.mount(wakuRelay) switch.mount(wakuRelay)
result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay) result = WakuNode(
switch: switch,
peerInfo: peerInfo,
wakuRelay: wakuRelay,
subscriptions: newTable[string, MessageNotificationSubscription]()
)
for topic in topics: for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
@ -114,7 +119,7 @@ proc start*(node: WakuNode) {.async.} =
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data) let msg = WakuMessage.init(data)
if msg.isOk(): if msg.isOk():
node.subscriptions.notify(topic, msg.value()) await node.subscriptions.notify(topic, msg.value())
await node.wakuRelay.subscribe("waku", relayHandler) await node.wakuRelay.subscribe("waku", relayHandler)

View File

@ -1,5 +1,6 @@
import import
std/tables, std/tables,
chronos,
./../../node/v2/waku_types ./../../node/v2/waku_types
# The Message Notification system is a method to notify various protocols # The Message Notification system is a method to notify various protocols
@ -7,7 +8,7 @@ import
# #
# Protocols can subscribe to messages of specific topics, then when one is received # Protocols can subscribe to messages of specific topics, then when one is received
# The notification handler function will be called. # The notification handler function will be called.
proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = proc subscribe*(subscriptions: MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) =
subscriptions.add(name, subscription) subscriptions.add(name, subscription)
proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T = proc init*(T: type MessageNotificationSubscription, topics: seq[string], handler: MessageNotificationHandler): T =
@ -23,10 +24,14 @@ proc containsMatch(lhs: seq[string], rhs: seq[string]): bool =
return false return false
proc notify*(subscriptions: var MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.gcsafe.} = proc notify*(subscriptions: MessageNotificationSubscriptions, topic: string, msg: WakuMessage) {.async, gcsafe.} =
var futures = newSeq[Future[void]]()
for subscription in subscriptions.mvalues: for subscription in subscriptions.mvalues:
# @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES # @TODO WILL NEED TO CHECK SUBTOPICS IN FUTURE FOR WAKU TOPICS NOT LIBP2P ONES
if subscription.topics.len > 0 and topic notin subscription.topics: if subscription.topics.len > 0 and topic notin subscription.topics:
continue continue
subscription.handler(topic, msg) futures.add(subscription.handler(topic, msg))
await allFutures(futures)

View File

@ -115,14 +115,18 @@ proc init*(T: type WakuFilter): T =
proc subscription*(proto: WakuFilter): MessageNotificationSubscription = proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
## Returns a Filter for the specific protocol ## Returns a Filter for the specific protocol
## This filter can then be used to send messages to subscribers that match conditions. ## This filter can then be used to send messages to subscribers that match conditions.
proc handle(topic: string, msg: WakuMessage) = proc handle(topic: string, msg: WakuMessage) {.async.} =
var futures = newSeq[Future[void]]()
for subscriber in proto.subscribers: for subscriber in proto.subscribers:
if subscriber.filter.topic != topic: if subscriber.filter.topic != topic:
continue continue
for filter in subscriber.filter.contentFilter: for filter in subscriber.filter.contentFilter:
if msg.contentTopic in filter.topics: if msg.contentTopic in filter.topics:
discard subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer) futures.add(subscriber.connection.writeLp(MessagePush(messages: @[msg]).encode().buffer))
break break
await allFutures(futures)
MessageNotificationSubscription.init(@[], handle) MessageNotificationSubscription.init(@[], handle)

View File

@ -97,7 +97,7 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
## This is used to pipe messages into the storage, therefore ## This is used to pipe messages into the storage, therefore
## the filter should be used by the component that receives ## the filter should be used by the component that receives
## new messages. ## new messages.
proc handle(topic: string, msg: WakuMessage) = proc handle(topic: string, msg: WakuMessage) {.async.} =
proto.messages.add(msg) proto.messages.add(msg)
MessageNotificationSubscription.init(@[], handle) MessageNotificationSubscription.init(@[], handle)