Waku v2 Websocket support (#732)

* add config for ws support

* code clean up

* code clean up

* Integrate both ws and tcp transport

* change ws default port

* resolve review comments

Signed-off-by: rshiv <reeshav96@gmail.com>

* Unit test debug

Signed-off-by: rshiv <reeshav96@gmail.com>

* Websocket tests addition

Signed-off-by: rshiv <reeshav96@gmail.com>

* websocket failure unittest addition

Signed-off-by: rshiv <reeshav96@gmail.com>

* Commented testcase related to incorrect helper function

Signed-off-by: rshiv <reeshav96@gmail.com>

* Add wireAddr validation

Signed-off-by: rshiv <reeshav96@gmail.com>

* CLoses issue 756

Signed-off-by: rshiv <reeshav96@gmail.com>

* Websocket tests addition

Signed-off-by: rshiv <reeshav96@gmail.com>

* Minor Code clean up

* review comment fixes

Signed-off-by: rshiv <reeshav96@gmail.com>

* Review comment fix

* Update tests/v2/test_wakunode.nim

Co-authored-by: oskarth <ot@oskarthoren.com>

* Update tests/v2/test_wakunode.nim

Co-authored-by: oskarth <ot@oskarthoren.com>

* review comment fix

Signed-off-by: rshiv <reeshav96@gmail.com>

* Changelog update

Signed-off-by: rshiv <reeshav96@gmail.com>

* chat2 var change

Signed-off-by: rshiv <reeshav96@gmail.com>

* test flag changes

Signed-off-by: rshiv <reeshav96@gmail.com>

Co-authored-by: oskarth <ot@oskarthoren.com>
This commit is contained in:
rshiv 2021-11-02 10:29:11 +00:00 committed by GitHub
parent f85434e072
commit d1e06fa17a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 272 additions and 36 deletions

View File

@ -15,6 +15,7 @@ This release contains the following:
- Bridge now uses content topic format according to [23/WAKU2-TOPICS](https://rfc.vac.dev/spec/23/)
- Better internal differentiation between local and remote peer info
- Maximum number of libp2p connections is now configurable
- Unsecure Websocket is now supported in nim-waku.
#### General refactoring

View File

@ -321,8 +321,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
node = WakuNode.new(conf.nodekey, conf.listenAddress,
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extTcpPort,
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
wsEnabled = conf.websocketSupport)
await node.start()
node.mountRelay(conf.topics.split(" "),

View File

@ -215,6 +215,17 @@ type
defaultValue: "/toy-chat/2/huilong/proto"
name: "content-topic" }: string
## Websocket Configuration
websocketSupport* {.
desc: "Enable websocket: true|false",
defaultValue: false
name: "websocket-support"}: bool
websocketPort* {.
desc: "WebSocket listening port."
defaultValue: 8000
name: "websocket-port" }: Port
# NOTE: Keys are different in nim-libp2p
proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =
try:

View File

@ -478,7 +478,7 @@ procSuite "WakuNode":
# invalid IP address
discard parseRemotePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
expect ValueError:
expect LPError:
# no PeerID
discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002")
@ -1028,3 +1028,140 @@ procSuite "WakuNode":
node1.switch.isConnected(node3.peerInfo.peerId) == false
await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "Messages are relayed between two websocket nodes":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true)
pubSubTopic = "test"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
node1.mountRelay(@[pubSubTopic])
await node2.start()
node2.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
await node2.publish(pubSubTopic, message)
await sleepAsync(2000.millis)
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Messages are relayed between nodes with multiple transports (TCP and Websockets)":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
bindPort = Port(60002))
pubSubTopic = "test"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
node1.mountRelay(@[pubSubTopic])
await node2.start()
node2.mountRelay(@[pubSubTopic])
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
await node2.publish(pubSubTopic, message)
await sleepAsync(2000.millis)
check:
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()
asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
bindPort = Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true)
pubSubTopic = "test"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
node1.mountRelay(@[pubSubTopic])
await node2.start()
node2.mountRelay(@[pubSubTopic])
#delete websocket peer address
node2.peerInfo.addrs.delete(1)
await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
completionFut.complete(true)
node1.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
await node2.publish(pubSubTopic, message)
await sleepAsync(2000.millis)
check:
(await completionFut.withTimeout(5.seconds)) == false
await node1.stop()
await node2.stop()

View File

@ -31,7 +31,7 @@ type
desc: "TCP listening port."
defaultValue: 60000
name: "tcp-port" }: Port
portsShift* {.
desc: "Add a shift to all port numbers."
defaultValue: 0
@ -232,6 +232,17 @@ type
defaultValue: false
name: "discv5-enr-auto-update" .}: bool
## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
defaultValue: false
name: "websocket-support"}: bool
websocketPort* {.
desc: "WebSocket listening port."
defaultValue: 8000
name: "websocket-port" }: Port
# NOTE: Keys are different in nim-libp2p
proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =
try:

View File

@ -12,6 +12,7 @@ import
libp2p/protocols/pubsub/gossipsub,
libp2p/nameresolving/dnsresolver,
libp2p/builders,
libp2p/transports/[transport, tcptransport, wstransport],
../protocol/[waku_relay, waku_message],
../protocol/waku_store/waku_store,
../protocol/waku_swap/waku_swap,
@ -20,6 +21,7 @@ import
../protocol/waku_rln_relay/[waku_rln_relay_types],
../utils/peers,
../utils/requests,
../utils/wakuswitch,
./storage/migration/migration_types,
./peer_manager/peer_manager,
./dnsdisc/waku_dnsdisc,
@ -124,14 +126,18 @@ proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilte
template tcpEndPoint(address, port): auto =
MultiAddress.init(address, tcpProtocol, port)
## Public API
##
template addWsFlag() =
MultiAddress.init("/ws").tryGet()
proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
bindIp: ValidIpAddress, bindPort: Port,
extIp = none[ValidIpAddress](), extPort = none[Port](),
peerStorage: PeerStorage = nil,
maxConnections = builders.MaxConnections): T
maxConnections = builders.MaxConnections,
wsBindPort: Port = (Port)8000,
wsEnabled: bool = false): T
{.raises: [Defect, LPError].} =
## Creates a Waku Node.
##
@ -140,8 +146,11 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
let
rng = crypto.newRng()
hostAddress = tcpEndPoint(bindIp, bindPort)
wsHostAddress = tcpEndPoint(bindIp, wsbindPort) & addWsFlag
announcedAddresses = if extIp.isNone() or extPort.isNone(): @[]
else: @[tcpEndPoint(extIp.get(), extPort.get())]
elif wsEnabled == false: @[tcpEndPoint(extIp.get(), extPort.get())]
else : @[tcpEndPoint(extIp.get(), extPort.get()),
tcpEndPoint(extIp.get(), wsBindPort) & addWsFlag]
peerInfo = PeerInfo.init(nodekey)
enrIp = if extIp.isSome(): extIp
else: some(bindIp)
@ -149,20 +158,25 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
else: some(bindPort)
enr = createEnr(nodeKey, enrIp, enrTcpPort, none(Port))
info "Initializing networking", hostAddress,
announcedAddresses
# XXX: Add this when we create node or start it?
peerInfo.addrs.add(hostAddress) # Index 0
if wsEnabled == true:
info "Initializing networking", hostAddress, wsHostAddress,
announcedAddresses
peerInfo.addrs.add(wsHostAddress)
else :
info "Initializing networking", hostAddress, announcedAddresses
peerInfo.addrs.add(hostAddress)
for multiaddr in announcedAddresses:
peerInfo.addrs.add(multiaddr) # Announced addresses in index > 0
var switch = newStandardSwitch(
some(nodekey),
hostAddress,
transportFlags = {ServerFlags.ReuseAddr},
rng = rng,
maxConnections = maxConnections)
var switch = newWakuSwitch(some(nodekey),
hostAddress,
wsHostAddress,
transportFlags = {ServerFlags.ReuseAddr},
rng = rng,
maxConnections = maxConnections,
wsEnabled = wsEnabled)
let wakuNode = WakuNode(
peerManager: PeerManager.new(switch, peerStorage),
switch: switch,
@ -892,6 +906,7 @@ when isMainModule:
## file. Optionally include persistent peer storage.
## No protocols are mounted yet.
let
## `udpPort` is only supplied to satisfy underlying APIs but is not
## actually a supported transport for libp2p traffic.
@ -908,11 +923,14 @@ when isMainModule:
else:
extTcpPort
let node = WakuNode.new(conf.nodekey,
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extPort,
pStorage,
conf.maxConnections.int)
conf.maxConnections.int,
Port(uint16(conf.websocketPort) + conf.portsShift),
conf.websocketSupport)
if conf.discv5Discovery:
let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift)

View File

@ -48,14 +48,11 @@ proc init*(p: typedesc[RemotePeerInfo],
return remotePeerInfo
proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueError, LPError].}=
# @TODO: Rather than raising exceptions, this should return a Result
let address = MultiAddress.init(str).tryGet()
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
return address
else:
raise newException(ValueError,
"Invalid bootstrap node multi-address")
## Check if wire Address is supported
proc validWireAddr*(ma: MultiAddress): bool =
const
ValidTransports = mapOr(TCP, WebSockets)
return ValidTransports.match(ma)
func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
if typedR.tcp6.isSome or typedR.tcp.isSome:
@ -69,10 +66,11 @@ func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
## Parses a fully qualified peer multiaddr, in the
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, ValueError, LPError].}=
let multiAddr = MultiAddress.initAddress(address)
let multiAddr = MultiAddress.init(address).tryGet()
var
ipPart, tcpPart, p2pPart: MultiAddress
ipPart, tcpPart, p2pPart, wsPart: MultiAddress
for addrPart in multiAddr.items():
case addrPart[].protoName()[]
@ -82,15 +80,17 @@ proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, Va
tcpPart = addrPart.tryGet()
of "p2p":
p2pPart = addrPart.tryGet()
of "ws":
wsPart = addrPart.tryGet()
# nim-libp2p dialing requires remote peers to be initialised with a peerId and a wire address
let
peerIdStr = p2pPart.toString()[].split("/")[^1]
wireAddr = ipPart & tcpPart
if (not wireAddr.isWire()):
peerIdStr = p2pPart.toString()[].split("/")[^1]
wireAddr = ipPart & tcpPart & wsPart
if (not wireAddr.validWireAddr()):
raise newException(ValueError, "Invalid node multi-address")
return RemotePeerInfo.init(peerIdStr, @[wireAddr])
## Converts an ENR to dialable RemotePeerInfo

View File

@ -0,0 +1,55 @@
# Waku Switch utils.
import
std/[options, sequtils],
chronos, chronicles,
stew/byteutils,
eth/keys,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
libp2p/nameresolving/dnsresolver,
libp2p/nameresolving/nameresolver,
libp2p/builders,
libp2p/transports/[transport, tcptransport, wstransport]
proc withWsTransport*(b: SwitchBuilder): SwitchBuilder =
b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr))
proc newWakuSwitch*(
privKey = none(crypto.PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet(),
secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise,
],
transportFlags: set[ServerFlags] = {},
rng = crypto.newRng(),
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes,
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil,
wsEnabled: bool = false): Switch
{.raises: [Defect, LPError].} =
var b = SwitchBuilder
.new()
.withRng(rng)
.withMaxConnections(maxConnections)
.withMaxIn(maxIn)
.withMaxOut(maxOut)
.withMaxConnsPerPeer(maxConnsPerPeer)
.withMplex(inTimeout, outTimeout)
.withNoise()
.withTcpTransport(transportFlags)
.withNameResolver(nameResolver)
if privKey.isSome():
b = b.withPrivateKey(privKey.get())
if wsEnabled == true:
b = b.withAddresses(@[wsAddress, address])
b = b.withWsTransport()
else :
b = b.withAddress(address)
b.build()