mirror of
synced 2025-03-01 08:30:29 +00:00
Upgrade flow refactoring (#807)
This commit is contained in:
@ -12,6 +12,7 @@ switch("warning", "LockLevel:off")
if (NimMajor, NimMinor) < (1, 6):
switch("warningAsError", "UseBase:on")
# Avoid some rare stack corruption while using exceptions with a SEH-enabled
@ -230,7 +230,7 @@ proc build*(b: SwitchBuilder): Switch
identify = Identify.new(peerInfo, b.sendSignedPeerRecord)
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.new(identify, b.muxers, secureManagerInstances, connManager, ms)
muxedUpgrade = MuxedUpgrade.new(b.muxers, secureManagerInstances, connManager, ms)
transports = block:
@ -247,14 +247,13 @@ proc build*(b: SwitchBuilder): Switch
let peerStore =
if isSome(b.peerStoreCapacity):
PeerStore.new(identify, b.peerStoreCapacity.get())
let switch = newSwitch(
peerInfo = peerInfo,
transports = transports,
identity = identify,
secureManagers = secureManagerInstances,
connManager = connManager,
ms = ms,
@ -262,6 +261,8 @@ proc build*(b: SwitchBuilder): Switch
peerStore = peerStore,
services = b.services)
if b.autonat:
let autonat = Autonat.new(switch)
@ -55,7 +55,6 @@ type
PeerEventKind* {.pure.} = enum
PeerEvent* = object
@ -68,19 +67,14 @@ type
PeerEventHandler* =
proc(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe, raises: [Defect].}
MuxerHolder = object
muxer: Muxer
handle: Future[void]
ConnManager* = ref object of RootObj
maxConnsPerPeer: int
inSema*: AsyncSemaphore
outSema*: AsyncSemaphore
conns: Table[PeerId, HashSet[Connection]]
muxed: Table[Connection, MuxerHolder]
muxed: Table[PeerId, seq[Muxer]]
connEvents: array[ConnEventKind, OrderedSet[ConnEventHandler]]
peerEvents: array[PeerEventKind, OrderedSet[PeerEventHandler]]
expectedConnectionsOverLimit*: Table[(PeerId, Direction), Future[Connection]]
expectedConnectionsOverLimit*: Table[(PeerId, Direction), Future[Muxer]]
peerStore*: PeerStore
ConnectionSlot* = object
@ -110,12 +104,12 @@ proc new*(C: type ConnManager,
outSema: outSema)
proc connCount*(c: ConnManager, peerId: PeerId): int =
proc connectedPeers*(c: ConnManager, dir: Direction): seq[PeerId] =
var peers = newSeq[PeerId]()
for peerId, conns in c.conns:
if conns.anyIt(it.dir == dir):
for peerId, mux in c.muxed:
if mux.anyIt(it.connection.dir == dir):
return peers
@ -202,14 +196,6 @@ proc triggerPeerEvents*(c: ConnManager,
let count = c.connCount(peerId)
if event.kind == PeerEventKind.Joined and count != 1:
trace "peer already joined", peer = peerId, event = $event
elif event.kind == PeerEventKind.Left and count != 0:
trace "peer still connected or already left", peer = peerId, event = $event
trace "triggering peer events", peer = peerId, event = $event
var peerEvents: seq[Future[void]]
@ -222,13 +208,13 @@ proc triggerPeerEvents*(c: ConnManager,
except CatchableError as exc: # handlers should not raise!
warn "Exception in triggerPeerEvents", exc = exc.msg, peer = peerId
proc expectConnection*(c: ConnManager, p: PeerId, dir: Direction): Future[Connection] {.async.} =
proc expectConnection*(c: ConnManager, p: PeerId, dir: Direction): Future[Muxer] {.async.} =
## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer`
let key = (p, dir)
if key in c.expectedConnectionsOverLimit:
raise newException(AlreadyExpectingConnectionError, "Already expecting an incoming connection from that peer")
let future = newFuture[Connection]()
let future = newFuture[Muxer]()
c.expectedConnectionsOverLimit[key] = future
@ -236,18 +222,8 @@ proc expectConnection*(c: ConnManager, p: PeerId, dir: Direction): Future[Connec
proc contains*(c: ConnManager, conn: Connection): bool =
## checks if a connection is being tracked by the
## connection manager
if isNil(conn):
return conn in c.conns.getOrDefault(conn.peerId)
proc contains*(c: ConnManager, peerId: PeerId): bool =
peerId in c.conns
peerId in c.muxed
proc contains*(c: ConnManager, muxer: Muxer): bool =
## checks if a muxer is being tracked by the connection
@ -255,185 +231,134 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
if isNil(muxer):
return false
let conn = muxer.connection
if conn notin c:
return muxer in c.muxed.getOrDefault(conn.peerId)
if conn notin c.muxed:
proc closeMuxer(muxer: Muxer) {.async.} =
trace "Cleaning up muxer", m = muxer
return muxer == c.muxed.getOrDefault(conn).muxer
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "Cleaning up muxer", m = muxerHolder.muxer
await muxerHolder.muxer.close()
if not(isNil(muxerHolder.handle)):
await muxer.close()
if not(isNil(muxer.handler)):
await muxerHolder.handle # TODO noraises?
await muxer.handler # TODO noraises?
except CatchableError as exc:
trace "Exception in close muxer handler", exc = exc.msg
trace "Cleaned up muxer", m = muxerHolder.muxer
proc delConn(c: ConnManager, conn: Connection) =
let peerId = conn.peerId
c.conns.withValue(peerId, peerConns):
if peerConns[].len == 0:
c.conns.del(peerId) # invalidates `peerConns`
trace "Removed connection", conn
proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
## clean connection's resources such as muxers and streams
if isNil(conn):
trace "Wont cleanup a nil connection"
# Remove connection from all tables without async breaks
var muxer = some(MuxerHolder())
if not c.muxed.pop(conn, muxer.get()):
muxer = none(MuxerHolder)
delConn(c, conn)
trace "Cleaned up muxer", m = muxer
proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
if muxer.isSome:
await closeMuxerHolder(muxer.get())
await conn.close()
trace "Triggering disconnect events", mux
let peerId = mux.connection.peerId
trace "Connection cleaned up", conn
let muxers = c.muxed.getOrDefault(peerId).filterIt(it != mux)
if muxers.len > 0:
c.muxed[peerId] = muxers
await c.triggerPeerEvents(peerId, PeerEvent(kind: PeerEventKind.Left))
proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} =
trace "Triggering connect events", conn
if not(c.peerStore.isNil):
let peerId = conn.peerId
await c.triggerPeerEvents(
peerId, PeerEvent(kind: PeerEventKind.Joined, initiator: conn.dir == Direction.Out))
await c.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: conn.dir == Direction.In))
except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError and should handle other errors
warn "Unexpected exception in switch peer connection cleanup",
conn, msg = exc.msg
proc peerCleanup(c: ConnManager, conn: Connection) {.async.} =
trace "Triggering disconnect events", conn
let peerId = conn.peerId
await c.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Disconnected))
await c.triggerPeerEvents(peerId, PeerEvent(kind: PeerEventKind.Left))
if not(c.peerStore.isNil):
except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError and should handle other errors
warn "Unexpected exception peer cleanup handler",
conn, msg = exc.msg
mux, msg = exc.msg
proc onClose(c: ConnManager, conn: Connection) {.async.} =
proc onClose(c: ConnManager, mux: Muxer) {.async.} =
## connection close even handler
## triggers the connections resource cleanup
await conn.join()
trace "Connection closed, cleaning up", conn
await c.cleanupConn(conn)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propagate CancelledError.
debug "Unexpected cancellation in connection manager's cleanup", conn
await mux.connection.join()
trace "Connection closed, cleaning up", mux
except CatchableError as exc:
debug "Unexpected exception in connection manager's cleanup",
errMsg = exc.msg, conn
errMsg = exc.msg, mux
trace "Triggering peerCleanup", conn
asyncSpawn c.peerCleanup(conn)
await c.muxCleanup(mux)
proc selectConn*(c: ConnManager,
proc selectMuxer*(c: ConnManager,
peerId: PeerId,
dir: Direction): Connection =
dir: Direction): Muxer =
## Select a connection for the provided peer and direction
let conns = toSeq(
.filterIt( it.dir == dir )
.filterIt( it.connection.dir == dir )
if conns.len > 0:
return conns[0]
proc selectConn*(c: ConnManager, peerId: PeerId): Connection =
proc selectMuxer*(c: ConnManager, peerId: PeerId): Muxer =
## Select a connection for the provided giving priority
## to outgoing connections
var conn = c.selectConn(peerId, Direction.Out)
if isNil(conn):
conn = c.selectConn(peerId, Direction.In)
if isNil(conn):
var mux = c.selectMuxer(peerId, Direction.Out)
if isNil(mux):
mux = c.selectMuxer(peerId, Direction.In)
if isNil(mux):
trace "connection not found", peerId
return mux
return conn
proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
## select the muxer for the provided connection
proc storeMuxer*(c: ConnManager,
muxer: Muxer)
{.raises: [Defect, CatchableError].} =
## store the connection and muxer
if isNil(conn):
if isNil(muxer):
raise newException(LPError, "muxer cannot be nil")
if conn in c.muxed:
return c.muxed.getOrDefault(conn).muxer
debug "no muxer for connection", conn
if isNil(muxer.connection):
raise newException(LPError, "muxer's connection cannot be nil")
proc storeConn*(c: ConnManager, conn: Connection)
{.raises: [Defect, LPError].} =
## store a connection
if isNil(conn):
raise newException(LPError, "Connection cannot be nil")
if conn.closed or conn.atEof:
if muxer.connection.closed or muxer.connection.atEof:
raise newException(LPError, "Connection closed or EOF")
let peerId = conn.peerId
peerId = muxer.connection.peerId
dir = muxer.connection.dir
# we use getOrDefault in the if below instead of [] to avoid the KeyError
if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer:
let key = (peerId, conn.dir)
if c.muxed.getOrDefault(peerId).len > c.maxConnsPerPeer:
let key = (peerId, dir)
let expectedConn = c.expectedConnectionsOverLimit.getOrDefault(key)
if expectedConn != nil and not expectedConn.finished:
debug "Too many connections for peer",
conn, conns = c.conns.getOrDefault(peerId).len
conns = c.muxed.getOrDefault(peerId).len
raise newTooManyConnectionsError()
c.conns.mgetOrPut(peerId, HashSet[Connection]()).incl(conn)
assert muxer notin c.muxed.getOrDefault(peerId)
# Launch on close listener
# All the errors are handled inside `onClose()` procedure.
asyncSpawn c.onClose(conn)
newPeer = peerId notin c.muxed
assert newPeer or c.muxed[peerId].len > 0
c.muxed.mgetOrPut(peerId, newSeq[Muxer]()).add(muxer)
trace "Stored connection",
conn, direction = $conn.dir, connections = c.conns.len
asyncSpawn c.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: dir == Direction.In))
if newPeer:
asyncSpawn c.triggerPeerEvents(
peerId, PeerEvent(kind: PeerEventKind.Joined, initiator: dir == Direction.Out))
asyncSpawn c.onClose(muxer)
trace "Stored muxer",
muxer, direction = $muxer.connection.dir, peers = c.muxed.len
proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async.} =
await c.inSema.acquire()
@ -476,39 +401,17 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
asyncSpawn semaphoreMonitor()
proc storeMuxer*(c: ConnManager,
muxer: Muxer,
handle: Future[void] = nil)
{.raises: [Defect, CatchableError].} =
## store the connection and muxer
if isNil(muxer):
raise newException(CatchableError, "muxer cannot be nil")
if isNil(muxer.connection):
raise newException(CatchableError, "muxer's connection cannot be nil")
if muxer.connection notin c:
raise newException(CatchableError, "cant add muxer for untracked connection")
c.muxed[muxer.connection] = MuxerHolder(
muxer: muxer,
handle: handle)
trace "Stored muxer",
muxer, handle = not handle.isNil, connections = c.conns.len
asyncSpawn c.onConnUpgraded(muxer.connection)
proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) =
if isNil(mux):
proc getStream*(c: ConnManager,
peerId: PeerId,
dir: Direction): Future[Connection] {.async, gcsafe.} =
## get a muxed stream for the provided peer
## with the given direction
muxer: Muxer): Future[Connection] {.async, gcsafe.} =
## get a muxed stream for the passed muxer
let muxer = c.selectMuxer(c.selectConn(peerId, dir))
if not(isNil(muxer)):
return await muxer.newStream()
@ -517,40 +420,25 @@ proc getStream*(c: ConnManager,
## get a muxed stream for the passed peer from any connection
let muxer = c.selectMuxer(c.selectConn(peerId))
if not(isNil(muxer)):
return await muxer.newStream()
return await c.getStream(c.selectMuxer(peerId))
proc getStream*(c: ConnManager,
conn: Connection): Future[Connection] {.async, gcsafe.} =
## get a muxed stream for the passed connection
peerId: PeerId,
dir: Direction): Future[Connection] {.async, gcsafe.} =
## get a muxed stream for the passed peer from a connection with `dir`
let muxer = c.selectMuxer(conn)
if not(isNil(muxer)):
return await muxer.newStream()
return await c.getStream(c.selectMuxer(peerId, dir))
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
## drop connections and cleanup resources for peer
trace "Dropping peer", peerId
let conns = c.conns.getOrDefault(peerId)
for conn in conns:
trace "Removing connection", conn
delConn(c, conn)
var muxers: seq[MuxerHolder]
for conn in conns:
if conn in c.muxed:
muxers.add c.muxed[conn]
let muxers = c.muxed.getOrDefault(peerId)
for muxer in muxers:
await closeMuxerHolder(muxer)
for conn in conns:
await conn.close()
trace "Dropped peer", peerId
await closeMuxer(muxer)
trace "Peer dropped", peerId
@ -560,9 +448,6 @@ proc close*(c: ConnManager) {.async.} =
trace "Closing ConnManager"
let conns = c.conns
let muxed = c.muxed
@ -572,12 +457,9 @@ proc close*(c: ConnManager) {.async.} =
for _, fut in expected:
await fut.cancelAndWait()
for _, muxer in muxed:
await closeMuxerHolder(muxer)
for _, conns2 in conns:
for conn in conns2:
await conn.close()
for _, muxers in muxed:
for mux in muxers:
await closeMuxer(mux)
trace "Closed ConnManager"
@ -17,7 +17,9 @@ import pkg/[chronos,
import dial,
@ -41,10 +43,10 @@ type
Dialer* = ref object of Dial
localPeerId*: PeerId
ms: MultistreamSelect
connManager: ConnManager
dialLock: Table[PeerId, AsyncLock]
transports: seq[Transport]
peerStore: PeerStore
nameResolver: NameResolver
proc dialAndUpgrade(
@ -52,7 +54,7 @@ proc dialAndUpgrade(
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress):
Future[Connection] {.async.} =
Future[Muxer] {.async.} =
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
@ -75,7 +77,7 @@ proc dialAndUpgrade(
let conn =
let mux =
await transport.upgradeOutgoing(dialed, peerId)
except CatchableError as exc:
@ -89,9 +91,9 @@ proc dialAndUpgrade(
# Try other address
return nil
doAssert not isNil(conn), "connection died after upgradeOutgoing"
debug "Dial successful", conn, peerId = conn.peerId
return conn
doAssert not isNil(mux), "connection died after upgradeOutgoing"
debug "Dial successful", peerId = mux.connection.peerId
return mux
return nil
proc expandDnsAddr(
@ -126,7 +128,7 @@ proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress]):
Future[Connection] {.async.} =
Future[Muxer] {.async.} =
debug "Dialing peer", peerId
@ -147,21 +149,13 @@ proc dialAndUpgrade(
if not isNil(result):
return result
proc tryReusingConnection(self: Dialer, peerId: PeerId): Future[Opt[Connection]] {.async.} =
var conn = self.connManager.selectConn(peerId)
if conn == nil:
return Opt.none(Connection)
proc tryReusingConnection(self: Dialer, peerId: PeerId): Future[Opt[Muxer]] {.async.} =
let muxer = self.connManager.selectMuxer(peerId)
if muxer == nil:
return Opt.none(Muxer)
if conn.atEof or conn.closed:
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager", conn
await conn.close()
raise newException(DialFailedError, "Zombie connection encountered")
trace "Reusing existing connection", conn, direction = $conn.dir
return Opt.some(conn)
trace "Reusing existing connection", muxer, direction = $muxer.connection.dir
return Opt.some(muxer)
proc internalConnect(
self: Dialer,
@ -169,7 +163,7 @@ proc internalConnect(
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true):
Future[Connection] {.async.} =
Future[Muxer] {.async.} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
@ -179,32 +173,30 @@ proc internalConnect(
await lock.acquire()
if peerId.isSome and reuseConnection:
let connOpt = await self.tryReusingConnection(peerId.get())
if connOpt.isSome:
return connOpt.get()
let muxOpt = await self.tryReusingConnection(peerId.get())
if muxOpt.isSome:
return muxOpt.get()
let slot = self.connManager.getOutgoingSlot(forceDial)
let conn =
let muxed =
await self.dialAndUpgrade(peerId, addrs)
except CatchableError as exc:
raise exc
if isNil(conn): # None of the addresses connected
if isNil(muxed): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")
# A disconnect could have happened right after
# we've added the connection so we check again
# to prevent races due to that.
if conn.closed() or conn.atEof():
# This can happen when the other ends drops us
# before we get a chance to return the connection
# back to the dialer.
trace "Connection dead on arrival", conn
raise newLPStreamClosedError()
await self.peerStore.identify(muxed)
except CatchableError as exc:
trace "Failed to finish outgoung upgrade", err=exc.msg
await muxed.close()
raise exc
return conn
return muxed
if lock.locked():
@ -235,21 +227,21 @@ method connect*(
return (await self.internalConnect(
if allowUnknownPeerId == false:
raise newException(DialFailedError, "Address without PeerID and unknown peer id disabled!")
return (await self.internalConnect(
proc negotiateStream(
self: Dialer,
conn: Connection,
protos: seq[string]): Future[Connection] {.async.} =
trace "Negotiating stream", conn, protos
let selected = await self.ms.select(conn, protos)
let selected = await MultistreamSelect.select(conn, protos)
if not protos.contains(selected):
await conn.closeWithEOF()
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
@ -267,11 +259,11 @@ method tryDial*(
trace "Check if it can dial", peerId, addrs
let conn = await self.dialAndUpgrade(Opt.some(peerId), addrs)
if conn.isNil():
let mux = await self.dialAndUpgrade(Opt.some(peerId), addrs)
if mux.isNil():
raise newException(DialFailedError, "No valid multiaddress")
await conn.close()
return conn.observedAddr
await mux.close()
return mux.connection.observedAddr
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -303,7 +295,7 @@ method dial*(
conn: Connection
conn: Muxer
stream: Connection
proc cleanup() {.async.} =
@ -340,12 +332,12 @@ proc new*(
T: type Dialer,
localPeerId: PeerId,
connManager: ConnManager,
peerStore: PeerStore,
transports: seq[Transport],
ms: MultistreamSelect,
nameResolver: NameResolver = nil): Dialer =
T(localPeerId: localPeerId,
connManager: connManager,
transports: transports,
ms: ms,
peerStore: peerStore,
nameResolver: nameResolver)
@ -21,12 +21,11 @@ logScope:
topics = "libp2p multistream"
MsgSize* = 1024
Codec* = "/multistream/1.0.0"
MsgSize = 1024
Codec = "/multistream/1.0.0"
MSCodec* = "\x13" & Codec & "\n"
Na* = "\x03na\n"
Ls* = "\x03ls\n"
Na = "na\n"
Ls = "ls\n"
Matcher* = proc (proto: string): bool {.gcsafe, raises: [Defect].}
@ -45,7 +44,7 @@ type
proc new*(T: typedesc[MultistreamSelect]): T =
codec: MSCodec,
codec: Codec,
template validateSuffix(str: string): untyped =
@ -54,13 +53,13 @@ template validateSuffix(str: string): untyped =
raise newException(MultiStreamError, "MultistreamSelect failed, malformed message")
proc select*(m: MultistreamSelect,
proc select*(_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: seq[string]):
Future[string] {.async.} =
trace "initiating handshake", conn, codec = m.codec
trace "initiating handshake", conn, codec = Codec
## select a remote protocol
await conn.write(m.codec) # write handshake
await conn.writeLp(Codec & "\n") # write handshake
if proto.len() > 0:
trace "selecting proto", conn, proto = proto[0]
await conn.writeLp((proto[0] & "\n")) # select proto
@ -102,13 +101,13 @@ proc select*(m: MultistreamSelect,
# No alternatives, fail
return ""
proc select*(m: MultistreamSelect,
proc select*(_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: string): Future[bool] {.async.} =
if proto.len > 0:
return (await m.select(conn, @[proto])) == proto
return (await MultistreamSelect.select(conn, @[proto])) == proto
return (await m.select(conn, @[])) == Codec
return (await MultistreamSelect.select(conn, @[])) == Codec
proc select*(m: MultistreamSelect, conn: Connection): Future[bool] =
m.select(conn, "")
@ -119,7 +118,7 @@ proc list*(m: MultistreamSelect,
if not await m.select(conn):
await conn.write(Ls) # send ls
await conn.writeLp(Ls) # send ls
var list = newSeq[string]()
let ms = string.fromBytes(await conn.readLp(MsgSize))
@ -129,68 +128,86 @@ proc list*(m: MultistreamSelect,
result = list
proc handle*(
_: type MultistreamSelect,
conn: Connection,
protos: seq[string],
matchers = newSeq[Matcher](),
active: bool = false,
): Future[string] {.async, gcsafe.} =
trace "Starting multistream negotiation", conn, handshaked = active
var handshaked = active
while not conn.atEof:
var ms = string.fromBytes(await conn.readLp(MsgSize))
if not handshaked and ms != Codec:
debug "expected handshake message", conn, instead=ms
raise newException(CatchableError,
"MultistreamSelect handling failed, invalid first message")
trace "handle: got request", conn, ms
if ms.len() <= 0:
trace "handle: invalid proto", conn
await conn.writeLp(Na)
case ms:
of "ls":
trace "handle: listing protos", conn
#TODO this doens't seem to follow spec, each protocol
# should be length prefixed. Not very important
# since LS is getting deprecated
await conn.writeLp(protos.join("\n") & "\n")
of Codec:
if not handshaked:
await conn.writeLp(Codec & "\n")
handshaked = true
trace "handle: sending `na` for duplicate handshake while handshaked",
await conn.writeLp(Na)
elif ms in protos or matchers.anyIt(it(ms)):
trace "found handler", conn, protocol = ms
await conn.writeLp(ms & "\n")
conn.protocol = ms
return ms
trace "no handlers", conn, protocol = ms
await conn.writeLp(Na)
proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.async, gcsafe.} =
trace "Starting multistream handler", conn, handshaked = active
var handshaked = active
handshaked = active
protos: seq[string]
matchers: seq[Matcher]
for h in m.handlers:
if not isNil(h.match):
for proto in h.protos:
while not conn.atEof:
var ms = string.fromBytes(await conn.readLp(MsgSize))
let ms = await MultistreamSelect.handle(conn, protos, matchers, active)
for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler", conn, protocol = ms
if not handshaked and ms != Codec:
notice "expected handshake message", conn, instead=ms
raise newException(CatchableError,
"MultistreamSelect handling failed, invalid first message")
trace "handle: got request", conn, ms
if ms.len() <= 0:
trace "handle: invalid proto", conn
await conn.write(Na)
if m.handlers.len() == 0:
trace "handle: sending `na` for protocol", conn, protocol = ms
await conn.write(Na)
case ms:
of "ls":
trace "handle: listing protos", conn
var protos = ""
for h in m.handlers:
for proto in h.protos:
protos &= (proto & "\n")
await conn.writeLp(protos)
of Codec:
if not handshaked:
await conn.write(m.codec)
handshaked = true
trace "handle: sending `na` for duplicate handshake while handshaked",
await conn.write(Na)
for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler", conn, protocol = ms
var protocolHolder = h
let maxIncomingStreams = protocolHolder.protocol.maxIncomingStreams
if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= maxIncomingStreams:
debug "Max streams for protocol reached, blocking new stream",
conn, protocol = ms, maxIncomingStreams
await conn.writeLp(ms & "\n")
conn.protocol = ms
await protocolHolder.protocol.handler(conn, ms)
protocolHolder.openedStreams.inc(conn.peerId, -1)
if protocolHolder.openedStreams[conn.peerId] == 0:
debug "no handlers", conn, protocol = ms
await conn.write(Na)
var protocolHolder = h
let maxIncomingStreams = protocolHolder.protocol.maxIncomingStreams
if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= maxIncomingStreams:
debug "Max streams for protocol reached, blocking new stream",
conn, protocol = ms, maxIncomingStreams
await protocolHolder.protocol.handler(conn, ms)
protocolHolder.openedStreams.inc(conn.peerId, -1)
if protocolHolder.openedStreams[conn.peerId] == 0:
debug "no handlers", conn, ms
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -32,24 +32,28 @@ type
Muxer* = ref object of RootObj
streamHandler*: StreamHandler
handler*: Future[void]
connection*: Connection
# user provider proc that returns a constructed Muxer
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [Defect].}
# this wraps a creator proc that knows how to make muxers
MuxerProvider* = ref object of LPProtocol
MuxerProvider* = object
newMuxer*: MuxerConstructor
streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance
muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created
codec*: string
func shortLog*(m: Muxer): auto = shortLog(m.connection)
func shortLog*(m: Muxer): auto =
if isNil(m): "nil"
else: shortLog(m.connection)
chronicles.formatIt(Muxer): shortLog(it)
# muxer interface
method newStream*(m: Muxer, name: string = "", lazy: bool = false):
Future[Connection] {.base, async, gcsafe.} = discard
method close*(m: Muxer) {.base, async, gcsafe.} = discard
method close*(m: Muxer) {.base, async, gcsafe.} =
if not isNil(m.connection):
await m.connection.close()
method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard
proc new*(
@ -57,36 +61,5 @@ proc new*(
creator: MuxerConstructor,
codec: string): T {.gcsafe.} =
let muxerProvider = T(newMuxer: creator)
muxerProvider.codec = codec
let muxerProvider = T(newMuxer: creator, codec: codec)
method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
trace "starting muxer handler", proto=proto, conn
muxer = c.newMuxer(conn)
if not isNil(c.streamHandler):
muxer.streamHandler = c.streamHandler
var futs = newSeq[Future[void]]()
futs &= muxer.handle()
# finally await both the futures
if not isNil(c.muxerHandler):
await c.muxerHandler(muxer)
when defined(libp2p_agents_metrics):
conn.shortAgent = muxer.connection.shortAgent
checkFutures(await allFinished(futs))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg, conn, proto
await conn.close()
c.handler = handler
@ -356,6 +356,8 @@ proc open*(channel: YamuxChannel) {.async, gcsafe.} =
channel.opened = true
await channel.conn.write(YamuxHeader.data(channel.id, 0, {if channel.isSrc: Syn else: Ack}))
method getWrapped*(channel: YamuxChannel): Connection = channel.conn
Yamux* = ref object of Muxer
channels: Table[uint32, YamuxChannel]
@ -28,11 +28,16 @@ else:
std/[tables, sets, options, macros],
./peerid, ./peerinfo,
@ -70,11 +75,15 @@ type
PeerStore* {.public.} = ref object
books: Table[string, BasePeerBook]
identify: Identify
capacity*: int
toClean*: seq[PeerId]
proc new*(T: type PeerStore, capacity = 1000): PeerStore {.public.} =
T(capacity: capacity)
proc new*(T: type PeerStore, identify: Identify, capacity = 1000): PeerStore {.public.} =
identify: identify,
capacity: capacity
# Generic Peer Book API #
@ -186,3 +195,28 @@ proc cleanup*(
while peerStore.toClean.len > peerStore.capacity:
proc identify*(
peerStore: PeerStore,
muxer: Muxer) {.async.} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:
if (await MultistreamSelect.select(stream, peerStore.identify.codec())):
let info = await peerStore.identify.identify(stream, stream.peerId)
when defined(libp2p_agents_metrics):
var knownAgent = "unknown"
if info.agentVersion.isSome and info.agentVersion.get().len > 0:
let shortAgent = info.agentVersion.get().split("/")[0].safeToLowerAscii()
if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()):
knownAgent = shortAgent.get()
await stream.closeWithEOF()
@ -65,7 +65,7 @@ method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[Mult
await conn.close()
incomingConnection.cancel() # Safer to always try to cancel cause we aren't sure if the peer dialled us or not
if incomingConnection.completed():
await (await incomingConnection).close()
await (await incomingConnection).connection.close()
trace "sending Dial", addrs = switch.peerInfo.addrs
await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs)
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
@ -81,7 +81,7 @@ proc hasEnoughIncomingSlots(switch: Switch): bool =
return switch.connManager.slotsAvailable(In) >= 2
proc doesPeerHaveIncomingConn(switch: Switch, peerId: PeerId): bool =
return switch.connManager.selectConn(peerId, In) != nil
return switch.connManager.selectMuxer(peerId, In) != nil
proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} =
@ -406,7 +406,11 @@ method onTopicSubscription*(p: PubSub, topic: string, subscribed: bool) {.base,
# Notify others that we are no longer interested in the topic
for _, peer in p.peers:
p.sendSubs(peer, [topic], subscribed)
# If we don't have a sendConn yet, we will
# send the full sub list when we get the sendConn,
# so no need to send it here
if peer.hasSendConn:
p.sendSubs(peer, [topic], subscribed)
if subscribed:
@ -177,6 +177,10 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
# stop working so we make an effort to only keep a single channel alive
trace "Get new send connection", p, newConn
# Careful to race conditions here.
# Topic subscription relies on either connectedFut
# to be completed, or onEvent to be called later
p.sendConn = newConn
p.address = if p.sendConn.observedAddr.isSome: some(p.sendConn.observedAddr.get) else: none(MultiAddress)
@ -217,6 +221,9 @@ proc connect*(p: PubSubPeer) =
asyncSpawn connectImpl(p)
proc hasSendConn*(p: PubSubPeer): bool =
p.sendConn != nil
template sendMetrics(msg: RPCMsg): untyped =
when defined(libp2p_expensive_metrics):
for x in msg.messages:
@ -56,7 +56,6 @@ proc new*(T: type SecureConn,
peerId: peerId,
observedAddr: observedAddr,
closeEvent: conn.closeEvent,
upgraded: conn.upgraded,
timeout: timeout,
dir: conn.dir)
@ -39,7 +39,6 @@ type
timeoutHandler*: TimeoutHandler # timeout handler
peerId*: PeerId
observedAddr*: Opt[MultiAddress]
upgraded*: Future[void]
protocol*: string # protocol used by the connection, used as tag for metrics
transportDir*: Direction # The bottom level transport (generally the socket) direction
when defined(libp2p_agents_metrics):
@ -47,22 +46,6 @@ type
proc timeoutMonitor(s: Connection) {.async, gcsafe.}
proc isUpgraded*(s: Connection): bool =
if not isNil(s.upgraded):
return s.upgraded.finished
proc upgrade*(s: Connection, failed: ref CatchableError = nil) =
if not isNil(s.upgraded):
if not isNil(failed):
proc onUpgrade*(s: Connection) {.async.} =
if not isNil(s.upgraded):
await s.upgraded
func shortLog*(conn: Connection): string =
if conn.isNil: "Connection(nil)"
@ -80,9 +63,6 @@ method initStream*(s: Connection) =
if isNil(s.upgraded):
s.upgraded = newFuture[void]()
if s.timeout > 0.millis:
trace "Monitoring for timeout", s, timeout = s.timeout
@ -100,10 +80,6 @@ method closeImpl*(s: Connection): Future[void] =
s.timerTaskFut = nil
if not isNil(s.upgraded) and not s.upgraded.finished:
s.upgraded = nil
trace "Closed connection", s
procCall LPStream(s).closeImpl()
@ -158,6 +134,13 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
method getWrapped*(s: Connection): Connection {.base.} =
doAssert(false, "not implemented!")
when defined(libp2p_agents_metrics):
proc setShortAgent*(s: Connection, shortAgent: string) =
var conn = s
while not isNil(conn):
conn.shortAgent = shortAgent
conn = conn.getWrapped()
proc new*(C: type Connection,
peerId: PeerId,
dir: Direction,
@ -220,24 +220,27 @@ proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
s.ms.addHandler(proto.codecs, proto, matcher)
proc upgradeMonitor(conn: Connection, upgrades: AsyncSemaphore) {.async.} =
## monitor connection for upgrades
proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
let muxed = await trans.upgradeIncoming(conn)
await switch.peerStore.identify(muxed)
trace "Connection upgrade succeeded"
proc upgradeMonitor(
switch: Switch,
trans: Transport,
conn: Connection,
upgrades: AsyncSemaphore) {.async.} =
# Since we don't control the flow of the
# upgrade, this timeout guarantees that a
# "hanged" remote doesn't hold the upgrade
# forever
await conn.onUpgrade.wait(30.seconds) # wait for connection to be upgraded
trace "Connection upgrade succeeded"
await switch.upgrader(trans, conn).wait(30.seconds)
except CatchableError as exc:
if exc isnot CancelledError:
if not isNil(conn):
await conn.close()
trace "Exception awaiting connection upgrade", exc = exc.msg, conn
upgrades.release() # don't forget to release the slot!
proc accept(s: Switch, transport: Transport) {.async.} = # noraises
## switch accept loop, ran for every transport
@ -278,8 +281,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
conn.transportDir = Direction.In
debug "Accepted an incoming connection", conn
asyncSpawn upgradeMonitor(conn, upgrades)
asyncSpawn transport.upgradeIncoming(conn)
asyncSpawn s.upgradeMonitor(transport, conn, upgrades)
except CancelledError as exc:
trace "releasing semaphore on cancellation"
upgrades.release() # always release the slot
@ -377,14 +379,13 @@ proc start*(s: Switch) {.async, gcsafe, public.} =
proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport],
identity: Identify,
secureManagers: openArray[Secure] = [],
connManager: ConnManager,
ms: MultistreamSelect,
peerStore: PeerStore,
nameResolver: NameResolver = nil,
peerStore = PeerStore.new(),
services = newSeq[Service]()): Switch
{.raises: [Defect, LPError], public.} =
{.raises: [Defect, LPError].} =
if secureManagers.len == 0:
raise newException(LPError, "Provide at least one secure manager")
@ -394,11 +395,9 @@ proc newSwitch*(peerInfo: PeerInfo,
transports: transports,
connManager: connManager,
peerStore: peerStore,
dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver),
dialer: Dialer.new(peerInfo.peerId, connManager, peerStore, transports, nameResolver),
nameResolver: nameResolver,
services: services)
switch.connManager.peerStore = peerStore
return switch
@ -269,7 +269,7 @@ proc new*(
transports: switch.transports,
connManager: switch.connManager,
peerStore: switch.peerStore,
dialer: Dialer.new(switch.peerInfo.peerId, switch.connManager, switch.transports, switch.ms, nil),
dialer: Dialer.new(switch.peerInfo.peerId, switch.connManager, switch.peerStore, switch.transports, nil),
nameResolver: nil)
torSwitch.connManager.peerStore = switch.peerStore
@ -18,6 +18,7 @@ import chronos, chronicles
import ../stream/connection,
@ -80,7 +81,7 @@ proc dial*(
method upgradeIncoming*(
self: Transport,
conn: Connection): Future[void] {.base, gcsafe.} =
conn: Connection): Future[Muxer] {.base, gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
@ -90,7 +91,7 @@ method upgradeIncoming*(
method upgradeOutgoing*(
self: Transport,
conn: Connection,
peerId: Opt[PeerId]): Future[Connection] {.base, gcsafe.} =
peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
@ -30,35 +30,24 @@ type
proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider =
for m in self.muxers:
if muxerName in m.codecs:
if muxerName == m.codec:
return m
proc identify*(
self: MuxedUpgrade,
muxer: Muxer) {.async, gcsafe.} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:
await self.identify(stream)
when defined(libp2p_agents_metrics):
muxer.connection.shortAgent = stream.shortAgent
await stream.closeWithEOF()
proc mux*(
self: MuxedUpgrade,
conn: Connection): Future[Muxer] {.async, gcsafe.} =
## mux outgoing connection
conn: Connection,
direction: Direction): Future[Muxer] {.async, gcsafe.} =
## mux connection
trace "Muxing connection", conn
if self.muxers.len == 0:
warn "no muxers registered, skipping upgrade flow", conn
let muxerName = await self.ms.select(conn, self.muxers.mapIt(it.codec))
let muxerName =
if direction == Out: await self.ms.select(conn, self.muxers.mapIt(it.codec))
else: await MultistreamSelect.handle(conn, self.muxers.mapIt(it.codec))
if muxerName.len == 0 or muxerName == "na":
debug "no muxer available, early exit", conn
@ -70,36 +59,23 @@ proc mux*(
# install stream handler
muxer.streamHandler = self.streamHandler
# store it in muxed connections if we have a peer for it
self.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
await self.identify(muxer)
except CatchableError as exc:
# Identify is non-essential, though if it fails, it might indicate that
# the connection was closed already - this will be picked up by the read
# loop
debug "Could not identify connection", conn, msg = exc.msg
muxer.handler = muxer.handle()
return muxer
method upgradeOutgoing*(
proc upgrade(
self: MuxedUpgrade,
conn: Connection,
peerId: Opt[PeerId]): Future[Connection] {.async, gcsafe.} =
trace "Upgrading outgoing connection", conn
direction: Direction,
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
trace "Upgrading connection", conn, direction
let sconn = await self.secure(conn, peerId) # secure the connection
let sconn = await self.secure(conn, direction, peerId) # secure the connection
if isNil(sconn):
raise newException(UpgradeFailedError,
"unable to secure connection, stopping upgrade")
let muxer = await self.mux(sconn) # mux it if possible
let muxer = await self.mux(sconn, direction) # mux it if possible
if muxer == nil:
# TODO this might be relaxed in the future
raise newException(UpgradeFailedError,
"a muxer is required for outgoing connections")
@ -111,108 +87,28 @@ method upgradeOutgoing*(
raise newException(UpgradeFailedError,
"Connection closed or missing peer info, stopping upgrade")
trace "Upgraded outgoing connection", conn, sconn
trace "Upgraded connection", conn, sconn, direction
return muxer
return sconn
method upgradeOutgoing*(
self: MuxedUpgrade,
conn: Connection,
peerId: Opt[PeerId]): Future[Muxer] {.async, gcsafe.} =
return await self.upgrade(conn, Out, peerId)
method upgradeIncoming*(
self: MuxedUpgrade,
incomingConn: Connection) {.async, gcsafe.} = # noraises
trace "Upgrading incoming connection", incomingConn
let ms = MultistreamSelect.new()
# secure incoming connections
proc securedHandler(conn: Connection,
proto: string)
{.async, gcsafe, closure.} =
trace "Starting secure handler", conn
let secure = self.secureManagers.filterIt(it.codec == proto)[0]
var cconn = conn
var sconn = await secure.secure(cconn, false, Opt.none(PeerId))
if isNil(sconn):
cconn = sconn
# add the muxer
for muxer in self.muxers:
ms.addHandler(muxer.codecs, muxer)
# handle subsequent secure requests
await ms.handle(cconn)
except CatchableError as exc:
debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn
if not cconn.isUpgraded:
if not isNil(cconn):
await cconn.close()
trace "Stopped secure handler", conn
if (await ms.select(incomingConn)): # just handshake
# add the secure handlers
for k in self.secureManagers:
ms.addHandler(k.codec, securedHandler)
# handle un-secured connections
# we handshaked above, set this ms handler as active
await ms.handle(incomingConn, active = true)
except CatchableError as exc:
debug "Exception upgrading incoming", exc = exc.msg
if not incomingConn.isUpgraded:
if not isNil(incomingConn):
await incomingConn.close()
proc muxerHandler(
self: MuxedUpgrade,
muxer: Muxer) {.async, gcsafe.} =
conn = muxer.connection
# store incoming connection
# store muxer and muxed connection
await self.identify(muxer)
when defined(libp2p_agents_metrics):
#TODO Passing data between layers is a pain
if muxer.connection of SecureConn:
let secureConn = (SecureConn)muxer.connection
secureConn.stream.shortAgent = muxer.connection.shortAgent
except IdentifyError as exc:
# Identify is non-essential, though if it fails, it might indicate that
# the connection was closed already - this will be picked up by the read
# loop
debug "Could not identify connection", conn, msg = exc.msg
except LPStreamClosedError as exc:
debug "Identify stream closed", conn, msg = exc.msg
except LPStreamEOFError as exc:
debug "Identify stream EOF", conn, msg = exc.msg
except CancelledError as exc:
await muxer.close()
raise exc
except CatchableError as exc:
await muxer.close()
trace "Exception in muxer handler", conn, msg = exc.msg
conn: Connection): Future[Muxer] {.async, gcsafe.} =
return await self.upgrade(conn, In, Opt.none(PeerId))
proc new*(
T: type MuxedUpgrade,
identity: Identify,
muxers: seq[MuxerProvider],
secureManagers: openArray[Secure] = [],
connManager: ConnManager,
ms: MultistreamSelect): T =
let upgrader = T(
identity: identity,
muxers: muxers,
secureManagers: @secureManagers,
connManager: connManager,
@ -231,10 +127,4 @@ proc new*(
await conn.closeWithEOF()
trace "Stream handler done", conn
for _, val in muxers:
val.streamHandler = upgrader.streamHandler
val.muxerHandler = proc(muxer: Muxer): Future[void]
{.raises: [Defect].} =
return upgrader
@ -19,6 +19,7 @@ import pkg/[chronos, chronicles, metrics]
import ../stream/connection,
@ -37,29 +38,31 @@ type
Upgrade* = ref object of RootObj
ms*: MultistreamSelect
identity*: Identify
connManager*: ConnManager
secureManagers*: seq[Secure]
method upgradeIncoming*(
self: Upgrade,
conn: Connection): Future[void] {.base.} =
conn: Connection): Future[Muxer] {.base.} =
doAssert(false, "Not implemented!")
method upgradeOutgoing*(
self: Upgrade,
conn: Connection,
peerId: Opt[PeerId]): Future[Connection] {.base.} =
peerId: Opt[PeerId]): Future[Muxer] {.base.} =
doAssert(false, "Not implemented!")
proc secure*(
self: Upgrade,
conn: Connection,
direction: Direction,
peerId: Opt[PeerId]): Future[Connection] {.async, gcsafe.} =
if self.secureManagers.len <= 0:
raise newException(UpgradeFailedError, "No secure managers registered!")
let codec = await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
let codec =
if direction == Out: await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
else: await MultistreamSelect.handle(conn, self.secureManagers.mapIt(it.codec))
if codec.len == 0:
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
@ -70,30 +73,4 @@ proc secure*(
# let's avoid duplicating checks but detect if it fails to do it properly
doAssert(secureProtocol.len > 0)
return await secureProtocol[0].secure(conn, true, peerId)
proc identify*(
self: Upgrade,
conn: Connection) {.async, gcsafe.} =
## identify the connection
if (await self.ms.select(conn, self.identity.codec)):
info = await self.identity.identify(conn, conn.peerId)
peerStore = self.connManager.peerStore
if info.pubkey.isNone and isNil(conn):
raise newException(UpgradeFailedError,
"no public key provided and no existing peer identity found")
conn.peerId = info.peerId
when defined(libp2p_agents_metrics):
conn.shortAgent = "unknown"
if info.agentVersion.isSome and info.agentVersion.get().len > 0:
let shortAgent = info.agentVersion.get().split("/")[0].safeToLowerAscii()
if shortAgent.isOk() and KnownLibP2PAgentsSeq.contains(shortAgent.get()):
conn.shortAgent = shortAgent.get()
trace "identified remote peer", conn, peerId = shortLog(conn.peerId)
return await secureProtocol[0].secure(conn, direction == Out, peerId)
@ -8,6 +8,7 @@ import strutils, os
# Only add chronicles param if the
@ -9,6 +9,7 @@ import ../../libp2p/errors
import ../../libp2p/crypto/crypto
import ../../libp2p/stream/bufferstream
import ../../libp2p/switch
import ../../libp2p/muxers/muxer
import ../helpers
@ -495,7 +496,7 @@ suite "GossipSub internal":
peer.handler = handler
peer.appScore = gossipSub.parameters.graylistThreshold - 1
gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
gossipSub.switch.connManager.storeMuxer(Muxer(connection: conn))
@ -107,11 +107,7 @@ suite "GossipSub":
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
await waitSubGraph(nodes, "foobar")
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
@ -157,11 +153,7 @@ suite "GossipSub":
nodes[0].subscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
await waitSubGraph(nodes, "foobar")
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
@ -424,8 +416,6 @@ suite "GossipSub":
await passed.wait(2.seconds)
trace "test done, stopping..."
await allFuturesThrowing(
@ -452,21 +442,23 @@ suite "GossipSub":
GossipSub(nodes[1]).parameters.d = 0
GossipSub(nodes[1]).parameters.dHigh = 0
GossipSub(nodes[1]).parameters.dLow = 0
await subscribeNodes(nodes)
nodes[1].subscribe("foobar", handler)
nodes[0].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
await waitSub(nodes[1], nodes[0], "foobar")
nodes[0].unsubscribe("foobar", handler)
nodes[1].subscribe("foobar", handler)
let gsNode = GossipSub(nodes[1])
checkExpiring: gsNode.mesh.getOrDefault("foobar").len == 0
nodes[0].subscribe("foobar", handler)
check GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0
gsNode.mesh.getOrDefault("foobar").len == 0 and
GossipSub(nodes[0]).mesh.getOrDefault("foobar").len == 0 and
GossipSub(nodes[0]).gossipsub.getOrDefault("foobar").len == 1 or
GossipSub(nodes[0]).fanout.getOrDefault("foobar").len == 1
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
@ -532,8 +524,8 @@ suite "GossipSub":
asyncTest "e2e - GossipSub should not send to source & peers who already seen":
# 3 nodes: A, B, C
# A publishes, B relays, C is having a long validation
# so C should not send to anyone
# A publishes, C relays, B is having a long validation
# so B should not send to anyone
nodes = generateNodes(
@ -566,10 +558,7 @@ suite "GossipSub":
nodes[0].subscribe("foobar", handlerA)
nodes[1].subscribe("foobar", handlerB)
nodes[2].subscribe("foobar", handlerC)
await waitSub(nodes[0], nodes[1], "foobar")
await waitSub(nodes[0], nodes[2], "foobar")
await waitSub(nodes[2], nodes[1], "foobar")
await waitSub(nodes[1], nodes[2], "foobar")
await waitSubGraph(nodes, "foobar")
var gossip1: GossipSub = GossipSub(nodes[0])
var gossip2: GossipSub = GossipSub(nodes[1])
@ -587,7 +576,11 @@ suite "GossipSub":
nodes[1].addValidator("foobar", slowValidator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
gossip1.mesh.getOrDefault("foobar").len == 2 and
gossip2.mesh.getOrDefault("foobar").len == 2 and
gossip3.mesh.getOrDefault("foobar").len == 2)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 2
await bFinished
@ -629,7 +622,7 @@ suite "GossipSub":
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
check await passed
check await passed.wait(10.seconds)
"foobar" in gossip1.gossipsub
@ -132,13 +132,17 @@ proc waitSubGraph*(nodes: seq[PubSub], key: string) {.async, gcsafe.} =
seen: HashSet[PeerId]
for n in nodes:
nodesMesh[n.peerInfo.peerId] = toSeq(GossipSub(n).mesh.getOrDefault(key).items()).mapIt(it.peerId)
proc explore(p: PeerId) =
if p in seen: return
for peer in nodesMesh.getOrDefault(p):
if seen.len == nodes.len: return
var ok = 0
for n in nodes:
proc explore(p: PeerId) =
if p in seen: return
for peer in nodesMesh.getOrDefault(p):
if seen.len == nodes.len: ok.inc()
if ok == nodes.len: return
trace "waitSubGraph sleeping..."
await sleepAsync(5.milliseconds)
@ -10,8 +10,8 @@ import ../libp2p/[connmanager,
import helpers
proc getConnection(peerId: PeerId, dir: Direction = Direction.In): Connection =
return Connection.new(peerId, dir, Opt.none(MultiAddress))
proc getMuxer(peerId: PeerId, dir: Direction = Direction.In): Muxer =
return Muxer(connection: Connection.new(peerId, dir, Opt.none(MultiAddress)))
TestMuxer = ref object of Muxer
@ -22,71 +22,55 @@ method newStream*(
name: string = "",
lazy: bool = false):
Future[Connection] {.async, gcsafe.} =
result = getConnection(m.peerId, Direction.Out)
result = Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress))
suite "Connection Manager":
asyncTest "add and retrieve a connection":
asyncTest "add and retrieve a muxer":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let mux = getMuxer(peerId)
check conn in connMngr
check mux in connMngr
let peerConn = connMngr.selectConn(peerId)
check peerConn == conn
check peerConn.dir == Direction.In
let peerMux = connMngr.selectMuxer(peerId)
check peerMux == mux
check peerMux.connection.dir == Direction.In
await connMngr.close()
asyncTest "shouldn't allow a closed connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
await conn.close()
let mux = getMuxer(peerId)
await mux.connection.close()
expect CatchableError:
await connMngr.close()
asyncTest "shouldn't allow an EOFed connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
conn.isEof = true
let mux = getMuxer(peerId)
mux.connection.isEof = true
expect CatchableError:
await conn.close()
await mux.close()
await connMngr.close()
asyncTest "add and retrieve a muxer":
asyncTest "shouldn't allow a muxer with no connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let muxer = new Muxer
muxer.connection = conn
check muxer in connMngr
let peerMuxer = connMngr.selectMuxer(conn)
check peerMuxer == muxer
await connMngr.close()
asyncTest "shouldn't allow a muxer for an untracked connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let muxer = new Muxer
muxer.connection = conn
let muxer = getMuxer(peerId)
let conn = muxer.connection
muxer.connection = nil
expect CatchableError:
@ -99,33 +83,34 @@ suite "Connection Manager":
# This would work with 1 as well cause of a bug in connmanager that will get fixed soon
let connMngr = ConnManager.new(maxConnsPerPeer = 2)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn1 = getConnection(peerId, Direction.Out)
let conn2 = getConnection(peerId)
let mux1 = getMuxer(peerId, Direction.Out)
let mux2 = getMuxer(peerId)
check conn1 in connMngr
check conn2 in connMngr
check mux1 in connMngr
check mux2 in connMngr
let outConn = connMngr.selectConn(peerId, Direction.Out)
let inConn = connMngr.selectConn(peerId, Direction.In)
let outMux = connMngr.selectMuxer(peerId, Direction.Out)
let inMux = connMngr.selectMuxer(peerId, Direction.In)
check outConn != inConn
check outConn.dir == Direction.Out
check inConn.dir == Direction.In
check outMux != inMux
check outMux == mux1
check inMux == mux2
check outMux.connection.dir == Direction.Out
check inMux.connection.dir == Direction.In
await connMngr.close()
asyncTest "get muxed stream for peer":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let muxer = new TestMuxer
let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress))
muxer.peerId = peerId
muxer.connection = conn
muxer.connection = connection
check muxer in connMngr
@ -134,18 +119,18 @@ suite "Connection Manager":
check stream.peerId == peerId
await connMngr.close()
await connection.close()
await stream.close()
asyncTest "get stream from directed connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let muxer = new TestMuxer
let connection = Connection.new(peerId, Direction.In, Opt.none(MultiAddress))
muxer.peerId = peerId
muxer.connection = conn
muxer.connection = connection
check muxer in connMngr
@ -156,57 +141,37 @@ suite "Connection Manager":
await connMngr.close()
await stream1.close()
asyncTest "get stream from any connection":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let muxer = new TestMuxer
muxer.peerId = peerId
muxer.connection = conn
check muxer in connMngr
let stream = await connMngr.getStream(conn)
check not(isNil(stream))
await connMngr.close()
await stream.close()
await connection.close()
asyncTest "should raise on too many connections":
let connMngr = ConnManager.new(maxConnsPerPeer = 0)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conns = @[
let muxs = @[getMuxer(peerId)]
expect TooManyConnectionsError:
await connMngr.close()
await allFuturesThrowing(
allFutures(conns.mapIt( it.close() )))
allFutures(muxs.mapIt( it.close() )))
asyncTest "expect connection from peer":
# FIXME This should be 1 instead of 0, it will get fixed soon
let connMngr = ConnManager.new(maxConnsPerPeer = 0)
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conns = @[
let muxs = @[
expect TooManyConnectionsError:
let waitedConn1 = connMngr.expectConnection(peerId, In)
@ -217,38 +182,32 @@ suite "Connection Manager":
waitedConn2 = connMngr.expectConnection(peerId, In)
waitedConn3 = connMngr.expectConnection(PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(), In)
conn = getConnection(peerId)
conn = getMuxer(peerId)
check (await waitedConn2) == conn
expect TooManyConnectionsError:
await connMngr.close()
checkExpiring: waitedConn3.cancelled()
await allFuturesThrowing(
allFutures(conns.mapIt( it.close() )))
allFutures(muxs.mapIt( it.close() )))
asyncTest "cleanup on connection close":
let connMngr = ConnManager.new()
let peerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
let conn = getConnection(peerId)
let muxer = new Muxer
let muxer = getMuxer(peerId)
muxer.connection = conn
check conn in connMngr
check muxer in connMngr
await conn.close()
await sleepAsync(10.millis)
await muxer.close()
check conn notin connMngr
check muxer notin connMngr
checkExpiring: muxer notin connMngr
await connMngr.close()
@ -261,23 +220,19 @@ suite "Connection Manager":
Direction.In else:
let conn = getConnection(peerId, dir)
let muxer = new Muxer
muxer.connection = conn
let muxer = getMuxer(peerId, dir)
check conn in connMngr
check muxer in connMngr
check not(isNil(connMngr.selectConn(peerId, dir)))
check not(isNil(connMngr.selectMuxer(peerId, dir)))
check peerId in connMngr
await connMngr.dropPeer(peerId)
check peerId notin connMngr
check isNil(connMngr.selectConn(peerId, Direction.In))
check isNil(connMngr.selectConn(peerId, Direction.Out))
checkExpiring: peerId notin connMngr
check isNil(connMngr.selectMuxer(peerId, Direction.In))
check isNil(connMngr.selectMuxer(peerId, Direction.Out))
await connMngr.close()
@ -363,7 +318,6 @@ suite "Connection Manager":
asyncTest "track incoming max connections limits - fail on outgoing":
let connMngr = ConnManager.new(maxIn = 3)
var conns: seq[Connection]
for i in 0..<3:
check await connMngr.getIncomingSlot().withTimeout(10.millis)
@ -376,7 +330,6 @@ suite "Connection Manager":
asyncTest "allow force dial":
let connMngr = ConnManager.new(maxConnections = 2)
var conns: seq[Connection]
for i in 0..<3:
discard connMngr.getOutgoingSlot(true)
@ -389,17 +342,17 @@ suite "Connection Manager":
asyncTest "release slot on connection end":
let connMngr = ConnManager.new(maxConnections = 3)
var conns: seq[Connection]
var muxs: seq[Muxer]
for i in 0..<3:
let slot = connMngr.getOutgoingSlot()
let conn =
let muxer =
PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet(),
# should be full now
let incomingSlot = connMngr.getIncomingSlot()
@ -407,7 +360,7 @@ suite "Connection Manager":
check (await incomingSlot.withTimeout(10.millis)) == false
await allFuturesThrowing(
allFutures(conns.mapIt( it.close() )))
allFutures(muxs.mapIt( it.close() )))
check await incomingSlot.withTimeout(10.millis)
@ -177,7 +177,7 @@ suite "Identify":
switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
switch2.peerStore[AddressBook][switch1.peerInfo.peerId] == switch1.peerInfo.addrs
switch1.peerStore[KeyBook][switch2.peerInfo.peerId] == switch2.peerInfo.publicKey
switch2.peerStore[KeyBook][switch1.peerInfo.peerId] == switch1.peerInfo.publicKey
@ -224,8 +224,7 @@ suite "Multistream select":
var conn: Connection = nil
proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
echo msg
check msg == Na
check msg == "\x03na\n"
await conn.close()
conn = newTestNaStream(testNaHandler)
@ -67,6 +67,7 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc
identify = Identify.new(peerInfo)
peerStore = PeerStore.new(identify)
mplexProvider = MuxerProvider.new(createMplex, MplexCodec)
muxers = @[mplexProvider]
secureManagers = if secio:
@ -75,16 +76,16 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc
[Secure(Noise.new(rng, privateKey, outgoing = outgoing))]
connManager = ConnManager.new()
ms = MultistreamSelect.new()
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagers, connManager, ms)
muxedUpgrade = MuxedUpgrade.new(muxers, secureManagers, connManager, ms)
transports = @[Transport(TcpTransport.new(upgrade = muxedUpgrade))]
let switch = newSwitch(
result = (switch, peerInfo)
suite "Noise":
@ -96,7 +96,7 @@ suite "PeerStore":
toSeq(values(addressBook.book))[0] == @[multiaddr1, multiaddr2]
test "Pruner - no capacity":
let peerStore = PeerStore.new(capacity = 0)
let peerStore = PeerStore.new(nil, capacity = 0)
peerStore[AgentBook][peerId1] = "gds"
@ -104,7 +104,7 @@ suite "PeerStore":
check peerId1 notin peerStore[AgentBook]
test "Pruner - FIFO":
let peerStore = PeerStore.new(capacity = 1)
let peerStore = PeerStore.new(nil, capacity = 1)
peerStore[AgentBook][peerId1] = "gds"
peerStore[AgentBook][peerId2] = "gds"
@ -114,7 +114,7 @@ suite "PeerStore":
peerId2 notin peerStore[AgentBook]
test "Pruner - regular capacity":
var peerStore = PeerStore.new(capacity = 20)
var peerStore = PeerStore.new(nil, capacity = 20)
for i in 0..<30:
let randomPeerId = PeerId.init(KeyPair.random(ECDSA, rng[]).get().pubkey).get()
@ -124,7 +124,7 @@ suite "PeerStore":
check peerStore[AgentBook].len == 20
test "Pruner - infinite capacity":
var peerStore = PeerStore.new(capacity = -1)
var peerStore = PeerStore.new(nil, capacity = -1)
for i in 0..<30:
let randomPeerId = PeerId.init(KeyPair.random(ECDSA, rng[]).get().pubkey).get()
Reference in New Issue
Block a user