This commit is contained in:
Etan Kissling 2024-03-06 13:12:05 +01:00
parent ca01ee06a8
commit 2482d8a0d8
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
20 changed files with 676 additions and 468 deletions

View File

@ -291,24 +291,23 @@ proc build*(b: SwitchBuilder): Switch
return switch
proc newStandardSwitch*(
privKey = none(PrivateKey),
addrs: MultiAddress | seq[MultiAddress] =
MultiAddress.init("/ip4/127.0.0.1/tcp/0").expect("valid address"),
secureManagers: openArray[SecureProtocol] = [
privKey = none(PrivateKey),
addrs: MultiAddress | seq[MultiAddress] = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
secureManagers: openArray[SecureProtocol] = [
SecureProtocol.Noise,
],
transportFlags: set[ServerFlags] = {},
rng = newRng(),
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes,
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
peerStoreCapacity = 1000
): Switch {.raises: [LPError], public.} =
transportFlags: set[ServerFlags] = {},
rng = newRng(),
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes,
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
peerStoreCapacity = 1000): Switch
{.raises: [LPError], public.} =
## Helper for common switch configurations.
{.push warning[Deprecated]:off.}
if SecureProtocol.Secio in secureManagers:

View File

@ -280,9 +280,7 @@ proc selectMuxer*(c: ConnManager, peerId: PeerId): Muxer =
trace "connection not found", peerId
return mux
proc storeMuxer*(c: ConnManager,
muxer: Muxer)
{.raises: [CatchableError].} =
proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [LPError].} =
## store the connection and muxer
##
@ -311,14 +309,12 @@ proc storeMuxer*(c: ConnManager,
raise newTooManyConnectionsError()
var newPeer = false
c.muxed.withValue(peerId, muxers):
doAssert muxers[].len > 0
doAssert muxer notin muxers[]
muxers[].add(muxer)
do:
c.muxed[peerId] = @[muxer]
newPeer = true
assert muxer notin c.muxed.getOrDefault(peerId)
let
newPeer = peerId notin c.muxed
assert newPeer or c.muxed[peerId].len > 0
c.muxed.mgetOrPut(peerId, newSeq[Muxer]()).add(muxer)
libp2p_peers.set(c.muxed.len.int64)
asyncSpawn c.triggerConnEvent(
@ -380,28 +376,34 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) =
return
cs.trackConnection(mux.connection)
proc getStream*(c: ConnManager,
muxer: Muxer): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager,
muxer: Muxer
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
## get a muxed stream for the passed muxer
##
if not(isNil(muxer)):
return await muxer.newStream()
proc getStream*(c: ConnManager,
peerId: PeerId): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager,
peerId: PeerId
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, MuxerError], raw: true).} =
## get a muxed stream for the passed peer from any connection
##
c.getStream(c.selectMuxer(peerId))
return await c.getStream(c.selectMuxer(peerId))
proc getStream*(c: ConnManager,
peerId: PeerId,
dir: Direction): Future[Connection] {.async.} =
proc getStream*(
c: ConnManager,
peerId: PeerId,
dir: Direction
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, MuxerError], raw: true).} =
## get a muxed stream for the passed peer from a connection with `dir`
##
return await c.getStream(c.selectMuxer(peerId, dir))
c.getStream(c.selectMuxer(peerId, dir))
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
@ -435,4 +437,3 @@ proc close*(c: ConnManager) {.async.} =
await closeMuxer(mux)
trace "Closed ConnManager"

View File

@ -149,11 +149,13 @@ type
signature*: Signature
key*: PublicKey
P2PStreamCallback* = proc(api: DaemonAPI,
stream: P2PStream): Future[void] {.gcsafe, raises: [CatchableError].}
P2PPubSubCallback* = proc(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.gcsafe, raises: [CatchableError].}
P2PStreamCallback* = proc(
api: DaemonAPI,
stream: P2PStream): Future[void] {.async: (raises: []).}
P2PPubSubCallback* = proc(
api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.async: (raises: []).}
DaemonError* = object of LPError
DaemonRemoteError* = object of DaemonError
@ -477,7 +479,9 @@ proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalErro
if initProtoBuffer(error).getRequiredField(1, result).isErr():
raise newException(DaemonLocalError, "Error message is missing!")
proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
proc recvMessage(
conn: StreamTransport
): Future[seq[byte]] {.async: (raises: [CancelledError, TransportError]).} =
var
size: uint
length: int
@ -500,14 +504,18 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport]
{.raises: [LPError].} =
result = connect(api.address)
proc newConnection*(
api: DaemonAPI
): Future[StreamTransport] {.async: (raises: [
CancelledError, LPError], raw: true).} =
connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] =
result = transp.closeWait()
proc closeConnection*(
api: DaemonAPI,
transp: StreamTransport): Future[void] {.async: (raises: []).} =
transp.closeWait()
proc socketExists(address: MultiAddress): Future[bool] {.async.} =
proc socketExists(address: MultiAddress): Future[bool] {.async: (raises: []).} =
try:
var transp = await connect(address)
await transp.closeWait()
@ -524,31 +532,44 @@ else:
proc getProcessId(): int =
result = int(posix.getpid())
proc getSocket(pattern: string,
count: ptr int): Future[MultiAddress] {.async.} =
proc getSocket(
pattern: string,
count: ptr int): Future[MultiAddress] {.async: (raises: [LPError]).} =
var sockname = ""
var pid = $getProcessId()
sockname = pattern % [pid, $(count[])]
try:
sockname = pattern % [pid, $(count[])]
except ValueError as exc:
raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg)
let tmpma = MultiAddress.init(sockname).tryGet()
if UNIX.match(tmpma):
while true:
count[] = count[] + 1
sockname = pattern % [pid, $(count[])]
try:
sockname = pattern % [pid, $(count[])]
except ValueError as exc:
raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg)
var ma = MultiAddress.init(sockname).tryGet()
let res = await socketExists(ma)
if not res:
result = ma
break
elif TCP.match(tmpma):
sockname = pattern % [pid, "0"]
try:
sockname = pattern % [pid, "0"]
except ValueError as exc:
raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg)
var ma = MultiAddress.init(sockname).tryGet()
var sock = createAsyncSocket(ma)
if sock.bindAsyncSocket(ma):
# Socket was successfully bound, then its free to use
count[] = count[] + 1
var ta = sock.getLocalAddress()
sockname = pattern % [pid, $ta.port]
try:
sockname = pattern % [pid, $ta.port]
except ValueError as exc:
raiseAssert("Pattern `" & pattern & "` is invalid: " & $exc.msg)
result = MultiAddress.init(sockname).tryGet()
closeSocket(sock)
@ -822,13 +843,25 @@ template withMessage(m, body: untyped): untyped =
else:
body
proc transactMessage(transp: StreamTransport,
pb: ProtoBuffer): Future[ProtoBuffer] {.async.} =
proc transactMessage(
transp: StreamTransport,
pb: ProtoBuffer
): Future[ProtoBuffer] {.async: (raises: [CancelledError, LPError]).} =
let length = pb.getLen()
let res = await transp.write(pb.getPtr(), length)
let res =
try:
await transp.write(pb.getPtr(), length)
except TransportError as exc:
raise newException(DaemonLocalError,
"Could not send message to daemon!", exc)
if res != length:
raise newException(DaemonLocalError, "Could not send message to daemon!")
var message = await transp.recvMessage()
raise newException(DaemonLocalError, "Sent incomplete message to daemon!")
var message =
try:
await transp.recvMessage()
except TransportError as exc:
raise newException(DaemonLocalError,
"Could not receive message from daemon!", exc)
if len(message) == 0:
raise newException(DaemonLocalError, "Incorrect or empty message received!")
result = initProtoBuffer(message)
@ -878,16 +911,18 @@ proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} =
finally:
await api.closeConnection(transp)
proc openStream*(api: DaemonAPI, peer: PeerId,
protocols: seq[string],
timeout = 0): Future[P2PStream] {.async.} =
proc openStream*(
api: DaemonAPI, peer: PeerId,
protocols: seq[string],
timeout = 0
): Future[P2PStream] {.async: (raises: [CancelledError, LPError]).} =
## Open new stream to peer ``peer`` using one of the protocols in
## ``protocols``. Returns ``StreamTransport`` for the stream.
var transp = await api.newConnection()
var stream = new P2PStream
try:
var pb = await transp.transactMessage(requestStreamOpen(peer, protocols,
timeout))
var pb = await transp.transactMessage(
requestStreamOpen(peer, protocols, timeout))
pb.withMessage() do:
var res: seq[byte]
if pb.getRequiredField(ResponseType.STREAMINFO.int, res).isOk():
@ -902,52 +937,67 @@ proc openStream*(api: DaemonAPI, peer: PeerId,
stream.flags.incl(Outbound)
stream.transp = transp
result = stream
except CatchableError as exc:
except ResultError[ProtoError] as exc:
await api.closeConnection(transp)
raise exc
raise newException(LPError, "Failed to parse message", exc)
proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
proc streamHandler(
server: StreamServer,
transp: StreamTransport) {.async: (raises: []).} =
var api = getUserData[DaemonAPI](server)
var message = await transp.recvMessage()
var pb = initProtoBuffer(message)
var stream = new P2PStream
var raddress = newSeq[byte]()
stream.protocol = ""
pb.getRequiredField(1, stream.peer).tryGet()
pb.getRequiredField(2, raddress).tryGet()
stream.raddress = MultiAddress.init(raddress).tryGet()
pb.getRequiredField(3, stream.protocol).tryGet()
stream.flags.incl(Inbound)
stream.transp = transp
if len(stream.protocol) > 0:
var handler = api.handlers.getOrDefault(stream.protocol)
if not isNil(handler):
asyncSpawn handler(api, stream)
proc addHandler*(api: DaemonAPI, protocols: seq[string],
handler: P2PStreamCallback) {.async, raises: [LPError].} =
## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.newConnection()
let maddress = await getSocket(api.pattern, addr api.ucounter)
var server = createStreamServer(maddress, streamHandler, udata = api)
try:
for item in protocols:
api.handlers[item] = handler
server.start()
var pb = await transp.transactMessage(requestStreamHandler(maddress,
protocols))
pb.withMessage() do:
api.servers.add(P2PServer(server: server, address: maddress))
except CatchableError as exc:
for item in protocols:
api.handlers.del(item)
server.stop()
server.close()
await server.join()
raise exc
finally:
var message = await transp.recvMessage()
var pb = initProtoBuffer(message)
var stream = new P2PStream
var raddress = newSeq[byte]()
stream.protocol = ""
pb.getRequiredField(1, stream.peer).tryGet()
pb.getRequiredField(2, raddress).tryGet()
stream.raddress = MultiAddress.init(raddress).tryGet()
pb.getRequiredField(3, stream.protocol).tryGet()
stream.flags.incl(Inbound)
stream.transp = transp
if len(stream.protocol) > 0:
var handler = api.handlers.getOrDefault(stream.protocol)
if not isNil(handler):
asyncSpawn handler(api, stream)
except CancelledError, LPError, ResultError[ProtoError], TransportError:
await api.closeConnection(transp)
proc addHandler*(
api: DaemonAPI,
protocols: seq[string],
handler: P2PStreamCallback) {.async: (raises: [CancelledError, LPError]).} =
## Add stream handler ``handler`` for set of protocols ``protocols``.
let transp = await api.newConnection()
defer: await api.closeConnection(transp)
var added = false
for item in protocols:
api.handlers[item] = handler
defer:
if not added:
for item in protocols:
api.handlers.del(item)
let
maddress = await getSocket(api.pattern, addr api.ucounter)
server = createStreamServer(maddress, streamHandler, udata = api)
defer:
if not added:
try:
server.stop()
except TransportOsError:
discard
server.close()
await noCancel server.join()
var pb = await transp.transactMessage(
requestStreamHandler(maddress, protocols))
pb.withMessage() do:
api.servers.add(P2PServer(server: server, address: maddress))
added = true
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
## Get list of remote peers to which we are currently connected.
var transp = await api.newConnection()

View File

@ -21,47 +21,49 @@ type
Dial* = ref object of RootObj
method connect*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
) {.async: (raises: [CancelledError, LPError], raw: true), base.} =
## connect remote peer without negotiating
## a protocol
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method connect*(
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] {.async, base.} =
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## Connects to a peer and retrieve its PeerId
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
protos: seq[string]
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream over an
## existing connection
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
doAssert(false, "Not implemented!")
method addTransport*(

View File

@ -49,12 +49,12 @@ type
nameResolver: NameResolver
proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
dir = Direction.Out):
Future[Muxer] {.async.} =
self: Dialer,
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
dir = Direction.Out
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
for transport in self.transports: # for each transport
if transport.handles(address): # check if it can dial it
@ -101,10 +101,11 @@ proc dialAndUpgrade(
return nil
proc expandDnsAddr(
self: Dialer,
peerId: Opt[PeerId],
address: MultiAddress): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} =
self: Dialer,
peerId: Opt[PeerId],
address: MultiAddress
): Future[seq[(MultiAddress, Opt[PeerId])]] {.async: (raises: [
CancelledError, LPError]).} =
if not DNSADDR.matchPartial(address): return @[(address, peerId)]
if isNil(self.nameResolver):
info "Can't resolve DNSADDR without NameResolver", ma=address
@ -113,7 +114,8 @@ proc expandDnsAddr(
let
toResolve =
if peerId.isSome:
address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet()
address & MultiAddress.init(multiCodec("p2p"), peerId.get())
.tryGet()
else:
address
resolved = await self.nameResolver.resolveDnsAddr(toResolve)
@ -124,17 +126,17 @@ proc expandDnsAddr(
let
peerIdBytes = lastPart.protoArgument().tryGet()
addrPeerId = PeerId.init(peerIdBytes).tryGet()
result.add((resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId)))
result.add((
resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId)))
else:
result.add((resolvedAddress, peerId))
proc dialAndUpgrade(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
dir = Direction.Out):
Future[Muxer] {.async.} =
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
dir = Direction.Out
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
debug "Dialing peer", peerId = peerId.get(default(PeerId))
for rawAddress in addrs:
@ -163,15 +165,15 @@ proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] =
return Opt.some(muxer)
proc internalConnect(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out):
Future[Muxer] {.async.} =
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out
): Future[Muxer] {.async: (raises: [CancelledError, LPError]).} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
raise newException(DialFailedError, "can't dial self!")
# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock())
@ -187,7 +189,10 @@ proc internalConnect(
let muxed =
try:
await self.dialAndUpgrade(peerId, addrs, dir)
except CatchableError as exc:
except CancelledError as exc:
slot.release()
raise exc
except LPError as exc:
slot.release()
raise exc
slot.trackMuxer(muxed)
@ -197,7 +202,11 @@ proc internalConnect(
try:
self.connManager.storeMuxer(muxed)
await self.peerStore.identify(muxed)
except CatchableError as exc:
except CancelledError as exc:
trace "Failed to finish outgoung upgrade", err=exc.msg
await muxed.close()
raise exc
except LPError as exc:
trace "Failed to finish outgoung upgrade", err=exc.msg
await muxed.close()
raise exc
@ -205,28 +214,32 @@ proc internalConnect(
return muxed
finally:
if lock.locked():
lock.release()
try:
lock.release()
except AsyncLockError as exc:
raiseAssert("Releasing an acquired lock should work: " & $exc.msg)
method connect*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async.} =
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async: (raises: [CancelledError, LPError]).} =
## connect remote peer without negotiating
## a protocol
##
if self.connManager.connCount(peerId) > 0 and reuseConnection:
return
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, dir)
discard await self.internalConnect(
Opt.some(peerId), addrs, forceDial, reuseConnection, dir)
method connect*(
self: Dialer,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] {.async.} =
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [CancelledError, LPError]).} =
## Connects to a peer and retrieve its PeerId
parseFullAddress(address).toOpt().withValue(fullAddress):
@ -236,17 +249,19 @@ method connect*(
false)).connection.peerId
if allowUnknownPeerId == false:
raise newException(DialFailedError, "Address without PeerID and unknown peer id disabled!")
raise newException(DialFailedError,
"Address without PeerID and unknown peer id disabled!")
return (await self.internalConnect(
(await self.internalConnect(
Opt.none(PeerId),
@[address],
false)).connection.peerId
proc negotiateStream(
self: Dialer,
conn: Connection,
protos: seq[string]): Future[Connection] {.async.} =
self: Dialer,
conn: Connection,
protos: seq[string]
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
trace "Negotiating stream", conn, protos
let selected = await MultistreamSelect.select(conn, protos)
if not protos.contains(selected):
@ -277,9 +292,10 @@ method tryDial*(
raise newException(DialFailedError, exc.msg)
method dial*(
self: Dialer,
peerId: PeerId,
protos: seq[string]): Future[Connection] {.async.} =
self: Dialer,
peerId: PeerId,
protos: seq[string]
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
## create a protocol stream over an
## existing connection
##
@ -292,11 +308,12 @@ method dial*(
return await self.negotiateStream(stream, protos)
method dial*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.async.} =
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [CancelledError, LPError]).} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
@ -305,7 +322,7 @@ method dial*(
conn: Muxer
stream: Connection
proc cleanup() {.async.} =
proc cleanup() {.async: (raises: []).} =
if not(isNil(stream)):
await stream.closeWithEOF()
@ -327,7 +344,7 @@ method dial*(
trace "Dial canceled", conn
await cleanup()
raise exc
except CatchableError as exc:
except LPError as exc:
debug "Error dialing", conn, err = exc.msg
await cleanup()
raise exc

View File

@ -50,10 +50,12 @@ template validateSuffix(str: string): untyped =
else:
raise newException(MultiStreamError, "MultistreamSelect failed, malformed message")
proc select*(_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: seq[string]):
Future[string] {.async.} =
proc select*(
_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: seq[string]
): Future[string] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =
trace "initiating handshake", conn, codec = Codec
## select a remote protocol
await conn.writeLp(Codec & "\n") # write handshake
@ -98,15 +100,22 @@ proc select*(_: MultistreamSelect | type MultistreamSelect,
# No alternatives, fail
return ""
proc select*(_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: string): Future[bool] {.async.} =
proc select*(
_: MultistreamSelect | type MultistreamSelect,
conn: Connection,
proto: string
): Future[bool] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError]).} =
if proto.len > 0:
return (await MultistreamSelect.select(conn, @[proto])) == proto
(await MultistreamSelect.select(conn, @[proto])) == proto
else:
return (await MultistreamSelect.select(conn, @[])) == Codec
(await MultistreamSelect.select(conn, @[])) == Codec
proc select*(m: MultistreamSelect, conn: Connection): Future[bool] =
proc select*(
m: MultistreamSelect,
conn: Connection
): Future[bool] {.async: (raises: [
CancelledError, LPStreamError, MultiStreamError], raw: true).} =
m.select(conn, "")
proc list*(m: MultistreamSelect,

View File

@ -83,11 +83,11 @@ proc getDnsResponse(
await sock.closeWait()
method resolveIp*(
self: DnsResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async.} =
self: DnsResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async: (raises: [CancelledError]).} =
trace "Resolving IP using DNS", address, servers = self.nameServers.mapIt($it), domain
for _ in 0 ..< self.nameServers.len:
let server = self.nameServers[0]
@ -132,14 +132,21 @@ method resolveIp*(
continue
trace "Got IPs from DNS server", resolvedAddresses, server = $server
return resolvedAddresses.toSeq().mapIt(initTAddress(it, port))
var res = newSeqOfCap[TransportAddress](resolvedAddresses.len)
for address in resolvedAddresses:
try:
res.add(initTAddress(address, port))
except TransportAddressError as e:
debug "Failed to parse IP from DNS server", error=e.msg
return res
debug "Failed to resolve address, returning empty set"
return @[]
method resolveTxt*(
self: DnsResolver,
address: string): Future[seq[string]] {.async.} =
self: DnsResolver,
address: string
): Future[seq[string]] {.async: (raises: [CancelledError]).} =
trace "Resolving TXT using DNS", address, servers = self.nameServers.mapIt($it)
for _ in 0 ..< self.nameServers.len:

View File

@ -26,21 +26,36 @@ type MockResolver* = ref object of NameResolver
ipResponses*: Table[(string, bool), seq[string]]
method resolveIp*(
self: MockResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async.} =
self: MockResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async: (raises: [
CancelledError], raw: true).} =
var res: seq[TransportAddress]
if domain == Domain.AF_INET or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, false)):
result.add(initTAddress(resp, port))
try:
res.add(initTAddress(resp, port))
except TransportAddressError:
raiseAssert("ipResponses should only contain valid IP addresses")
if domain == Domain.AF_INET6 or domain == Domain.AF_UNSPEC:
for resp in self.ipResponses.getOrDefault((address, true)):
result.add(initTAddress(resp, port))
try:
res.add(initTAddress(resp, port))
except TransportAddressError:
raiseAssert("ipResponses should only contain valid IP addresses")
let fut = newFuture[seq[TransportAddress]]()
fut.complete(res)
fut
method resolveTxt*(
self: MockResolver,
address: string): Future[seq[string]] {.async.} =
return self.txtResponses.getOrDefault(address)
self: MockResolver,
address: string
): Future[seq[string]] {.async: (raises: [CancelledError], raw: true).} =
let fut = newFuture[seq[string]]()
fut.complete(self.txtResponses.getOrDefault(address))
fut
proc new*(T: typedesc[MockResolver]): T = T()

View File

@ -14,7 +14,7 @@ import
chronos,
chronicles,
stew/endians2
import ".."/[multiaddress, multicodec]
import ".."/[errors, multiaddress, multicodec]
logScope:
topics = "libp2p nameresolver"
@ -23,22 +23,23 @@ type
NameResolver* = ref object of RootObj
method resolveTxt*(
self: NameResolver,
address: string): Future[seq[string]] {.async, base.} =
self: NameResolver,
address: string
): Future[seq[string]] {.async: (raises: [CancelledError], raw: true), base.} =
## Get TXT record
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method resolveIp*(
self: NameResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async, base.} =
self: NameResolver,
address: string,
port: Port,
domain: Domain = Domain.AF_UNSPEC
): Future[seq[TransportAddress]] {.async: (raises: [
CancelledError], raw: true), base.} =
## Resolve the specified address
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
proc getHostname*(ma: MultiAddress): string =
let
@ -48,17 +49,18 @@ proc getHostname*(ma: MultiAddress): string =
else: ""
proc resolveOneAddress(
self: NameResolver,
ma: MultiAddress,
domain: Domain = Domain.AF_UNSPEC,
prefix = ""): Future[seq[MultiAddress]]
{.async.} =
self: NameResolver,
ma: MultiAddress,
domain: Domain = Domain.AF_UNSPEC,
prefix = ""
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} =
#Resolve a single address
var pbuf: array[2, byte]
var dnsval = getHostname(ma)
if ma[1].tryGet().protoArgument(pbuf).tryGet() == 0:
if ma[1].tryGet()
.protoArgument(pbuf).tryGet() == 0:
raise newException(MaError, "Incorrect port number")
let
port = Port(fromBytesBE(uint16, pbuf))
@ -66,17 +68,18 @@ proc resolveOneAddress(
return collect(newSeqOfCap(4)):
for address in resolvedAddresses:
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
var createdAddress = MultiAddress.init(address)
.tryGet()[0].tryGet()
for part in ma:
if DNS.match(part.tryGet()): continue
createdAddress &= part.tryGet()
createdAddress
proc resolveDnsAddr*(
self: NameResolver,
ma: MultiAddress,
depth: int = 0): Future[seq[MultiAddress]] {.async.} =
self: NameResolver,
ma: MultiAddress,
depth: int = 0
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} =
if not DNSADDR.matchPartial(ma):
return @[ma]
@ -96,7 +99,8 @@ proc resolveDnsAddr*(
if not entry.startsWith("dnsaddr="): continue
let entryValue = MultiAddress.init(entry[8..^1]).tryGet()
if entryValue.contains(multiCodec("p2p")).tryGet() and ma.contains(multiCodec("p2p")).tryGet():
if entryValue.contains(multiCodec("p2p")).tryGet() and
ma.contains(multiCodec("p2p")).tryGet():
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
continue
@ -111,14 +115,16 @@ proc resolveDnsAddr*(
proc resolveMAddress*(
self: NameResolver,
address: MultiAddress): Future[seq[MultiAddress]] {.async.} =
self: NameResolver,
address: MultiAddress
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError, LPError]).} =
var res = initOrderedSet[MultiAddress]()
if not DNS.matchPartial(address):
res.incl(address)
else:
let code = address[0].tryGet().protoCode().tryGet()
let code = address[0].tryGet()
.protoCode().tryGet()
let seq = case code:
of multiCodec("dns"):
await self.resolveOneAddress(address)

View File

@ -85,20 +85,16 @@ proc parseFullAddress*(ma: string | seq[byte]): MaResult[(PeerId, MultiAddress)]
parseFullAddress(? MultiAddress.init(ma))
proc new*(
p: typedesc[PeerInfo],
key: PrivateKey,
listenAddrs: openArray[MultiAddress] = [],
protocols: openArray[string] = [],
protoVersion: string = "",
agentVersion: string = "",
addressMappers = newSeq[AddressMapper](),
): PeerInfo
{.raises: [LPError].} =
let pubkey = try:
key.getPublicKey().tryGet()
except CatchableError:
raise newException(PeerInfoError, "invalid private key")
p: typedesc[PeerInfo],
key: PrivateKey,
listenAddrs: openArray[MultiAddress] = [],
protocols: openArray[string] = [],
protoVersion: string = "",
agentVersion: string = "",
addressMappers = newSeq[AddressMapper]()
): PeerInfo {.raises: [LPError].} =
let pubkey = key.getPublicKey().valueOr:
raise newException(PeerInfoError, "invalid private key")
let peerId = PeerId.init(key).tryGet()

View File

@ -194,8 +194,8 @@ proc cleanup*(
peerStore.toClean.delete(0)
proc identify*(
peerStore: PeerStore,
muxer: Muxer) {.async.} =
peerStore: PeerStore,
muxer: Muxer) {.async: (raises: [CancelledError, LPError]).} =
# new stream for identify
var stream = await muxer.newStream()

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -30,9 +30,13 @@ type
ReservationError* = object of RelayClientError
RelayV1DialError* = object of RelayClientError
RelayV2DialError* = object of RelayClientError
RelayClientAddConn* = proc(conn: Connection,
duration: uint32,
data: uint64): Future[void] {.gcsafe, raises: [].}
RelayClientAddConn* = proc(
conn: Connection,
duration: uint32,
data: uint64
): Future[void] {.async: (raises: []).}
RelayClient* = ref object of Relay
onNewConnection*: RelayClientAddConn
canHop: bool
@ -44,12 +48,19 @@ type
limitDuration*: uint32 # seconds
limitData*: uint64 # bytes
proc sendStopError(conn: Connection, code: StatusV2) {.async.} =
proc sendStopError(
conn: Connection,
code: StatusV2
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
trace "send stop status", status = $code & " (" & $ord(code) & ")"
let msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code))
await conn.writeLp(encode(msg).buffer)
conn.writeLp(encode(msg).buffer)
proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {.async.} =
proc handleRelayedConnect(
cl: RelayClient,
conn: Connection,
msg: StopMessage) {.async: (raises: [CancelledError, LPStreamError]).} =
let
# TODO: check the go version to see in which way this could fail
# it's unclear in the spec
@ -72,12 +83,16 @@ proc handleRelayedConnect(cl: RelayClient, conn: Connection, msg: StopMessage) {
await conn.writeLp(pb.buffer)
# This sound redundant but the callback could, in theory, be set to nil during
# conn.writeLp so it's safer to double check
if cl.onNewConnection != nil: await cl.onNewConnection(conn, limitDuration, limitData)
else: await conn.close()
if cl.onNewConnection != nil:
await cl.onNewConnection(conn, limitDuration, limitData)
else:
await conn.close()
proc reserve*(cl: RelayClient,
peerId: PeerId,
addrs: seq[MultiAddress] = @[]): Future[Rsvp] {.async.} =
proc reserve*(
cl: RelayClient,
peerId: PeerId,
addrs: seq[MultiAddress] = @[]
): Future[Rsvp] {.async: (raises: [CancelledError, LPError]).} =
let conn = await cl.switch.dial(peerId, addrs, RelayV2HopCodec)
defer: await conn.close()
let
@ -87,9 +102,10 @@ proc reserve*(cl: RelayClient,
HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error writing or reading reservation message", exc=exc.msg
raise newException(ReservationError, exc.msg)
except LPError, ResultError[void]:
let msg = getCurrentExceptionMsg()
trace "error writing or reading reservation message", exc = msg
raise newException(ReservationError, msg)
if msg.msgType != HopMessageType.Status:
raise newException(ReservationError, "Unexpected relay response type")
@ -118,11 +134,14 @@ proc dialPeerV1*(
cl: RelayClient,
conn: Connection,
dstPeerId: PeerId,
dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} =
dstAddrs: seq[MultiAddress]
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, RelayV1DialError]).} =
var
msg = RelayMessage(
msgType: Opt.some(RelayType.Hop),
srcPeer: Opt.some(RelayPeer(peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)),
srcPeer: Opt.some(RelayPeer(
peerId: cl.switch.peerInfo.peerId, addrs: cl.switch.peerInfo.addrs)),
dstPeer: Opt.some(RelayPeer(peerId: dstPeerId, addrs: dstAddrs)))
pb = encode(msg)
@ -132,7 +151,7 @@ proc dialPeerV1*(
await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "error writing hop request", exc=exc.msg
raise exc
@ -140,31 +159,35 @@ proc dialPeerV1*(
RelayMessage.decode(await conn.readLp(RelayClientMsgSize))
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "error reading stop response", exc=exc.msg
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise exc
try:
let msgRcvFromRelay = msgRcvFromRelayOpt.valueOr:
raise newException(RelayV1DialError, "Hop can't open destination stream")
raise newException(RelayV1DialError,
"Hop can't open destination stream")
if msgRcvFromRelay.msgType.tryGet() != RelayType.Status:
raise newException(RelayV1DialError, "Hop can't open destination stream: wrong message type")
raise newException(RelayV1DialError,
"Hop can't open destination stream: wrong message type")
if msgRcvFromRelay.status.tryGet() != StatusV1.Success:
raise newException(RelayV1DialError, "Hop can't open destination stream: status failed")
raise newException(RelayV1DialError,
"Hop can't open destination stream: status failed")
except RelayV1DialError as exc:
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise exc
except ValueError as exc:
except ResultError[void] as exc:
await sendStatus(conn, StatusV1.HopCantOpenDstStream)
raise newException(RelayV1DialError, exc.msg)
result = conn
conn
proc dialPeerV2*(
cl: RelayClient,
conn: RelayConnection,
dstPeerId: PeerId,
dstAddrs: seq[MultiAddress]): Future[Connection] {.async.} =
dstAddrs: seq[MultiAddress]
): Future[Connection] {.async: (raises: [CancelledError, RelayV2DialError]).} =
let
p = Peer(peerId: dstPeerId, addrs: dstAddrs)
pb = encode(HopMessage(msgType: HopMessageType.Connect, peer: Opt.some(p)))
@ -176,8 +199,11 @@ proc dialPeerV2*(
HopMessage.decode(await conn.readLp(RelayClientMsgSize)).tryGet()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error reading stop response", exc=exc.msg
except LPStreamError as exc:
trace "error reading stop response", exc = exc.msg
raise newException(RelayV2DialError, exc.msg)
except ResultError[void] as exc:
trace "error reading stop response", exc = exc.msg
raise newException(RelayV2DialError, exc.msg)
if msgRcvFromRelay.msgType != HopMessageType.Status:
@ -187,7 +213,7 @@ proc dialPeerV2*(
raise newException(RelayV2DialError, "Relay stop failure")
conn.limitDuration = msgRcvFromRelay.limit.duration
conn.limitData = msgRcvFromRelay.limit.data
return conn
conn
proc handleStopStreamV2(cl: RelayClient, conn: Connection) {.async.} =
let msg = StopMessage.decode(await conn.readLp(RelayClientMsgSize)).valueOr:

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -21,36 +21,48 @@ const
RelayV2HopCodec* = "/libp2p/circuit/relay/0.2.0/hop"
RelayV2StopCodec* = "/libp2p/circuit/relay/0.2.0/stop"
proc sendStatus*(conn: Connection, code: StatusV1) {.async.} =
proc sendStatus*(
conn: Connection,
code: StatusV1
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "send relay/v1 status", status = $code & "(" & $ord(code) & ")"
let
msg = RelayMessage(msgType: Opt.some(RelayType.Status), status: Opt.some(code))
msg = RelayMessage(
msgType: Opt.some(RelayType.Status), status: Opt.some(code))
pb = encode(msg)
await conn.writeLp(pb.buffer)
conn.writeLp(pb.buffer)
proc sendHopStatus*(conn: Connection, code: StatusV2) {.async.} =
proc sendHopStatus*(
conn: Connection,
code: StatusV2
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "send hop relay/v2 status", status = $code & "(" & $ord(code) & ")"
let
msg = HopMessage(msgType: HopMessageType.Status, status: Opt.some(code))
pb = encode(msg)
await conn.writeLp(pb.buffer)
conn.writeLp(pb.buffer)
proc sendStopStatus*(conn: Connection, code: StatusV2) {.async.} =
proc sendStopStatus*(
conn: Connection,
code: StatusV2
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "send stop relay/v2 status", status = $code & " (" & $ord(code) & ")"
let
msg = StopMessage(msgType: StopMessageType.Status, status: Opt.some(code))
pb = encode(msg)
await conn.writeLp(pb.buffer)
conn.writeLp(pb.buffer)
proc bridge*(connSrc: Connection, connDst: Connection) {.async.} =
proc bridge*(
connSrc: Connection,
connDst: Connection) {.async: (raises: [CancelledError]).} =
const bufferSize = 4096
var
bufSrcToDst: array[bufferSize, byte]
bufDstToSrc: array[bufferSize, byte]
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.high + 1)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.high + 1)
bytesSendFromSrcToDst = 0
bytesSendFromDstToSrc = 0
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.len)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.len)
bytesSentFromSrcToDst = 0
bytesSentFromDstToSrc = 0
bufRead: int
try:
@ -61,25 +73,25 @@ proc bridge*(connSrc: Connection, connDst: Connection) {.async.} =
if futSrc.finished():
bufRead = await futSrc
if bufRead > 0:
bytesSendFromSrcToDst.inc(bufRead)
await connDst.write(@bufSrcToDst[0..<bufRead])
zeroMem(addr(bufSrcToDst), bufSrcToDst.high + 1)
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.high + 1)
bytesSentFromSrcToDst.inc(bufRead)
await connDst.write(@bufSrcToDst[0 ..< bufRead])
zeroMem(addr bufSrcToDst[0], bufSrcToDst.len)
futSrc = connSrc.readOnce(addr bufSrcToDst[0], bufSrcToDst.len)
if futDst.finished():
bufRead = await futDst
if bufRead > 0:
bytesSendFromDstToSrc += bufRead
await connSrc.write(bufDstToSrc[0..<bufRead])
zeroMem(addr(bufDstToSrc), bufDstToSrc.high + 1)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.high + 1)
bytesSentFromDstToSrc += bufRead
await connSrc.write(bufDstToSrc[0 ..< bufRead])
zeroMem(addr bufDstToSrc[0], bufDstToSrc.len)
futDst = connDst.readOnce(addr bufDstToSrc[0], bufDstToSrc.len)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
if connSrc.closed() or connSrc.atEof():
trace "relay src closed connection", src = connSrc.peerId
if connDst.closed() or connDst.atEof():
trace "relay dst closed connection", dst = connDst.peerId
trace "relay error", exc=exc.msg
trace "end relaying", bytesSendFromSrcToDst, bytesSendFromDstToSrc
trace "end relaying", bytesSentFromSrcToDst, bytesSentFromDstToSrc
await futSrc.cancelAndWait()
await futDst.cancelAndWait()

View File

@ -167,9 +167,12 @@ method init*(p: Identify) =
p.handler = handle
p.codec = IdentifyCodec
proc identify*(self: Identify,
conn: Connection,
remotePeerId: PeerId): Future[IdentifyInfo] {.async.} =
proc identify*(
self: Identify,
conn: Connection,
remotePeerId: PeerId
): Future[IdentifyInfo] {.async: (raises: [
CancelledError, IdentifyError, LPStreamError]).} =
trace "initiating identify", conn
var message = await conn.readLp(64*1024)
if len(message) == 0:

View File

@ -327,26 +327,29 @@ proc sendRegisterResponseError(conn: Connection,
registerResponse: Opt.some(RegisterResponse(status: status, text: Opt.some(text)))))
await conn.writeLp(msg.buffer)
proc sendDiscoverResponse(conn: Connection,
s: seq[Register],
cookie: Cookie) {.async.} =
proc sendDiscoverResponse(
conn: Connection,
s: seq[Register],
cookie: Cookie
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: Opt.some(cookie.encode().buffer)
))
))
await conn.writeLp(msg.buffer)
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: Opt.some(cookie.encode().buffer)))))
conn.writeLp(msg.buffer)
proc sendDiscoverResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
proc sendDiscoverResponseError(
conn: Connection,
status: ResponseStatus,
text: string = ""
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(status: status, text: Opt.some(text)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: status, text: Opt.some(text)))))
conn.writeLp(msg.buffer)
proc countRegister(rdv: RendezVous, peerId: PeerId): int =
let n = Moment.now()
@ -407,24 +410,26 @@ proc unregister(rdv: RendezVous, conn: Connection, u: Unregister) =
except KeyError:
return
proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
proc discover(
rdv: RendezVous,
conn: Connection,
d: Discover
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "Received Discover", peerId = conn.peerId, ns = d.ns
libp2p_rendezvous_discover.inc()
if d.ns.len notin 0..255:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
return conn.sendDiscoverResponseError(InvalidNamespace)
var limit = min(DiscoverLimit, d.limit.get(DiscoverLimit))
var
cookie =
if d.cookie.isSome():
try:
Cookie.decode(d.cookie.tryGet()).tryGet()
except CatchableError:
await conn.sendDiscoverResponseError(InvalidCookie)
return
except ResultError[void]:
return conn.sendDiscoverResponseError(InvalidCookie)
else: Cookie(offset: rdv.registered.low().uint64 - 1)
if cookie.ns != d.ns or
cookie.offset notin rdv.registered.low().uint64..rdv.registered.high().uint64:
if cookie.ns != d.ns or cookie.offset notin
rdv.registered.low().uint64..rdv.registered.high().uint64:
cookie = Cookie(offset: rdv.registered.low().uint64 - 1)
let
nsSalted = d.ns & rdv.salt
@ -433,12 +438,10 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
try:
rdv.namespaces[nsSalted]
except KeyError:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
return conn.sendDiscoverResponseError(InvalidNamespace)
else: toSeq(cookie.offset.int..rdv.registered.high())
if namespaces.len() == 0:
await conn.sendDiscoverResponse(@[], Cookie())
return
return conn.sendDiscoverResponse(@[], Cookie())
var offset = namespaces[^1]
let n = Moment.now()
var s = collect(newSeq()):
@ -452,12 +455,12 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64)
reg.data
rdv.rng.shuffle(s)
await conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
proc advertisePeer(rdv: RendezVous,
peer: PeerId,
msg: seq[byte]) {.async.} =
proc advertiseWrap() {.async.} =
proc advertiseWrap() {.async: (raises: []).} =
try:
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer: await conn.close()
@ -470,9 +473,10 @@ proc advertisePeer(rdv: RendezVous,
elif msgRecv.registerResponse.tryGet().status != ResponseStatus.Ok:
trace "Refuse to register", peer, response = msgRecv.registerResponse
else:
trace "Successfully registered", peer, response = msgRecv.registerResponse
except CatchableError as exc:
trace "exception in the advertise", error = exc.msg
trace "Successfully registered",
peer, response = msgRecv.registerResponse
except CancelledError, LPError, ResultError[void]:
trace "exception in the advertise", error = getCurrentExceptionMsg()
finally:
rdv.sema.release()
await rdv.sema.acquire()
@ -625,8 +629,9 @@ proc setup*(rdv: RendezVous, switch: Switch) =
rdv.switch.addPeerEventHandler(handlePeer, Joined)
rdv.switch.addPeerEventHandler(handlePeer, Left)
proc new*(T: typedesc[RendezVous],
rng: ref HmacDrbgContext = newRng()): T =
proc new*(
T: typedesc[RendezVous],
rng: ref HmacDrbgContext = newRng()): T =
let rdv = T(
rng: rng,
salt: string.fromBytes(generateBytes(rng[], 8)),
@ -651,8 +656,8 @@ proc new*(T: typedesc[RendezVous],
trace "Got an unexpected Discover Response", response = msg.discoverResponse
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in rendezvous handler", error = exc.msg
except LPStreamError, RegistrationError, ResultError[void]:
trace "exception in rendezvous handler", error = getCurrentExceptionMsg()
finally:
await conn.close()

View File

@ -315,7 +315,11 @@ proc readFrame(
await sconn.readExactly(addr buffer[0], buffer.len)
return buffer
proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void] =
proc writeFrame(
sconn: Connection,
buf: openArray[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
doAssert buf.len <= uint16.high.int
var
lesize = buf.len.uint16
@ -326,13 +330,24 @@ proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void] =
outbuf &= buf
sconn.write(outbuf)
proc receiveHSMessage(sconn: Connection): Future[seq[byte]] = readFrame(sconn)
proc sendHSMessage(sconn: Connection, buf: openArray[byte]): Future[void] =
proc receiveHSMessage(
sconn: Connection
): Future[seq[byte]] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
readFrame(sconn)
proc sendHSMessage(
sconn: Connection,
buf: openArray[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
writeFrame(sconn, buf)
proc handshakeXXOutbound(
p: Noise, conn: Connection,
p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} =
p2pSecret: seq[byte]
): Future[HandshakeResult] {.async: (raises: [
CancelledError, LPStreamError]).} =
const initiator = true
var
hs = HandshakeState.init()
@ -380,7 +395,9 @@ proc handshakeXXOutbound(
proc handshakeXXInbound(
p: Noise, conn: Connection,
p2pSecret: seq[byte]): Future[HandshakeResult] {.async.} =
p2pSecret: seq[byte]
): Future[HandshakeResult] {.async: (raises: [
CancelledError, LPStreamError]).} =
const initiator = false
var
@ -509,21 +526,28 @@ method write*(
# sequencing issues
sconn.stream.write(cipherFrames)
method handshake*(p: Noise, conn: Connection, initiator: bool, peerId: Opt[PeerId]): Future[SecureConn] {.async.} =
method handshake*(
p: Noise,
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]
): Future[SecureConn] {.async: (raises: [CancelledError, LPStreamError]).} =
trace "Starting Noise handshake", conn, initiator
let timeout = conn.timeout
conn.timeout = HandshakeTimeout
# https://github.com/libp2p/specs/tree/master/noise#libp2p-data-in-handshake-messages
let
signedPayload = p.localPrivateKey.sign(
PayloadString & p.noiseKeys.publicKey.getBytes).tryGet()
let signedPayload = p.localPrivateKey.sign(
PayloadString & p.noiseKeys.publicKey.getBytes)
if signedPayload.isErr():
raise newException(NoiseHandshakeError,
"Failed to sign public key: " & $signedPayload.error())
var
libp2pProof = initProtoBuffer()
libp2pProof.write(1, p.localPublicKey)
libp2pProof.write(2, signedPayload.getBytes())
libp2pProof.write(2, signedPayload.get().getBytes())
# data field also there but not used!
libp2pProof.finish()

View File

@ -259,13 +259,13 @@ method write*(
await sconn.stream.write(msg)
sconn.activity = true
proc newSecioConn(conn: Connection,
hash: string,
cipher: string,
secrets: Secret,
order: int,
remotePubKey: PublicKey): SecioConn
{.raises: [LPError].} =
proc newSecioConn(
conn: Connection,
hash: string,
cipher: string,
secrets: Secret,
order: int,
remotePubKey: PublicKey): SecioConn =
## Create new secure stream/lpstream, using specified hash algorithm ``hash``,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``.
@ -288,13 +288,20 @@ proc newSecioConn(conn: Connection,
result.readerCoder.init(cipher, secrets.keyOpenArray(i1),
secrets.ivOpenArray(i1))
proc transactMessage(conn: Connection,
msg: seq[byte]): Future[seq[byte]] {.async.} =
proc transactMessage(
conn: Connection,
msg: seq[byte]
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
trace "Sending message", message = msg.shortLog, length = len(msg)
await conn.write(msg)
return await conn.readRawMessage()
await conn.readRawMessage()
method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerId]): Future[SecureConn] {.async.} =
method handshake*(
s: Secio,
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]
): Future[SecureConn] {.async: (raises: [CancelledError, LPStreamError]).} =
var
localNonce: array[SecioNonceSize, byte]
remoteNonce: seq[byte]
@ -307,25 +314,29 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI
remoteExchanges: string
remoteCiphers: string
remoteHashes: string
remotePeerId: PeerId
localPeerId: PeerId
localBytesPubkey = s.localPublicKey.getBytes().tryGet()
localBytesPubkey = s.localPublicKey.getBytes()
if localBytesPubkey.isErr():
raise (ref SecioError)(
msg: "Failed to get local public key bytes: " & $localBytesPubkey.error())
hmacDrbgGenerate(s.rng[], localNonce)
var request = createProposal(localNonce,
localBytesPubkey,
localBytesPubkey.get(),
SecioExchanges,
SecioCiphers,
SecioHashes)
localPeerId = PeerId.init(s.localPublicKey).tryGet()
let localPeerId = PeerId.init(s.localPublicKey)
if localPeerId.isErr():
raise (ref SecioError)(
msg: "Failed to initialize local peer ID: " & $localPeerId.error())
trace "Local proposal", schemes = SecioExchanges,
ciphers = SecioCiphers,
hashes = SecioHashes,
pubkey = localBytesPubkey.shortLog,
peer = localPeerId
pubkey = localBytesPubkey.get().shortLog,
peer = localPeerId.get()
var answer = await transactMessage(conn, request)
@ -343,39 +354,54 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI
pubkey = remoteBytesPubkey.shortLog
raise (ref SecioError)(msg: "Remote public key incorrect or corrupted")
remotePeerId = PeerId.init(remotePubkey).tryGet()
let remotePeerId = PeerId.init(remotePubkey)
if remotePeerId.isErr():
raise (ref SecioError)(
msg: "Failed to initialize remote peer ID: " & $remotePeerId.error())
peerId.withValue(targetPid):
if not targetPid.validate():
raise newException(SecioError, "Failed to validate expected peerId.")
if remotePeerId != targetPid:
if remotePeerId.get() != targetPid:
raise newException(SecioError, "Peer ids don't match!")
conn.peerId = remotePeerId
let order = getOrder(remoteBytesPubkey, localNonce, localBytesPubkey,
remoteNonce).tryGet()
conn.peerId = remotePeerId.get()
let order = getOrder(
remoteBytesPubkey, localNonce, localBytesPubkey.get(), remoteNonce)
if order.isErr():
raise (ref SecioError)(msg: "Failed to get order: " & $order.error())
trace "Remote proposal", schemes = remoteExchanges, ciphers = remoteCiphers,
hashes = remoteHashes,
pubkey = remoteBytesPubkey.shortLog, order = order,
peer = remotePeerId
pubkey = remoteBytesPubkey.shortLog,
order = order.get(),
peer = remotePeerId.get()
let scheme = selectBest(order, SecioExchanges, remoteExchanges)
let cipher = selectBest(order, SecioCiphers, remoteCiphers)
let hash = selectBest(order, SecioHashes, remoteHashes)
let scheme = selectBest(order.get(), SecioExchanges, remoteExchanges)
let cipher = selectBest(order.get(), SecioCiphers, remoteCiphers)
let hash = selectBest(order.get(), SecioHashes, remoteHashes)
if len(scheme) == 0 or len(cipher) == 0 or len(hash) == 0:
trace "No algorithms in common", peer = remotePeerId
trace "No algorithms in common", peer = remotePeerId.get()
raise (ref SecioError)(msg: "No algorithms in common")
trace "Encryption scheme selected", scheme = scheme, cipher = cipher,
hash = hash
var ekeypair = ephemeral(scheme, s.rng[]).tryGet()
let ekeypair = ephemeral(scheme, s.rng[])
if ekeypair.isErr():
raise (ref SecioError)(
msg: "Failed to create ephemeral keypair: " & $ekeypair.error())
# We need EC public key in raw binary form
var epubkey = ekeypair.pubkey.getRawBytes().tryGet()
var localCorpus = request[4..^1] & answer & epubkey
var signature = s.localPrivateKey.sign(localCorpus).tryGet()
let epubkey = ekeypair.get().pubkey.getRawBytes()
if epubkey.isErr():
raise (ref SecioError)(
msg: "Failed to get ephemeral key bytes: " & $epubkey.error())
var localCorpus = request[4..^1] & answer & epubkey.get()
let signature = s.localPrivateKey.sign(localCorpus)
if signature.isErr():
raise (ref SecioError)(
msg: "Failed to sign local corpus: " & $signature.error())
var localExchange = createExchange(epubkey, signature.getBytes())
var localExchange = createExchange(epubkey.get(), signature.get().getBytes())
var remoteExchange = await transactMessage(conn, localExchange)
if len(remoteExchange) == 0:
trace "Corpus exchange failed", conn
@ -404,7 +430,7 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI
pubkey = toHex(remoteEBytesPubkey)
raise (ref SecioError)(msg: "Remote ephemeral public key incorrect or corrupted")
var secret = getSecret(remoteEPubkey, ekeypair.seckey)
var secret = getSecret(remoteEPubkey, ekeypair.get().seckey)
if len(secret) == 0:
trace "Shared secret could not be created"
raise (ref SecioError)(msg: "Shared secret could not be created")
@ -421,7 +447,8 @@ method handshake*(s: Secio, conn: Connection, initiator: bool, peerId: Opt[PeerI
# Perform Nonce exchange over encrypted channel.
var secioConn = newSecioConn(conn, hash, cipher, keys, order, remotePubkey)
var secioConn = newSecioConn(
conn, hash, cipher, keys, order.get(), remotePubkey)
result = secioConn
await secioConn.write(remoteNonce)
var res = await secioConn.readMessage()

View File

@ -79,10 +79,13 @@ method readMessage*(
method getWrapped*(s: SecureConn): Connection = s.stream
method handshake*(s: Secure,
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]): Future[SecureConn] {.async, base.} =
method handshake*(
s: Secure,
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]
): Future[SecureConn] {.async: (raises: [
CancelledError, LPStreamError], raw: true), base.} =
raiseAssert("Not implemented!")
proc handleConn(s: Secure,

View File

@ -128,7 +128,6 @@ proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} =
## returns true if the peer has one or more
## associated connections
##
peerId in s.connManager
proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe, public.} =
@ -136,62 +135,68 @@ proc disconnect*(s: Switch, peerId: PeerId): Future[void] {.gcsafe, public.} =
s.connManager.dropPeer(peerId)
method connect*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out): Future[void] {.public.} =
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
): Future[void] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Connects to a peer without opening a stream to it
s.dialer.connect(peerId, addrs, forceDial, reuseConnection, dir)
method connect*(
s: Switch,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] =
s: Switch,
address: MultiAddress,
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [CancelledError, LPError], raw: true).} =
## Connects to a peer and retrieve its PeerId
##
## If the P2P part is missing from the MA and `allowUnknownPeerId` is set
## to true, this will discover the PeerId while connecting. This exposes
## you to MiTM attacks, so it shouldn't be used without care!
s.dialer.connect(address, allowUnknownPeerId)
method dial*(
s: Switch,
peerId: PeerId,
protos: seq[string]): Future[Connection] {.public.} =
s: Switch,
peerId: PeerId,
protos: seq[string]
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Open a stream to a connected peer with the specified `protos`
s.dialer.dial(peerId, protos)
proc dial*(s: Switch,
peerId: PeerId,
proto: string): Future[Connection] {.public.} =
proc dial*(
s: Switch,
peerId: PeerId,
proto: string
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Open a stream to a connected peer with the specified `proto`
dial(s, peerId, @[proto])
method dial*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.public.} =
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Connected to a peer and open a stream
## with the specified `protos`
s.dialer.dial(peerId, addrs, protos, forceDial)
proc dial*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
proto: string): Future[Connection] {.public.} =
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
proto: string
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), public.} =
## Connected to a peer and open a stream
## with the specified `proto`
dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)

View File

@ -71,40 +71,46 @@ proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] =
err("MultiAddress must be wire address (tcp, udp or unix): " & $ma)
proc connect*(
ma: MultiAddress,
bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil,
flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)): Future[StreamTransport]
{.raises: [LPError, MaInvalidAddress].} =
ma: MultiAddress,
bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil,
flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)
): Future[StreamTransport] {.async: (raises: [CancelledError, LPError]).} =
## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
##
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
let transportAddress = initTAddress(ma).tryGet()
compilesOr:
return connect(transportAddress, bufferSize, child,
if localAddress.isSome(): initTAddress(localAddress.expect("just checked")).tryGet() else: TransportAddress(),
flags)
do:
# support for older chronos versions
return connect(transportAddress, bufferSize, child)
try:
compilesOr:
return await connect(transportAddress, bufferSize, child,
if localAddress.isSome():
initTAddress(localAddress.expect("just checked"))
.tryGet()
else:
TransportAddress(),
flags)
do:
# support for older chronos versions
return await connect(transportAddress, bufferSize, child)
except TransportError as exc:
raise newException(LPError, "Connect failed", exc)
proc createStreamServer*[T](ma: MultiAddress,
cbproc: StreamCallback,
flags: set[ServerFlags] = {},
udata: ref T,
sock: AsyncFD = asyncInvalidSocket,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer
{.raises: [LPError, MaInvalidAddress].} =
proc createStreamServer*[T](
ma: MultiAddress,
cbproc: StreamCallback2,
flags: set[ServerFlags] = {},
udata: ref T,
sock: AsyncFD = asyncInvalidSocket,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer {.raises: [LPError].} =
## Create new TCP stream server which bounds to ``ma`` address.
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
@ -120,21 +126,20 @@ proc createStreamServer*[T](ma: MultiAddress,
bufferSize,
child,
init)
except CatchableError as exc:
except TransportError as exc:
raise newException(LPError, exc.msg)
proc createStreamServer*[T](ma: MultiAddress,
flags: set[ServerFlags] = {},
udata: ref T,
sock: AsyncFD = asyncInvalidSocket,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer
{.raises: [LPError, MaInvalidAddress].} =
proc createStreamServer*[T](
ma: MultiAddress,
flags: set[ServerFlags] = {},
udata: ref T,
sock: AsyncFD = asyncInvalidSocket,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer {.raises: [LPError].} =
## Create new TCP stream server which bounds to ``ma`` address.
##
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
@ -148,11 +153,10 @@ proc createStreamServer*[T](ma: MultiAddress,
bufferSize,
child,
init)
except CatchableError as exc:
except TransportError as exc:
raise newException(LPError, exc.msg)
proc createAsyncSocket*(ma: MultiAddress): AsyncFD
{.raises: [ValueError, LPError].} =
proc createAsyncSocket*(ma: MultiAddress): AsyncFD {.raises: [LPError].} =
## Create new asynchronous socket using MultiAddress' ``ma`` socket type and
## protocol information.
##
@ -179,10 +183,7 @@ proc createAsyncSocket*(ma: MultiAddress): AsyncFD
else:
return asyncInvalidSocket
try:
createAsyncSocket(address.getDomain(), socktype, protocol)
except CatchableError as exc:
raise newException(LPError, exc.msg)
createAsyncSocket(address.getDomain(), socktype, protocol)
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool
{.raises: [LPError].} =