add raises annotations to async procs

This commit is contained in:
Dmitriy Ryajov 2021-03-18 16:00:23 -06:00
parent f1ab47d9ee
commit f4df5d43e4
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
48 changed files with 1070 additions and 833 deletions

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[options, tables, sequtils, sets]
import chronos, chronicles, metrics
import peerinfo,
@ -27,7 +29,8 @@ const
type
TooManyConnectionsError* = object of LPError
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
ConnProvider* = proc(): Future[Connection]
{.gcsafe, closure, raises: [Defect].}
ConnEventKind* {.pure.} = enum
Connected, # A connection was made and securely upgraded - there may be
@ -45,7 +48,8 @@ type
discard
ConnEventHandler* =
proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.}
proc(peerId: PeerID, event: ConnEvent): Future[void]
{.gcsafe, raises: [Defect].}
PeerEventKind* {.pure.} = enum
Left,
@ -105,15 +109,34 @@ proc addConnEventHandler*(c: ConnManager,
## Add peer event handler - handlers must not raise exceptions!
##
if isNil(handler): return
c.connEvents.mgetOrPut(kind,
initOrderedSet[ConnEventHandler]()).incl(handler)
try:
if isNil(handler): return
c.connEvents.mgetOrPut(kind,
initOrderedSet[ConnEventHandler]()).incl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Not sure what to do with it here, it seems
# like we should just quit right away because
# there is no way of telling what happened
raiseAssert exc.msg
proc removeConnEventHandler*(c: ConnManager,
handler: ConnEventHandler,
kind: ConnEventKind) =
c.connEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
try:
c.connEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Not sure what to do with it here, it seems
# like we should just quit right away because
# there is no way of telling what happened
raiseAssert exc.msg
proc triggerConnEvent*(c: ConnManager,
peerId: PeerID,
@ -139,15 +162,33 @@ proc addPeerEventHandler*(c: ConnManager,
## Add peer event handler - handlers must not raise exceptions!
##
if isNil(handler): return
c.peerEvents.mgetOrPut(kind,
initOrderedSet[PeerEventHandler]()).incl(handler)
try:
if isNil(handler): return
c.peerEvents.mgetOrPut(kind,
initOrderedSet[PeerEventHandler]()).incl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Not sure what to do with it here, it seems
# like we should just quit right away because
# there is no way of telling what happened
raiseAssert exc.msg
proc removePeerEventHandler*(c: ConnManager,
handler: PeerEventHandler,
kind: PeerEventKind) =
c.peerEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
try:
c.peerEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Not sure what to do with it here, it seems
# like we should just quit right away because
# there is no way of telling what happened
raiseAssert exc.msg
proc triggerPeerEvents*(c: ConnManager,
peerId: PeerID,
@ -169,8 +210,11 @@ proc triggerPeerEvents*(c: ConnManager,
trace "triggering peer events", peerId, event = $event
var peerEvents: seq[Future[void]]
for h in c.peerEvents[event.kind]:
peerEvents.add(h(peerId, event))
try:
for h in c.peerEvents[event.kind]:
peerEvents.add(h(peerId, event))
except Exception as exc:
raiseAssert exc.msg
checkFutures(await allFinished(peerEvents))
except CancelledError as exc:
@ -209,7 +253,7 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
if conn notin c.muxed:
return
return muxer == c.muxed[conn].muxer
return muxer == c.muxed.getOrDefault(conn).muxer
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "Cleaning up muxer", m = muxerHolder.muxer
@ -225,9 +269,10 @@ proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
proc delConn(c: ConnManager, conn: Connection) =
let peerId = conn.peerInfo.peerId
if peerId in c.conns:
c.conns[peerId].excl(conn)
c.conns.withValue(peerId, conns):
conns[].excl(conn)
if c.conns[peerId].len == 0:
if c.conns.getOrDefault(peerId).len <= 0:
c.conns.del(peerId)
libp2p_peers.set(c.conns.len.int64)
@ -342,22 +387,23 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
return
if conn in c.muxed:
return c.muxed[conn].muxer
return c.muxed.getOrDefault(conn).muxer
else:
debug "no muxer for connection", conn
proc storeConn*(c: ConnManager, conn: Connection) =
proc storeConn*(c: ConnManager, conn: Connection)
{.raises: [Defect, LPError].} =
## store a connection
##
if isNil(conn):
raise newException(CatchableError, "Connection cannot be nil")
raise newException(LPError, "Connection cannot be nil")
if conn.closed or conn.atEof:
raise newException(CatchableError, "Connection closed or EOF")
raise newException(LPStreamEOFError, "Connection closed or EOF")
if isNil(conn.peerInfo):
raise newException(CatchableError, "Empty peer info")
raise newException(LPError, "Empty peer info")
let peerId = conn.peerInfo.peerId
if c.conns.getOrDefault(peerId).len > c.maxConnsPerPeer:
@ -369,7 +415,10 @@ proc storeConn*(c: ConnManager, conn: Connection) =
if peerId notin c.conns:
c.conns[peerId] = initHashSet[Connection]()
c.conns[peerId].incl(conn)
c.conns.mgetOrPut(peerId,
initHashSet[Connection]()).incl(conn)
# c.conns.getOrDefault(peerId).incl(conn)
libp2p_peers.set(c.conns.len.int64)
# Launch on close listener
@ -463,18 +512,18 @@ proc trackOutgoingConn*(c: ConnManager,
proc storeMuxer*(c: ConnManager,
muxer: Muxer,
handle: Future[void] = nil) =
handle: Future[void] = nil) {.raises: [Defect, LPError].} =
## store the connection and muxer
##
if isNil(muxer):
raise newException(CatchableError, "muxer cannot be nil")
raise newException(LPError, "muxer cannot be nil")
if isNil(muxer.connection):
raise newException(CatchableError, "muxer's connection cannot be nil")
raise newException(LPError, "muxer's connection cannot be nil")
if muxer.connection notin c:
raise newException(CatchableError, "cant add muxer for untracked connection")
raise newException(LPError, "cant add muxer for untracked connection")
c.muxed[muxer.connection] = MuxerHolder(
muxer: muxer,

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
## This module implementes API for `go-libp2p-daemon`.
import std/[os, osproc, strutils, tables, strtabs]
import chronos, chronicles
@ -147,10 +149,10 @@ type
key*: PublicKey
P2PStreamCallback* = proc(api: DaemonAPI,
stream: P2PStream): Future[void] {.gcsafe.}
stream: P2PStream): Future[void] {.gcsafe, raises: [Defect].}
P2PPubSubCallback* = proc(api: DaemonAPI,
ticket: PubsubTicket,
message: PubSubMessage): Future[bool] {.gcsafe.}
message: PubSubMessage): Future[bool] {.gcsafe, raises: [Defect].}
DaemonError* = object of LPError
DaemonRemoteError* = object of DaemonError
@ -468,7 +470,8 @@ proc checkResponse(pb: var ProtoBuffer): ResponseKind {.inline.} =
else:
result = ResponseKind.Error
proc getErrorMessage(pb: var ProtoBuffer): string {.inline.} =
proc getErrorMessage(pb: var ProtoBuffer): string
{.inline, raises: [Defect, DaemonLocalError].} =
if pb.enterSubmessage() == cast[int](ResponseType.ERROR):
if pb.getString(1, result) == -1:
raise newException(DaemonLocalError, "Error message is missing!")
@ -570,7 +573,8 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
gossipsubHeartbeatDelay = 0,
peersRequired = 2,
logFile = "",
logLevel = IpfsLogLevel.Debug): Future[DaemonAPI] {.async.} =
logLevel = IpfsLogLevel.Debug): Future[DaemonAPI]
{.async, raises: [Defect, DaemonLocalError].} =
## Initialize connection to `go-libp2p-daemon` control socket.
##
## ``flags`` - set of P2PDaemonFlags.
@ -750,7 +754,11 @@ proc newDaemonApi*(flags: set[P2PDaemonFlags] = {},
# Starting daemon process
# echo "Starting ", cmd, " ", args.join(" ")
api.process = startProcess(cmd, "", args, env, {poParentStreams})
try:
api.process = startProcess(cmd, "", args, env, {poParentStreams})
except Exception as exc:
raiseAssert(exc.msg)
# Waiting until daemon will not be bound to control socket.
while true:
if not api.process.running():
@ -826,7 +834,8 @@ proc transactMessage(transp: StreamTransport,
raise newException(DaemonLocalError, "Incorrect or empty message received!")
result = initProtoBuffer(message)
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo
{.raises: [Defect, DaemonLocalError].} =
## Get PeerInfo object from ``pb``.
result.addresses = newSeq[MultiAddress]()
if pb.getValue(1, result.peer) == -1:
@ -835,7 +844,11 @@ proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
while pb.getBytes(2, address) != -1:
if len(address) != 0:
var copyaddr = address
result.addresses.add(MultiAddress.init(copyaddr).tryGet())
let maRes = MultiAddress.init(copyaddr)
if maRes.isErr:
raise newException(DaemonLocalError, $maRes.error)
result.addresses.add(maRes.get())
address.setLen(0)
proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
@ -875,7 +888,7 @@ proc disconnect*(api: DaemonAPI, peer: PeerID) {.async.} =
proc openStream*(api: DaemonAPI, peer: PeerID,
protocols: seq[string],
timeout = 0): Future[P2PStream] {.async.} =
timeout = 0): Future[P2PStream] {.async, raises: [Defect].} =
## Open new stream to peer ``peer`` using one of the protocols in
## ``protocols``. Returns ``StreamTransport`` for the stream.
var transp = await api.newConnection()
@ -901,7 +914,7 @@ proc openStream*(api: DaemonAPI, peer: PeerID,
result = stream
except Exception as exc:
await api.closeConnection(transp)
raise exc
raiseAssert exc.msg
proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
var api = getUserData[DaemonAPI](server)
@ -922,7 +935,7 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
if len(stream.protocol) > 0:
var handler = api.handlers.getOrDefault(stream.protocol)
if not isNil(handler):
asyncCheck handler(api, stream)
asyncSpawn handler(api, stream)
proc addHandler*(api: DaemonAPI, protocols: seq[string],
handler: P2PStreamCallback) {.async.} =
@ -938,14 +951,13 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string],
protocols))
pb.withMessage() do:
api.servers.add(P2PServer(server: server, address: maddress))
except Exception as exc:
finally:
for item in protocols:
api.handlers.del(item)
server.stop()
server.close()
await server.join()
raise exc
finally:
await api.closeConnection(transp)
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
@ -997,26 +1009,31 @@ proc cmTrimPeers*(api: DaemonAPI) {.async.} =
finally:
await api.closeConnection(transp)
proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo =
proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo
{.raises: [Defect, DaemonLocalError].} =
if pb.enterSubmessage() == 2:
result = pb.getPeerInfo()
else:
raise newException(DaemonLocalError, "Missing required field `peer`!")
proc dhtGetSingleValue(pb: var ProtoBuffer): seq[byte] =
proc dhtGetSingleValue(pb: var ProtoBuffer): seq[byte]
{.raises: [Defect, DaemonLocalError].} =
result = newSeq[byte]()
if pb.getLengthValue(3, result) == -1:
raise newException(DaemonLocalError, "Missing field `value`!")
proc dhtGetSinglePublicKey(pb: var ProtoBuffer): PublicKey =
proc dhtGetSinglePublicKey(pb: var ProtoBuffer): PublicKey
{.raises: [Defect, DaemonLocalError].} =
if pb.getValue(3, result) == -1:
raise newException(DaemonLocalError, "Missing field `value`!")
proc dhtGetSinglePeerID(pb: var ProtoBuffer): PeerID =
proc dhtGetSinglePeerID(pb: var ProtoBuffer): PeerID
{.raises: [Defect, DaemonLocalError].} =
if pb.getValue(3, result) == -1:
raise newException(DaemonLocalError, "Missing field `value`!")
proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) {.inline.} =
proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType)
{.inline, raises: [Defect, DaemonLocalError].} =
var dtype: uint
var res = pb.enterSubmessage()
if res == cast[int](ResponseType.DHT):
@ -1027,12 +1044,14 @@ proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) {.inline.} =
else:
raise newException(DaemonLocalError, "Wrong message type!")
proc enterPsMessage(pb: var ProtoBuffer) {.inline.} =
proc enterPsMessage(pb: var ProtoBuffer)
{.inline, raises: [Defect, DaemonLocalError].} =
var res = pb.enterSubmessage()
if res != cast[int](ResponseType.PUBSUB):
raise newException(DaemonLocalError, "Wrong message type!")
proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType {.inline.} =
proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType
{.inline, raises: [Defect, DaemonLocalError].} =
var dtype: uint
if pb.getVarintValue(1, dtype) == 0:
raise newException(DaemonLocalError, "Missing required DHT field `type`!")
@ -1292,8 +1311,9 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
await ticket.transp.join()
break
proc pubsubSubscribe*(api: DaemonAPI, topic: string,
handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} =
proc pubsubSubscribe*(
api: DaemonAPI, topic: string,
handler: P2PPubSubCallback): Future[PubsubTicket] {.async.} =
## Subscribe to topic ``topic``.
var transp = await api.newConnection()
try:
@ -1303,11 +1323,11 @@ proc pubsubSubscribe*(api: DaemonAPI, topic: string,
ticket.topic = topic
ticket.handler = handler
ticket.transp = transp
asyncCheck pubsubLoop(api, ticket)
asyncSpawn pubsubLoop(api, ticket)
result = ticket
except Exception as exc:
await api.closeConnection(transp)
raise exc
raiseAssert exc.msg
proc shortLog*(pinfo: PeerInfo): string =
## Get string representation of ``PeerInfo`` object.

View File

@ -7,11 +7,16 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect, DialFailedError].}
import chronos
import peerid,
stream/connection
export peerid, connection
type
DialFailedError* = object of LPError
Dial* = ref object of RootObj
method connect*(

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect, DialFailedError].}
import std/[sugar, tables]
import pkg/[chronos,
@ -32,8 +34,6 @@ declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
type
DialFailedError* = object of CatchableError
Dialer* = ref object of Dial
peerInfo*: PeerInfo
ms: MultistreamSelect

View File

@ -8,9 +8,11 @@ import macros
type
# Base exception type for libp2p
LPError* = object of CatchableError
LPAllFuturesError* = object of LPError
errors*: seq[ref CatchableError]
# could not figure how to make it with a simple template
# sadly nim needs more love for hygenic templates
# sadly nim needs more love for hygienic templates
# so here goes the macro, its based on the proc/template version
# and uses quote do so it's quite readable
@ -39,12 +41,14 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
debug "A future has failed, enable trace logging for details", error=exc.name
trace "Exception details", msg=exc.msg
proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void]
{.raises: [Defect, LPAllFuturesError, CancelledError].} =
var futs: seq[Future[T]]
for fut in args:
futs &= fut
proc call() {.async.} =
var first: ref Exception = nil
var allErrors = new LPAllFuturesError
futs = await allFinished(futs)
for fut in futs:
if fut.failed:
@ -54,10 +58,11 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
else:
if err of CancelledError:
raise err
if isNil(first):
first = err
if not isNil(first):
raise first
if isNil(err):
allErrors.errors.add(err)
if allErrors.errors.len > 0:
raise allErrors
return call()

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[strutils]
import chronos, chronicles, stew/byteutils
import stream/connection,
@ -25,7 +27,7 @@ const
Ls* = "\x03ls\n"
type
Matcher* = proc (proto: string): bool {.gcsafe.}
Matcher* = proc (proto: string): bool {.gcsafe, raises: [Defect].}
HandlerHolder* = object
protos*: seq[string]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/[chronos, nimcrypto/utils, chronicles, stew/byteutils]
import ../../stream/connection,
../../utility,
@ -56,7 +58,8 @@ proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]): Future[void] =
data: seq[byte] = @[]): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
var
left = data.len
offset = 0
@ -85,5 +88,6 @@ proc writeMsg*(conn: Connection,
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: string): Future[void] =
data: string): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
conn.writeMsg(id, msgType, data.toBytes())

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import pkg/[chronos, chronicles, metrics, nimcrypto/utils]
import ./coder,
@ -49,7 +51,8 @@ type
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
func shortLog*(s: LPChannel): auto =
func shortLog*(s: LPChannel): auto
{.raises: [Defect, ValueError].} =
if s.isNil: "LPChannel(nil)"
elif s.conn.peerInfo.isNil: $s.oid
elif s.name != $s.oid and s.name.len > 0:
@ -89,7 +92,7 @@ proc reset*(s: LPChannel) {.async, gcsafe.} =
if s.isOpen and not s.conn.isClosed:
# If the connection is still active, notify the other end
proc resetMessage() {.async.} =
proc resetMessage() {.async, raises: [Defect].} =
try:
trace "sending reset message", s, conn = s.conn
await s.conn.writeMsg(s.id, s.resetCode) # write reset
@ -135,7 +138,7 @@ method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = "LPChannel"
s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
s.timeoutHandler = proc(): Future[void] {.gcsafe, raises: [Defect].} =
trace "Idle timeout expired, resetting LPChannel", s
s.reset()

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import tables, sequtils, oids
import chronos, chronicles, stew/byteutils, metrics
import ../muxer,
@ -44,7 +46,9 @@ type
oid*: Oid
maxChannCount: int
func shortLog*(m: MPlex): auto = shortLog(m.connection)
func shortLog*(m: MPlex): auto
{.raises: [Defect, ValueError].} =
shortLog(m.connection)
chronicles.formatIt(Mplex): shortLog(it)
proc newTooManyChannels(): ref TooManyChannels =
@ -71,12 +75,14 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
# happen here
warn "Error cleaning up mplex channel", m, chann, msg = exc.msg
proc newStreamInternal*(m: Mplex,
initiator: bool = true,
chanId: uint64 = 0,
name: string = "",
timeout: Duration):
LPChannel {.gcsafe.} =
proc newStreamInternal*(
m: Mplex,
initiator: bool = true,
chanId: uint64 = 0,
name: string = "",
timeout: Duration):
LPChannel
{.gcsafe, raises: [Defect, InvalidChannelIdError].} =
## create new channel/stream
##
let id = if initiator:

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos, chronicles
import ../protocols/protocol,
../stream/connection,
@ -21,15 +23,15 @@ const
type
MuxerError* = object of LPError
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe.}
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe, raises: [Defect].}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe, raises: [Defect].}
Muxer* = ref object of RootObj
streamHandler*: StreamHandler
connection*: Connection
# user provider proc that returns a constructed Muxer
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure.}
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [Defect].}
# this wraps a creator proc that knows how to make muxers
MuxerProvider* = ref object of LPProtocol
@ -37,7 +39,8 @@ type
streamHandler*: StreamHandler # triggered every time there is a new stream, called for any muxer instance
muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created
func shortLog*(m: Muxer): auto = shortLog(m.connection)
func shortLog*(m: Muxer): auto {.raises: [Defect, ValueError].} =
shortLog(m.connection)
chronicles.formatIt(Muxer): shortLog(it)
# muxer interface

View File

@ -62,8 +62,8 @@ template postInit(peerinfo: PeerInfo,
proc init*(p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].} =
protocols: openarray[string] = []): PeerInfo
{.raises: [Defect, ResultError[cstring]].} =
result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key).tryGet(),
privateKey: key)
result.postInit(addrs, protocols)

View File

@ -7,13 +7,16 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos
import ../stream/connection
type
LPProtoHandler* = proc (conn: Connection,
proto: string):
Future[void] {.gcsafe, closure.}
LPProtoHandler* = proc (
conn: Connection,
proto: string):
Future[void] {.gcsafe, closure, raises: [Defect].}
LPProtocol* = ref object of RootObj
codecs*: seq[string]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[sequtils, sets, tables]
import chronos, chronicles, metrics
import ./pubsub,
@ -45,19 +47,18 @@ method subscribeTopic*(f: FloodSub,
trace "ignoring unknown peer"
return
if subscribe and not(isNil(f.subscriptionValidator)) and not(f.subscriptionValidator(topic)):
if subscribe and
not(isNil(f.subscriptionValidator)) and
not(f.subscriptionValidator(topic)):
# this is a violation, so warn should be in order
warn "ignoring invalid topic subscription", topic, peer
return
if subscribe:
if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[PubSubPeer]()
trace "adding subscription for topic", peer, topic
# subscribe the peer to the topic
f.floodsub[topic].incl(peer)
trace "adding subscription for topic", peer, topic
f.floodsub.mgetOrPut(topic,
initHashSet[PubSubPeer]()).incl(peer)
else:
if topic notin f.floodsub:
return
@ -65,7 +66,8 @@ method subscribeTopic*(f: FloodSub,
trace "removing subscription for topic", peer, topic
# unsubscribe the peer from the topic
f.floodsub[topic].excl(peer)
f.floodsub.withValue(topic, topics):
topics[].excl(peer)
method unsubscribePeer*(f: FloodSub, peer: PeerID) =
## handle peer disconnects
@ -203,7 +205,8 @@ method unsubscribeAll*(f: FloodSub, topic: string) =
for p in f.peers.values:
f.sendSubs(p, @[topic], false)
method initPubSub*(f: FloodSub) =
method initPubSub*(f: FloodSub)
{.raises: [Defect, InitializationError].} =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageID].init(2.minutes)
f.init()

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables, sets, options, sequtils, random]
import chronos, chronicles, metrics, bearssl
import ./pubsub,
@ -72,7 +74,9 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
if (parameters.dOut >= parameters.dLow) or
(parameters.dOut > (parameters.d div 2)):
err("gossipsub: dOut parameter error, Number of outbound connections to keep in the mesh. Must be less than D_lo and at most D/2")
err("gossipsub: dOut parameter error, " &
"Number of outbound connections to keep in the mesh. " &
"Must be less than D_lo and at most D/2")
elif parameters.gossipThreshold >= 0:
err("gossipsub: gossipThreshold parameter error, Must be < 0")
elif parameters.publishThreshold >= parameters.gossipThreshold:
@ -558,7 +562,8 @@ method publish*(g: GossipSub,
return peers.len
proc maintainDirectPeers(g: GossipSub) {.async.} =
proc maintainDirectPeers(g: GossipSub)
{.async, raises: [Defect, CancelledError].} =
while g.heartbeatRunning:
for id, addrs in g.parameters.directPeers:
let peer = g.peers.getOrDefault(id)
@ -569,9 +574,9 @@ proc maintainDirectPeers(g: GossipSub) {.async.} =
let _ = await g.switch.dial(id, addrs, g.codecs)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError:
except CancelledError as exc:
trace "Direct peer dial canceled"
raise
raise exc
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg
@ -603,13 +608,16 @@ method stop*(g: GossipSub) {.async.} =
trace "heartbeat stopped"
g.heartbeatFut = nil
method initPubSub*(g: GossipSub) =
method initPubSub*(g: GossipSub)
{.raises: [Defect, InitializationError].} =
procCall FloodSub(g).initPubSub()
if not g.parameters.explicit:
g.parameters = GossipSubParams.init()
g.parameters.validateParameters().tryGet()
let validationRes = g.parameters.validateParameters()
if validationRes.isErr:
raise newException(InitializationError, $validationRes.error)
randomize()

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import pubsubpeer,
@ -68,14 +70,18 @@ declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", lab
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])
type
InitializationError* = object of LPError
TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.}
data: seq[byte]): Future[void]
{.gcsafe, raises: [Defect].}
ValidationResult* {.pure.} = enum
Accept, Reject, Ignore
ValidatorHandler* = proc(topic: string,
message: Message): Future[ValidationResult] {.gcsafe.}
message: Message): Future[ValidationResult]
{.gcsafe, raises: [Defect].}
TopicPair* = tuple[topic: string, handler: TopicHandler]
@ -106,8 +112,7 @@ type
msgSeqno*: uint64
anonymize*: bool # if we omit fromPeer and seqno from RPC messages we send
subscriptionValidator*: SubscriptionValidator # callback used to validate subscriptions
topicsHigh*: int # the maximum number of topics we allow in a subscription message (application specific, defaults to int max)
topicsHigh*: int # the maximum number of topics we allow in a subscription message (application specific, defaults to int max)
knownTopics*: HashSet[string]
method unsubscribePeer*(p: PubSub, peerId: PeerID) {.base.} =
@ -253,7 +258,8 @@ method rpcHandler*(p: PubSub,
else:
libp2p_pubsub_received_prune.inc(labelValues = ["generic"])
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} = discard
method onNewPeer(p: PubSub, peer: PubSubPeer) {.base.} =
discard
method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {.base, gcsafe.} =
# Peer event is raised for the send connection in particular
@ -267,39 +273,42 @@ method onPubSubPeerEvent*(p: PubSub, peer: PubsubPeer, event: PubsubPeerEvent) {
proc getOrCreatePeer*(
p: PubSub,
peer: PeerID,
protos: seq[string]): PubSubPeer =
if peer in p.peers:
return p.peers[peer]
protos: seq[string]): PubSubPeer
{.raises: [Defect, DialFailedError].} =
if peer notin p.peers:
proc getConn(): Future[Connection] {.raises: [Defect, DialFailedError].} =
p.switch.dial(peer, protos)
proc getConn(): Future[Connection] =
p.switch.dial(peer, protos)
proc dropConn(peer: PubSubPeer) =
proc dropConnAsync(peer: PubsubPeer) {.async, raises: [Defect].} =
try:
await p.switch.disconnect(peer.peerId)
except CatchableError as exc: # never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
asyncSpawn dropConnAsync(peer)
proc dropConn(peer: PubSubPeer) =
proc dropConnAsync(peer: PubsubPeer) {.async.} =
try:
await p.switch.disconnect(peer.peerId)
except CatchableError as exc: # never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
asyncSpawn dropConnAsync(peer)
proc onEvent(peer: PubsubPeer, event: PubsubPeerEvent) {.gcsafe, raises: [Defect].} =
p.onPubSubPeerEvent(peer, event)
proc onEvent(peer: PubsubPeer, event: PubsubPeerEvent) {.gcsafe.} =
p.onPubSubPeerEvent(peer, event)
# create new pubsub peer
let pubSubPeer = newPubSubPeer(peer,
getConn,
dropConn,
onEvent,
protos[0])
debug "created new pubsub peer", peer
# create new pubsub peer
let pubSubPeer = newPubSubPeer(peer, getConn, dropConn, onEvent, protos[0])
debug "created new pubsub peer", peer
p.peers[peer] = pubSubPeer
pubSubPeer.observers = p.observers
p.peers[peer] = pubSubPeer
pubSubPeer.observers = p.observers
onNewPeer(p, pubSubPeer)
onNewPeer(p, pubSubPeer)
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)
pubsubPeer.connect()
pubsubPeer.connect()
return pubSubPeer
return p.peers.getOrDefault(peer)
proc handleData*(p: PubSub, topic: string, data: seq[byte]): Future[void] {.async.} =
if topic notin p.topics: return # Not subscribed
@ -357,7 +366,8 @@ method handleConn*(p: PubSub,
finally:
await conn.closeWithEOF()
method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
method subscribePeer*(p: PubSub, peer: PeerID)
{.base, raises: [Defect, DialFailedError].} =
## subscribe to remote peer to receive/send pubsub
## messages
##
@ -368,7 +378,9 @@ proc updateTopicMetrics(p: PubSub, topic: string) =
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
if p.knownTopics.contains(topic):
libp2p_pubsub_topic_handlers.set(p.topics[topic].handler.len.int64, labelValues = [topic])
libp2p_pubsub_topic_handlers.set(
p.topics.getOrDefault(topic).handler.len.int64,
labelValues = [topic])
else:
libp2p_pubsub_topic_handlers.set(0, labelValues = ["other"])
for key, val in p.topics:
@ -423,11 +435,10 @@ method subscribe*(p: PubSub,
## that will be triggered
## on every received message
##
if topic notin p.topics:
trace "subscribing to topic", name = topic
p.topics[topic] = Topic(name: topic)
trace "subscribing to topic", name = topic
p.topics[topic].handler.add(handler)
p.topics.mgetOrPut(topic,
Topic(name: topic)).handler.add(handler)
for _, peer in p.peers:
p.sendSubs(peer, @[topic], true)
@ -449,36 +460,41 @@ method publish*(p: PubSub,
return 0
method initPubSub*(p: PubSub) {.base.} =
method initPubSub*(p: PubSub)
{.base, raises: [Defect, InitializationError].} =
## perform pubsub initialization
##
p.observers = new(seq[PubSubObserver])
if p.msgIdProvider == nil:
p.msgIdProvider = defaultMsgIdProvider
method start*(p: PubSub) {.async, base.} =
## start pubsub
##
discard
method stop*(p: PubSub) {.async, base.} =
## stopt pubsub
##
discard
method addValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
for t in topic:
if t notin p.validators:
p.validators[t] = initHashSet[ValidatorHandler]()
p.validators.mgetOrPut(t,
initHashSet[ValidatorHandler]()).incl(hook)
trace "adding validator for topic", topicId = t
p.validators[t].incl(hook)
method removeValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
for t in topic:
if t in p.validators:
p.validators[t].excl(hook)
p.validators.withValue(t, validators):
validators[].excl(hook)
method validate*(p: PubSub, message: Message): Future[ValidationResult] {.async, base.} =
var pending: seq[Future[ValidationResult]]
@ -519,7 +535,8 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
parameters: PubParams = false): P =
parameters: PubParams = false): P
{.raises: [Defect, InitializationError].} =
let pubsub =
when PubParams is bool:
P(switch: switch,

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[sequtils, strutils, tables, hashes]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
@ -15,7 +17,8 @@ import rpc/[messages, message, protobuf],
../../stream/connection,
../../crypto/crypto,
../../protobuf/minprotobuf,
../../utility
../../utility,
../../switch
export peerid, connection
@ -40,9 +43,9 @@ type
PubsubPeerEvent* = object
kind*: PubSubPeerEventKind
GetConn* = proc(): Future[Connection] {.gcsafe.}
DropConn* = proc(peer: PubsubPeer) {.gcsafe.} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubsubPeerEvent) {.gcsafe.}
GetConn* = proc(): Future[Connection] {.gcsafe, raises: [Defect, DialFailedError].}
DropConn* = proc(peer: PubsubPeer) {.gcsafe, raises: [Defect].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubsubPeerEvent) {.gcsafe, raises: [Defect].}
PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
@ -64,7 +67,8 @@ type
when defined(libp2p_agents_metrics):
shortAgent*: string
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [Defect].}
func hash*(p: PubSubPeer): Hash =
p.peerId.hash
@ -158,7 +162,7 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
try:
let newConn = await p.getConn()
if newConn.isNil:
raise (ref CatchableError)(msg: "Cannot establish send connection")
raise (ref LPError)(msg: "Cannot establish send connection")
# When the send channel goes up, subscriptions need to be sent to the
# remote peer - if we had multiple channels up and one goes down, all
@ -181,8 +185,8 @@ proc connectOnce(p: PubSubPeer): Future[void] {.async.} =
try:
if p.onEvent != nil:
p.onEvent(p, PubsubPeerEvent(kind: PubSubPeerEventKind.Disconnected))
except CancelledError:
raise
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Errors during diconnection events", error = exc.msg
@ -264,11 +268,13 @@ proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [Defect].} =
except Exception as exc: # TODO chronos Exception
raiseAssert exc.msg)
proc newPubSubPeer*(peerId: PeerID,
getConn: GetConn,
dropConn: DropConn,
onEvent: OnEvent,
codec: string): PubSubPeer =
proc newPubSubPeer*(
peerId: PeerID,
getConn: GetConn,
dropConn: DropConn,
onEvent: OnEvent,
codec: string): PubSubPeer =
PubSubPeer(
getConn: getConn,
dropConn: dropConn,

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import options, sequtils
import ../../../utility
import ../../../peerid
@ -17,7 +19,7 @@ type
PeerInfoMsg* = object
peerID*: seq[byte]
signedPeerRecord*: seq[byte]
SubOpts* = object
subscribe*: bool
topic*: string

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import options
import chronicles
import messages,
@ -14,8 +16,6 @@ import messages,
../../../utility,
../../../protobuf/minprotobuf
{.push raises: [Defect].}
logScope:
topics = "pubsubprotobuf"
@ -116,7 +116,7 @@ proc encodeMessage*(msg: Message, anonymize: bool): seq[byte] =
when defined(libp2p_protobuf_metrics):
libp2p_pubsub_rpc_bytes_write.inc(pb.getLen().int64, labelValues = ["message"])
pb.buffer
proc write*(pb: var ProtoBuffer, field: int, msg: Message, anonymize: bool) =

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables]
import chronos/timer

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos
import chronicles
@ -93,7 +95,8 @@ type
# Utility
func shortLog*(conn: NoiseConnection): auto =
func shortLog*(conn: NoiseConnection): auto
{.raises: [Defect, ValueError].} =
if conn.isNil: "NoiseConnection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
@ -124,7 +127,8 @@ proc hasKey(cs: CipherState): bool =
proc encrypt(
state: var CipherState, data: var openArray[byte],
ad: openArray[byte]): ChaChaPolyTag {.noinit.} =
ad: openArray[byte]): ChaChaPolyTag
{.noinit, raises: [Defect, NoiseNonceMaxError].} =
var nonce: ChaChaPolyNonce
nonce[4..<12] = toBytesLE(state.n)
@ -134,7 +138,8 @@ proc encrypt(
if state.n > NonceMax:
raise newException(NoiseNonceMaxError, "Noise max nonce value reached")
proc encryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte] =
proc encryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseNonceMaxError].} =
result = newSeqOfCap[byte](data.len + sizeof(ChachaPolyTag))
result.add(data)
@ -145,7 +150,8 @@ proc encryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
trace "encryptWithAd",
tag = byteutils.toHex(tag), data = result.shortLog, nonce = state.n - 1
proc decryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte] =
proc decryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseDecryptTagError, NoiseNonceMaxError].} =
var
tagIn = data.toOpenArray(data.len - ChaChaPolyTag.len, data.high).intoChaChaPolyTag
tagOut: ChaChaPolyTag
@ -193,7 +199,8 @@ proc mixKeyAndHash(ss: var SymmetricState; ikm: openArray[byte]) {.used.} =
ss.mixHash(temp_keys[1])
ss.cs = CipherState(k: temp_keys[2])
proc encryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte] =
proc encryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseNonceMaxError].}=
# according to spec if key is empty leave plaintext
if ss.cs.hasKey:
result = ss.cs.encryptWithAd(ss.h.data, data)
@ -201,7 +208,8 @@ proc encryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte] =
result = @data
ss.mixHash(result)
proc decryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte] =
proc decryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseDecryptTagError, NoiseNonceMaxError].} =
# according to spec if key is empty leave plaintext
if ss.cs.hasKey:
result = ss.cs.decryptWithAd(ss.h.data, data)
@ -299,7 +307,8 @@ proc readFrame(sconn: Connection): Future[seq[byte]] {.async.} =
await sconn.readExactly(addr buffer[0], buffer.len)
return buffer
proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void] =
proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
doAssert buf.len <= uint16.high.int
var
lesize = buf.len.uint16
@ -311,7 +320,8 @@ proc writeFrame(sconn: Connection, buf: openArray[byte]): Future[void] =
sconn.write(outbuf)
proc receiveHSMessage(sconn: Connection): Future[seq[byte]] = readFrame(sconn)
proc sendHSMessage(sconn: Connection, buf: openArray[byte]): Future[void] =
proc sendHSMessage(sconn: Connection, buf: openArray[byte]): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
writeFrame(sconn, buf)
proc handshakeXXOutbound(
@ -430,7 +440,9 @@ method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} =
proc encryptFrame(
sconn: NoiseConnection, cipherFrame: var openArray[byte], src: openArray[byte]) =
sconn: NoiseConnection,
cipherFrame: var openArray[byte],
src: openArray[byte]) {.raises: [Defect, NoiseNonceMaxError].} =
# Frame consists of length + cipher data + tag
doAssert src.len <= MaxPlainSize
doAssert cipherFrame.len == 2 + src.len + sizeof(ChaChaPolyTag)

View File

@ -6,6 +6,9 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos, chronicles, stew/endians2, bearssl
import nimcrypto/[hmac, sha2, sha, hash, rijndael, twofish, bcmode]
@ -70,7 +73,7 @@ type
SecioError* = object of LPError
func shortLog*(conn: SecioConn): auto =
func shortLog*(conn: SecioConn): auto {.raises: [Defect, ValueError].} =
if conn.isNil: "SecioConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
@ -250,7 +253,8 @@ proc newSecioConn(conn: Connection,
cipher: string,
secrets: Secret,
order: int,
remotePubKey: PublicKey): SecioConn =
remotePubKey: PublicKey): SecioConn
{.raises: [Defect, ResultError[cstring]].} =
## Create new secure stream/lpstream, using specified hash algorithm ``hash``,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``.
@ -422,7 +426,10 @@ method init(s: Secio) {.gcsafe.} =
procCall Secure(s).init()
s.codec = SecioCodec
proc newSecio*(rng: ref BrHmacDrbgContext, localPrivateKey: PrivateKey): Secio =
proc newSecio*(
rng: ref BrHmacDrbgContext,
localPrivateKey: PrivateKey): Secio
{.raises: [Defect, ResultError[CryptoError]].} =
result = Secio(
rng: rng,
localPrivateKey: localPrivateKey,

View File

@ -16,6 +16,8 @@ import ../protocol,
../../peerinfo,
../../errors
{.push raises: [Defect].}
export protocol
logScope:
@ -31,7 +33,7 @@ type
stream*: Connection
buf: StreamSeq
func shortLog*(conn: SecureConn): auto =
func shortLog*(conn: SecureConn): auto {.raises: [Defect, ValueError].} =
if conn.isNil: "SecureConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/strformat
import stew/byteutils
import chronos, chronicles, metrics
@ -33,7 +35,8 @@ type
pushedEof*: bool # eof marker has been put on readQueue
returnedEof*: bool # 0-byte readOnce has been completed
func shortLog*(s: BufferStream): auto =
func shortLog*(s: BufferStream): auto
{.raises: [Defect, ValueError].} =
if s.isNil: "BufferStream(nil)"
elif s.peerInfo.isNil: $s.oid
else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}"
@ -190,14 +193,19 @@ method closeImpl*(s: BufferStream): Future[void] =
# ------------|----------|-------
# Reading | Push Eof | Na
# Pushing | Na | Pop
if not(s.reading and s.pushing):
if s.reading:
if s.readQueue.empty():
# There is an active reader
s.readQueue.addLastNoWait(Eof)
elif s.pushing:
if not s.readQueue.empty():
discard s.readQueue.popFirstNoWait()
try:
if not(s.reading and s.pushing):
if s.reading:
if s.readQueue.empty():
# There is an active reader
s.readQueue.addLastNoWait(Eof)
elif s.pushing:
if not s.readQueue.empty():
discard s.readQueue.popFirstNoWait()
except AsyncQueueFullError as exc:
raiseAssert("Fatal, could not pop from read queue")
except AsyncQueueEmptyError as exc:
raiseAssert("Fatal, could not push to read queue")
trace "Closed BufferStream", s

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos, chronicles, metrics
import connection
@ -31,7 +33,8 @@ when defined(libp2p_agents_metrics):
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
func shortLog*(conn: ChronosStream): string =
func shortLog*(conn: ChronosStream): string
{.raises: [Defect, ValueError].} =
if conn.isNil: "ChronosStream(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
@ -104,7 +107,9 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
if s.tracked:
libp2p_peers_traffic_read.inc(nbytes.int64, labelValues = [s.shortAgent])
method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
method write*(s: ChronosStream, msg: seq[byte])
{.async, raises: [Defect, LPStreamClosedError].} =
if s.closed:
raise newLPStreamClosedError()
@ -126,10 +131,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
if s.tracked:
libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
method closed*(s: ChronosStream): bool {.raises: [Defect].} =
method closed*(s: ChronosStream): bool =
result = s.client.closed
method atEof*(s: ChronosStream): bool {.raises: [Defect].} =
method atEof*(s: ChronosStream): bool =
s.client.atEof()
method closeImpl*(s: ChronosStream) {.async.} =

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[hashes, oids, strformat]
import chronicles, chronos, metrics
import lpstream,
@ -24,7 +26,7 @@ const
DefaultConnectionTimeout* = 5.minutes
type
TimeoutHandler* = proc(): Future[void] {.gcsafe.}
TimeoutHandler* = proc(): Future[void] {.gcsafe, raises: [Defect].}
Connection* = ref object of LPStream
activity*: bool # reset every time data is sent or received
@ -54,7 +56,8 @@ proc onUpgrade*(s: Connection) {.async.} =
if not isNil(s.upgraded):
await s.upgraded
func shortLog*(conn: Connection): string =
func shortLog*(conn: Connection): string
{.raises: [Defect, ValueError].} =
if conn.isNil: "Connection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/oids
import stew/byteutils
import chronicles, chronos, metrics
@ -15,13 +17,11 @@ import ../varint,
../multiaddress,
../errors
export errors
export errors, oids
declareGauge(libp2p_open_streams,
"open stream instances", labels = ["type", "dir"])
export oids
logScope:
topics = "libp2p lpstream"
@ -99,19 +99,19 @@ proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError =
w.par = p
result = w
proc newLPStreamIncompleteError*(): ref CatchableError =
proc newLPStreamIncompleteError*(): ref LPStreamIncompleteError =
result = newException(LPStreamIncompleteError, "Incomplete data received")
proc newLPStreamLimitError*(): ref CatchableError =
proc newLPStreamLimitError*(): ref LPStreamLimitError =
result = newException(LPStreamLimitError, "Buffer limit reached")
proc newLPStreamIncorrectDefect*(m: string): ref Defect =
proc newLPStreamIncorrectDefect*(m: string): ref LPStreamIncorrectDefect =
result = newException(LPStreamIncorrectDefect, m)
proc newLPStreamEOFError*(): ref CatchableError =
proc newLPStreamEOFError*(): ref LPStreamEOFError =
result = newException(LPStreamEOFError, "Stream EOF!")
proc newLPStreamClosedError*(): ref Exception =
proc newLPStreamClosedError*(): ref LPStreamClosedError =
result = newException(LPStreamClosedError, "Stream Closed!")
func shortLog*(s: LPStream): auto =
@ -143,13 +143,16 @@ method readOnce*(s: LPStream,
pbytes: pointer,
nbytes: int):
Future[int]
{.base, async.} =
{.base, async, raises: [Defect, LPStreamEOFError].} =
doAssert(false, "not implemented!")
proc readExactly*(s: LPStream,
pbytes: pointer,
nbytes: int):
Future[void] {.async.} =
proc readExactly*(
s: LPStream,
pbytes: pointer,
nbytes: int):
Future[void]
{.async, raises: [Defect, LPStreamEOFError, LPStreamIncompleteError].} =
if s.atEof:
raise newLPStreamEOFError()
@ -178,7 +181,11 @@ proc readExactly*(s: LPStream,
proc readLine*(s: LPStream,
limit = 0,
sep = "\r\n"): Future[string]
{.async, deprecated: "todo".} =
{.
async,
deprecated: "todo",
raises: [Defect, LPStreamEOFError, LPStreamIncompleteError]
.} =
# TODO replace with something that exploits buffering better
var lim = if limit <= 0: -1 else: limit
var state = 0
@ -203,7 +210,8 @@ proc readLine*(s: LPStream,
if len(result) == lim:
break
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
proc readVarint*(conn: LPStream): Future[uint64]
{.async, gcsafe, raises: [Defect, InvalidVarintError].} =
var
varint: uint64
length: int
@ -219,7 +227,8 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
if true: # can't end with a raise apparently
raise (ref InvalidVarintError)(msg: "Cannot parse varint")
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]]
{.async, gcsafe, raises: [Defect, LPStreamEOFError, MaxSizeError].} =
## read length prefixed msg, with the length encoded as a varint
let
length = await s.readVarint()
@ -235,10 +244,12 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
await s.readExactly(addr res[0], res.len)
return res
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} =
method write*(s: LPStream, msg: seq[byte]): Future[void]
{.base, raises: [Defect, LPStreamClosedError].} =
doAssert(false, "not implemented!")
proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] =
proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
## Write `msg` with a varint-encoded length prefix
let vbytes = PB.toBytes(msg.len().uint64)
var buf = newSeqUninitialized[byte](msg.len() + vbytes.len)
@ -246,24 +257,29 @@ proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] =
buf[vbytes.len..<buf.len] = msg
s.write(buf)
proc writeLp*(s: LPStream, msg: string): Future[void] =
proc writeLp*(s: LPStream, msg: string): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
writeLp(s, msg.toOpenArrayByte(0, msg.high))
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void]
{.deprecated: "seq", raises: [Defect, LPStreamClosedError].} =
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
proc write*(s: LPStream, msg: string): Future[void] =
proc write*(s: LPStream, msg: string): Future[void]
{.raises: [Defect, LPStreamClosedError].} =
s.write(msg.toBytes())
method closeImpl*(s: LPStream): Future[void] {.async, base.} =
method closeImpl*(s: LPStream) {.base, async.} =
## Implementation of close - called only once
##
trace "Closing stream", s, objName = s.objName, dir = $s.dir
s.closeEvent.fire()
libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
inc getStreamTracker(s.objName).closed
trace "Closed stream", s, objName = s.objName, dir = $s.dir
method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
method close*(s: LPStream) {.base, async.} = # {.raises [Defect].}
## close the stream - this may block, but will not raise exceptions
##
if s.isClosed:
@ -277,7 +293,7 @@ method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
# itself must implement this - once-only check as well, with their own field
await closeImpl(s)
proc closeWithEOF*(s: LPStream): Future[void] {.async.} =
proc closeWithEOF*(s: LPStream) {.async.} =
## Close the stream and wait for EOF - use this with half-closed streams where
## an EOF is expected to arrive from the other end.
##

View File

@ -1,7 +1,7 @@
import stew/bitops2
{.push raises: [Defect].}
import stew/bitops2
type
StreamSeq* = object
# Seq adapted to the stream use case where we add data at the back and

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables,
options,
sets,
@ -92,41 +94,46 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
method connect*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress]): Future[void] =
addrs: seq[MultiAddress]): Future[void]
{.raises: [Defect, DialFailedError].} =
s.dialer.connect(peerId, addrs)
method dial*(
s: Switch,
peerId: PeerID,
protos: seq[string]): Future[Connection] =
protos: seq[string]): Future[Connection]
{.raises: [Defect, DialFailedError].} =
s.dialer.dial(peerId, protos)
proc dial*(s: Switch,
peerId: PeerID,
proto: string): Future[Connection] =
proto: string): Future[Connection]
{.raises: [Defect, DialFailedError].} =
dial(s, peerId, @[proto])
method dial*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress],
protos: seq[string]): Future[Connection] =
protos: seq[string]): Future[Connection]
{.raises: [Defect, DialFailedError].} =
s.dialer.dial(peerId, addrs, protos)
proc dial*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress],
proto: string): Future[Connection] =
proto: string): Future[Connection]
{.raises: [Defect, DialFailedError].} =
dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) {.gcsafe.} =
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) {.gcsafe, raises: [Defect, LPError].} =
if isNil(proto.handler):
raise newException(CatchableError,
raise newException(LPError,
"Protocol has to define a handle method or proc")
if proto.codec.len == 0:
raise newException(CatchableError,
raise newException(LPError,
"Protocol has to define a codec string")
s.ms.addHandler(proto.codecs, proto, matcher)
@ -246,9 +253,10 @@ proc newSwitch*(peerInfo: PeerInfo,
muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = [],
connManager: ConnManager,
ms: MultistreamSelect): Switch =
ms: MultistreamSelect): Switch
{.raises: [Defect, LPError].} =
if secureManagers.len == 0:
raise (ref CatchableError)(msg: "Provide at least one secure manager")
raise newException(LPError, "Provide at least one secure manager")
let switch = Switch(
peerInfo: peerInfo,
@ -260,21 +268,21 @@ proc newSwitch*(peerInfo: PeerInfo,
switch.mount(identity)
return switch
proc isConnected*(s: Switch, peerInfo: PeerInfo): bool
{.deprecated: "Use PeerID version".} =
not isNil(peerInfo) and isConnected(s, peerInfo.peerId)
# proc isConnected*(s: Switch, peerInfo: PeerInfo): bool
# {.deprecated: "Use PeerID version".} =
# not isNil(peerInfo) and isConnected(s, peerInfo.peerId)
proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void]
{.deprecated: "Use PeerID version", gcsafe.} =
disconnect(s, peerInfo.peerId)
# proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void]
# {.deprecated: "Use PeerID version", gcsafe.} =
# disconnect(s, peerInfo.peerId)
proc connect*(s: Switch, peerInfo: PeerInfo): Future[void]
{.deprecated: "Use PeerID version".} =
connect(s, peerInfo.peerId, peerInfo.addrs)
# proc connect*(s: Switch, peerInfo: PeerInfo): Future[void]
# {.deprecated: "Use PeerID version".} =
# connect(s, peerInfo.peerId, peerInfo.addrs)
proc dial*(s: Switch,
peerInfo: PeerInfo,
proto: string):
Future[Connection]
{.deprecated: "Use PeerID version".} =
dial(s, peerInfo.peerId, peerInfo.addrs, proto)
# proc dial*(s: Switch,
# peerInfo: PeerInfo,
# proto: string):
# Future[Connection]
# {.deprecated: "Use PeerID version".} =
# dial(s, peerInfo.peerId, peerInfo.addrs, proto)

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, sequtils, tables]
import chronos, chronicles
import transport,
@ -234,7 +236,7 @@ method handles*(
self: TcpTransport,
address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(self).handles(address):
return address.protocols
.tryGet()
.filterIt( it == multiCodec("tcp") )
.len > 0
let protos = address.protocols
if protos.isOk:
let matching = protos.get().filterIt( it == multiCodec("tcp") )
return matching.len > 0

View File

@ -8,6 +8,8 @@
## those terms.
##
{.push raises: [Defect].}
import sequtils
import chronos, chronicles
import ../stream/connection,
@ -28,7 +30,7 @@ type
upgrader*: Upgrade
multicodec*: MultiCodec
proc newTransportClosedError*(parent: ref Exception = nil): ref CatchableError =
proc newTransportClosedError*(parent: ref Exception = nil): ref TransportClosedError =
newException(TransportClosedError,
"Transport closed, no more connections!", parent)
@ -95,7 +97,10 @@ method handles*(
# by default we skip circuit addresses to avoid
# having to repeat the check in every transport
address.protocols.tryGet().filterIt( it == multiCodec("p2p-circuit") ).len == 0
if address.protocols.isOk:
let protos = address.protocols.get()
let matching = protos.filterIt( it == multiCodec("p2p-circuit") )
return matching.len == 0
method localAddress*(self: Transport): MultiAddress {.base, gcsafe.} =
## get the local address of the transport in case started with 0.0.0.0:0

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables, sequtils]
import pkg/[chronos, chronicles, metrics]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[options, sequtils]
import pkg/[chronos, chronicles, metrics]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import sequtils
import chronos, chronicles

23
tests/asyncunit.nim Normal file
View File

@ -0,0 +1,23 @@
import std/unittest
export unittest
template asyncTeardown*(body: untyped): untyped =
teardown:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncSetup*(body: untyped): untyped =
setup:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async, gcsafe, raises: [Exception].} =
body
)())

View File

@ -1,4 +1,4 @@
import std/unittest
{.push raises: [Defect].}
import chronos, bearssl
@ -9,7 +9,8 @@ import ../libp2p/stream/lpstream
import ../libp2p/muxers/mplex/lpchannel
import ../libp2p/protocols/secure/secure
export unittest
import ./asyncunit
export asyncunit
const
StreamTransportTrackerName = "stream.transport"
@ -48,27 +49,6 @@ template checkTrackers*() =
# Also test the GC is not fooling with us
GC_fullCollect()
template asyncTeardown*(body: untyped): untyped =
teardown:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncSetup*(body: untyped): untyped =
setup:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async, gcsafe.} =
body
)())
type RngWrap = object
rng: ref BrHmacDrbgContext
@ -87,7 +67,7 @@ template rng*(): ref BrHmacDrbgContext =
getRng()
type
WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe.}
WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
TestBufferStream* = ref object of BufferStream
writeHandler*: WriteHandler
@ -99,7 +79,7 @@ proc newBufferStream*(writeHandler: WriteHandler): TestBufferStream =
result.writeHandler = writeHandler
result.initStream()
proc checkExpiringInternal(cond: proc(): bool): Future[bool] {.async, gcsafe.} =
proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect].} ): Future[bool] {.async, gcsafe.} =
{.gcsafe.}:
let start = Moment.now()
while true:

View File

@ -18,8 +18,9 @@ type
proc noop(data: seq[byte]) {.async, gcsafe.} = discard
proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
proc getConn(): Future[Connection] =
proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer
{.raises: [Defect, DialFailedError].} =
proc getConn(): Future[Connection] {.raises: [Defect, DialFailedError].} =
p.switch.dial(peerId, GossipSubCodec)
proc dropConn(peer: PubSubPeer) =
@ -33,7 +34,8 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
onNewPeer(p, pubSubPeer)
pubSubPeer
proc randomPeerInfo(): PeerInfo =
proc randomPeerInfo(): PeerInfo
{.raises: [Defect, ResultError[cstring]].} =
PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
suite "GossipSub internal":

View File

@ -1,7 +1,7 @@
{.used.}
import testgossipinternal,
testfloodsub,
# import testgossipinternal,
import testfloodsub,
testgossipsub,
testmcache,
testtimedcache,

View File

@ -5,6 +5,7 @@ import ../libp2p/stream/bufferstream,
../libp2p/errors
import ./helpers
import ./asyncunit
{.used.}

View File

@ -4,6 +4,7 @@ import ../libp2p/[stream/connection,
stream/bufferstream]
import ./helpers
import ./asyncunit
suite "Connection":
asyncTest "close":

View File

@ -8,6 +8,7 @@ import ../libp2p/[connmanager,
errors]
import helpers
import ./asyncunit
type
TestMuxer = ref object of Muxer

View File

@ -10,7 +10,9 @@ import ../libp2p/[protocols/identify,
transports/tcptransport,
crypto/crypto,
upgrademngrs/upgrade]
import ./helpers
import ./asyncunit
when defined(nimHasUsed): {.used.}

View File

@ -1,7 +1,6 @@
import options, tables
import unittest
import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p/[daemon/daemonapi,
protobuf/minprotobuf,
vbuffer,
@ -28,6 +27,9 @@ import ../libp2p/[daemon/daemonapi,
protocols/pubsub/floodsub,
protocols/pubsub/gossipsub]
import ./helpers
import ./asyncunit
type
# TODO: Unify both PeerInfo structs
NativePeerInfo = peerinfo.PeerInfo
@ -151,7 +153,7 @@ proc testPubSubNodePublish(gossip: bool = false, count: int = 1) {.async.} =
let peer = NativePeerInfo.init(
daemonPeer.peer,
daemonPeer.addresses)
await nativeNode.connect(peer)
await nativeNode.connect(peer.peerId, peer.addrs)
await sleepAsync(1.seconds)
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
@ -216,9 +218,11 @@ suite "Interop":
testFuture.complete()
await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses),
protos[0])
let conn = await nativeNode.dial(
daemonPeer.peer,
daemonPeer.addresses,
protos[0])
await conn.writeLp("test 1")
check "test 2" == string.fromBytes((await conn.readLp(1024)))
@ -233,195 +237,196 @@ suite "Interop":
await sleepAsync(1.seconds)
asyncTest "native -> daemon connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
# We are preparing expect string, which should be prefixed with varint
# length and do not have `\r\n` suffix, because we going to use
# readLine().
var buffer = initVBuffer()
buffer.writeSeq(test & "\r\n")
buffer.finish()
var expect = newString(len(buffer) - 2)
copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
# asyncTest "native -> daemon connection":
# var protos = @["/test-stream"]
# var test = "TEST STRING"
# # We are preparing expect string, which should be prefixed with varint
# # length and do not have `\r\n` suffix, because we going to use
# # readLine().
# var buffer = initVBuffer()
# buffer.writeSeq(test & "\r\n")
# buffer.finish()
# var expect = newString(len(buffer) - 2)
# copyMem(addr expect[0], addr buffer.buffer[0], len(expect))
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise],
outTimeout = 5.minutes)
# let nativeNode = newStandardSwitch(
# secureManagers = [SecureProtocol.Noise],
# outTimeout = 5.minutes)
let awaiters = await nativeNode.start()
# let awaiters = await nativeNode.start()
let daemonNode = await newDaemonApi()
let daemonPeer = await daemonNode.identity()
# let daemonNode = await newDaemonApi()
# let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
# We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string.
var line = await stream.transp.readLine()
check line == expect
testFuture.complete(line)
await stream.close()
# var testFuture = newFuture[string]("test.future")
# proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} =
# # We should perform `readLp()` instead of `readLine()`. `readLine()`
# # here reads actually length prefixed string.
# var line = await stream.transp.readLine()
# check line == expect
# testFuture.complete(line)
# await stream.close()
await daemonNode.addHandler(protos, daemonHandler)
let conn = await nativeNode.dial(NativePeerInfo.init(daemonPeer.peer,
daemonPeer.addresses),
protos[0])
await conn.writeLp(test & "\r\n")
check expect == (await wait(testFuture, 10.secs))
# await daemonNode.addHandler(protos, daemonHandler)
# let conn = await nativeNode.dial(
# daemonPeer.peer,
# daemonPeer.addresses,
# protos[0])
# await conn.writeLp(test & "\r\n")
# check expect == (await wait(testFuture, 10.secs))
await conn.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
# await conn.close()
# await nativeNode.stop()
# await allFutures(awaiters)
# await daemonNode.close()
asyncTest "daemon -> native connection":
var protos = @["/test-stream"]
var test = "TEST STRING"
# asyncTest "daemon -> native connection":
# var protos = @["/test-stream"]
# var test = "TEST STRING"
var testFuture = newFuture[string]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
var line = string.fromBytes(await conn.readLp(1024))
check line == test
testFuture.complete(line)
await conn.close()
# var testFuture = newFuture[string]("test.future")
# proc nativeHandler(conn: Connection, proto: string) {.async.} =
# var line = string.fromBytes(await conn.readLp(1024))
# check line == test
# testFuture.complete(line)
# await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
# # custom proto
# var proto = new LPProtocol
# proto.handler = nativeHandler
# proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
# let nativeNode = newStandardSwitch(
# secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
nativeNode.mount(proto)
# nativeNode.mount(proto)
let awaiters = await nativeNode.start()
let nativePeer = nativeNode.peerInfo
# let awaiters = await nativeNode.start()
# let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
discard await stream.transp.writeLp(test)
# let daemonNode = await newDaemonApi()
# await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
# var stream = await daemonNode.openStream(nativePeer.peerId, protos)
# discard await stream.transp.writeLp(test)
check test == (await wait(testFuture, 10.secs))
# check test == (await wait(testFuture, 10.secs))
await stream.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
await sleepAsync(1.seconds)
# await stream.close()
# await nativeNode.stop()
# await allFutures(awaiters)
# await daemonNode.close()
# await sleepAsync(1.seconds)
asyncTest "daemon -> multiple reads and writes":
var protos = @["/test-stream"]
# asyncTest "daemon -> multiple reads and writes":
# var protos = @["/test-stream"]
var testFuture = newFuture[void]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
check "test 1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test 2".toBytes())
# var testFuture = newFuture[void]("test.future")
# proc nativeHandler(conn: Connection, proto: string) {.async.} =
# check "test 1" == string.fromBytes(await conn.readLp(1024))
# await conn.writeLp("test 2".toBytes())
check "test 3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test 4".toBytes())
# check "test 3" == string.fromBytes(await conn.readLp(1024))
# await conn.writeLp("test 4".toBytes())
testFuture.complete()
await conn.close()
# testFuture.complete()
# await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
# # custom proto
# var proto = new LPProtocol
# proto.handler = nativeHandler
# proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
# let nativeNode = newStandardSwitch(
# secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
nativeNode.mount(proto)
# nativeNode.mount(proto)
let awaiters = await nativeNode.start()
let nativePeer = nativeNode.peerInfo
# let awaiters = await nativeNode.start()
# let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
# let daemonNode = await newDaemonApi()
# await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
# var stream = await daemonNode.openStream(nativePeer.peerId, protos)
asyncDiscard stream.transp.writeLp("test 1")
check "test 2" == string.fromBytes(await stream.transp.readLp())
# asyncDiscard stream.transp.writeLp("test 1")
# check "test 2" == string.fromBytes(await stream.transp.readLp())
asyncDiscard stream.transp.writeLp("test 3")
check "test 4" == string.fromBytes(await stream.transp.readLp())
# asyncDiscard stream.transp.writeLp("test 3")
# check "test 4" == string.fromBytes(await stream.transp.readLp())
await wait(testFuture, 10.secs)
# await wait(testFuture, 10.secs)
await stream.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
# await stream.close()
# await nativeNode.stop()
# await allFutures(awaiters)
# await daemonNode.close()
asyncTest "read write multiple":
var protos = @["/test-stream"]
var test = "TEST STRING"
# asyncTest "read write multiple":
# var protos = @["/test-stream"]
# var test = "TEST STRING"
var count = 0
var testFuture = newFuture[int]("test.future")
proc nativeHandler(conn: Connection, proto: string) {.async.} =
while count < 10:
var line = string.fromBytes(await conn.readLp(1024))
check line == test
await conn.writeLp(test.toBytes())
count.inc()
# var count = 0
# var testFuture = newFuture[int]("test.future")
# proc nativeHandler(conn: Connection, proto: string) {.async.} =
# while count < 10:
# var line = string.fromBytes(await conn.readLp(1024))
# check line == test
# await conn.writeLp(test.toBytes())
# count.inc()
testFuture.complete(count)
await conn.close()
# testFuture.complete(count)
# await conn.close()
# custom proto
var proto = new LPProtocol
proto.handler = nativeHandler
proto.codec = protos[0] # codec
# # custom proto
# var proto = new LPProtocol
# proto.handler = nativeHandler
# proto.codec = protos[0] # codec
let nativeNode = newStandardSwitch(
secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
# let nativeNode = newStandardSwitch(
# secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes)
nativeNode.mount(proto)
# nativeNode.mount(proto)
let awaiters = await nativeNode.start()
let nativePeer = nativeNode.peerInfo
# let awaiters = await nativeNode.start()
# let nativePeer = nativeNode.peerInfo
let daemonNode = await newDaemonApi()
await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
var stream = await daemonNode.openStream(nativePeer.peerId, protos)
# let daemonNode = await newDaemonApi()
# await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
# var stream = await daemonNode.openStream(nativePeer.peerId, protos)
var count2 = 0
while count2 < 10:
discard await stream.transp.writeLp(test)
let line = await stream.transp.readLp()
check test == string.fromBytes(line)
inc(count2)
# var count2 = 0
# while count2 < 10:
# discard await stream.transp.writeLp(test)
# let line = await stream.transp.readLp()
# check test == string.fromBytes(line)
# inc(count2)
check 10 == (await wait(testFuture, 1.minutes))
await stream.close()
await nativeNode.stop()
await allFutures(awaiters)
await daemonNode.close()
# check 10 == (await wait(testFuture, 1.minutes))
# await stream.close()
# await nativeNode.stop()
# await allFutures(awaiters)
# await daemonNode.close()
asyncTest "floodsub: daemon publish one":
await testPubSubDaemonPublish()
# asyncTest "floodsub: daemon publish one":
# await testPubSubDaemonPublish()
asyncTest "floodsub: daemon publish many":
await testPubSubDaemonPublish(count = 10)
# asyncTest "floodsub: daemon publish many":
# await testPubSubDaemonPublish(count = 10)
asyncTest "gossipsub: daemon publish one":
await testPubSubDaemonPublish(gossip = true)
# asyncTest "gossipsub: daemon publish one":
# await testPubSubDaemonPublish(gossip = true)
asyncTest "gossipsub: daemon publish many":
await testPubSubDaemonPublish(gossip = true, count = 10)
# asyncTest "gossipsub: daemon publish many":
# await testPubSubDaemonPublish(gossip = true, count = 10)
asyncTest "floodsub: node publish one":
await testPubSubNodePublish()
# asyncTest "floodsub: node publish one":
# await testPubSubNodePublish()
asyncTest "floodsub: node publish many":
await testPubSubNodePublish(count = 10)
# asyncTest "floodsub: node publish many":
# await testPubSubNodePublish(count = 10)
asyncTest "gossipsub: node publish one":
await testPubSubNodePublish(gossip = true)
# asyncTest "gossipsub: node publish one":
# await testPubSubNodePublish(gossip = true)
asyncTest "gossipsub: node publish many":
await testPubSubNodePublish(gossip = true, count = 10)
# asyncTest "gossipsub: node publish many":
# await testPubSubNodePublish(gossip = true, count = 10)

View File

@ -1,404 +1,405 @@
import unittest, strutils, strformat, stew/byteutils
import chronos
import ../libp2p/errors,
../libp2p/multistream,
../libp2p/stream/bufferstream,
../libp2p/stream/connection,
../libp2p/multiaddress,
../libp2p/transports/transport,
../libp2p/transports/tcptransport,
../libp2p/protocols/protocol,
../libp2p/upgrademngrs/upgrade
import ./helpers
when defined(nimHasUsed): {.used.}
## Mock stream for select test
type
TestSelectStream = ref object of Connection
step*: int
method readOnce*(s: TestSelectStream,
pbytes: pointer,
nbytes: int): Future[int] {.async, gcsafe.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
return buf.len
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
return buf.len
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
return buf.len
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
return "\0x3na\n".len()
method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard
method close(s: TestSelectStream) {.async, gcsafe.} =
s.isClosed = true
s.isEof = true
proc newTestSelectStream(): TestSelectStream =
new result
result.step = 1
## Mock stream for handles `ls` test
type
LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe.}
TestLsStream = ref object of Connection
step*: int
ls*: LsHandler
method readOnce*(s: TestLsStream,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
return buf.len()
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
return buf.len()
of 3:
var buf = newSeq[byte](1)
buf[0] = 3
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
return buf.len()
of 4:
var buf = "ls\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
else:
var buf = "na\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} =
if s.step == 4:
await s.ls(msg)
method close(s: TestLsStream) {.async, gcsafe.} =
s.isClosed = true
s.isEof = true
proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =
new result
result.ls = ls
result.step = 1
## Mock stream for handles `na` test
type
NaHandler = proc(procs: string): Future[void] {.gcsafe.}
TestNaStream = ref object of Connection
step*: int
na*: NaHandler
method readOnce*(s: TestNaStream,
pbytes: pointer,
nbytes: int):
Future[int] {.async, gcsafe.} =
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
return buf.len()
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
return buf.len()
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
return buf.len()
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
return "\0x3na\n".len()
method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} =
if s.step == 4:
await s.na(string.fromBytes(msg))
method close(s: TestNaStream) {.async, gcsafe.} =
s.isClosed = true
s.isEof = true
proc newTestNaStream(na: NaHandler): TestNaStream =
new result
result.na = na
result.step = 1
suite "Multistream select":
teardown:
checkTrackers()
asyncTest "test select custom proto":
let ms = newMultistream()
let conn = newTestSelectStream()
check (await ms.select(conn, @["/test/proto/1.0.0"])) == "/test/proto/1.0.0"
await conn.close()
asyncTest "test handle custom proto":
let ms = newMultistream()
let conn = newTestSelectStream()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} =
check proto == "/test/proto/1.0.0"
await conn.close()
protocol.handler = testHandler
ms.addHandler("/test/proto/1.0.0", protocol)
await ms.handle(conn)
asyncTest "test handle `ls`":
let ms = newMultistream()
proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration
let conn = Connection(newTestLsStream(testLsHandler))
let done = newFuture[void]()
proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} =
var strProto: string = string.fromBytes(proto)
check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
await conn.close()
done.complete()
proc testHandler(conn: Connection, proto: string): Future[void]
{.async, gcsafe.} = discard
var protocol: LPProtocol = new LPProtocol
protocol.handler = testHandler
ms.addHandler("/test/proto1/1.0.0", protocol)
ms.addHandler("/test/proto2/1.0.0", protocol)
await ms.handle(conn)
await done.wait(5.seconds)
asyncTest "test handle `na`":
let ms = newMultistream()
proc testNaHandler(msg: string): Future[void] {.async, gcsafe.}
let conn = newTestNaStream(testNaHandler)
proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
echo msg
check msg == Na
await conn.close()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} = discard
protocol.handler = testHandler
ms.addHandler("/unabvailable/proto/1.0.0", protocol)
await ms.handle(conn)
asyncTest "e2e - handle":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} =
check proto == "/test/proto/1.0.0"
await conn.writeLp("Hello!")
await conn.close()
protocol.handler = testHandler
let msListen = newMultistream()
msListen.addHandler("/test/proto/1.0.0", protocol)
let transport1 = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let conn = await transport1.accept()
await msListen.handle(conn)
await conn.close()
let handlerWait = acceptHandler()
let msDial = newMultistream()
let transport2 = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn, "/test/proto/1.0.0")) == true
let hello = string.fromBytes(await conn.readLp(1024))
check hello == "Hello!"
await conn.close()
await transport2.stop()
await transport1.stop()
await handlerWait.wait(30.seconds)
asyncTest "e2e - ls":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let
handlerWait = newFuture[void]()
let msListen = newMultistream()
var protocol: LPProtocol = new LPProtocol
protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} =
# never reached
discard
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async.} =
# never reached
discard
protocol.handler = testHandler
msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let conn = await transport1.accept()
try:
await msListen.handle(conn)
except LPStreamEOFError:
discard
except LPStreamClosedError:
discard
finally:
await conn.close()
let acceptFut = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let ls = await msDial.list(conn)
let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
check ls == protos
await conn.close()
await acceptFut
await transport2.stop()
await transport1.stop()
await listenFut.wait(5.seconds)
asyncTest "e2e - select one from a list with unsupported protos":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} =
check proto == "/test/proto/1.0.0"
await conn.writeLp("Hello!")
await conn.close()
protocol.handler = testHandler
let msListen = newMultistream()
msListen.addHandler("/test/proto/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let conn = await transport1.accept()
await msListen.handle(conn)
let acceptFut = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn,
@["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0"
let hello = string.fromBytes(await conn.readLp(1024))
check hello == "Hello!"
await conn.close()
await acceptFut
await transport2.stop()
await transport1.stop()
asyncTest "e2e - select one with both valid":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} =
await conn.writeLp(&"Hello from {proto}!")
await conn.close()
protocol.handler = testHandler
let msListen = newMultistream()
msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
let conn = await transport1.accept()
await msListen.handle(conn)
let acceptFut = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn,
@[
"/test/proto2/1.0.0",
"/test/proto1/1.0.0"
])) == "/test/proto2/1.0.0"
check string.fromBytes(await conn.readLp(1024)) == "Hello from /test/proto2/1.0.0!"
await conn.close()
await acceptFut
await transport2.stop()
await transport1.stop()
# import unittest, strutils, strformat, stew/byteutils
# import chronos
# import ../libp2p/errors,
# ../libp2p/multistream,
# ../libp2p/stream/bufferstream,
# ../libp2p/stream/connection,
# ../libp2p/multiaddress,
# ../libp2p/transports/transport,
# ../libp2p/transports/tcptransport,
# ../libp2p/protocols/protocol,
# ../libp2p/upgrademngrs/upgrade
# import ./helpers
# import ./asyncunit
# when defined(nimHasUsed): {.used.}
# ## Mock stream for select test
# type
# TestSelectStream = ref object of Connection
# step*: int
# method readOnce*(s: TestSelectStream,
# pbytes: pointer,
# nbytes: int): Future[int] {.async, gcsafe.} =
# case s.step:
# of 1:
# var buf = newSeq[byte](1)
# buf[0] = 19
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 2
# return buf.len
# of 2:
# var buf = "/multistream/1.0.0\n"
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 3
# return buf.len
# of 3:
# var buf = newSeq[byte](1)
# buf[0] = 18
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 4
# return buf.len
# of 4:
# var buf = "/test/proto/1.0.0\n"
# copyMem(pbytes, addr buf[0], buf.len())
# return buf.len
# else:
# copyMem(pbytes,
# cstring("\0x3na\n"),
# "\0x3na\n".len())
# return "\0x3na\n".len()
# method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard
# method close(s: TestSelectStream) {.async, gcsafe.} =
# s.isClosed = true
# s.isEof = true
# proc newTestSelectStream(): TestSelectStream =
# new result
# result.step = 1
# ## Mock stream for handles `ls` test
# type
# LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe.}
# TestLsStream = ref object of Connection
# step*: int
# ls*: LsHandler
# method readOnce*(s: TestLsStream,
# pbytes: pointer,
# nbytes: int):
# Future[int] {.async.} =
# case s.step:
# of 1:
# var buf = newSeq[byte](1)
# buf[0] = 19
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 2
# return buf.len()
# of 2:
# var buf = "/multistream/1.0.0\n"
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 3
# return buf.len()
# of 3:
# var buf = newSeq[byte](1)
# buf[0] = 3
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 4
# return buf.len()
# of 4:
# var buf = "ls\n"
# copyMem(pbytes, addr buf[0], buf.len())
# return buf.len()
# else:
# var buf = "na\n"
# copyMem(pbytes, addr buf[0], buf.len())
# return buf.len()
# method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} =
# if s.step == 4:
# await s.ls(msg)
# method close(s: TestLsStream) {.async, gcsafe.} =
# s.isClosed = true
# s.isEof = true
# proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =
# new result
# result.ls = ls
# result.step = 1
# ## Mock stream for handles `na` test
# type
# NaHandler = proc(procs: string): Future[void] {.gcsafe.}
# TestNaStream = ref object of Connection
# step*: int
# na*: NaHandler
# method readOnce*(s: TestNaStream,
# pbytes: pointer,
# nbytes: int):
# Future[int] {.async, gcsafe.} =
# case s.step:
# of 1:
# var buf = newSeq[byte](1)
# buf[0] = 19
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 2
# return buf.len()
# of 2:
# var buf = "/multistream/1.0.0\n"
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 3
# return buf.len()
# of 3:
# var buf = newSeq[byte](1)
# buf[0] = 18
# copyMem(pbytes, addr buf[0], buf.len())
# s.step = 4
# return buf.len()
# of 4:
# var buf = "/test/proto/1.0.0\n"
# copyMem(pbytes, addr buf[0], buf.len())
# return buf.len()
# else:
# copyMem(pbytes,
# cstring("\0x3na\n"),
# "\0x3na\n".len())
# return "\0x3na\n".len()
# method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} =
# if s.step == 4:
# await s.na(string.fromBytes(msg))
# method close(s: TestNaStream) {.async, gcsafe.} =
# s.isClosed = true
# s.isEof = true
# proc newTestNaStream(na: NaHandler): TestNaStream =
# new result
# result.na = na
# result.step = 1
# suite "Multistream select":
# teardown:
# checkTrackers()
# asyncTest "test select custom proto":
# let ms = newMultistream()
# let conn = newTestSelectStream()
# check (await ms.select(conn, @["/test/proto/1.0.0"])) == "/test/proto/1.0.0"
# await conn.close()
# asyncTest "test handle custom proto":
# let ms = newMultistream()
# let conn = newTestSelectStream()
# var protocol: LPProtocol = new LPProtocol
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async, gcsafe.} =
# check proto == "/test/proto/1.0.0"
# await conn.close()
# protocol.handler = testHandler
# ms.addHandler("/test/proto/1.0.0", protocol)
# await ms.handle(conn)
# asyncTest "test handle `ls`":
# let ms = newMultistream()
# proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} # forward declaration
# let conn = Connection(newTestLsStream(testLsHandler))
# let done = newFuture[void]()
# proc testLsHandler(proto: seq[byte]) {.async, gcsafe.} =
# var strProto: string = string.fromBytes(proto)
# check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
# await conn.close()
# done.complete()
# proc testHandler(conn: Connection, proto: string): Future[void]
# {.async, gcsafe.} = discard
# var protocol: LPProtocol = new LPProtocol
# protocol.handler = testHandler
# ms.addHandler("/test/proto1/1.0.0", protocol)
# ms.addHandler("/test/proto2/1.0.0", protocol)
# await ms.handle(conn)
# await done.wait(5.seconds)
# asyncTest "test handle `na`":
# let ms = newMultistream()
# proc testNaHandler(msg: string): Future[void] {.async, gcsafe.}
# let conn = newTestNaStream(testNaHandler)
# proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
# echo msg
# check msg == Na
# await conn.close()
# var protocol: LPProtocol = new LPProtocol
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async, gcsafe.} = discard
# protocol.handler = testHandler
# ms.addHandler("/unabvailable/proto/1.0.0", protocol)
# await ms.handle(conn)
# asyncTest "e2e - handle":
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
# var protocol: LPProtocol = new LPProtocol
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async, gcsafe.} =
# check proto == "/test/proto/1.0.0"
# await conn.writeLp("Hello!")
# await conn.close()
# protocol.handler = testHandler
# let msListen = newMultistream()
# msListen.addHandler("/test/proto/1.0.0", protocol)
# let transport1 = TcpTransport.init(upgrade = Upgrade())
# asyncCheck transport1.start(ma)
# proc acceptHandler(): Future[void] {.async, gcsafe.} =
# let conn = await transport1.accept()
# await msListen.handle(conn)
# await conn.close()
# let handlerWait = acceptHandler()
# let msDial = newMultistream()
# let transport2 = TcpTransport.init(upgrade = Upgrade())
# let conn = await transport2.dial(transport1.ma)
# check (await msDial.select(conn, "/test/proto/1.0.0")) == true
# let hello = string.fromBytes(await conn.readLp(1024))
# check hello == "Hello!"
# await conn.close()
# await transport2.stop()
# await transport1.stop()
# await handlerWait.wait(30.seconds)
# asyncTest "e2e - ls":
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
# let
# handlerWait = newFuture[void]()
# let msListen = newMultistream()
# var protocol: LPProtocol = new LPProtocol
# protocol.handler = proc(conn: Connection, proto: string) {.async, gcsafe.} =
# # never reached
# discard
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async.} =
# # never reached
# discard
# protocol.handler = testHandler
# msListen.addHandler("/test/proto1/1.0.0", protocol)
# msListen.addHandler("/test/proto2/1.0.0", protocol)
# let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
# let listenFut = transport1.start(ma)
# proc acceptHandler(): Future[void] {.async, gcsafe.} =
# let conn = await transport1.accept()
# try:
# await msListen.handle(conn)
# except LPStreamEOFError:
# discard
# except LPStreamClosedError:
# discard
# finally:
# await conn.close()
# let acceptFut = acceptHandler()
# let msDial = newMultistream()
# let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
# let conn = await transport2.dial(transport1.ma)
# let ls = await msDial.list(conn)
# let protos: seq[string] = @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
# check ls == protos
# await conn.close()
# await acceptFut
# await transport2.stop()
# await transport1.stop()
# await listenFut.wait(5.seconds)
# asyncTest "e2e - select one from a list with unsupported protos":
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
# var protocol: LPProtocol = new LPProtocol
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async, gcsafe.} =
# check proto == "/test/proto/1.0.0"
# await conn.writeLp("Hello!")
# await conn.close()
# protocol.handler = testHandler
# let msListen = newMultistream()
# msListen.addHandler("/test/proto/1.0.0", protocol)
# let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
# asyncCheck transport1.start(ma)
# proc acceptHandler(): Future[void] {.async, gcsafe.} =
# let conn = await transport1.accept()
# await msListen.handle(conn)
# let acceptFut = acceptHandler()
# let msDial = newMultistream()
# let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
# let conn = await transport2.dial(transport1.ma)
# check (await msDial.select(conn,
# @["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0"
# let hello = string.fromBytes(await conn.readLp(1024))
# check hello == "Hello!"
# await conn.close()
# await acceptFut
# await transport2.stop()
# await transport1.stop()
# asyncTest "e2e - select one with both valid":
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
# var protocol: LPProtocol = new LPProtocol
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async, gcsafe.} =
# await conn.writeLp(&"Hello from {proto}!")
# await conn.close()
# protocol.handler = testHandler
# let msListen = newMultistream()
# msListen.addHandler("/test/proto1/1.0.0", protocol)
# msListen.addHandler("/test/proto2/1.0.0", protocol)
# let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
# asyncCheck transport1.start(ma)
# proc acceptHandler(): Future[void] {.async, gcsafe.} =
# let conn = await transport1.accept()
# await msListen.handle(conn)
# let acceptFut = acceptHandler()
# let msDial = newMultistream()
# let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
# let conn = await transport2.dial(transport1.ma)
# check (await msDial.select(conn,
# @[
# "/test/proto2/1.0.0",
# "/test/proto1/1.0.0"
# ])) == "/test/proto2/1.0.0"
# check string.fromBytes(await conn.readLp(1024)) == "Hello from /test/proto2/1.0.0!"
# await conn.close()
# await acceptFut
# await transport2.stop()
# await transport1.stop()

View File

@ -32,7 +32,9 @@ import ../libp2p/[switch,
protocols/secure/secure,
upgrademngrs/muxedupgrade,
connmanager]
import ./helpers
import ./asyncunit
const
TestCodec = "/test/proto/1.0.0"
@ -40,8 +42,8 @@ const
type
TestProto = ref object of LPProtocol
method init(p: TestProto) {.gcsafe.} =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
method init(p: TestProto) {.gcsafe, raises: [Defect].} =
proc handle(conn: Connection, proto: string) {.async, gcsafe, raises: [Defect].} =
let msg = string.fromBytes(await conn.readLp(1024))
check "Hello!" == msg
await conn.writeLp("Hello!")
@ -252,7 +254,7 @@ suite "Noise":
(switch2, peerInfo2) = createSwitch(ma2, true)
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
let conn = await switch2.dial(switch1.peerInfo, TestCodec)
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec)
await conn.writeLp("Hello!")
let msg = string.fromBytes(await conn.readLp(1024))
check "Hello!" == msg
@ -281,7 +283,10 @@ suite "Noise":
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
expect(UpgradeFailedError):
let conn = await switch2.dial(switch1.peerInfo, TestCodec)
let conn = await switch2.dial(
switch1.peerInfo.peerId,
switch1.peerInfo.addrs,
TestCodec)
await allFuturesThrowing(
switch1.stop(),

View File

@ -7,6 +7,7 @@ import ../libp2p/crypto/crypto,
../libp2p/peerid
import ./helpers
import ./asyncunit
suite "PeerInfo":
test "Should init with private key":

View File

@ -4,6 +4,7 @@ import chronos
import ../libp2p/utils/semaphore
import ./helpers
import ./asyncunit
randomize()

View File

@ -1,6 +1,6 @@
{.used.}
import unittest, options, sequtils
import options, sequtils
import chronos
import stew/byteutils
import nimcrypto/sysrand
@ -20,6 +20,7 @@ import ../libp2p/[errors,
stream/lpstream,
stream/chronosstream,
transports/tcptransport]
import ./helpers
const

View File

@ -11,6 +11,7 @@ import ../libp2p/[stream/connection,
wire]
import ./helpers
import ./asyncunit
suite "TCP transport":
teardown: