Add Tor Transport support (#765)
This commit is contained in:
parent
f95eda8bf6
commit
d9305bda84
|
@ -470,6 +470,8 @@ const
|
||||||
WS* = mapAnd(TCP, mapEq("ws"))
|
WS* = mapAnd(TCP, mapEq("ws"))
|
||||||
WSS* = mapAnd(TCP, mapEq("wss"))
|
WSS* = mapAnd(TCP, mapEq("wss"))
|
||||||
WebSockets* = mapOr(WS, WSS)
|
WebSockets* = mapOr(WS, WSS)
|
||||||
|
Onion3* = mapEq("onion3")
|
||||||
|
TcpOnion3* = mapAnd(TCP, Onion3)
|
||||||
|
|
||||||
Unreliable* = mapOr(UDP)
|
Unreliable* = mapOr(UDP)
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ type
|
||||||
opened*: uint64
|
opened*: uint64
|
||||||
closed*: uint64
|
closed*: uint64
|
||||||
|
|
||||||
proc setupStreamTracker(name: string): StreamTracker =
|
proc setupStreamTracker*(name: string): StreamTracker =
|
||||||
let tracker = new StreamTracker
|
let tracker = new StreamTracker
|
||||||
|
|
||||||
proc dumpTracking(): string {.gcsafe.} =
|
proc dumpTracking(): string {.gcsafe.} =
|
||||||
|
|
|
@ -0,0 +1,281 @@
|
||||||
|
# Nim-LibP2P
|
||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
## Tor transport implementation
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import std/strformat
|
||||||
|
import chronos, chronicles, strutils
|
||||||
|
import stew/[byteutils, endians2, results, objects]
|
||||||
|
import ../multicodec
|
||||||
|
import transport,
|
||||||
|
tcptransport,
|
||||||
|
../switch,
|
||||||
|
../builders,
|
||||||
|
../stream/[lpstream, connection, chronosstream],
|
||||||
|
../multiaddress,
|
||||||
|
../upgrademngrs/upgrade
|
||||||
|
|
||||||
|
const
|
||||||
|
IPTcp = mapAnd(IP, mapEq("tcp"))
|
||||||
|
IPv4Tcp = mapAnd(IP4, mapEq("tcp"))
|
||||||
|
IPv6Tcp = mapAnd(IP6, mapEq("tcp"))
|
||||||
|
DnsTcp = mapAnd(DNSANY, mapEq("tcp"))
|
||||||
|
|
||||||
|
Socks5ProtocolVersion = byte(5)
|
||||||
|
NMethods = byte(1)
|
||||||
|
|
||||||
|
type
|
||||||
|
TorTransport* = ref object of Transport
|
||||||
|
transportAddress: TransportAddress
|
||||||
|
tcpTransport: TcpTransport
|
||||||
|
|
||||||
|
Socks5AuthMethod* {.pure.} = enum
|
||||||
|
NoAuth = 0
|
||||||
|
GSSAPI = 1
|
||||||
|
UsernamePassword = 2
|
||||||
|
NoAcceptableMethod = 0xff
|
||||||
|
|
||||||
|
Socks5RequestCommand* {.pure.} = enum
|
||||||
|
Connect = 1, Bind = 2, UdpAssoc = 3
|
||||||
|
|
||||||
|
Socks5AddressType* {.pure.} = enum
|
||||||
|
IPv4 = 1, FQDN = 3, IPv6 = 4
|
||||||
|
|
||||||
|
Socks5ReplyType* {.pure.} = enum
|
||||||
|
Succeeded = (0, "Succeeded"), ServerFailure = (1, "Server Failure"),
|
||||||
|
ConnectionNotAllowed = (2, "Connection Not Allowed"), NetworkUnreachable = (3, "Network Unreachable"),
|
||||||
|
HostUnreachable = (4, "Host Unreachable"), ConnectionRefused = (5, "Connection Refused"),
|
||||||
|
TtlExpired = (6, "Ttl Expired"), CommandNotSupported = (7, "Command Not Supported"),
|
||||||
|
AddressTypeNotSupported = (8, "Address Type Not Supported")
|
||||||
|
|
||||||
|
TransportStartError* = object of transport.TransportError
|
||||||
|
|
||||||
|
Socks5Error* = object of CatchableError
|
||||||
|
Socks5AuthFailedError* = object of Socks5Error
|
||||||
|
Socks5VersionError* = object of Socks5Error
|
||||||
|
Socks5ServerReplyError* = object of Socks5Error
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: typedesc[TorTransport],
|
||||||
|
transportAddress: TransportAddress,
|
||||||
|
flags: set[ServerFlags] = {},
|
||||||
|
upgrade: Upgrade): T {.public.} =
|
||||||
|
## Creates a Tor transport
|
||||||
|
|
||||||
|
T(
|
||||||
|
transportAddress: transportAddress,
|
||||||
|
upgrader: upgrade,
|
||||||
|
tcpTransport: TcpTransport.new(flags, upgrade))
|
||||||
|
|
||||||
|
proc handlesDial(address: MultiAddress): bool {.gcsafe.} =
|
||||||
|
return Onion3.match(address) or TCP.match(address) or DNSANY.match(address)
|
||||||
|
|
||||||
|
proc handlesStart(address: MultiAddress): bool {.gcsafe.} =
|
||||||
|
return TcpOnion3.match(address)
|
||||||
|
|
||||||
|
proc connectToTorServer(
|
||||||
|
transportAddress: TransportAddress): Future[StreamTransport] {.async, gcsafe.} =
|
||||||
|
let transp = await connect(transportAddress)
|
||||||
|
try:
|
||||||
|
discard await transp.write(@[Socks5ProtocolVersion, NMethods, Socks5AuthMethod.NoAuth.byte])
|
||||||
|
let
|
||||||
|
serverReply = await transp.read(2)
|
||||||
|
socks5ProtocolVersion = serverReply[0]
|
||||||
|
serverSelectedMethod = serverReply[1]
|
||||||
|
if socks5ProtocolVersion != Socks5ProtocolVersion:
|
||||||
|
raise newException(Socks5VersionError, "Unsupported socks version")
|
||||||
|
if serverSelectedMethod != Socks5AuthMethod.NoAuth.byte:
|
||||||
|
raise newException(Socks5AuthFailedError, "Unsupported auth method")
|
||||||
|
return transp
|
||||||
|
except CatchableError as err:
|
||||||
|
await transp.closeWait()
|
||||||
|
raise err
|
||||||
|
|
||||||
|
proc readServerReply(transp: StreamTransport) {.async, gcsafe.} =
|
||||||
|
## The specification for this code is defined on
|
||||||
|
## [link text](https://www.rfc-editor.org/rfc/rfc1928#section-5)
|
||||||
|
## and [link text](https://www.rfc-editor.org/rfc/rfc1928#section-6).
|
||||||
|
let
|
||||||
|
portNumOctets = 2
|
||||||
|
ipV4NumOctets = 4
|
||||||
|
ipV6NumOctets = 16
|
||||||
|
firstFourOctets = await transp.read(4)
|
||||||
|
socks5ProtocolVersion = firstFourOctets[0]
|
||||||
|
serverReply = firstFourOctets[1]
|
||||||
|
if socks5ProtocolVersion != Socks5ProtocolVersion:
|
||||||
|
raise newException(Socks5VersionError, "Unsupported socks version")
|
||||||
|
if serverReply != Socks5ReplyType.Succeeded.byte:
|
||||||
|
var socks5ReplyType: Socks5ReplyType
|
||||||
|
if socks5ReplyType.checkedEnumAssign(serverReply):
|
||||||
|
raise newException(Socks5ServerReplyError, fmt"Server reply error: {socks5ReplyType}")
|
||||||
|
else:
|
||||||
|
raise newException(LPError, fmt"Unexpected server reply: {serverReply}")
|
||||||
|
let atyp = firstFourOctets[3]
|
||||||
|
case atyp:
|
||||||
|
of Socks5AddressType.IPv4.byte:
|
||||||
|
discard await transp.read(ipV4NumOctets + portNumOctets)
|
||||||
|
of Socks5AddressType.FQDN.byte:
|
||||||
|
let fqdnNumOctets = await transp.read(1)
|
||||||
|
discard await transp.read(int(uint8.fromBytes(fqdnNumOctets)) + portNumOctets)
|
||||||
|
of Socks5AddressType.IPv6.byte:
|
||||||
|
discard await transp.read(ipV6NumOctets + portNumOctets)
|
||||||
|
else:
|
||||||
|
raise newException(LPError, "Address not supported")
|
||||||
|
|
||||||
|
proc parseOnion3(address: MultiAddress): (byte, seq[byte], seq[byte]) {.raises: [Defect, LPError, ValueError].} =
|
||||||
|
var addressArray = ($address).split('/')
|
||||||
|
if addressArray.len < 2: raise newException(LPError, fmt"Onion address not supported {address}")
|
||||||
|
addressArray = addressArray[2].split(':')
|
||||||
|
if addressArray.len == 0: raise newException(LPError, fmt"Onion address not supported {address}")
|
||||||
|
let
|
||||||
|
addressStr = addressArray[0] & ".onion"
|
||||||
|
dstAddr = @(uint8(addressStr.len).toBytes()) & addressStr.toBytes()
|
||||||
|
dstPort = address.data.buffer[37..38]
|
||||||
|
return (Socks5AddressType.FQDN.byte, dstAddr, dstPort)
|
||||||
|
|
||||||
|
proc parseIpTcp(address: MultiAddress): (byte, seq[byte], seq[byte]) {.raises: [Defect, LPError, ValueError].} =
|
||||||
|
let (codec, atyp) =
|
||||||
|
if IPv4Tcp.match(address):
|
||||||
|
(multiCodec("ip4"), Socks5AddressType.IPv4.byte)
|
||||||
|
elif IPv6Tcp.match(address):
|
||||||
|
(multiCodec("ip6"), Socks5AddressType.IPv6.byte)
|
||||||
|
else:
|
||||||
|
raise newException(LPError, fmt"IP address not supported {address}")
|
||||||
|
let
|
||||||
|
dstAddr = address[codec].get().protoArgument().get()
|
||||||
|
dstPort = address[multiCodec("tcp")].get().protoArgument().get()
|
||||||
|
(atyp, dstAddr, dstPort)
|
||||||
|
|
||||||
|
proc parseDnsTcp(address: MultiAddress): (byte, seq[byte], seq[byte]) =
|
||||||
|
let
|
||||||
|
dnsAddress = address[multiCodec("dns")].get().protoArgument().get()
|
||||||
|
dstAddr = @(uint8(dnsAddress.len).toBytes()) & dnsAddress
|
||||||
|
dstPort = address[multiCodec("tcp")].get().protoArgument().get()
|
||||||
|
(Socks5AddressType.FQDN.byte, dstAddr, dstPort)
|
||||||
|
|
||||||
|
proc dialPeer(
|
||||||
|
transp: StreamTransport, address: MultiAddress) {.async, gcsafe.} =
|
||||||
|
let (atyp, dstAddr, dstPort) =
|
||||||
|
if Onion3.match(address):
|
||||||
|
parseOnion3(address)
|
||||||
|
elif IPTcp.match(address):
|
||||||
|
parseIpTcp(address)
|
||||||
|
elif DnsTcp.match(address):
|
||||||
|
parseDnsTcp(address)
|
||||||
|
else:
|
||||||
|
raise newException(LPError, fmt"Address not supported: {address}")
|
||||||
|
|
||||||
|
let reserved = byte(0)
|
||||||
|
let request = @[
|
||||||
|
Socks5ProtocolVersion,
|
||||||
|
Socks5RequestCommand.Connect.byte,
|
||||||
|
reserved,
|
||||||
|
atyp] & dstAddr & dstPort
|
||||||
|
discard await transp.write(request)
|
||||||
|
await readServerReply(transp)
|
||||||
|
|
||||||
|
method dial*(
|
||||||
|
self: TorTransport,
|
||||||
|
hostname: string,
|
||||||
|
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
|
||||||
|
## dial a peer
|
||||||
|
##
|
||||||
|
if not handlesDial(address):
|
||||||
|
raise newException(LPError, fmt"Address not supported: {address}")
|
||||||
|
trace "Dialing remote peer", address = $address
|
||||||
|
let transp = await connectToTorServer(self.transportAddress)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await dialPeer(transp, address)
|
||||||
|
return await self.tcpTransport.connHandler(transp, Opt.none(MultiAddress), Direction.Out)
|
||||||
|
except CatchableError as err:
|
||||||
|
await transp.closeWait()
|
||||||
|
raise err
|
||||||
|
|
||||||
|
method start*(
|
||||||
|
self: TorTransport,
|
||||||
|
addrs: seq[MultiAddress]) {.async.} =
|
||||||
|
## listen on the transport
|
||||||
|
##
|
||||||
|
|
||||||
|
var listenAddrs: seq[MultiAddress]
|
||||||
|
var onion3Addrs: seq[MultiAddress]
|
||||||
|
for i, ma in addrs:
|
||||||
|
if not handlesStart(ma):
|
||||||
|
warn "Invalid address detected, skipping!", address = ma
|
||||||
|
continue
|
||||||
|
|
||||||
|
let listenAddress = ma[0..1].get()
|
||||||
|
listenAddrs.add(listenAddress)
|
||||||
|
let onion3 = ma[multiCodec("onion3")].get()
|
||||||
|
onion3Addrs.add(onion3)
|
||||||
|
|
||||||
|
if len(listenAddrs) != 0 and len(onion3Addrs) != 0:
|
||||||
|
await procCall Transport(self).start(onion3Addrs)
|
||||||
|
await self.tcpTransport.start(listenAddrs)
|
||||||
|
else:
|
||||||
|
raise newException(TransportStartError, "Tor Transport couldn't start, no supported addr was provided.")
|
||||||
|
|
||||||
|
method accept*(self: TorTransport): Future[Connection] {.async, gcsafe.} =
|
||||||
|
## accept a new Tor connection
|
||||||
|
##
|
||||||
|
let conn = await self.tcpTransport.accept()
|
||||||
|
conn.observedAddr = Opt.none(MultiAddress)
|
||||||
|
return conn
|
||||||
|
|
||||||
|
method stop*(self: TorTransport) {.async, gcsafe.} =
|
||||||
|
## stop the transport
|
||||||
|
##
|
||||||
|
await procCall Transport(self).stop() # call base
|
||||||
|
await self.tcpTransport.stop()
|
||||||
|
|
||||||
|
method handles*(t: TorTransport, address: MultiAddress): bool {.gcsafe.} =
|
||||||
|
if procCall Transport(t).handles(address):
|
||||||
|
return handlesDial(address) or handlesStart(address)
|
||||||
|
|
||||||
|
type
|
||||||
|
TorSwitch* = ref object of Switch
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: typedesc[TorSwitch],
|
||||||
|
torServer: TransportAddress,
|
||||||
|
rng: ref HmacDrbgContext,
|
||||||
|
addresses: seq[MultiAddress] = @[],
|
||||||
|
flags: set[ServerFlags] = {}): TorSwitch
|
||||||
|
{.raises: [LPError, Defect], public.} =
|
||||||
|
var builder = SwitchBuilder.new()
|
||||||
|
.withRng(rng)
|
||||||
|
.withTransport(proc(upgr: Upgrade): Transport = TorTransport.new(torServer, flags, upgr))
|
||||||
|
if addresses.len != 0:
|
||||||
|
builder = builder.withAddresses(addresses)
|
||||||
|
let switch = builder.withMplex()
|
||||||
|
.withNoise()
|
||||||
|
.build()
|
||||||
|
let torSwitch = T(
|
||||||
|
peerInfo: switch.peerInfo,
|
||||||
|
ms: switch.ms,
|
||||||
|
transports: switch.transports,
|
||||||
|
connManager: switch.connManager,
|
||||||
|
peerStore: switch.peerStore,
|
||||||
|
dialer: Dialer.new(switch.peerInfo.peerId, switch.connManager, switch.transports, switch.ms, nil),
|
||||||
|
nameResolver: nil)
|
||||||
|
|
||||||
|
torSwitch.connManager.peerStore = switch.peerStore
|
||||||
|
return torSwitch
|
||||||
|
|
||||||
|
method addTransport*(s: TorSwitch, t: Transport) =
|
||||||
|
doAssert(false, "not implemented!")
|
||||||
|
|
||||||
|
method getTorTransport*(s: TorSwitch): Transport {.base.} =
|
||||||
|
return s.transports[0]
|
|
@ -13,25 +13,24 @@ import ./helpers
|
||||||
|
|
||||||
type TransportProvider* = proc(): Transport {.gcsafe, raises: [Defect].}
|
type TransportProvider* = proc(): Transport {.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
template commonTransportTest*(prov: TransportProvider, ma1: string, ma2: string = "") =
|
||||||
suite name & " common tests":
|
block:
|
||||||
teardown:
|
let transpProvider = prov
|
||||||
checkTrackers()
|
|
||||||
|
|
||||||
asyncTest "can handle local address":
|
asyncTest "can handle local address":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
check transport1.handles(transport1.addrs[0])
|
check transport1.handles(transport1.addrs[0])
|
||||||
await transport1.stop()
|
await transport1.stop()
|
||||||
|
|
||||||
asyncTest "e2e: handle observedAddr":
|
asyncTest "e2e: handle observedAddr":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
|
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
let transport2 = prov()
|
let transport2 = transpProvider()
|
||||||
|
|
||||||
proc acceptHandler() {.async, gcsafe.} =
|
proc acceptHandler() {.async, gcsafe.} =
|
||||||
let conn = await transport1.accept()
|
let conn = await transport1.accept()
|
||||||
|
@ -56,9 +55,9 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
||||||
|
|
||||||
asyncTest "e2e: handle write":
|
asyncTest "e2e: handle write":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
|
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
proc acceptHandler() {.async, gcsafe.} =
|
proc acceptHandler() {.async, gcsafe.} =
|
||||||
|
@ -68,7 +67,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
|
|
||||||
let handlerWait = acceptHandler()
|
let handlerWait = acceptHandler()
|
||||||
|
|
||||||
let transport2 = prov()
|
let transport2 = transpProvider()
|
||||||
let conn = await transport2.dial(transport1.addrs[0])
|
let conn = await transport2.dial(transport1.addrs[0])
|
||||||
var msg = newSeq[byte](6)
|
var msg = newSeq[byte](6)
|
||||||
await conn.readExactly(addr msg[0], 6)
|
await conn.readExactly(addr msg[0], 6)
|
||||||
|
@ -84,8 +83,8 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
|
||||||
|
|
||||||
asyncTest "e2e: handle read":
|
asyncTest "e2e: handle read":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
proc acceptHandler() {.async, gcsafe.} =
|
proc acceptHandler() {.async, gcsafe.} =
|
||||||
|
@ -97,7 +96,7 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
|
|
||||||
let handlerWait = acceptHandler()
|
let handlerWait = acceptHandler()
|
||||||
|
|
||||||
let transport2 = prov()
|
let transport2 = transpProvider()
|
||||||
let conn = await transport2.dial(transport1.addrs[0])
|
let conn = await transport2.dial(transport1.addrs[0])
|
||||||
await conn.write("Hello!")
|
await conn.write("Hello!")
|
||||||
|
|
||||||
|
@ -110,12 +109,12 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
transport2.stop()))
|
transport2.stop()))
|
||||||
|
|
||||||
asyncTest "e2e: handle dial cancellation":
|
asyncTest "e2e: handle dial cancellation":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
|
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
let transport2 = prov()
|
let transport2 = transpProvider()
|
||||||
let cancellation = transport2.dial(transport1.addrs[0])
|
let cancellation = transport2.dial(transport1.addrs[0])
|
||||||
|
|
||||||
await cancellation.cancelAndWait()
|
await cancellation.cancelAndWait()
|
||||||
|
@ -127,9 +126,9 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
transport2.stop()))
|
transport2.stop()))
|
||||||
|
|
||||||
asyncTest "e2e: handle accept cancellation":
|
asyncTest "e2e: handle accept cancellation":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
|
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
let acceptHandler = transport1.accept()
|
let acceptHandler = transport1.accept()
|
||||||
|
@ -143,11 +142,11 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
# this randomly locks the Windows CI job
|
# this randomly locks the Windows CI job
|
||||||
skip()
|
skip()
|
||||||
return
|
return
|
||||||
let addrs = @[MultiAddress.init(ma).tryGet(),
|
let addrs = @[MultiAddress.init(ma1).tryGet(),
|
||||||
MultiAddress.init(ma).tryGet()]
|
MultiAddress.init(if ma2 == "": ma1 else: ma2).tryGet()]
|
||||||
|
|
||||||
|
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(addrs)
|
await transport1.start(addrs)
|
||||||
|
|
||||||
proc acceptHandler() {.async, gcsafe.} =
|
proc acceptHandler() {.async, gcsafe.} =
|
||||||
|
@ -192,12 +191,12 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
await transport1.stop()
|
await transport1.stop()
|
||||||
|
|
||||||
asyncTest "e2e: stopping transport kills connections":
|
asyncTest "e2e: stopping transport kills connections":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
|
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
let transport2 = prov()
|
let transport2 = transpProvider()
|
||||||
|
|
||||||
let acceptHandler = transport1.accept()
|
let acceptHandler = transport1.accept()
|
||||||
let conn = await transport2.dial(transport1.addrs[0])
|
let conn = await transport2.dial(transport1.addrs[0])
|
||||||
|
@ -212,8 +211,8 @@ proc commonTransportTest*(name: string, prov: TransportProvider, ma: string) =
|
||||||
check conn.closed()
|
check conn.closed()
|
||||||
|
|
||||||
asyncTest "read or write on closed connection":
|
asyncTest "read or write on closed connection":
|
||||||
let ma = @[MultiAddress.init(ma).tryGet()]
|
let ma = @[MultiAddress.init(ma1).tryGet()]
|
||||||
let transport1 = prov()
|
let transport1 = transpProvider()
|
||||||
await transport1.start(ma)
|
await transport1.start(ma)
|
||||||
|
|
||||||
proc acceptHandler() {.async, gcsafe.} =
|
proc acceptHandler() {.async, gcsafe.} =
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import tables
|
||||||
|
import chronos, stew/[byteutils, endians2, shims/net]
|
||||||
|
import ../libp2p/[stream/connection,
|
||||||
|
protocols/connectivity/relay/utils,
|
||||||
|
transports/tcptransport,
|
||||||
|
transports/tortransport,
|
||||||
|
upgrademngrs/upgrade,
|
||||||
|
multiaddress,
|
||||||
|
errors,
|
||||||
|
builders]
|
||||||
|
|
||||||
|
type
|
||||||
|
TorServerStub* = ref object of RootObj
|
||||||
|
tcpTransport: TcpTransport
|
||||||
|
addrTable: Table[string, string]
|
||||||
|
|
||||||
|
proc new*(
|
||||||
|
T: typedesc[TorServerStub]): T {.public.} =
|
||||||
|
|
||||||
|
T(
|
||||||
|
tcpTransport: TcpTransport.new(flags = {ReuseAddr}, upgrade = Upgrade()),
|
||||||
|
addrTable: initTable[string, string]())
|
||||||
|
|
||||||
|
proc registerAddr*(self: TorServerStub, key: string, val: string) =
|
||||||
|
self.addrTable[key] = val
|
||||||
|
|
||||||
|
proc start*(self: TorServerStub, address: TransportAddress) {.async.} =
|
||||||
|
let ma = @[MultiAddress.init(address).tryGet()]
|
||||||
|
|
||||||
|
await self.tcpTransport.start(ma)
|
||||||
|
|
||||||
|
var msg = newSeq[byte](3)
|
||||||
|
while self.tcpTransport.running:
|
||||||
|
let connSrc = await self.tcpTransport.accept()
|
||||||
|
await connSrc.readExactly(addr msg[0], 3)
|
||||||
|
|
||||||
|
await connSrc.write(@[05'u8, 00])
|
||||||
|
|
||||||
|
msg = newSeq[byte](4)
|
||||||
|
await connSrc.readExactly(addr msg[0], 4)
|
||||||
|
let atyp = msg[3]
|
||||||
|
let address = case atyp:
|
||||||
|
of Socks5AddressType.IPv4.byte:
|
||||||
|
let n = 4 + 2 # +2 bytes for the port
|
||||||
|
msg = newSeq[byte](n)
|
||||||
|
await connSrc.readExactly(addr msg[0], n)
|
||||||
|
var ip: array[4, byte]
|
||||||
|
for i, e in msg[0..^3]:
|
||||||
|
ip[i] = e
|
||||||
|
$(ipv4(ip)) & ":" & $(Port(fromBytesBE(uint16, msg[^2..^1])))
|
||||||
|
of Socks5AddressType.IPv6.byte:
|
||||||
|
let n = 16 + 2 # +2 bytes for the port
|
||||||
|
msg = newSeq[byte](n) # +2 bytes for the port
|
||||||
|
await connSrc.readExactly(addr msg[0], n)
|
||||||
|
var ip: array[16, byte]
|
||||||
|
for i, e in msg[0..^3]:
|
||||||
|
ip[i] = e
|
||||||
|
$(ipv6(ip)) & ":" & $(Port(fromBytesBE(uint16, msg[^2..^1])))
|
||||||
|
of Socks5AddressType.FQDN.byte:
|
||||||
|
await connSrc.readExactly(addr msg[0], 1)
|
||||||
|
let n = int(uint8.fromBytes(msg[0..0])) + 2 # +2 bytes for the port
|
||||||
|
msg = newSeq[byte](n)
|
||||||
|
await connSrc.readExactly(addr msg[0], n)
|
||||||
|
string.fromBytes(msg[0..^3]) & ":" & $(Port(fromBytesBE(uint16, msg[^2..^1])))
|
||||||
|
else:
|
||||||
|
raise newException(LPError, "Address not supported")
|
||||||
|
|
||||||
|
let tcpIpAddr = self.addrTable[$(address)]
|
||||||
|
|
||||||
|
await connSrc.write(@[05'u8, 00, 00, 01, 00, 00, 00, 00, 00, 00])
|
||||||
|
|
||||||
|
let connDst = await self.tcpTransport.dial("", MultiAddress.init(tcpIpAddr).tryGet())
|
||||||
|
|
||||||
|
await bridge(connSrc, connDst)
|
||||||
|
await allFutures(connSrc.close(), connDst.close())
|
||||||
|
|
||||||
|
|
||||||
|
proc stop*(self: TorServerStub) {.async.} =
|
||||||
|
await self.tcpTransport.stop()
|
|
@ -23,6 +23,7 @@ import testmultibase,
|
||||||
testrouting_record
|
testrouting_record
|
||||||
|
|
||||||
import testtcptransport,
|
import testtcptransport,
|
||||||
|
testtortransport,
|
||||||
testnameresolve,
|
testnameresolve,
|
||||||
testwstransport,
|
testwstransport,
|
||||||
testmultistream,
|
testmultistream,
|
||||||
|
|
|
@ -125,7 +125,8 @@ suite "TCP transport":
|
||||||
server.close()
|
server.close()
|
||||||
await server.join()
|
await server.join()
|
||||||
|
|
||||||
|
proc transProvider(): Transport = TcpTransport.new(upgrade = Upgrade())
|
||||||
|
|
||||||
commonTransportTest(
|
commonTransportTest(
|
||||||
"TcpTransport",
|
transProvider,
|
||||||
proc (): Transport = TcpTransport.new(upgrade = Upgrade()),
|
|
||||||
"/ip4/0.0.0.0/tcp/0")
|
"/ip4/0.0.0.0/tcp/0")
|
||||||
|
|
|
@ -0,0 +1,143 @@
|
||||||
|
{.used.}
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import tables
|
||||||
|
import chronos, stew/[byteutils]
|
||||||
|
import ../libp2p/[stream/connection,
|
||||||
|
transports/tcptransport,
|
||||||
|
transports/tortransport,
|
||||||
|
upgrademngrs/upgrade,
|
||||||
|
multiaddress,
|
||||||
|
builders]
|
||||||
|
|
||||||
|
import ./helpers, ./stubs, ./commontransport
|
||||||
|
|
||||||
|
const torServer = initTAddress("127.0.0.1", 9050.Port)
|
||||||
|
var stub: TorServerStub
|
||||||
|
var startFut: Future[void]
|
||||||
|
suite "Tor transport":
|
||||||
|
setup:
|
||||||
|
stub = TorServerStub.new()
|
||||||
|
stub.registerAddr("127.0.0.1:8080", "/ip4/127.0.0.1/tcp/8080")
|
||||||
|
stub.registerAddr("libp2p.nim:8080", "/ip4/127.0.0.1/tcp/8080")
|
||||||
|
stub.registerAddr("::1:8080", "/ip6/::1/tcp/8080")
|
||||||
|
stub.registerAddr("a2mncbqsbullu7thgm4e6zxda2xccmcgzmaq44oayhdtm6rav5vovcad.onion:80", "/ip4/127.0.0.1/tcp/8080")
|
||||||
|
stub.registerAddr("a2mncbqsbullu7thgm4e6zxda2xccmcgzmaq44oayhdtm6rav5vovcae.onion:81", "/ip4/127.0.0.1/tcp/8081")
|
||||||
|
startFut = stub.start(torServer)
|
||||||
|
teardown:
|
||||||
|
waitFor startFut.cancelAndWait()
|
||||||
|
waitFor stub.stop()
|
||||||
|
checkTrackers()
|
||||||
|
|
||||||
|
proc test(lintesAddr: string, dialAddr: string) {.async.} =
|
||||||
|
let server = TcpTransport.new({ReuseAddr}, Upgrade())
|
||||||
|
let ma2 = @[MultiAddress.init(lintesAddr).tryGet()]
|
||||||
|
await server.start(ma2)
|
||||||
|
|
||||||
|
proc runClient() {.async.} =
|
||||||
|
let client = TorTransport.new(transportAddress = torServer, upgrade = Upgrade())
|
||||||
|
let conn = await client.dial("", MultiAddress.init(dialAddr).tryGet())
|
||||||
|
|
||||||
|
await conn.write("client")
|
||||||
|
var resp: array[6, byte]
|
||||||
|
await conn.readExactly(addr resp, 6)
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
check string.fromBytes(resp) == "server"
|
||||||
|
await client.stop()
|
||||||
|
|
||||||
|
proc serverAcceptHandler() {.async, gcsafe.} =
|
||||||
|
let conn = await server.accept()
|
||||||
|
var resp: array[6, byte]
|
||||||
|
await conn.readExactly(addr resp, 6)
|
||||||
|
check string.fromBytes(resp) == "client"
|
||||||
|
|
||||||
|
await conn.write("server")
|
||||||
|
await conn.close()
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
asyncSpawn serverAcceptHandler()
|
||||||
|
await runClient()
|
||||||
|
|
||||||
|
asyncTest "test start and dial using ipv4":
|
||||||
|
await test("/ip4/127.0.0.1/tcp/8080", "/ip4/127.0.0.1/tcp/8080")
|
||||||
|
|
||||||
|
asyncTest "test start and dial using ipv6":
|
||||||
|
await test("/ip6/::1/tcp/8080", "/ip6/::1/tcp/8080")
|
||||||
|
|
||||||
|
asyncTest "test start and dial using dns":
|
||||||
|
await test("/ip4/127.0.0.1/tcp/8080", "/dns/libp2p.nim/tcp/8080")
|
||||||
|
|
||||||
|
asyncTest "test start and dial usion onion3 and builder":
|
||||||
|
const TestCodec = "/test/proto/1.0.0" # custom protocol string identifier
|
||||||
|
|
||||||
|
type
|
||||||
|
TestProto = ref object of LPProtocol # declare a custom protocol
|
||||||
|
|
||||||
|
proc new(T: typedesc[TestProto]): T =
|
||||||
|
|
||||||
|
# every incoming connections will be in handled in this closure
|
||||||
|
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||||
|
|
||||||
|
var resp: array[6, byte]
|
||||||
|
await conn.readExactly(addr resp, 6)
|
||||||
|
check string.fromBytes(resp) == "client"
|
||||||
|
await conn.write("server")
|
||||||
|
|
||||||
|
# We must close the connections ourselves when we're done with it
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
|
return T(codecs: @[TestCodec], handler: handle)
|
||||||
|
|
||||||
|
let rng = newRng()
|
||||||
|
|
||||||
|
let ma = MultiAddress.init("/ip4/127.0.0.1/tcp/8080/onion3/a2mncbqsbullu7thgm4e6zxda2xccmcgzmaq44oayhdtm6rav5vovcad:80").tryGet()
|
||||||
|
|
||||||
|
let serverSwitch = TorSwitch.new(torServer, rng, @[ma], {ReuseAddr})
|
||||||
|
|
||||||
|
# setup the custom proto
|
||||||
|
let testProto = TestProto.new()
|
||||||
|
|
||||||
|
serverSwitch.mount(testProto)
|
||||||
|
await serverSwitch.start()
|
||||||
|
|
||||||
|
let serverPeerId = serverSwitch.peerInfo.peerId
|
||||||
|
let serverAddress = serverSwitch.peerInfo.addrs
|
||||||
|
|
||||||
|
proc startClient() {.async.} =
|
||||||
|
let clientSwitch = TorSwitch.new(torServer = torServer, rng= rng, flags = {ReuseAddr})
|
||||||
|
|
||||||
|
let conn = await clientSwitch.dial(serverPeerId, serverAddress, TestCodec)
|
||||||
|
|
||||||
|
await conn.write("client")
|
||||||
|
|
||||||
|
var resp: array[6, byte]
|
||||||
|
await conn.readExactly(addr resp, 6)
|
||||||
|
check string.fromBytes(resp) == "server"
|
||||||
|
await conn.close()
|
||||||
|
await clientSwitch.stop()
|
||||||
|
|
||||||
|
await startClient()
|
||||||
|
|
||||||
|
await serverSwitch.stop()
|
||||||
|
|
||||||
|
test "It's not possible to add another transport in TorSwitch":
|
||||||
|
when (NimMajor, NimMinor, NimPatch) < (1, 4, 0):
|
||||||
|
type AssertionDefect = AssertionError
|
||||||
|
|
||||||
|
let torSwitch = TorSwitch.new(torServer = torServer, rng= rng, flags = {ReuseAddr})
|
||||||
|
expect(AssertionDefect):
|
||||||
|
torSwitch.addTransport(TcpTransport.new(upgrade = Upgrade()))
|
||||||
|
waitFor torSwitch.stop()
|
||||||
|
|
||||||
|
proc transProvider(): Transport =
|
||||||
|
TorTransport.new(torServer, {ReuseAddr}, Upgrade())
|
||||||
|
|
||||||
|
commonTransportTest(
|
||||||
|
transProvider,
|
||||||
|
"/ip4/127.0.0.1/tcp/8080/onion3/a2mncbqsbullu7thgm4e6zxda2xccmcgzmaq44oayhdtm6rav5vovcad:80",
|
||||||
|
"/ip4/127.0.0.1/tcp/8081/onion3/a2mncbqsbullu7thgm4e6zxda2xccmcgzmaq44oayhdtm6rav5vovcae:81")
|
|
@ -55,14 +55,13 @@ suite "WebSocket transport":
|
||||||
teardown:
|
teardown:
|
||||||
checkTrackers()
|
checkTrackers()
|
||||||
|
|
||||||
commonTransportTest(
|
proc wsTraspProvider(): Transport = WsTransport.new(Upgrade())
|
||||||
"WebSocket",
|
|
||||||
proc (): Transport = WsTransport.new(Upgrade()),
|
|
||||||
"/ip4/0.0.0.0/tcp/0/ws")
|
|
||||||
|
|
||||||
commonTransportTest(
|
commonTransportTest(
|
||||||
"WebSocket Secure",
|
wsTraspProvider,
|
||||||
(proc (): Transport {.gcsafe.} =
|
"/ip4/0.0.0.0/tcp/0/ws")
|
||||||
|
|
||||||
|
proc wsSecureTranspProvider(): Transport {.gcsafe.} =
|
||||||
try:
|
try:
|
||||||
return WsTransport.new(
|
return WsTransport.new(
|
||||||
Upgrade(),
|
Upgrade(),
|
||||||
|
@ -70,7 +69,9 @@ suite "WebSocket transport":
|
||||||
TLSCertificate.init(SecureCert),
|
TLSCertificate.init(SecureCert),
|
||||||
{TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName})
|
{TLSFlags.NoVerifyHost, TLSFlags.NoVerifyServerName})
|
||||||
except Exception: check(false)
|
except Exception: check(false)
|
||||||
),
|
|
||||||
|
commonTransportTest(
|
||||||
|
wsSecureTranspProvider,
|
||||||
"/ip4/0.0.0.0/tcp/0/wss")
|
"/ip4/0.0.0.0/tcp/0/wss")
|
||||||
|
|
||||||
asyncTest "Hostname verification":
|
asyncTest "Hostname verification":
|
||||||
|
|
Loading…
Reference in New Issue