chore(node): waku node code reorganization

Lorenzo Delgado 2022-10-27 17:29:09 +02:00 committed by GitHub
parent bccb73c325
commit 0725da0b66
4 changed files with 288 additions and 275 deletions


@@ -67,7 +67,7 @@ type Chat = ref object
type
PrivateKey* = crypto.PrivateKey
Topic* = waku_node.Topic
Topic* = waku_node.PubsubTopic
#####################
## chat2 protobufs ##


@@ -51,7 +51,7 @@
WakuBridge* = ref object of RootObj
nodev1*: EthereumNode
nodev2*: WakuNode
nodev2PubsubTopic: waku_node.Topic # Pubsub topic to bridge to/from
nodev2PubsubTopic: waku_node.PubsubTopic # Pubsub topic to bridge to/from
seen: seq[hashes.Hash] # FIFO queue of seen WakuMessages. Used for deduplication.
rng: ref HmacDrbgContext
v1Pool: seq[Node] # Pool of v1 nodes for possible connections
@@ -228,7 +228,7 @@ proc new*(T: type WakuBridge,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
nameResolver: NameResolver = nil,
# Bridge configuration
nodev2PubsubTopic: waku_node.Topic,
nodev2PubsubTopic: waku_node.PubsubTopic,
v1Pool: seq[Node] = @[],
targetV1Peers = 0): T
{.raises: [Defect,IOError, TLSStreamProtocolError, LPError].} =


@@ -29,8 +29,8 @@ proc runBackground() {.async.} =
await node.mountRelay()
# Subscribe to a topic
let topic = cast[Topic]("foobar")
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
let topic = cast[PubsubTopic]("foobar")
proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic


@@ -56,8 +56,8 @@ const WakuFilterTimeout: Duration = 1.days
# key and crypto modules different
type
# XXX: Weird type, should probably be using pubsub Topic object name?
Topic* = string
# XXX: Weird type, should probably be using pubsub PubsubTopic object name?
PubsubTopic* = string
Message* = seq[byte]
WakuInfo* = object
@@ -67,7 +67,7 @@ type
#multiaddrStrings*: seq[string]
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj
WakuNode* = ref object
peerManager*: PeerManager
switch*: Switch
wakuRelay*: WakuRelay
@@ -128,27 +128,25 @@ template wsFlag(wssEnabled: bool): MultiAddress =
if wssEnabled: MultiAddress.init("/wss").tryGet()
else: MultiAddress.init("/ws").tryGet()
proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
bindIp: ValidIpAddress, bindPort: Port,
extIp = none(ValidIpAddress), extPort = none(Port),
peerStorage: PeerStorage = nil,
maxConnections = builders.MaxConnections,
wsBindPort: Port = (Port)8000,
wsEnabled: bool = false,
wssEnabled: bool = false,
secureKey: string = "",
secureCert: string = "",
wakuFlags = none(WakuEnrBitfield),
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port)
): T
{.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
## Creates a Waku Node.
##
## Status: Implemented.
##
proc new*(T: type WakuNode,
nodeKey: crypto.PrivateKey,
bindIp: ValidIpAddress,
bindPort: Port,
extIp = none(ValidIpAddress),
extPort = none(Port),
peerStorage: PeerStorage = nil,
maxConnections = builders.MaxConnections,
wsBindPort: Port = (Port)8000,
wsEnabled: bool = false,
wssEnabled: bool = false,
secureKey: string = "",
secureCert: string = "",
wakuFlags = none(WakuEnrBitfield),
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port)): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
## Creates a Waku Node instance.
## Initialize addresses
let
@@ -176,14 +174,14 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
wsExtAddress = some(ip4TcpEndPoint(extIp.get(), wsBindPort) & wsFlag(wssEnabled))
var announcedAddresses: seq[MultiAddress]
if hostExtAddress.isSome:
if hostExtAddress.isSome():
announcedAddresses.add(hostExtAddress.get())
else:
announcedAddresses.add(hostAddress) # We always have at least a bind address for the host
if wsExtAddress.isSome:
if wsExtAddress.isSome():
announcedAddresses.add(wsExtAddress.get())
elif wsHostAddress.isSome:
elif wsHostAddress.isSome():
announcedAddresses.add(wsHostAddress.get())
## Initialize peer
@@ -193,8 +191,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
else: some(bindIp)
enrTcpPort = if extPort.isSome(): extPort
else: some(bindPort)
enrMultiaddrs = if wsExtAddress.isSome: @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field
elif wsHostAddress.isSome: @[wsHostAddress.get()]
enrMultiaddrs = if wsExtAddress.isSome(): @[wsExtAddress.get()] # Only add ws/wss to `multiaddrs` field
elif wsHostAddress.isSome(): @[wsHostAddress.get()]
else: @[]
enr = initEnr(nodeKey,
enrIp,
@@ -205,7 +203,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
info "Initializing networking", addrs=announcedAddresses
var switch = newWakuSwitch(some(nodekey),
let switch = newWakuSwitch(
some(nodekey),
hostAddress,
wsHostAddress,
transportFlags = {ServerFlags.ReuseAddr},
@@ -215,7 +214,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
secureKeyPath = secureKey,
secureCertPath = secureCert,
nameResolver = nameResolver,
sendSignedPeerRecord = sendSignedPeerRecord)
sendSignedPeerRecord = sendSignedPeerRecord
)
let wakuNode = WakuNode(
peerManager: PeerManager.new(switch, peerStorage),
@@ -228,13 +228,36 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
return wakuNode
proc peerInfo*(node: WakuNode): PeerInfo =
node.switch.peerInfo
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
if node.wakuRelay.isNil:
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
let peerInfo = node.switch.peerInfo
var listenStr : seq[string]
for address in node.announcedAddresses:
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
listenStr &= fulladdr
let enrUri = node.enr.toUri()
let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
return wakuInfo
proc connectToNodes*(node: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
# NOTE This is dialing on WakuRelay protocol specifically
await connectToNodes(node.peerManager, nodes, WakuRelayCodec, source)
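A short sketch of how the relocated helpers above could be used together (not part of the commit; the bootstrap address is a placeholder and the import path is assumed):

import chronos, chronicles
import waku/v2/node/waku_node  # path assumed

proc bootstrap(node: WakuNode) {.async.} =
  # Log what this node announces (listen addresses and ENR)
  let nodeInfo = node.info()
  info "node info", enrUri=nodeInfo.enrUri, listenAddresses=nodeInfo.listenAddresses

  # Dial a statically configured peer; `source` is only used for bookkeeping
  await node.connectToNodes(@["<bootstrap-peer-multiaddr>"], source="static")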
## Waku relay
proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]) =
if node.wakuRelay.isNil():
error "Invalid API call to `subscribe`. WakuRelay not mounted."
# @TODO improved error handling
# TODO: improved error handling
return
info "subscribe", topic=topic
@@ -266,55 +289,22 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
debug "Registering default handler", topic=topic
wakuRelay.subscribe(topic, defaultHandler)
if handler.isSome:
if handler.isSome():
debug "Registering handler", topic=topic
wakuRelay.subscribe(topic, handler.get())
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
## NOTE The data field SHOULD be decoded as a WakuMessage.
## Status: Implemented.
node.subscribe(topic, some(handler))
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
## FilterHandler is a method that takes a MessagePush.
##
## Status: Implemented.
# Sanity check for well-formed subscribe FilterRequest
doAssert(request.subscribe, "invalid subscribe request")
info "subscribe content", filter=request
var id = generateRequestId(node.rng)
if node.wakuFilter.isNil == false:
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics)
if resSubscription.isOk():
id = resSubscription.get()
else:
# Failed to subscribe
error "remote subscription to filter failed", filter = request
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
# Register handler for filter, whether remote subscription succeeded or not
node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
proc unsubscribe*(node: WakuNode, topic: PubsubTopic, handler: TopicHandler) =
## Unsubscribes a handler from a PubSub topic.
##
## Status: Implemented.
if node.wakuRelay.isNil:
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribe`. WakuRelay not mounted."
# @TODO improved error handling
# TODO: improved error handling
return
info "unsubscribe", topic=topic
@@ -322,14 +312,12 @@ proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) =
let wakuRelay = node.wakuRelay
wakuRelay.unsubscribe(@[(topic, handler)])
proc unsubscribeAll*(node: WakuNode, topic: Topic) =
proc unsubscribeAll*(node: WakuNode, topic: PubsubTopic) =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.
if node.wakuRelay.isNil:
if node.wakuRelay.isNil():
error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted."
# @TODO improved error handling
# TODO: improved error handling
return
info "unsubscribeAll", topic=topic
@@ -337,27 +325,7 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) =
let wakuRelay = node.wakuRelay
wakuRelay.unsubscribeAll(topic)
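To unsubscribe a specific handler, the caller has to keep the same reference it passed to subscribe; a hedged sketch of the relay subscription lifecycle (not part of the commit, import path assumed):

import chronos
import waku/v2/node/waku_node  # path assumed

proc relayLifecycle(node: WakuNode) =
  let topic = cast[PubsubTopic]("/waku/2/example")

  proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
    discard  # process the raw pubsub payload here

  node.subscribe(topic, handler)     # register this handler on the pubsub topic
  # ... later:
  node.unsubscribe(topic, handler)   # remove only this handler
  node.unsubscribeAll(topic)         # or drop every handler on the topic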
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
## Unsubscribe from a content filter.
##
## Status: Implemented.
# Sanity check for well-formed unsubscribe FilterRequest
doAssert(request.subscribe == false, "invalid unsubscribe request")
info "unsubscribe content", filter=request
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics)
node.filters.removeContentFilters(request.contentFilters)
waku_node_filters.set(node.filters.len.int64)
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may also
## be omitted.
@@ -372,66 +340,79 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsaf
let data = message.encode().buffer
discard await node.wakuRelay.publish(topic, data)
proc lightpush*(node: WakuNode, topic: Topic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality.
debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic
let rpc = PushRequest(pubSubTopic: topic, message: message)
return await node.wakuLightPush.request(rpc)
proc lightpush2*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
discard await node.lightpush(topic, message)
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
# TODO: Once waku swap is less experimental, this can be simplified
if node.wakuSwap.isNil:
debug "Using default query"
return await node.wakuStore.query(query)
else:
debug "Using SWAP accounting query"
# TODO: wakuSwap now part of wakuStore object
return await node.wakuStore.queryWithAccounting(query)
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
## messages are stored in the wakuStore's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 seconds is added to the time window to account for node asynchrony
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if node.wakuStore.isNil():
proc startRelay*(node: WakuNode) {.async.} =
if node.wakuRelay.isNil():
trace "Failed to start relay. Not mounted."
return
let retrievedMessages = await node.wakuStore.resume(peerList)
if retrievedMessages.isErr():
error "failed to resume store", error=retrievedMessages.error
return
## Setup and start relay protocol
info "starting relay"
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
# PubsubTopic subscriptions
for topic in node.wakuRelay.defaultTopics:
node.subscribe(topic, none(TopicHandler))
# TODO: Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
##
## Status: Implemented.
##
# Resume previous relay connections
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
info "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
let peerInfo = node.switch.peerInfo
await node.peerManager.reconnectPeers(WakuRelayCodec,
protocolMatcher(WakuRelayCodec),
backoffPeriod)
var listenStr : seq[string]
for address in node.announcedAddresses:
var fulladdr = $address & "/p2p/" & $peerInfo.peerId
listenStr &= fulladdr
let enrUri = node.enr.toUri()
let wakuInfo = WakuInfo(listenAddresses: listenStr, enrUri: enrUri)
return wakuInfo
# Start the WakuRelay protocol
await node.wakuRelay.start()
info "relay started successfully"
proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](),
triggerSelf = true,
peerExchangeHandler = none(RoutingRecordsHandler))
# TODO: Better error handling: CatchableError is raised by `waitFor`
{.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] =
let mh = MultiHash.digest("sha2-256", m.data)
if mh.isOk():
return ok(mh[].data.buffer)
else:
return ok(($m.data.hash).toBytes())
let wakuRelay = WakuRelay.init(
switch = node.switch,
msgIdProvider = msgIdProvider,
triggerSelf = triggerSelf,
sign = false,
verifySignature = false,
maxMessageSize = MaxWakuMessageSize
)
info "mounting relay"
## The default relay topics are the union of
## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
## Add peer exchange handler
if peerExchangeHandler.isSome():
wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p
wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
node.wakuRelay = wakuRelay
if node.started:
# Node has started already. Let's start relay too.
await node.startRelay()
node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))
info "relay mounted successfully"
## Waku filter
proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} =
info "mounting filter"
@@ -454,6 +435,63 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuFilter.isNil():
error "could not set peer, waku filter is nil"
return
info "Set filter peer", peer=peer
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuFilter.setPeer(remotePeer)
proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} =
## Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
## FilterHandler is a method that takes a MessagePush.
# Sanity check for well-formed subscribe FilterRequest
doAssert(request.subscribe, "invalid subscribe request")
info "subscribe content", filter=request
var id = generateRequestId(node.rng)
if not node.wakuFilter.isNil():
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics)
if resSubscription.isOk():
id = resSubscription.get()
else:
# Failed to subscribe
error "remote subscription to filter failed", filter = request
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
# Register handler for filter, whether remote subscription succeeded or not
node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
## Unsubscribe from a content filter.
# Sanity check for well-formed unsubscribe FilterRequest
doAssert(request.subscribe == false, "invalid unsubscribe request")
info "unsubscribe content", filter=request
let
pubsubTopic = request.pubsubTopic
contentTopics = request.contentFilters.mapIt(it.contentTopic)
discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics)
node.filters.removeContentFilters(request.contentFilters)
waku_node_filters.set(node.filters.len.int64)
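A hedged sketch of the filter API moved here (not part of the commit; the ContentFilter element type and the handler signature follow the doc comments and field accesses above, the peer address is a placeholder, and the import path is assumed):

import chronos
import waku/v2/node/waku_node  # path assumed

proc useFilter(node: WakuNode) {.async.} =
  node.setFilterPeer("<filter-service-peer-multiaddr>")  # placeholder

  let request = FilterRequest(
    pubsubTopic: "/waku/2/example",
    contentFilters: @[ContentFilter(contentTopic: "/example/1/chat/proto")],
    subscribe: true)

  proc handler(msg: MessagePush) {.gcsafe, closure.} =
    discard  # handle pushed messages here

  await node.subscribe(request, handler)

  # To stop receiving, send the mirror request with `subscribe: false`
  await node.unsubscribe(FilterRequest(
    pubsubTopic: request.pubsubTopic,
    contentFilters: request.contentFilters,
    subscribe: false))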
## Waku swap
# NOTE: If using the swap protocol, it must be mounted before store. This is
# because store is using a reference to the swap protocol.
@@ -468,6 +506,8 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as
node.switch.mount(node.wakuSwap, protocolMatcher(WakuSwapCodec))
## Waku store
const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes
proc executeMessageRetentionPolicy*(node: WakuNode) =
@@ -517,77 +557,50 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
proc startRelay*(node: WakuNode) {.async.} =
if node.wakuRelay.isNil:
trace "Failed to start relay. Not mounted."
proc setStorePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuStore.isNil():
error "could not set peer, waku store is nil"
return
## Setup and start relay protocol
info "starting relay"
info "Set store peer", peer=peer
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuStore.setPeer(remotePeer)
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
# TODO: Once waku swap is less experimental, this can be simplified
if node.wakuSwap.isNil():
debug "Using default query"
return await node.wakuStore.query(query)
else:
debug "Using SWAP accounting query"
# TODO: wakuSwap now part of wakuStore object
return await node.wakuStore.queryWithAccounting(query)
proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
## messages are stored in the wakuStore's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 seconds is added to the time window to account for node asynchrony
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
if node.wakuStore.isNil():
return
let retrievedMessages = await node.wakuStore.resume(peerList)
if retrievedMessages.isErr():
error "failed to resume store", error=retrievedMessages.error
return
# Topic subscriptions
for topic in node.wakuRelay.defaultTopics:
node.subscribe(topic, none(TopicHandler))
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
# Resume previous relay connections
if node.peerManager.hasPeers(protocolMatcher(WakuRelayCodec)):
info "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
await node.peerManager.reconnectPeers(WakuRelayCodec,
protocolMatcher(WakuRelayCodec),
backoffPeriod)
# Start the WakuRelay protocol
await node.wakuRelay.start()
info "relay started successfully"
proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](),
triggerSelf = true,
peerExchangeHandler = none(RoutingRecordsHandler))
# @TODO: Better error handling: CatchableError is raised by `waitFor`
{.async, gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
proc msgIdProvider(m: messages.Message): Result[MessageID, ValidationResult] =
let mh = MultiHash.digest("sha2-256", m.data)
if mh.isOk():
return ok(mh[].data.buffer)
else:
return ok(($m.data.hash).toBytes())
let wakuRelay = WakuRelay.init(
switch = node.switch,
msgIdProvider = msgIdProvider,
triggerSelf = triggerSelf,
sign = false,
verifySignature = false,
maxMessageSize = MaxWakuMessageSize
)
info "mounting relay"
## The default relay topics are the union of
## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
## Add peer exchange handler
if peerExchangeHandler.isSome():
wakuRelay.parameters.enablePX = true # Feature flag for peer exchange in nim-libp2p
wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
node.wakuRelay = wakuRelay
if node.started:
# Node has started already. Let's start relay too.
await node.startRelay()
node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))
info "relay mounted successfully"
## Waku lightpush
proc mountLightPush*(node: WakuNode) {.async.} =
info "mounting light push"
@@ -612,6 +625,33 @@ proc mountLightPush*(node: WakuNode) {.async.} =
node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec))
proc setLightPushPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuLightPush.isNil():
error "could not set peer, waku lightpush is nil"
return
info "Set lightpush peer", peer=peer
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuLightPush.setPeer(remotePeer)
proc lightpush*(node: WakuNode, topic: PubsubTopic, message: WakuMessage): Future[WakuLightpushResult[PushResponse]] {.async, gcsafe.} =
## Pushes a `WakuMessage` to a node which relays it further on PubSub topic.
## Returns whether relaying was successful or not.
## `WakuMessage` should contain a `contentTopic` field for light node
## functionality.
debug "Publishing with lightpush", topic=topic, contentTopic=message.contentTopic
let rpc = PushRequest(pubSubTopic: topic, message: message)
return await node.wakuLightPush.request(rpc)
proc lightpush2*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
discard await node.lightpush(topic, message)
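A sketch of the lightpush client calls gathered above (not part of the commit; the service peer address is a placeholder and the import path is assumed):

import chronos, chronicles
import waku/v2/node/waku_node  # path assumed

proc useLightpush(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async.} =
  await node.mountLightPush()
  node.setLightPushPeer("<lightpush-service-peer-multiaddr>")  # placeholder

  # `lightpush` returns the push result, `lightpush2` discards it
  let res = await node.lightpush(topic, message)
  if res.isErr():
    error "lightpush request failed", error=res.error

  await node.lightpush2(topic, message)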
## Waku peer-exchange
proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting waku peer exchange"
@@ -626,6 +666,20 @@ proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].}
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuPeerExchange.isNil():
error "could not set peer, waku peer-exchange is nil"
return
info "Set peer-exchange peer", peer=peer
let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuPeerExchange.setPeer(remotePeer)
## Other protocols
proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting libp2p ping protocol"
@@ -656,8 +710,8 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
for peer in peers:
let connOpt = await node.peerManager.dialPeer(peer, PingCodec)
if connOpt.isNone:
# @TODO more sophisticated error handling here
if connOpt.isNone():
# TODO: more sophisticated error handling here
debug "failed to connect to remote peer", peer=peer
waku_node_errors.inc(labelValues = ["keep_alive_failure"])
return
@@ -673,46 +727,6 @@ proc startKeepalive*(node: WakuNode) =
asyncSpawn node.keepaliveLoop(defaultKeepalive)
proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) =
n.wakuStore.setPeer(peer)
proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set store peer", address = address
let peer = parseRemotePeerInfo(address)
n.setStorePeer(peer)
proc setFilterPeer*(n: WakuNode, peer: RemotePeerInfo) =
n.wakuFilter.setPeer(peer)
proc setFilterPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set filter peer", address = address
let peer = parseRemotePeerInfo(address)
n.setFilterPeer(peer)
proc setLightPushPeer*(n: WakuNode, peer: RemotePeerInfo) =
n.wakuLightPush.setPeer(peer)
proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set lightpush peer", address = address
let peer = parseRemotePeerInfo(address)
n.wakuLightPush.setPeer(peer)
proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set peer exchange peer", address = address
let remotePeer = parseRemotePeerInfo(address)
n.wakuPeerExchange.setPeer(remotePeer)
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
# NOTE This is dialing on WakuRelay protocol specifically
await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source)
proc runDiscv5Loop(node: WakuNode) {.async.} =
## Continuously add newly discovered nodes
## using Node Discovery v5
@@ -725,9 +739,9 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
while node.wakuDiscv5.listening:
trace "Running discovery loop"
## Query for a random target and collect all discovered nodes
## @TODO: we could filter nodes here
## TODO: we could filter nodes here
let discoveredPeers = await node.wakuDiscv5.findRandomPeers()
if discoveredPeers.isOk:
if discoveredPeers.isOk():
## Let's attempt to connect to peers we
## have not encountered before
@@ -738,7 +752,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
if newPeers.len > 0:
debug "Connecting to newly discovered peers", count=newPeers.len()
await connectToNodes(node, newPeers, "discv5")
await node.connectToNodes(newPeers, "discv5")
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
@@ -751,7 +765,7 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
info "Starting discovery v5 service"
if not node.wakuDiscv5.isNil:
if not node.wakuDiscv5.isNil():
## First start listening on configured port
try:
trace "Start listening on discv5 port"
@@ -776,7 +790,7 @@ proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
## Stop Discovery v5 service
if not node.wakuDiscv5.isNil:
if not node.wakuDiscv5.isNil():
info "Stopping discovery v5 service"
## Stop Discovery v5 process and close listening port
@@ -786,11 +800,10 @@ proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
debug "Successfully stopped discovery v5 service"
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
##
## Status: Implemented.
waku_version.set(1, labelValues=[git_version])
@@ -810,7 +823,7 @@ proc start*(node: WakuNode) {.async.} =
info "DNS: discoverable ENR ", enr = node.enr.toUri()
# Perform relay-specific startup tasks TODO: this should be rethought
if not node.wakuRelay.isNil:
if not node.wakuRelay.isNil():
await node.startRelay()
## Update switch peer info with announced addrs
@@ -821,10 +834,10 @@ proc start*(node: WakuNode) {.async.} =
info "Node started successfully"
proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil:
if not node.wakuRelay.isNil():
await node.wakuRelay.stop()
if not node.wakuDiscv5.isNil:
if not node.wakuDiscv5.isNil():
discard await node.stopDiscv5()
await node.switch.stop()
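Taken together, the reorganized module keeps the same node lifecycle; an end-to-end sketch (not part of the commit; key generation and import paths are assumptions, while the WakuNode procs are the ones in this diff):

import chronos, stew/results, stew/shims/net
import libp2p/crypto/crypto
import waku/v2/node/waku_node  # path assumed

proc main() {.async.} =
  let
    rng = crypto.newRng()
    nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
    # Bind on all interfaces, TCP port 60000 (values are illustrative)
    node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60000))

  await node.mountRelay()
  await node.start()
  node.startKeepalive()

  # ... subscribe, publish, query, etc.

  await node.stop()

waitFor main()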