This commit is contained in:
Etan Kissling 2024-03-12 00:20:04 +01:00
parent 417922cd7a
commit 23acaece69
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
19 changed files with 289 additions and 247 deletions

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -21,56 +21,60 @@ type
Dial* = ref object of RootObj
method connect*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out) {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false,
reuseConnection = true,
dir = Direction.Out
) {.async: (raises: [CancelledError, LPError], raw: true), base.} =
## connect remote peer without negotiating
## a protocol
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method connect*(
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false): Future[PeerId] {.async, base.} =
self: Dial,
address: MultiAddress,
allowUnknownPeerId = false
): Future[PeerId] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## Connects to a peer and retrieve its PeerId
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
protos: seq[string],
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream over an
## existing connection
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method dial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false): Future[Connection] {.async, base.} =
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string],
forceDial = false
): Future[Connection] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
method addTransport*(
self: Dial,
transport: Transport) {.base.} =
doAssert(false, "Not implemented!")
self: Dial,
transport: Transport) {.base.} =
raiseAssert("Not implemented!")
method tryDial*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]): Future[Opt[MultiAddress]] {.async, base.} =
doAssert(false, "Not implemented!")
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress]
): Future[Opt[MultiAddress]] {.async: (raises: [
CancelledError, LPError], raw: true), base.} =
raiseAssert("Not implemented!")

View File

@ -163,15 +163,15 @@ proc tryReusingConnection(self: Dialer, peerId: PeerId): Opt[Muxer] =
return Opt.some(muxer)
proc internalConnect(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out):
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool,
reuseConnection = true,
dir = Direction.Out):
Future[Muxer] {.async.} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
raise newException(DialFailedError, "can't dial self!")
# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId.get(default(PeerId)), newAsyncLock())

View File

@ -140,7 +140,8 @@ proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[voi
proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T =
let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout)
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
let msg = AutonatMsg.decode(await conn.readLp(1024)).valueOr:
raise newException(AutonatError, "Received malformed message")

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -19,7 +19,6 @@ import ../../protocol,
../../../switch,
../../../utils/future
export DcutrError
export chronicles
type Dcutr* = ref object of LPProtocol
@ -29,7 +28,8 @@ logScope:
proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDialableAddrs = 8): T =
proc handleStream(stream: Connection, proto: string) {.async.} =
proc handleStream(
stream: Connection, proto: string) {.async: (raises: [CancelledError]).} =
var peerDialableAddrs: seq[MultiAddress]
try:
let connectMsg = DcutrMsg.decode(await stream.readLp(1024))
@ -65,14 +65,14 @@ proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDi
except CancelledError as err:
raise err
except AllFuturesFailedError as err:
debug "Dcutr receiver could not connect to the remote peer, all connect attempts failed", peerDialableAddrs, msg = err.msg
raise newException(DcutrError, "Dcutr receiver could not connect to the remote peer, all connect attempts failed", err)
debug "Dcutr receiver could not connect to the remote peer, " &
"all connect attempts failed", peerDialableAddrs, msg = err.msg
except AsyncTimeoutError as err:
debug "Dcutr receiver could not connect to the remote peer, all connect attempts timed out", peerDialableAddrs, msg = err.msg
raise newException(DcutrError, "Dcutr receiver could not connect to the remote peer, all connect attempts timed out", err)
debug "Dcutr receiver could not connect to the remote peer, " &
"all connect attempts timed out", peerDialableAddrs, msg = err.msg
except CatchableError as err:
warn "Unexpected error when Dcutr receiver tried to connect to the remote peer", msg = err.msg
raise newException(DcutrError, "Unexpected error when Dcutr receiver tried to connect to the remote peer", err)
warn "Unexpected error when Dcutr receiver tried to connect " &
"to the remote peer", msg = err.msg
let self = T()
self.handler = handleStream

View File

@ -266,7 +266,8 @@ proc new*(T: typedesc[RelayClient], canHop: bool = false,
maxCircuitPerPeer: maxCircuitPerPeer,
msgSize: msgSize,
isCircuitRelayV1: circuitRelayV1)
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
case proto:
of RelayV1Codec: await cl.handleStreamV1(conn)

View File

@ -336,7 +336,8 @@ proc new*(T: typedesc[Relay],
msgSize: msgSize,
isCircuitRelayV1: circuitRelayV1)
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
case proto:
of RelayV2HopCodec: await r.handleHopStreamV2(conn)

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -45,14 +45,14 @@ type
IdentifyNoPubKeyError* = object of IdentifyError
IdentifyInfo* {.public.} = object
pubkey*: Option[PublicKey]
pubkey*: Opt[PublicKey]
peerId*: PeerId
addrs*: seq[MultiAddress]
observedAddr*: Option[MultiAddress]
protoVersion*: Option[string]
agentVersion*: Option[string]
observedAddr*: Opt[MultiAddress]
protoVersion*: Opt[string]
agentVersion*: Opt[string]
protos*: seq[string]
signedPeerRecord*: Option[Envelope]
signedPeerRecord*: Opt[Envelope]
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
@ -60,10 +60,9 @@ type
observedAddrManager*: ObservedAddrManager
IdentifyPushHandler* = proc (
peer: PeerId,
newInfo: IdentifyInfo):
Future[void]
{.gcsafe, raises: [], public.}
peer: PeerId,
newInfo: IdentifyInfo
): Future[void] {.async: (raises: [CancelledError]), public.}
IdentifyPush* = ref object of LPProtocol
identifyHandler: IdentifyPushHandler
@ -81,8 +80,10 @@ chronicles.expandIt(IdentifyInfo):
if it.signedPeerRecord.isSome(): "Some"
else: "None"
proc encodeMsg(peerInfo: PeerInfo, observedAddr: Opt[MultiAddress], sendSpr: bool): ProtoBuffer
{.raises: [].} =
proc encodeMsg(
peerInfo: PeerInfo,
observedAddr: Opt[MultiAddress],
sendSpr: bool): ProtoBuffer =
result = initProtoBuffer()
let pkey = peerInfo.publicKey
@ -121,27 +122,26 @@ proc decodeMsg*(buf: seq[byte]): Opt[IdentifyInfo] =
var pb = initProtoBuffer(buf)
if ? pb.getField(1, pubkey).toOpt():
iinfo.pubkey = some(pubkey)
iinfo.pubkey = Opt.some(pubkey)
if ? pb.getField(8, signedPeerRecord).toOpt() and
pubkey == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = some(signedPeerRecord.envelope)
pubkey == signedPeerRecord.envelope.publicKey:
iinfo.signedPeerRecord = Opt.some(signedPeerRecord.envelope)
discard ? pb.getRepeatedField(2, iinfo.addrs).toOpt()
discard ? pb.getRepeatedField(3, iinfo.protos).toOpt()
if ? pb.getField(4, oaddr).toOpt():
iinfo.observedAddr = some(oaddr)
iinfo.observedAddr = Opt.some(oaddr)
if ? pb.getField(5, protoVersion).toOpt():
iinfo.protoVersion = some(protoVersion)
iinfo.protoVersion = Opt.some(protoVersion)
if ? pb.getField(6, agentVersion).toOpt():
iinfo.agentVersion = some(agentVersion)
iinfo.agentVersion = Opt.some(agentVersion)
Opt.some(iinfo)
proc new*(
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new(),
): T =
T: typedesc[Identify],
peerInfo: PeerInfo,
sendSignedPeerRecord = false,
observedAddrManager = ObservedAddrManager.new()): T =
let identify = T(
peerInfo: peerInfo,
sendSignedPeerRecord: sendSignedPeerRecord,
@ -151,14 +151,15 @@ proc new*(
identify
method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
trace "handling identify request", conn
var pb = encodeMsg(p.peerInfo, conn.observedAddr, p.sendSignedPeerRecord)
await conn.writeLp(pb.buffer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in identify handler", exc = exc.msg, conn
finally:
trace "exiting identify handler", conn
@ -167,36 +168,46 @@ method init*(p: Identify) =
p.handler = handle
p.codec = IdentifyCodec
proc identify*(self: Identify,
conn: Connection,
remotePeerId: PeerId): Future[IdentifyInfo] {.async.} =
proc identify*(
self: Identify,
conn: Connection,
remotePeerId: PeerId
): Future[IdentifyInfo] {.async: (raises: [
CancelledError, IdentifyError, LPStreamError]).} =
trace "initiating identify", conn
var message = await conn.readLp(64*1024)
if len(message) == 0:
trace "identify: Empty message received!", conn
raise newException(IdentityInvalidMsgError, "Empty message received!")
raise (ref IdentityInvalidMsgError)(msg: "Empty message received!")
var info = decodeMsg(message).valueOr: raise newException(IdentityInvalidMsgError, "Incorrect message received!")
var info = decodeMsg(message).valueOr:
raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!")
debug "identify: decoded message", conn, info
let
pubkey = info.pubkey.valueOr: raise newException(IdentityInvalidMsgError, "No pubkey in identify")
peer = PeerId.init(pubkey).valueOr: raise newException(IdentityInvalidMsgError, $error)
pubkey = info.pubkey.valueOr:
raise (ref IdentityInvalidMsgError)(msg: "No pubkey in identify")
peer = PeerId.init(pubkey).valueOr:
raise (ref IdentityInvalidMsgError)(msg: $error)
if peer != remotePeerId:
trace "Peer ids don't match", remote = peer, local = remotePeerId
raise newException(IdentityNoMatchError, "Peer ids don't match")
raise (ref IdentityNoMatchError)(msg: "Peer ids don't match")
info.peerId = peer
info.observedAddr.withValue(observed):
# Currently, we use the ObservedAddrManager only to find our dialable external NAT address. Therefore, addresses
# Currently, we use the ObservedAddrManager only to find our
# dialable external NAT address. Therefore, addresses
# like "...\p2p-circuit\p2p\..." and "\p2p\..." are not useful to us.
if observed.contains(multiCodec("p2p-circuit")).get(false) or P2PPattern.matchPartial(observed):
if observed.contains(multiCodec("p2p-circuit")).get(false) or
P2PPattern.matchPartial(observed):
trace "Not adding address to ObservedAddrManager.", observed
elif not self.observedAddrManager.addObservation(observed):
trace "Observed address is not valid.", observedAddr = observed
return info
proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} =
proc new*(
T: typedesc[IdentifyPush],
handler: IdentifyPushHandler = nil): T {.public.} =
## Create a IdentifyPush protocol. `handler` will be called every time
## a peer sends us new `PeerInfo`
let identifypush = T(identifyHandler: handler)
@ -204,27 +215,28 @@ proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.pu
identifypush
proc init*(p: IdentifyPush) =
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
trace "handling identify push", conn
try:
var message = await conn.readLp(64*1024)
var identInfo = decodeMsg(message).valueOr:
raise newException(IdentityInvalidMsgError, "Incorrect message received!")
raise (ref IdentityInvalidMsgError)(msg: "Incorrect message received!")
debug "identify push: decoded message", conn, identInfo
identInfo.pubkey.withValue(pubkey):
let receivedPeerId = PeerId.init(pubkey).tryGet()
if receivedPeerId != conn.peerId:
raise newException(IdentityNoMatchError, "Peer ids don't match")
raise (ref IdentityNoMatchError)(msg: "Peer ids don't match")
identInfo.peerId = receivedPeerId
trace "triggering peer event", peerInfo = conn.peerId
if not isNil(p.identifyHandler):
if p.identifyHandler != nil:
await p.identifyHandler(conn.peerId, identInfo)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPError as exc:
info "exception in identify push handler", exc = exc.msg, conn
finally:
trace "exiting identify push handler", conn
@ -233,7 +245,11 @@ proc init*(p: IdentifyPush) =
p.handler = handle
p.codec = IdentifyPushCodec
proc push*(p: IdentifyPush, peerInfo: PeerInfo, conn: Connection) {.async, public.} =
proc push*(
p: IdentifyPush,
peerInfo: PeerInfo,
conn: Connection
) {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Send new `peerInfo`s to a connection
var pb = encodeMsg(peerInfo, conn.observedAddr, true)
await conn.writeLp(pb.buffer)

View File

@ -27,7 +27,8 @@ type Perf* = ref object of LPProtocol
proc new*(T: typedesc[Perf]): T {.public.} =
var p = T()
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
var bytesRead = 0
try:
trace "Received benchmark performance check", conn

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -36,44 +36,45 @@ type
PingError* = object of LPError
WrongPingAckError* = object of PingError
PingHandler* {.public.} = proc (
peer: PeerId):
Future[void]
{.gcsafe, raises: [].}
PingHandler* {.public.} =
proc (peer: PeerId): Future[void] {.async: (raises: [CancelledError]).}
Ping* = ref object of LPProtocol
pingHandler*: PingHandler
rng: ref HmacDrbgContext
proc new*(T: typedesc[Ping], handler: PingHandler = nil, rng: ref HmacDrbgContext = newRng()): T {.public.} =
proc new*(
T: typedesc[Ping],
handler: PingHandler = nil,
rng: ref HmacDrbgContext = newRng()): T {.public.} =
let ping = Ping(pinghandler: handler, rng: rng)
ping.init()
ping
method init*(p: Ping) =
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
trace "handling ping", conn
var buf: array[PingSize, byte]
await conn.readExactly(addr buf[0], PingSize)
trace "echoing ping", conn
await conn.write(@buf)
if not isNil(p.pingHandler):
if p.pingHandler != nil:
await p.pingHandler(conn.peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in ping handler", exc = exc.msg, conn
p.handler = handle
p.codec = PingCodec
proc ping*(
p: Ping,
conn: Connection,
): Future[Duration] {.async, public.} =
p: Ping,
conn: Connection
): Future[Duration] {.async: (raises: [CancelledError, LPError]), public.} =
## Sends ping to `conn`, returns the delay
trace "initiating ping", conn
var

View File

@ -163,7 +163,8 @@ method rpcHandler*(f: FloodSub,
f.updateMetrics(rpcMsg)
method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
proc handler(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...

View File

@ -138,7 +138,8 @@ proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
ok()
method init*(g: GossipSub) =
proc handler(conn: Connection, proto: string) {.async.} =
proc handler(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
## main protocol handler that gets triggered on every
## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc...

View File

@ -360,7 +360,7 @@ proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] =
method handleConn*(p: PubSub,
conn: Connection,
proto: string) {.base, async.} =
proto: string) {.base, async: (raises: [CancelledError]).} =
## handle incoming connections
##
## this proc will:

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -162,49 +162,40 @@ proc encode(msg: Message): ProtoBuffer =
proc decode(_: typedesc[Cookie], buf: seq[byte]): Opt[Cookie] =
var c: Cookie
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, c.offset)
r2 = pb.getRequiredField(2, c.ns)
if r1.isErr() or r2.isErr(): return Opt.none(Cookie)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, c.offset).toOpt()
? pb.getRequiredField(2, c.ns).toOpt()
Opt.some(c)
proc decode(_: typedesc[Register], buf: seq[byte]): Opt[Register] =
var
r: Register
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, r.ns)
r2 = pb.getRequiredField(2, r.signedPeerRecord)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr(): return Opt.none(Register)
if r3.get(false): r.ttl = Opt.some(ttl)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, r.ns).toOpt()
? pb.getRequiredField(2, r.signedPeerRecord).toOpt()
if ? pb.getField(3, ttl).toOpt(): r.ttl = Opt.some(ttl)
Opt.some(r)
proc decode(_: typedesc[RegisterResponse], buf: seq[byte]): Opt[RegisterResponse] =
proc decode(
_: typedesc[RegisterResponse], buf: seq[byte]): Opt[RegisterResponse] =
var
rr: RegisterResponse
statusOrd: uint
text: string
ttl: uint64
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, statusOrd)
r2 = pb.getField(2, text)
r3 = pb.getField(3, ttl)
if r1.isErr() or r2.isErr() or r3.isErr() or
not checkedEnumAssign(rr.status, statusOrd): return Opt.none(RegisterResponse)
if r2.get(false): rr.text = Opt.some(text)
if r3.get(false): rr.ttl = Opt.some(ttl)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, statusOrd).toOpt()
if not checkedEnumAssign(rr.status, statusOrd):
return Opt.none(RegisterResponse)
if ? pb.getField(2, text).toOpt(): rr.text = Opt.some(text)
if ? pb.getField(3, ttl).toOpt(): rr.ttl = Opt.some(ttl)
Opt.some(rr)
proc decode(_: typedesc[Unregister], buf: seq[byte]): Opt[Unregister] =
var u: Unregister
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, u.ns)
if r1.isErr(): return Opt.none(Unregister)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, u.ns).toOpt()
Opt.some(u)
proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] =
@ -212,38 +203,29 @@ proc decode(_: typedesc[Discover], buf: seq[byte]): Opt[Discover] =
d: Discover
limit: uint64
cookie: seq[byte]
let
pb = initProtoBuffer(buf)
r1 = pb.getRequiredField(1, d.ns)
r2 = pb.getField(2, limit)
r3 = pb.getField(3, cookie)
if r1.isErr() or r2.isErr() or r3.isErr: return Opt.none(Discover)
if r2.get(false): d.limit = Opt.some(limit)
if r3.get(false): d.cookie = Opt.some(cookie)
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, d.ns).toOpt()
if ? pb.getField(2, limit).toOpt(): d.limit = Opt.some(limit)
if ? pb.getField(3, cookie).toOpt(): d.cookie = Opt.some(cookie)
Opt.some(d)
proc decode(_: typedesc[DiscoverResponse], buf: seq[byte]): Opt[DiscoverResponse] =
proc decode(
_: typedesc[DiscoverResponse], buf: seq[byte]): Opt[DiscoverResponse] =
var
dr: DiscoverResponse
registrations: seq[seq[byte]]
cookie: seq[byte]
statusOrd: uint
text: string
let
pb = initProtoBuffer(buf)
r1 = pb.getRepeatedField(1, registrations)
r2 = pb.getField(2, cookie)
r3 = pb.getRequiredField(3, statusOrd)
r4 = pb.getField(4, text)
if r1.isErr() or r2.isErr() or r3.isErr or r4.isErr() or
not checkedEnumAssign(dr.status, statusOrd): return Opt.none(DiscoverResponse)
let pb = initProtoBuffer(buf)
discard ? pb.getRepeatedField(1, registrations).toOpt()
for reg in registrations:
var r: Register
let regOpt = Register.decode(reg).valueOr:
return
dr.registrations.add(regOpt)
if r2.get(false): dr.cookie = Opt.some(cookie)
if r4.get(false): dr.text = Opt.some(text)
dr.registrations.add(? Register.decode(reg))
if ? pb.getField(2, cookie).toOpt(): dr.cookie = Opt.some(cookie)
? pb.getRequiredField(3, statusOrd).toOpt()
if not checkedEnumAssign(dr.status, statusOrd):
return Opt.none(DiscoverResponse)
if ? pb.getField(4, text).toOpt(): dr.text = Opt.some(text)
Opt.some(dr)
proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
@ -252,33 +234,31 @@ proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
statusOrd: uint
pbr, pbrr, pbu, pbd, pbdr: ProtoBuffer
let pb = initProtoBuffer(buf)
? pb.getRequiredField(1, statusOrd).toOpt
? pb.getRequiredField(1, statusOrd).toOpt()
if not checkedEnumAssign(msg.msgType, statusOrd): return Opt.none(Message)
if ? pb.getField(2, pbr).optValue:
if ? pb.getField(2, pbr).toOpt():
msg.register = Register.decode(pbr.buffer)
if msg.register.isNone(): return Opt.none(Message)
if ? pb.getField(3, pbrr).optValue:
if ? pb.getField(3, pbrr).toOpt():
msg.registerResponse = RegisterResponse.decode(pbrr.buffer)
if msg.registerResponse.isNone(): return Opt.none(Message)
if ? pb.getField(4, pbu).optValue:
if ? pb.getField(4, pbu).toOpt():
msg.unregister = Unregister.decode(pbu.buffer)
if msg.unregister.isNone(): return Opt.none(Message)
if ? pb.getField(5, pbd).optValue:
if ? pb.getField(5, pbd).toOpt():
msg.discover = Discover.decode(pbd.buffer)
if msg.discover.isNone(): return Opt.none(Message)
if ? pb.getField(6, pbdr).optValue:
if ? pb.getField(6, pbdr).toOpt():
msg.discoverResponse = DiscoverResponse.decode(pbdr.buffer)
if msg.discoverResponse.isNone(): return Opt.none(Message)
Opt.some(msg)
type
RendezVousError* = object of LPError
RegisteredData = object
@ -297,7 +277,7 @@ type
rng: ref HmacDrbgContext
salt: string
defaultDT: Moment
registerDeletionLoop: Future[void]
registerDeletionLoop: Future[void].Raising([])
#registerEvent: AsyncEvent # TODO: to raise during the heartbeat
# + make the heartbeat sleep duration "smarter"
sema: AsyncSemaphore
@ -312,41 +292,61 @@ proc checkPeerRecord(spr: seq[byte], peerId: PeerId): Result[void, string] =
return err("Bad Peer ID")
return ok()
proc sendRegisterResponse(conn: Connection,
ttl: uint64) {.async.} =
proc sendRegisterResponse(
conn: Connection,
ttl: uint64
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(status: Ok, ttl: Opt.some(ttl)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(
status: Ok,
ttl: Opt.some(ttl)
))
))
conn.writeLp(msg.buffer)
proc sendRegisterResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
proc sendRegisterResponseError(
conn: Connection,
status: ResponseStatus,
text: string = ""
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(status: status, text: Opt.some(text)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.RegisterResponse,
registerResponse: Opt.some(RegisterResponse(
status: status,
text: Opt.some(text)
))
))
conn.writeLp(msg.buffer)
proc sendDiscoverResponse(conn: Connection,
s: seq[Register],
cookie: Cookie) {.async.} =
proc sendDiscoverResponse(
conn: Connection,
s: seq[Register],
cookie: Cookie
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: Opt.some(cookie.encode().buffer)
))
))
await conn.writeLp(msg.buffer)
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: Ok,
registrations: s,
cookie: Opt.some(cookie.encode().buffer)
))
))
conn.writeLp(msg.buffer)
proc sendDiscoverResponseError(conn: Connection,
status: ResponseStatus,
text: string = "") {.async.} =
proc sendDiscoverResponseError(
conn: Connection,
status: ResponseStatus,
text: string = ""
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let msg = encode(Message(
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(status: status, text: Opt.some(text)))))
await conn.writeLp(msg.buffer)
msgType: MessageType.DiscoverResponse,
discoverResponse: Opt.some(DiscoverResponse(
status: status,
text: Opt.some(text)
))
))
conn.writeLp(msg.buffer)
proc countRegister(rdv: RendezVous, peerId: PeerId): int =
let n = Moment.now()
@ -360,9 +360,8 @@ proc save(rdv: RendezVous,
r: Register,
update: bool = true) =
let nsSalted = ns & rdv.salt
discard rdv.namespaces.hasKeyOrPut(nsSalted, newSeq[int]())
try:
for index in rdv.namespaces[nsSalted]:
for index in rdv.namespaces.mgetOrPut(nsSalted, newSeq[int]()):
if rdv.registered[index].peerId == peerId:
if update == false: return
rdv.registered[index].expiration = rdv.defaultDT
@ -376,9 +375,14 @@ proc save(rdv: RendezVous,
rdv.namespaces[nsSalted].add(rdv.registered.high)
# rdv.registerEvent.fire()
except KeyError:
doAssert false, "Should have key"
raiseAssert("Should have key")
proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
proc register(
rdv: RendezVous,
conn: Connection,
r: Register
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
trace "Received Register", peerId = conn.peerId, ns = r.ns
libp2p_rendezvous_register.inc()
if r.ns.len notin 1..255:
@ -390,7 +394,8 @@ proc register(rdv: RendezVous, conn: Connection, r: Register): Future[void] =
if pr.isErr():
return conn.sendRegisterResponseError(InvalidSignedPeerRecord, pr.error())
if rdv.countRegister(conn.peerId) >= RegistrationLimitPerPeer:
return conn.sendRegisterResponseError(NotAuthorized, "Registration limit reached")
return conn.sendRegisterResponseError(
NotAuthorized, "Registration limit reached")
rdv.save(r.ns, conn.peerId, r)
libp2p_rendezvous_registered.inc()
libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len))
@ -407,12 +412,15 @@ proc unregister(rdv: RendezVous, conn: Connection, u: Unregister) =
except KeyError:
return
proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
proc discover(
rdv: RendezVous,
conn: Connection,
d: Discover
) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
trace "Received Discover", peerId = conn.peerId, ns = d.ns
libp2p_rendezvous_discover.inc()
if d.ns.len notin 0..255:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
return conn.sendDiscoverResponseError(InvalidNamespace)
var limit = min(DiscoverLimit, d.limit.get(DiscoverLimit))
var
cookie =
@ -420,11 +428,10 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
try:
Cookie.decode(d.cookie.tryGet()).tryGet()
except CatchableError:
await conn.sendDiscoverResponseError(InvalidCookie)
return
return conn.sendDiscoverResponseError(InvalidCookie)
else: Cookie(offset: rdv.registered.low().uint64 - 1)
if cookie.ns != d.ns or
cookie.offset notin rdv.registered.low().uint64..rdv.registered.high().uint64:
if cookie.ns != d.ns or cookie.offset notin
rdv.registered.low().uint64 .. rdv.registered.high().uint64:
cookie = Cookie(offset: rdv.registered.low().uint64 - 1)
let
nsSalted = d.ns & rdv.salt
@ -433,31 +440,30 @@ proc discover(rdv: RendezVous, conn: Connection, d: Discover) {.async.} =
try:
rdv.namespaces[nsSalted]
except KeyError:
await conn.sendDiscoverResponseError(InvalidNamespace)
return
return conn.sendDiscoverResponseError(InvalidNamespace)
else: toSeq(cookie.offset.int..rdv.registered.high())
if namespaces.len() == 0:
await conn.sendDiscoverResponse(@[], Cookie())
return
return conn.sendDiscoverResponse(@[], Cookie())
var offset = namespaces[^1]
let n = Moment.now()
var s = collect(newSeq()):
for index in namespaces:
var reg = rdv.registered[index]
if limit == 0:
offset = index
break
if reg.expiration < n or index.uint64 <= cookie.offset: continue
limit.dec()
reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64)
reg.data
for index in namespaces:
var reg = rdv.registered[index]
if limit == 0:
offset = index
break
if reg.expiration < n or index.uint64 <= cookie.offset: continue
limit.dec()
reg.data.ttl = Opt.some((reg.expiration - Moment.now()).seconds.uint64)
reg.data
rdv.rng.shuffle(s)
await conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
conn.sendDiscoverResponse(s, Cookie(offset: offset.uint64, ns: d.ns))
proc advertisePeer(rdv: RendezVous,
peer: PeerId,
msg: seq[byte]) {.async.} =
proc advertiseWrap() {.async.} =
proc advertisePeer(
rdv: RendezVous,
peer: PeerId,
msg: seq[byte]) {.async: (raises: [CancelledError]).} =
proc advertiseWrap() {.async: (raises: []).} =
try:
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer: await conn.close()
@ -471,12 +477,14 @@ proc advertisePeer(rdv: RendezVous,
trace "Refuse to register", peer, response = msgRecv.registerResponse
else:
trace "Successfully registered", peer, response = msgRecv.registerResponse
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in the advertise", error = exc.msg
finally:
rdv.sema.release()
await rdv.sema.acquire()
discard await advertiseWrap().withTimeout(5.seconds)
defer: rdv.sema.release()
try:
await advertiseWrap().wait(5.seconds)
except AsyncTimeoutError:
discard
method advertise*(rdv: RendezVous,
ns: string,
@ -636,7 +644,8 @@ proc new*(T: typedesc[RendezVous],
sema: newAsyncSemaphore(SemaphoreDefaultSize)
)
logScope: topics = "libp2p discovery rendezvous"
proc handleStream(conn: Connection, proto: string) {.async.} =
proc handleStream(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
try:
let
buf = await conn.readLp(4096)
@ -651,7 +660,7 @@ proc new*(T: typedesc[RendezVous],
trace "Got an unexpected Discover Response", response = msg.discoverResponse
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in rendezvous handler", error = exc.msg
finally:
await conn.close()

View File

@ -18,9 +18,10 @@ type
PlainText* = ref object of Secure
method init(p: PlainText) {.gcsafe.} =
proc handle(conn: Connection, proto: string)
{.async.} = discard
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
## plain text doesn't do anything
discard
p.codec = PlainTextCodec
p.handler = handle

View File

@ -134,7 +134,8 @@ proc handleConn(
method init*(s: Secure) =
procCall LPProtocol(s).init()
proc handle(conn: Connection, proto: string) {.async.} =
proc handle(
conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
trace "handling connection upgrade", proto, conn
try:
# We don't need the result but we

View File

@ -1,5 +1,5 @@
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -32,19 +32,19 @@ proc tryAcquire*(s: AsyncSemaphore): bool =
## Attempts to acquire a resource, if successful
## returns true, otherwise false
##
if s.count > 0 and s.queue.len == 0:
s.count.dec
trace "Acquired slot", available = s.count, queue = s.queue.len
return true
proc acquire*(s: AsyncSemaphore): Future[void] =
proc acquire*(
s: AsyncSemaphore
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
## Acquire a resource and decrement the resource
## counter. If no more resources are available,
## the returned future will not complete until
## the resource count goes above 0.
##
let fut = newFuture[void]("AsyncSemaphore.acquire")
if s.tryAcquire():
fut.complete()
@ -75,7 +75,6 @@ proc release*(s: AsyncSemaphore) =
## and completing it and incrementing the
## internal resource count
##
doAssert(s.count <= s.size)
if s.count < s.size:

BIN
tests/testdaemon Executable file

Binary file not shown.

View File

@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -164,9 +164,13 @@ suite "Identify":
switch1 = newStandardSwitch(sendSignedPeerRecord=true)
switch2 = newStandardSwitch(sendSignedPeerRecord=true)
proc updateStore1(peerId: PeerId, info: IdentifyInfo) {.async.} =
proc updateStore1(
peerId: PeerId,
info: IdentifyInfo) {.async: (raises: [CancelledError]).} =
switch1.peerStore.updatePeerInfo(info)
proc updateStore2(peerId: PeerId, info: IdentifyInfo) {.async.} =
proc updateStore2(
peerId: PeerId,
info: IdentifyInfo) {.async: (raises: [CancelledError]).} =
switch2.peerStore.updatePeerInfo(info)
identifyPush1 = IdentifyPush.new(updateStore1)

View File

@ -42,7 +42,8 @@ suite "Ping":
transport1 = TcpTransport.new(upgrade = Upgrade())
transport2 = TcpTransport.new(upgrade = Upgrade())
proc handlePing(peer: PeerId) {.async, closure.} =
proc handlePing(
peer: PeerId) {.async: (raises: [CancelledError]), closure.} =
inc pingReceivedCount
pingProto1 = Ping.new()
pingProto2 = Ping.new(handlePing)