mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-01-11 21:44:24 +00:00
Merge branch 'master' into gossip-one-one
This commit is contained in:
commit
38422ff4cb
@ -13,13 +13,13 @@
|
||||
|
||||
import rsa, ecnist, ed25519/ed25519, secp, bearssl
|
||||
import ../protobuf/minprotobuf, ../vbuffer, ../multihash, ../multicodec
|
||||
import nimcrypto/[rijndael, blowfish, twofish, sha, sha2, hash, hmac, utils]
|
||||
import nimcrypto/[rijndael, blowfish, twofish, sha2, hash, hmac, utils]
|
||||
import ../utility
|
||||
import stew/results
|
||||
export results
|
||||
|
||||
# Export modules of types that are part of public API
|
||||
export rijndael, blowfish, twofish, sha, sha2, hash, hmac, utils
|
||||
# This is workaround for Nim's `import` bug
|
||||
export rijndael, blowfish, twofish, sha2, hash, hmac, utils
|
||||
|
||||
from strutils import split
|
||||
|
||||
@ -37,7 +37,6 @@ type
|
||||
Blowfish
|
||||
|
||||
DigestSheme* = enum
|
||||
Sha1,
|
||||
Sha256,
|
||||
Sha512
|
||||
|
||||
@ -184,7 +183,8 @@ proc getKey*(key: PrivateKey): CryptoResult[PublicKey] =
|
||||
else:
|
||||
err(KeyError)
|
||||
|
||||
proc toRawBytes*(key: PrivateKey | PublicKey, data: var openarray[byte]): CryptoResult[int] =
|
||||
proc toRawBytes*(key: PrivateKey | PublicKey,
|
||||
data: var openarray[byte]): CryptoResult[int] =
|
||||
## Serialize private key ``key`` (using scheme's own serialization) and store
|
||||
## it to ``data``.
|
||||
##
|
||||
@ -274,7 +274,7 @@ proc getBytes*(sig: Signature): seq[byte] =
|
||||
## Return signature ``sig`` in binary form.
|
||||
result = sig.data
|
||||
|
||||
proc init*(key: var PrivateKey, data: openarray[byte]): bool =
|
||||
proc init*[T: PrivateKey|PublicKey](key: var T, data: openarray[byte]): bool =
|
||||
## Initialize private key ``key`` from libp2p's protobuf serialized raw
|
||||
## binary form.
|
||||
##
|
||||
@ -287,54 +287,29 @@ proc init*(key: var PrivateKey, data: openarray[byte]): bool =
|
||||
if pb.getBytes(2, buffer) != 0:
|
||||
if cast[int8](id) in SupportedSchemesInt:
|
||||
var scheme = cast[PKScheme](cast[int8](id))
|
||||
var nkey = PrivateKey(scheme: scheme)
|
||||
if scheme == RSA:
|
||||
when key is PrivateKey:
|
||||
var nkey = PrivateKey(scheme: scheme)
|
||||
else:
|
||||
var nkey = PublicKey(scheme: scheme)
|
||||
case scheme:
|
||||
of PKScheme.RSA:
|
||||
if init(nkey.rsakey, buffer).isOk:
|
||||
key = nkey
|
||||
result = true
|
||||
elif scheme == Ed25519:
|
||||
return true
|
||||
of PKScheme.Ed25519:
|
||||
if init(nkey.edkey, buffer):
|
||||
key = nkey
|
||||
result = true
|
||||
elif scheme == ECDSA:
|
||||
return true
|
||||
of PKScheme.ECDSA:
|
||||
if init(nkey.eckey, buffer).isOk:
|
||||
key = nkey
|
||||
result = true
|
||||
elif scheme == Secp256k1:
|
||||
return true
|
||||
of PKScheme.Secp256k1:
|
||||
if init(nkey.skkey, buffer).isOk:
|
||||
key = nkey
|
||||
result = true
|
||||
|
||||
proc init*(key: var PublicKey, data: openarray[byte]): bool =
|
||||
## Initialize public key ``key`` from libp2p's protobuf serialized raw
|
||||
## binary form.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
var id: uint64
|
||||
var buffer: seq[byte]
|
||||
if len(data) > 0:
|
||||
var pb = initProtoBuffer(@data)
|
||||
if pb.getVarintValue(1, id) != 0:
|
||||
if pb.getBytes(2, buffer) != 0:
|
||||
if cast[int8](id) in SupportedSchemesInt:
|
||||
var scheme = cast[PKScheme](cast[int8](id))
|
||||
var nkey = PublicKey(scheme: scheme)
|
||||
if scheme == RSA:
|
||||
if init(nkey.rsakey, buffer).isOk:
|
||||
key = nkey
|
||||
result = true
|
||||
elif scheme == Ed25519:
|
||||
if init(nkey.edkey, buffer):
|
||||
key = nkey
|
||||
result = true
|
||||
elif scheme == ECDSA:
|
||||
if init(nkey.eckey, buffer).isOk:
|
||||
key = nkey
|
||||
result = true
|
||||
elif scheme == Secp256k1:
|
||||
if init(nkey.skkey, buffer).isOk:
|
||||
key = nkey
|
||||
result = true
|
||||
return true
|
||||
else:
|
||||
return false
|
||||
|
||||
proc init*(sig: var Signature, data: openarray[byte]): bool =
|
||||
## Initialize signature ``sig`` from raw binary form.
|
||||
@ -344,18 +319,8 @@ proc init*(sig: var Signature, data: openarray[byte]): bool =
|
||||
sig.data = @data
|
||||
result = true
|
||||
|
||||
proc init*(key: var PrivateKey, data: string): bool =
|
||||
## Initialize private key ``key`` from libp2p's protobuf serialized
|
||||
## hexadecimal string representation.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
try:
|
||||
key.init(fromHex(data))
|
||||
except ValueError:
|
||||
false
|
||||
|
||||
proc init*(key: var PublicKey, data: string): bool =
|
||||
## Initialize public key ``key`` from libp2p's protobuf serialized
|
||||
proc init*[T: PrivateKey|PublicKey](key: var T, data: string): bool =
|
||||
## Initialize private/public key ``key`` from libp2p's protobuf serialized
|
||||
## hexadecimal string representation.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
@ -374,7 +339,8 @@ proc init*(sig: var Signature, data: string): bool =
|
||||
except ValueError:
|
||||
false
|
||||
|
||||
proc init*(t: typedesc[PrivateKey], data: openarray[byte]): CryptoResult[PrivateKey] =
|
||||
proc init*(t: typedesc[PrivateKey],
|
||||
data: openarray[byte]): CryptoResult[PrivateKey] =
|
||||
## Create new private key from libp2p's protobuf serialized binary form.
|
||||
var res: t
|
||||
if not res.init(data):
|
||||
@ -382,7 +348,8 @@ proc init*(t: typedesc[PrivateKey], data: openarray[byte]): CryptoResult[Private
|
||||
else:
|
||||
ok(res)
|
||||
|
||||
proc init*(t: typedesc[PublicKey], data: openarray[byte]): CryptoResult[PublicKey] =
|
||||
proc init*(t: typedesc[PublicKey],
|
||||
data: openarray[byte]): CryptoResult[PublicKey] =
|
||||
## Create new public key from libp2p's protobuf serialized binary form.
|
||||
var res: t
|
||||
if not res.init(data):
|
||||
@ -390,7 +357,8 @@ proc init*(t: typedesc[PublicKey], data: openarray[byte]): CryptoResult[PublicKe
|
||||
else:
|
||||
ok(res)
|
||||
|
||||
proc init*(t: typedesc[Signature], data: openarray[byte]): CryptoResult[Signature] =
|
||||
proc init*(t: typedesc[Signature],
|
||||
data: openarray[byte]): CryptoResult[Signature] =
|
||||
## Create new public key from libp2p's protobuf serialized binary form.
|
||||
var res: t
|
||||
if not res.init(data):
|
||||
@ -421,117 +389,93 @@ proc init*(t: typedesc[Signature], data: string): CryptoResult[Signature] =
|
||||
except ValueError:
|
||||
err(SigError)
|
||||
|
||||
proc `==`*(key1, key2: PublicKey): bool =
|
||||
proc `==`*(key1, key2: PublicKey): bool {.inline.} =
|
||||
## Return ``true`` if two public keys ``key1`` and ``key2`` of the same
|
||||
## scheme and equal.
|
||||
if key1.scheme == key2.scheme:
|
||||
if key1.scheme == RSA:
|
||||
result = (key1.rsakey == key2.rsakey)
|
||||
elif key1.scheme == Ed25519:
|
||||
result = (key1.edkey == key2.edkey)
|
||||
elif key1.scheme == ECDSA:
|
||||
result = (key1.eckey == key2.eckey)
|
||||
case key1.scheme
|
||||
of PKScheme.RSA:
|
||||
(key1.rsakey == key2.rsakey)
|
||||
of PKScheme.Ed25519:
|
||||
(key1.edkey == key2.edkey)
|
||||
of PKScheme.ECDSA:
|
||||
(key1.eckey == key2.eckey)
|
||||
of PKScheme.Secp256k1:
|
||||
(key1.skkey == key2.skkey)
|
||||
of PKScheme.NoSupport:
|
||||
false
|
||||
else:
|
||||
false
|
||||
|
||||
proc `==`*(key1, key2: PrivateKey): bool =
|
||||
## Return ``true`` if two private keys ``key1`` and ``key2`` of the same
|
||||
## scheme and equal.
|
||||
if key1.scheme == key2.scheme:
|
||||
if key1.scheme == RSA:
|
||||
result = (key1.rsakey == key2.rsakey)
|
||||
elif key1.scheme == Ed25519:
|
||||
result = (key1.edkey == key2.edkey)
|
||||
elif key1.scheme == ECDSA:
|
||||
result = (key1.eckey == key2.eckey)
|
||||
case key1.scheme
|
||||
of PKScheme.RSA:
|
||||
(key1.rsakey == key2.rsakey)
|
||||
of PKScheme.Ed25519:
|
||||
(key1.edkey == key2.edkey)
|
||||
of PKScheme.ECDSA:
|
||||
(key1.eckey == key2.eckey)
|
||||
of PKScheme.Secp256k1:
|
||||
(key1.skkey == key2.skkey)
|
||||
of PKScheme.NoSupport:
|
||||
false
|
||||
else:
|
||||
false
|
||||
|
||||
proc `$`*(key: PrivateKey): string =
|
||||
## Get string representation of private key ``key``.
|
||||
if key.scheme == RSA:
|
||||
result = $(key.rsakey)
|
||||
elif key.scheme == Ed25519:
|
||||
result = "Ed25519 key ("
|
||||
result.add($(key.edkey))
|
||||
result.add(")")
|
||||
elif key.scheme == ECDSA:
|
||||
result = "Secp256r1 key ("
|
||||
result.add($(key.eckey))
|
||||
result.add(")")
|
||||
elif key.scheme == Secp256k1:
|
||||
result = "Secp256k1 key ("
|
||||
result.add($(key.skkey))
|
||||
result.add(")")
|
||||
proc `$`*(key: PrivateKey|PublicKey): string =
|
||||
## Get string representation of private/public key ``key``.
|
||||
case key.scheme:
|
||||
of PKScheme.RSA:
|
||||
$(key.rsakey)
|
||||
of PKScheme.Ed25519:
|
||||
"ed25519 key (" & $key.edkey & ")"
|
||||
of PKScheme.ECDSA:
|
||||
"secp256r1 key (" & $key.eckey & ")"
|
||||
of PKScheme.Secp256k1:
|
||||
"secp256k1 key (" & $key.skkey & ")"
|
||||
of PKScheme.NoSupport:
|
||||
"not supported"
|
||||
|
||||
proc `$`*(key: PublicKey): string =
|
||||
## Get string representation of public key ``key``.
|
||||
if key.scheme == RSA:
|
||||
result = $(key.rsakey)
|
||||
elif key.scheme == Ed25519:
|
||||
result = "Ed25519 key ("
|
||||
result.add($(key.edkey))
|
||||
result.add(")")
|
||||
elif key.scheme == ECDSA:
|
||||
result = "Secp256r1 key ("
|
||||
result.add($(key.eckey))
|
||||
result.add(")")
|
||||
elif key.scheme == Secp256k1:
|
||||
result = "Secp256k1 key ("
|
||||
result.add($(key.skkey))
|
||||
result.add(")")
|
||||
|
||||
func shortLog*(key: PrivateKey): string =
|
||||
## Get string representation of private key ``key``.
|
||||
if key.scheme == RSA:
|
||||
result = ($key.rsakey).shortLog
|
||||
elif key.scheme == Ed25519:
|
||||
result = "Ed25519 key ("
|
||||
result.add(($key.edkey).shortLog)
|
||||
result.add(")")
|
||||
elif key.scheme == ECDSA:
|
||||
result = "Secp256r1 key ("
|
||||
result.add(($key.eckey).shortLog)
|
||||
result.add(")")
|
||||
elif key.scheme == Secp256k1:
|
||||
result = "Secp256k1 key ("
|
||||
result.add(($key.skkey).shortLog)
|
||||
result.add(")")
|
||||
|
||||
proc shortLog*(key: PublicKey): string =
|
||||
## Get string representation of public key ``key``.
|
||||
if key.scheme == RSA:
|
||||
result = ($key.rsakey).shortLog
|
||||
elif key.scheme == Ed25519:
|
||||
result = "Ed25519 key ("
|
||||
result.add(($key.edkey).shortLog)
|
||||
result.add(")")
|
||||
elif key.scheme == ECDSA:
|
||||
result = "Secp256r1 key ("
|
||||
result.add(($key.eckey).shortLog)
|
||||
result.add(")")
|
||||
elif key.scheme == Secp256k1:
|
||||
result = "Secp256k1 key ("
|
||||
result.add(($key.skkey).shortLog)
|
||||
result.add(")")
|
||||
func shortLog*(key: PrivateKey|PublicKey): string =
|
||||
## Get short string representation of private/public key ``key``.
|
||||
case key.scheme:
|
||||
of PKScheme.RSA:
|
||||
($key.rsakey).shortLog
|
||||
of PKScheme.Ed25519:
|
||||
"ed25519 key (" & ($key.edkey).shortLog & ")"
|
||||
of PKScheme.ECDSA:
|
||||
"secp256r1 key (" & ($key.eckey).shortLog & ")"
|
||||
of PKScheme.Secp256k1:
|
||||
"secp256k1 key (" & ($key.skkey).shortLog & ")"
|
||||
of PKScheme.NoSupport:
|
||||
"not supported"
|
||||
|
||||
proc `$`*(sig: Signature): string =
|
||||
## Get string representation of signature ``sig``.
|
||||
result = toHex(sig.data)
|
||||
|
||||
proc sign*(key: PrivateKey, data: openarray[byte]): CryptoResult[Signature] {.gcsafe.} =
|
||||
proc sign*(key: PrivateKey,
|
||||
data: openarray[byte]): CryptoResult[Signature] {.gcsafe.} =
|
||||
## Sign message ``data`` using private key ``key`` and return generated
|
||||
## signature in raw binary form.
|
||||
var res: Signature
|
||||
if key.scheme == RSA:
|
||||
case key.scheme:
|
||||
of PKScheme.RSA:
|
||||
let sig = ? key.rsakey.sign(data).orError(SigError)
|
||||
res.data = ? sig.getBytes().orError(SigError)
|
||||
ok(res)
|
||||
elif key.scheme == Ed25519:
|
||||
of PKScheme.Ed25519:
|
||||
let sig = key.edkey.sign(data)
|
||||
res.data = sig.getBytes()
|
||||
ok(res)
|
||||
elif key.scheme == ECDSA:
|
||||
of PKScheme.ECDSA:
|
||||
let sig = ? key.eckey.sign(data).orError(SigError)
|
||||
res.data = ? sig.getBytes().orError(SigError)
|
||||
ok(res)
|
||||
elif key.scheme == Secp256k1:
|
||||
of PKScheme.Secp256k1:
|
||||
let sig = key.skkey.sign(data)
|
||||
res.data = sig.getBytes()
|
||||
ok(res)
|
||||
@ -541,22 +485,33 @@ proc sign*(key: PrivateKey, data: openarray[byte]): CryptoResult[Signature] {.gc
|
||||
proc verify*(sig: Signature, message: openarray[byte], key: PublicKey): bool =
|
||||
## Verify signature ``sig`` using message ``message`` and public key ``key``.
|
||||
## Return ``true`` if message signature is valid.
|
||||
if key.scheme == RSA:
|
||||
case key.scheme:
|
||||
of PKScheme.RSA:
|
||||
var signature: RsaSignature
|
||||
if signature.init(sig.data).isOk:
|
||||
result = signature.verify(message, key.rsakey)
|
||||
elif key.scheme == Ed25519:
|
||||
signature.verify(message, key.rsakey)
|
||||
else:
|
||||
false
|
||||
of PKScheme.Ed25519:
|
||||
var signature: EdSignature
|
||||
if signature.init(sig.data):
|
||||
result = signature.verify(message, key.edkey)
|
||||
elif key.scheme == ECDSA:
|
||||
signature.verify(message, key.edkey)
|
||||
else:
|
||||
false
|
||||
of PKScheme.ECDSA:
|
||||
var signature: EcSignature
|
||||
if signature.init(sig.data).isOk:
|
||||
result = signature.verify(message, key.eckey)
|
||||
elif key.scheme == Secp256k1:
|
||||
signature.verify(message, key.eckey)
|
||||
else:
|
||||
false
|
||||
of PKScheme.Secp256k1:
|
||||
var signature: SkSignature
|
||||
if signature.init(sig.data).isOk:
|
||||
result = signature.verify(message, key.skkey)
|
||||
signature.verify(message, key.skkey)
|
||||
else:
|
||||
false
|
||||
else:
|
||||
false
|
||||
|
||||
template makeSecret(buffer, hmactype, secret, seed: untyped) {.dirty.}=
|
||||
var ctx: hmactype
|
||||
@ -609,8 +564,6 @@ proc stretchKeys*(cipherType: string, hashType: string,
|
||||
makeSecret(result.data, HMAC[sha256], sharedSecret, seed)
|
||||
elif hashType == "SHA512":
|
||||
makeSecret(result.data, HMAC[sha512], sharedSecret, seed)
|
||||
elif hashType == "SHA1":
|
||||
makeSecret(result.data, HMAC[sha1], sharedSecret, seed)
|
||||
|
||||
template goffset*(secret, id, o: untyped): untyped =
|
||||
id * (len(secret.data) shr 1) + o
|
||||
@ -802,23 +755,28 @@ proc decodeExchange*(message: seq[byte],
|
||||
|
||||
## Serialization/Deserialization helpers
|
||||
|
||||
proc write*(vb: var VBuffer, pubkey: PublicKey) {.inline, raises: [Defect, ResultError[CryptoError]].} =
|
||||
proc write*(vb: var VBuffer, pubkey: PublicKey) {.
|
||||
inline, raises: [Defect, ResultError[CryptoError]].} =
|
||||
## Write PublicKey value ``pubkey`` to buffer ``vb``.
|
||||
vb.writeSeq(pubkey.getBytes().tryGet())
|
||||
|
||||
proc write*(vb: var VBuffer, seckey: PrivateKey) {.inline, raises: [Defect, ResultError[CryptoError]].} =
|
||||
proc write*(vb: var VBuffer, seckey: PrivateKey) {.
|
||||
inline, raises: [Defect, ResultError[CryptoError]].} =
|
||||
## Write PrivateKey value ``seckey`` to buffer ``vb``.
|
||||
vb.writeSeq(seckey.getBytes().tryGet())
|
||||
|
||||
proc write*(vb: var VBuffer, sig: PrivateKey) {.inline, raises: [Defect, ResultError[CryptoError]].} =
|
||||
proc write*(vb: var VBuffer, sig: PrivateKey) {.
|
||||
inline, raises: [Defect, ResultError[CryptoError]].} =
|
||||
## Write Signature value ``sig`` to buffer ``vb``.
|
||||
vb.writeSeq(sig.getBytes().tryGet())
|
||||
|
||||
proc initProtoField*(index: int, pubkey: PublicKey): ProtoField {.raises: [Defect, ResultError[CryptoError]].} =
|
||||
proc initProtoField*(index: int, pubkey: PublicKey): ProtoField {.
|
||||
raises: [Defect, ResultError[CryptoError]].} =
|
||||
## Initialize ProtoField with PublicKey ``pubkey``.
|
||||
result = initProtoField(index, pubkey.getBytes().tryGet())
|
||||
|
||||
proc initProtoField*(index: int, seckey: PrivateKey): ProtoField {.raises: [Defect, ResultError[CryptoError]].} =
|
||||
proc initProtoField*(index: int, seckey: PrivateKey): ProtoField {.
|
||||
raises: [Defect, ResultError[CryptoError]].} =
|
||||
## Initialize ProtoField with PrivateKey ``seckey``.
|
||||
result = initProtoField(index, seckey.getBytes().tryGet())
|
||||
|
||||
|
@ -661,15 +661,16 @@ proc cmp(a: openarray[byte], b: openarray[byte]): bool =
|
||||
let blen = len(b)
|
||||
if alen == blen:
|
||||
if alen == 0:
|
||||
result = true
|
||||
true
|
||||
else:
|
||||
var n = alen
|
||||
var res, diff: int
|
||||
var res = 0
|
||||
while n > 0:
|
||||
dec(n)
|
||||
diff = int(a[n]) - int(b[n])
|
||||
res = (res and -not(diff)) or diff
|
||||
result = (res == 0)
|
||||
res = res or int(a[n] xor b[n])
|
||||
(res == 0)
|
||||
else:
|
||||
false
|
||||
|
||||
proc `==`*(a, b: RsaPrivateKey): bool =
|
||||
## Compare two RSA private keys for equality.
|
||||
|
@ -59,7 +59,7 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped =
|
||||
try:
|
||||
body
|
||||
except CancelledError as exc:
|
||||
raise exc # TODO: why catch and re-raise?
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message
|
||||
trace "Exception details", exc = exc.msg
|
||||
|
@ -7,7 +7,7 @@
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import options, sequtils
|
||||
import options, sequtils, hashes
|
||||
import chronos, chronicles
|
||||
import peerid, multiaddress, crypto/crypto
|
||||
|
||||
@ -43,6 +43,8 @@ type
|
||||
# https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements
|
||||
maintain*: bool
|
||||
|
||||
proc hash*(i: PeerInfo): Hash = cast[int](i).hash # cast ptr to int and hash
|
||||
|
||||
proc id*(p: PeerInfo): string =
|
||||
if not(isNil(p)):
|
||||
return p.peerId.pretty()
|
||||
|
@ -47,14 +47,14 @@ method subscribeTopic*(f: FloodSub,
|
||||
# unsubscribe the peer from the topic
|
||||
f.floodsub[topic].excl(peerId)
|
||||
|
||||
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
|
||||
await procCall PubSub(f).handleDisconnect(peer)
|
||||
|
||||
method handleDisconnect*(f: FloodSub, peer: PubSubPeer) =
|
||||
## handle peer disconnects
|
||||
for t in toSeq(f.floodsub.keys):
|
||||
if t in f.floodsub:
|
||||
f.floodsub[t].excl(peer.id)
|
||||
|
||||
procCall PubSub(f).handleDisconnect(peer)
|
||||
|
||||
method rpcHandler*(f: FloodSub,
|
||||
peer: PubSubPeer,
|
||||
rpcMsgs: seq[RPCMsg]) {.async.} =
|
||||
@ -86,18 +86,20 @@ method rpcHandler*(f: FloodSub,
|
||||
trace "calling handler for message", topicId = t,
|
||||
localPeer = f.peerInfo.id,
|
||||
fromPeer = msg.fromPeer.pretty
|
||||
await h(t, msg.data) # trigger user provided handler
|
||||
|
||||
try:
|
||||
await h(t, msg.data) # trigger user provided handler
|
||||
except CatchableError as exc:
|
||||
trace "exception in message handler", exc = exc.msg
|
||||
|
||||
# forward the message to all peers interested in it
|
||||
var sent: seq[Future[void]]
|
||||
# start the future but do not wait yet
|
||||
for p in toSendPeers:
|
||||
if p in f.peers and f.peers[p].id != peer.id:
|
||||
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
|
||||
let (published, failed) = await f.sendHelper(toSendPeers, m.messages)
|
||||
for p in failed:
|
||||
let peer = f.peers.getOrDefault(p)
|
||||
if not(isNil(peer)):
|
||||
f.handleDisconnect(peer) # cleanup failed peers
|
||||
|
||||
# wait for all the futures now
|
||||
sent = await allFinished(sent)
|
||||
checkFutures(sent)
|
||||
trace "forwared message to peers", peers = published.len
|
||||
|
||||
method init*(f: FloodSub) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
@ -111,9 +113,9 @@ method init*(f: FloodSub) =
|
||||
f.handler = handler
|
||||
f.codec = FloodSubCodec
|
||||
|
||||
method subscribeToPeer*(p: FloodSub,
|
||||
conn: Connection) {.async.} =
|
||||
await procCall PubSub(p).subscribeToPeer(conn)
|
||||
method subscribePeer*(p: FloodSub,
|
||||
conn: Connection) =
|
||||
procCall PubSub(p).subscribePeer(conn)
|
||||
asyncCheck p.handleConn(conn, FloodSubCodec)
|
||||
|
||||
method publish*(f: FloodSub,
|
||||
@ -132,20 +134,17 @@ method publish*(f: FloodSub,
|
||||
|
||||
trace "publishing on topic", name = topic
|
||||
let msg = Message.init(f.peerInfo, data, topic, f.sign)
|
||||
var sent: seq[Future[void]]
|
||||
# start the future but do not wait yet
|
||||
for p in f.floodsub.getOrDefault(topic):
|
||||
if p in f.peers:
|
||||
trace "publishing message", name = topic, peer = p, data = data.shortLog
|
||||
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
|
||||
|
||||
# wait for all the futures now
|
||||
sent = await allFinished(sent)
|
||||
checkFutures(sent)
|
||||
let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
|
||||
for p in failed:
|
||||
let peer = f.peers.getOrDefault(p)
|
||||
f.handleDisconnect(peer) # cleanup failed peers
|
||||
|
||||
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||
|
||||
return sent.filterIt(not it.failed).len
|
||||
trace "published message to peers", peers = published.len,
|
||||
msg = msg.shortLog()
|
||||
return published.len
|
||||
|
||||
method unsubscribe*(f: FloodSub,
|
||||
topics: seq[TopicPair]) {.async.} =
|
||||
|
@ -56,9 +56,17 @@ type
|
||||
heartbeatRunning: bool
|
||||
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
|
||||
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"])
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"])
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
|
||||
"gossipsub peers per topic in mesh",
|
||||
labels = ["topic"])
|
||||
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
|
||||
"gossipsub peers per topic in fanout",
|
||||
labels = ["topic"])
|
||||
|
||||
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
|
||||
"gossipsub peers per topic in gossipsub",
|
||||
labels = ["topic"])
|
||||
|
||||
method init*(g: GossipSub) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
@ -80,16 +88,54 @@ proc replenishFanout(g: GossipSub, topic: string) =
|
||||
|
||||
if g.fanout.getOrDefault(topic).len < GossipSubDLo:
|
||||
trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len
|
||||
if topic in g.gossipsub:
|
||||
if topic in toSeq(g.gossipsub.keys):
|
||||
for p in g.gossipsub.getOrDefault(topic):
|
||||
if not g.fanout[topic].containsOrIncl(p):
|
||||
if g.fanout.getOrDefault(topic).len == GossipSubD:
|
||||
break
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||
.set(g.fanout.getOrDefault(topic).len.int64,
|
||||
labelValues = [topic])
|
||||
|
||||
trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len
|
||||
|
||||
template moveToMeshHelper(g: GossipSub,
|
||||
topic: string,
|
||||
table: Table[string, HashSet[string]]) =
|
||||
## move peers from `table` into `mesh`
|
||||
##
|
||||
var peerIds = toSeq(table.getOrDefault(topic))
|
||||
|
||||
logScope:
|
||||
topic = topic
|
||||
meshPeers = g.mesh.getOrDefault(topic).len
|
||||
peers = peerIds.len
|
||||
|
||||
shuffle(peerIds)
|
||||
for id in peerIds:
|
||||
if g.mesh.getOrDefault(topic).len > GossipSubD:
|
||||
break
|
||||
|
||||
trace "gathering peers for mesh"
|
||||
if topic notin table:
|
||||
continue
|
||||
|
||||
trace "getting peers", topic,
|
||||
peers = peerIds.len
|
||||
|
||||
table[topic].excl(id) # always exclude
|
||||
if id in g.mesh[topic]:
|
||||
continue # we already have this peer in the mesh, try again
|
||||
|
||||
if id in g.peers:
|
||||
let p = g.peers[id]
|
||||
if p.connected:
|
||||
# send a graft message to the peer
|
||||
await p.sendGraft(@[topic])
|
||||
g.mesh[topic].incl(id)
|
||||
trace "got peer", peer = id
|
||||
|
||||
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||
try:
|
||||
trace "about to rebalance mesh"
|
||||
@ -97,47 +143,43 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||
if topic notin g.mesh:
|
||||
g.mesh[topic] = initHashSet[string]()
|
||||
|
||||
# https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#mesh-maintenance
|
||||
if g.mesh.getOrDefault(topic).len < GossipSubDlo and topic in g.topics:
|
||||
var availPeers = toSeq(g.gossipsub.getOrDefault(topic))
|
||||
shuffle(availPeers)
|
||||
if availPeers.len > GossipSubD:
|
||||
availPeers = availPeers[0..<GossipSubD]
|
||||
if g.mesh.getOrDefault(topic).len < GossipSubDlo:
|
||||
trace "replenishing mesh", topic
|
||||
# replenish the mesh if we're below GossipSubDlo
|
||||
|
||||
trace "gathering more mesh peers", current = g.mesh.getOrDefault(topic).len, avail = availPeers.len
|
||||
# move fanout nodes first
|
||||
g.moveToMeshHelper(topic, g.fanout)
|
||||
|
||||
for id in availPeers:
|
||||
if id in g.mesh[topic]:
|
||||
continue # we already have this peer in the mesh, try again
|
||||
# move gossipsub nodes second
|
||||
g.moveToMeshHelper(topic, g.gossipsub)
|
||||
|
||||
trace "got gossipsub peer", peer = id
|
||||
|
||||
g.mesh[topic].incl(id)
|
||||
if id in g.peers:
|
||||
let p = g.peers[id]
|
||||
# send a graft message to the peer
|
||||
await p.sendGraft(@[topic])
|
||||
|
||||
# prune peers if we've gone over
|
||||
if g.mesh.getOrDefault(topic).len > GossipSubDhi:
|
||||
trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len
|
||||
while g.mesh.getOrDefault(topic).len > GossipSubD:
|
||||
trace "pruning peers", peers = g.mesh[topic].len
|
||||
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
|
||||
g.mesh[topic].excl(id)
|
||||
# prune peers if we've gone over
|
||||
var mesh = toSeq(g.mesh.getOrDefault(topic))
|
||||
shuffle(mesh)
|
||||
|
||||
trace "about to prune mesh", mesh = mesh.len
|
||||
for id in mesh:
|
||||
if g.mesh.getOrDefault(topic).len <= GossipSubD:
|
||||
break
|
||||
|
||||
trace "pruning peers", peers = g.mesh[topic].len
|
||||
let p = g.peers[id]
|
||||
# send a graft message to the peer
|
||||
await p.sendPrune(@[topic])
|
||||
g.mesh[topic].excl(id)
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||
.set(g.gossipsub.getOrDefault(topic).len.int64,
|
||||
labelValues = [topic])
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||
.set(g.fanout.getOrDefault(topic).len.int64,
|
||||
labelValues = [topic])
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
.set(g.mesh.getOrDefault(topic).len.int64, labelValues = [topic])
|
||||
.set(g.mesh.getOrDefault(topic).len.int64,
|
||||
labelValues = [topic])
|
||||
|
||||
trace "mesh balanced, got peers", peers = g.mesh.getOrDefault(topic).len,
|
||||
topicId = topic
|
||||
@ -164,77 +206,88 @@ proc dropFanoutPeers(g: GossipSub) {.async.} =
|
||||
|
||||
proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
|
||||
## gossip iHave messages to peers
|
||||
##
|
||||
|
||||
trace "getting gossip peers (iHave)"
|
||||
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
|
||||
let controlMsg = ControlMessage()
|
||||
for topic in topics:
|
||||
let mesh: HashSet[string] = g.mesh.getOrDefault(topic)
|
||||
let fanout: HashSet[string] = g.fanout.getOrDefault(topic)
|
||||
var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
|
||||
shuffle(allPeers)
|
||||
|
||||
let mesh = g.mesh.getOrDefault(topic)
|
||||
let fanout = g.fanout.getOrDefault(topic)
|
||||
|
||||
let gossipPeers = mesh + fanout
|
||||
let mids = g.mcache.window(topic)
|
||||
if mids.len > 0:
|
||||
let ihave = ControlIHave(topicID: topic,
|
||||
messageIDs: toSeq(mids))
|
||||
if mids.len <= 0:
|
||||
continue
|
||||
|
||||
if topic notin g.gossipsub:
|
||||
trace "topic not in gossip array, skipping", topicID = topic
|
||||
let ihave = ControlIHave(topicID: topic,
|
||||
messageIDs: toSeq(mids))
|
||||
|
||||
if topic notin g.gossipsub:
|
||||
trace "topic not in gossip array, skipping", topicID = topic
|
||||
continue
|
||||
|
||||
for id in allPeers:
|
||||
if result.len >= GossipSubD:
|
||||
trace "got gossip peers", peers = result.len
|
||||
break
|
||||
|
||||
if allPeers.len == 0:
|
||||
trace "no peers for topic, skipping", topicID = topic
|
||||
break
|
||||
|
||||
if id in gossipPeers:
|
||||
continue
|
||||
|
||||
var extraPeers = toSeq(g.gossipsub[topic])
|
||||
shuffle(extraPeers)
|
||||
for peer in extraPeers:
|
||||
if result.len < GossipSubD and
|
||||
peer notin gossipPeers and
|
||||
peer notin result:
|
||||
result[peer] = ControlMessage(ihave: @[ihave])
|
||||
|
||||
if id notin result:
|
||||
result[id] = controlMsg
|
||||
|
||||
result[id].ihave.add(ihave)
|
||||
|
||||
proc heartbeat(g: GossipSub) {.async.} =
|
||||
while g.heartbeatRunning:
|
||||
withLock g.heartbeatLock:
|
||||
try:
|
||||
trace "running heartbeat"
|
||||
try:
|
||||
trace "running heartbeat"
|
||||
|
||||
for t in toSeq(g.topics.keys):
|
||||
await g.rebalanceMesh(t)
|
||||
for t in toSeq(g.topics.keys):
|
||||
await g.rebalanceMesh(t)
|
||||
|
||||
await g.dropFanoutPeers()
|
||||
await g.dropFanoutPeers()
|
||||
|
||||
# replenish known topics to the fanout
|
||||
for t in toSeq(g.fanout.keys):
|
||||
g.replenishFanout(t)
|
||||
# replenish known topics to the fanout
|
||||
for t in toSeq(g.fanout.keys):
|
||||
g.replenishFanout(t)
|
||||
|
||||
let peers = g.getGossipPeers()
|
||||
var sent: seq[Future[void]]
|
||||
for peer in peers.keys:
|
||||
if peer in g.peers:
|
||||
sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))])
|
||||
checkFutures(await allFinished(sent))
|
||||
let peers = g.getGossipPeers()
|
||||
var sent: seq[Future[void]]
|
||||
for peer in peers.keys:
|
||||
if peer in g.peers:
|
||||
sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))])
|
||||
checkFutures(await allFinished(sent))
|
||||
|
||||
g.mcache.shift() # shift the cache
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
|
||||
g.mcache.shift() # shift the cache
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
|
||||
## handle peer disconnects
|
||||
trace "peer disconnected", peer=peer.id
|
||||
procCall FloodSub(g).handleDisconnect(peer)
|
||||
|
||||
await procCall FloodSub(g).handleDisconnect(peer)
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub[t].excl(peer.id)
|
||||
|
||||
# must avoid running this while manipulating mesh/gossip tables
|
||||
withLock g.heartbeatLock:
|
||||
for t in toSeq(g.gossipsub.keys):
|
||||
g.gossipsub[t].excl(peer.id)
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t])
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t])
|
||||
|
||||
# mostly for metrics
|
||||
await procCall PubSub(g).subscribeTopic(t, false, peer.id)
|
||||
|
||||
for t in toSeq(g.mesh.keys):
|
||||
for t in toSeq(g.mesh.keys):
|
||||
if t in g.mesh:
|
||||
g.mesh[t].excl(peer.id)
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_mesh
|
||||
@ -246,9 +299,9 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||
libp2p_gossipsub_peers_per_topic_fanout
|
||||
.set(g.fanout[t].len.int64, labelValues = [t])
|
||||
|
||||
method subscribeToPeer*(p: GossipSub,
|
||||
conn: Connection) {.async.} =
|
||||
await procCall PubSub(p).subscribeToPeer(conn)
|
||||
method subscribePeer*(p: GossipSub,
|
||||
conn: Connection) =
|
||||
procCall PubSub(p).subscribePeer(conn)
|
||||
asyncCheck p.handleConn(conn, GossipSubCodec)
|
||||
|
||||
method subscribeTopic*(g: GossipSub,
|
||||
@ -271,10 +324,26 @@ method subscribeTopic*(g: GossipSub,
|
||||
# unsubscribe remote peer from the topic
|
||||
g.gossipsub[topic].excl(peerId)
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub[topic].len.int64, labelValues = [topic])
|
||||
if topic notin g.gossipsub:
|
||||
g.gossipsub[topic] = initHashSet[string]()
|
||||
|
||||
trace "gossip peers", peers = g.gossipsub[topic].len, topic
|
||||
if subscribe:
|
||||
trace "adding subscription for topic", peer = peerId, name = topic
|
||||
# subscribe remote peer to the topic
|
||||
g.gossipsub[topic].incl(peerId)
|
||||
else:
|
||||
trace "removing subscription for topic", peer = peerId, name = topic
|
||||
# unsubscribe remote peer from the topic
|
||||
g.gossipsub[topic].excl(peerId)
|
||||
|
||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||
.set(g.gossipsub[topic].len.int64, labelValues = [topic])
|
||||
|
||||
trace "gossip peers", peers = g.gossipsub[topic].len, topic
|
||||
|
||||
# also rebalance current topic if we are subbed to
|
||||
if topic in g.topics:
|
||||
await g.rebalanceMesh(topic)
|
||||
|
||||
proc handleGraft(g: GossipSub,
|
||||
peer: PubSubPeer,
|
||||
@ -377,29 +446,19 @@ method rpcHandler*(g: GossipSub,
|
||||
trace "calling handler for message", topicId = t,
|
||||
localPeer = g.peerInfo.id,
|
||||
fromPeer = msg.fromPeer.pretty
|
||||
await h(t, msg.data) # trigger user provided handler
|
||||
try:
|
||||
await h(t, msg.data) # trigger user provided handler
|
||||
except CatchableError as exc:
|
||||
trace "exception in message handler", exc = exc.msg
|
||||
|
||||
# forward the message to all peers interested in it
|
||||
for p in toSendPeers:
|
||||
if p in g.peers:
|
||||
let id = g.peers[p].peerInfo.peerId
|
||||
trace "about to forward message to peer", peerId = id, msgs = m.messages
|
||||
let (published, failed) = await g.sendHelper(toSendPeers, m.messages)
|
||||
for p in failed:
|
||||
let peer = g.peers.getOrDefault(p)
|
||||
if not(isNil(peer)):
|
||||
g.handleDisconnect(peer) # cleanup failed peers
|
||||
|
||||
if id == peer.peerInfo.peerId:
|
||||
trace "not forwarding message to originator", peerId = id
|
||||
continue
|
||||
|
||||
let msgs = m.messages.filterIt(
|
||||
# don't forward to message originator
|
||||
id != it.fromPeer
|
||||
)
|
||||
|
||||
var sent: seq[Future[void]]
|
||||
if msgs.len > 0:
|
||||
trace "forwarding message to", peerId = id
|
||||
sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)]))
|
||||
sent = await allFinished(sent)
|
||||
checkFutures(sent)
|
||||
trace "forwared message to peers", peers = published.len
|
||||
|
||||
var respControl: ControlMessage
|
||||
if m.control.isSome:
|
||||
@ -442,53 +501,42 @@ method publish*(g: GossipSub,
|
||||
discard await procCall PubSub(g).publish(topic, data)
|
||||
trace "about to publish message on topic", name = topic,
|
||||
data = data.shortLog
|
||||
|
||||
var peers: HashSet[string]
|
||||
|
||||
if topic.len > 0: # data could be 0/empty
|
||||
if topic in g.topics: # if we're subscribed use the mesh
|
||||
peers = g.mesh.getOrDefault(topic)
|
||||
else: # not subscribed, send to fanout peers
|
||||
# try optimistically
|
||||
peers = g.fanout.getOrDefault(topic)
|
||||
if peers.len == 0:
|
||||
# ok we had nothing.. let's try replenish inline
|
||||
g.replenishFanout(topic)
|
||||
peers = g.fanout.getOrDefault(topic)
|
||||
|
||||
let
|
||||
msg = Message.init(g.peerInfo, data, topic, g.sign)
|
||||
msgId = g.msgIdProvider(msg)
|
||||
|
||||
trace "created new message", msg
|
||||
|
||||
trace "publishing on topic", name = topic, peers = peers
|
||||
if msgId notin g.mcache:
|
||||
g.mcache.put(msgId, msg)
|
||||
|
||||
var sent: seq[Future[void]]
|
||||
for p in peers:
|
||||
# avoid sending to self
|
||||
if p == g.peerInfo.id:
|
||||
continue
|
||||
|
||||
let peer = g.peers.getOrDefault(p)
|
||||
# This can actually happen, between heartbeats we might
|
||||
# still have peers in the mesh table but actually disconnected
|
||||
if not isNil(peer) and not isNil(peer.peerInfo):
|
||||
trace "publish: sending message to peer", peer = p
|
||||
sent.add(peer.send(@[RPCMsg(messages: @[msg])]))
|
||||
|
||||
sent = await allFinished(sent)
|
||||
checkFutures(sent)
|
||||
|
||||
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||
|
||||
return sent.filterIt(not it.failed).len
|
||||
else:
|
||||
if topic.len <= 0: # data could be 0/empty
|
||||
return 0
|
||||
|
||||
|
||||
if topic in g.topics: # if we're subscribed use the mesh
|
||||
peers = g.mesh.getOrDefault(topic)
|
||||
else: # not subscribed, send to fanout peers
|
||||
# try optimistically
|
||||
peers = g.fanout.getOrDefault(topic)
|
||||
if peers.len == 0:
|
||||
# ok we had nothing.. let's try replenish inline
|
||||
g.replenishFanout(topic)
|
||||
peers = g.fanout.getOrDefault(topic)
|
||||
|
||||
let
|
||||
msg = Message.init(g.peerInfo, data, topic, g.sign)
|
||||
msgId = g.msgIdProvider(msg)
|
||||
|
||||
trace "created new message", msg
|
||||
|
||||
trace "publishing on topic", name = topic, peers = peers
|
||||
if msgId notin g.mcache:
|
||||
g.mcache.put(msgId, msg)
|
||||
|
||||
let (published, failed) = await g.sendHelper(peers, @[msg])
|
||||
for p in failed:
|
||||
let peer = g.peers.getOrDefault(p)
|
||||
g.handleDisconnect(peer) # cleanup failed peers
|
||||
|
||||
if published.len > 0:
|
||||
libp2p_pubsub_messages_published.inc(labelValues = [topic])
|
||||
|
||||
trace "published message to peers", peers = published.len,
|
||||
msg = msg.shortLog()
|
||||
return published.len
|
||||
|
||||
method start*(g: GossipSub) {.async.} =
|
||||
trace "gossipsub start"
|
||||
|
||||
|
@ -31,6 +31,8 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag
|
||||
declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"])
|
||||
|
||||
type
|
||||
SendRes = tuple[published: seq[string], failed: seq[string]] # keep private
|
||||
|
||||
TopicHandler* = proc(topic: string,
|
||||
data: seq[byte]): Future[void] {.gcsafe.}
|
||||
|
||||
@ -58,6 +60,18 @@ type
|
||||
observers: ref seq[PubSubObserver] # ref as in smart_ptr
|
||||
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
|
||||
|
||||
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
|
||||
## handle peer disconnects
|
||||
##
|
||||
if peer.id in p.peers:
|
||||
trace "deleting peer", peer = peer.id, stack = getStackTrace()
|
||||
p.peers[peer.id] = nil
|
||||
p.peers.del(peer.id)
|
||||
|
||||
# metrics
|
||||
libp2p_pubsub_peers.set(p.peers.len.int64)
|
||||
trace "peer disconnected", peer = peer.id
|
||||
|
||||
proc sendSubs*(p: PubSub,
|
||||
peer: PubSubPeer,
|
||||
topics: seq[string],
|
||||
@ -74,24 +88,26 @@ proc sendSubs*(p: PubSub,
|
||||
topicName = t
|
||||
msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe))
|
||||
|
||||
await peer.send(@[msg])
|
||||
try:
|
||||
# wait for a connection before publishing
|
||||
# this happens when
|
||||
if not peer.onConnect.isSet:
|
||||
trace "awaiting send connection"
|
||||
await peer.onConnect.wait()
|
||||
|
||||
await peer.send(@[msg])
|
||||
except CancelledError as exc:
|
||||
p.handleDisconnect(peer)
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "unable to send subscriptions", exc = exc.msg
|
||||
p.handleDisconnect(peer)
|
||||
|
||||
method subscribeTopic*(p: PubSub,
|
||||
topic: string,
|
||||
subscribe: bool,
|
||||
peerId: string) {.base, async.} =
|
||||
var peer = p.peers.getOrDefault(peerId)
|
||||
|
||||
if isNil(peer) or isNil(peer.peerInfo): # should not happen
|
||||
if subscribe:
|
||||
warn "subscribeTopic (subscribe) but peer was unknown!", peer = peerId
|
||||
assert(false, "subscribeTopic (subscribe) but peer was unknown!") # bad , stop here if debug
|
||||
return
|
||||
|
||||
if subscribe:
|
||||
peer.topics.incl(topic)
|
||||
else:
|
||||
peer.topics.excl(topic)
|
||||
discard
|
||||
|
||||
method rpcHandler*(p: PubSub,
|
||||
peer: PubSubPeer,
|
||||
@ -106,24 +122,6 @@ method rpcHandler*(p: PubSub,
|
||||
trace "about to subscribe to topic", topicId = s.topic
|
||||
await p.subscribeTopic(s.topic, s.subscribe, peer.id)
|
||||
|
||||
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
|
||||
## handle peer disconnects
|
||||
if peer.id in p.peers:
|
||||
trace "deleting peer", id = peer.id, trace = getStackTrace()
|
||||
p.peers.del(peer.id)
|
||||
|
||||
# metrics
|
||||
libp2p_pubsub_peers.set(p.peers.len.int64)
|
||||
|
||||
proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} =
|
||||
try:
|
||||
await p.cleanupLock.acquire()
|
||||
peer.refs.dec() # decrement refcount
|
||||
if peer.refs <= 0:
|
||||
await p.handleDisconnect(peer)
|
||||
finally:
|
||||
p.cleanupLock.release()
|
||||
|
||||
proc getPeer(p: PubSub,
|
||||
peerInfo: PeerInfo,
|
||||
proto: string): PubSubPeer =
|
||||
@ -132,26 +130,13 @@ proc getPeer(p: PubSub,
|
||||
|
||||
# create new pubsub peer
|
||||
let peer = newPubSubPeer(peerInfo, proto)
|
||||
trace "created new pubsub peer", peerId = peer.id
|
||||
|
||||
# metrics
|
||||
trace "created new pubsub peer", peerId = peer.id, stack = getStackTrace()
|
||||
|
||||
p.peers[peer.id] = peer
|
||||
peer.refs.inc # increment reference count
|
||||
peer.observers = p.observers
|
||||
libp2p_pubsub_peers.set(p.peers.len.int64)
|
||||
return peer
|
||||
|
||||
proc internalCleanup(p: PubSub, conn: Connection) {.async.} =
|
||||
# handle connection close
|
||||
if isNil(conn):
|
||||
return
|
||||
|
||||
var peer = p.getPeer(conn.peerInfo, p.codec)
|
||||
await conn.closeEvent.wait()
|
||||
trace "pubsub conn closed, cleaning up peer", peer = conn.peerInfo.id
|
||||
await p.cleanUpHelper(peer)
|
||||
|
||||
method handleConn*(p: PubSub,
|
||||
conn: Connection,
|
||||
proto: string) {.base, async.} =
|
||||
@ -166,41 +151,46 @@ method handleConn*(p: PubSub,
|
||||
## that we're interested in
|
||||
##
|
||||
|
||||
if isNil(conn.peerInfo):
|
||||
trace "no valid PeerId for peer"
|
||||
await conn.close()
|
||||
return
|
||||
|
||||
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
|
||||
# call pubsub rpc handler
|
||||
await p.rpcHandler(peer, msgs)
|
||||
|
||||
let peer = p.getPeer(conn.peerInfo, proto)
|
||||
let topics = toSeq(p.topics.keys)
|
||||
if topics.len > 0:
|
||||
await p.sendSubs(peer, topics, true)
|
||||
|
||||
try:
|
||||
if isNil(conn.peerInfo):
|
||||
trace "no valid PeerId for peer"
|
||||
await conn.close()
|
||||
return
|
||||
|
||||
proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
|
||||
# call pubsub rpc handler
|
||||
await p.rpcHandler(peer, msgs)
|
||||
|
||||
asyncCheck p.internalCleanup(conn)
|
||||
let peer = p.getPeer(conn.peerInfo, proto)
|
||||
let topics = toSeq(p.topics.keys)
|
||||
if topics.len > 0:
|
||||
await p.sendSubs(peer, topics, true)
|
||||
|
||||
peer.handler = handler
|
||||
await peer.handle(conn) # spawn peer read loop
|
||||
trace "pubsub peer handler ended, cleaning up"
|
||||
trace "pubsub peer handler ended", peer = peer.id
|
||||
except CancelledError as exc:
|
||||
await conn.close()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "exception ocurred in pubsub handle", exc = exc.msg
|
||||
finally:
|
||||
p.handleDisconnect(peer)
|
||||
await conn.close()
|
||||
|
||||
method subscribeToPeer*(p: PubSub,
|
||||
conn: Connection) {.base, async.} =
|
||||
method subscribePeer*(p: PubSub, conn: Connection) {.base.} =
|
||||
if not(isNil(conn)):
|
||||
let peer = p.getPeer(conn.peerInfo, p.codec)
|
||||
trace "setting connection for peer", peerId = conn.peerInfo.id
|
||||
trace "subscribing to peer", peerId = conn.peerInfo.id
|
||||
if not peer.connected:
|
||||
peer.conn = conn
|
||||
|
||||
asyncCheck p.internalCleanup(conn)
|
||||
method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} =
|
||||
let peer = p.getPeer(peerInfo, p.codec)
|
||||
trace "unsubscribing from peer", peerId = $peerInfo
|
||||
if not(isNil(peer.conn)):
|
||||
await peer.conn.close()
|
||||
|
||||
p.handleDisconnect(peer)
|
||||
|
||||
proc connected*(p: PubSub, peer: PeerInfo): bool =
|
||||
let peer = p.getPeer(peer, p.codec)
|
||||
@ -240,12 +230,43 @@ method subscribe*(p: PubSub,
|
||||
|
||||
p.topics[topic].handler.add(handler)
|
||||
|
||||
for peer in p.peers.values:
|
||||
for peer in toSeq(p.peers.values):
|
||||
await p.sendSubs(peer, @[topic], true)
|
||||
|
||||
# metrics
|
||||
libp2p_pubsub_topics.inc()
|
||||
|
||||
proc sendHelper*(p: PubSub,
|
||||
sendPeers: HashSet[string],
|
||||
msgs: seq[Message]): Future[SendRes] {.async.} =
|
||||
var sent: seq[tuple[id: string, fut: Future[void]]]
|
||||
for sendPeer in sendPeers:
|
||||
# avoid sending to self
|
||||
if sendPeer == p.peerInfo.id:
|
||||
continue
|
||||
|
||||
let peer = p.peers.getOrDefault(sendPeer)
|
||||
if isNil(peer):
|
||||
continue
|
||||
|
||||
trace "sending messages to peer", peer = peer.id, msgs
|
||||
sent.add((id: peer.id, fut: peer.send(@[RPCMsg(messages: msgs)])))
|
||||
|
||||
var published: seq[string]
|
||||
var failed: seq[string]
|
||||
let futs = await allFinished(sent.mapIt(it.fut))
|
||||
for s in futs:
|
||||
let f = sent.filterIt(it.fut == s)
|
||||
if f.len > 0:
|
||||
if s.failed:
|
||||
trace "sending messages to peer failed", peer = f[0].id
|
||||
failed.add(f[0].id)
|
||||
else:
|
||||
trace "sending messages to peer succeeded", peer = f[0].id
|
||||
published.add(f[0].id)
|
||||
|
||||
return (published, failed)
|
||||
|
||||
method publish*(p: PubSub,
|
||||
topic: string,
|
||||
data: seq[byte]): Future[int] {.base, async.} =
|
||||
|
@ -48,8 +48,7 @@ type
|
||||
topics*: HashSet[string]
|
||||
sentRpcCache: TimedCache[string] # cache for already sent messages
|
||||
recvdRpcCache: TimedCache[string] # cache for already received messages
|
||||
refs*: int # refcount of the connections this peer is handling
|
||||
onConnect: AsyncEvent
|
||||
onConnect*: AsyncEvent
|
||||
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
|
||||
|
||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||
@ -69,6 +68,9 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) =
|
||||
p.sendConn = conn
|
||||
p.onConnect.fire()
|
||||
|
||||
proc conn*(p: PubSubPeer): Connection =
|
||||
p.sendConn
|
||||
|
||||
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
|
||||
# trigger hooks
|
||||
if not(isNil(p.observers)) and p.observers[].len > 0:
|
||||
@ -113,10 +115,17 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||
trace "exiting pubsub peer read loop", peer = p.id
|
||||
await conn.close()
|
||||
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
|
||||
raise exc
|
||||
|
||||
proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
|
||||
logScope:
|
||||
peer = p.id
|
||||
msgs = $msgs
|
||||
|
||||
for m in msgs.items:
|
||||
trace "sending msgs to peer", toPeer = p.id, msgs = $msgs
|
||||
|
||||
@ -135,38 +144,29 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
|
||||
libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id])
|
||||
continue
|
||||
|
||||
proc sendToRemote() {.async.} =
|
||||
try:
|
||||
trace "about to send message", peer = p.id,
|
||||
encoded = digest
|
||||
if not p.onConnect.isSet:
|
||||
await p.onConnect.wait()
|
||||
try:
|
||||
trace "about to send message", peer = p.id,
|
||||
encoded = digest
|
||||
if p.connected: # this can happen if the remote disconnected
|
||||
trace "sending encoded msgs to peer", peer = p.id,
|
||||
encoded = encoded.buffer.shortLog
|
||||
await p.sendConn.writeLp(encoded.buffer)
|
||||
p.sentRpcCache.put(digest)
|
||||
|
||||
if p.connected: # this can happen if the remote disconnected
|
||||
trace "sending encoded msgs to peer", peer = p.id,
|
||||
encoded = encoded.buffer.shortLog
|
||||
await p.sendConn.writeLp(encoded.buffer)
|
||||
p.sentRpcCache.put(digest)
|
||||
for m in msgs:
|
||||
for mm in m.messages:
|
||||
for t in mm.topicIDs:
|
||||
# metrics
|
||||
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
|
||||
|
||||
for m in msgs:
|
||||
for mm in m.messages:
|
||||
for t in mm.topicIDs:
|
||||
# metrics
|
||||
libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t])
|
||||
except CatchableError as exc:
|
||||
trace "unable to send to remote", exc = exc.msg
|
||||
if not(isNil(p.sendConn)):
|
||||
await p.sendConn.close()
|
||||
p.sendConn = nil
|
||||
p.onConnect.clear()
|
||||
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "unable to send to remote", exc = exc.msg
|
||||
if not(isNil(p.sendConn)):
|
||||
await p.sendConn.close()
|
||||
p.sendConn = nil
|
||||
p.onConnect.clear()
|
||||
|
||||
# if no connection has been set,
|
||||
# queue messages until a connection
|
||||
# becomes available
|
||||
asyncCheck sendToRemote()
|
||||
raise exc
|
||||
|
||||
proc sendMsg*(p: PubSubPeer,
|
||||
peerId: PeerID,
|
||||
@ -185,6 +185,9 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string], peers: seq[PeerInfoMsg] = @[
|
||||
trace "sending prune msg to peer", peer = p.id, topicID = topic
|
||||
await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic, peers: peers, backoff: backoff)])))])
|
||||
|
||||
proc `$`*(p: PubSubPeer): string =
|
||||
p.id
|
||||
|
||||
proc newPubSubPeer*(peerInfo: PeerInfo,
|
||||
proto: string): PubSubPeer =
|
||||
new result
|
||||
|
@ -57,7 +57,9 @@ method handshake(s: Secure,
|
||||
initiator: bool): Future[SecureConn] {.async, base.} =
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
proc handleConn*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, gcsafe.} =
|
||||
proc handleConn*(s: Secure,
|
||||
conn: Connection,
|
||||
initiator: bool): Future[Connection] {.async, gcsafe.} =
|
||||
var sconn = await s.handshake(conn, initiator)
|
||||
|
||||
conn.closeEvent.wait()
|
||||
@ -73,7 +75,8 @@ method init*(s: Secure) {.gcsafe.} =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||
trace "handling connection upgrade", proto
|
||||
try:
|
||||
# We don't need the result but we definitely need to await the handshake
|
||||
# We don't need the result but we
|
||||
# definitely need to await the handshake
|
||||
discard await s.handleConn(conn, false)
|
||||
trace "connection secured"
|
||||
except CancelledError as exc:
|
||||
@ -86,7 +89,10 @@ method init*(s: Secure) {.gcsafe.} =
|
||||
|
||||
s.handler = handle
|
||||
|
||||
method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} =
|
||||
method secure*(s: Secure,
|
||||
conn: Connection,
|
||||
initiator: bool):
|
||||
Future[Connection] {.async, base, gcsafe.} =
|
||||
result = await s.handleConn(conn, initiator)
|
||||
|
||||
method readOnce*(s: SecureConn,
|
||||
|
@ -7,6 +7,7 @@
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import oids
|
||||
import chronos, chronicles
|
||||
import connection, ../utility
|
||||
|
||||
@ -40,7 +41,6 @@ template withExceptions(body: untyped) =
|
||||
except TransportError:
|
||||
# TODO https://github.com/status-im/nim-chronos/pull/99
|
||||
raise newLPStreamEOFError()
|
||||
# raise (ref LPStreamError)(msg: exc.msg, parent: exc)
|
||||
|
||||
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||
if s.atEof:
|
||||
@ -73,11 +73,18 @@ method atEof*(s: ChronosStream): bool {.inline.} =
|
||||
method close*(s: ChronosStream) {.async.} =
|
||||
try:
|
||||
if not s.isClosed:
|
||||
await procCall Connection(s).close()
|
||||
trace "shutting down chronos stream", address = $s.client.remoteAddress(),
|
||||
oid = s.oid
|
||||
|
||||
trace "shutting down chronos stream", address = $s.client.remoteAddress(), oid = s.oid
|
||||
# TODO: the sequence here matters
|
||||
# don't move it after the connections
|
||||
# close bellow
|
||||
if not s.client.closed():
|
||||
await s.client.closeWait()
|
||||
|
||||
await procCall Connection(s).close()
|
||||
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error closing chronosstream", exc = exc.msg
|
||||
|
@ -12,7 +12,8 @@ import tables,
|
||||
options,
|
||||
strformat,
|
||||
sets,
|
||||
algorithm
|
||||
algorithm,
|
||||
oids
|
||||
|
||||
import chronos,
|
||||
chronicles,
|
||||
@ -65,10 +66,6 @@ type
|
||||
muxer: Muxer
|
||||
handle: Future[void]
|
||||
|
||||
Maintainer = object
|
||||
loopFut: Future[void]
|
||||
sleepFut: Future[void]
|
||||
|
||||
Switch* = ref object of RootObj
|
||||
peerInfo*: PeerInfo
|
||||
connections*: Table[string, seq[ConnectionHolder]]
|
||||
@ -81,10 +78,13 @@ type
|
||||
streamHandler*: StreamHandler
|
||||
secureManagers*: seq[Secure]
|
||||
pubSub*: Option[PubSub]
|
||||
dialedPubSubPeers: HashSet[string]
|
||||
running: bool
|
||||
maintainFuts: Table[string, Maintainer]
|
||||
dialLock: Table[string, AsyncLock]
|
||||
cleanUpLock: Table[string, AsyncLock]
|
||||
# gossip 1.1 related
|
||||
maintaining: HashSet[PeerInfo]
|
||||
maintainFut: Future[void]
|
||||
maintainSleepFut: Future[void]
|
||||
|
||||
proc newNoPubSubException(): ref NoPubSubException {.inline.} =
|
||||
result = newException(NoPubSubException, "no pubsub provided!")
|
||||
@ -93,7 +93,7 @@ proc newTooManyConnections(): ref TooManyConnections {.inline.} =
|
||||
result = newException(TooManyConnections, "too many connections for peer")
|
||||
|
||||
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.}
|
||||
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
|
||||
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
|
||||
|
||||
proc selectConn(s: Switch, peerInfo: PeerInfo): Connection =
|
||||
## select the "best" connection according to some criteria
|
||||
@ -166,6 +166,9 @@ proc storeConn(s: Switch,
|
||||
newSeq[MuxerHolder]())
|
||||
.add(MuxerHolder(muxer: muxer, handle: handle, dir: dir))
|
||||
|
||||
trace "storred connection", connections = s.connections.len
|
||||
libp2p_peers.set(s.connections.len.int64)
|
||||
|
||||
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
if s.secureManagers.len <= 0:
|
||||
raise newException(CatchableError, "No secure managers registered!")
|
||||
@ -259,45 +262,56 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||
if isNil(conn):
|
||||
return
|
||||
|
||||
defer:
|
||||
await conn.close()
|
||||
libp2p_peers.set(s.connections.len.int64)
|
||||
|
||||
if isNil(conn.peerInfo):
|
||||
return
|
||||
|
||||
let id = conn.peerInfo.id
|
||||
trace "cleaning up connection for peer", peerId = id
|
||||
if id in s.muxed:
|
||||
let muxerHolder = s.muxed[id]
|
||||
.filterIt(
|
||||
it.muxer.connection == conn
|
||||
)
|
||||
|
||||
if muxerHolder.len > 0:
|
||||
await muxerHolder[0].muxer.close()
|
||||
if not(isNil(muxerHolder[0].handle)):
|
||||
await muxerHolder[0].handle
|
||||
let lock = s.cleanUpLock.mgetOrPut(id, newAsyncLock())
|
||||
|
||||
try:
|
||||
await lock.acquire()
|
||||
trace "cleaning up connection for peer", peerId = id
|
||||
if id in s.muxed:
|
||||
s.muxed[id].keepItIf(
|
||||
it.muxer.connection != conn
|
||||
let muxerHolder = s.muxed[id]
|
||||
.filterIt(
|
||||
it.muxer.connection == conn
|
||||
)
|
||||
|
||||
if muxerHolder.len > 0:
|
||||
await muxerHolder[0].muxer.close()
|
||||
if not(isNil(muxerHolder[0].handle)):
|
||||
await muxerHolder[0].handle
|
||||
|
||||
if id in s.muxed:
|
||||
s.muxed[id].keepItIf(
|
||||
it.muxer.connection != conn
|
||||
)
|
||||
|
||||
if s.muxed[id].len == 0:
|
||||
s.muxed.del(id)
|
||||
|
||||
if s.pubSub.isSome:
|
||||
await s.pubSub.get()
|
||||
.unsubscribePeer(conn.peerInfo)
|
||||
|
||||
if id in s.connections:
|
||||
s.connections[id].keepItIf(
|
||||
it.conn != conn
|
||||
)
|
||||
|
||||
if s.muxed[id].len == 0:
|
||||
s.muxed.del(id)
|
||||
if s.connections[id].len == 0:
|
||||
s.connections.del(id)
|
||||
|
||||
if id in s.connections:
|
||||
s.connections[id].keepItIf(
|
||||
it.conn != conn
|
||||
)
|
||||
# TODO: Investigate cleanupConn() always called twice for one peer.
|
||||
if not(conn.peerInfo.isClosed()):
|
||||
conn.peerInfo.close()
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
if s.connections[id].len == 0:
|
||||
s.connections.del(id)
|
||||
if lock.locked():
|
||||
lock.release()
|
||||
|
||||
# TODO: Investigate cleanupConn() always called twice for one peer.
|
||||
if not(conn.peerInfo.isClosed()):
|
||||
conn.peerInfo.close()
|
||||
libp2p_peers.set(s.connections.len.int64)
|
||||
|
||||
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
|
||||
let connections = s.connections.getOrDefault(peer.id)
|
||||
@ -330,7 +344,6 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g
|
||||
raise newException(CatchableError,
|
||||
"unable to mux connection, stopping upgrade")
|
||||
|
||||
libp2p_peers.set(s.connections.len.int64)
|
||||
trace "succesfully upgraded outgoing connection", uoid = sconn.oid
|
||||
return sconn
|
||||
|
||||
@ -382,8 +395,8 @@ proc internalConnect(s: Switch,
|
||||
raise newException(CatchableError, "can't dial self!")
|
||||
|
||||
let id = peer.id
|
||||
let lock = s.dialLock.mgetOrPut(id, newAsyncLock())
|
||||
var conn: Connection
|
||||
let lock = s.dialLock.mgetOrPut(id, newAsyncLock())
|
||||
|
||||
defer:
|
||||
if lock.locked():
|
||||
@ -443,29 +456,48 @@ proc internalConnect(s: Switch,
|
||||
doAssert(conn.peerInfo.id in s.connections,
|
||||
"connection not tracked!")
|
||||
|
||||
trace "dial succesfull", oid = conn.oid
|
||||
await s.subscribeToPeer(peer)
|
||||
trace "dial succesfull", oid = $conn.oid,
|
||||
peer = $conn.peerInfo
|
||||
|
||||
await s.subscribePeer(peer)
|
||||
return conn
|
||||
|
||||
proc connect*(s: Switch, peer: PeerInfo) {.async.} =
|
||||
var conn = await s.internalConnect(peer)
|
||||
discard await s.internalConnect(peer)
|
||||
|
||||
proc dial*(s: Switch,
|
||||
peer: PeerInfo,
|
||||
proto: string):
|
||||
Future[Connection] {.async.} =
|
||||
var conn = await s.internalConnect(peer)
|
||||
let conn = await s.internalConnect(peer)
|
||||
let stream = await s.getMuxedStream(peer)
|
||||
if isNil(stream):
|
||||
await conn.close()
|
||||
raise newException(CatchableError, "Couldn't get muxed stream")
|
||||
|
||||
trace "Attempting to select remote", proto = proto, oid = conn.oid
|
||||
if not await s.ms.select(stream, proto):
|
||||
await stream.close()
|
||||
raise newException(CatchableError, "Unable to select sub-protocol " & proto)
|
||||
proc cleanup() {.async.} =
|
||||
if not(isNil(stream)):
|
||||
await stream.close()
|
||||
|
||||
return stream
|
||||
if not(isNil(conn)):
|
||||
await conn.close()
|
||||
|
||||
try:
|
||||
if isNil(stream):
|
||||
await conn.close()
|
||||
raise newException(CatchableError, "Couldn't get muxed stream")
|
||||
|
||||
trace "Attempting to select remote", proto = proto, oid = conn.oid
|
||||
if not await s.ms.select(stream, proto):
|
||||
await stream.close()
|
||||
raise newException(CatchableError, "Unable to select sub-protocol " & proto)
|
||||
|
||||
return stream
|
||||
except CancelledError as exc:
|
||||
trace "dial canceled"
|
||||
await cleanup()
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
trace "error dialing"
|
||||
await cleanup()
|
||||
raise exc
|
||||
|
||||
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
|
||||
if isNil(proto.handler):
|
||||
@ -478,6 +510,8 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
|
||||
|
||||
s.ms.addHandler(proto.codec, proto)
|
||||
|
||||
proc maintainPeers(s: Switch) {.async, gcsafe.}
|
||||
|
||||
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
|
||||
trace "starting switch for peer", peerInfo = shortLog(s.peerInfo)
|
||||
|
||||
@ -502,6 +536,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
|
||||
|
||||
if s.pubSub.isSome:
|
||||
await s.pubSub.get().start()
|
||||
s.maintainFut = maintainPeers(s)
|
||||
|
||||
info "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs
|
||||
result = startFuts # listen for incoming connections
|
||||
@ -511,26 +546,16 @@ proc stop*(s: Switch) {.async.} =
|
||||
|
||||
s.running = false
|
||||
|
||||
# Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs)
|
||||
# Cancel their sleep as it likely is running for 5 mins
|
||||
# running is false so they should exit after that
|
||||
# and so we just wait/ensure all has finished
|
||||
# Maintain has tryAndWarn so we should not be priting any error here
|
||||
# nevertheless use checkFutures!
|
||||
# Notice.. this is ugly but we have no clean way to express a Chain of operations/futures
|
||||
# and simply post a cancelation/stop from the root of the chain...
|
||||
let
|
||||
maintainers = toSeq(s.maintainFuts.values)
|
||||
sleepFuts = maintainers.mapIt(it.sleepFut)
|
||||
loopFuts = maintainers.mapIt(it.loopFut)
|
||||
for f in sleepFuts: f.cancel()
|
||||
checkFutures(await allFinished(sleepFuts))
|
||||
checkFutures(await allFinished(loopFuts))
|
||||
|
||||
# we want to report errors but we do not want to fail
|
||||
# or crash here, cos we need to clean possibly MANY items
|
||||
# and any following conn/transport won't be cleaned up
|
||||
if s.pubSub.isSome:
|
||||
# Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs)
|
||||
if not isNil(s.maintainSleepFut):
|
||||
s.maintainSleepFut.cancel()
|
||||
if not isNil(s.maintainFut):
|
||||
await s.maintainFut
|
||||
|
||||
await s.pubSub.get().stop()
|
||||
|
||||
for conns in toSeq(s.connections.values):
|
||||
@ -552,18 +577,20 @@ proc stop*(s: Switch) {.async.} =
|
||||
|
||||
trace "switch stopped"
|
||||
|
||||
proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} =
|
||||
proc maintainPeers(s: Switch) {.async.} =
|
||||
while s.running:
|
||||
tryAndWarn "explicit peer maintain":
|
||||
var conns = s.connections.getOrDefault(peerInfo.id)
|
||||
if conns.len == 0:
|
||||
# attempt re-connect in this case
|
||||
trace "explicit peering, trying to re-connect", peer=peerInfo
|
||||
await s.connect(peerInfo)
|
||||
for peer in s.maintaining:
|
||||
tryAndWarn "explicit peer maintain":
|
||||
var conns = s.connections.getOrDefault(peer.id)
|
||||
if conns.len == 0:
|
||||
# attempt re-connect in this case
|
||||
trace "explicit peering, trying to re-connect", peer
|
||||
await s.connect(peer)
|
||||
|
||||
await sleepAsync(5.minutes) # spec recommended
|
||||
s.maintainSleepFut = sleepAsync(5.minutes) # spec recommended
|
||||
await s.maintainSleepFut # do this in order to cancel it
|
||||
|
||||
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
|
||||
proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
|
||||
## Subscribe to pub sub peer
|
||||
if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)):
|
||||
trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog()
|
||||
@ -585,21 +612,13 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
|
||||
trace "unable to subscribe to peer", peer = peerInfo.shortLog
|
||||
return
|
||||
|
||||
s.dialedPubSubPeers.incl(peerInfo.id)
|
||||
try:
|
||||
if (await s.ms.select(stream, s.pubSub.get().codec)):
|
||||
await s.pubSub.get().subscribeToPeer(stream)
|
||||
else:
|
||||
if not await s.ms.select(stream, s.pubSub.get().codec):
|
||||
if not(isNil(stream)):
|
||||
await stream.close()
|
||||
except CatchableError as exc:
|
||||
trace "exception in subscribe to peer", peer = peerInfo.shortLog, exc = exc.msg
|
||||
await stream.close()
|
||||
finally:
|
||||
s.dialedPubSubPeers.excl(peerInfo.id)
|
||||
|
||||
if peerInfo.maintain:
|
||||
s.maintainFuts[peerInfo.id].loopFut = maintainPeer(s, peerInfo)
|
||||
s.maintainFuts[peerInfo.id].sleepFut = newFuture[void]() # stub until real one happens
|
||||
return
|
||||
|
||||
s.maintaining.incl(peerInfo)
|
||||
s.pubSub.get().subscribePeer(stream)
|
||||
|
||||
proc subscribe*(s: Switch, topic: string,
|
||||
handler: TopicHandler): Future[void] =
|
||||
@ -653,8 +672,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
||||
if not(isNil(stream)):
|
||||
await stream.close()
|
||||
|
||||
trace "got new muxer"
|
||||
|
||||
try:
|
||||
# once we got a muxed connection, attempt to
|
||||
# identify it
|
||||
@ -667,14 +684,15 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
|
||||
|
||||
# store muxer and muxed connection
|
||||
await s.storeConn(muxer, Direction.In)
|
||||
libp2p_peers.set(s.connections.len.int64)
|
||||
|
||||
muxer.connection.closeEvent.wait()
|
||||
.addCallback do(udata: pointer):
|
||||
asyncCheck s.cleanupConn(muxer.connection)
|
||||
|
||||
trace "got new muxer", peer = $muxer.connection.peerInfo
|
||||
|
||||
# try establishing a pubsub connection
|
||||
await s.subscribeToPeer(muxer.connection.peerInfo)
|
||||
await s.subscribePeer(muxer.connection.peerInfo)
|
||||
|
||||
except CancelledError as exc:
|
||||
await muxer.close()
|
||||
@ -702,6 +720,7 @@ proc newSwitch*(peerInfo: PeerInfo,
|
||||
identity: identity,
|
||||
muxers: muxers,
|
||||
secureManagers: @secureManagers,
|
||||
maintaining: initHashSet[PeerInfo]()
|
||||
)
|
||||
|
||||
let s = result # can't capture result
|
||||
|
@ -59,7 +59,7 @@ suite "FloodSub":
|
||||
await nodes[1].subscribe("foobar", handler)
|
||||
await waitSub(nodes[0], nodes[1], "foobar")
|
||||
|
||||
discard await nodes[0].publish("foobar", "Hello!".toBytes())
|
||||
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
|
||||
|
||||
result = await completionFut.wait(5.seconds)
|
||||
|
||||
@ -90,7 +90,7 @@ suite "FloodSub":
|
||||
await nodes[0].subscribe("foobar", handler)
|
||||
await waitSub(nodes[1], nodes[0], "foobar")
|
||||
|
||||
discard await nodes[1].publish("foobar", "Hello!".toBytes())
|
||||
check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0
|
||||
|
||||
result = await completionFut.wait(5.seconds)
|
||||
|
||||
@ -125,7 +125,7 @@ suite "FloodSub":
|
||||
|
||||
nodes[1].addValidator("foobar", validator)
|
||||
|
||||
discard await nodes[0].publish("foobar", "Hello!".toBytes())
|
||||
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
|
||||
|
||||
check (await handlerFut) == true
|
||||
await allFuturesThrowing(
|
||||
@ -197,8 +197,8 @@ suite "FloodSub":
|
||||
|
||||
nodes[1].addValidator("foo", "bar", validator)
|
||||
|
||||
discard await nodes[0].publish("foo", "Hello!".toBytes())
|
||||
discard await nodes[0].publish("bar", "Hello!".toBytes())
|
||||
check (await nodes[0].publish("foo", "Hello!".toBytes())) > 0
|
||||
check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].stop(),
|
||||
|
@ -60,7 +60,7 @@ suite "GossipSub internal":
|
||||
let gossipSub = newPubSub(TestGossipSub, randomPeerInfo())
|
||||
|
||||
let topic = "foobar"
|
||||
gossipSub.gossipsub[topic] = initHashSet[string]()
|
||||
gossipSub.mesh[topic] = initHashSet[string]()
|
||||
gossipSub.topics[topic] = Topic() # has to be in topics to rebalance
|
||||
|
||||
var conns = newSeq[Connection]()
|
||||
@ -71,9 +71,9 @@ suite "GossipSub internal":
|
||||
conn.peerInfo = peerInfo
|
||||
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
||||
gossipSub.peers[peerInfo.id].conn = conn
|
||||
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
||||
gossipSub.mesh[topic].incl(peerInfo.id)
|
||||
|
||||
check gossipSub.gossipsub[topic].len == 15
|
||||
check gossipSub.mesh[topic].len == 15
|
||||
await gossipSub.rebalanceMesh(topic)
|
||||
check gossipSub.mesh[topic].len == GossipSubD
|
||||
|
||||
@ -102,6 +102,7 @@ suite "GossipSub internal":
|
||||
conn.peerInfo = peerInfo
|
||||
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
||||
gossipSub.peers[peerInfo.id].handler = handler
|
||||
gossipSub.peers[peerInfo.id].topics &= topic
|
||||
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
||||
|
||||
check gossipSub.gossipsub[topic].len == 15
|
||||
|
@ -90,7 +90,6 @@ suite "GossipSub":
|
||||
|
||||
nodes[1].addValidator("foobar", validator)
|
||||
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
|
||||
|
||||
result = (await validatorFut) and (await handlerFut)
|
||||
await allFuturesThrowing(
|
||||
nodes[0].stop(),
|
||||
@ -146,7 +145,6 @@ suite "GossipSub":
|
||||
awaiters.add((await nodes[1].start()))
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
await nodes[1].subscribe("foo", handler)
|
||||
await nodes[1].subscribe("bar", handler)
|
||||
|
||||
@ -181,7 +179,8 @@ suite "GossipSub":
|
||||
|
||||
var nodes: seq[Switch] = newSeq[Switch]()
|
||||
for i in 0..<2:
|
||||
nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Noise])
|
||||
nodes.add newStandardSwitch(gossip = true,
|
||||
secureManagers = [SecureProtocol.Noise])
|
||||
|
||||
var awaitters: seq[Future[void]]
|
||||
for node in nodes:
|
||||
@ -298,7 +297,7 @@ suite "GossipSub":
|
||||
await nodes[1].stop()
|
||||
await allFuturesThrowing(wait)
|
||||
|
||||
# result = observed == 2
|
||||
check observed == 2
|
||||
result = true
|
||||
|
||||
check:
|
||||
@ -339,7 +338,9 @@ suite "GossipSub":
|
||||
var runs = 10
|
||||
|
||||
for i in 0..<runs:
|
||||
nodes.add newStandardSwitch(triggerSelf = true, gossip = true, secureManagers = [SecureProtocol.Noise])
|
||||
nodes.add newStandardSwitch(triggerSelf = true,
|
||||
gossip = true,
|
||||
secureManagers = [SecureProtocol.Noise])
|
||||
awaitters.add((await nodes[i].start()))
|
||||
|
||||
await subscribeRandom(nodes)
|
||||
@ -359,8 +360,7 @@ suite "GossipSub":
|
||||
if not seenFut.finished() and seen.len >= runs:
|
||||
seenFut.complete()
|
||||
|
||||
subs.add(allFutures(dialer.subscribe("foobar", handler),
|
||||
waitSub(nodes[0], dialer, "foobar")))
|
||||
subs &= dialer.subscribe("foobar", handler)
|
||||
|
||||
await allFuturesThrowing(subs)
|
||||
|
||||
@ -388,10 +388,12 @@ suite "GossipSub":
|
||||
var runs = 10
|
||||
|
||||
for i in 0..<runs:
|
||||
nodes.add newStandardSwitch(triggerSelf = true, gossip = true, secureManagers = [SecureProtocol.Secio])
|
||||
nodes.add newStandardSwitch(triggerSelf = true,
|
||||
gossip = true,
|
||||
secureManagers = [SecureProtocol.Secio])
|
||||
awaitters.add((await nodes[i].start()))
|
||||
|
||||
await subscribeSparseNodes(nodes, 4)
|
||||
await subscribeSparseNodes(nodes, 1) # TODO: figure out better sparse mesh
|
||||
|
||||
var seen: Table[string, int]
|
||||
var subs: seq[Future[void]]
|
||||
|
@ -243,18 +243,6 @@ const
|
||||
"""001CC33294544D898781010258C2FB81F02429D2DD54D0B59B8CD2335F57498F
|
||||
D4E444BEC94DA7BE2BAF3E3796AFA8388F626AEB3178355991985EE5FFC9D1AA
|
||||
CAD9""",
|
||||
# AES-256 SHA-1
|
||||
"85076756644AAC06A47B29E25CB39B3E0C717152EE587F50C5A10281DB7F2FA5",
|
||||
"""256D46C5E5449AA7B9BE985646D5F32E455BB4B7AAF3566507A023B72801A7BC
|
||||
5066647E82DE2BC37FE22AB0DE440F77""",
|
||||
"""01FF82C71215CFFD7A42A5CED03BD2256D4A5B6850472A5C5CA90665D510F038
|
||||
A21F3A6EA0BB0A64113960C54DDAFC5E7A5F018E4413D7CC93C736A8D30579ED
|
||||
5A2B""",
|
||||
# Edge case (where Go implementation returns 65 bytes of secret)
|
||||
# Nim implementation has leading `00`.
|
||||
"""00360D9410E58534AC6A89CA5AC17E9455F619DCA71A6C2FB6F3156AE58DDB91
|
||||
6E9A7D223D1D7DD05D5475BFC4C517C85475600AAF6F28703ED1203281369A41
|
||||
9A7C"""
|
||||
]
|
||||
|
||||
Ivs = [
|
||||
@ -275,15 +263,6 @@ const
|
||||
"A828F6D249F38A917CD297F7BDDE7B70",
|
||||
"54FC8070579A17F6DAD342174062D60A",
|
||||
"D33C7696183DA21C5CD40AB677BECE7C",
|
||||
|
||||
"9EFF93741DC080A15B9E554080AB9869",
|
||||
"7A007E351C9A5C930D01645D97648F8C",
|
||||
"934DB2529D1D3AC37BAD56FD1E522196",
|
||||
"30D19C3C1AB9A3391A5F88E92307261D",
|
||||
"32ED1A961630D94A279F99757B3131CB",
|
||||
"003ABE572377D59B74713425854FED29",
|
||||
"3338850C0A4D7BD53C70E40FA0079AA2",
|
||||
"62787F194DC218C5B0DAFD806F0D3125"
|
||||
]
|
||||
|
||||
CipherKeys = [
|
||||
@ -304,15 +283,6 @@ const
|
||||
"FC2797D1040FF162A90275EBA3FCC4330C2BDC28D23DA80B89842C2D7A6EFA06",
|
||||
"B83698789ED8E3B44E48EAAEB291B3003AD95FAF344EBA1B9071F4FB46A4E4E9",
|
||||
"5D90C579971B7B7F9ECDE55EBCE8921B807AAD45D61952228758BA80F4490E8F",
|
||||
|
||||
"1C429A32A2E16168D3E3F942AEEAD708456C6566D800D5B7A6DCE8184739F16D",
|
||||
"84987E7CC9F71243408454DD0787F438CCB62C79ED56078FD920FFFD7D5C44FF",
|
||||
"971372E385E0F9FED8F67C922D0F5EB77D7D7818F63B26EF80C4D3C81D9E1B97",
|
||||
"F20AE0A5387B2D1D38B8B340466D252F894C00C5907EE3A510442E4F38966AB0",
|
||||
"B58F32D83C7A91B6B1DA731334D70502348CD82EFB8258C0AE316B983F2F1E1E",
|
||||
"5903FE75A1C328BE6C98EB4A4EFF19C67D3C52C87B3131047F3773201D44BFCE",
|
||||
"55EAD85B3124C36828ED3A43698952111EECE8C7FB156D71EE3F84E088B4F4CE",
|
||||
"E4C99C782A88B69E5D83F4DEFDD2AE61A397486E17EC9EAE6EC679A75E47BBCD"
|
||||
]
|
||||
|
||||
MacKeys = [
|
||||
@ -333,15 +303,6 @@ const
|
||||
"EE66A1579D732E99A8500F48595BF25289E722DB",
|
||||
"E692EC73B0A2E68625221E1D01BA0E6B24BCB43F",
|
||||
"8613E8F86D2DD1CF3CEDC52AD91423F2F31E0003",
|
||||
|
||||
"F8A7EF47F37257B54A5028424E64F172E532E7E7",
|
||||
"4D3596723AECD3DF21A20E956755782E783C9E4A",
|
||||
"484860090D99F4B702C809294037E6C7F6E58BBA",
|
||||
"15163D55C0A32E79E0EDD8E8EDA5AC9564B5488C",
|
||||
"6116BCB44773E3342AB5671D2AC107D4C9EC0757",
|
||||
"1CA3FCA023C72B7695481CA815856FEF0C5D7E9E",
|
||||
"E34004C383C36201DC23E062DAE791C76738C28E",
|
||||
"FA5CB0689A1DFDBAE8618BC079D70E318377B0DA"
|
||||
]
|
||||
|
||||
proc cmp(a, b: openarray[byte]): bool =
|
||||
@ -459,8 +420,6 @@ suite "Key interface test suite":
|
||||
check testStretcher(0, 4, "AES-128", "SHA256") == true
|
||||
test "Go key stretch function AES256-SHA512 test vectors":
|
||||
check testStretcher(4, 8, "AES-256", "SHA512") == true
|
||||
test "Go key stretch function AES256-SHA1 test vectors":
|
||||
check testStretcher(8, 12, "AES-256", "SHA1") == true
|
||||
|
||||
test "ChaChaPoly":
|
||||
# test data from:
|
||||
|
Loading…
x
Reference in New Issue
Block a user