This commit is contained in:
Etan Kissling 2024-03-12 21:27:56 +01:00
parent 48a3ac06ff
commit 9dc25c77db
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
6 changed files with 232 additions and 209 deletions

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -21,56 +21,60 @@ type
Dial* = ref object of RootObj
method connect*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
) {.async: (raises: [CancelledError, LPError], raw: true), base.} =
## connect remote peer without negotiating
## a protocol
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method connect*(
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] {.async, base.} =
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## Connects to a peer and retrieve its PeerId
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream over an
## existing connection
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method addTransport*(
self: Dial,
transport: Transport) {.base.} =
doAssert(false, "Not implemented!")
self: Dial,
transport: Transport) {.base.} =
raiseAssert("Not implemented!")
method tryDial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async, base.} =
doAssert(false, "Not implemented!")
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
raiseAssert("Not implemented!")

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -49,13 +49,11 @@ type
nameResolver: NameResolver
proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
dir = Direction.Out):
Future[Muxer] {.async.} =
self: Dialer,
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
dir = Direction.Out): Future[Muxer] {.async.} =
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
trace "Dialing address", address, peerId = peerId.get(default(PeerId)), hostname
@ -163,18 +161,19 @@ proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] =
return Opt.some(muxer)
proc internalConnect(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out):
Future[Muxer] {.async.} =
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
raise newException(DialFailedError, "can't dial self!")
# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock())
let lock = self.dialLock.mgetOrPut(
peerId.get(default(PeerId)), newAsyncLock())
try:
await lock.acquire()
@ -197,7 +196,11 @@ proc internalConnect(
try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
except CatchableError as exc:
except CancelledError as exc:
trace "Failed to finish outgoung upgrade", err=exc.msg
await muxed.close()
raise exc
except LPError as exc:
trace "Failed to finish outgoung upgrade", err=exc.msg
await muxed.close()
raise exc
@ -208,27 +211,27 @@ proc internalConnect(
lock.release()
method connect*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async.} =
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async: (raises: [CancelledError, LPError]).} =
## connect remote peer without negotiating
## a protocol
##
if self.connManager.connCount(peerId) > 0 and reuseConnection:
return
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, dir)
discard await self.internalConnect(
Opt.some(peerId), addrs, forceDial, reuseConnection, dir)
method connect*(
self: Dialer,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] {.async.} =
self: Dialer,
address: MultiAddress,
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [CancelledError, LPError]).} =
## Connects to a peer and retrieve its PeerId
parseFullAddress(address).toOpt().withValue(fullAddress):
return (await self.internalConnect(
Opt.some(fullAddress[0]),
@ -256,14 +259,14 @@ proc negotiateStream(
return conn
method tryDial*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async.} =
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} =
## Create a protocol stream in order to check
## if a connection is possible.
## Doesn't use the Connection Manager to save it.
##
trace "Check if it can dial", peerId, addrs
try:
let mux = await self.dialAndUpgrade(Opt.some(peerId), addrs)
@ -277,13 +280,13 @@ method tryDial*(
raise newException(DialFailedError, exc.msg)
method dial*(
self: Dialer,
peerId: PeerId,
protos: seq[string]): Future[Connection] {.async.} =
self: Dialer,
peerId: PeerId,
protos: seq[string]
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
## create a protocol stream over an
## existing connection
##
trace "Dialing (existing)", peerId, protos
let stream = await self.connManager.getStream(peerId)
if stream.isNil:
@ -292,15 +295,15 @@ method dial*(
return await self.negotiateStream(stream, protos)
method dial*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.async.} =
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
var
conn: Muxer
stream: Connection
@ -336,13 +339,12 @@ method addTransport*(self: Dialer, t: Transport) =
self.transports &= t
proc new*(
T: type Dialer,
localPeerId: PeerId,
connManager: ConnManager,
peerStore: PeerStore,
transports: seq[Transport],
nameResolver: NameResolver = nil): Dialer =
T: type Dialer,
localPeerId: PeerId,
connManager: ConnManager,
peerStore: PeerStore,
transports: seq[Transport],
nameResolver: NameResolver = nil): Dialer =
T(localPeerId: localPeerId,
connManager: connManager,
transports: transports,

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -76,7 +76,10 @@ type
capacity*: int
toClean*: seq[PeerId]
proc new*(T: type PeerStore, identify: Identify, capacity = 1000): PeerStore {.public.} =
proc new*(
T: type PeerStore,
identify: Identify,
capacity = 1000): PeerStore {.public.} =
T(
identify: identify,
capacity: capacity
@ -86,26 +89,21 @@ proc new*(T: type PeerStore, identify: Identify, capacity = 1000): PeerStore {.p
# Generic Peer Book API #
#########################
proc `[]`*[T](peerBook: PeerBook[T],
peerId: PeerId): T {.public.} =
proc `[]`*[T](peerBook: PeerBook[T], peerId: PeerId): T {.public.} =
## Get all known metadata of a provided peer, or default(T) if missing
peerBook.book.getOrDefault(peerId)
proc `[]=`*[T](peerBook: PeerBook[T],
peerId: PeerId,
entry: T) {.public.} =
proc `[]=`*[T](peerBook: PeerBook[T], peerId: PeerId, entry: T) {.public.} =
## Set metadata for a given peerId.
peerBook.book[peerId] = entry
# Notify clients
for handler in peerBook.changeHandlers:
handler(peerId)
proc del*[T](peerBook: PeerBook[T],
peerId: PeerId): bool {.public.} =
## Delete the provided peer from the book. Returns whether the peer was in the book
proc del*[T](peerBook: PeerBook[T], peerId: PeerId): bool {.public.} =
## Delete the provided peer from the book.
## Returns whether the peer was in the book
if peerId notin peerBook.book:
return false
else:
@ -118,7 +116,8 @@ proc del*[T](peerBook: PeerBook[T],
proc contains*[T](peerBook: PeerBook[T], peerId: PeerId): bool {.public.} =
peerId in peerBook.book
proc addHandler*[T](peerBook: PeerBook[T], handler: PeerBookChangeHandler) {.public.} =
proc addHandler*[T](
peerBook: PeerBook[T], handler: PeerBookChangeHandler) {.public.} =
## Adds a callback that will be called everytime the book changes
peerBook.changeHandlers.add(handler)
@ -145,16 +144,12 @@ proc `[]`*[T](p: PeerStore, typ: type[T]): T {.public.} =
p.books[name] = result
return result
proc del*(peerStore: PeerStore,
peerId: PeerId) {.public.} =
proc del*(peerStore: PeerStore, peerId: PeerId) {.public.} =
## Delete the provided peer from every book.
for _, book in peerStore.books:
book.deletor(peerId)
proc updatePeerInfo*(
peerStore: PeerStore,
info: IdentifyInfo) =
proc updatePeerInfo*(peerStore: PeerStore, info: IdentifyInfo) =
if info.addrs.len > 0:
peerStore[AddressBook][info.peerId] = info.addrs
@ -177,10 +172,7 @@ proc updatePeerInfo*(
if cleanupPos >= 0:
peerStore.toClean.delete(cleanupPos)
proc cleanup*(
peerStore: PeerStore,
peerId: PeerId) =
proc cleanup*(peerStore: PeerStore, peerId: PeerId) =
if peerStore.capacity == 0:
peerStore.del(peerId)
return
@ -194,9 +186,8 @@ proc cleanup*(
peerStore.toClean.delete(0)
proc identify*(
peerStore: PeerStore,
muxer: Muxer) {.async.} =
peerStore: PeerStore,
muxer: Muxer) {.async: (raises: [CancelledError, LPError]).} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:
@ -209,7 +200,8 @@ proc identify*(
when defined(libp2p_agents_metrics):
var
knownAgent = "unknown"
shortAgent = info.agentVersion.get("").split("/")[0].safeToLowerAscii().get("")
shortAgent = info.agentVersion.get("").split("/")[0]
.safeToLowerAscii().get("")
if KnownLibP2PAgentsSeq.contains(shortAgent):
knownAgent = shortAgent
muxer.connection.setShortAgent(knownAgent)

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -45,14 +45,14 @@ type
IdentifyNoPubKeyError* = object of IdentifyError
IdentifyInfo* {.public.} = object
pubkey*: Option[PublicKey]
pubkey*: Opt[PublicKey]
peerId*: PeerId
addrs*: seq[MultiAddress]
observedAddr*: Option[MultiAddress]
protoVersion*: Option[string]
agentVersion*: Option[string]
observedAddr*: Opt[MultiAddress]
protoVersion*: Opt[string]
agentVersion*: Opt[string]
protos*: seq[string]
signedPeerRecord*: Option[Envelope]
signedPeerRecord*: Opt[Envelope]
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
@ -60,10 +60,9 @@ type
observedAddrManager*: ObservedAddrManager
IdentifyPushHandler* = proc (
peer: PeerId,
newInfo: IdentifyInfo):
Future[void]
{.gcsafe, raises: [], public.}
peer: PeerId,
newInfo: IdentifyInfo
): Future[void] {.async: (raises: [CancelledError]), public.}
IdentifyPush* = ref object of LPProtocol
identifyHandler: IdentifyPushHandler
@ -81,8 +80,10 @@ chronicles.expandIt(IdentifyInfo):
if it.signedPeerRecord.isSome(): "Some"
else: "None"
proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: bool): ProtoBuffer
{.raises: [].} =
proc encodeMsg(
peerInfo: PeerInfo,
observedAddr: Opt[MultiAddress],
sendSpr: bool): ProtoBuffer =
result = initProtoBuffer()
let pkey = peerInfo.publicKey
@ -121,27 +122,26 @@ proc decodeMsg*(buf: seq[byte]): Opt[IdentifyInfo] =
var pb = initProtoBuffer(buf)
if ? pb.getField(1, pubkey).toOpt():
iinfo.pubkey = some(pubkey)
iinfo.pubkey = Opt.some(pubkey)
if ? pb.getField(8, signedPeerRecord).toOpt() and
pubkey == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = some(signedPeerRecord.envelope)
pubkey == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = Opt.some(signedPeerRecord.envelope)
discard ? pb.getRepeatedField(2, iinfo.addrs).toOpt()
discard ? pb.getRepeatedField(3, iinfo.protos).toOpt()
if ? pb.getField(4, oaddr).toOpt():
iinfo.observedAddr = some(oaddr)
iinfo.observedAddr = Opt.some(oaddr)
if ? pb.getField(5, protoVersion).toOpt():
iinfo.protoVersion = some(protoVersion)
iinfo.protoVersion = Opt.some(protoVersion)
if ? pb.getField(6, agentVersion).toOpt():
iinfo.agentVersion = some(agentVersion)
iinfo.agentVersion = Opt.some(agentVersion)
Opt.some(iinfo)
proc new*(
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new(),
): T =
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new()): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord,
@ -158,7 +158,7 @@ method init*(p: Identify) =
await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in identify handler", exc = exc.msg, conn
finally:
trace "exiting identify handler", conn
@ -167,36 +167,46 @@ method init*(p: Identify) =
p.handler = handle
p.codec = IdentifyCodec
proc identify*(self: Identify,
conn: Connection,
remotePeerId: PeerId): Future[IdentifyInfo] {.async.} =
proc identify*(
self: Identify,
conn: Connection,
remotePeerId: PeerId
): Future[IdentifyInfo] {.async: (raises: [
CancelledError, IdentifyError, LPStreamError]).} =
trace "initiating identify", conn
var message = await conn.readLp(64*1024)
if len(message) == 0:
trace "identify: Empty message received!", conn
raise newException(IdentityInvalidMsgError, "Empty message received!")
raise (ref IdentityInvalidMsgError)(msg: "Empty message received!")
var info = decodeMsg(message).valueOr: raise newException(IdentityInvalidMsgError, "Incorrect message received!")
var info = decodeMsg(message).valueOr:
raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!")
debug "identify: decoded message", conn, info
let
pubkey = info.pubkey.valueOr: raise newException(IdentityInvalidMsgError, "No pubkey in identify")
peer = PeerId.init(pubkey).valueOr: raise newException(IdentityInvalidMsgError, $error)
pubkey = info.pubkey.valueOr:
raise (ref IdentityInvalidMsgError)(msg: "No pubkey in identify")
peer = PeerId.init(pubkey).valueOr:
raise (ref IdentityInvalidMsgError)(msg: $error)
if peer != remotePeerId:
trace "Peer ids don't match", remote = peer, local = remotePeerId
raise newException(IdentityNoMatchError, "Peer ids don't match")
raise (ref IdentityNoMatchError)(msg: "Peer ids don't match")
info.peerId = peer
info.observedAddr.withValue(observed):
# Currently, we use the ObservedAddrManager only to find our dialable external NAT address. Therefore, addresses
# Currently, we use the ObservedAddrManager only to find our
# dialable external NAT address. Therefore, addresses
# like "...\p2p-circuit\p2p\..." and "\p2p\..." are not useful to us.
if observed.contains(multiCodec("p2p-circuit")).get(false) or P2PPattern.matchPartial(observed):
if observed.contains(multiCodec("p2p-circuit")).get(false) or
P2PPattern.matchPartial(observed):
trace "Not adding address to ObservedAddrManager.", observed
elif not self.observedAddrManager.addObservation(observed):
trace "Observed address is not valid.", observedAddr = observed
return info
proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} =
proc new*(
T: typedesc[IdentifyPush],
handler: IdentifyPushHandler = nil): T {.public.} =
## Create a IdentifyPush protocol. `handler` will be called every time
## a peer sends us new `PeerInfo`
let identifypush = T(identifyHandler: handler)
@ -210,21 +220,21 @@ proc init*(p: IdentifyPush) =
var message = await conn.readLp(64*1024)
var identInfo = decodeMsg(message).valueOr:
raise newException(IdentityInvalidMsgError, "Incorrect message received!")
raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!")
debug "identify push: decoded message", conn, identInfo
identInfo.pubkey.withValue(pubkey):
let receivedPeerId = PeerId.init(pubkey).tryGet()
if receivedPeerId != conn.peerId:
raise newException(IdentityNoMatchError, "Peer ids don't match")
raise (ref IdentityNoMatchError)(msg: "Peer ids don't match")
identInfo.peerId = receivedPeerId
trace "triggering peer event", peerInfo = conn.peerId
if not isNil(p.identifyHandler):
if p.identifyHandler != nil:
await p.identifyHandler(conn.peerId, identInfo)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPError as exc:
info "exception in identify push handler", exc = exc.msg, conn
finally:
trace "exiting identify push handler", conn
@ -233,7 +243,11 @@ proc init*(p: IdentifyPush) =
p.handler = handle
p.codec = IdentifyPushCodec
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async, public.} =
proc push*(
p: IdentifyPush,
peerInfo: PeerInfo,
conn: Connection
) {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Send new `peerInfo`s to a connection
var pb = encodeMsg(peerInfo, conn.observedAddr, true)
await conn.writeLp(pb.buffer)

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -136,68 +136,75 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe, public.} =
s.connManager.dropPeer(peerId)
method connect*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out): Future[void] {.public.} =
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
): Future[void] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Connects to a peer without opening a stream to it
s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir)
method connect*(
s: Switch,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] =
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [CancelledError, LPError], raw: true).} =
## Connects to a peer and retrieve its PeerId
##
## If the P2P part is missing from the MA and `allowUnknownPeerId` is set
## to true, this will discover the PeerId while connecting. This exposes
## you to MiTM attacks, so it shouldn't be used without care!
s.dialer.connect(address, allowUnknownPeerId)
method dial*(
s: Switch,
peerId: PeerId,
protos: seq[string]): Future[Connection] {.public.} =
protos: seq[string]
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Open a stream to a connected peer with the specified `protos`
s.dialer.dial(peerId, protos)
proc dial*(s: Switch,
peerId: PeerId,
proto: string): Future[Connection] {.public.} =
proc dial*(
s: Switch,
peerId: PeerId,
proto: string
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Open a stream to a connected peer with the specified `proto`
dial(s, peerId, @[proto])
method dial*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.public.} =
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Connected to a peer and open a stream
## with the specified `protos`
s.dialer.dial(peerId, addrs, protos, forceDial)
proc dial*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
proto: string): Future[Connection] {.public.} =
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
proto: string
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Connected to a peer and open a stream
## with the specified `proto`
dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
{.gcsafe, raises: [LPError], public.} =
proc mount*[T: LPProtocol](
s: Switch,
proto: T,
matcher: Matcher = nil) {.gcsafe, raises: [LPError], public.} =
## mount a protocol to the switch
if isNil(proto.handler):
raise newException(LPError,
"Protocol has to define a handle method or proc")
@ -237,7 +244,6 @@ proc upgradeMonitor(
proc accept(s: Switch, transport: Transport) {.async.} = # noraises
## switch accept loop, ran for every transport
##
let upgrades = newAsyncSemaphore(ConcurrentUpgrades)
while transport.running:
var conn: Connection
@ -323,7 +329,6 @@ proc stop*(s: Switch) {.async, public.} =
proc start*(s: Switch) {.async, public.} =
## Start listening on every transport
if s.started:
warn "Switch has already been started"
return
@ -365,15 +370,15 @@ proc start*(s: Switch) {.async, public.} =
debug "Started libp2p node", peer = s.peerInfo
proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport],
secureManagers: openArray[Secure] = [],
connManager: ConnManager,
ms: MultistreamSelect,
peerStore: PeerStore,
nameResolver: NameResolver = nil,
services = newSeq[Service]()): Switch
{.raises: [LPError].} =
proc newSwitch*(
peerInfo: PeerInfo,
transports: seq[Transport],
secureManagers: openArray[Secure] = [],
connManager: ConnManager,
ms: MultistreamSelect,
peerStore: PeerStore,
nameResolver: NameResolver = nil,
services = newSeq[Service]()): Switch {.raises: [LPError].} =
if secureManagers.len == 0:
raise newException(LPError, "Provide at least one secure manager")

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -19,26 +19,32 @@ type
switch*: Switch
connectStub*: connectStubType
connectStubType* = proc (self: SwitchStub,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out): Future[void] {.async.}
connectStubType* = proc (
self: SwitchStub,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
): Future[void] {.async: (raises: [CancelledError, LPError]).}
method connect*(
self: SwitchStub,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async.} =
self: SwitchStub,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
) {.async: (raises: [CancelledError, LPError], raw: true).} =
if (self.connectStub != nil):
await self.connectStub(self, peerId, addrs, forceDial, reuseConnection, dir)
self.connectStub(self, peerId, addrs, forceDial, reuseConnection, dir)
else:
await self.switch.connect(peerId, addrs, forceDial, reuseConnection, dir)
self.switch.connect(peerId, addrs, forceDial, reuseConnection, dir)
proc new*(T: typedesc[SwitchStub], switch: Switch, connectStub: connectStubType = nil): T =
proc new*(
T: typedesc[SwitchStub],
switch: Switch,
connectStub: connectStubType = nil): T =
return SwitchStub(
switch: switch,
peerInfo: switch.peerInfo,