This commit is contained in:
Dmitriy Ryajov 2021-03-20 10:47:09 -06:00
parent 2c3613577b
commit 65d1fbf6c2
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
8 changed files with 169 additions and 42 deletions

View File

@ -262,7 +262,7 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} =
try:
trace "Triggering connect events", conn
conn.upgrade()
conn.upgradeComplete()
let peerId = conn.peerInfo.peerId
await c.triggerPeerEvents(

View File

@ -578,8 +578,10 @@ method init*(p: Noise) {.gcsafe.} =
p.codec = NoiseCodec
proc newNoise*(
rng: ref BrHmacDrbgContext, privateKey: PrivateKey;
outgoing: bool = true; commonPrologue: seq[byte] = @[]): Noise =
rng: ref BrHmacDrbgContext,
privateKey: PrivateKey,
outgoing: bool = true,
commonPrologue: seq[byte] = @[]): Noise =
result = Noise(
rng: rng,
outgoing: outgoing,

View File

@ -178,7 +178,7 @@ method closeImpl*(s: BufferStream): Future[void] =
#
# - If a push was in progress but no reader is
# attached we need to pop the queue
# - If a read was in progress without without a
# - If a read was in progress without a
# push/data we need to push the Eof marker to
# notify the reader that the channel closed
#

View File

@ -42,14 +42,14 @@ proc isUpgraded*(s: Connection): bool =
if not isNil(s.upgraded):
return s.upgraded.finished
proc upgrade*(s: Connection, failed: ref Exception = nil) =
proc upgradeComplete*(s: Connection) =
if not isNil(s.upgraded):
if not isNil(failed):
s.upgraded.fail(failed)
return
s.upgraded.complete()
proc upgradeFail*(s: Connection, failed: ref Exception) =
if not isNil(s.upgraded) and not isNil(failed):
s.upgraded.fail(failed)
proc onUpgrade*(s: Connection) {.async.} =
if not isNil(s.upgraded):
await s.upgraded

View File

@ -103,47 +103,67 @@ method upgradeOutgoing*(
return sconn
proc securedMuxableConnection*(
self: MuxedUpgrade,
conn: Connection,
proto: string,
ms: MultistreamSelect) {.async, gcsafe.} =
## Secure and handle a muxable incoming
## connection.
##
## This means that the incoming
## connection should be "securable" and
## "muxable".
##
## This is also closely related
## to transports, some could potentially
## not require neither securing nor muxing,
## some might require either one of this.
##
trace "Starting secure handler", conn
let secure = self.secureManagers.filterIt(it.codec == proto)[0]
var cconn = conn
try:
var sconn = await secure.secure(cconn, false)
if isNil(sconn):
return
cconn = sconn
# add the muxer
for muxer in self.muxers.values:
ms.addHandler(muxer.codecs, muxer)
# handle subsequent secure requests
await ms.handle(cconn)
except CatchableError as exc:
debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn
if not cconn.isUpgraded:
cconn.upgradeFail(exc)
finally:
if not isNil(cconn):
await cconn.close()
trace "Stopped secure handler", conn
method upgradeIncoming*(
self: MuxedUpgrade,
incomingConn: Connection): Future[void] {.async, gcsafe.} = # noraises
## This is the upgrade flow to handle muxed
## connections
##
trace "Upgrading incoming connection", incomingConn
let ms = newMultistream()
# secure incoming connections
proc securedHandler(conn: Connection,
proto: string)
{.async, gcsafe, closure.} =
trace "Starting secure handler", conn
let secure = self.secureManagers.filterIt(it.codec == proto)[0]
var cconn = conn
try:
var sconn = await secure.secure(cconn, false)
if isNil(sconn):
return
cconn = sconn
# add the muxer
for muxer in self.muxers.values:
ms.addHandler(muxer.codecs, muxer)
# handle subsequent secure requests
await ms.handle(cconn)
except CatchableError as exc:
debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn
if not cconn.isUpgraded:
cconn.upgrade(exc)
finally:
if not isNil(cconn):
await cconn.close()
trace "Stopped secure handler", conn
try:
if (await ms.select(incomingConn)): # just handshake
# add the secure handlers
for k in self.secureManagers:
ms.addHandler(k.codec, securedHandler)
ms.addHandler(
k.codec,
proc(conn: Connection, proto: string): Future[void] =
self.securedMuxableConnection(conn, proto, ms)
)
# handle un-secured connections
# we handshaked above, set this ms handler as active
@ -151,7 +171,7 @@ method upgradeIncoming*(
except CatchableError as exc:
debug "Exception upgrading incoming", exc = exc.msg
if not incomingConn.isUpgraded:
incomingConn.upgrade(exc)
incomingConn.upgradeFail(exc)
finally:
if not isNil(incomingConn):
await incomingConn.close()

View File

@ -1 +1,2 @@
--path:".."
--threads:on

View File

@ -112,7 +112,7 @@ suite "Mplex":
var data = newSeq[byte](6)
await chann.close() # closing channel
# should be able to read on local clsoe
# should be able to read on local close
await chann.readExactly(addr data[0], 3)
# closing remote end
let closeFut = chann.pushEof()

104
tests/testupgrade.nim Normal file
View File

@ -0,0 +1,104 @@
import unittest, tables
import chronos
import ../libp2p
import ../libp2p/upgrademngrs/muxedupgrade
import ./helpers
proc createMuxedManager(
seckey = PrivateKey.random(rng[]).tryGet(),
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()): Upgrade =
proc createMplex(conn: Connection): Muxer =
Mplex.init(conn)
let
peerInfo = PeerInfo.init(seckey, [ma])
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
return MuxedUpgrade.init(
newIdentify(peerInfo),
{MplexCodec: mplexProvider}.toTable,
[newNoise(rng, seckey).Secure],
ConnManager.init(),
newMultistream())
suite "Test Upgrade Managers":
asyncTest "Test Identify flow":
let
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
seckey = PrivateKey.random(rng[]).tryGet()
peerInfo = PeerInfo.init(seckey, [ma])
upgrade = createMuxedManager()
transport1 = TcpTransport.init(upgrade = Upgrade())
transport2 = TcpTransport.init(upgrade = upgrade)
identifyProto = newIdentify(peerInfo)
msListen = newMultistream()
peerInfo.agentVersion = AgentVersion
peerInfo.protoVersion = ProtoVersion
msListen.addHandler(IdentifyCodec, identifyProto)
let serverFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept()
await msListen.handle(c)
let acceptFut = acceptHandler()
let conn = await transport2.dial(transport1.ma)
check isNil(conn.peerInfo)
await upgrade.identify(conn)
check:
not isNil(conn.peerInfo)
conn.peerInfo.peerId == peerInfo.peerId
conn.peerInfo.addrs == peerInfo.addrs
conn.peerInfo.agentVersion == peerInfo.agentVersion
conn.peerInfo.protoVersion == peerInfo.protoVersion
conn.peerInfo.protocols == peerInfo.protocols
await allFuturesThrowing(
transport1.stop(),
transport2.stop(),
acceptFut,
serverFut
)
asyncTest "Test Secure flow":
let
ma = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
seckey = PrivateKey.random(rng[]).tryGet()
peerInfo = PeerInfo.init(seckey, [ma])
upgrade = createMuxedManager()
transport1 = TcpTransport.init(upgrade = Upgrade())
transport2 = TcpTransport.init(upgrade = upgrade)
secure = newNoise(rng, seckey).Secure
msListen = newMultistream()
peerInfo.agentVersion = AgentVersion
peerInfo.protoVersion = ProtoVersion
msListen.addHandler(NoiseCodec, secure)
let serverFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let c = await transport1.accept()
await msListen.handle(c)
let acceptFut = acceptHandler()
let conn = await transport2.dial(transport1.ma)
check isNil(conn.peerInfo)
let sconn = await upgrade.secure(conn)
check:
not isNil(sconn)
sconn.transportDir == Direction.Out
await allFuturesThrowing(
transport1.stop(),
transport2.stop(),
acceptFut,
serverFut
)