2020-04-29 04:49:27 +00:00
|
|
|
import
|
2020-07-29 13:24:01 +00:00
|
|
|
confutils, config, strutils, chronos, json_rpc/rpcserver, metrics, sequtils,
|
2020-04-29 04:49:27 +00:00
|
|
|
chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird.
|
2020-05-27 04:07:11 +00:00
|
|
|
eth/[keys, p2p], eth/net/nat,
|
|
|
|
eth/p2p/[discovery, enode],
|
2020-05-15 04:11:14 +00:00
|
|
|
libp2p/multiaddress,
|
|
|
|
libp2p/crypto/crypto,
|
|
|
|
libp2p/protocols/protocol,
|
2020-07-28 08:17:50 +00:00
|
|
|
# NOTE For TopicHandler, solve with exports?
|
|
|
|
libp2p/protocols/pubsub/pubsub,
|
2020-05-15 04:11:14 +00:00
|
|
|
libp2p/peerinfo,
|
2020-07-24 01:39:58 +00:00
|
|
|
stew/shims/net as stewNet,
|
2020-05-18 05:28:54 +00:00
|
|
|
rpc/wakurpc,
|
2020-07-28 11:00:34 +00:00
|
|
|
standard_setup,
|
2020-08-26 11:28:24 +00:00
|
|
|
../../protocol/v2/waku_relay,
|
2020-05-18 06:03:15 +00:00
|
|
|
# TODO: Pull out standard switch from tests
|
2020-05-22 06:12:05 +00:00
|
|
|
waku_types
|
2020-04-29 04:49:27 +00:00
|
|
|
|
|
|
|
# key and crypto modules different
|
|
|
|
# Unqualified key type names in this module refer to the libp2p `crypto`
# variants; the eth variants are spelled with the `keys.` prefix.
type
  KeyPair* = crypto.KeyPair
  PublicKey* = crypto.PublicKey
  PrivateKey* = crypto.PrivateKey

type
  # PubSub topic name.
  Topic* = string

  # Raw message payload.
  Message* = seq[byte]

  # Selects messages by content topic.
  ContentFilter* = object
    contentTopic*: string

  # Request for stored messages; `topics` lists the topics of interest.
  HistoryQuery* = object
    topics*: seq[string]

  # Messages returned for a `HistoryQuery`.
  HistoryResponse* = object
    messages*: seq[Message]

  # NOTE: based on Eth2Node in NBC eth2_network.nim
  WakuNode* = ref object of RootObj
    switch*: Switch
    # XXX: Unclear if we need this
    peerInfo*: PeerInfo
    # Futures returned by `switch.start()`.
    libp2pTransportLoops*: seq[Future[void]]
    # (content topic, message) pairs recorded by the content-filter `publish`
    # and scanned by `query`.
    messages: seq[(Topic, Message)]
|
2020-04-29 04:49:27 +00:00
|
|
|
|
2020-07-24 01:39:58 +00:00
|
|
|
# Description string handed to `redirectPorts` when `setupNat` asks for NAT
# port mappings.
const clientId = "Nimbus waku node"
|
2020-04-29 04:49:27 +00:00
|
|
|
|
|
|
|
proc setBootNodes(nodes: openArray[string]): seq[ENode] =
  ## Parses each entry of `nodes` with `ENode.fromString` and returns the
  ## parsed nodes in input order.
  ##
  ## An entry that does not parse trips the `expect` ("correct node").
  result = newSeq[ENode](nodes.len)
  for idx, nodeId in nodes:
    # TODO: something more user friendly than an expect
    result[idx] = ENode.fromString(nodeId).expect("correct node")
|
|
|
|
|
2020-07-24 01:39:58 +00:00
|
|
|
# NOTE Any difference here in Waku vs Eth2?
|
|
|
|
# E.g. Devp2p/Libp2p support, etc.
|
|
|
|
#func asLibp2pKey*(key: keys.PublicKey): PublicKey =
|
|
|
|
# PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
|
|
|
|
|
|
|
|
func asEthKey*(key: PrivateKey): keys.PrivateKey =
  ## Converts a libp2p `PrivateKey` into a `keys.PrivateKey` by wrapping its
  ## `skkey` field.
  result = keys.PrivateKey(key.skkey)
|
|
|
|
|
2020-05-01 04:24:34 +00:00
|
|
|
proc initAddress(T: type MultiAddress, str: string): T =
  ## Parses `str` into a `MultiAddress` and returns it, provided it matches
  ## the `IPFS` pattern and partially matches `multiaddress.TCP`.
  ##
  ## Raises `ValueError` when the parsed address does not match; a string that
  ## does not parse at all is reported by `tryGet`.
  let parsed = MultiAddress.init(str).tryGet()
  if not (IPFS.match(parsed) and matchPartial(multiaddress.TCP, parsed)):
    raise newException(ValueError,
                       "Invalid bootstrap node multi-address")
  parsed
|
|
|
|
|
2020-07-24 01:39:58 +00:00
|
|
|
template tcpEndPoint(address, port): auto =
  ## Expands to `MultiAddress.init` for `address` and `port` using the
  ## `tcpProtocol` transport.
  init(MultiAddress, address, tcpProtocol, port)
|
|
|
|
|
2020-08-26 11:28:24 +00:00
|
|
|
proc dialPeer(p: WakuRelayProto, address: string) {.async.} =
  ## Dials the peer described by the multi-address string `address` over
  ## `WakuRelayCodec`, stores the resulting connection in `p.conn` and marks
  ## `p` as connected.
  ##
  ## The text after the last "/" of `address` is used as the peer id.
  info "dialPeer", address
  # XXX: This turns ipfs into p2p, not quite sure why
  let peerAddr = MultiAddress.initAddress(address)
  info "multiAddr", ma = peerAddr

  let
    peerIdStr = address.split("/")[^1]
    remotePeer = PeerInfo.init(peerIdStr, [peerAddr])

  info "Dialing peer", multiAddr = peerAddr
  p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
  info "Post switch dial"
  # Isn't there just one p instance? Why connected here?
  p.connected = true
|
|
|
|
|
2020-08-26 11:28:24 +00:00
|
|
|
proc connectToNodes(p: WakuRelayProto, nodes: openArray[string]) =
  ## Starts a `dialPeer` for every address in `nodes`.
  ##
  ## The dial futures are discarded, so this returns without waiting for any
  ## dial to finish and a failed dial is not reported here.
  for node in nodes:
    info "connectToNodes", node
    # XXX: This seems...brittle
    discard p.dialPeer(node)
    # Waku 1
    #    let whisperENode = ENode.fromString(nodeId).expect("correct node")
    #    traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
|
2020-04-29 04:49:27 +00:00
|
|
|
|
2020-07-24 01:39:58 +00:00
|
|
|
# NOTE Identical with eth2_network, pull out into common?
|
|
|
|
# NOTE Except portsShift
|
|
|
|
# NOTE Identical with eth2_network, pull out into common?
# NOTE Except portsShift
proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress],
                                           tcpPort: Port,
                                           udpPort: Port] {.gcsafe.} =
  ## Works out the externally visible IP address and ports for this node.
  ##
  ## `conf.nat` selects the mechanism, case-insensitively: "any", "none",
  ## "upnp", "pmp", or "extip:<address>" to supply the external IP by hand.
  ##
  ## Returns:
  ## * `ip` - the external address when one was given or discovered,
  ##   otherwise left unset.
  ## * `tcpPort` / `udpPort` - the configured ports plus `conf.portsShift`,
  ##   replaced by the redirected ports when `redirectPorts` yields a mapping.
  ##
  ## An unknown mechanism or an unparsable `extip:` address is logged and
  ## terminates the process with `QuitFailure`.
  const extIpPrefix = "extip:"

  # defaults
  result.tcpPort = Port(uint16(conf.tcpPort) + conf.portsShift)
  result.udpPort = Port(uint16(conf.udpPort) + conf.portsShift)

  # Lower-case once so the keywords and the `extip:` prefix are matched the
  # same way.
  let natSetting = conf.nat.toLowerAscii

  var nat: NatStrategy
  case natSetting
  of "any":
    nat = NatAny
  of "none":
    nat = NatNone
  of "upnp":
    nat = NatUpnp
  of "pmp":
    nat = NatPmp
  else:
    if natSetting.startsWith(extIpPrefix):
      # Lower-casing keeps the length, so the prefix length is valid for
      # slicing the original string.
      let manualIp = conf.nat[extIpPrefix.len .. ^1]
      try:
        # any required port redirection is assumed to be done by hand
        result.ip = some(ValidIpAddress.init(manualIp))
        nat = NatNone
      except ValueError:
        error "not a valid IP address", address = manualIp
        quit QuitFailure
    else:
      error "not a valid NAT mechanism", value = conf.nat
      quit QuitFailure

  if nat != NatNone:
    let extIp = getExternalIP(nat)
    if extIp.isSome:
      result.ip = some(ValidIpAddress.init extIp.get)
      # TODO redirectPorts is considered a gcsafety violation
      # because it obtains the address of a non-gcsafe proc?
      let extPorts = ({.gcsafe.}:
        redirectPorts(tcpPort = result.tcpPort,
                      udpPort = result.udpPort,
                      description = clientId))
      if extPorts.isSome:
        (result.tcpPort, result.udpPort) = extPorts.get()
|
|
|
|
|
2020-08-26 11:28:24 +00:00
|
|
|
proc newWakuRelayProto(switch: Switch): WakuRelayProto =
  ## Creates a `WakuRelayProto` bound to `switch` with codec `WakuRelayCodec`
  ## and installs its connection handler.
  ##
  ## The handler reads one length-prefixed message of at most 1024 bytes,
  ## answers with "Hello!" and closes the connection.
  result = WakuRelayProto(switch: switch, codec: WakuRelayCodec)

  proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
    try:
      # The payload is not used; it is still read so the exchange with the
      # remote side stays the same.
      discard await conn.readLp(1024)
      await conn.writeLp("Hello!")
    finally:
      # Close even when the read or write fails so the connection is not
      # leaked.
      await conn.close()

  result.handler = handle
|
2020-04-29 04:49:27 +00:00
|
|
|
|
2020-07-24 01:39:58 +00:00
|
|
|
# TODO Consider removing unused arguments
|
|
|
|
proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch,
           ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
           privKey: keys.PrivateKey,
           peerInfo: PeerInfo): T =
  ## Builds a `WakuNode` holding `switch` and `peerInfo`.
  ##
  ## `conf`, `ip`, `tcpPort`, `udpPort` and `privKey` are accepted but not
  ## used yet.
  # TODO Peer pool, discovery, protocol state, etc
  T(switch: switch, peerInfo: peerInfo)
|
|
|
|
|
|
|
|
proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} =
  ## Builds a `WakuNode` from `conf` without starting it: resolves NAT via
  ## `setupNat`, derives the listen address from `conf.libp2pAddress` and the
  ## shifted TCP port, creates the peer info from `conf.nodekey` and creates
  ## the switch via `newStandardSwitch`.
  let
    (extIp, extTcpPort, extUdpPort) = setupNat(conf)
    listenPort = Port(uint16(conf.tcpPort) + conf.portsShift)
    hostAddress = tcpEndPoint(conf.libp2pAddress, listenPort)
    announcedAddresses =
      if extIp.isSome(): @[tcpEndPoint(extIp.get(), extTcpPort)]
      else: @[]

  info "Initializing networking", hostAddress,
                                  announcedAddresses

  let
    nodekey = conf.nodekey
    pubkey = nodekey.getKey.get()
    # Named `keyPair` rather than `keys` so the `keys` module stays visible.
    keyPair = KeyPair(seckey: nodekey, pubkey: pubkey)
    peerInfo = PeerInfo.init(nodekey)

  # XXX: Add this when we create node or start it?
  peerInfo.addrs.add(hostAddress)

  let switch = newStandardSwitch(some(keyPair.seckey), hostAddress,
                                 triggerSelf = true)

  # TODO Either persist WakuNode or something here

  # TODO Look over this
  # XXX Consider asEthKey and asLibp2pKey
  return WakuNode.init(conf, switch, extIp, extTcpPort, extUdpPort,
                       asEthKey(keyPair.seckey), peerInfo)
|
|
|
|
|
|
|
|
proc startRpcServer(wakuRelayProto: WakuRelayProto, conf: WakuNodeConf) =
  ## Starts the HTTP RPC server on `conf.rpcAddress` at
  ## `conf.rpcPort + conf.portsShift` and registers the Waku RPC calls.
  let ta = initTAddress(conf.rpcAddress,
                        Port(conf.rpcPort + conf.portsShift))
  var rpcServer = newRpcHttpServer([ta])
  setupWakuRPC(wakuRelayProto, rpcServer)
  rpcServer.start()
  info "rpcServer started", ta=ta

proc startMetricsServer(conf: WakuNodeConf) =
  ## Starts the metrics HTTP server when `conf.metricsServer` is set. Only
  ## has an effect in builds compiled with `-d:insecure`.
  when defined(insecure):
    if conf.metricsServer:
      let
        address = conf.metricsServerAddress
        port = conf.metricsServerPort + conf.portsShift
      info "Starting metrics HTTP server", address, port
      metrics.startHttpServer($address, Port(port))

proc startMetricsLog() =
  ## Schedules a timer that logs the node metrics two seconds from now and
  ## re-arms itself every time it fires.
  proc logMetrics(udata: pointer) {.closure, gcsafe.} =
    {.gcsafe.}:
      let
        connectedPeers = connected_peers.value
        totalMessages = total_messages.value

    info "Node metrics", connectedPeers, totalMessages
    addTimer(Moment.fromNow(2.seconds), logMetrics)
  addTimer(Moment.fromNow(2.seconds), logMetrics)

proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
  ## Starts `node`: starts the switch, mounts the Waku relay protocol, then,
  ## depending on `conf`, starts the RPC server, dials the static nodes and
  ## starts the metrics server and periodic metrics logging.
  node.libp2pTransportLoops = await node.switch.start()

  # TODO Mount Waku Store and Waku Filter here
  let wakuRelayProto = newWakuRelayProto(node.switch)
  node.switch.mount(wakuRelayProto)
  wakuRelayProto.started = true

  if conf.rpc:
    startRpcServer(wakuRelayProto, conf)

  # TODO Get this from WakuNode obj
  let
    peerInfo = node.peerInfo
    id = peerInfo.peerId.pretty
  info "PeerInfo", id = id, addrs = peerInfo.addrs
  # Indexes the first address; `createWakuNode` adds the host address to
  # `peerInfo.addrs`.
  let listenStr = $peerInfo.addrs[0] & "/p2p/" & id
  # XXX: this should be /ip4..., / stripped?
  info "Listening on", full = listenStr

  # XXX: So doing this _after_ other setup
  # Optionally direct connect with a set of nodes
  if conf.staticnodes.len > 0:
    connectToNodes(wakuRelayProto, conf.staticnodes)

  startMetricsServer(conf)

  if conf.logMetrics:
    startMetricsLog()
|
|
|
|
|
2020-07-27 09:01:06 +00:00
|
|
|
## Public API
|
|
|
|
##
|
|
|
|
|
2020-07-28 10:28:32 +00:00
|
|
|
method init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} =
  ## Creates and starts a Waku node.
  ##
  ## Combines `createWakuNode` and `start`; the returned future completes
  ## with the node once `start` has finished.
  let wakuNode = await createWakuNode(conf)
  await start(wakuNode, conf)
  return wakuNode
|
|
|
|
|
2020-07-28 08:17:50 +00:00
|
|
|
# TODO Update TopicHandler to take Message, not seq[byte] data
|
|
|
|
#type TopicHandler* = proc(topic: Topic, message: Message)
|
|
|
|
# Currently this is using the one in pubsub.nim, roughly:
|
|
|
|
#type TopicHandler* = proc(topic: string, data: seq[byte])
|
|
|
|
|
2020-07-27 09:01:06 +00:00
|
|
|
# Callback type accepted by the content-filter `subscribe` overload; it is
# given a content filter and a message.
type
  ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message)
|
|
|
|
|
|
|
|
method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) =
  ## Subscribes to a PubSub topic; `handler` is triggered for messages
  ## received on `topic`.
  ##
  ## Status: Partially implemented.
  ## TODO Ensure Message is passed, not `data` field. This means modifying
  ## TopicHandler.
  ##
  ## The future returned by the underlying subscribe call is discarded, so
  ## this returns without waiting for it.
  # XXX Consider awaiting here
  discard w.switch.pubSub.get().subscribe(topic, handler)
|
2020-07-27 09:01:06 +00:00
|
|
|
|
|
|
|
method subscribe*(w: WakuNode, contentFilter: ContentFilter,
                  handler: ContentFilterHandler) =
  ## Subscribes to a ContentFilter. Intended to trigger `handler` when
  ## receiving messages on this content filter. A `ContentFilterHandler`
  ## takes a content filter, specifically with `ContentTopic`, and a
  ## `Message`. The `Message` has to match the `ContentTopic`.
  ##
  ## Status: Not yet implemented; currently only prints "NYI".
  ## TODO Implement as wrapper around `waku_protocol` and the topic
  ## `subscribe`, and ensure Message is passed, not `data` field.
  echo "NYI"
|
|
|
|
|
|
|
|
method unsubscribe*(w: WakuNode, topic: Topic) =
  ## Unsubscribe from a topic.
  ##
  ## Status: Not yet implemented; currently only prints "NYI".
  ## TODO Implement.
  echo "NYI"
|
|
|
|
|
|
|
|
method unsubscribe*(w: WakuNode, contentFilter: ContentFilter) =
  ## Unsubscribe from a content filter.
  ##
  ## Status: Not yet implemented; currently only prints "NYI".
  ## TODO Implement.
  echo "NYI"
|
|
|
|
|
|
|
|
method publish*(w: WakuNode, topic: Topic, message: Message) =
  ## Publish a `Message` to a PubSub topic.
  ##
  ## Status: Partially implemented.
  ## TODO: Ensure Message is passed, not seq[byte] `data` field.
  ##
  ## The future returned by the underlying publish call is discarded, so this
  ## returns without waiting for it.
  # XXX Consider awaiting here
  discard w.switch.pubSub.get().publish(topic, message)
|
2020-07-27 09:01:06 +00:00
|
|
|
|
|
|
|
method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter,
                message: Message) =
  ## Publish a `Message` to a PubSub topic with a specific content filter.
  ## Currently this means a `contentTopic`.
  ##
  ## The message is also recorded on the node under
  ## `contentFilter.contentTopic` so that `query` can return it later.
  ##
  ## Status: Partially implemented.
  ## TODO Implement as wrapper around `waku_protocol` and `publish`, and ensure
  ## Message is passed, not `data` field. Also ensure content filter is in
  ## Message.
  ##

  # Appended in publish order; `query` relies on this ordering.
  w.messages.add((contentFilter.contentTopic, message))

  let wakuSub = w.switch.pubSub.get()
  # XXX Consider awaiting here

  # @TODO MAKE SURE WE PASS CONTENT FILTER
  discard wakuSub.publish(topic, message)

method query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
  ## Queries for historical messages.
  ##
  ## Returns the messages recorded by the content-filter `publish` whose
  ## content topic is listed in `query.topics`, oldest first.
  ##
  ## Status: Partially implemented (only messages published by this node).
  ## TODO Implement as wrapper around `waku_protocol` and send `RPCMsg`.
  for entry in w.messages:
    # entry is (content topic, message); the log is oldest-first, so plain
    # appends keep publish order.
    if entry[0] in query.topics:
      result.messages.add(entry[1])
|
2020-07-27 09:01:06 +00:00
|
|
|
|
2020-04-29 04:49:27 +00:00
|
|
|
when isMainModule:
  let conf = WakuNodeConf.load()
  # Wait for startup to finish so a failure to create or start the node
  # surfaces here instead of being lost in a discarded future.
  discard waitFor WakuNode.init(conf)
  runForever()
|