diff --git a/CHANGELOG.md b/CHANGELOG.md index ead6d876f..d8d422af6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This release contains the following: - Bridge now uses content topic format according to [23/WAKU2-TOPICS](https://rfc.vac.dev/spec/23/) - Better internal differentiation between local and remote peer info - Maximum number of libp2p connections is now configurable +- Unsecure Websocket is now supported in nim-waku. #### General refactoring diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 9cde3a6a1..f6d95af5f 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -321,8 +321,11 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = Port(uint16(conf.tcpPort) + conf.portsShift), Port(uint16(conf.udpPort) + conf.portsShift)) node = WakuNode.new(conf.nodekey, conf.listenAddress, - Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) - + Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, extTcpPort, + wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), + wsEnabled = conf.websocketSupport) + await node.start() node.mountRelay(conf.topics.split(" "), diff --git a/examples/v2/config_chat2.nim b/examples/v2/config_chat2.nim index fc22a834c..775776aa5 100644 --- a/examples/v2/config_chat2.nim +++ b/examples/v2/config_chat2.nim @@ -215,6 +215,17 @@ type defaultValue: "/toy-chat/2/huilong/proto" name: "content-topic" }: string + ## Websocket Configuration + websocketSupport* {. + desc: "Enable websocket: true|false", + defaultValue: false + name: "websocket-support"}: bool + + websocketPort* {. + desc: "WebSocket listening port." + defaultValue: 8000 + name: "websocket-port" }: Port + # NOTE: Keys are different in nim-libp2p proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T = try: diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 0619866f9..f09981f4c 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -478,7 +478,7 @@ procSuite "WakuNode": # invalid IP address discard parseRemotePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") - expect ValueError: + expect LPError: # no PeerID discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002") @@ -1028,3 +1028,140 @@ procSuite "WakuNode": node1.switch.isConnected(node3.peerInfo.peerId) == false await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + +asyncTest "Messages are relayed between two websocket nodes": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + node1.mountRelay(@[pubSubTopic]) + + await node2.start() + node2.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(2000.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(2000.millis) + + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + + +asyncTest "Messages are relayed between nodes with multiple transports (TCP and Websockets)": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002)) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + node1.mountRelay(@[pubSubTopic]) + + await node2.start() + node2.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(2000.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(2000.millis) + + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + +asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + node1.mountRelay(@[pubSubTopic]) + + await node2.start() + node2.mountRelay(@[pubSubTopic]) + + #delete websocket peer address + node2.peerInfo.addrs.delete(1) + + await node1.connectToNodes(@[node2.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(2000.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(2000.millis) + + + check: + (await completionFut.withTimeout(5.seconds)) == false + await node1.stop() + await node2.stop() \ No newline at end of file diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index 9d8a93c74..b431dc38b 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -31,7 +31,7 @@ type desc: "TCP listening port." defaultValue: 60000 name: "tcp-port" }: Port - + portsShift* {. desc: "Add a shift to all port numbers." defaultValue: 0 @@ -232,6 +232,17 @@ type defaultValue: false name: "discv5-enr-auto-update" .}: bool + ## websocket config + websocketSupport* {. + desc: "Enable websocket: true|false", + defaultValue: false + name: "websocket-support"}: bool + + websocketPort* {. + desc: "WebSocket listening port." + defaultValue: 8000 + name: "websocket-port" }: Port + # NOTE: Keys are different in nim-libp2p proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T = try: diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 8c0cf9ea0..4db711f57 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -12,6 +12,7 @@ import libp2p/protocols/pubsub/gossipsub, libp2p/nameresolving/dnsresolver, libp2p/builders, + libp2p/transports/[transport, tcptransport, wstransport], ../protocol/[waku_relay, waku_message], ../protocol/waku_store/waku_store, ../protocol/waku_swap/waku_swap, @@ -20,6 +21,7 @@ import ../protocol/waku_rln_relay/[waku_rln_relay_types], ../utils/peers, ../utils/requests, + ../utils/wakuswitch, ./storage/migration/migration_types, ./peer_manager/peer_manager, ./dnsdisc/waku_dnsdisc, @@ -124,14 +126,18 @@ proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilte template tcpEndPoint(address, port): auto = MultiAddress.init(address, tcpProtocol, port) -## Public API -## + +template addWsFlag() = + MultiAddress.init("/ws").tryGet() + proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, bindIp: ValidIpAddress, bindPort: Port, extIp = none[ValidIpAddress](), extPort = none[Port](), peerStorage: PeerStorage = nil, - maxConnections = builders.MaxConnections): T + maxConnections = builders.MaxConnections, + wsBindPort: Port = (Port)8000, + wsEnabled: bool = false): T {.raises: [Defect, LPError].} = ## Creates a Waku Node. ## @@ -140,8 +146,11 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, let rng = crypto.newRng() hostAddress = tcpEndPoint(bindIp, bindPort) + wsHostAddress = tcpEndPoint(bindIp, wsbindPort) & addWsFlag announcedAddresses = if extIp.isNone() or extPort.isNone(): @[] - else: @[tcpEndPoint(extIp.get(), extPort.get())] + elif wsEnabled == false: @[tcpEndPoint(extIp.get(), extPort.get())] + else : @[tcpEndPoint(extIp.get(), extPort.get()), + tcpEndPoint(extIp.get(), wsBindPort) & addWsFlag] peerInfo = PeerInfo.init(nodekey) enrIp = if extIp.isSome(): extIp else: some(bindIp) @@ -149,20 +158,25 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, else: some(bindPort) enr = createEnr(nodeKey, enrIp, enrTcpPort, none(Port)) - info "Initializing networking", hostAddress, - announcedAddresses - # XXX: Add this when we create node or start it? - peerInfo.addrs.add(hostAddress) # Index 0 + if wsEnabled == true: + info "Initializing networking", hostAddress, wsHostAddress, + announcedAddresses + peerInfo.addrs.add(wsHostAddress) + else : + info "Initializing networking", hostAddress, announcedAddresses + + peerInfo.addrs.add(hostAddress) for multiaddr in announcedAddresses: peerInfo.addrs.add(multiaddr) # Announced addresses in index > 0 - var switch = newStandardSwitch( - some(nodekey), - hostAddress, - transportFlags = {ServerFlags.ReuseAddr}, - rng = rng, - maxConnections = maxConnections) - + var switch = newWakuSwitch(some(nodekey), + hostAddress, + wsHostAddress, + transportFlags = {ServerFlags.ReuseAddr}, + rng = rng, + maxConnections = maxConnections, + wsEnabled = wsEnabled) + let wakuNode = WakuNode( peerManager: PeerManager.new(switch, peerStorage), switch: switch, @@ -892,6 +906,7 @@ when isMainModule: ## file. Optionally include persistent peer storage. ## No protocols are mounted yet. + let ## `udpPort` is only supplied to satisfy underlying APIs but is not ## actually a supported transport for libp2p traffic. @@ -908,11 +923,14 @@ when isMainModule: else: extTcpPort + let node = WakuNode.new(conf.nodekey, conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extPort, pStorage, - conf.maxConnections.int) + conf.maxConnections.int, + Port(uint16(conf.websocketPort) + conf.portsShift), + conf.websocketSupport) if conf.discv5Discovery: let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift) diff --git a/waku/v2/utils/peers.nim b/waku/v2/utils/peers.nim index d68ffdf87..a0859ad6c 100644 --- a/waku/v2/utils/peers.nim +++ b/waku/v2/utils/peers.nim @@ -48,14 +48,11 @@ proc init*(p: typedesc[RemotePeerInfo], return remotePeerInfo -proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueError, LPError].}= - # @TODO: Rather than raising exceptions, this should return a Result - let address = MultiAddress.init(str).tryGet() - if IPFS.match(address) and matchPartial(multiaddress.TCP, address): - return address - else: - raise newException(ValueError, - "Invalid bootstrap node multi-address") +## Check if wire Address is supported +proc validWireAddr*(ma: MultiAddress): bool = + const + ValidTransports = mapOr(TCP, WebSockets) + return ValidTransports.match(ma) func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] = if typedR.tcp6.isSome or typedR.tcp.isSome: @@ -69,10 +66,11 @@ func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] = ## Parses a fully qualified peer multiaddr, in the ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, ValueError, LPError].}= - let multiAddr = MultiAddress.initAddress(address) + let multiAddr = MultiAddress.init(address).tryGet() var - ipPart, tcpPart, p2pPart: MultiAddress + + ipPart, tcpPart, p2pPart, wsPart: MultiAddress for addrPart in multiAddr.items(): case addrPart[].protoName()[] @@ -82,15 +80,17 @@ proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, Va tcpPart = addrPart.tryGet() of "p2p": p2pPart = addrPart.tryGet() - + of "ws": + wsPart = addrPart.tryGet() + # nim-libp2p dialing requires remote peers to be initialised with a peerId and a wire address let - peerIdStr = p2pPart.toString()[].split("/")[^1] - wireAddr = ipPart & tcpPart - - if (not wireAddr.isWire()): + peerIdStr = p2pPart.toString()[].split("/")[^1] + + wireAddr = ipPart & tcpPart & wsPart + if (not wireAddr.validWireAddr()): raise newException(ValueError, "Invalid node multi-address") - + return RemotePeerInfo.init(peerIdStr, @[wireAddr]) ## Converts an ENR to dialable RemotePeerInfo diff --git a/waku/v2/utils/wakuswitch.nim b/waku/v2/utils/wakuswitch.nim new file mode 100644 index 000000000..ce5e2c896 --- /dev/null +++ b/waku/v2/utils/wakuswitch.nim @@ -0,0 +1,55 @@ +# Waku Switch utils. +import + std/[options, sequtils], + chronos, chronicles, + stew/byteutils, + eth/keys, + libp2p/crypto/crypto, + libp2p/protocols/pubsub/gossipsub, + libp2p/nameresolving/dnsresolver, + libp2p/nameresolving/nameresolver, + libp2p/builders, + libp2p/transports/[transport, tcptransport, wstransport] + +proc withWsTransport*(b: SwitchBuilder): SwitchBuilder = + b.withTransport(proc(upgr: Upgrade): Transport = WsTransport.new(upgr)) + +proc newWakuSwitch*( + privKey = none(crypto.PrivateKey), + address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet(), + secureManagers: openarray[SecureProtocol] = [ + SecureProtocol.Noise, + ], + transportFlags: set[ServerFlags] = {}, + rng = crypto.newRng(), + inTimeout: Duration = 5.minutes, + outTimeout: Duration = 5.minutes, + maxConnections = MaxConnections, + maxIn = -1, + maxOut = -1, + maxConnsPerPeer = MaxConnectionsPerPeer, + nameResolver: NameResolver = nil, + wsEnabled: bool = false): Switch + {.raises: [Defect, LPError].} = + + var b = SwitchBuilder + .new() + .withRng(rng) + .withMaxConnections(maxConnections) + .withMaxIn(maxIn) + .withMaxOut(maxOut) + .withMaxConnsPerPeer(maxConnsPerPeer) + .withMplex(inTimeout, outTimeout) + .withNoise() + .withTcpTransport(transportFlags) + .withNameResolver(nameResolver) + if privKey.isSome(): + b = b.withPrivateKey(privKey.get()) + if wsEnabled == true: + b = b.withAddresses(@[wsAddress, address]) + b = b.withWsTransport() + else : + b = b.withAddress(address) + + b.build() \ No newline at end of file