mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 17:03:09 +00:00
Waku v2: Simplify Waku Relay references (#112)
- Remove wakuRelayProto as this is initiated as part of switch PubSub - Move WakuNode type into waku_types to avoid circular references - Make necessary adjustments to get tests and sim to work
This commit is contained in:
parent
bab0e12d68
commit
705b4b32a1
@ -11,6 +11,7 @@ import json_rpc/[rpcclient, rpcserver]
|
|||||||
|
|
||||||
import ../../waku/node/v2/config
|
import ../../waku/node/v2/config
|
||||||
import ../../waku/node/v2/wakunode2
|
import ../../waku/node/v2/wakunode2
|
||||||
|
import ../../waku/node/v2/waku_types
|
||||||
|
|
||||||
# Loads the config in `waku/node/v2/config.nim`
|
# Loads the config in `waku/node/v2/config.nim`
|
||||||
let conf = WakuNodeConf.load()
|
let conf = WakuNodeConf.load()
|
||||||
|
|||||||
@ -8,7 +8,7 @@ import libp2p/crypto/secp
|
|||||||
import eth/keys
|
import eth/keys
|
||||||
import json_rpc/[rpcclient, rpcserver]
|
import json_rpc/[rpcclient, rpcserver]
|
||||||
|
|
||||||
import ../../waku/node/v2/[config, wakunode2]
|
import ../../waku/node/v2/[config, wakunode2, waku_types]
|
||||||
|
|
||||||
import ../test_helpers
|
import ../test_helpers
|
||||||
|
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import
|
|||||||
eth/[common, rlp, keys, p2p],
|
eth/[common, rlp, keys, p2p],
|
||||||
../../../protocol/v2/waku_relay,
|
../../../protocol/v2/waku_relay,
|
||||||
nimcrypto/[sysrand, hmac, sha2],
|
nimcrypto/[sysrand, hmac, sha2],
|
||||||
|
../wakunode2,
|
||||||
../waku_types
|
../waku_types
|
||||||
|
|
||||||
# Instead of using rlpx waku_protocol here, lets do mock waku2_protocol
|
# Instead of using rlpx waku_protocol here, lets do mock waku2_protocol
|
||||||
@ -13,8 +14,7 @@ import
|
|||||||
# Where is the equivalent in Waku/2?
|
# Where is the equivalent in Waku/2?
|
||||||
# TODO: Extend to get access to protocol state and keys
|
# TODO: Extend to get access to protocol state and keys
|
||||||
#proc setupWakuRPC*(node: EthereumNode, keys: KeyStorage, rpcsrv: RpcServer) =
|
#proc setupWakuRPC*(node: EthereumNode, keys: KeyStorage, rpcsrv: RpcServer) =
|
||||||
# TODO This should probably take node, not wakuRelayProto
|
proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
|
||||||
proc setupWakuRPC*(wakuRelayProto: WakuRelayProto, rpcsrv: RpcServer) =
|
|
||||||
|
|
||||||
# Seems easy enough, lets try to get this first
|
# Seems easy enough, lets try to get this first
|
||||||
rpcsrv.rpc("waku_version") do() -> string:
|
rpcsrv.rpc("waku_version") do() -> string:
|
||||||
@ -23,24 +23,23 @@ proc setupWakuRPC*(wakuRelayProto: WakuRelayProto, rpcsrv: RpcServer) =
|
|||||||
|
|
||||||
# TODO: Implement symkey etc logic
|
# TODO: Implement symkey etc logic
|
||||||
rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool:
|
rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool:
|
||||||
# Assumes someone subscribing on this topic
|
# XXX Why is casting necessary here but not in Nim node API?
|
||||||
#let wakuSub = wakuRelayProto.switch.pubsub
|
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
||||||
let wakuSub = cast[WakuRelay](wakuRelayProto.switch.pubSub.get())
|
|
||||||
# XXX also future return type
|
# XXX also future return type
|
||||||
discard wakuSub.publish(topic, message)
|
discard wakuRelay.publish(topic, message)
|
||||||
return true
|
return true
|
||||||
#if not result:
|
#if not result:
|
||||||
# raise newException(ValueError, "Message could not be posted")
|
# raise newException(ValueError, "Message could not be posted")
|
||||||
|
|
||||||
# TODO: Handler / Identifier logic
|
# TODO: Handler / Identifier logic
|
||||||
rpcsrv.rpc("waku_subscribe") do(topic: string) -> bool:
|
rpcsrv.rpc("waku_subscribe") do(topic: string) -> bool:
|
||||||
let wakuSub = cast[WakuRelay](wakuRelayProto.switch.pubSub.get())
|
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
|
||||||
|
|
||||||
# XXX: Hacky in-line handler
|
# XXX: Hacky in-line handler
|
||||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
info "Hit subscribe handler", topic=topic, data=data
|
info "Hit subscribe handler", topic=topic, data=data
|
||||||
|
|
||||||
discard wakuSub.subscribe(topic, handler)
|
discard wakuRelay.subscribe(topic, handler)
|
||||||
return true
|
return true
|
||||||
#if not result:
|
#if not result:
|
||||||
# raise newException(ValueError, "Message could not be posted")
|
# raise newException(ValueError, "Message could not be posted")
|
||||||
|
|||||||
@ -1,11 +1,24 @@
|
|||||||
import libp2p/multiaddress,
|
import
|
||||||
libp2p/crypto/crypto,
|
chronos,
|
||||||
libp2p/protocols/protocol,
|
libp2p/multiaddress,
|
||||||
libp2p/peerinfo,
|
libp2p/crypto/crypto,
|
||||||
standard_setup
|
libp2p/protocols/protocol,
|
||||||
|
libp2p/protocols/pubsub/pubsub,
|
||||||
|
libp2p/peerinfo,
|
||||||
|
standard_setup
|
||||||
|
|
||||||
type WakuRelayProto* = ref object of LPProtocol
|
# Core Waku data types are defined here to avoid recursive dependencies.
|
||||||
switch*: Switch
|
#
|
||||||
conn*: Connection
|
# TODO Move more common data types here
|
||||||
connected*: bool
|
|
||||||
started*: bool
|
type
|
||||||
|
Topic* = string
|
||||||
|
Message* = seq[byte]
|
||||||
|
|
||||||
|
# NOTE based on Eth2Node in NBC eth2_network.nim
|
||||||
|
WakuNode* = ref object of RootObj
|
||||||
|
switch*: Switch
|
||||||
|
# XXX Unclear if we need this
|
||||||
|
peerInfo*: PeerInfo
|
||||||
|
libp2pTransportLoops*: seq[Future[void]]
|
||||||
|
messages*: seq[(Topic, Message)]
|
||||||
|
|||||||
@ -13,7 +13,6 @@ import
|
|||||||
rpc/wakurpc,
|
rpc/wakurpc,
|
||||||
standard_setup,
|
standard_setup,
|
||||||
../../protocol/v2/waku_relay,
|
../../protocol/v2/waku_relay,
|
||||||
# TODO: Pull out standard switch from tests
|
|
||||||
waku_types
|
waku_types
|
||||||
|
|
||||||
# key and crypto modules different
|
# key and crypto modules different
|
||||||
@ -22,7 +21,7 @@ type
|
|||||||
PublicKey* = crypto.PublicKey
|
PublicKey* = crypto.PublicKey
|
||||||
PrivateKey* = crypto.PrivateKey
|
PrivateKey* = crypto.PrivateKey
|
||||||
|
|
||||||
Topic* = string
|
Topic* = waku_types.Topic
|
||||||
Message* = seq[byte]
|
Message* = seq[byte]
|
||||||
ContentFilter* = object
|
ContentFilter* = object
|
||||||
contentTopic*: string
|
contentTopic*: string
|
||||||
@ -33,14 +32,6 @@ type
|
|||||||
HistoryResponse* = object
|
HistoryResponse* = object
|
||||||
messages*: seq[Message]
|
messages*: seq[Message]
|
||||||
|
|
||||||
# NOTE: based on Eth2Node in NBC eth2_network.nim
|
|
||||||
WakuNode* = ref object of RootObj
|
|
||||||
switch*: Switch
|
|
||||||
# XXX: Unclear if we need this
|
|
||||||
peerInfo*: PeerInfo
|
|
||||||
libp2pTransportLoops*: seq[Future[void]]
|
|
||||||
messages: seq[(Topic, Message)]
|
|
||||||
|
|
||||||
const clientId = "Nimbus waku node"
|
const clientId = "Nimbus waku node"
|
||||||
|
|
||||||
proc setBootNodes(nodes: openArray[string]): seq[ENode] =
|
proc setBootNodes(nodes: openArray[string]): seq[ENode] =
|
||||||
@ -68,7 +59,7 @@ proc initAddress(T: type MultiAddress, str: string): T =
|
|||||||
template tcpEndPoint(address, port): auto =
|
template tcpEndPoint(address, port): auto =
|
||||||
MultiAddress.init(address, tcpProtocol, port)
|
MultiAddress.init(address, tcpProtocol, port)
|
||||||
|
|
||||||
proc dialPeer(p: WakuRelayProto, address: string) {.async.} =
|
proc dialPeer(n: WakuNode, address: string) {.async.} =
|
||||||
info "dialPeer", address = address
|
info "dialPeer", address = address
|
||||||
# XXX: This turns ipfs into p2p, not quite sure why
|
# XXX: This turns ipfs into p2p, not quite sure why
|
||||||
let multiAddr = MultiAddress.initAddress(address)
|
let multiAddr = MultiAddress.initAddress(address)
|
||||||
@ -77,16 +68,18 @@ proc dialPeer(p: WakuRelayProto, address: string) {.async.} =
|
|||||||
let remotePeer = PeerInfo.init(parts[^1], [multiAddr])
|
let remotePeer = PeerInfo.init(parts[^1], [multiAddr])
|
||||||
|
|
||||||
info "Dialing peer", multiAddr
|
info "Dialing peer", multiAddr
|
||||||
p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
|
# NOTE This is dialing on WakuRelay protocol specifically
|
||||||
|
# TODO Keep track of conn and connected state somewhere (WakuRelay?)
|
||||||
|
#p.conn = await p.switch.dial(remotePeer, WakuRelayCodec)
|
||||||
|
#p.connected = true
|
||||||
|
discard n.switch.dial(remotePeer, WakuRelayCodec)
|
||||||
info "Post switch dial"
|
info "Post switch dial"
|
||||||
# Isn't there just one p instance? Why connected here?
|
|
||||||
p.connected = true
|
|
||||||
|
|
||||||
proc connectToNodes(p: WakuRelayProto, nodes: openArray[string]) =
|
proc connectToNodes(n: WakuNode, nodes: openArray[string]) =
|
||||||
for nodeId in nodes:
|
for nodeId in nodes:
|
||||||
info "connectToNodes", node = nodeId
|
info "connectToNodes", node = nodeId
|
||||||
# XXX: This seems...brittle
|
# XXX: This seems...brittle
|
||||||
discard dialPeer(p, nodeId)
|
discard dialPeer(n, nodeId)
|
||||||
# Waku 1
|
# Waku 1
|
||||||
# let whisperENode = ENode.fromString(nodeId).expect("correct node")
|
# let whisperENode = ENode.fromString(nodeId).expect("correct node")
|
||||||
# traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
|
# traceAsyncErrors node.peerPool.connectToNode(newNode(whisperENode))
|
||||||
@ -136,17 +129,6 @@ proc setupNat(conf: WakuNodeConf): tuple[ip: Option[ValidIpAddress],
|
|||||||
if extPorts.isSome:
|
if extPorts.isSome:
|
||||||
(result.tcpPort, result.udpPort) = extPorts.get()
|
(result.tcpPort, result.udpPort) = extPorts.get()
|
||||||
|
|
||||||
proc newWakuRelayProto(switch: Switch): WakuRelayProto =
|
|
||||||
var wakuRelayProto = WakuRelayProto(switch: switch, codec: WakuRelayCodec)
|
|
||||||
|
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
|
||||||
let msg = cast[string](await conn.readLp(1024))
|
|
||||||
await conn.writeLp("Hello!")
|
|
||||||
await conn.close()
|
|
||||||
|
|
||||||
wakuRelayProto.handler = handle
|
|
||||||
return wakuRelayProto
|
|
||||||
|
|
||||||
# TODO Consider removing unused arguments
|
# TODO Consider removing unused arguments
|
||||||
proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch,
|
proc init*(T: type WakuNode, conf: WakuNodeConf, switch: Switch,
|
||||||
ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
|
ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
|
||||||
@ -187,17 +169,15 @@ proc createWakuNode*(conf: WakuNodeConf): Future[WakuNode] {.async, gcsafe.} =
|
|||||||
proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
|
proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
|
||||||
node.libp2pTransportLoops = await node.switch.start()
|
node.libp2pTransportLoops = await node.switch.start()
|
||||||
|
|
||||||
|
# NOTE WakuRelay is being instantiated as part of creating switch with PubSub field set
|
||||||
|
#
|
||||||
# TODO Mount Waku Store and Waku Filter here
|
# TODO Mount Waku Store and Waku Filter here
|
||||||
let wakuRelayProto = newWakuRelayProto(node.switch)
|
|
||||||
node.switch.mount(wakuRelayProto)
|
|
||||||
wakuRelayProto.started = true
|
|
||||||
|
|
||||||
# TODO Move out into separate proc
|
# TODO Move out into separate proc
|
||||||
if conf.rpc:
|
if conf.rpc:
|
||||||
let ta = initTAddress(conf.rpcAddress,
|
let ta = initTAddress(conf.rpcAddress, Port(conf.rpcPort + conf.portsShift))
|
||||||
Port(conf.rpcPort + conf.portsShift))
|
|
||||||
var rpcServer = newRpcHttpServer([ta])
|
var rpcServer = newRpcHttpServer([ta])
|
||||||
setupWakuRPC(wakuRelayProto, rpcServer)
|
setupWakuRPC(node, rpcServer)
|
||||||
rpcServer.start()
|
rpcServer.start()
|
||||||
info "rpcServer started", ta=ta
|
info "rpcServer started", ta=ta
|
||||||
|
|
||||||
@ -211,7 +191,7 @@ proc start*(node: WakuNode, conf: WakuNodeConf) {.async.} =
|
|||||||
|
|
||||||
# XXX: So doing this _after_ other setup
|
# XXX: So doing this _after_ other setup
|
||||||
# Optionally direct connect with a set of nodes
|
# Optionally direct connect with a set of nodes
|
||||||
if conf.staticnodes.len > 0: connectToNodes(wakuRelayProto, conf.staticnodes)
|
if conf.staticnodes.len > 0: connectToNodes(node, conf.staticnodes)
|
||||||
|
|
||||||
# TODO Move out into separate proc
|
# TODO Move out into separate proc
|
||||||
when defined(insecure):
|
when defined(insecure):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user