Various transports improvement (#594)

* little transport cleanup

* rename TcpTransport.init -> TcpTransport.new

* moved transport e2e to common file

* remove localAddress

* rename testtransport -> testtcptransport

* add checktrackers to commontransports

* removed multicodec from transports
This commit is contained in:
Tanguy Cizain 2021-06-30 10:59:30 +02:00 committed by GitHub
parent 49f137049f
commit 26e47d7da5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 175 additions and 185 deletions

View File

@ -169,7 +169,7 @@ proc build*(b: SwitchBuilder): Switch
transports = block: transports = block:
var transports: seq[Transport] var transports: seq[Transport]
if b.tcpTransportOpts.enable: if b.tcpTransportOpts.enable:
transports.add(Transport(TcpTransport.init(b.tcpTransportOpts.flags, muxedUpgrade))) transports.add(Transport(TcpTransport.new(b.tcpTransportOpts.flags, muxedUpgrade)))
transports transports
if b.secureManagers.len == 0: if b.secureManagers.len == 0:

View File

@ -114,21 +114,25 @@ proc connHandler*(self: TcpTransport,
return conn return conn
func init*( proc init*(
T: type TcpTransport, T: typedesc[TcpTransport],
flags: set[ServerFlags] = {},
upgrade: Upgrade): T {.deprecated: "use .new".} =
T.new(flags, upgrade)
proc new*(
T: typedesc[TcpTransport],
flags: set[ServerFlags] = {}, flags: set[ServerFlags] = {},
upgrade: Upgrade): T = upgrade: Upgrade): T =
result = T( let transport = T(
flags: flags, flags: flags,
upgrader: upgrade upgrader: upgrade
) )
result.initTransport()
method initTransport*(self: TcpTransport) =
self.multicodec = multiCodec("tcp")
inc getTcpTransportTracker().opened inc getTcpTransportTracker().opened
return transport
method start*( method start*(
self: TcpTransport, self: TcpTransport,
@ -150,7 +154,6 @@ method start*(
# always get the resolved address in case we're bound to 0.0.0.0:0 # always get the resolved address in case we're bound to 0.0.0.0:0
self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet() self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet()
self.running = true
trace "Listening on", address = self.ma trace "Listening on", address = self.ma
@ -158,8 +161,6 @@ method stop*(self: TcpTransport) {.async, gcsafe.} =
## stop the transport ## stop the transport
## ##
self.running = false # mark stopped as soon as possible
try: try:
trace "Stopping TCP transport" trace "Stopping TCP transport"
await procCall Transport(self).stop() # call base await procCall Transport(self).stop() # call base
@ -179,24 +180,6 @@ method stop*(self: TcpTransport) {.async, gcsafe.} =
except CatchableError as exc: except CatchableError as exc:
trace "Error shutting down tcp transport", exc = exc.msg trace "Error shutting down tcp transport", exc = exc.msg
method upgradeIncoming*(
self: TcpTransport,
conn: Connection): Future[void] {.gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
self.upgrader.upgradeIncoming(conn)
method upgradeOutgoing*(
self: TcpTransport,
conn: Connection): Future[Connection] {.gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
self.upgrader.upgradeOutgoing(conn)
method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
## accept a new TCP connection ## accept a new TCP connection
## ##

View File

@ -28,40 +28,35 @@ type
ma*: Multiaddress ma*: Multiaddress
running*: bool running*: bool
upgrader*: Upgrade upgrader*: Upgrade
multicodec*: MultiCodec
proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
newException(TransportClosedError, newException(TransportClosedError,
"Transport closed, no more connections!", parent) "Transport closed, no more connections!", parent)
method initTransport*(self: Transport) {.base, gcsafe, locks: "unknown".} =
## perform protocol initialization
##
discard
method start*( method start*(
self: Transport, self: Transport,
ma: MultiAddress): Future[void] {.base, async.} = ma: MultiAddress): Future[void] {.base, async.} =
## start the transport ## start the transport
## ##
self.ma = ma
trace "starting transport", address = $ma trace "starting transport", address = $ma
self.ma = ma
self.running = true
method stop*(self: Transport): Future[void] {.base, async.} = method stop*(self: Transport): Future[void] {.base, async.} =
## stop and cleanup the transport ## stop and cleanup the transport
## including all outstanding connections ## including all outstanding connections
## ##
discard trace "stopping transport", address = $self.ma
self.running = false
method accept*(self: Transport): Future[Connection] method accept*(self: Transport): Future[Connection]
{.base, gcsafe.} = {.base, gcsafe.} =
## accept incoming connections ## accept incoming connections
## ##
discard doAssert(false, "Not implemented!")
method dial*( method dial*(
self: Transport, self: Transport,
@ -69,7 +64,7 @@ method dial*(
## dial a peer ## dial a peer
## ##
discard doAssert(false, "Not implemented!")
method upgradeIncoming*( method upgradeIncoming*(
self: Transport, self: Transport,
@ -78,7 +73,7 @@ method upgradeIncoming*(
## transport specific upgrades ## transport specific upgrades
## ##
doAssert(false, "Not implemented!") self.upgrader.upgradeIncoming(conn)
method upgradeOutgoing*( method upgradeOutgoing*(
self: Transport, self: Transport,
@ -87,7 +82,7 @@ method upgradeOutgoing*(
## transport specific upgrades ## transport specific upgrades
## ##
doAssert(false, "Not implemented!") self.upgrader.upgradeOutgoing(conn)
method handles*( method handles*(
self: Transport, self: Transport,
@ -99,9 +94,3 @@ method handles*(
# having to repeat the check in every transport # having to repeat the check in every transport
if address.protocols.isOk: if address.protocols.isOk:
return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0 return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0
method localAddress*(self: Transport): MultiAddress {.base, gcsafe.} =
## get the local address of the transport in case started with 0.0.0.0:0
##
discard

93
tests/commontransport.nim Normal file
View File

@ -0,0 +1,93 @@
{.used.}
import sequtils
import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,
upgrademngrs/upgrade,
multiaddress,
errors,
wire]
import ./helpers
proc commonTransportTest*(transportType: typedesc[Transport], ma: string) =
suite $transportType & " common":
teardown:
checkTrackers()
asyncTest "e2e: handle write":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
await transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
await conn.write("Hello!")
await conn.close()
let handlerWait = acceptHandler()
let transport2: transportType = transportType.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
await conn.close() #for some protocols, closing requires actively, so we must close here
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await transport2.stop()
await transport1.stop()
check string.fromBytes(msg) == "Hello!"
asyncTest "e2e: handle read":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
asyncSpawn transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
check string.fromBytes(msg) == "Hello!"
await conn.close()
let handlerWait = acceptHandler()
let transport2: transportType = transportType.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
await conn.write("Hello!")
await conn.close() #for some protocols, closing requires actively, so we must close here
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await transport2.stop()
await transport1.stop()
asyncTest "e2e: handle dial cancellation":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
await transport1.start(ma)
let transport2: transportType = transportType.new(upgrade = Upgrade())
let cancellation = transport2.dial(transport1.ma)
await cancellation.cancelAndWait()
check cancellation.cancelled
await transport2.stop()
await transport1.stop()
asyncTest "e2e: handle accept cancellation":
let ma: MultiAddress = Multiaddress.init(ma).tryGet()
let transport1: transportType = transportType.new(upgrade = Upgrade())
await transport1.start(ma)
let acceptHandler = transport1.accept()
await acceptHandler.cancelAndWait()
check acceptHandler.cancelled
await transport1.stop()

View File

@ -41,8 +41,8 @@ suite "Identify":
remotePeerInfo = PeerInfo.init( remotePeerInfo = PeerInfo.init(
remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"]) remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"])
transport1 = TcpTransport.init(upgrade = Upgrade()) transport1 = TcpTransport.new(upgrade = Upgrade())
transport2 = TcpTransport.init(upgrade = Upgrade()) transport2 = TcpTransport.new(upgrade = Upgrade())
identifyProto1 = Identify.new(remotePeerInfo) identifyProto1 = Identify.new(remotePeerInfo)
identifyProto2 = Identify.new(remotePeerInfo) identifyProto2 = Identify.new(remotePeerInfo)

View File

@ -380,7 +380,7 @@ suite "Mplex":
asyncTest "read/write receiver": asyncTest "read/write receiver":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -396,7 +396,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -417,7 +417,7 @@ suite "Mplex":
asyncTest "read/write receiver lazy": asyncTest "read/write receiver lazy":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -433,7 +433,7 @@ suite "Mplex":
await mplexListen.close() await mplexListen.close()
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -461,7 +461,7 @@ suite "Mplex":
for _ in 0..<MaxMsgSize: for _ in 0..<MaxMsgSize:
bigseq.add(uint8(rand(uint('A')..uint('z')))) bigseq.add(uint8(rand(uint('A')..uint('z'))))
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -485,7 +485,7 @@ suite "Mplex":
check false check false
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -508,7 +508,7 @@ suite "Mplex":
asyncTest "read/write initiator": asyncTest "read/write initiator":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -522,7 +522,7 @@ suite "Mplex":
await mplexListen.handle() await mplexListen.handle()
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
@ -544,7 +544,7 @@ suite "Mplex":
asyncTest "multiple streams": asyncTest "multiple streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
let done = newFuture[void]() let done = newFuture[void]()
@ -564,7 +564,7 @@ suite "Mplex":
await mplexListen.handle() await mplexListen.handle()
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
@ -588,7 +588,7 @@ suite "Mplex":
asyncTest "multiple read/write streams": asyncTest "multiple read/write streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
let done = newFuture[void]() let done = newFuture[void]()
@ -609,7 +609,7 @@ suite "Mplex":
await mplexListen.handle() await mplexListen.handle()
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
@ -635,7 +635,7 @@ suite "Mplex":
asyncTest "channel closes listener with EOF": asyncTest "channel closes listener with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept() let conn = await transport1.accept()
@ -657,7 +657,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -682,7 +682,7 @@ suite "Mplex":
asyncTest "channel closes dialer with EOF": asyncTest "channel closes dialer with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var count = 0 var count = 0
var done = newFuture[void]() var done = newFuture[void]()
@ -705,7 +705,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -747,7 +747,7 @@ suite "Mplex":
asyncTest "dialing mplex closes both ends": asyncTest "dialing mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -764,7 +764,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -789,7 +789,7 @@ suite "Mplex":
asyncTest "listening mplex closes both ends": asyncTest "listening mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var mplexListen: Mplex var mplexListen: Mplex
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
@ -807,7 +807,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -832,7 +832,7 @@ suite "Mplex":
asyncTest "canceling mplex handler closes both ends": asyncTest "canceling mplex handler closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var mplexHandle: Future[void] var mplexHandle: Future[void]
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
@ -851,7 +851,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -875,7 +875,7 @@ suite "Mplex":
asyncTest "closing dialing connection should close both ends": asyncTest "closing dialing connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -892,7 +892,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -917,7 +917,7 @@ suite "Mplex":
asyncTest "canceling listening connection should close both ends": asyncTest "canceling listening connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
var listenConn: Connection var listenConn: Connection
var listenStreams: seq[Connection] var listenStreams: seq[Connection]
@ -935,7 +935,7 @@ suite "Mplex":
await transport1.start(ma) await transport1.start(ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn) let mplexDial = Mplex.init(conn)
@ -963,7 +963,7 @@ suite "Mplex":
asyncTest "channel should be able to handle erratic read/writes": asyncTest "channel should be able to handle erratic read/writes":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
var complete = newFuture[void]() var complete = newFuture[void]()
@ -984,7 +984,7 @@ suite "Mplex":
await mplexListen.handle() await mplexListen.handle()
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
@ -1035,7 +1035,7 @@ suite "Mplex":
asyncTest "channel should handle 1 byte read/write": asyncTest "channel should handle 1 byte read/write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
var complete = newFuture[void]() var complete = newFuture[void]()
@ -1053,7 +1053,7 @@ suite "Mplex":
await mplexListen.handle() await mplexListen.handle()
await mplexListen.close() await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler() let acceptFut = acceptHandler()

View File

@ -248,7 +248,7 @@ suite "Multistream select":
let msListen = MultistreamSelect.new() let msListen = MultistreamSelect.new()
msListen.addHandler("/test/proto/1.0.0", protocol) msListen.addHandler("/test/proto/1.0.0", protocol)
let transport1 = TcpTransport.init(upgrade = Upgrade()) let transport1 = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport1.start(ma) asyncSpawn transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -259,7 +259,7 @@ suite "Multistream select":
let handlerWait = acceptHandler() let handlerWait = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2 = TcpTransport.init(upgrade = Upgrade()) let transport2 = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn, "/test/proto/1.0.0")) == true check (await msDial.select(conn, "/test/proto/1.0.0")) == true
@ -295,7 +295,7 @@ suite "Multistream select":
msListen.addHandler("/test/proto1/1.0.0", protocol) msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol) msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let listenFut = transport1.start(ma) let listenFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -311,7 +311,7 @@ suite "Multistream select":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
let ls = await msDial.list(conn) let ls = await msDial.list(conn)
@ -340,7 +340,7 @@ suite "Multistream select":
let msListen = MultistreamSelect.new() let msListen = MultistreamSelect.new()
msListen.addHandler("/test/proto/1.0.0", protocol) msListen.addHandler("/test/proto/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport1.start(ma) asyncSpawn transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -349,7 +349,7 @@ suite "Multistream select":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn, check (await msDial.select(conn,
@ -378,7 +378,7 @@ suite "Multistream select":
msListen.addHandler("/test/proto1/1.0.0", protocol) msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol) msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport1.start(ma) asyncSpawn transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} = proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -387,7 +387,7 @@ suite "Multistream select":
let acceptFut = acceptHandler() let acceptFut = acceptHandler()
let msDial = MultistreamSelect.new() let msDial = MultistreamSelect.new()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma) let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn, check (await msDial.select(conn,

View File

@ -16,7 +16,7 @@ import testmultibase,
testcid, testcid,
testpeerid testpeerid
import testtransport, import testtcptransport,
testmultistream, testmultistream,
testbufferstream, testbufferstream,
testidentify, testidentify,

View File

@ -68,7 +68,7 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc
connManager = ConnManager.init() connManager = ConnManager.init()
ms = MultistreamSelect.new() ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagers, connManager, ms) muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagers, connManager, ms)
transports = @[Transport(TcpTransport.init(upgrade = muxedUpgrade))] transports = @[Transport(TcpTransport.new(upgrade = muxedUpgrade))]
let switch = newSwitch( let switch = newSwitch(
peerInfo, peerInfo,
@ -90,7 +90,7 @@ suite "Noise":
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server]) serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport1.start(server) asyncSpawn transport1.start(server)
proc acceptHandler() {.async.} = proc acceptHandler() {.async.} =
@ -104,7 +104,7 @@ suite "Noise":
let let
acceptFut = acceptHandler() acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true) clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true)
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.ma)
@ -128,7 +128,7 @@ suite "Noise":
serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false)
let let
transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport1.start(server) asyncSpawn transport1.start(server)
@ -144,7 +144,7 @@ suite "Noise":
let let
handlerWait = acceptHandler() handlerWait = acceptHandler()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8]) clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8])
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.ma)
@ -164,7 +164,7 @@ suite "Noise":
serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false) serverNoise = Noise.new(rng, serverInfo.privateKey, outgoing = false)
readTask = newFuture[void]() readTask = newFuture[void]()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport1.start(server) asyncSpawn transport1.start(server)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -180,7 +180,7 @@ suite "Noise":
let let
acceptFut = acceptHandler() acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true) clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true)
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.ma)
@ -205,7 +205,7 @@ suite "Noise":
trace "Sending huge payload", size = hugePayload.len trace "Sending huge payload", size = hugePayload.len
let let
transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
listenFut = transport1.start(server) listenFut = transport1.start(server)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -219,7 +219,7 @@ suite "Noise":
let let
acceptFut = acceptHandler() acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma]) clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true) clientNoise = Noise.new(rng, clientInfo.privateKey, outgoing = true)
conn = await transport2.dial(transport1.ma) conn = await transport2.dial(transport1.ma)

View File

@ -37,8 +37,8 @@ suite "Ping":
asyncSetup: asyncSetup:
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
transport1 = TcpTransport.init(upgrade = Upgrade()) transport1 = TcpTransport.new(upgrade = Upgrade())
transport2 = TcpTransport.init(upgrade = Upgrade()) transport2 = TcpTransport.new(upgrade = Upgrade())
proc handlePing(peer: PeerInfo) {.async, gcsafe, closure.} = proc handlePing(peer: PeerInfo) {.async, gcsafe, closure.} =
inc pingReceivedCount inc pingReceivedCount
@ -76,8 +76,8 @@ suite "Ping":
proc testPing(): Future[void] {.async.} = proc testPing(): Future[void] {.async.} =
let baseMa = Multiaddress.init("/ip4/127.0.0.1/tcp/0").tryGet() let baseMa = Multiaddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let transportdialer: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transportdialer: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport.start(baseMa) asyncSpawn transport.start(baseMa)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =

View File

@ -620,7 +620,7 @@ suite "Switch":
asyncTest "e2e canceling dial should not leak": asyncTest "e2e canceling dial should not leak":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport = TcpTransport.init(upgrade = Upgrade()) let transport = TcpTransport.new(upgrade = Upgrade())
await transport.start(ma) await transport.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -657,7 +657,7 @@ suite "Switch":
asyncTest "e2e closing remote conn should not leak": asyncTest "e2e closing remote conn should not leak":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport = TcpTransport.init(upgrade = Upgrade()) let transport = TcpTransport.new(upgrade = Upgrade())
await transport.start(ma) await transport.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =

View File

@ -10,7 +10,7 @@ import ../libp2p/[stream/connection,
errors, errors,
wire] wire]
import ./helpers import ./helpers, ./commontransport
suite "TCP transport": suite "TCP transport":
teardown: teardown:
@ -18,7 +18,7 @@ suite "TCP transport":
asyncTest "test listener: handle write": asyncTest "test listener: handle write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport.start(ma) asyncSpawn transport.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -40,7 +40,7 @@ suite "TCP transport":
asyncTest "test listener: handle read": asyncTest "test listener: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
asyncSpawn transport.start(ma) asyncSpawn transport.start(ma)
proc acceptHandler() {.async, gcsafe.} = proc acceptHandler() {.async, gcsafe.} =
@ -78,7 +78,7 @@ suite "TCP transport":
server.start() server.start()
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet() let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport.dial(ma) let conn = await transport.dial(ma)
var msg = newSeq[byte](6) var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6) await conn.readExactly(addr msg[0], 6)
@ -112,7 +112,7 @@ suite "TCP transport":
server.start() server.start()
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet() let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
let conn = await transport.dial(ma) let conn = await transport.dial(ma)
await conn.write("Hello!") await conn.write("Hello!")
@ -125,79 +125,4 @@ suite "TCP transport":
server.close() server.close()
await server.join() await server.join()
asyncTest "e2e: handle write": TcpTransport.commonTransportTest("/ip4/0.0.0.0/tcp/0")
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
await transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
await conn.write("Hello!")
await conn.close()
let handlerWait = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await conn.close()
await transport2.stop()
await transport1.stop()
check string.fromBytes(msg) == "Hello!"
asyncTest "e2e: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncSpawn transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
check string.fromBytes(msg) == "Hello!"
await conn.close()
let handlerWait = acceptHandler()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
await conn.write("Hello!")
await handlerWait.wait(1.seconds) # when no issues will not wait that long!
await conn.close()
await transport2.stop()
await transport1.stop()
asyncTest "e2e: handle dial cancellation":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
await transport1.start(ma)
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let cancellation = transport2.dial(transport1.ma)
await cancellation.cancelAndWait()
check cancellation.cancelled
await transport2.stop()
await transport1.stop()
asyncTest "e2e: handle accept cancellation":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
await transport1.start(ma)
let acceptHandler = transport1.accept()
await acceptHandler.cancelAndWait()
check acceptHandler.cancelled
await transport1.stop()