feature/subscriptions-in-node (#141)

* test

* Update wakunode2.nim

* input from @kdeme

* Update wakunode2.nim

* moved

* Update waku_message.nim

* fixed

* fix

* fix

* fix

* fix

* fix
This commit is contained in:
Dean Eigenmann 2020-09-17 04:17:52 +02:00 committed by GitHub
parent a8dbf8a7b6
commit c55635e93e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 15 deletions

View File

@ -29,6 +29,18 @@ type
Filters* = Table[string, Filter] Filters* = Table[string, Filter]
WakuMessage* = object
payload*: seq[byte]
contentTopic*: string
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.}
MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription]
MessageNotificationSubscription* = object
topics*: seq[string] # @TODO TOPIC
handler*: MessageNotificationHandler
# NOTE based on Eth2Node in NBC eth2_network.nim # NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj WakuNode* = ref object of RootObj
switch*: Switch switch*: Switch
@ -38,10 +50,7 @@ type
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage # TODO Revist messages field indexing as well as if this should be Message or WakuMessage
messages*: seq[(Topic, WakuMessage)] messages*: seq[(Topic, WakuMessage)]
filters*: Filters filters*: Filters
subscriptions*: MessageNotificationSubscriptions
WakuMessage* = object
payload*: seq[byte]
contentTopic*: string
WakuRelay* = ref object of GossipSub WakuRelay* = ref object of GossipSub
gossipEnabled*: bool gossipEnabled*: bool

View File

@ -10,7 +10,7 @@ import
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsub,
libp2p/peerinfo, libp2p/peerinfo,
libp2p/standard_setup, libp2p/standard_setup,
../../protocol/v2/[waku_relay, waku_store, waku_filter], ../../protocol/v2/[waku_relay, waku_store, waku_filter, message_notifier],
./waku_types ./waku_types
logScope: logScope:
@ -105,9 +105,18 @@ proc start*(node: WakuNode) {.async.} =
# NOTE WakuRelay is being instantiated as part of initing node # NOTE WakuRelay is being instantiated as part of initing node
let storeProto = WakuStore.init() let storeProto = WakuStore.init()
node.switch.mount(storeProto) node.switch.mount(storeProto)
node.subscriptions.subscribe(WakuStoreCodec, storeProto.subscription())
let filterProto = WakuFilter.init() let filterProto = WakuFilter.init()
node.switch.mount(filterProto) node.switch.mount(filterProto)
node.subscriptions.subscribe(WakuFilterCodec, filterProto.subscription())
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
node.subscriptions.notify(topic, msg.value())
await node.wakuRelay.subscribe("waku", relayHandler)
# TODO Get this from WakuNode obj # TODO Get this from WakuNode obj
let peerInfo = node.peerInfo let peerInfo = node.peerInfo

View File

@ -7,16 +7,6 @@ import
# #
# Protocols can subscribe to messages of specific topics, then when one is received # Protocols can subscribe to messages of specific topics, then when one is received
# The notification handler function will be called. # The notification handler function will be called.
type
MessageNotificationHandler* = proc(topic: string, msg: WakuMessage) {.gcsafe, closure.}
MessageNotificationSubscription* = object
topics: seq[string] # @TODO TOPIC
handler: MessageNotificationHandler
MessageNotificationSubscriptions* = Table[string, MessageNotificationSubscription]
proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) = proc subscribe*(subscriptions: var MessageNotificationSubscriptions, name: string, subscription: MessageNotificationSubscription) =
subscriptions.add(name, subscription) subscriptions.add(name, subscription)