Merge remote-tracking branch 'origin/master' into gossip-one-one

This commit is contained in:
Giovanni Petrantoni 2020-08-08 16:53:14 +09:00
commit 4dcc3b6dda
37 changed files with 1024 additions and 777 deletions

48
.github/workflows/nbc.yml vendored Normal file
View File

@ -0,0 +1,48 @@
name: NBC Bump PR
on:
push:
branches:
- master
jobs:
bumpNBC:
runs-on: ubuntu-latest
steps:
- uses: status-im/github-app-token@v1
name: Generate token
id: generate-token
with:
app_id: ${{ secrets.BUMP_BOT_APP_ID }}
private_key: ${{ secrets.BUMP_BOT_APP_PRIVATE_KEY }}
- name: Clone NBC
uses: actions/checkout@v2
with:
repository: status-im/nim-beacon-chain
ref: devel
path: nbc
submodules: true
fetch-depth: 0
- name: Checkout this ref
run: |
cd nbc/vendor/nim-libp2p
git checkout $GITHUB_SHA
- name: Commit this bump
run: |
cd nbc
git config --global user.email "${{ github.actor }}@users.noreply.github.com"
git config --global user.name = "${{ github.actor }}"
git commit -a -m "auto-bump nim-libp2p"
- name: Make PR
uses: status-im/create-pull-request@v3
with:
branch: nim-libp2p-auto-bump
path: nbc
token: ${{ steps.generate-token.outputs.token }}
title: nim-libp2p auto bump

View File

@ -23,7 +23,7 @@ An implementation of [libp2p](https://libp2p.io/) in Nim. Also provides a Nim wr
## Project Status
The current native Nim libp2p implementation support is experimental and shouldn't be relied on for production use. It is under active development and contributions are highly welcomed. :)
Check our [examples folder](/examples) to get started!
Check our [examples folder](/examples) to get started!
# Table of Contents
- [Background](#background)
@ -38,24 +38,24 @@ Check our [examples folder](/examples) to get started!
- [Tests](#tests)
- [Packages](#packages)
- [Contribute](#contribute)
- [Core Developers](#core-developers)
- [Core Developers](#core-developers)
- [License](#license)
## Background
## Background
libp2p is a networking stack and library modularized out of [The IPFS Project](https://github.com/ipfs/ipfs), and bundled separately for other tools to use.
libp2p is the product of a long and arduous quest of understanding; a deep dive into the internet's network stack and the peer-to-peer protocols from the past. Building large scale peer-to-peer systems has been complex and difficult in the last 15 years and libp2p is a way to fix that. It is a "network stack", a suite of networking protocols that cleanly separates concerns and enables sophisticated applications to only use the protocols they absolutely need, without giving up interoperability and upgradeability.
libp2p is the product of a long and arduous quest of understanding; a deep dive into the internet's network stack and the peer-to-peer protocols from the past. Building large scale peer-to-peer systems has been complex and difficult in the last 15 years and libp2p is a way to fix that. It is a "network stack", a suite of networking protocols that cleanly separates concerns and enables sophisticated applications to only use the protocols they absolutely need, without giving up interoperability and upgradeability.
libp2p grew out of IPFS, but it is built so that lots of people can use it, for lots of different projects.
- Learn more about libp2p at [**libp2p.io**](https://libp2p.io) and follow our evolving documentation efforts at [**docs.libp2p.io**](https://docs.libp2p.io).
- [Here](https://github.com/libp2p/libp2p#description) is an overview of libp2p and its implementations in other programming languages.
- Learn more about libp2p at [**libp2p.io**](https://libp2p.io) and follow our evolving documentation efforts at [**docs.libp2p.io**](https://docs.libp2p.io).
- [Here](https://github.com/libp2p/libp2p#description) is an overview of libp2p and its implementations in other programming languages.
## Install
```
nimble install libp2p
```
### Prerequisite
### Prerequisite
- [Nim](https://nim-lang.org/install.html)
## Usage
@ -64,18 +64,19 @@ nimble install libp2p
The specification is available in the [docs/api](docs/api) folder.
### Getting Started
Please read the [GETTING_STARTED.md](docs/GETTING_STARTED.md) guide.
Please read the [GETTING_STARTED.md](docs/GETTING_STARTED.md) guide.
### Tutorials and Examples
### Tutorials and Examples
Example code can be found in the [examples folder](/examples).
#### Direct Chat Tutorial
- [Part I](https://our.status.im/nim-libp2p-tutorial-a-peer-to-peer-chat-example-1/): Set up the main function and use multi-thread for processing IO.
- [Part II](https://our.status.im/nim-libp2p-tutorial-a-peer-to-peer-chat-example-2/): Dial remote peer and allow customized user input commands.
- [Part III](https://our.status.im/nim-libp2p-tutorial-a-peer-to-peer-chat-example-3/): Configure and establish a libp2p node.
- [Part III](https://our.status.im/nim-libp2p-tutorial-a-peer-to-peer-chat-example-3/): Configure and establish a libp2p node.
### Using the Go Daemon
Please find the installation and usage intructions in [daemonapi.md](docs/api/libp2p/daemonapi.md).
Please find the installation and usage intructions in [daemonapi.md](docs/api/libp2p/daemonapi.md).
Examples can be found in the [examples/go-daemon folder](https://github.com/status-im/nim-libp2p/tree/readme/examples/go-daemon);
@ -88,10 +89,10 @@ cd nim-libp2p
nimble install
```
### Tests
### Tests
#### Prerequisite
- [Go 1.12+](https://golang.org/dl/)
- [Go 1.12+](https://golang.org/dl/)
#### Run unit tests
```sh
@ -99,7 +100,7 @@ nimble install
nimble test
```
### Packages
### Packages
List of packages currently in existence for nim-libp2p:
@ -131,7 +132,7 @@ List of packages currently in existence for nim-libp2p:
- [libp2p-gossipsub](https://github.com/status-im/nim-libp2p/blob/master/libp2p/protocols/pubsub/gossipsub.nim)
Packages that exist in the original libp2p specs and are under active development:
Packages that exist in the original libp2p specs and are under active development:
- libp2p-daemon
- libp2p-webrtc-direct
- libp2p-webrtc-star
@ -146,16 +147,23 @@ Packages that exist in the original libp2p specs and are under active developmen
- libp2p-nat-mgnr
- libp2p-utils
** Note that the current stack reflects the minimal requirements for the upcoming Eth2 implementation.
** Note that the current stack reflects the minimal requirements for the upcoming Eth2 implementation.
### Tips and tricks
- enable expensive metrics:
```bash
nim c -d:libp2p_expensive_metrics some_file.nim
```
## Contribute
The libp2p implementation in Nim is a work in progress. We welcome contributors to help out! Specifically, you can:
- Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it.
- **Perform code reviews**. Feel free to let us know if you found anything that can a) speed up the project development b) ensure better quality and c) reduce possible future bugs.
- Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it.
- **Perform code reviews**. Feel free to let us know if you found anything that can a) speed up the project development b) ensure better quality and c) reduce possible future bugs.
- **Add tests**. Help nim-libp2p to be more robust by adding more tests to the [tests folder](https://github.com/status-im/nim-libp2p/tree/master/tests).
### Core Developers
### Core Developers
[@cheatfate](https://github.com/cheatfate), [Dmitriy Ryajov](https://github.com/dryajov), [Giovanni Petrantoni](https://github.com/sinkingsugar), [Zahary Karadjov](https://github.com/zah)
## License
@ -168,4 +176,5 @@ or
* Apache License, Version 2.0, ([LICENSE-APACHEv2](LICENSE-APACHEv2) or http://www.apache.org/licenses/LICENSE-2.0)
at your option. This file may not be copied, modified, or distributed except according to those terms.
at your option. These files may not be copied, modified, or distributed except according to those terms.

View File

@ -24,7 +24,7 @@ steps:
- task: CacheBeta@1
displayName: 'cache Nim binaries'
inputs:
key: NimBinaries | $(Agent.OS) | $(PLATFORM) | "$(Build.SourceBranchName)" | "v4"
key: NimBinaries | $(Agent.OS) | $(PLATFORM) | "$(Build.SourceBranchName)" | "v5"
path: NimBinaries
- task: CacheBeta@1

View File

@ -7,7 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import tables, sequtils, sets
import std/[options, tables, sequtils, sets]
import chronos, chronicles, metrics
import peerinfo,
stream/connection,
@ -31,7 +31,6 @@ type
# copies and mangling by unrelated code.
conns: Table[PeerID, HashSet[Connection]]
muxed: Table[Connection, MuxerHolder]
cleanUpLock: Table[PeerInfo, AsyncLock]
maxConns: int
proc newTooManyConnections(): ref TooManyConnections {.inline.} =
@ -54,9 +53,6 @@ proc contains*(c: ConnManager, conn: Connection): bool =
if isNil(conn.peerInfo):
return
if conn.peerInfo.peerId notin c.conns:
return
return conn in c.conns[conn.peerInfo.peerId]
proc contains*(c: ConnManager, peerId: PeerID): bool =
@ -79,9 +75,24 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
return muxer == c.muxed[conn].muxer
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "cleaning up muxer for peer"
await muxerHolder.muxer.close()
if not(isNil(muxerHolder.handle)):
await muxerHolder.handle # TODO noraises?
proc delConn(c: ConnManager, conn: Connection) =
let peerId = conn.peerInfo.peerId
if peerId in c.conns:
c.conns[peerId].excl(conn)
if c.conns[peerId].len == 0:
c.conns.del(peerId)
libp2p_peers.set(c.conns.len.int64)
proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
## clean connection's resources such as muxers and streams
##
if isNil(conn):
return
@ -89,37 +100,20 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
if isNil(conn.peerInfo):
return
let peerInfo = conn.peerInfo
let lock = c.cleanUpLock.mgetOrPut(peerInfo, newAsyncLock())
# Remove connection from all tables without async breaks
var muxer = some(MuxerHolder())
if not c.muxed.pop(conn, muxer.get()):
muxer = none(MuxerHolder)
delConn(c, conn)
try:
await lock.acquire()
trace "cleaning up connection for peer", peer = $peerInfo
if conn in c.muxed:
let muxerHolder = c.muxed[conn]
c.muxed.del(conn)
await muxerHolder.muxer.close()
if not(isNil(muxerHolder.handle)):
await muxerHolder.handle
if peerInfo.peerId in c.conns:
c.conns[peerInfo.peerId].excl(conn)
if c.conns[peerInfo.peerId].len == 0:
c.conns.del(peerInfo.peerId)
if not(conn.peerInfo.isClosed()):
conn.peerInfo.close()
if muxer.isSome:
await closeMuxerHolder(muxer.get())
finally:
await conn.close()
libp2p_peers.set(c.conns.len.int64)
if lock.locked():
lock.release()
trace "connection cleaned up"
trace "connection cleaned up", peer = $conn.peerInfo
proc onClose(c: ConnManager, conn: Connection) {.async.} =
## connection close even handler
@ -132,32 +126,25 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
await c.cleanupConn(conn)
proc selectConn*(c: ConnManager,
peerInfo: PeerInfo,
peerId: PeerID,
dir: Direction): Connection =
## Select a connection for the provided peer and direction
##
if isNil(peerInfo):
return
let conns = toSeq(
c.conns.getOrDefault(peerInfo.peerId))
c.conns.getOrDefault(peerId))
.filterIt( it.dir == dir )
if conns.len > 0:
return conns[0]
proc selectConn*(c: ConnManager, peerInfo: PeerInfo): Connection =
proc selectConn*(c: ConnManager, peerId: PeerID): Connection =
## Select a connection for the provided giving priority
## to outgoing connections
##
if isNil(peerInfo):
return
var conn = c.selectConn(peerInfo, Direction.Out)
var conn = c.selectConn(peerId, Direction.Out)
if isNil(conn):
conn = c.selectConn(peerInfo, Direction.In)
conn = c.selectConn(peerId, Direction.In)
return conn
@ -181,18 +168,18 @@ proc storeConn*(c: ConnManager, conn: Connection) =
if isNil(conn.peerInfo):
raise newException(CatchableError, "empty peer info")
let peerInfo = conn.peerInfo
if c.conns.getOrDefault(peerInfo.peerId).len > c.maxConns:
trace "too many connections", peer = $conn.peerInfo,
let peerId = conn.peerInfo.peerId
if c.conns.getOrDefault(peerId).len > c.maxConns:
trace "too many connections", peer = $peerId,
conns = c.conns
.getOrDefault(peerInfo.peerId).len
.getOrDefault(peerId).len
raise newTooManyConnections()
if peerInfo.peerId notin c.conns:
c.conns[peerInfo.peerId] = initHashSet[Connection]()
if peerId notin c.conns:
c.conns[peerId] = initHashSet[Connection]()
c.conns[peerInfo.peerId].incl(conn)
c.conns[peerId].incl(conn)
# launch on close listener
asyncCheck c.onClose(conn)
@ -222,25 +209,25 @@ proc storeMuxer*(c: ConnManager,
muxer: muxer,
handle: handle)
trace "storred connection", connections = c.conns.len
trace "stored connection", connections = c.conns.len
proc getMuxedStream*(c: ConnManager,
peerInfo: PeerInfo,
peerId: PeerID,
dir: Direction): Future[Connection] {.async, gcsafe.} =
## get a muxed stream for the provided peer
## with the given direction
##
let muxer = c.selectMuxer(c.selectConn(peerInfo, dir))
let muxer = c.selectMuxer(c.selectConn(peerId, dir))
if not(isNil(muxer)):
return await muxer.newStream()
proc getMuxedStream*(c: ConnManager,
peerInfo: PeerInfo): Future[Connection] {.async, gcsafe.} =
peerId: PeerID): Future[Connection] {.async, gcsafe.} =
## get a muxed stream for the passed peer from any connection
##
let muxer = c.selectMuxer(c.selectConn(peerInfo))
let muxer = c.selectMuxer(c.selectConn(peerId))
if not(isNil(muxer)):
return await muxer.newStream()
@ -253,24 +240,38 @@ proc getMuxedStream*(c: ConnManager,
if not(isNil(muxer)):
return await muxer.newStream()
proc dropPeer*(c: ConnManager, peerInfo: PeerInfo) {.async.} =
proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} =
## drop connections and cleanup resources for peer
##
let conns = c.conns.getOrDefault(peerId)
for conn in conns:
delConn(c, conn)
for conn in c.conns.getOrDefault(peerInfo.peerId):
if not(isNil(conn)):
await c.cleanupConn(conn)
var muxers: seq[MuxerHolder]
for conn in conns:
if conn in c.muxed:
muxers.add c.muxed[conn]
c.muxed.del(conn)
for muxer in muxers:
await closeMuxerHolder(muxer)
for conn in conns:
await conn.close()
proc close*(c: ConnManager) {.async.} =
## cleanup resources for the connection
## manager
##
let conns = c.conns
c.conns.clear()
for conns in toSeq(c.conns.values):
for conn in conns:
try:
await c.cleanupConn(conn)
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "error cleaning up connections"
let muxed = c.muxed
c.muxed.clear()
for _, muxer in muxed:
await closeMuxerHolder(muxer)
for _, conns2 in conns:
for conn in conns2:
await conn.close()

View File

@ -37,22 +37,6 @@ proc intoCurve25519Key*(s: openarray[byte]): Curve25519Key =
proc getBytes*(key: Curve25519Key): seq[byte] = @key
const
ForbiddenCurveValues: array[12, Curve25519Key] = [
[0.byte, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[1.byte, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[224.byte, 235, 122, 124, 59, 65, 184, 174, 22, 86, 227, 250, 241, 159, 196, 106, 218, 9, 141, 235, 156, 50, 177, 253, 134, 98, 5, 22, 95, 73, 184, 0],
[95.byte, 156, 149, 188, 163, 80, 140, 36, 177, 208, 177, 85, 156, 131, 239, 91, 4, 68, 92, 196, 88, 28, 142, 134, 216, 34, 78, 221, 208, 159, 17, 87],
[236.byte, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 127],
[237.byte, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 127],
[238.byte, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 127],
[205.byte, 235, 122, 124, 59, 65, 184, 174, 22, 86, 227, 250, 241, 159, 196, 106, 218, 9, 141, 235, 156, 50, 177, 253, 134, 98, 5, 22, 95, 73, 184, 128],
[76.byte, 156, 149, 188, 163, 80, 140, 36, 177, 208, 177, 85, 156, 131, 239, 91, 4, 68, 92, 196, 88, 28, 142, 134, 216, 34, 78, 221, 208, 159, 17, 215],
[217.byte, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255],
[218.byte, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255],
[219.byte, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 25],
]
proc byteswap(buf: var Curve25519Key) {.inline.} =
for i in 0..<16:
let
@ -60,47 +44,37 @@ proc byteswap(buf: var Curve25519Key) {.inline.} =
buf[i] = buf[31 - i]
buf[31 - i] = x
proc mul*(_: type[Curve25519], dst: var Curve25519Key, scalar: Curve25519Key, point: Curve25519Key) =
proc mul*(_: type[Curve25519], point: var Curve25519Key, multiplier: Curve25519Key) =
let defaultBrEc = brEcGetDefault()
# The source point is provided in array G (of size Glen bytes);
# the multiplication result is written over it.
dst = scalar
# point needs to be big-endian
# multiplier needs to be big-endian
var
rpoint = point
rpoint.byteswap()
multiplierBs = multiplier
multiplierBs.byteswap()
let
res = defaultBrEc.mul(
cast[pcuchar](addr dst[0]),
cast[pcuchar](addr point[0]),
Curve25519KeySize,
cast[pcuchar](addr rpoint[0]),
cast[pcuchar](addr multiplierBs[0]),
Curve25519KeySize,
EC_curve25519)
assert res == 1
proc mulgen*(_: type[Curve25519], dst: var Curve25519Key, point: Curve25519Key) =
proc mulgen(_: type[Curve25519], dst: var Curve25519Key, point: Curve25519Key) =
let defaultBrEc = brEcGetDefault()
var
rpoint = point
rpoint.byteswap()
block iterate:
while true:
block derive:
let
size = defaultBrEc.mulgen(
cast[pcuchar](addr dst[0]),
cast[pcuchar](addr rpoint[0]),
Curve25519KeySize,
EC_curve25519)
assert size == Curve25519KeySize
for forbid in ForbiddenCurveValues:
if dst == forbid:
break derive
break iterate
let
size = defaultBrEc.mulgen(
cast[pcuchar](addr dst[0]),
cast[pcuchar](addr rpoint[0]),
Curve25519KeySize,
EC_curve25519)
assert size == Curve25519KeySize
proc public*(private: Curve25519Key): Curve25519Key =
Curve25519.mulgen(result, private)

View File

@ -20,7 +20,7 @@ import bearssl
import nimcrypto/utils
import minasn1
export minasn1.Asn1Error
import stew/results
import stew/[results, ctops]
export results
const
@ -540,8 +540,8 @@ proc `==`*(pubkey1, pubkey2: EcPublicKey): bool =
let op2 = pubkey2.getOffset()
if op1 == -1 or op2 == -1:
return false
result = equalMem(unsafeAddr pubkey1.buffer[op1],
unsafeAddr pubkey2.buffer[op2], pubkey1.key.qlen)
return CT.isEqual(pubkey1.buffer.toOpenArray(op1, pubkey1.key.qlen - 1),
pubkey2.buffer.toOpenArray(op2, pubkey2.key.qlen - 1))
proc `==`*(seckey1, seckey2: EcPrivateKey): bool =
## Returns ``true`` if both keys ``seckey1`` and ``seckey2`` are equal.
@ -560,19 +560,30 @@ proc `==`*(seckey1, seckey2: EcPrivateKey): bool =
let op2 = seckey2.getOffset()
if op1 == -1 or op2 == -1:
return false
result = equalMem(unsafeAddr seckey1.buffer[op1],
unsafeAddr seckey2.buffer[op2], seckey1.key.xlen)
return CT.isEqual(seckey1.buffer.toOpenArray(op1, seckey1.key.xlen - 1),
seckey2.buffer.toOpenArray(op2, seckey2.key.xlen - 1))
proc `==`*(sig1, sig2: EcSignature): bool =
proc `==`*(a, b: EcSignature): bool =
## Return ``true`` if both signatures ``sig1`` and ``sig2`` are equal.
if isNil(sig1) and isNil(sig2):
result = true
elif isNil(sig1) and (not isNil(sig2)):
result = false
elif isNil(sig2) and (not isNil(sig1)):
result = false
if isNil(a) and isNil(b):
true
elif isNil(a) and (not isNil(b)):
false
elif isNil(b) and (not isNil(a)):
false
else:
result = (sig1.buffer == sig2.buffer)
# We need to cover all the cases because Signature initialization procedure
# do not perform any checks.
if len(a.buffer) == 0 and len(b.buffer) == 0:
true
elif len(a.buffer) == 0 and len(b.buffer) != 0:
false
elif len(b.buffer) == 0 and len(a.buffer) != 0:
false
elif len(a.buffer) != len(b.buffer):
false
else:
CT.isEqual(a.buffer, b.buffer)
proc init*(key: var EcPrivateKey, data: openarray[byte]): Result[void, Asn1Error] =
## Initialize EC `private key` or `signature` ``key`` from ASN.1 DER binary

View File

@ -15,7 +15,7 @@
import constants, bearssl
import nimcrypto/[hash, sha2, utils]
import stew/results
import stew/[results, ctops]
export results
# This workaround needed because of some bugs in Nim Static[T].
@ -1725,15 +1725,15 @@ proc getBytes*(sig: EdSignature): seq[byte] = @(sig.data)
proc `==`*(eda, edb: EdPrivateKey): bool =
## Compare ED25519 `private key` objects for equality.
result = (eda.data == edb.data)
result = CT.isEqual(eda.data, edb.data)
proc `==`*(eda, edb: EdPublicKey): bool =
## Compare ED25519 `public key` objects for equality.
result = (eda.data == edb.data)
result = CT.isEqual(eda.data, edb.data)
proc `==`*(eda, edb: EdSignature): bool =
## Compare ED25519 `signature` objects for equality.
result = (eda.data == edb.data)
result = CT.isEqual(eda.data, edb.data)
proc `$`*(key: EdPrivateKey): string = toHex(key.data)
## Return string representation of ED25519 `private key`.

View File

@ -19,7 +19,7 @@ import nimcrypto/utils
import bearssl
import minasn1
export Asn1Error
import stew/results
import stew/[results, ctops]
export results
const
@ -662,81 +662,78 @@ proc `$`*(sig: RsaSignature): string =
result.add(toHex(sig.buffer))
result.add(")")
proc cmp(a: openarray[byte], b: openarray[byte]): bool =
let alen = len(a)
let blen = len(b)
if alen == blen:
if alen == 0:
true
else:
var n = alen
var res = 0
while n > 0:
dec(n)
res = res or int(a[n] xor b[n])
(res == 0)
else:
false
proc `==`*(a, b: RsaPrivateKey): bool =
## Compare two RSA private keys for equality.
##
## Result is true if ``a`` and ``b`` are both ``nil`` or ``a`` and ``b`` are
## equal by value.
if isNil(a) and isNil(b):
result = true
true
elif isNil(a) and (not isNil(b)):
result = false
false
elif isNil(b) and (not isNil(a)):
result = false
false
else:
if a.seck.nBitlen == b.seck.nBitlen:
if cast[int](a.seck.nBitlen) > 0:
let r1 = cmp(getArray(a.buffer, a.seck.p, a.seck.plen),
getArray(b.buffer, b.seck.p, b.seck.plen))
let r2 = cmp(getArray(a.buffer, a.seck.q, a.seck.qlen),
getArray(b.buffer, b.seck.q, b.seck.qlen))
let r3 = cmp(getArray(a.buffer, a.seck.dp, a.seck.dplen),
getArray(b.buffer, b.seck.dp, b.seck.dplen))
let r4 = cmp(getArray(a.buffer, a.seck.dq, a.seck.dqlen),
getArray(b.buffer, b.seck.dq, b.seck.dqlen))
let r5 = cmp(getArray(a.buffer, a.seck.iq, a.seck.iqlen),
getArray(b.buffer, b.seck.iq, b.seck.iqlen))
let r6 = cmp(getArray(a.buffer, a.pexp, a.pexplen),
getArray(b.buffer, b.pexp, b.pexplen))
let r7 = cmp(getArray(a.buffer, a.pubk.n, a.pubk.nlen),
getArray(b.buffer, b.pubk.n, b.pubk.nlen))
let r8 = cmp(getArray(a.buffer, a.pubk.e, a.pubk.elen),
getArray(b.buffer, b.pubk.e, b.pubk.elen))
result = r1 and r2 and r3 and r4 and r5 and r6 and r7 and r8
let r1 = CT.isEqual(getArray(a.buffer, a.seck.p, a.seck.plen),
getArray(b.buffer, b.seck.p, b.seck.plen))
let r2 = CT.isEqual(getArray(a.buffer, a.seck.q, a.seck.qlen),
getArray(b.buffer, b.seck.q, b.seck.qlen))
let r3 = CT.isEqual(getArray(a.buffer, a.seck.dp, a.seck.dplen),
getArray(b.buffer, b.seck.dp, b.seck.dplen))
let r4 = CT.isEqual(getArray(a.buffer, a.seck.dq, a.seck.dqlen),
getArray(b.buffer, b.seck.dq, b.seck.dqlen))
let r5 = CT.isEqual(getArray(a.buffer, a.seck.iq, a.seck.iqlen),
getArray(b.buffer, b.seck.iq, b.seck.iqlen))
let r6 = CT.isEqual(getArray(a.buffer, a.pexp, a.pexplen),
getArray(b.buffer, b.pexp, b.pexplen))
let r7 = CT.isEqual(getArray(a.buffer, a.pubk.n, a.pubk.nlen),
getArray(b.buffer, b.pubk.n, b.pubk.nlen))
let r8 = CT.isEqual(getArray(a.buffer, a.pubk.e, a.pubk.elen),
getArray(b.buffer, b.pubk.e, b.pubk.elen))
r1 and r2 and r3 and r4 and r5 and r6 and r7 and r8
else:
result = true
true
else:
false
proc `==`*(a, b: RsaSignature): bool =
## Compare two RSA signatures for equality.
if isNil(a) and isNil(b):
result = true
true
elif isNil(a) and (not isNil(b)):
result = false
false
elif isNil(b) and (not isNil(a)):
result = false
false
else:
result = (a.buffer == b.buffer)
# We need to cover all the cases because Signature initialization procedure
# do not perform any checks.
if len(a.buffer) == 0 and len(b.buffer) == 0:
true
elif len(a.buffer) == 0 and len(b.buffer) != 0:
false
elif len(b.buffer) == 0 and len(a.buffer) != 0:
false
elif len(a.buffer) != len(b.buffer):
false
else:
CT.isEqual(a.buffer, b.buffer)
proc `==`*(a, b: RsaPublicKey): bool =
## Compare two RSA public keys for equality.
if isNil(a) and isNil(b):
result = true
true
elif isNil(a) and (not isNil(b)):
result = false
false
elif isNil(b) and (not isNil(a)):
result = false
false
else:
let r1 = cmp(getArray(a.buffer, a.key.n, a.key.nlen),
getArray(b.buffer, b.key.n, b.key.nlen))
let r2 = cmp(getArray(a.buffer, a.key.e, a.key.elen),
getArray(b.buffer, b.key.e, b.key.elen))
result = r1 and r2
let r1 = CT.isEqual(getArray(a.buffer, a.key.n, a.key.nlen),
getArray(b.buffer, b.key.n, b.key.nlen))
let r2 = CT.isEqual(getArray(a.buffer, a.key.e, a.key.elen),
getArray(b.buffer, b.key.e, b.key.elen))
(r1 and r2)
proc sign*[T: byte|char](key: RsaPrivateKey,
message: openarray[T]): RsaResult[RsaSignature] {.gcsafe.} =

View File

@ -19,8 +19,8 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
if res.failed:
let exc = res.readError()
# We still don't abort but warn
warn "A future has failed, enable trace logging for details", error=exc.name
trace "Exception message", msg=exc.msg
warn "A future has failed, enable trace logging for details", error = exc.name
trace "Exception message", msg= exc.msg, stack = getStackTrace()
else:
quote do:
for res in `futs`:
@ -48,6 +48,8 @@ proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
if err of Defect:
raise err
else:
if err of CancelledError:
raise err
if isNil(first):
first = err
if not isNil(first):

View File

@ -40,6 +40,12 @@ proc newMultistream*(): MultistreamSelect =
new result
result.codec = MSCodec
template validateSuffix(str: string): untyped =
if str.endsWith("\n"):
str.removeSuffix("\n")
else:
raise newException(CatchableError, "MultistreamSelect failed, malformed message")
proc select*(m: MultistreamSelect,
conn: Connection,
proto: seq[string]):
@ -52,17 +58,20 @@ proc select*(m: MultistreamSelect,
await conn.writeLp((proto[0] & "\n")) # select proto
var s = string.fromBytes((await conn.readLp(1024))) # read ms header
s.removeSuffix("\n")
validateSuffix(s)
if s != Codec:
notice "handshake failed", codec = s.toHex()
return ""
notice "handshake failed", codec = s
raise newException(CatchableError, "MultistreamSelect handshake failed")
else:
trace "multistream handshake success"
if proto.len() == 0: # no protocols, must be a handshake call
return Codec
else:
s = string.fromBytes(await conn.readLp(1024)) # read the first proto
validateSuffix(s)
trace "reading first requested proto"
s.removeSuffix("\n")
if s == proto[0]:
trace "successfully selected ", proto = proto[0]
return proto[0]
@ -74,7 +83,7 @@ proc select*(m: MultistreamSelect,
trace "selecting proto", proto = p
await conn.writeLp((p & "\n")) # select proto
s = string.fromBytes(await conn.readLp(1024)) # read the first proto
s.removeSuffix("\n")
validateSuffix(s)
if s == p:
trace "selected protocol", protocol = s
return s
@ -110,42 +119,53 @@ proc list*(m: MultistreamSelect,
result = list
proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} =
trace "handle: starting multistream handling"
proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.async, gcsafe.} =
trace "handle: starting multistream handling", handshaked = active
var handshaked = active
try:
while not conn.closed:
while not conn.atEof:
var ms = string.fromBytes(await conn.readLp(1024))
ms.removeSuffix("\n")
validateSuffix(ms)
trace "handle: got request for ", ms
if not handshaked and ms != Codec:
error "expected handshake message", instead=ms
raise newException(CatchableError,
"MultistreamSelect handling failed, invalid first message")
trace "handle: got request", ms
if ms.len() <= 0:
trace "handle: invalid proto"
await conn.write(Na)
if m.handlers.len() == 0:
trace "handle: sending `na` for protocol ", protocol = ms
trace "handle: sending `na` for protocol", protocol = ms
await conn.write(Na)
continue
case ms:
of "ls":
trace "handle: listing protos"
var protos = ""
for h in m.handlers:
for proto in h.protos:
protos &= (proto & "\n")
await conn.writeLp(protos)
of Codec:
of "ls":
trace "handle: listing protos"
var protos = ""
for h in m.handlers:
for proto in h.protos:
protos &= (proto & "\n")
await conn.writeLp(protos)
of Codec:
if not handshaked:
await conn.write(m.codec)
handshaked = true
else:
for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler for", protocol = ms
await conn.writeLp((ms & "\n"))
await h.protocol.handler(conn, ms)
return
debug "no handlers for ", protocol = ms
trace "handle: sending `na` for duplicate handshake while handshaked"
await conn.write(Na)
else:
for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
trace "found handler", protocol = ms
await conn.writeLp(ms & "\n")
await h.protocol.handler(conn, ms)
return
debug "no handlers", protocol = ms
await conn.write(Na)
except CancelledError as exc:
await conn.close()
raise exc

View File

@ -71,9 +71,5 @@ proc writeMsg*(conn: Connection,
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: string) {.async, gcsafe.} =
# TODO: changing this to
#`await conn.writeMsg(id, msgType, data.toBytes())`
# causes all sorts of race conditions and hangs.
# DON'T DO IT!
result = conn.writeMsg(id, msgType, data.toBytes())
data: string): Future[void] =
conn.writeMsg(id, msgType, data.toBytes())

View File

@ -46,8 +46,6 @@ logScope:
type
LPChannel* = ref object of BufferStream
id*: uint64 # channel id
timeout: Duration # channel timeout if no activity
activity: bool # reset every time data is sent or received
name*: string # name of the channel (for debugging)
conn*: Connection # wrapped connection used to for writing
initiator*: bool # initiated remotely or locally flag
@ -57,7 +55,6 @@ type
msgCode*: MessageType # cached in/out message code
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
timerTaskFut: Future[void] # the current timer instanse
proc open*(s: LPChannel) {.async, gcsafe.}
@ -72,6 +69,8 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped =
template withEOFExceptions(body: untyped): untyped =
try:
body
except CancelledError as exc:
raise exc
except LPStreamEOFError as exc:
trace "muxed connection EOF", exc = exc.msg
except LPStreamClosedError as exc:
@ -79,11 +78,6 @@ template withEOFExceptions(body: untyped): untyped =
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg
proc cleanupTimer(s: LPChannel) {.async.} =
## cleanup timers
if not s.timerTaskFut.finished:
await s.timerTaskFut.cancelAndWait()
proc closeMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
@ -150,7 +144,6 @@ proc closeRemote*(s: LPChannel) {.async.} =
# call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream
await s.cleanupTimer()
trace "channel closed on EOF"
except CancelledError as exc:
@ -193,8 +186,6 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
s.isEof = true
s.closedLocal = true
await s.cleanupTimer()
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -222,7 +213,6 @@ method close*(s: LPChannel) {.async, gcsafe.} =
await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close()
await s.cleanupTimer()
except CancelledError as exc:
await s.reset()
raise exc
@ -235,60 +225,16 @@ method close*(s: LPChannel) {.async, gcsafe.} =
s.closedLocal = true
asyncCheck closeInternal()
proc timeoutMonitor(s: LPChannel) {.async.} =
## monitor the channel for innactivity
##
## if the timeout was hit, it means that
## neither incoming nor outgoing activity
## has been detected and the channel will
## be reset
##
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
try:
while true:
await sleepAsync(s.timeout)
if s.closed or s.atEof:
return
if s.activity:
s.activity = false
continue
break
# reset channel on innactivity timeout
trace "channel timed out, resetting"
await s.reset()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in timeout", exc = exc.msg
method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = "LPChannel"
s.timeoutHandler = proc() {.async, gcsafe.} =
trace "idle timeout expired, resetting LPChannel"
await s.reset()
procCall BufferStream(s).initStream()
method readOnce*(s: LPChannel,
pbytes: pointer,
nbytes: int):
Future[int] =
s.activity = true
procCall BufferStream(s).readOnce(pbytes, nbytes)
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
s.activity = true
procCall BufferStream(s).write(msg)
proc init*(
L: type LPChannel,
id: uint64,
@ -339,7 +285,6 @@ proc init*(
when chronicles.enabledLogLevel == LogLevel.TRACE:
chann.name = if chann.name.len > 0: chann.name else: $chann.oid
chann.timerTaskFut = chann.timeoutMonitor()
trace "created new lpchannel"
return chann

View File

@ -23,9 +23,16 @@ export muxer
logScope:
topics = "mplex"
declareGauge(libp2p_mplex_channels, "mplex channels", labels = ["initiator", "peer"])
const
MaxChannelCount = 200
when defined(libp2p_expensive_metrics):
declareGauge(libp2p_mplex_channels,
"mplex channels", labels = ["initiator", "peer"])
type
TooManyChannels* = object of CatchableError
Mplex* = ref object of Muxer
remote: Table[uint64, LPChannel]
local: Table[uint64, LPChannel]
@ -35,6 +42,10 @@ type
outChannTimeout: Duration
isClosed: bool
oid*: Oid
maxChannCount: int
proc newTooManyChannels(): ref TooManyChannels =
newException(TooManyChannels, "max allowed channel count exceeded")
proc getChannelList(m: Mplex, initiator: bool): var Table[uint64, LPChannel] =
if initiator:
@ -76,10 +87,11 @@ proc newStreamInternal*(m: Mplex,
"channel slot already taken!")
m.getChannelList(initiator)[id] = result
libp2p_mplex_channels.set(
m.getChannelList(initiator).len.int64,
labelValues = [$initiator,
$m.connection.peerInfo])
when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set(
m.getChannelList(initiator).len.int64,
labelValues = [$initiator,
$m.connection.peerInfo])
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables
@ -89,10 +101,11 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
m.getChannelList(chann.initiator).del(chann.id)
trace "cleaned up channel", id = chann.id
libp2p_mplex_channels.set(
m.getChannelList(chann.initiator).len.int64,
labelValues = [$chann.initiator,
$m.connection.peerInfo])
when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set(
m.getChannelList(chann.initiator).len.int64,
labelValues = [$chann.initiator,
$m.connection.peerInfo])
proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
## call the muxer stream handler for this channel
@ -102,9 +115,9 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
trace "finished handling stream"
doAssert(chann.closed, "connection not closed by handler!")
except CancelledError as exc:
trace "cancling stream handler", exc = exc.msg
trace "cancelling stream handler", exc = exc.msg
await chann.reset()
raise
raise exc
except CatchableError as exc:
trace "exception in stream handler", exc = exc.msg
await chann.reset()
@ -117,7 +130,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "stopping mplex main loop", oid = $m.oid
await m.close()
while not m.connection.closed:
while not m.connection.atEof:
trace "waiting for data", oid = $m.oid
let (id, msgType, data) = await m.connection.readMsg()
trace "read message from connection", id = id,
@ -148,6 +161,10 @@ method handle*(m: Mplex) {.async, gcsafe.} =
case msgType:
of MessageType.New:
let name = string.fromBytes(data)
if m.getChannelList(false).len > m.maxChannCount - 1:
warn "too many channels created by remote peer", allowedMax = MaxChannelCount
raise newTooManyChannels()
channel = await m.newStreamInternal(
false,
id,
@ -169,7 +186,10 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "pushing data to channel"
if data.len > MaxMsgSize:
warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize,
sending = data.len
raise newLPStreamLimitError()
await channel.pushTo(data)
of MessageType.CloseIn, MessageType.CloseOut:
@ -202,14 +222,16 @@ method handle*(m: Mplex) {.async, gcsafe.} =
proc init*(M: type Mplex,
conn: Connection,
maxChanns: uint = MaxChannels,
inTimeout, outTimeout: Duration = DefaultChanTimeout): Mplex =
inTimeout, outTimeout: Duration = DefaultChanTimeout,
maxChannCount: int = MaxChannelCount): Mplex =
M(connection: conn,
maxChannels: maxChanns,
inChannTimeout: inTimeout,
outChannTimeout: outTimeout,
remote: initTable[uint64, LPChannel](),
local: initTable[uint64, LPChannel](),
oid: genOid())
oid: genOid(),
maxChannCount: maxChannCount)
method newStream*(m: Mplex,
name: string = "",

View File

@ -16,7 +16,7 @@ logScope:
topics = "muxer"
const
DefaultChanTimeout* = 1.minutes
DefaultChanTimeout* = 5.minutes
type
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import options, sequtils, hashes
import chronos, chronicles
import peerid, multiaddress, crypto/crypto
@ -30,7 +32,6 @@ type
peerId*: PeerID
addrs*: seq[MultiAddress]
protocols*: seq[string]
lifefut: Future[void]
protoVersion*: string
agentVersion*: string
secure*: string
@ -39,9 +40,6 @@ type
privateKey*: PrivateKey
of HasPublic:
key: Option[PublicKey]
# gossip 1.1 spec related
# https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
maintain*: bool
proc id*(p: PeerInfo): string =
if not(isNil(p)):
@ -65,12 +63,12 @@ template postInit(peerinfo: PeerInfo,
peerinfo.addrs = @addrs
if len(protocols) > 0:
peerinfo.protocols = @protocols
peerinfo.lifefut = newFuture[void]("libp2p.peerinfo.lifetime")
proc init*(p: typedesc[PeerInfo],
key: PrivateKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].} =
result = PeerInfo(keyType: HasPrivate, peerId: PeerID.init(key).tryGet(),
privateKey: key)
result.postInit(addrs, protocols)
@ -78,55 +76,31 @@ proc init*(p: typedesc[PeerInfo],
proc init*(p: typedesc[PeerInfo],
peerId: PeerID,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: openarray[string] = []): PeerInfo =
result = PeerInfo(keyType: HasPublic, peerId: peerId)
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo],
peerId: string,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].} =
result = PeerInfo(keyType: HasPublic, peerId: PeerID.init(peerId).tryGet())
result.postInit(addrs, protocols)
proc init*(p: typedesc[PeerInfo],
key: PublicKey,
addrs: openarray[MultiAddress] = [],
protocols: openarray[string] = []): PeerInfo {.inline.} =
protocols: openarray[string] = []): PeerInfo {.
raises: [Defect, ResultError[cstring]].}=
result = PeerInfo(keyType: HasPublic,
peerId: PeerID.init(key).tryGet(),
key: some(key))
result.postInit(addrs, protocols)
proc close*(p: PeerInfo) {.inline.} =
if not p.lifefut.finished:
p.lifefut.complete()
else:
# TODO this should ideally not happen
notice "Closing closed peer", peer = p.id
proc join*(p: PeerInfo): Future[void] {.inline.} =
var retFuture = newFuture[void]()
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
p.lifefut.removeCallback(continuation)
if p.lifefut.finished:
retFuture.complete()
else:
p.lifefut.addCallback(continuation)
retFuture.cancelCallback = cancellation
return retFuture
proc isClosed*(p: PeerInfo): bool {.inline.} =
result = p.lifefut.finished()
proc lifeFuture*(p: PeerInfo): Future[void] {.inline.} =
result = p.lifefut
proc publicKey*(p: PeerInfo): Option[PublicKey] {.inline.} =
proc publicKey*(p: PeerInfo): Option[PublicKey] {.
raises: [Defect, ResultError[CryptoError]].} =
if p.keyType == HasPublic:
if p.peerId.hasPublicKey():
var pubKey: PublicKey

View File

@ -106,10 +106,10 @@ method init*(p: Identify) =
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
try:
defer:
trace "exiting identify handler", oid = conn.oid
trace "exiting identify handler", oid = $conn.oid
await conn.close()
trace "handling identify request", oid = conn.oid
trace "handling identify request", oid = $conn.oid
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
await conn.writeLp(pb.buffer)
except CancelledError as exc:

View File

@ -36,7 +36,7 @@ method subscribeTopic*(f: FloodSub,
let peer = f.peers.getOrDefault(peerId)
if peer == nil:
debug "subscribeTopic on a nil peer!"
debug "subscribeTopic on a nil peer!", peer = peerId
return
if topic notin f.floodsub:
@ -53,12 +53,15 @@ method subscribeTopic*(f: FloodSub,
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
## handle peer disconnects
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(peer)
##
procCall PubSub(f).handleDisconnect(peer)
if not(isNil(peer)) and peer.peerInfo notin f.conns:
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(peer)
method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
@ -93,11 +96,13 @@ method rpcHandler*(f: FloodSub,
try:
await h(t, msg.data) # trigger user provided handler
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it
let published = await f.publishHelper(toSendPeers, m.messages)
let published = await f.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
trace "forwared message to peers", peers = published
@ -120,9 +125,10 @@ method subscribePeer*(p: FloodSub,
method publish*(f: FloodSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(f).publish(topic, data)
discard await procCall PubSub(f).publish(topic, data, timeout)
if data.len <= 0 or topic.len <= 0:
trace "topic or data missing, skipping publish"
@ -137,9 +143,10 @@ method publish*(f: FloodSub,
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
# start the future but do not wait yet
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg])
let published = await f.publishHelper(f.floodsub.getOrDefault(topic), @[msg], timeout)
libp2p_pubsub_messages_published.inc(labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published,
msg = msg.shortLog()

View File

@ -143,17 +143,18 @@ type
when not defined(release):
prunedPeers: HashSet[PubSubPeer]
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
"gossipsub peers per topic in mesh",
labels = ["topic"])
when defined(libp2p_expensive_metrics):
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
"gossipsub peers per topic in mesh",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
"gossipsub peers per topic in fanout",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
"gossipsub peers per topic in fanout",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
"gossipsub peers per topic in gossipsub",
labels = ["topic"])
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
"gossipsub peers per topic in gossipsub",
labels = ["topic"])
proc init*(_: type[GossipSubParams]): GossipSubParams =
GossipSubParams(
@ -255,8 +256,9 @@ method init*(g: GossipSub) =
## e.g. ``/floodsub/1.0.0``, etc...
##
if conn.peerInfo.maintain:
g.explicitPeers.incl(conn.peerInfo.id)
# TODO
# if conn.peerInfo.maintain:
# g.explicitPeers.incl(conn.peerInfo.id)
await g.handleConn(conn, proto)
@ -285,7 +287,7 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
stats.topicInfos[topic] = info
assert(g.peerStats[p].topicInfos[topic].inMesh == true)
debug "grafted", p
trace "grafted", p
do:
doAssert(false, "grafted: peerStats key not found for " & $p)
@ -308,7 +310,7 @@ proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
# mgetOrPut does not work, so we gotta do this without referencing
stats.topicInfos[topic] = info
debug "pruned", p
trace "pruned", p
do:
when not defined(release):
if p in g.prunedPeers:
@ -330,8 +332,9 @@ proc replenishFanout(g: GossipSub, topic: string) =
if g.fanout.peers(topic) == GossipSubD:
break
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
@ -390,14 +393,15 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
g.pruned(peer, topic)
g.mesh.removePeer(topic, peer)
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
# Send changes to peers after table updates to avoid stale state
for p in grafts:
@ -418,8 +422,9 @@ proc dropFanoutPeers(g: GossipSub) =
g.lastFanoutPubSub.del(topic)
trace "dropping fanout topic", topic
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
## gossip iHave messages to peers
@ -437,16 +442,14 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
let gossipPeers = mesh + fanout
let mids = g.mcache.window(topic)
if mids.len <= 0:
if not mids.len > 0:
continue
let ihave = ControlIHave(topicID: topic,
messageIDs: toSeq(mids))
if topic notin g.gossipsub:
trace "topic not in gossip array, skipping", topicID = topic
continue
let ihave = ControlIHave(topicID: topic, messageIDs: toSeq(mids))
for peer in allPeers:
if result.len >= GossipSubD:
trace "got gossip peers", peers = result.len
@ -467,18 +470,18 @@ func `/`(a, b: Duration): float64 =
fa / fb
proc updateScores(g: GossipSub) = # avoid async
debug "updating scores", peers = g.peers.len
trace "updating scores", peers = g.peers.len
let now = Moment.now()
var evicting: seq[PubSubPeer]
for peer, stats in g.peerStats.mpairs:
debug "updating peer score", peer, gossipTopics = peer.topics.len
trace "updating peer score", peer, gossipTopics = peer.topics.len
if not peer.connected:
if now > stats.expire:
evicting.add(peer)
debug "evicted peer from memory", peer
trace "evicted peer from memory", peer
continue
# Per topic
@ -497,28 +500,28 @@ proc updateScores(g: GossipSub) = # avoid async
var p1 = info.meshTime / topicParams.timeInMeshQuantum
if p1 > topicParams.timeInMeshCap:
p1 = topicParams.timeInMeshCap
debug "p1", peer, p1
trace "p1", peer, p1
topicScore += p1 * topicParams.timeInMeshWeight
else:
info.meshMessageDeliveriesActive = false
topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
debug "p2", peer, p2 = info.firstMessageDeliveries
trace "p2", peer, p2 = info.firstMessageDeliveries
if info.meshMessageDeliveriesActive:
if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
let p3 = deficit * deficit
debug "p3", peer, p3
trace "p3", peer, p3
topicScore += p3 * topicParams.meshMessageDeliveriesWeight
topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
debug "p3b", peer, p3b = info.meshFailurePenalty
trace "p3b", peer, p3b = info.meshFailurePenalty
topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
debug "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries
trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries
debug "updated peer topic's scores", peer, topic, info, topicScore
trace "updated peer topic's scores", peer, topic, info, topicScore
peer.score += topicScore * topicParams.topicWeight
@ -543,7 +546,7 @@ proc updateScores(g: GossipSub) = # avoid async
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
stats.topicInfos[topic] = info
debug "updated peer's score", peer, score = peer.score
trace "updated peer's score", peer, score = peer.score
for peer in evicting:
g.peerStats.del(peer)
@ -576,10 +579,10 @@ proc heartbeat(g: GossipSub) {.async.} =
let peers = g.getGossipPeers()
var sent: seq[Future[void]]
for peer in peers.keys:
if peer in g.peers:
sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer])))
sent.allFinished.await.checkFutures()
for peer, control in peers:
g.peers.withValue(peer, pubsubPeer) do:
sent &= pubsubPeer[].send(RPCMsg(control: some(control)))
checkFutures(await allFinished(sent))
g.mcache.shift() # shift the cache
except CancelledError as exc:
@ -596,31 +599,38 @@ proc heartbeat(g: GossipSub) {.async.} =
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
## handle peer disconnects
##
procCall FloodSub(g).handleDisconnect(peer)
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, peer)
if not(isNil(peer)) and peer.peerInfo notin g.conns:
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys):
if peer in g.mesh[t]:
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
for t in toSeq(g.mesh.keys):
if peer in g.mesh[t]:
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(t).int64, labelValues = [t])
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, peer)
for t in toSeq(g.fanout.keys):
g.fanout.removePeer(t, peer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
if peer.peerInfo.maintain:
for t in toSeq(g.explicit.keys):
g.explicit.removePeer(t, peer)
# TODO
# if peer.peerInfo.maintain:
# for t in toSeq(g.explicit.keys):
# g.explicit.removePeer(t, peer)
g.explicitPeers.excl(peer.id)
@ -650,7 +660,7 @@ method subscribeTopic*(g: GossipSub,
let peer = g.peers.getOrDefault(peerId)
if peer == nil:
debug "subscribeTopic on a nil peer!"
# floodsub method logs a trace line already
return
g.handleConnect(peer)
@ -670,12 +680,15 @@ method subscribeTopic*(g: GossipSub,
if peerId in g.explicitPeers:
g.explicit.removePeer(topic, peer)
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
@ -694,13 +707,14 @@ proc handleGraft(g: GossipSub,
trace "peer grafted topic"
# It is an error to GRAFT on a explicit peer
if peer.peerInfo.maintain:
trace "attempt to graft an explicit peer", peer=peer.id,
topicID=graft.topicID
# and such an attempt should be logged and rejected with a PRUNE
result.add(ControlPrune(topicID: graft.topicID))
continue
# TODO
# # It is an error to GRAFT on a explicit peer
# if peer.peerInfo.maintain:
# trace "attempt to graft an explicit peer", peer=peer.id,
# topicID=graft.topicID
# # and such an attempt should be logged and rejected with a PRUNE
# result.add(ControlPrune(topicID: graft.topicID))
# continue
# If they send us a graft before they send us a subscribe, what should
# we do? For now, we add them to mesh but don't add them to gossipsub.
@ -719,13 +733,14 @@ proc handleGraft(g: GossipSub,
else:
result.add(ControlPrune(topicID: topic))
else:
debug "peer grafting topic we're not interested in"
trace "peer grafting topic we're not interested in"
result.add(ControlPrune(topicID: topic))
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(topic).int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(topic).int64, labelValues = [topic])
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
@ -733,8 +748,9 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
g.pruned(peer, prune.topicID)
g.mesh.removePeer(prune.topicID, peer)
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
proc handleIHave(g: GossipSub,
peer: PubSubPeer,
@ -835,12 +851,14 @@ method rpcHandler*(g: GossipSub,
localPeer = g.peerInfo.id,
fromPeer = msg.fromPeer.pretty
try:
await h(t, msg.data) # trigger user provided handler
await h(t, msg.data) # trigger user provided handler
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in message handler", exc = exc.msg
# forward the message to all peers interested in it
let published = await g.publishHelper(toSendPeers, m.messages)
let published = await g.publishHelper(toSendPeers, m.messages, DefaultSendTimeout)
trace "forwared message to peers", peers = published
@ -854,9 +872,15 @@ method rpcHandler*(g: GossipSub,
let messages = g.handleIWant(peer, control.iwant)
if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or respControl.iwant.len > 0:
await peer.send(
RPCMsg(control: some(respControl), messages: messages))
respControl.ihave.len > 0:
try:
info "sending control message", msg = respControl
await peer.send(
RPCMsg(control: some(respControl), messages: messages))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception forwarding control messages", exc = exc.msg
method subscribe*(g: GossipSub,
topic: string,
@ -901,9 +925,10 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
method publish*(g: GossipSub,
topic: string,
data: seq[byte]): Future[int] {.async.} =
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
# base returns always 0
discard await procCall PubSub(g).publish(topic, data)
discard await procCall PubSub(g).publish(topic, data, timeout)
trace "publishing message on topic", topic, data = data.shortLog
var peers: HashSet[PubSubPeer]
@ -914,7 +939,7 @@ method publish*(g: GossipSub,
for id, peer in g.peers:
if topic in peer.topics and
peer.score >= g.parameters.publishThreshold:
debug "publish: including flood/high score peer", peer = id
trace "publish: including flood/high score peer", peer = id
peers.incl(peer)
# add always direct peers
@ -947,9 +972,10 @@ method publish*(g: GossipSub,
if msgId notin g.mcache:
g.mcache.put(msgId, msg)
let published = await g.publishHelper(peers, @[msg])
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
let published = await g.publishHelper(peers, @[msg], timeout)
when defined(libp2p_expensive_metrics):
if published > 0:
libp2p_pubsub_messages_published.inc(labelValues = [topic])
trace "published message to peers", peers = published,
msg = msg.shortLog()

View File

@ -32,11 +32,10 @@ declareGauge(libp2p_pubsub_peers, "pubsub peer instances")
declareGauge(libp2p_pubsub_topics, "pubsub subscribed topics")
declareCounter(libp2p_pubsub_validation_success, "pubsub successfully validated messages")
declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messages")
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
when defined(libp2p_expensive_metrics):
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
type
SendRes = tuple[published: seq[string], failed: seq[string]] # keep private
TopicHandler* = proc(topic: string,
data: seq[byte]): Future[void] {.gcsafe.}
@ -57,7 +56,7 @@ type
peerInfo*: PeerInfo # this peer's info
topics*: Table[string, Topic] # local topics
peers*: Table[string, PubSubPeer] # peerid to peer map
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
conns*: Table[PeerInfo, HashSet[Connection]] # peers connections
triggerSelf*: bool # trigger own local handler on publish
verifySignature*: bool # enable signature verification
sign*: bool # enable message signing
@ -73,8 +72,10 @@ method handleConnect*(p: PubSub, peer: PubSubPeer) {.base.} =
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects
##
if not(isNil(peer)) and peer.peerInfo notin p.conns:
trace "deleting peer", peer = peer.id
peer.onConnect.fire() # Make sure all pending sends are unblocked
p.peers.del(peer.id)
trace "peer disconnected", peer = peer.id
@ -104,24 +105,7 @@ proc sendSubs*(p: PubSub,
topics: seq[string],
subscribe: bool) {.async.} =
## send subscriptions to remote peer
try:
# wait for a connection before publishing
# this happens when
if not peer.onConnect.isSet:
trace "awaiting send connection"
await peer.onConnect.wait()
await peer.sendSubOpts(topics, subscribe)
except CancelledError as exc:
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
raise exc
except CatchableError as exc:
trace "unable to send subscriptions", exc = exc.msg
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
asyncCheck peer.sendSubOpts(topics, subscribe)
method subscribeTopic*(p: PubSub,
topic: string,
@ -163,7 +147,7 @@ proc getOrCreatePeer(p: PubSub,
handleConnect(p, peer)
# metrics
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)
return peer
@ -200,9 +184,8 @@ method handleConn*(p: PubSub,
let peer = p.getOrCreatePeer(conn.peerInfo, proto)
let topics = toSeq(p.topics.keys)
if topics.len > 0:
await p.sendSubs(peer, topics, true)
if p.topics.len > 0:
await p.sendSubs(peer, toSeq(p.topics.keys), true)
try:
peer.handler = handler
@ -238,12 +221,9 @@ method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
if not(isNil(peer)) and not(isNil(peer.conn)):
await peer.conn.close()
proc connected*(p: PubSub, peerInfo: PeerInfo): bool =
if peerInfo.id in p.peers:
let peer = p.peers[peerInfo.id]
if not(isNil(peer)):
return peer.connected
proc connected*(p: PubSub, peerId: PeerID): bool =
p.peers.withValue($peerId, peer):
return peer[] != nil and peer[].connected
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =
@ -298,9 +278,11 @@ method subscribe*(p: PubSub,
# metrics
libp2p_pubsub_topics.set(p.topics.len.int64)
proc sendHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message]): Future[SendRes] {.async.} =
proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message],
timeout: Duration): Future[int] {.async.} =
# send messages and cleanup failed peers
var sent: seq[tuple[id: string, fut: Future[void]]]
for sendPeer in sendPeers:
# avoid sending to self
@ -308,7 +290,7 @@ proc sendHelper*(p: PubSub,
continue
trace "sending messages to peer", peer = sendPeer.id, msgs
sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs))))
sent.add((id: sendPeer.id, fut: sendPeer.send(RPCMsg(messages: msgs), timeout)))
var published: seq[string]
var failed: seq[string]
@ -323,13 +305,6 @@ proc sendHelper*(p: PubSub,
trace "sending messages to peer succeeded", peer = f[0].id
published.add(f[0].id)
return (published, failed)
proc publishHelper*(p: PubSub,
sendPeers: HashSet[PubSubPeer],
msgs: seq[Message]): Future[int] {.async.} =
# send messages and cleanup failed peers
let (published, failed) = await p.sendHelper(sendPeers, msgs)
for f in failed:
let peer = p.peers.getOrDefault(f)
if not(isNil(peer)) and not(isNil(peer.conn)):
@ -339,7 +314,8 @@ proc publishHelper*(p: PubSub,
method publish*(p: PubSub,
topic: string,
data: seq[byte]): Future[int] {.base, async.} =
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.base, async.} =
## publish to a ``topic``
if p.triggerSelf and topic in p.topics:
for h in p.topics[topic].handler:
@ -436,3 +412,6 @@ proc removeObserver*(p: PubSub; observer: PubSubObserver) =
let idx = p.observers[].find(observer)
if idx != -1:
p.observers[].del(idx)
proc connected*(p: PubSub, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
peerInfo != nil and connected(p, peerInfo.peerId)

View File

@ -21,10 +21,15 @@ import rpc/[messages, message, protobuf],
logScope:
topics = "pubsubpeer"
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
when defined(libp2p_expensive_metrics):
declareCounter(libp2p_pubsub_sent_messages, "number of messages sent", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_received_messages, "number of messages received", labels = ["id", "topic"])
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])
const
DefaultReadTimeout* = 1.minutes
DefaultSendTimeout* = 10.seconds
type
PubSubObserver* = ref object
@ -86,13 +91,14 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
debug "starting pubsub read loop for peer", closed = conn.closed
try:
try:
while not conn.closed:
while not conn.atEof:
trace "waiting for data", closed = conn.closed
let data = await conn.readLp(64 * 1024)
let data = await conn.readLp(64 * 1024).wait(DefaultReadTimeout)
let digest = $(sha256.digest(data))
trace "read data from peer", data = data.shortLog
if digest in p.recvdRpcCache:
libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
when defined(libp2p_expensive_metrics):
libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
trace "message already received, skipping"
continue
@ -107,10 +113,11 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
# trigger hooks
p.recvObservers(msg)
for m in msg.messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
when defined(libp2p_expensive_metrics):
for m in msg.messages:
for t in m.topicIDs:
# metrics
libp2p_pubsub_received_messages.inc(labelValues = [p.id, t])
await p.handler(p, @[msg])
p.recvdRpcCache.put(digest)
@ -124,10 +131,13 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
raise exc
proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
proc send*(
p: PubSubPeer,
msg: RPCMsg,
timeout: Duration = DefaultSendTimeout) {.async.} =
logScope:
peer = p.id
msg = shortLog(msg)
rpcMsg = shortLog(msg)
trace "sending msg to peer"
@ -137,7 +147,7 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
let encoded = encodeRpcMsg(mm)
if encoded.len <= 0:
trace "empty message, skipping"
info "empty message, skipping"
return
logScope:
@ -146,24 +156,41 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
let digest = $(sha256.digest(encoded))
if digest in p.sentRpcCache:
trace "message already sent to peer, skipping"
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
when defined(libp2p_expensive_metrics):
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
return
try:
proc sendToRemote() {.async.} =
logScope:
peer = p.id
rpcMsg = shortLog(msg)
trace "about to send message"
if not p.onConnect.isSet:
await p.onConnect.wait()
if p.connected: # this can happen if the remote disconnected
trace "sending encoded msgs to peer"
await p.sendConn.writeLp(encoded)
p.sentRpcCache.put(digest)
trace "sent pubsub message to remote"
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
when defined(libp2p_expensive_metrics):
for x in mm.messages:
for t in x.topicIDs:
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
let sendFut = sendToRemote()
try:
await sendFut.wait(timeout)
except CatchableError as exc:
trace "unable to send to remote", exc = exc.msg
if not sendFut.finished:
sendFut.cancel()
if not(isNil(p.sendConn)):
await p.sendConn.close()
p.sendConn = nil
@ -171,21 +198,43 @@ proc send*(p: PubSubPeer, msg: RPCMsg) {.async.} =
raise exc
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[void] =
proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool) {.async.} =
trace "sending subscriptions", peer = p.id, subscribe, topicIDs = topics
p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
try:
await p.send(RPCMsg(
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))),
# the long timeout is mostly for cases where
# the connection is flaky at the beggingin
timeout = 3.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending subscriptions", exc = exc.msg
proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] =
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending graft to peer", peer = p.id, topicIDs = topics
p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
try:
await p.send(RPCMsg(control: some(
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))),
timeout = 1.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending grafts", exc = exc.msg
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
trace "sending prune to peer", peer = p.id, topicIDs = topics
p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
try:
await p.send(RPCMsg(control: some(
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))),
timeout = 1.minutes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception sending prunes", exc = exc.msg
proc `$`*(p: PubSubPeer): string =
p.id

View File

@ -205,7 +205,7 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
else:
trace "decodeMessage: data is missing"
if ? pb.getField(3, msg.seqno):
trace "decodeMessage: read seqno", seqno = msg.data.shortLog()
trace "decodeMessage: read seqno", seqno = msg.seqno
else:
trace "decodeMessage: seqno is missing"
if ? pb.getRepeatedField(4, msg.topicIDs):

View File

@ -37,6 +37,8 @@ const
NoiseSize = 32
MaxPlainSize = int(uint16.high - NoiseSize - ChaChaPolyTag.len)
HandshakeTimeout = 1.minutes
type
KeyPair = object
privateKey: Curve25519Key
@ -101,7 +103,8 @@ proc hashProtocol(name: string): MDigest[256] =
result = sha256.digest(name)
proc dh(priv: Curve25519Key, pub: Curve25519Key): Curve25519Key =
Curve25519.mul(result, pub, priv)
result = pub
Curve25519.mul(result, priv)
# Cipherstate
@ -131,7 +134,7 @@ proc decryptWithAd(state: var CipherState, ad, data: openArray[byte]): seq[byte]
ChaChaPoly.decrypt(state.k, nonce, tagOut, result, ad)
trace "decryptWithAd", tagIn = tagIn.shortLog, tagOut = tagOut.shortLog, nonce = state.n
if tagIn != tagOut:
error "decryptWithAd failed", data = byteutils.toHex(data)
debug "decryptWithAd failed", data = shortLog(data)
raise newException(NoiseDecryptTagError, "decryptWithAd failed tag authentication.")
inc state.n
if state.n > NonceMax:
@ -265,14 +268,14 @@ template read_s: untyped =
proc receiveHSMessage(sconn: Connection): Future[seq[byte]] {.async.} =
var besize: array[2, byte]
await sconn.readExactly(addr besize[0], besize.len)
await sconn.readExactly(addr besize[0], besize.len).wait(HandshakeTimeout)
let size = uint16.fromBytesBE(besize).int
trace "receiveHSMessage", size
if size == 0:
return
var buffer = newSeq[byte](size)
await sconn.readExactly(addr buffer[0], buffer.len)
await sconn.readExactly(addr buffer[0], buffer.len).wait(HandshakeTimeout)
return buffer
proc sendHSMessage(sconn: Connection; buf: openArray[byte]): Future[void] =
@ -452,14 +455,14 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
let r1 = remoteProof.getField(1, remotePubKeyBytes)
let r2 = remoteProof.getField(2, remoteSigBytes)
if r1.isErr() or not(r1.get()):
raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
raise newException(NoiseHandshakeError, "Failed to deserialize remote public key bytes. (initiator: " & $initiator & ")")
if r2.isErr() or not(r2.get()):
raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
raise newException(NoiseHandshakeError, "Failed to deserialize remote signature bytes. (initiator: " & $initiator & ")")
if not remotePubKey.init(remotePubKeyBytes):
raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
raise newException(NoiseHandshakeError, "Failed to decode remote public key. (initiator: " & $initiator & ")")
if not remoteSig.init(remoteSigBytes):
raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ", peer: " & $conn.peerInfo.peerId & ")")
raise newException(NoiseHandshakeError, "Failed to decode remote signature. (initiator: " & $initiator & ")")
let verifyPayload = PayloadString.toBytes & handshakeRes.rs.getBytes
if not remoteSig.verify(verifyPayload, remotePubKey):
@ -475,11 +478,17 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
var
failedKey: PublicKey
discard extractPublicKey(conn.peerInfo.peerId, failedKey)
debug "Noise handshake, peer infos don't match!", initiator, dealt_peer = $conn.peerInfo.id, dealt_key = $failedKey, received_peer = $pid, received_key = $remotePubKey
debug "Noise handshake, peer infos don't match!",
initiator, dealt_peer = $conn.peerInfo.id,
dealt_key = $failedKey, received_peer = $pid,
received_key = $remotePubKey
raise newException(NoiseHandshakeError, "Noise handshake, peer infos don't match! " & $pid & " != " & $conn.peerInfo.peerId)
var tmp = NoiseConnection.init(
conn, PeerInfo.init(remotePubKey), conn.observedAddr)
let peerInfo =
if conn.peerInfo != nil: conn.peerInfo
else: PeerInfo.init(remotePubKey)
var tmp = NoiseConnection.init(conn, peerInfo, conn.observedAddr)
if initiator:
tmp.readCs = handshakeRes.cs2
@ -491,7 +500,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool): Future[SecureCon
finally:
burnMem(handshakeRes)
trace "Noise handshake completed!", initiator, peer = $secure.peerInfo
trace "Noise handshake completed!", initiator, peer = shortLog(secure.peerInfo)
return secure

View File

@ -246,9 +246,12 @@ proc newSecioConn(conn: Connection,
## Create new secure stream/lpstream, using specified hash algorithm ``hash``,
## cipher algorithm ``cipher``, stretched keys ``secrets`` and order
## ``order``.
result = SecioConn.init(conn,
PeerInfo.init(remotePubKey),
conn.observedAddr)
let peerInfo =
if conn.peerInfo != nil: conn.peerInfo
else: PeerInfo.init(remotePubKey)
result = SecioConn.init(conn, peerInfo, conn.observedAddr)
let i0 = if order < 0: 1 else: 0
let i1 = if order < 0: 0 else: 1

View File

@ -81,7 +81,7 @@ method init*(s: Secure) {.gcsafe.} =
except CancelledError as exc:
warn "securing connection canceled"
await conn.close()
raise
raise exc
except CatchableError as exc:
warn "securing connection failed", msg = exc.msg
await conn.close()

View File

@ -220,6 +220,8 @@ method readOnce*(s: BufferStream,
var index = 0
var size = min(nbytes, s.len)
let output = cast[ptr UncheckedArray[byte]](pbytes)
s.activity = true # reset activity flag
while s.len() > 0 and index < size:
output[index] = s.popFirst()
inc(index)
@ -243,6 +245,7 @@ method write*(s: BufferStream, msg: seq[byte]) {.async.} =
if isNil(s.writeHandler):
raise newNotWritableError()
s.activity = true # reset activity flag
await s.writeHandler(msg)
# TODO: move pipe routines out
@ -303,6 +306,6 @@ method close*(s: BufferStream) {.async, gcsafe.} =
else:
trace "attempt to close an already closed bufferstream", trace = getStackTrace()
except CancelledError as exc:
raise
raise exc
except CatchableError as exc:
trace "error closing buffer stream", exc = exc.msg

View File

@ -7,29 +7,42 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import oids
import chronos, chronicles
import connection
logScope:
topics = "chronosstream"
type ChronosStream* = ref object of Connection
const
DefaultChronosStreamTimeout = 10.minutes
type
ChronosStream* = ref object of Connection
client: StreamTransport
method initStream*(s: ChronosStream) =
if s.objName.len == 0:
s.objName = "ChronosStream"
s.timeoutHandler = proc() {.async, gcsafe.} =
trace "idle timeout expired, closing ChronosStream"
await s.close()
procCall Connection(s).initStream()
proc newChronosStream*(client: StreamTransport): ChronosStream =
new result
result.client = client
proc init*(C: type ChronosStream,
client: StreamTransport,
timeout = DefaultChronosStreamTimeout): ChronosStream =
result = C(client: client,
timeout: timeout)
result.initStream()
template withExceptions(body: untyped) =
try:
body
except CancelledError as exc:
raise exc
except TransportIncompleteError:
# for all intents and purposes this is an EOF
raise newLPStreamEOFError()
@ -47,6 +60,7 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
withExceptions:
result = await s.client.readOnce(pbytes, nbytes)
s.activity = true # reset activity flag
method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
if s.closed:
@ -59,6 +73,7 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
var written = 0
while not s.client.closed and written < msg.len:
written += await s.client.write(msg[written..<msg.len])
s.activity = true # reset activity flag
if written < msg.len:
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")
@ -73,7 +88,7 @@ method close*(s: ChronosStream) {.async.} =
try:
if not s.isClosed:
trace "shutting down chronos stream", address = $s.client.remoteAddress(),
oid = s.oid
oid = $s.oid
if not s.client.closed():
await s.client.closeWait()

View File

@ -8,21 +8,31 @@
## those terms.
import hashes
import chronos, metrics
import chronicles, chronos, metrics
import lpstream,
../multiaddress,
../peerinfo
export lpstream
logScope:
topics = "connection"
const
ConnectionTrackerName* = "libp2p.connection"
DefaultConnectionTimeout* = 1.minutes
type
TimeoutHandler* = proc(): Future[void] {.gcsafe.}
Direction* {.pure.} = enum
None, In, Out
Connection* = ref object of LPStream
activity*: bool # reset every time data is sent or received
timeout*: Duration # channel timeout if no activity
timerTaskFut: Future[void] # the current timer instanse
timeoutHandler*: TimeoutHandler # timeout handler
peerInfo*: PeerInfo
observedAddr*: Multiaddress
dir*: Direction
@ -32,6 +42,7 @@ type
closed*: uint64
proc setupConnectionTracker(): ConnectionTracker {.gcsafe.}
proc timeoutMonitor(s: Connection) {.async, gcsafe.}
proc getConnectionTracker*(): ConnectionTracker {.gcsafe.} =
result = cast[ConnectionTracker](getTracker(ConnectionTrackerName))
@ -55,21 +66,23 @@ proc setupConnectionTracker(): ConnectionTracker =
result.isLeaked = leakTransport
addTracker(ConnectionTrackerName, result)
proc init*(C: type Connection,
peerInfo: PeerInfo,
dir: Direction): Connection =
result = C(peerInfo: peerInfo, dir: dir)
result.initStream()
method initStream*(s: Connection) =
if s.objName.len == 0:
s.objName = "Connection"
procCall LPStream(s).initStream()
s.closeEvent = newAsyncEvent()
doAssert(isNil(s.timerTaskFut))
s.timerTaskFut = s.timeoutMonitor()
inc getConnectionTracker().opened
method close*(s: Connection) {.async.} =
## cleanup timers
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut.cancel()
if not s.isClosed:
await procCall LPStream(s).close()
inc getConnectionTracker().closed
@ -80,3 +93,50 @@ proc `$`*(conn: Connection): string =
func hash*(p: Connection): Hash =
cast[pointer](p).hash
proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
## monitor the channel for innactivity
##
## if the timeout was hit, it means that
## neither incoming nor outgoing activity
## has been detected and the channel will
## be reset
##
logScope:
oid = $s.oid
try:
while true:
await sleepAsync(s.timeout)
if s.closed or s.atEof:
return
if s.activity:
s.activity = false
continue
break
# reset channel on innactivity timeout
trace "Connection timed out"
if not(isNil(s.timeoutHandler)):
await s.timeoutHandler()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in timeout", exc = exc.msg
proc init*(C: type Connection,
peerInfo: PeerInfo,
dir: Direction,
timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil): Connection =
result = C(peerInfo: peerInfo,
dir: dir,
timeout: timeout,
timeoutHandler: timeoutHandler)
result.initStream()

View File

@ -16,6 +16,9 @@ import ../varint,
declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
logScope:
topics = "lpstream"
type
LPStream* = ref object of RootObj
closeEvent*: AsyncEvent

View File

@ -50,12 +50,22 @@ const
type
NoPubSubException* = object of CatchableError
Lifecycle* {.pure.} = enum
Connected,
Upgraded,
Disconnected
ConnEventKind* {.pure.} = enum
Connected, # A connection was made and securely upgraded - there may be
# more than one concurrent connection thus more than one upgrade
# event per peer.
Disconnected # Peer disconnected - this event is fired once per upgrade
# when the associated connection is terminated.
Hook* = proc(peer: PeerInfo, cycle: Lifecycle): Future[void] {.gcsafe.}
ConnEvent* = object
case kind*: ConnEventKind
of ConnEventKind.Connected:
incoming*: bool
else:
discard
ConnEventHandler* =
proc(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.}
Switch* = ref object of RootObj
peerInfo*: PeerInfo
@ -69,48 +79,49 @@ type
secureManagers*: seq[Secure]
pubSub*: Option[PubSub]
running: bool
dialLock: Table[string, AsyncLock]
hooks: Table[Lifecycle, HashSet[Hook]]
dialLock: Table[PeerID, AsyncLock]
ConnEvents: Table[ConnEventKind, HashSet[ConnEventHandler]]
pubsubMonitors: Table[PeerId, Future[void]]
# gossip 1.1 related
maintaining: HashSet[PeerInfo]
maintainFut: Future[void]
maintainSleepFut: Future[void]
proc newNoPubSubException(): ref NoPubSubException {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!")
proc addHook*(s: Switch, hook: Hook, cycle: Lifecycle) =
s.hooks.mgetOrPut(cycle, initHashSet[Hook]()).incl(hook)
proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler, kind: ConnEventKind) =
## Add peer event handler - handlers must not raise exceptions!
if isNil(handler): return
s.ConnEvents.mgetOrPut(kind, initHashSet[ConnEventHandler]()).incl(handler)
proc removeHook*(s: Switch, hook: Hook, cycle: Lifecycle) =
s.hooks.mgetOrPut(cycle, initHashSet[Hook]()).excl(hook)
proc removeConnEventHandler*(s: Switch,
handler: ConnEventHandler, kind: ConnEventKind) =
s.ConnEvents.withValue(kind, handlers) do:
handlers[].excl(handler)
proc triggerHooks(s: Switch, peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.} =
proc triggerConnEvent(s: Switch, peerId: PeerID, event: ConnEvent) {.async, gcsafe.} =
try:
if cycle in s.hooks:
var hooks: seq[Future[void]]
for h in s.hooks[cycle]:
if not(isNil(h)):
hooks.add(h(peer, cycle))
if event.kind in s.ConnEvents:
var ConnEvents: seq[Future[void]]
for h in s.ConnEvents[event.kind]:
ConnEvents.add(h(peerId, event))
checkFutures(await allFinished(hooks))
checkFutures(await allFinished(ConnEvents))
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in trigger hooks", exc = exc.msg
except CatchableError as exc: # handlers should not raise!
warn "exception in trigger ConnEvents", exc = exc.msg
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc disconnect*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeer*(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.}
proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
try:
await conn.closeEvent.wait()
trace "about to cleanup pubsub peer"
if s.pubSub.isSome:
let fut = s.pubsubMonitors.getOrDefault(conn.peerInfo.peerId)
if not(isNil(fut)) and not(fut.finished):
await fut.cancelAndWait()
fut.cancel()
await s.pubSub.get().unsubscribePeer(conn.peerInfo)
except CancelledError as exc:
@ -118,12 +129,12 @@ proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} =
except CatchableError as exc:
trace "exception cleaning pubsub peer", exc = exc.msg
proc isConnected*(s: Switch, peer: PeerInfo): bool =
proc isConnected*(s: Switch, peerId: PeerID): bool =
## returns true if the peer has one or more
## associated connections (sockets)
##
peer.peerId in s.connManager
peerId in s.connManager
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
if s.secureManagers.len <= 0:
@ -173,12 +184,11 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
## mux incoming connection
trace "muxing connection", peer = $conn
let muxers = toSeq(s.muxers.keys)
if muxers.len == 0:
if s.muxers.len == 0:
warn "no muxers registered, skipping upgrade flow"
return
let muxerName = await s.ms.select(conn, muxers)
let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys()))
if muxerName.len == 0 or muxerName == "na":
debug "no muxer available, early exit", peer = $conn
return
@ -194,7 +204,6 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
# new stream for identify
var stream = await muxer.newStream()
var handlerFut: Future[void]
defer:
if not(isNil(stream)):
@ -202,7 +211,7 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
# call muxer handler, this should
# not end until muxer ends
handlerFut = muxer.handle()
let handlerFut = muxer.handle()
# do identify first, so that we have a
# PeerInfo in case we didn't before
@ -217,8 +226,8 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "adding muxer for peer", peer = conn.peerInfo.id
s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
await s.connManager.dropPeer(peer)
proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
s.connManager.dropPeer(peerId)
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
logScope:
@ -237,12 +246,12 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
raise newException(CatchableError,
"unable to identify connection, stopping upgrade")
trace "succesfully upgraded outgoing connection", oid = sconn.oid
trace "successfully upgraded outgoing connection", oid = sconn.oid
return sconn
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection", conn = $conn, oid = conn.oid
trace "upgrading incoming connection", conn = $conn, oid = $conn.oid
let ms = newMultistream()
# secure incoming connections
@ -251,7 +260,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
{.async, gcsafe, closure.} =
var sconn: Connection
trace "Securing connection", oid = conn.oid
trace "Securing connection", oid = $conn.oid
let secure = s.secureManagers.filterIt(it.codec == proto)[0]
try:
@ -266,7 +275,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
for muxer in s.muxers.values:
ms.addHandler(muxer.codecs, muxer)
# handle subsequent requests
# handle subsequent secure requests
await ms.handle(sconn)
except CancelledError as exc:
@ -279,105 +288,113 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
for k in s.secureManagers:
ms.addHandler(k.codec, securedHandler)
# handle secured connections
await ms.handle(conn)
# handle un-secured connections
# we handshaked above, set this ms handler as active
await ms.handle(conn, active = true)
proc internalConnect(s: Switch,
peer: PeerInfo): Future[Connection] {.async.} =
peerId: PeerID,
addrs: seq[MultiAddress]): Future[Connection] {.async.} =
logScope: peer = peerId
if s.peerInfo.peerId == peer.peerId:
if s.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!")
let id = peer.id
var conn: Connection
let lock = s.dialLock.mgetOrPut(id, newAsyncLock())
# Ensure there's only one in-flight attempt per peer
let lock = s.dialLock.mgetOrPut(peerId, newAsyncLock())
try:
await lock.acquire()
trace "about to dial peer", peer = id
conn = s.connManager.selectConn(peer)
if conn.isNil or (conn.closed or conn.atEof):
trace "Dialing peer", peer = id
for t in s.transports: # for each transport
for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a, peer = id
try:
conn = await t.dial(a)
# make sure to assign the peer to the connection
conn.peerInfo = peer
conn.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerHooks(
conn.peerInfo,
Lifecycle.Disconnected)
# Check if we have a connection already and try to reuse it
conn = s.connManager.selectConn(peerId)
if conn != nil:
if conn.atEof or conn.closed:
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager"
await conn.close()
raise newException(CatchableError, "Zombie connection encountered")
asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected)
libp2p_dialed_peers.inc()
trace "Reusing existing connection", oid = $conn.oid,
direction = $conn.dir
return conn
trace "Dialing peer"
for t in s.transports: # for each transport
for a in addrs: # for each address
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a
let dialed = try:
await t.dial(a)
except CancelledError as exc:
trace "dialing canceled", exc = exc.msg
raise
raise exc
except CatchableError as exc:
trace "dialing failed", exc = exc.msg
libp2p_failed_dials.inc()
continue
continue # Try the next address
try:
let uconn = await s.upgradeOutgoing(conn)
s.connManager.storeOutgoing(uconn)
asyncCheck s.triggerHooks(uconn.peerInfo, Lifecycle.Upgraded)
conn = uconn
trace "dial succesfull", oid = $conn.oid, peer = $conn.peerInfo
# make sure to assign the peer to the connection
dialed.peerInfo = PeerInfo.init(peerId, addrs)
libp2p_dialed_peers.inc()
let upgraded = try:
await s.upgradeOutgoing(dialed)
except CatchableError as exc:
if not(isNil(conn)):
await conn.close()
trace "Unable to establish outgoing link", exc = exc.msg
# If we failed to establish the connection through one transport,
# we won't succeeed through another - no use in trying again
await dialed.close()
debug "upgrade failed", exc = exc.msg
if exc isnot CancelledError:
libp2p_failed_upgrade.inc()
raise exc
if isNil(conn):
libp2p_failed_upgrade.inc()
continue
break
else:
trace "Reusing existing connection", oid = $conn.oid,
direction = $conn.dir,
peer = $conn.peerInfo
doAssert not isNil(upgraded), "checked in upgradeOutgoing"
s.connManager.storeOutgoing(upgraded)
conn = upgraded
trace "dial successful",
oid = $conn.oid,
peerInfo = shortLog(upgraded.peerInfo)
break
finally:
if lock.locked():
lock.release()
if isNil(conn):
raise newException(CatchableError,
"Unable to establish outgoing link")
if isNil(conn): # None of the addresses connected
raise newException(CatchableError, "Unable to establish outgoing link")
if conn.closed or conn.atEof:
await conn.close()
raise newException(CatchableError,
"Connection dead on arrival")
conn.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Disconnected))
doAssert(conn in s.connManager, "connection not tracked!")
await s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false))
trace "dial succesfull", oid = $conn.oid,
peer = $conn.peerInfo
if conn.closed():
# This can happen if one of the peer event handlers deems the peer
# unworthy and disconnects it
raise newException(CatchableError, "Connection closed during handshake")
await s.subscribePeer(peer)
asyncCheck s.cleanupPubSubPeer(conn)
asyncCheck s.subscribePeer(peerId)
trace "got connection", oid = $conn.oid,
direction = $conn.dir,
peer = $conn.peerInfo
return conn
proc connect*(s: Switch, peer: PeerInfo) {.async.} =
discard await s.internalConnect(peer)
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
discard await s.internalConnect(peerId, addrs)
proc dial*(s: Switch,
peer: PeerInfo,
peerId: PeerID,
addrs: seq[MultiAddress],
proto: string):
Future[Connection] {.async.} =
let conn = await s.internalConnect(peer)
let conn = await s.internalConnect(peerId, addrs)
let stream = await s.connManager.getMuxedStream(conn)
proc cleanup() {.async.} =
@ -392,10 +409,12 @@ proc dial*(s: Switch,
await conn.close()
raise newException(CatchableError, "Couldn't get muxed stream")
trace "Attempting to select remote", proto = proto, oid = conn.oid
trace "Attempting to select remote", proto = proto,
streamOid = $stream.oid,
oid = $conn.oid
if not await s.ms.select(stream, proto):
await stream.close()
raise newException(CatchableError, "Unable to select sub-protocol " & proto)
raise newException(CatchableError, "Unable to select sub-protocol" & proto)
return stream
except CancelledError as exc:
@ -418,20 +437,11 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
s.ms.addHandler(proto.codecs, proto)
proc maintainPeers(s: Switch) {.async, gcsafe.}
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
trace "starting switch for peer", peerInfo = shortLog(s.peerInfo)
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try:
conn.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerHooks(
conn.peerInfo,
Lifecycle.Disconnected)
asyncCheck s.triggerHooks(conn.peerInfo, Lifecycle.Connected)
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
except CancelledError as exc:
raise exc
@ -450,26 +460,17 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
if s.pubSub.isSome:
await s.pubSub.get().start()
s.maintainFut = maintainPeers(s)
info "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
debug "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} =
trace "stopping switch"
s.running = false
# we want to report errors but we do not want to fail
# or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up
if s.pubSub.isSome:
# Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs)
if not isNil(s.maintainSleepFut):
s.maintainSleepFut.cancel()
if not isNil(s.maintainFut):
await s.maintainFut
if s.pubSub.isSome:
await s.pubSub.get().stop()
# close and cleanup all connections
@ -485,37 +486,25 @@ proc stop*(s: Switch) {.async.} =
trace "switch stopped"
proc maintainPeers(s: Switch) {.async.} =
while s.running:
for peer in s.maintaining:
tryAndWarn "explicit peer maintain":
if not s.connManager.contains(peer.peerId):
# attempt re-connect in this case
trace "explicit peering, trying to re-connect", peer = peer.id
await s.connect(peer)
s.maintainSleepFut = sleepAsync(5.minutes) # spec recommended
await s.maintainSleepFut # do this in order to cancel it
proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
proc subscribePeerInternal(s: Switch, peerId: PeerID) {.async, gcsafe.} =
## Subscribe to pub sub peer
if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
##
if s.pubSub.isSome and not s.pubSub.get().connected(peerId):
trace "about to subscribe to pubsub peer", peer = peerId
var stream: Connection
try:
stream = await s.connManager.getMuxedStream(peerInfo)
stream = await s.connManager.getMuxedStream(peerId)
if isNil(stream):
trace "unable to subscribe to peer", peer = peerInfo.shortLog
trace "unable to subscribe to peer", peer = peerId
return
if not await s.ms.select(stream, s.pubSub.get().codec):
if not(isNil(stream)):
trace "couldn't select pubsub", codec = s.pubSub.get().codec
await stream.close()
return
if peerInfo.maintain:
s.maintaining.incl(peerInfo)
s.pubSub.get().subscribePeer(stream)
await stream.closeEvent.wait()
except CancelledError as exc:
@ -524,39 +513,51 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
raise exc
except CatchableError as exc:
trace "exception in subscribe to peer", peer = peerInfo.shortLog,
trace "exception in subscribe to peer", peer = peerId,
exc = exc.msg
if not(isNil(stream)):
await stream.close()
proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} =
proc pubsubMonitor(s: Switch, peerId: PeerID) {.async.} =
## while peer connected maintain a
## pubsub connection as well
##
var tries = 0
var backoffFactor = 5 # up to ~10 mins
var backoff = 1.seconds
while switch.isConnected(peer) and
tries < MaxPubsubReconnectAttempts:
while s.isConnected(peerId):
try:
debug "subscribing to pubsub peer", peer = $peer
await switch.subscribePeerInternal(peer)
trace "subscribing to pubsub peer", peer = peerId
await s.subscribePeerInternal(peerId)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in pubsub monitor", peer = $peer, exc = exc.msg
trace "exception in pubsub monitor", peer = peerId, exc = exc.msg
finally:
debug "awaiting backoff period before reconnecting", peer = $peer, backoff, tries
await sleepAsync(backoff) # allow the peer to cooldown
backoff = backoff * backoffFactor
tries.inc()
trace "sleeping before trying pubsub peer", peer = peerId
await sleepAsync(1.seconds) # allow the peer to cooldown
trace "exiting pubsub monitor", peer = $peer
trace "exiting pubsub monitor", peer = peerId
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
if peerInfo.peerId notin s.pubsubMonitors:
s.pubsubMonitors[peerInfo.peerId] = s.pubsubMonitor(peerInfo)
proc subscribePeer*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
## Waits until ``server`` is not closed.
##
var retFuture = newFuture[void]("stream.transport.server.join")
let pubsubFut = s.pubsubMonitors.mgetOrPut(
peerId, s.pubsubMonitor(peerId))
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
proc cancel(udata: pointer) {.gcsafe.} =
pubsubFut.removeCallback(continuation, cast[pointer](retFuture))
if not(pubsubFut.finished()):
pubsubFut.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancel
else:
retFuture.complete()
return retFuture
proc subscribe*(s: Switch, topic: string,
handler: TopicHandler) {.async.} =
@ -584,14 +585,17 @@ proc unsubscribeAll*(s: Switch, topic: string) {.async.} =
await s.pubSub.get().unsubscribeAll(topic)
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[int] {.async.} =
proc publish*(s: Switch,
topic: string,
data: seq[byte],
timeout: Duration = InfiniteDuration): Future[int] {.async.} =
## pubslish to pubsub topic
##
if s.pubSub.isNone:
raise newNoPubSubException()
return await s.pubSub.get().publish(topic, data)
return await s.pubSub.get().publish(topic, data, timeout)
proc addValidator*(s: Switch,
topics: varargs[string],
@ -629,7 +633,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
await muxer.close()
return
muxer.connection.peerInfo = stream.peerInfo
let
peerInfo = stream.peerInfo
peerId = peerInfo.peerId
muxer.connection.peerInfo = peerInfo
# store incoming connection
s.connManager.storeIncoming(muxer.connection)
@ -637,12 +644,19 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
# store muxer and muxed connection
s.connManager.storeMuxer(muxer)
trace "got new muxer", peer = $muxer.connection.peerInfo
asyncCheck s.triggerHooks(muxer.connection.peerInfo, Lifecycle.Upgraded)
trace "got new muxer", peer = shortLog(peerInfo)
muxer.connection.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Disconnected))
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true))
# try establishing a pubsub connection
await s.subscribePeer(muxer.connection.peerInfo)
asyncCheck s.cleanupPubSubPeer(muxer.connection)
asyncCheck s.subscribePeer(peerId)
except CancelledError as exc:
await muxer.close()
@ -669,7 +683,6 @@ proc newSwitch*(peerInfo: PeerInfo,
identity: identity,
muxers: muxers,
secureManagers: @secureManagers,
maintaining: initHashSet[PeerInfo]()
)
let s = result # can't capture result
@ -694,3 +707,21 @@ proc newSwitch*(peerInfo: PeerInfo,
if pubSub.isSome:
result.pubSub = pubSub
result.mount(pubSub.get())
proc isConnected*(s: Switch, peerInfo: PeerInfo): bool {.deprecated: "Use PeerID version".} =
not isNil(peerInfo) and isConnected(s, peerInfo.peerId)
proc disconnect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
disconnect(s, peerInfo.peerId)
proc connect*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version".} =
connect(s, peerInfo.peerId, peerInfo.addrs)
proc dial*(s: Switch,
peerInfo: PeerInfo,
proto: string):
Future[Connection] {.deprecated: "Use PeerID version".} =
dial(s, peerInfo.peerId, peerInfo.addrs, proto)
proc subscribePeer*(s: Switch, peerInfo: PeerInfo): Future[void] {.deprecated: "Use PeerID version", gcsafe.} =
subscribePeer(s, peerInfo.peerId)

View File

@ -7,6 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import oids
import chronos, chronicles, sequtils
import transport,
../errors,
@ -62,7 +63,7 @@ proc connHandler*(t: TcpTransport,
client: StreamTransport,
initiator: bool): Connection =
trace "handling connection", address = $client.remoteAddress
let conn: Connection = Connection(newChronosStream(client))
let conn: Connection = Connection(ChronosStream.init(client))
conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet()
if not initiator:
if not isNil(t.handler):
@ -71,10 +72,12 @@ proc connHandler*(t: TcpTransport,
proc cleanup() {.async.} =
try:
await client.join()
trace "cleaning up client", addrs = $client.remoteAddress, connoid = conn.oid
trace "cleaning up client", addrs = $client.remoteAddress, connoid = $conn.oid
if not(isNil(conn)):
await conn.close()
t.clients.keepItIf(it != client)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error cleaning up client", exc = exc.msg
@ -138,6 +141,8 @@ method close*(t: TcpTransport) {.async, gcsafe.} =
trace "transport stopped"
inc getTcpTransportTracker().closed
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error shutting down tcp transport", exc = exc.msg

View File

@ -15,7 +15,9 @@ import utils,
../../libp2p/[errors,
switch,
stream/connection,
stream/bufferstream,
crypto/crypto,
protocols/pubsub/pubsubpeer,
protocols/pubsub/pubsub,
protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages,
@ -218,6 +220,45 @@ suite "FloodSub":
check:
waitFor(runTests()) == true
test "FloodSub publish should fail on timeout":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes = generateNodes(2)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
let pubsub = nodes[0].pubSub.get()
let peer = pubsub.peers[nodes[1].peerInfo.id]
peer.conn = Connection(newBufferStream(
proc (data: seq[byte]) {.async, gcsafe.} =
await sleepAsync(10.seconds)
,size = 0))
let in10millis = Moment.fromNow(10.millis)
let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
check Moment.now() >= in10millis
check sent == 0
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(runTests()) == true
test "FloodSub multiple peers, no self trigger":
proc runTests(): Future[bool] {.async.} =
var runs = 10

View File

@ -16,8 +16,10 @@ import utils, ../../libp2p/[errors,
peerid,
peerinfo,
stream/connection,
stream/bufferstream,
crypto/crypto,
protocols/pubsub/pubsub,
protocols/pubsub/pubsubpeer,
protocols/pubsub/gossipsub,
protocols/pubsub/peertable,
protocols/pubsub/rpc/messages]
@ -240,6 +242,45 @@ suite "GossipSub":
check:
waitFor(runTests()) == true
test "GossipSub publish should fail on timeout":
proc runTests(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
var nodes = generateNodes(2, gossip = true)
var awaiters: seq[Future[void]]
awaiters.add((await nodes[0].start()))
awaiters.add((await nodes[1].start()))
let subscribes = await subscribeNodes(nodes)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
let pubsub = nodes[0].pubSub.get()
let peer = pubsub.peers[nodes[1].peerInfo.id]
peer.conn = Connection(newBufferStream(
proc (data: seq[byte]) {.async, gcsafe.} =
await sleepAsync(10.seconds)
, size = 0))
let in10millis = Moment.fromNow(10.millis)
let sent = await nodes[0].publish("foobar", "Hello!".toBytes(), 10.millis)
check Moment.now() >= in10millis
check sent == 0
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
await allFuturesThrowing(subscribes)
await allFuturesThrowing(awaiters)
result = true
check:
waitFor(runTests()) == true
test "e2e - GossipSub should add remote peer topic subscriptions":
proc testBasicGossipSub(): Future[bool] {.async.} =
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =

View File

@ -485,23 +485,21 @@ suite "Key interface test suite":
test "Curve25519":
# from bearssl test_crypto.c
var
res: Curve25519Key
bearOp = fromHex("A546E36BF0527C9D3B16154B82465EDD62144C0AC1FC5A18506A2244BA449AC4")
bearIn = fromHex("E6DB6867583030DB3594C1A424B15F7C726624EC26B3353B10A903A6D0AB1C4C")
bearOut = fromHex("C3DA55379DE9C6908E94EA4DF28D084F32ECCF03491C71F754B4075577A28552")
bearOp = fromHex("A546E36BF0527C9D3B16154B82465EDD62144C0AC1FC5A18506A2244BA449AC4").intoCurve25519Key()
bearIn = fromHex("E6DB6867583030DB3594C1A424B15F7C726624EC26B3353B10A903A6D0AB1C4C").intoCurve25519Key()
bearOut = fromHex("C3DA55379DE9C6908E94EA4DF28D084F32ECCF03491C71F754B4075577A28552").intoCurve25519Key()
Curve25519.mul(res, bearIn.intoCurve25519Key, bearOp.intoCurve25519Key)
check res == bearOut
Curve25519.mul(bearIn, bearOp)
check bearIn == bearOut
# from https://github.com/golang/crypto/blob/1d94cc7ab1c630336ab82ccb9c9cda72a875c382/curve25519/vectors_test.go#L26
var
private1: Curve25519Key = [0x66.byte, 0x8f, 0xb9, 0xf7, 0x6a, 0xd9, 0x71, 0xc8, 0x1a, 0xc9, 0x0, 0x7, 0x1a, 0x15, 0x60, 0xbc, 0xe2, 0xca, 0x0, 0xca, 0xc7, 0xe6, 0x7a, 0xf9, 0x93, 0x48, 0x91, 0x37, 0x61, 0x43, 0x40, 0x14]
base: Curve25519Key = [0xdb.byte, 0x5f, 0x32, 0xb7, 0xf8, 0x41, 0xe7, 0xa1, 0xa0, 0x9, 0x68, 0xef, 0xfd, 0xed, 0x12, 0x73, 0x5f, 0xc4, 0x7a, 0x3e, 0xb1, 0x3b, 0x57, 0x9a, 0xac, 0xad, 0xea, 0xe8, 0x9, 0x39, 0xa7, 0xdd]
public1: Curve25519Key
public1Test: Curve25519Key = [0x9.byte, 0xd, 0x85, 0xe5, 0x99, 0xea, 0x8e, 0x2b, 0xee, 0xb6, 0x13, 0x4, 0xd3, 0x7b, 0xe1, 0xe, 0xc5, 0xc9, 0x5, 0xf9, 0x92, 0x7d, 0x32, 0xf4, 0x2a, 0x9a, 0xa, 0xfb, 0x3e, 0xb, 0x40, 0x74]
Curve25519.mul(public1, base, private1)
check public1.toHex == public1Test.toHex
Curve25519.mul(base, private1)
check base.toHex == public1Test.toHex
# RFC vectors
private1 = fromHex("a8abababababababababababababababababababababababababababababab6b").intoCurve25519Key
@ -517,10 +515,10 @@ suite "Key interface test suite":
check p2Pub.toHex == "DE9EDB7D7B7DC1B4D35B61C2ECE435373F8343C85B78674DADFC7E146F882B4F"
var
secret1: Curve25519Key
secret2: Curve25519Key
Curve25519.mul(secret1, p2Pub, private1)
Curve25519.mul(secret2, p1Pub, private2)
secret1 = p2Pub
secret2 = p1Pub
Curve25519.mul(secret1, private1)
Curve25519.mul(secret2, private2)
check secret1.toHex == secret2.toHex

View File

@ -135,7 +135,7 @@ suite "Mplex":
let
conn = newBufferStream(
proc (data: seq[byte]) {.gcsafe, async.} =
result = nil
discard
)
chann = LPChannel.init(1, conn, true)

View File

@ -54,6 +54,7 @@ method write*(s: TestSelectStream, msg: seq[byte]) {.async, gcsafe.} = discard
method close(s: TestSelectStream) {.async, gcsafe.} =
s.isClosed = true
s.isEof = true
proc newTestSelectStream(): TestSelectStream =
new result
@ -104,6 +105,7 @@ method write*(s: TestLsStream, msg: seq[byte]) {.async, gcsafe.} =
method close(s: TestLsStream) {.async, gcsafe.} =
s.isClosed = true
s.isEof = true
proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =
new result
@ -157,6 +159,7 @@ method write*(s: TestNaStream, msg: seq[byte]) {.async, gcsafe.} =
method close(s: TestNaStream) {.async, gcsafe.} =
s.isClosed = true
s.isEof = true
proc newTestNaStream(na: NaHandler): TestNaStream =
new result
@ -234,6 +237,7 @@ suite "Multistream select":
let conn = newTestNaStream(testNaHandler)
proc testNaHandler(msg: string): Future[void] {.async, gcsafe.} =
echo msg
check msg == Na
await conn.close()

View File

@ -55,16 +55,3 @@ suite "PeerInfo":
test "Should return some if pubkey is present in id":
let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519, rng[]).get()).get())
check peerInfo.publicKey.isSome
test "join() and isClosed() test":
proc testJoin(): Future[bool] {.async, gcsafe.} =
let peerInfo = PeerInfo.init(PeerID.init(PrivateKey.random(Ed25519, rng[]).get()).get())
check peerInfo.isClosed() == false
var joinFut = peerInfo.join()
check joinFut.finished() == false
peerInfo.close()
await wait(joinFut, 100.milliseconds)
check peerInfo.isClosed() == true
check (joinFut.finished() == true) and (joinFut.cancelled() == false)
result = true
check waitFor(testJoin()) == true

View File

@ -237,38 +237,26 @@ suite "Switch":
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Secio])
var step = 0
var cycles: set[Lifecycle]
proc hook(peer: PeerInfo, cycle: Lifecycle) {.async, gcsafe.} =
cycles = cycles + {cycle}
var kinds: set[ConnEventKind]
proc hook(peerId: PeerID, event: ConnEvent) {.async, gcsafe.} =
kinds = kinds + {event.kind}
case step:
of 0:
check cycle == Lifecycle.Connected
check if not(isNil(peer)):
peer.peerId == switch2.peerInfo.peerId
else:
true
check:
event.kind == ConnEventKind.Connected
peerId == switch2.peerInfo.peerId
of 1:
assert(isNil(peer) == false)
check:
cycle == Lifecycle.Upgraded
peer.peerId == switch2.peerInfo.peerId
of 2:
check:
cycle == Lifecycle.Disconnected
event.kind == ConnEventKind.Disconnected
check if not(isNil(peer)):
peer.peerId == switch2.peerInfo.peerId
else:
true
check peerId == switch2.peerInfo.peerId
else:
echo "unkown cycle! ", $cycle
check false
step.inc()
switch1.addHook(hook, Lifecycle.Connected)
switch1.addHook(hook, Lifecycle.Upgraded)
switch1.addHook(hook, Lifecycle.Disconnected)
switch1.addConnEventHandler(hook, ConnEventKind.Connected)
switch1.addConnEventHandler(hook, ConnEventKind.Disconnected)
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
@ -294,10 +282,9 @@ suite "Switch":
check connTracker.isLeaked() == false
check:
cycles == {
Lifecycle.Connected,
Lifecycle.Upgraded,
Lifecycle.Disconnected
kinds == {
ConnEventKind.Connected,
ConnEventKind.Disconnected
}
await allFuturesThrowing(