adding raises defect across the codebase (#572)

* adding raises defect across the codebase

* use unittest2

* add windows deps caching

* update mingw link

* die on failed peerinfo initialization

* use result.expect instead of get

* use expect more consistently and rework inits

* use expect more consistently

* throw on missing public key

* remove unused closure annotation

* merge master
This commit is contained in:
Dmitriy Ryajov 2021-05-21 10:27:01 -06:00 committed by GitHub
parent 9674a6a6f6
commit ac4e060e1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
78 changed files with 681 additions and 356 deletions

View File

@ -43,6 +43,38 @@ jobs:
with:
path: nim-libp2p
submodules: true
- name: Derive environment variables
shell: bash
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
ARCH=64
PLATFORM=x64
else
ARCH=32
PLATFORM=x86
fi
echo "ARCH=$ARCH" >> $GITHUB_ENV
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
ncpu=
ext=
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
ext=.exe
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "ext=$ext" >> $GITHUB_ENV
- name: Install build dependencies (Linux i386)
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
@ -63,22 +95,50 @@ jobs:
chmod 755 external/bin/gcc external/bin/g++
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
- name: Install build dependencies (Windows)
- name: Restore MinGW-W64 (Windows) from cache
if: runner.os == 'Windows'
id: windows-mingw-cache
uses: actions/cache@v2
with:
path: external/mingw-${{ matrix.target.cpu }}
key: 'mingw-${{ matrix.target.cpu }}'
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v2
with:
path: external/dlls-${{ matrix.target.cpu }}
key: 'dlls-${{ matrix.target.cpu }}'
- name: Install MinGW64 dependency (Windows)
if: >
steps.windows-mingw-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
shell: bash
run: |
mkdir external
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
arch=64
else
arch=32
fi
curl -L "https://nim-lang.org/download/mingw$arch-6.3.0.7z" -o "external/mingw$arch.7z"
mkdir -p external
curl -L "https://nim-lang.org/download/mingw$ARCH.7z" -o "external/mingw-${{ matrix.target.cpu }}.7z"
7z x -y "external/mingw-${{ matrix.target.cpu }}.7z" -oexternal/
mv external/mingw$ARCH external/mingw-${{ matrix.target.cpu }}
- name: Install DLLs dependencies (Windows)
if: >
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
shell: bash
run: |
mkdir -p external
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x "external/mingw$arch.7z" -oexternal/
7z x external/windeps.zip -oexternal/dlls
echo '${{ github.workspace }}'"/external/mingw$arch/bin" >> $GITHUB_PATH
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
- name: Path to cached dependencies (Windows)
if: >
runner.os == 'Windows'
shell: bash
run: |
echo "${{ github.workspace }}/external/mingw-${{ matrix.target.cpu }}/bin" >> $GITHUB_PATH
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
- name: Setup environment
shell: bash
@ -133,20 +193,6 @@ jobs:
if: steps.nim-cache.outputs.cache-hit != 'true'
shell: bash
run: |
ncpu=
ext=
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
ext=.exe
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
if [[ ! -e csources/bin/nim$ext ]]; then
make -C csources -j $ncpu CC=gcc ucpu='${{ matrix.target.cpu }}'
@ -164,29 +210,6 @@ jobs:
rm -rf dist
rm -rf .git
# - name: Install library dependencies (Linux amd64)
# if: runner.os == 'Linux' && matrix.target.cpu == 'amd64'
# run: |
# sudo DEBIAN_FRONTEND='noninteractive' apt-get install \
# --no-install-recommends -yq libgmp-dev
#
# - name: Install library dependencies (Linux i386)
# if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
# run: |
# sudo DEBIAN_FRONTEND='noninteractive' apt-get install \
# libgmp-dev:i386
#
# - name: Install library dependencies (macOS)
# if: runner.os == 'macOS'
# run: brew install gmp
#
# - name: Install library dependencies (Windows)
# if: runner.os == 'Windows'
# shell: bash
# run: |
# choco install msys2
# pacman -S mingw-w64-x86_64-gmp
- name: Setup Go
uses: actions/setup-go@v2
with:

View File

@ -1,3 +1,14 @@
## Nim-Libp2p
## Copyright (c) 2020 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import
options, tables, chronos, bearssl,
switch, peerid, peerinfo, stream/connection, multiaddress,
@ -36,9 +47,14 @@ type
agentVersion: string
proc new*(T: type[SwitchBuilder]): T =
let address = MultiAddress
.init("/ip4/127.0.0.1/tcp/0")
.expect("address should initialize to default")
SwitchBuilder(
privKey: none(PrivateKey),
address: MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
address: address,
secureManagers: @[],
tcpTransportOpts: TcpTransportOpts(),
maxConnections: MaxConnections,
@ -109,10 +125,11 @@ proc withAgentVersion*(b: SwitchBuilder, agentVersion: string): SwitchBuilder =
proc build*(b: SwitchBuilder): Switch =
if b.rng == nil: # newRng could fail
raise (ref CatchableError)(msg: "Cannot initialize RNG")
raise newException(Defect, "Cannot initialize RNG")
let pkRes = PrivateKey.random(b.rng[])
let
seckey = b.privKey.get(otherwise = PrivateKey.random(b.rng[]).tryGet())
seckey = b.privKey.get(otherwise = pkRes.expect("Should supply a valid RNG"))
var
secureManagerInstances: seq[Secure]
@ -120,11 +137,11 @@ proc build*(b: SwitchBuilder): Switch =
secureManagerInstances.add(newNoise(b.rng, seckey).Secure)
let
peerInfo = block:
var info = PeerInfo.init(seckey, [b.address])
info.protoVersion = b.protoVersion
info.agentVersion = b.agentVersion
info
peerInfo = PeerInfo.init(
seckey,
[b.address],
protoVersion = b.protoVersion,
agentVersion = b.agentVersion)
let
muxers = block:
@ -149,16 +166,19 @@ proc build*(b: SwitchBuilder): Switch =
if isNil(b.rng):
b.rng = newRng()
let switch = newSwitch(
peerInfo = peerInfo,
transports = transports,
identity = identify,
muxers = muxers,
secureManagers = secureManagerInstances,
maxConnections = b.maxConnections,
maxIn = b.maxIn,
maxOut = b.maxOut,
maxConnsPerPeer = b.maxConnsPerPeer)
let switch = try:
newSwitch(
peerInfo = peerInfo,
transports = transports,
identity = identify,
muxers = muxers,
secureManagers = secureManagerInstances,
maxConnections = b.maxConnections,
maxIn = b.maxIn,
maxOut = b.maxOut,
maxConnsPerPeer = b.maxConnsPerPeer)
except CatchableError as exc:
raise newException(Defect, exc.msg)
return switch

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[options, tables, sequtils, sets]
import chronos, chronicles, metrics
import peerinfo,
@ -25,9 +27,10 @@ const
MaxConnectionsPerPeer* = 5
type
TooManyConnectionsError* = object of CatchableError
TooManyConnectionsError* = object of LPError
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
ConnProvider* = proc(): Future[Connection]
{.gcsafe, closure, raises: [Defect].}
ConnEventKind* {.pure.} = enum
Connected, # A connection was made and securely upgraded - there may be
@ -45,7 +48,8 @@ type
discard
ConnEventHandler* =
proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.}
proc(peerId: PeerID, event: ConnEvent): Future[void]
{.gcsafe, raises: [Defect].}
PeerEventKind* {.pure.} = enum
Left,
@ -105,13 +109,27 @@ proc addConnEventHandler*(c: ConnManager,
## Add peer event handler - handlers must not raise exceptions!
##
if isNil(handler): return
c.connEvents[kind].incl(handler)
try:
if isNil(handler): return
c.connEvents[kind].incl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Might be related to https://github.com/nim-lang/Nim/issues/17382
raiseAssert exc.msg
proc removeConnEventHandler*(c: ConnManager,
handler: ConnEventHandler,
kind: ConnEventKind) =
c.connEvents[kind].excl(handler)
try:
c.connEvents[kind].excl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Might be related to https://github.com/nim-lang/Nim/issues/17382
raiseAssert exc.msg
proc triggerConnEvent*(c: ConnManager,
peerId: PeerID,
@ -138,12 +156,26 @@ proc addPeerEventHandler*(c: ConnManager,
##
if isNil(handler): return
c.peerEvents[kind].incl(handler)
try:
c.peerEvents[kind].incl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Might be related to https://github.com/nim-lang/Nim/issues/17382
raiseAssert exc.msg
proc removePeerEventHandler*(c: ConnManager,
handler: PeerEventHandler,
kind: PeerEventKind) =
c.peerEvents[kind].excl(handler)
try:
c.peerEvents[kind].excl(handler)
except Exception as exc:
# TODO: there is an Exception being raised
# somewhere in the depths of the std.
# Might be related to https://github.com/nim-lang/Nim/issues/17382
raiseAssert exc.msg
proc triggerPeerEvents*(c: ConnManager,
peerId: PeerID,
@ -205,7 +237,7 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
if conn notin c.muxed:
return
return muxer == c.muxed[conn].muxer
return muxer == c.muxed.getOrDefault(conn).muxer
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "Cleaning up muxer", m = muxerHolder.muxer
@ -338,11 +370,12 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
return
if conn in c.muxed:
return c.muxed[conn].muxer
return c.muxed.getOrDefault(conn).muxer
else:
debug "no muxer for connection", conn
proc storeConn*(c: ConnManager, conn: Connection) =
proc storeConn*(c: ConnManager, conn: Connection)
{.raises: [Defect, CatchableError].} =
## store a connection
##
@ -456,7 +489,8 @@ proc trackOutgoingConn*(c: ConnManager,
proc storeMuxer*(c: ConnManager,
muxer: Muxer,
handle: Future[void] = nil) =
handle: Future[void] = nil)
{.raises: [Defect, CatchableError].} =
## store the connection and muxer
##

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
## This module implementes API for `go-libp2p-daemon`.
import std/[os, osproc, strutils, tables, strtabs]
import chronos, chronicles
@ -467,7 +469,7 @@ proc checkResponse(pb: var ProtoBuffer): ResponseKind {.inline.} =
else:
result = ResponseKind.Error
proc getErrorMessage(pb: var ProtoBuffer): string {.inline.} =
proc getErrorMessage(pb: var ProtoBuffer): string {.inline, raises: [Defect, DaemonLocalError].} =
if pb.enterSubmessage() == cast[int](ResponseType.ERROR):
if pb.getString(1, result) == -1:
raise newException(DaemonLocalError, "Error message is missing!")
@ -825,7 +827,8 @@ proc transactMessage(transp: StreamTransport,
raise newException(DaemonLocalError, "Incorrect or empty message received!")
result = initProtoBuffer(message)
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
proc getPeerInfo(pb: var ProtoBuffer): PeerInfo
{.raises: [Defect, DaemonLocalError].} =
## Get PeerInfo object from ``pb``.
result.addresses = newSeq[MultiAddress]()
if pb.getValue(1, result.peer) == -1:
@ -834,7 +837,11 @@ proc getPeerInfo(pb: var ProtoBuffer): PeerInfo =
while pb.getBytes(2, address) != -1:
if len(address) != 0:
var copyaddr = address
result.addresses.add(MultiAddress.init(copyaddr).tryGet())
let addrRes = MultiAddress.init(copyaddr)
if addrRes.isErr:
raise newException(DaemonLocalError, addrRes.error)
result.addresses.add(MultiAddress.init(copyaddr).get())
address.setLen(0)
proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
@ -996,26 +1003,31 @@ proc cmTrimPeers*(api: DaemonAPI) {.async.} =
finally:
await api.closeConnection(transp)
proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo =
proc dhtGetSinglePeerInfo(pb: var ProtoBuffer): PeerInfo
{.raises: [Defect, DaemonLocalError].} =
if pb.enterSubmessage() == 2:
result = pb.getPeerInfo()
else:
raise newException(DaemonLocalError, "Missing required field `peer`!")
proc dhtGetSingleValue(pb: var ProtoBuffer): seq[byte] =
proc dhtGetSingleValue(pb: var ProtoBuffer): seq[byte]
{.raises: [Defect, DaemonLocalError].} =
result = newSeq[byte]()
if pb.getLengthValue(3, result) == -1:
raise newException(DaemonLocalError, "Missing field `value`!")
proc dhtGetSinglePublicKey(pb: var ProtoBuffer): PublicKey =
proc dhtGetSinglePublicKey(pb: var ProtoBuffer): PublicKey
{.raises: [Defect, DaemonLocalError].} =
if pb.getValue(3, result) == -1:
raise newException(DaemonLocalError, "Missing field `value`!")
proc dhtGetSinglePeerID(pb: var ProtoBuffer): PeerID =
proc dhtGetSinglePeerID(pb: var ProtoBuffer): PeerID
{.raises: [Defect, DaemonLocalError].} =
if pb.getValue(3, result) == -1:
raise newException(DaemonLocalError, "Missing field `value`!")
proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) {.inline.} =
proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType)
{.inline, raises: [Defect, DaemonLocalError].} =
var dtype: uint
var res = pb.enterSubmessage()
if res == cast[int](ResponseType.DHT):
@ -1026,12 +1038,14 @@ proc enterDhtMessage(pb: var ProtoBuffer, rt: DHTResponseType) {.inline.} =
else:
raise newException(DaemonLocalError, "Wrong message type!")
proc enterPsMessage(pb: var ProtoBuffer) {.inline.} =
proc enterPsMessage(pb: var ProtoBuffer)
{.inline, raises: [Defect, DaemonLocalError].} =
var res = pb.enterSubmessage()
if res != cast[int](ResponseType.PUBSUB):
raise newException(DaemonLocalError, "Wrong message type!")
proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType {.inline.} =
proc getDhtMessageType(pb: var ProtoBuffer): DHTResponseType
{.inline, raises: [Defect, DaemonLocalError].} =
var dtype: uint
if pb.getVarintValue(1, dtype) == 0:
raise newException(DaemonLocalError, "Missing required DHT field `type`!")

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
## This module implements Pool of StreamTransport.
import chronos

View File

@ -5,8 +5,14 @@ import chronos
import chronicles
import macros
type
# Base exception type for libp2p
LPError* = object of CatchableError
LPAllFuturesError* = object of LPError
errors*: seq[ref CatchableError]
# could not figure how to make it with a simple template
# sadly nim needs more love for hygenic templates
# sadly nim needs more love for hygienic templates
# so here goes the macro, its based on the proc/template version
# and uses quote do so it's quite readable

View File

@ -11,11 +11,11 @@
{.push raises: [Defect].}
import nativesockets, hashes
import pkg/chronos
import std/[nativesockets, hashes]
import tables, strutils, stew/shims/net
import chronos
import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
protobuf/minprotobuf
protobuf/minprotobuf, errors
import stew/[base58, base32, endians2, results]
export results, minprotobuf, vbuffer
@ -46,7 +46,7 @@ type
MaResult*[T] = Result[T, string]
MaError* = object of CatchableError
MaError* = object of LPError
MaInvalidAddress* = object of MaError
IpTransportProtocol* = enum

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[strutils]
import chronos, chronicles, stew/byteutils
import stream/connection,
@ -25,7 +27,7 @@ const
Ls* = "\x03ls\n"
type
Matcher* = proc (proto: string): bool {.gcsafe.}
Matcher* = proc (proto: string): bool {.gcsafe, raises: [Defect].}
HandlerHolder* = object
protos*: seq[string]

View File

@ -7,12 +7,15 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos
import nimcrypto/utils, chronicles, stew/byteutils
import ../../stream/connection,
../../utility,
../../varint,
../../vbuffer
../../vbuffer,
../muxer
logScope:
topics = "libp2p mplexcoder"
@ -32,7 +35,7 @@ type
msgType: MessageType
data: seq[byte]
InvalidMplexMsgType = object of CatchableError
InvalidMplexMsgType* = object of MuxerError
# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream
const MaxMsgSize* = 1 shl 20 # 1mb

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos, chronicles, metrics
import ./coder,
@ -51,11 +53,15 @@ type
writes*: int # In-flight writes
func shortLog*(s: LPChannel): auto =
if s.isNil: "LPChannel(nil)"
elif s.conn.peerInfo.isNil: $s.oid
elif s.name != $s.oid and s.name.len > 0:
&"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}"
else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}"
try:
if s.isNil: "LPChannel(nil)"
elif s.conn.peerInfo.isNil: $s.oid
elif s.name != $s.oid and s.name.len > 0:
&"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}"
else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(LPChannel): shortLog(it)
proc open*(s: LPChannel) {.async, gcsafe.} =

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import tables, sequtils, oids
import chronos, chronicles, stew/byteutils, metrics
import ../muxer,
@ -32,8 +34,8 @@ when defined(libp2p_expensive_metrics):
"mplex channels", labels = ["initiator", "peer"])
type
TooManyChannels* = object of CatchableError
InvalidChannelIdError* = object of CatchableError
TooManyChannels* = object of MuxerError
InvalidChannelIdError* = object of MuxerError
Mplex* = ref object of Muxer
channels: array[bool, Table[uint64, LPChannel]]
@ -44,7 +46,9 @@ type
oid*: Oid
maxChannCount: int
func shortLog*(m: MPlex): auto = shortLog(m.connection)
func shortLog*(m: MPlex): auto =
shortLog(m.connection)
chronicles.formatIt(Mplex): shortLog(it)
proc newTooManyChannels(): ref TooManyChannels =
@ -66,17 +70,14 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
m.channels[chann.initiator].len.int64,
labelValues = [$chann.initiator, $m.connection.peerInfo.peerId])
except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError, and no other exceptions should
# happen here
warn "Error cleaning up mplex channel", m, chann, msg = exc.msg
proc newStreamInternal*(m: Mplex,
initiator: bool = true,
chanId: uint64 = 0,
name: string = "",
timeout: Duration):
LPChannel {.gcsafe.} =
timeout: Duration): LPChannel
{.gcsafe, raises: [Defect, InvalidChannelIdError].} =
## create new channel/stream
##
let id = if initiator:
@ -117,8 +118,6 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
trace "finished handling stream", m, chann
doAssert(chann.closed, "connection not closed by handler!")
except CatchableError as exc:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Exception in mplex stream handler", m, chann, msg = exc.msg
await chann.reset()
@ -182,8 +181,6 @@ method handle*(m: Mplex) {.async, gcsafe.} =
of MessageType.ResetIn, MessageType.ResetOut:
await channel.reset()
except CancelledError:
# This procedure is spawned as task and it is not part of public API, so
# there no way for this procedure to be cancelled implicitly.
debug "Unexpected cancellation in mplex handler", m
except LPStreamEOFError as exc:
trace "Stream EOF", m, msg = exc.msg

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos, chronicles
import ../protocols/protocol,
../stream/connection,
@ -19,15 +21,17 @@ const
DefaultChanTimeout* = 5.minutes
type
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe.}
MuxerError* = object of LPError
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe, raises: [Defect].}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe, raises: [Defect].}
Muxer* = ref object of RootObj
streamHandler*: StreamHandler
connection*: Connection
# user provider proc that returns a constructed Muxer
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure.}
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [Defect].}
# this wraps a creator proc that knows how to make muxers
MuxerProvider* = ref object of LPProtocol

View File

@ -59,52 +59,89 @@ template postInit(peerinfo: PeerInfo,
if len(protocols) > 0:
peerinfo.protocols = @protocols
proc init*(p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].} =
result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key).tryGet(),
privateKey: key)
result.postInit(addrs, protocols)
proc init*(
p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = [],
protoVersion: string = "",
agentVersion: string = ""): PeerInfo =
let peerInfo = PeerInfo(
keyType: HasPrivate,
peerId: PeerID.init(key).expect("Unable to create peer id from key"),
privateKey: key,
protoVersion: protoVersion,
agentVersion: agentVersion)
proc init*(p: typedesc[PeerInfo],
peerId: PeerID,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo =
result = PeerInfo(keyType: HasPublic, peerId: peerId)
result.postInit(addrs, protocols)
peerInfo.postInit(addrs, protocols)
return peerInfo
proc init*(p: typedesc[PeerInfo],
peerId: string,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].} =
result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId).tryGet())
result.postInit(addrs, protocols)
proc init*(
p: typedesc[PeerInfo],
peerId: PeerID,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = [],
protoVersion: string = "",
agentVersion: string = ""): PeerInfo =
let peerInfo = PeerInfo(
keyType: HasPublic,
peerId: peerId,
protoVersion: protoVersion,
agentVersion: agentVersion)
proc init*(p: typedesc[PeerInfo],
key: PublicKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].}=
result = PeerInfo(keyType: HasPublic,
peerId: PeerID.init(key).tryGet(),
key: some(key))
peerInfo.postInit(addrs, protocols)
return peerInfo
result.postInit(addrs, protocols)
proc init*(
p: typedesc[PeerInfo],
peerId: string,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = [],
protoVersion: string = "",
agentVersion: string = ""): PeerInfo =
proc publicKey*(p: PeerInfo): Option[PublicKey] {.
raises: [Defect, ResultError[CryptoError]].} =
let peerInfo = PeerInfo(
keyType: HasPublic,
peerId: PeerID.init(peerId).expect("Unable to create peer id from string"),
protoVersion: protoVersion,
agentVersion: agentVersion)
peerInfo.postInit(addrs, protocols)
return peerInfo
proc init*(
p: typedesc[PeerInfo],
key: PublicKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = [],
protoVersion: string = "",
agentVersion: string = ""): PeerInfo =
let peerInfo = PeerInfo(
keyType: HasPublic,
peerId: PeerID.init(key).expect("Unable to create peer id from public key"),
key: some(key),
protoVersion: protoVersion,
agentVersion: agentVersion)
peerInfo.postInit(addrs, protocols)
return peerInfo
proc publicKey*(p: PeerInfo): Option[PublicKey] =
var res = none(PublicKey)
if p.keyType == HasPublic:
if p.peerId.hasPublicKey():
var pubKey: PublicKey
if p.peerId.extractPublicKey(pubKey):
result = some(pubKey)
res = some(pubKey)
elif p.key.isSome:
result = p.key
res = p.key
else:
result = some(p.privateKey.getKey().tryGet())
let pkeyRes = p.privateKey.getKey()
if pkeyRes.isOk:
res = some(pkeyRes.get())
return res
func hash*(p: PeerInfo): Hash =
cast[pointer](p).hash

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import options
import chronos, chronicles
import ../protobuf/minprotobuf,
@ -16,7 +18,8 @@ import ../protobuf/minprotobuf,
../crypto/crypto,
../multiaddress,
../protocols/protocol,
../utility
../utility,
../errors
logScope:
topics = "libp2p identify"
@ -30,9 +33,10 @@ const
#TODO: implement push identify, leaving out for now as it is not essential
type
IdentifyError* = object of CatchableError
IdentifyError* = object of LPError
IdentityNoMatchError* = object of IdentifyError
IdentityInvalidMsgError* = object of IdentifyError
IdentifyNoPubKeyError* = object of IdentifyError
IdentifyInfo* = object
pubKey*: Option[PublicKey]
@ -45,9 +49,13 @@ type
Identify* = ref object of LPProtocol
peerInfo*: PeerInfo
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: Multiaddress): ProtoBuffer =
proc encodeMsg*(peerInfo: PeerInfo, observedAddr: Multiaddress): ProtoBuffer
{.raises: [Defect, IdentifyNoPubKeyError].} =
result = initProtoBuffer()
result.write(1, peerInfo.publicKey.get().getBytes().tryGet())
if peerInfo.publicKey.isNone:
raise newException(IdentifyNoPubKeyError, "No public key found for peer!")
result.write(1, peerInfo.publicKey.get().getBytes().get())
for ma in peerInfo.addrs:
result.write(2, ma.data.buffer)
for proto in peerInfo.protocols:

View File

@ -7,13 +7,17 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos
import ../stream/connection
type
LPProtoHandler* = proc (conn: Connection,
proto: string):
Future[void] {.gcsafe, closure.}
LPProtoHandler* = proc (
conn: Connection,
proto: string):
Future[void]
{.gcsafe, raises: [Defect].}
LPProtocol* = ref object of RootObj
codecs*: seq[string]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[sequtils, sets, hashes, tables]
import chronos, chronicles, metrics, bearssl
import ./pubsub,
@ -201,7 +203,8 @@ method publish*(f: FloodSub,
return peers.len
method initPubSub*(f: FloodSub) =
method initPubSub*(f: FloodSub)
{.raises: [Defect, InitializationError].} =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageID].init(2.minutes)
var rng = newRng()

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables, sets, options, sequtils, random]
import chronos, chronicles, metrics
import ./pubsub,
@ -26,10 +28,9 @@ import ./pubsub,
import stew/results
export results
import gossipsub/[types, scoring, behavior]
export types
export scoring
export behavior
import ./gossipsub/[types, scoring, behavior]
export types, scoring, behavior, pubsub
logScope:
topics = "libp2p gossipsub"
@ -536,13 +537,16 @@ method stop*(g: GossipSub) {.async.} =
trace "heartbeat stopped"
g.heartbeatFut = nil
method initPubSub*(g: GossipSub) =
method initPubSub*(g: GossipSub)
{.raises: [Defect, InitializationError].} =
procCall FloodSub(g).initPubSub()
if not g.parameters.explicit:
g.parameters = GossipSubParams.init()
g.parameters.validateParameters().tryGet()
let validationRes = g.parameters.validateParameters()
if validationRes.isErr:
raise newException(InitializationError, $validationRes.error)
randomize()

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
# {.push raises: [Defect].} TODO compile error on windows due to chronicles?
{.push raises: [Defect].}
import std/[tables, sequtils, sets, algorithm]
import random # for shuffle

View File

@ -1,3 +1,14 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos
import std/[tables, sets]
import ".."/[floodsub, peertable, mcache, pubsubpeer]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables, sequtils, sets, strutils]
import chronos, chronicles, metrics
import ./pubsubpeer,
@ -68,6 +70,8 @@ declarePublicCounter(libp2p_pubsub_received_graft, "pubsub broadcast graft", lab
declarePublicCounter(libp2p_pubsub_received_prune, "pubsub broadcast prune", labels = ["topic"])
type
InitializationError* = object of LPError
TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe, raises: [Defect].}
@ -468,7 +472,8 @@ method publish*(p: PubSub,
return 0
method initPubSub*(p: PubSub) {.base.} =
method initPubSub*(p: PubSub)
{.base, raises: [Defect, InitializationError].} =
## perform pubsub initialization
p.observers = new(seq[PubSubObserver])
if p.msgIdProvider == nil:
@ -538,7 +543,8 @@ proc init*[PubParams: object | bool](
sign: bool = true,
msgIdProvider: MsgIdProvider = defaultMsgIdProvider,
subscriptionValidator: SubscriptionValidator = nil,
parameters: PubParams = false): P =
parameters: PubParams = false): P
{.raises: [Defect, InitializationError].} =
let pubsub =
when PubParams is bool:
P(switch: switch,

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[sequtils, strutils, tables, hashes]
import chronos, chronicles, nimcrypto/sha2, metrics
import rpc/[messages, message, protobuf],
@ -40,9 +42,9 @@ type
PubsubPeerEvent* = object
kind*: PubSubPeerEventKind
GetConn* = proc(): Future[Connection] {.gcsafe.}
DropConn* = proc(peer: PubsubPeer) {.gcsafe.} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubsubPeerEvent) {.gcsafe.}
GetConn* = proc(): Future[Connection] {.gcsafe, raises: [Defect].}
DropConn* = proc(peer: PubsubPeer) {.gcsafe, raises: [Defect].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubsubPeerEvent) {.gcsafe, raises: [Defect].}
PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
@ -64,7 +66,8 @@ type
when defined(libp2p_agents_metrics):
shortAgent*: string
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.}
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [Defect].}
func hash*(p: PubSubPeer): Hash =
p.peerId.hash

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import options, sequtils
import ../../../utility
import ../../../peerid

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import options
import stew/assign2
import chronicles

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables]
import chronos/timer

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos
import chronicles
@ -18,8 +20,10 @@ import ../../peerid
import ../../peerinfo
import ../../protobuf/minprotobuf
import ../../utility
import ../../errors
import secure,
../../crypto/[crypto, chacha20poly1305, curve25519, hkdf]
../../crypto/[crypto, chacha20poly1305, curve25519, hkdf]
when defined(libp2p_dump):
import ../../debugutils
@ -85,17 +89,22 @@ type
readCs: CipherState
writeCs: CipherState
NoiseHandshakeError* = object of CatchableError
NoiseDecryptTagError* = object of CatchableError
NoiseOversizedPayloadError* = object of CatchableError
NoiseNonceMaxError* = object of CatchableError # drop connection on purpose
NoiseError* = object of LPError
NoiseHandshakeError* = object of NoiseError
NoiseDecryptTagError* = object of NoiseError
NoiseOversizedPayloadError* = object of NoiseError
NoiseNonceMaxError* = object of NoiseError # drop connection on purpose
# Utility
func shortLog*(conn: NoiseConnection): auto =
if conn.isNil: "NoiseConnection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
try:
if conn.isNil: "NoiseConnection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(NoiseConnection): shortLog(it)
proc genKeyPair(rng: var BrHmacDrbgContext): KeyPair =
@ -122,8 +131,11 @@ proc hasKey(cs: CipherState): bool =
cs.k != EmptyKey
proc encrypt(
state: var CipherState, data: var openArray[byte],
ad: openArray[byte]): ChaChaPolyTag {.noinit.} =
state: var CipherState,
data: var openArray[byte],
ad: openArray[byte]): ChaChaPolyTag
{.noinit, raises: [Defect, NoiseNonceMaxError].} =
var nonce: ChaChaPolyNonce
nonce[4..<12] = toBytesLE(state.n)
@ -133,7 +145,8 @@ proc encrypt(
if state.n > NonceMax:
raise newException(NoiseNonceMaxError, "Noise max nonce value reached")
proc encryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte] =
proc encryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseNonceMaxError].} =
result = newSeqOfCap[byte](data.len + sizeof(ChachaPolyTag))
result.add(data)
@ -144,7 +157,8 @@ proc encryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
trace "encryptWithAd",
tag = byteutils.toHex(tag), data = result.shortLog, nonce = state.n - 1
proc decryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte] =
proc decryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseDecryptTagError, NoiseNonceMaxError].} =
var
tagIn = data.toOpenArray(data.len - ChaChaPolyTag.len, data.high).intoChaChaPolyTag
tagOut: ChaChaPolyTag
@ -192,7 +206,8 @@ proc mixKeyAndHash(ss: var SymmetricState; ikm: openArray[byte]) {.used.} =
ss.mixHash(temp_keys[1])
ss.cs = CipherState(k: temp_keys[2])
proc encryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte] =
proc encryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseNonceMaxError].} =
# according to spec if key is empty leave plaintext
if ss.cs.hasKey:
result = ss.cs.encryptWithAd(ss.h.data, data)
@ -200,7 +215,8 @@ proc encryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte] =
result = @data
ss.mixHash(result)
proc decryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte] =
proc decryptAndHash(ss: var SymmetricState, data: openArray[byte]): seq[byte]
{.raises: [Defect, NoiseDecryptTagError, NoiseNonceMaxError].} =
# according to spec if key is empty leave plaintext
if ss.cs.hasKey:
result = ss.cs.decryptWithAd(ss.h.data, data)
@ -429,7 +445,10 @@ method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} =
proc encryptFrame(
sconn: NoiseConnection, cipherFrame: var openArray[byte], src: openArray[byte]) =
sconn: NoiseConnection,
cipherFrame: var openArray[byte],
src: openArray[byte])
{.raises: [Defect, NoiseNonceMaxError].} =
# Frame consists of length + cipher data + tag
doAssert src.len <= MaxPlainSize
doAssert cipherFrame.len == 2 + src.len + sizeof(ChaChaPolyTag)
@ -581,12 +600,19 @@ proc newNoise*(
privateKey: PrivateKey;
outgoing: bool = true;
commonPrologue: seq[byte] = @[]): Noise =
result = Noise(
let pkBytes = privateKey.getKey()
.expect("Expected valid private key")
.getBytes()
.expect("Couldn't get key bytes")
var noise = Noise(
rng: rng,
outgoing: outgoing,
localPrivateKey: privateKey,
localPublicKey: privateKey.getKey().tryGet().getBytes().tryGet(),
localPublicKey: pkBytes,
noiseKeys: genKeyPair(rng[]),
commonPrologue: commonPrologue,
)
result.init()
noise.init()
return noise

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import chronos
import secure, ../../stream/connection

View File

@ -6,6 +6,9 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos, chronicles, stew/endians2, bearssl
import nimcrypto/[hmac, sha2, sha, hash, rijndael, twofish, bcmode]
@ -15,7 +18,8 @@ import secure,
../../crypto/crypto,
../../crypto/ecnist,
../../peerid,
../../utility
../../utility,
../../errors
export hmac, sha2, sha, hash, rijndael, bcmode
@ -68,12 +72,16 @@ type
writerCoder: SecureCipher
readerCoder: SecureCipher
SecioError* = object of CatchableError
SecioError* = object of LPError
func shortLog*(conn: SecioConn): auto =
if conn.isNil: "SecioConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
try:
if conn.isNil: "SecioConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(SecioConn): shortLog(it)
proc init(mac: var SecureMac, hash: string, key: openarray[byte]) =
@ -250,7 +258,8 @@ proc newSecioConn(conn: Connection,
cipher: string,
secrets: Secret,
order: int,
remotePubKey: PublicKey): SecioConn =
remotePubKey: PublicKey): SecioConn
{.raises: [Defect, SecioError].} =
## Create new secure stream/lpstream, using specified hash algorithm ``hash``,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``.
@ -259,7 +268,7 @@ proc newSecioConn(conn: Connection,
if conn.peerInfo != nil:
conn.peerInfo
else:
PeerInfo.init(PeerID.init(remotePubKey).tryGet())
PeerInfo.init(remotePubKey)
result = SecioConn.init(conn, conn.peerInfo, conn.observedAddr)
@ -423,9 +432,13 @@ method init(s: Secio) {.gcsafe.} =
s.codec = SecioCodec
proc newSecio*(rng: ref BrHmacDrbgContext, localPrivateKey: PrivateKey): Secio =
let pkRes = localPrivateKey.getKey()
if pkRes.isErr:
raise newException(Defect, "Can't fetch local private key")
result = Secio(
rng: rng,
localPrivateKey: localPrivateKey,
localPublicKey: localPrivateKey.getKey().tryGet(),
localPublicKey: localPrivateKey.getKey().get(),
)
result.init()

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[options, strformat]
import chronos, chronicles, bearssl
import ../protocol,
@ -32,9 +34,13 @@ type
buf: StreamSeq
func shortLog*(conn: SecureConn): auto =
if conn.isNil: "SecureConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
try:
if conn.isNil: "SecureConn(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(SecureConn): shortLog(it)
proc init*(T: type SecureConn,

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/strformat
import stew/byteutils
import chronos, chronicles, metrics
@ -34,9 +36,13 @@ type
returnedEof*: bool # 0-byte readOnce has been completed
func shortLog*(s: BufferStream): auto =
if s.isNil: "BufferStream(nil)"
elif s.peerInfo.isNil: $s.oid
else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}"
try:
if s.isNil: "BufferStream(nil)"
elif s.peerInfo.isNil: $s.oid
else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(BufferStream): shortLog(it)
proc len*(s: BufferStream): int =
@ -190,14 +196,17 @@ method closeImpl*(s: BufferStream): Future[void] =
# ------------|----------|-------
# Reading | Push Eof | Na
# Pushing | Na | Pop
if not(s.reading and s.pushing):
if s.reading:
if s.readQueue.empty():
# There is an active reader
s.readQueue.addLastNoWait(Eof)
elif s.pushing:
if not s.readQueue.empty():
discard s.readQueue.popFirstNoWait()
try:
if not(s.reading and s.pushing):
if s.reading:
if s.readQueue.empty():
# There is an active reader
s.readQueue.addLastNoWait(Eof)
elif s.pushing:
if not s.readQueue.empty():
discard s.readQueue.popFirstNoWait()
except AsyncQueueFullError, AsyncQueueEmptyError:
raise newException(Defect, getCurrentExceptionMsg())
trace "Closed BufferStream", s

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, strformat]
import chronos, chronicles, metrics
import connection
@ -31,10 +33,14 @@ when defined(libp2p_agents_metrics):
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
func shortLog*(conn: ChronosStream): string =
if conn.isNil: "ChronosStream(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
func shortLog*(conn: ChronosStream): auto =
try:
if conn.isNil: "ChronosStream(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(ChronosStream): shortLog(it)
method initStream*(s: ChronosStream) =
@ -126,10 +132,10 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
if s.tracked:
libp2p_peers_traffic_write.inc(msg.len.int64, labelValues = [s.shortAgent])
method closed*(s: ChronosStream): bool {.raises: [Defect].} =
method closed*(s: ChronosStream): bool =
result = s.client.closed
method atEof*(s: ChronosStream): bool {.raises: [Defect].} =
method atEof*(s: ChronosStream): bool =
s.client.atEof()
method closeImpl*(s: ChronosStream) {.async.} =

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[hashes, oids, strformat]
import chronicles, chronos, metrics
import lpstream,
@ -23,7 +25,7 @@ const
DefaultConnectionTimeout* = 5.minutes
type
TimeoutHandler* = proc(): Future[void] {.gcsafe.}
TimeoutHandler* = proc(): Future[void] {.gcsafe, raises: [Defect].}
Connection* = ref object of LPStream
activity*: bool # reset every time data is sent or received
@ -54,9 +56,13 @@ proc onUpgrade*(s: Connection) {.async.} =
await s.upgraded
func shortLog*(conn: Connection): string =
if conn.isNil: "Connection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
try:
if conn.isNil: "Connection(nil)"
elif conn.peerInfo.isNil: $conn.oid
else: &"{shortLog(conn.peerInfo.peerId)}:{conn.oid}"
except ValueError as exc:
raiseAssert(exc.msg)
chronicles.formatIt(Connection): shortLog(it)
method initStream*(s: Connection) =

View File

@ -7,12 +7,15 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/oids
import stew/byteutils
import chronicles, chronos, metrics
import ../varint,
../peerinfo,
../multiaddress
../multiaddress,
../errors
declareGauge(libp2p_open_streams,
"open stream instances", labels = ["type", "dir"])
@ -39,7 +42,7 @@ type
dir*: Direction
closedWithEOF: bool # prevent concurrent calls
LPStreamError* = object of CatchableError
LPStreamError* = object of LPError
LPStreamIncompleteError* = object of LPStreamError
LPStreamIncorrectDefect* = object of Defect
LPStreamLimitError* = object of LPStreamError
@ -96,19 +99,19 @@ proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError =
w.par = p
result = w
proc newLPStreamIncompleteError*(): ref CatchableError =
proc newLPStreamIncompleteError*(): ref LPStreamIncompleteError =
result = newException(LPStreamIncompleteError, "Incomplete data received")
proc newLPStreamLimitError*(): ref CatchableError =
proc newLPStreamLimitError*(): ref LPStreamLimitError =
result = newException(LPStreamLimitError, "Buffer limit reached")
proc newLPStreamIncorrectDefect*(m: string): ref Defect =
proc newLPStreamIncorrectDefect*(m: string): ref LPStreamIncorrectDefect =
result = newException(LPStreamIncorrectDefect, m)
proc newLPStreamEOFError*(): ref CatchableError =
proc newLPStreamEOFError*(): ref LPStreamEOFError =
result = newException(LPStreamEOFError, "Stream EOF!")
proc newLPStreamClosedError*(): ref Exception =
proc newLPStreamClosedError*(): ref LPStreamClosedError =
result = newException(LPStreamClosedError, "Stream Closed!")
func shortLog*(s: LPStream): auto =
@ -130,17 +133,17 @@ method initStream*(s: LPStream) {.base.} =
proc join*(s: LPStream): Future[void] =
s.closeEvent.wait()
method closed*(s: LPStream): bool {.base, raises: [Defect].} =
method closed*(s: LPStream): bool {.base.} =
s.isClosed
method atEof*(s: LPStream): bool {.base, raises: [Defect].} =
method atEof*(s: LPStream): bool {.base.} =
s.isEof
method readOnce*(s: LPStream,
pbytes: pointer,
nbytes: int):
Future[int]
{.base, async.} =
method readOnce*(
s: LPStream,
pbytes: pointer,
nbytes: int):
Future[int] {.base, async.} =
doAssert(false, "not implemented!")
proc readExactly*(s: LPStream,
@ -236,7 +239,7 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
await s.readExactly(addr res[0], res.len)
return res
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, raises: [Defect].} =
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} =
doAssert(false, "not implemented!")
proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] =

View File

@ -1,7 +1,7 @@
import stew/bitops2
{.push raises: [Defect].}
import stew/bitops2
type
StreamSeq* = object
# Seq adapted to the stream use case where we add data at the back and

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables,
options,
sets,
@ -54,7 +56,7 @@ const
ConcurrentUpgrades* = 4
type
DialFailedError* = object of CatchableError
DialFailedError* = object of LPError
Switch* = ref object of RootObj
peerInfo*: PeerInfo
@ -276,7 +278,8 @@ proc dial*(s: Switch,
proto: string):
Future[Connection] = dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) {.gcsafe.} =
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
{.gcsafe, raises: [Defect, CatchableError].} =
if isNil(proto.handler):
raise newException(CatchableError,
"Protocol has to define a handle method or proc")
@ -404,7 +407,9 @@ proc newSwitch*(peerInfo: PeerInfo,
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
maxConnsPerPeer = MaxConnectionsPerPeer): Switch
{.raises: [Defect, CatchableError].} =
if secureManagers.len == 0:
raise (ref CatchableError)(msg: "Provide at least one secure manager")

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[oids, sequtils]
import chronos, chronicles
import transport,
@ -203,7 +205,9 @@ method dial*(t: TcpTransport,
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):
return address.protocols
.tryGet()
.filterIt( it == multiCodec("tcp") )
.len > 0
if address.protocols.isOk:
return address.protocols
.get()
.filterIt(
it == multiCodec("tcp")
).len > 0

View File

@ -8,6 +8,8 @@
## those terms.
##
{.push raises: [Defect].}
import sequtils
import chronos, chronicles
import ../stream/connection,
@ -77,7 +79,8 @@ method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} =
# by default we skip circuit addresses to avoid
# having to repeat the check in every transport
address.protocols.tryGet().filterIt( it == multiCodec("p2p-circuit") ).len == 0
if address.protocols.isOk:
return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0
method localAddress*(t: Transport): MultiAddress {.base, gcsafe.} =
## get the local address of the transport in case started with 0.0.0.0:0

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[tables, sequtils]
import pkg/[chronos, chronicles, metrics]
@ -15,6 +17,9 @@ import ../upgrademngrs/upgrade,
export Upgrade
logScope:
topics = "libp2p muxedupgrade"
type
MuxedUpgrade* = ref object of Upgrade
muxers*: Table[string, MuxerProvider]

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import std/[options, sequtils]
import pkg/[chronos, chronicles, metrics]
@ -14,14 +16,18 @@ import ../stream/connection,
../protocols/secure/secure,
../protocols/identify,
../multistream,
../connmanager
../connmanager,
../errors
export connmanager, connection, identify, secure, multistream
declarePublicCounter(libp2p_failed_upgrade, "peers failed upgrade")
logScope:
topics = "libp2p upgrade"
type
UpgradeFailedError* = object of CatchableError
UpgradeFailedError* = object of LPError
Upgrade* = ref object of RootObj
ms*: MultistreamSelect

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import sequtils
import chronos, chronicles

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
## This module implements wire network connection procedures.
import chronos, stew/endians2
import multiaddress, multicodec
@ -28,11 +30,13 @@ const
mapAnd(mapEq("unix"))
)
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress] {.
raises: [Defect, ResultError[string]] .} =
proc initTAddress*(ma: MultiAddress): MaResult[TransportAddress]
{.raises: [Defect, ResultError[string]].} =
## Initialize ``TransportAddress`` with MultiAddress ``ma``.
##
## MultiAddress must be wire address, e.g. ``{IP4, IP6, UNIX}/{TCP, UDP}``.
##
if TRANSPMA.match(ma):
var pbuf: array[2, byte]
let code = ma[0].tryGet().protoCode().tryGet()
@ -85,14 +89,25 @@ proc createStreamServer*[T](ma: MultiAddress,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer =
init: TransportInitCallback = nil): StreamServer
{.raises: [Defect, MaInvalidAddress].} =
## Create new TCP stream server which bounds to ``ma`` address.
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
let address = initTAddress(ma)
result = createStreamServer(address.tryGet(), cbproc, flags, udata, sock,
backlog, bufferSize, child, init)
let address = try:
initTAddress(ma)
except ResultError[string] as exc:
raise newException(Defect, exc.msg)
if address.isErr:
raise newException(MaInvalidAddress, address.error)
try:
return createStreamServer(address.get(), cbproc, flags, udata, sock,
backlog, bufferSize, child, init)
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc createStreamServer*[T](ma: MultiAddress,
flags: set[ServerFlags] = {},
@ -101,16 +116,27 @@ proc createStreamServer*[T](ma: MultiAddress,
backlog: int = 100,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil): StreamServer =
init: TransportInitCallback = nil): StreamServer
{.raises: [Defect, MaInvalidAddress].} =
## Create new TCP stream server which bounds to ``ma`` address.
##
if not(RTRANSPMA.match(ma)):
raise newException(MaInvalidAddress, "Incorrect or unsupported address!")
let address = initTAddress(ma)
result = createStreamServer(address.tryGet(), flags, udata, sock, backlog,
bufferSize, child, init)
let address = try:
initTAddress(ma)
except ResultError[string] as exc:
raise newException(Defect, exc.msg)
proc createAsyncSocket*(ma: MultiAddress): AsyncFD =
try:
return createStreamServer(address.get(), flags, udata, sock, backlog,
bufferSize, child, init)
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc createAsyncSocket*(ma: MultiAddress): AsyncFD
{.raises: [Defect, ResultError[string]].} =
## Create new asynchronous socket using MultiAddress' ``ma`` socket type and
## protocol information.
##
@ -126,7 +152,6 @@ proc createAsyncSocket*(ma: MultiAddress): AsyncFD =
return asyncInvalidSocket
let address = maddr.tryGet()
if address.family in {AddressFamily.IPv4, AddressFamily.IPv6}:
if ma[1].tryGet().protoCode().tryGet() == multiCodec("udp"):
socktype = SockType.SOCK_DGRAM
@ -139,9 +164,14 @@ proc createAsyncSocket*(ma: MultiAddress): AsyncFD =
protocol = cast[Protocol](0)
else:
return asyncInvalidSocket
result = createAsyncSocket(address.getDomain(), socktype, protocol)
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool =
try:
createAsyncSocket(address.getDomain(), socktype, protocol)
except CatchableError as exc:
raise newException(Defect, exc.msg)
proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool
{.raises: [Defect, ResultError[string]].} =
## Bind socket ``sock`` to MultiAddress ``ma``.
##
## Note: This procedure only used in `go-libp2p-daemon` wrapper.
@ -153,7 +183,7 @@ proc bindAsyncSocket*(sock: AsyncFD, ma: MultiAddress): bool =
if maddr.isErr():
return false
let address = maddr.tryGet()
let address = maddr.get()
toSAddr(address, saddr, slen)
if bindSocket(SocketHandle(sock), cast[ptr SockAddr](addr saddr),
slen) == 0:

24
tests/asyncunit.nim Normal file
View File

@ -0,0 +1,24 @@
import unittest2
export unittest2
template asyncTeardown*(body: untyped): untyped =
teardown:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncSetup*(body: untyped): untyped =
setup:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async, gcsafe.} =
body
)())

View File

@ -1,4 +1,4 @@
import std/unittest
{.push raises: [Defect].}
import chronos, bearssl
@ -9,7 +9,8 @@ import ../libp2p/stream/lpstream
import ../libp2p/muxers/mplex/lpchannel
import ../libp2p/protocols/secure/secure
export unittest
import ./asyncunit
export asyncunit
const
StreamTransportTrackerName = "stream.transport"
@ -48,27 +49,6 @@ template checkTrackers*() =
# Also test the GC is not fooling with us
GC_fullCollect()
template asyncTeardown*(body: untyped): untyped =
teardown:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncSetup*(body: untyped): untyped =
setup:
waitFor((
proc() {.async, gcsafe.} =
body
)())
template asyncTest*(name: string, body: untyped): untyped =
test name:
waitFor((
proc() {.async, gcsafe.} =
body
)())
type RngWrap = object
rng: ref BrHmacDrbgContext
@ -99,7 +79,7 @@ proc newBufferStream*(writeHandler: WriteHandler): TestBufferStream =
result.writeHandler = writeHandler
result.initStream()
proc checkExpiringInternal(cond: proc(): bool): Future[bool] {.async, gcsafe.} =
proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect].} ): Future[bool] {.async, gcsafe.} =
{.gcsafe.}:
let start = Moment.now()
while true:

View File

@ -9,7 +9,7 @@
{.used.}
import unittest, sequtils, options, tables, sets
import sequtils, options, tables, sets
import chronos, stew/byteutils
import utils,
../../libp2p/[errors,

View File

@ -3,7 +3,7 @@ include ../../libp2p/protocols/pubsub/gossipsub
{.used.}
import options
import unittest, bearssl
import bearssl
import stew/byteutils
import ../../libp2p/builders
import ../../libp2p/errors
@ -34,7 +34,10 @@ proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): PubSubPeer =
pubSubPeer
proc randomPeerInfo(): PeerInfo =
PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
try:
PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
except CatchableError as exc:
raise newException(Defect, exc.msg)
suite "GossipSub internal":
teardown:

View File

@ -9,7 +9,7 @@
{.used.}
import unittest, sequtils, options, tables, sets
import sequtils, options, tables, sets
import chronos, stew/byteutils
import chronicles
import utils, ../../libp2p/[errors,

View File

@ -9,7 +9,7 @@
{.used.}
import unittest, sequtils, options, tables, sets
import sequtils, options, tables, sets
import chronos, stew/byteutils
import chronicles
import utils, ../../libp2p/[errors,

View File

@ -1,6 +1,6 @@
{.used.}
import unittest, options, sets, sequtils, bearssl
import unittest2, options, sets, sequtils, bearssl
import stew/byteutils
import ../../libp2p/[peerid,
crypto/crypto,

View File

@ -1,4 +1,4 @@
import unittest
import unittest2
{.used.}

View File

@ -1,7 +1,6 @@
{.used.}
import testgossipinternal,
testfloodsub,
import testfloodsub,
testgossipsub,
testgossipsub2,
testmcache,

View File

@ -1,6 +1,6 @@
{.used.}
import std/unittest
import unittest2
import chronos/timer
import ../../libp2p/protocols/pubsub/timedcache

View File

@ -1,4 +1,3 @@
import unittest
import chronos, stew/byteutils
import ../libp2p/stream/bufferstream,
../libp2p/stream/lpstream,

View File

@ -1,4 +1,4 @@
import unittest
import unittest2
import ../libp2p/[cid, multihash, multicodec]
when defined(nimHasUsed): {.used.}

View File

@ -1,4 +1,3 @@
import unittest
import chronos, nimcrypto/utils
import ../libp2p/[stream/connection,
stream/bufferstream]

View File

@ -1,4 +1,4 @@
import unittest, sequtils
import sequtils
import chronos
import ../libp2p/[connmanager,
stream/connection,

View File

@ -9,7 +9,7 @@
## Test vectors was made using Go implementation
## https://github.com/libp2p/go-libp2p-crypto/blob/master/key.go
import unittest
import unittest2
import nimcrypto/[utils, sysrand]
import ../libp2p/crypto/[crypto, chacha20poly1305, curve25519, hkdf]

View File

@ -1,5 +1,4 @@
import unittest
import chronos
import chronos, unittest2
import ../libp2p/daemon/daemonapi, ../libp2p/multiaddress, ../libp2p/multicodec,
../libp2p/cid, ../libp2p/multihash, ../libp2p/peerid

View File

@ -6,7 +6,7 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import unittest
import unittest2
import nimcrypto/utils
import ../libp2p/crypto/[crypto, ecnist]
import stew/results

View File

@ -8,7 +8,7 @@
## those terms.
## Test vectors are from RFC 8032 (https://tools.ietf.org/html/rfc8032)
import unittest
import unittest2
import nimcrypto/utils
import ../libp2p/crypto/crypto
import ../libp2p/crypto/ed25519/ed25519

View File

@ -1,4 +1,4 @@
import unittest, options, bearssl
import options, bearssl
import chronos, strutils
import ../libp2p/[protocols/identify,
multiaddress,

View File

@ -1,5 +1,4 @@
import options, tables
import unittest
import chronos, chronicles, stew/byteutils
import helpers
import ../libp2p/[daemon/daemonapi,

View File

@ -6,7 +6,7 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import unittest
import unittest2
import ../libp2p/crypto/minasn1
import nimcrypto/utils as ncrutils

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import unittest
import unittest2
import ../libp2p/protobuf/minprotobuf
import stew/byteutils, strutils

View File

@ -1,4 +1,4 @@
import unittest, strformat, strformat, random, oids, sequtils
import strformat, strformat, random, oids, sequtils
import chronos, nimcrypto/utils, chronicles, stew/byteutils
import ../libp2p/[errors,
stream/connection,

View File

@ -1,4 +1,4 @@
import unittest
import unittest2
import ../libp2p/[multicodec, multiaddress]
when defined(nimHasUsed): {.used.}

View File

@ -1,4 +1,4 @@
import unittest
import unittest2
import ../libp2p/multibase
import stew/results

View File

@ -1,4 +1,4 @@
import unittest
import unittest2
import nimcrypto/utils
import ../libp2p/multihash

View File

@ -1,4 +1,4 @@
import unittest, strutils, strformat, stew/byteutils
import strutils, strformat, stew/byteutils
import chronos
import ../libp2p/errors,
../libp2p/multistream,

View File

@ -9,7 +9,7 @@
{.used.}
import unittest, tables, bearssl
import tables, bearssl
import chronos, stew/byteutils
import chronicles
import ../libp2p/crypto/crypto

View File

@ -9,7 +9,7 @@
## Test vectors was made using Go implementation
## https://github.com/libp2p/go-libp2p-peer
import unittest
import unittest2
import nimcrypto/utils, stew/base58
import ../libp2p/crypto/crypto, ../libp2p/peerid

View File

@ -1,6 +1,6 @@
{.used.}
import unittest, options, bearssl
import options, bearssl
import chronos
import ../libp2p/crypto/crypto,
../libp2p/peerinfo,
@ -35,7 +35,11 @@ suite "PeerInfo":
check seckey.getKey.get() == peerInfo.publicKey.get()
test "Should init from CIDv0 string":
var peerInfo = PeerInfo.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
var peerInfo: PeerInfo
try:
peerInfo = PeerInfo.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
except CatchableError:
check false
check:
PeerID.init("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N").get() == peerInfo.peerId

View File

@ -1,5 +1,5 @@
import
std/[unittest, tables, sequtils, sets],
std/[unittest2, tables, sequtils, sets],
../libp2p/crypto/crypto,
../libp2p/multiaddress,
../libp2p/peerid,

View File

@ -8,7 +8,7 @@
## those terms.
when defined(nimHasUsed): {.used.}
import unittest
import unittest2
import ../libp2p/crypto/crypto
import nimcrypto/utils

View File

@ -6,7 +6,7 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import unittest
import unittest2
import nimcrypto/utils
import ../libp2p/crypto/[crypto, rsa]

View File

@ -6,7 +6,7 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import unittest, bearssl
import unittest2, bearssl
import ../libp2p/crypto/[crypto, secp]
import nimcrypto/utils

View File

@ -1,6 +1,6 @@
{.used.}
import unittest
import unittest2
import stew/byteutils
import ../libp2p/stream/streamseq

View File

@ -1,6 +1,6 @@
{.used.}
import unittest, options, sequtils
import options, sequtils
import chronos
import stew/byteutils
import nimcrypto/sysrand

View File

@ -1,6 +1,6 @@
{.used.}
import unittest, sequtils
import sequtils
import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,

View File

@ -1,4 +1,4 @@
import unittest
import unittest2
import ../libp2p/varint
when defined(nimHasUsed): {.used.}