diff --git a/libp2p/crypto/crypto.nim b/libp2p/crypto/crypto.nim index 05c51a5fc..66fe81c9e 100644 --- a/libp2p/crypto/crypto.nim +++ b/libp2p/crypto/crypto.nim @@ -13,13 +13,13 @@ import rsa, ecnist, ed25519/ed25519, secp, bearssl import ../protobuf/minprotobuf, ../vbuffer, ../multihash, ../multicodec -import nimcrypto/[rijndael, blowfish, twofish, sha, sha2, hash, hmac, utils] +import nimcrypto/[rijndael, blowfish, twofish, sha2, hash, hmac, utils] import ../utility import stew/results export results -# Export modules of types that are part of public API -export rijndael, blowfish, twofish, sha, sha2, hash, hmac, utils +# This is a workaround for Nim's `import` bug +export rijndael, blowfish, twofish, sha2, hash, hmac, utils from strutils import split @@ -37,7 +37,6 @@ type Blowfish DigestSheme* = enum - Sha1, Sha256, Sha512 @@ -184,7 +183,8 @@ proc getKey*(key: PrivateKey): CryptoResult[PublicKey] = else: err(KeyError) -proc toRawBytes*(key: PrivateKey | PublicKey, data: var openarray[byte]): CryptoResult[int] = +proc toRawBytes*(key: PrivateKey | PublicKey, + data: var openarray[byte]): CryptoResult[int] = ## Serialize private key ``key`` (using scheme's own serialization) and store ## it to ``data``. ## @@ -274,7 +274,7 @@ proc getBytes*(sig: Signature): seq[byte] = ## Return signature ``sig`` in binary form. result = sig.data -proc init*(key: var PrivateKey, data: openarray[byte]): bool = +proc init*[T: PrivateKey|PublicKey](key: var T, data: openarray[byte]): bool = ## Initialize private key ``key`` from libp2p's protobuf serialized raw ## binary form. ## @@ -287,54 +287,29 @@ proc init*(key: var PrivateKey, data: openarray[byte]): bool = if pb.getBytes(2, buffer) != 0: if cast[int8](id) in SupportedSchemesInt: var scheme = cast[PKScheme](cast[int8](id)) - var nkey = PrivateKey(scheme: scheme) - if scheme == RSA: + when key is PrivateKey: + var nkey = PrivateKey(scheme: scheme) + else: + var nkey = PublicKey(scheme: scheme) + case scheme: + of PKScheme.RSA: if init(nkey.rsakey, buffer).isOk: key = nkey - result = true - elif scheme == Ed25519: + return true + of PKScheme.Ed25519: if init(nkey.edkey, buffer): key = nkey - result = true - elif scheme == ECDSA: + return true + of PKScheme.ECDSA: if init(nkey.eckey, buffer).isOk: key = nkey - result = true - elif scheme == Secp256k1: + return true + of PKScheme.Secp256k1: if init(nkey.skkey, buffer).isOk: key = nkey - result = true - -proc init*(key: var PublicKey, data: openarray[byte]): bool = - ## Initialize public key ``key`` from libp2p's protobuf serialized raw - ## binary form. - ## - ## Returns ``true`` on success. - var id: uint64 - var buffer: seq[byte] - if len(data) > 0: - var pb = initProtoBuffer(@data) - if pb.getVarintValue(1, id) != 0: - if pb.getBytes(2, buffer) != 0: - if cast[int8](id) in SupportedSchemesInt: - var scheme = cast[PKScheme](cast[int8](id)) - var nkey = PublicKey(scheme: scheme) - if scheme == RSA: - if init(nkey.rsakey, buffer).isOk: - key = nkey - result = true - elif scheme == Ed25519: - if init(nkey.edkey, buffer): - key = nkey - result = true - elif scheme == ECDSA: - if init(nkey.eckey, buffer).isOk: - key = nkey - result = true - elif scheme == Secp256k1: - if init(nkey.skkey, buffer).isOk: - key = nkey - result = true + return true + else: + return false proc init*(sig: var Signature, data: openarray[byte]): bool = ## Initialize signature ``sig`` from raw binary form.
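Note on the hunk above: the two copies of `init` collapse into one generic proc, so serialized keys of either flavour round-trip through the same call. A minimal usage sketch (illustrative only, not part of the patch; assumes a `PrivateKey` obtained elsewhere, e.g. from the module's key generation):

  import stew/results              # re-exported by crypto.nim; gives tryGet
  import libp2p/crypto/crypto

  proc roundTrip(priv: PrivateKey): bool =
    # protobuf-serialized form, as produced by getBytes()
    let raw = priv.getBytes().tryGet()
    var restored: PrivateKey
    if not restored.init(raw):     # the new generic init handles both key types
      return false
    restored == priv               # scheme-aware comparison from this diff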
@@ -344,18 +319,8 @@ proc init*(sig: var Signature, data: openarray[byte]): bool = sig.data = @data result = true -proc init*(key: var PrivateKey, data: string): bool = - ## Initialize private key ``key`` from libp2p's protobuf serialized - ## hexadecimal string representation. - ## - ## Returns ``true`` on success. - try: - key.init(fromHex(data)) - except ValueError: - false - -proc init*(key: var PublicKey, data: string): bool = - ## Initialize public key ``key`` from libp2p's protobuf serialized +proc init*[T: PrivateKey|PublicKey](key: var T, data: string): bool = + ## Initialize private/public key ``key`` from libp2p's protobuf serialized ## hexadecimal string representation. ## ## Returns ``true`` on success. @@ -374,7 +339,8 @@ proc init*(sig: var Signature, data: string): bool = except ValueError: false -proc init*(t: typedesc[PrivateKey], data: openarray[byte]): CryptoResult[PrivateKey] = +proc init*(t: typedesc[PrivateKey], + data: openarray[byte]): CryptoResult[PrivateKey] = ## Create new private key from libp2p's protobuf serialized binary form. var res: t if not res.init(data): @@ -382,7 +348,8 @@ proc init*(t: typedesc[PrivateKey], data: openarray[byte]): CryptoResult[Private else: ok(res) -proc init*(t: typedesc[PublicKey], data: openarray[byte]): CryptoResult[PublicKey] = +proc init*(t: typedesc[PublicKey], + data: openarray[byte]): CryptoResult[PublicKey] = ## Create new public key from libp2p's protobuf serialized binary form. var res: t if not res.init(data): @@ -390,7 +357,8 @@ proc init*(t: typedesc[PublicKey], data: openarray[byte]): CryptoResult[PublicKe else: ok(res) -proc init*(t: typedesc[Signature], data: openarray[byte]): CryptoResult[Signature] = +proc init*(t: typedesc[Signature], + data: openarray[byte]): CryptoResult[Signature] = ## Create new public key from libp2p's protobuf serialized binary form. var res: t if not res.init(data): @@ -421,117 +389,93 @@ proc init*(t: typedesc[Signature], data: string): CryptoResult[Signature] = except ValueError: err(SigError) -proc `==`*(key1, key2: PublicKey): bool = +proc `==`*(key1, key2: PublicKey): bool {.inline.} = ## Return ``true`` if two public keys ``key1`` and ``key2`` of the same ## scheme and equal. if key1.scheme == key2.scheme: - if key1.scheme == RSA: - result = (key1.rsakey == key2.rsakey) - elif key1.scheme == Ed25519: - result = (key1.edkey == key2.edkey) - elif key1.scheme == ECDSA: - result = (key1.eckey == key2.eckey) + case key1.scheme + of PKScheme.RSA: + (key1.rsakey == key2.rsakey) + of PKScheme.Ed25519: + (key1.edkey == key2.edkey) + of PKScheme.ECDSA: + (key1.eckey == key2.eckey) + of PKScheme.Secp256k1: + (key1.skkey == key2.skkey) + of PKScheme.NoSupport: + false + else: + false proc `==`*(key1, key2: PrivateKey): bool = ## Return ``true`` if two private keys ``key1`` and ``key2`` of the same ## scheme and equal. if key1.scheme == key2.scheme: - if key1.scheme == RSA: - result = (key1.rsakey == key2.rsakey) - elif key1.scheme == Ed25519: - result = (key1.edkey == key2.edkey) - elif key1.scheme == ECDSA: - result = (key1.eckey == key2.eckey) + case key1.scheme + of PKScheme.RSA: + (key1.rsakey == key2.rsakey) + of PKScheme.Ed25519: + (key1.edkey == key2.edkey) + of PKScheme.ECDSA: + (key1.eckey == key2.eckey) + of PKScheme.Secp256k1: + (key1.skkey == key2.skkey) + of PKScheme.NoSupport: + false + else: + false -proc `$`*(key: PrivateKey): string = - ## Get string representation of private key ``key``. 
- if key.scheme == RSA: - result = $(key.rsakey) - elif key.scheme == Ed25519: - result = "Ed25519 key (" - result.add($(key.edkey)) - result.add(")") - elif key.scheme == ECDSA: - result = "Secp256r1 key (" - result.add($(key.eckey)) - result.add(")") - elif key.scheme == Secp256k1: - result = "Secp256k1 key (" - result.add($(key.skkey)) - result.add(")") +proc `$`*(key: PrivateKey|PublicKey): string = + ## Get string representation of private/public key ``key``. + case key.scheme: + of PKScheme.RSA: + $(key.rsakey) + of PKScheme.Ed25519: + "ed25519 key (" & $key.edkey & ")" + of PKScheme.ECDSA: + "secp256r1 key (" & $key.eckey & ")" + of PKScheme.Secp256k1: + "secp256k1 key (" & $key.skkey & ")" + of PKScheme.NoSupport: + "not supported" -proc `$`*(key: PublicKey): string = - ## Get string representation of public key ``key``. - if key.scheme == RSA: - result = $(key.rsakey) - elif key.scheme == Ed25519: - result = "Ed25519 key (" - result.add($(key.edkey)) - result.add(")") - elif key.scheme == ECDSA: - result = "Secp256r1 key (" - result.add($(key.eckey)) - result.add(")") - elif key.scheme == Secp256k1: - result = "Secp256k1 key (" - result.add($(key.skkey)) - result.add(")") - -func shortLog*(key: PrivateKey): string = - ## Get string representation of private key ``key``. - if key.scheme == RSA: - result = ($key.rsakey).shortLog - elif key.scheme == Ed25519: - result = "Ed25519 key (" - result.add(($key.edkey).shortLog) - result.add(")") - elif key.scheme == ECDSA: - result = "Secp256r1 key (" - result.add(($key.eckey).shortLog) - result.add(")") - elif key.scheme == Secp256k1: - result = "Secp256k1 key (" - result.add(($key.skkey).shortLog) - result.add(")") - -proc shortLog*(key: PublicKey): string = - ## Get string representation of public key ``key``. - if key.scheme == RSA: - result = ($key.rsakey).shortLog - elif key.scheme == Ed25519: - result = "Ed25519 key (" - result.add(($key.edkey).shortLog) - result.add(")") - elif key.scheme == ECDSA: - result = "Secp256r1 key (" - result.add(($key.eckey).shortLog) - result.add(")") - elif key.scheme == Secp256k1: - result = "Secp256k1 key (" - result.add(($key.skkey).shortLog) - result.add(")") +func shortLog*(key: PrivateKey|PublicKey): string = + ## Get short string representation of private/public key ``key``. + case key.scheme: + of PKScheme.RSA: + ($key.rsakey).shortLog + of PKScheme.Ed25519: + "ed25519 key (" & ($key.edkey).shortLog & ")" + of PKScheme.ECDSA: + "secp256r1 key (" & ($key.eckey).shortLog & ")" + of PKScheme.Secp256k1: + "secp256k1 key (" & ($key.skkey).shortLog & ")" + of PKScheme.NoSupport: + "not supported" proc `$`*(sig: Signature): string = ## Get string representation of signature ``sig``. result = toHex(sig.data) -proc sign*(key: PrivateKey, data: openarray[byte]): CryptoResult[Signature] {.gcsafe.} = +proc sign*(key: PrivateKey, + data: openarray[byte]): CryptoResult[Signature] {.gcsafe.} = ## Sign message ``data`` using private key ``key`` and return generated ## signature in raw binary form. var res: Signature - if key.scheme == RSA: + case key.scheme: + of PKScheme.RSA: let sig = ? key.rsakey.sign(data).orError(SigError) res.data = ? sig.getBytes().orError(SigError) ok(res) - elif key.scheme == Ed25519: + of PKScheme.Ed25519: let sig = key.edkey.sign(data) res.data = sig.getBytes() ok(res) - elif key.scheme == ECDSA: + of PKScheme.ECDSA: let sig = ? key.eckey.sign(data).orError(SigError) res.data = ? 
sig.getBytes().orError(SigError) ok(res) - elif key.scheme == Secp256k1: + of PKScheme.Secp256k1: let sig = key.skkey.sign(data) res.data = sig.getBytes() ok(res) @@ -541,22 +485,33 @@ proc sign*(key: PrivateKey, data: openarray[byte]): CryptoResult[Signature] {.gc proc verify*(sig: Signature, message: openarray[byte], key: PublicKey): bool = ## Verify signature ``sig`` using message ``message`` and public key ``key``. ## Return ``true`` if message signature is valid. - if key.scheme == RSA: + case key.scheme: + of PKScheme.RSA: var signature: RsaSignature if signature.init(sig.data).isOk: - result = signature.verify(message, key.rsakey) - elif key.scheme == Ed25519: + signature.verify(message, key.rsakey) + else: + false + of PKScheme.Ed25519: var signature: EdSignature if signature.init(sig.data): - result = signature.verify(message, key.edkey) - elif key.scheme == ECDSA: + signature.verify(message, key.edkey) + else: + false + of PKScheme.ECDSA: var signature: EcSignature if signature.init(sig.data).isOk: - result = signature.verify(message, key.eckey) - elif key.scheme == Secp256k1: + signature.verify(message, key.eckey) + else: + false + of PKScheme.Secp256k1: var signature: SkSignature if signature.init(sig.data).isOk: - result = signature.verify(message, key.skkey) + signature.verify(message, key.skkey) + else: + false + else: + false template makeSecret(buffer, hmactype, secret, seed: untyped) {.dirty.}= var ctx: hmactype @@ -609,8 +564,6 @@ proc stretchKeys*(cipherType: string, hashType: string, makeSecret(result.data, HMAC[sha256], sharedSecret, seed) elif hashType == "SHA512": makeSecret(result.data, HMAC[sha512], sharedSecret, seed) - elif hashType == "SHA1": - makeSecret(result.data, HMAC[sha1], sharedSecret, seed) template goffset*(secret, id, o: untyped): untyped = id * (len(secret.data) shr 1) + o @@ -802,23 +755,28 @@ proc decodeExchange*(message: seq[byte], ## Serialization/Deserialization helpers -proc write*(vb: var VBuffer, pubkey: PublicKey) {.inline, raises: [Defect, ResultError[CryptoError]].} = +proc write*(vb: var VBuffer, pubkey: PublicKey) {. + inline, raises: [Defect, ResultError[CryptoError]].} = ## Write PublicKey value ``pubkey`` to buffer ``vb``. vb.writeSeq(pubkey.getBytes().tryGet()) -proc write*(vb: var VBuffer, seckey: PrivateKey) {.inline, raises: [Defect, ResultError[CryptoError]].} = +proc write*(vb: var VBuffer, seckey: PrivateKey) {. + inline, raises: [Defect, ResultError[CryptoError]].} = ## Write PrivateKey value ``seckey`` to buffer ``vb``. vb.writeSeq(seckey.getBytes().tryGet()) -proc write*(vb: var VBuffer, sig: PrivateKey) {.inline, raises: [Defect, ResultError[CryptoError]].} = +proc write*(vb: var VBuffer, sig: PrivateKey) {. + inline, raises: [Defect, ResultError[CryptoError]].} = ## Write Signature value ``sig`` to buffer ``vb``. vb.writeSeq(sig.getBytes().tryGet()) -proc initProtoField*(index: int, pubkey: PublicKey): ProtoField {.raises: [Defect, ResultError[CryptoError]].} = +proc initProtoField*(index: int, pubkey: PublicKey): ProtoField {. + raises: [Defect, ResultError[CryptoError]].} = ## Initialize ProtoField with PublicKey ``pubkey``. result = initProtoField(index, pubkey.getBytes().tryGet()) -proc initProtoField*(index: int, seckey: PrivateKey): ProtoField {.raises: [Defect, ResultError[CryptoError]].} = +proc initProtoField*(index: int, seckey: PrivateKey): ProtoField {. + raises: [Defect, ResultError[CryptoError]].} = ## Initialize ProtoField with PrivateKey ``seckey``. 
result = initProtoField(index, seckey.getBytes().tryGet()) diff --git a/libp2p/crypto/rsa.nim b/libp2p/crypto/rsa.nim index 3aa7f7100..5f7c5a50b 100644 --- a/libp2p/crypto/rsa.nim +++ b/libp2p/crypto/rsa.nim @@ -661,15 +661,16 @@ proc cmp(a: openarray[byte], b: openarray[byte]): bool = let blen = len(b) if alen == blen: if alen == 0: - result = true + true else: var n = alen - var res, diff: int + var res = 0 while n > 0: dec(n) - diff = int(a[n]) - int(b[n]) - res = (res and -not(diff)) or diff - result = (res == 0) + res = res or int(a[n] xor b[n]) + (res == 0) + else: + false proc `==`*(a, b: RsaPrivateKey): bool = ## Compare two RSA private keys for equality. diff --git a/libp2p/errors.nim b/libp2p/errors.nim index f59ab9367..ed541fb1f 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -59,7 +59,7 @@ template tryAndWarn*(message: static[string]; body: untyped): untyped = try: body except CancelledError as exc: - raise exc # TODO: why catch and re-raise? + raise exc except CatchableError as exc: warn "An exception has ocurred, enable trace logging for details", name = exc.name, msg = message trace "Exception details", exc = exc.msg diff --git a/libp2p/peerinfo.nim b/libp2p/peerinfo.nim index fb19937d3..60078e7e0 100644 --- a/libp2p/peerinfo.nim +++ b/libp2p/peerinfo.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import options, sequtils +import options, sequtils, hashes import chronos, chronicles import peerid, multiaddress, crypto/crypto @@ -43,6 +43,8 @@ type # https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#explicit-peering-agreements maintain*: bool +proc hash*(i: PeerInfo): Hash = cast[int](i).hash # cast ptr to int and hash + proc id*(p: PeerInfo): string = if not(isNil(p)): return p.peerId.pretty() diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index cc7fa7ea5..f629270b0 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -47,14 +47,14 @@ method subscribeTopic*(f: FloodSub, # unsubscribe the peer from the topic f.floodsub[topic].excl(peerId) -method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} = - await procCall PubSub(f).handleDisconnect(peer) - +method handleDisconnect*(f: FloodSub, peer: PubSubPeer) = ## handle peer disconnects for t in toSeq(f.floodsub.keys): if t in f.floodsub: f.floodsub[t].excl(peer.id) + procCall PubSub(f).handleDisconnect(peer) + method rpcHandler*(f: FloodSub, peer: PubSubPeer, rpcMsgs: seq[RPCMsg]) {.async.} = @@ -86,18 +86,20 @@ method rpcHandler*(f: FloodSub, trace "calling handler for message", topicId = t, localPeer = f.peerInfo.id, fromPeer = msg.fromPeer.pretty - await h(t, msg.data) # trigger user provided handler + + try: + await h(t, msg.data) # trigger user provided handler + except CatchableError as exc: + trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - var sent: seq[Future[void]] - # start the future but do not wait yet - for p in toSendPeers: - if p in f.peers and f.peers[p].id != peer.id: - sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)])) + let (published, failed) = await f.sendHelper(toSendPeers, m.messages) + for p in failed: + let peer = f.peers.getOrDefault(p) + if not(isNil(peer)): + f.handleDisconnect(peer) # cleanup failed peers - # wait for all the futures now - sent = await allFinished(sent) - checkFutures(sent) + trace "forwarded message to peers", peers = 
published.len method init*(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -111,9 +113,9 @@ method init*(f: FloodSub) = f.handler = handler f.codec = FloodSubCodec -method subscribeToPeer*(p: FloodSub, - conn: Connection) {.async.} = - await procCall PubSub(p).subscribeToPeer(conn) +method subscribePeer*(p: FloodSub, + conn: Connection) = + procCall PubSub(p).subscribePeer(conn) asyncCheck p.handleConn(conn, FloodSubCodec) method publish*(f: FloodSub, @@ -132,20 +134,17 @@ method publish*(f: FloodSub, trace "publishing on topic", name = topic let msg = Message.init(f.peerInfo, data, topic, f.sign) - var sent: seq[Future[void]] # start the future but do not wait yet - for p in f.floodsub.getOrDefault(topic): - if p in f.peers: - trace "publishing message", name = topic, peer = p, data = data.shortLog - sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) - - # wait for all the futures now - sent = await allFinished(sent) - checkFutures(sent) + let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg]) + for p in failed: + let peer = f.peers.getOrDefault(p) + f.handleDisconnect(peer) # cleanup failed peers libp2p_pubsub_messages_published.inc(labelValues = [topic]) - return sent.filterIt(not it.failed).len + trace "published message to peers", peers = published.len, + msg = msg.shortLog() + return published.len method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 1f6b159fe..5e875e582 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -56,9 +56,17 @@ type heartbeatRunning: bool heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats -declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"]) -declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"]) +declareGauge(libp2p_gossipsub_peers_per_topic_mesh, + "gossipsub peers per topic in mesh", + labels = ["topic"]) + +declareGauge(libp2p_gossipsub_peers_per_topic_fanout, + "gossipsub peers per topic in fanout", + labels = ["topic"]) + +declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, + "gossipsub peers per topic in gossipsub", + labels = ["topic"]) method init*(g: GossipSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -80,16 +88,54 @@ proc replenishFanout(g: GossipSub, topic: string) = if g.fanout.getOrDefault(topic).len < GossipSubDLo: trace "replenishing fanout", peers = g.fanout.getOrDefault(topic).len - if topic in g.gossipsub: + if topic in toSeq(g.gossipsub.keys): for p in g.gossipsub.getOrDefault(topic): if not g.fanout[topic].containsOrIncl(p): if g.fanout.getOrDefault(topic).len == GossipSubD: break libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout.getOrDefault(topic).len.int64, labelValues = [topic]) + .set(g.fanout.getOrDefault(topic).len.int64, + labelValues = [topic]) + trace "fanout replenished with peers", peers = g.fanout.getOrDefault(topic).len +template moveToMeshHelper(g: GossipSub, + topic: string, + table: Table[string, HashSet[string]]) = + ## move peers from `table` into `mesh` + ## + var peerIds = toSeq(table.getOrDefault(topic)) + + logScope: + topic = topic + meshPeers = g.mesh.getOrDefault(topic).len + peers = 
peerIds.len + + shuffle(peerIds) + for id in peerIds: + if g.mesh.getOrDefault(topic).len > GossipSubD: + break + + trace "gathering peers for mesh" + if topic notin table: + continue + + trace "getting peers", topic, + peers = peerIds.len + + table[topic].excl(id) # always exclude + if id in g.mesh[topic]: + continue # we already have this peer in the mesh, try again + + if id in g.peers: + let p = g.peers[id] + if p.connected: + # send a graft message to the peer + await p.sendGraft(@[topic]) + g.mesh[topic].incl(id) + trace "got peer", peer = id + proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = try: trace "about to rebalance mesh" @@ -97,47 +143,43 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if topic notin g.mesh: g.mesh[topic] = initHashSet[string]() - # https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#mesh-maintenance - if g.mesh.getOrDefault(topic).len < GossipSubDlo and topic in g.topics: - var availPeers = toSeq(g.gossipsub.getOrDefault(topic)) - shuffle(availPeers) - if availPeers.len > GossipSubD: - availPeers = availPeers[0.. GossipSubDhi: - trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len - while g.mesh.getOrDefault(topic).len > GossipSubD: - trace "pruning peers", peers = g.mesh[topic].len - let id = toSeq(g.mesh[topic])[rand(0.. 0: - let ihave = ControlIHave(topicID: topic, - messageIDs: toSeq(mids)) + if mids.len <= 0: + continue - if topic notin g.gossipsub: - trace "topic not in gossip array, skipping", topicID = topic + let ihave = ControlIHave(topicID: topic, + messageIDs: toSeq(mids)) + + if topic notin g.gossipsub: + trace "topic not in gossip array, skipping", topicID = topic + continue + + for id in allPeers: + if result.len >= GossipSubD: + trace "got gossip peers", peers = result.len + break + + if allPeers.len == 0: + trace "no peers for topic, skipping", topicID = topic + break + + if id in gossipPeers: continue - - var extraPeers = toSeq(g.gossipsub[topic]) - shuffle(extraPeers) - for peer in extraPeers: - if result.len < GossipSubD and - peer notin gossipPeers and - peer notin result: - result[peer] = ControlMessage(ihave: @[ihave]) + + if id notin result: + result[id] = controlMsg + + result[id].ihave.add(ihave) proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: - withLock g.heartbeatLock: - try: - trace "running heartbeat" + try: + trace "running heartbeat" - for t in toSeq(g.topics.keys): - await g.rebalanceMesh(t) + for t in toSeq(g.topics.keys): + await g.rebalanceMesh(t) - await g.dropFanoutPeers() + await g.dropFanoutPeers() - # replenish known topics to the fanout - for t in toSeq(g.fanout.keys): - g.replenishFanout(t) + # replenish known topics to the fanout + for t in toSeq(g.fanout.keys): + g.replenishFanout(t) - let peers = g.getGossipPeers() - var sent: seq[Future[void]] - for peer in peers.keys: - if peer in g.peers: - sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) - checkFutures(await allFinished(sent)) + let peers = g.getGossipPeers() + var sent: seq[Future[void]] + for peer in peers.keys: + if peer in g.peers: + sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + checkFutures(await allFinished(sent)) - g.mcache.shift() # shift the cache - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + g.mcache.shift() # shift the cache + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception occurred in 
gossipsub heartbeat", exc = exc.msg await sleepAsync(1.seconds) -method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = +method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = ## handle peer disconnects - trace "peer disconnected", peer=peer.id + procCall FloodSub(g).handleDisconnect(peer) - await procCall FloodSub(g).handleDisconnect(peer) + for t in toSeq(g.gossipsub.keys): + g.gossipsub[t].excl(peer.id) - # must avoid running this while manipulating mesh/gossip tables - withLock g.heartbeatLock: - for t in toSeq(g.gossipsub.keys): - g.gossipsub[t].excl(peer.id) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) - - # mostly for metrics - await procCall PubSub(g).subscribeTopic(t, false, peer.id) - - for t in toSeq(g.mesh.keys): + for t in toSeq(g.mesh.keys): + if t in g.mesh: g.mesh[t].excl(peer.id) libp2p_gossipsub_peers_per_topic_mesh @@ -246,9 +299,9 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout[t].len.int64, labelValues = [t]) -method subscribeToPeer*(p: GossipSub, - conn: Connection) {.async.} = - await procCall PubSub(p).subscribeToPeer(conn) +method subscribePeer*(p: GossipSub, + conn: Connection) = + procCall PubSub(p).subscribePeer(conn) asyncCheck p.handleConn(conn, GossipSubCodec) method subscribeTopic*(g: GossipSub, @@ -271,10 +324,26 @@ method subscribeTopic*(g: GossipSub, # unsubscribe remote peer from the topic g.gossipsub[topic].excl(peerId) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + if topic notin g.gossipsub: + g.gossipsub[topic] = initHashSet[string]() - trace "gossip peers", peers = g.gossipsub[topic].len, topic + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe remote peer to the topic + g.gossipsub[topic].incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe remote peer from the topic + g.gossipsub[topic].excl(peerId) + + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + + trace "gossip peers", peers = g.gossipsub[topic].len, topic + + # also rebalance current topic if we are subbed to + if topic in g.topics: + await g.rebalanceMesh(topic) proc handleGraft(g: GossipSub, peer: PubSubPeer, @@ -377,29 +446,19 @@ method rpcHandler*(g: GossipSub, trace "calling handler for message", topicId = t, localPeer = g.peerInfo.id, fromPeer = msg.fromPeer.pretty - await h(t, msg.data) # trigger user provided handler + try: + await h(t, msg.data) # trigger user provided handler + except CatchableError as exc: + trace "exception in message handler", exc = exc.msg # forward the message to all peers interested in it - for p in toSendPeers: - if p in g.peers: - let id = g.peers[p].peerInfo.peerId - trace "about to forward message to peer", peerId = id, msgs = m.messages + let (published, failed) = await g.sendHelper(toSendPeers, m.messages) + for p in failed: + let peer = g.peers.getOrDefault(p) + if not(isNil(peer)): + g.handleDisconnect(peer) # cleanup failed peers - if id == peer.peerInfo.peerId: - trace "not forwarding message to originator", peerId = id - continue - - let msgs = m.messages.filterIt( - # don't forward to message originator - id != it.fromPeer - ) - - var sent: seq[Future[void]] - if 
msgs.len > 0: - trace "forwarding message to", peerId = id - sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) - sent = await allFinished(sent) - checkFutures(sent) + trace "forwarded message to peers", peers = published.len var respControl: ControlMessage if m.control.isSome: @@ -442,53 +501,42 @@ method publish*(g: GossipSub, discard await procCall PubSub(g).publish(topic, data) trace "about to publish message on topic", name = topic, data = data.shortLog - var peers: HashSet[string] - - if topic.len > 0: # data could be 0/empty - if topic in g.topics: # if we're subscribed use the mesh - peers = g.mesh.getOrDefault(topic) - else: # not subscribed, send to fanout peers - # try optimistically - peers = g.fanout.getOrDefault(topic) - if peers.len == 0: - # ok we had nothing.. let's try replenish inline - g.replenishFanout(topic) - peers = g.fanout.getOrDefault(topic) - - let - msg = Message.init(g.peerInfo, data, topic, g.sign) - msgId = g.msgIdProvider(msg) - - trace "created new message", msg - - trace "publishing on topic", name = topic, peers = peers - if msgId notin g.mcache: - g.mcache.put(msgId, msg) - - var sent: seq[Future[void]] - for p in peers: - # avoid sending to self - if p == g.peerInfo.id: - continue - - let peer = g.peers.getOrDefault(p) - # This can actually happen, between heartbeats we might - # still have peers in the mesh table but actually disconnected - if not isNil(peer) and not isNil(peer.peerInfo): - trace "publish: sending message to peer", peer = p - sent.add(peer.send(@[RPCMsg(messages: @[msg])])) - - sent = await allFinished(sent) - checkFutures(sent) - - libp2p_pubsub_messages_published.inc(labelValues = [topic]) - - return sent.filterIt(not it.failed).len - else: + if topic.len <= 0: # data could be 0/empty return 0 - + if topic in g.topics: # if we're subscribed use the mesh + peers = g.mesh.getOrDefault(topic) + else: # not subscribed, send to fanout peers + # try optimistically + peers = g.fanout.getOrDefault(topic) + if peers.len == 0: + # ok we had nothing.. 
let's try replenish inline + g.replenishFanout(topic) + peers = g.fanout.getOrDefault(topic) + + let + msg = Message.init(g.peerInfo, data, topic, g.sign) + msgId = g.msgIdProvider(msg) + + trace "created new message", msg + + trace "publishing on topic", name = topic, peers = peers + if msgId notin g.mcache: + g.mcache.put(msgId, msg) + + let (published, failed) = await g.sendHelper(peers, @[msg]) + for p in failed: + let peer = g.peers.getOrDefault(p) + g.handleDisconnect(peer) # cleanup failed peers + + if published.len > 0: + libp2p_pubsub_messages_published.inc(labelValues = [topic]) + + trace "published message to peers", peers = published.len, + msg = msg.shortLog() + return published.len + method start*(g: GossipSub) {.async.} = trace "gossipsub start" diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 65568d7b0..b714cc7c7 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -31,6 +31,8 @@ declareCounter(libp2p_pubsub_validation_failure, "pubsub failed validated messag declarePublicCounter(libp2p_pubsub_messages_published, "published messages", labels = ["topic"]) type + SendRes = tuple[published: seq[string], failed: seq[string]] # keep private + TopicHandler* = proc(topic: string, data: seq[byte]): Future[void] {.gcsafe.} @@ -58,6 +60,18 @@ type observers: ref seq[PubSubObserver] # ref as in smart_ptr msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) +method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} = + ## handle peer disconnects + ## + if peer.id in p.peers: + trace "deleting peer", peer = peer.id, stack = getStackTrace() + p.peers[peer.id] = nil + p.peers.del(peer.id) + + # metrics + libp2p_pubsub_peers.set(p.peers.len.int64) + trace "peer disconnected", peer = peer.id + proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: seq[string], @@ -74,24 +88,26 @@ proc sendSubs*(p: PubSub, topicName = t msg.subscriptions.add(SubOpts(topic: t, subscribe: subscribe)) - await peer.send(@[msg]) + try: + # wait for a connection before publishing + # this happens when the send connection is not yet up + if not peer.onConnect.isSet: + trace "awaiting send connection" + await peer.onConnect.wait() + + await peer.send(@[msg]) + except CancelledError as exc: + p.handleDisconnect(peer) + raise exc + except CatchableError as exc: + trace "unable to send subscriptions", exc = exc.msg + p.handleDisconnect(peer) method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, peerId: string) {.base, async.} = - var peer = p.peers.getOrDefault(peerId) - - if isNil(peer) or isNil(peer.peerInfo): # should not happen - if subscribe: - warn "subscribeTopic (subscribe) but peer was unknown!", peer = peerId - assert(false, "subscribeTopic (subscribe) but peer was unknown!") # bad , stop here if debug - return - - if subscribe: - peer.topics.incl(topic) - else: - peer.topics.excl(topic) + discard method rpcHandler*(p: PubSub, peer: PubSubPeer, @@ -106,24 +122,6 @@ method rpcHandler*(p: PubSub, trace "about to subscribe to topic", topicId = s.topic await p.subscribeTopic(s.topic, s.subscribe, peer.id) -method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} = - ## handle peer disconnects - if peer.id in p.peers: - trace "deleting peer", id = peer.id, trace = getStackTrace() - p.peers.del(peer.id) - - # metrics - libp2p_pubsub_peers.set(p.peers.len.int64) - -proc cleanUpHelper(p: PubSub, peer: PubSubPeer) {.async.} = - try: - await p.cleanupLock.acquire() - peer.refs.dec() # decrement refcount - if peer.refs <= 0: - 
await p.handleDisconnect(peer) - finally: - p.cleanupLock.release() - proc getPeer(p: PubSub, peerInfo: PeerInfo, proto: string): PubSubPeer = @@ -132,26 +130,13 @@ proc getPeer(p: PubSub, # create new pubsub peer let peer = newPubSubPeer(peerInfo, proto) - trace "created new pubsub peer", peerId = peer.id - - # metrics + trace "created new pubsub peer", peerId = peer.id, stack = getStackTrace() p.peers[peer.id] = peer - peer.refs.inc # increment reference count peer.observers = p.observers libp2p_pubsub_peers.set(p.peers.len.int64) return peer -proc internalCleanup(p: PubSub, conn: Connection) {.async.} = - # handle connection close - if isNil(conn): - return - - var peer = p.getPeer(conn.peerInfo, p.codec) - await conn.closeEvent.wait() - trace "pubsub conn closed, cleaning up peer", peer = conn.peerInfo.id - await p.cleanUpHelper(peer) - method handleConn*(p: PubSub, conn: Connection, proto: string) {.base, async.} = @@ -166,41 +151,46 @@ method handleConn*(p: PubSub, ## that we're interested in ## + if isNil(conn.peerInfo): + trace "no valid PeerId for peer" + await conn.close() + return + + proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = + # call pubsub rpc handler + await p.rpcHandler(peer, msgs) + + let peer = p.getPeer(conn.peerInfo, proto) + let topics = toSeq(p.topics.keys) + if topics.len > 0: + await p.sendSubs(peer, topics, true) + try: - if isNil(conn.peerInfo): - trace "no valid PeerId for peer" - await conn.close() - return - - proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = - # call pubsub rpc handler - await p.rpcHandler(peer, msgs) - - asyncCheck p.internalCleanup(conn) - let peer = p.getPeer(conn.peerInfo, proto) - let topics = toSeq(p.topics.keys) - if topics.len > 0: - await p.sendSubs(peer, topics, true) - peer.handler = handler await peer.handle(conn) # spawn peer read loop - trace "pubsub peer handler ended, cleaning up" + trace "pubsub peer handler ended", peer = peer.id except CancelledError as exc: - await conn.close() raise exc except CatchableError as exc: trace "exception ocurred in pubsub handle", exc = exc.msg + finally: + p.handleDisconnect(peer) await conn.close() -method subscribeToPeer*(p: PubSub, - conn: Connection) {.base, async.} = +method subscribePeer*(p: PubSub, conn: Connection) {.base.} = if not(isNil(conn)): let peer = p.getPeer(conn.peerInfo, p.codec) - trace "setting connection for peer", peerId = conn.peerInfo.id + trace "subscribing to peer", peerId = conn.peerInfo.id if not peer.connected: peer.conn = conn - asyncCheck p.internalCleanup(conn) +method unsubscribePeer*(p: PubSub, peerInfo: PeerInfo) {.base, async.} = + let peer = p.getPeer(peerInfo, p.codec) + trace "unsubscribing from peer", peerId = $peerInfo + if not(isNil(peer.conn)): + await peer.conn.close() + + p.handleDisconnect(peer) proc connected*(p: PubSub, peer: PeerInfo): bool = let peer = p.getPeer(peer, p.codec) @@ -240,12 +230,43 @@ method subscribe*(p: PubSub, p.topics[topic].handler.add(handler) - for peer in p.peers.values: + for peer in toSeq(p.peers.values): await p.sendSubs(peer, @[topic], true) # metrics libp2p_pubsub_topics.inc() +proc sendHelper*(p: PubSub, + sendPeers: HashSet[string], + msgs: seq[Message]): Future[SendRes] {.async.} = + var sent: seq[tuple[id: string, fut: Future[void]]] + for sendPeer in sendPeers: + # avoid sending to self + if sendPeer == p.peerInfo.id: + continue + + let peer = p.peers.getOrDefault(sendPeer) + if isNil(peer): + continue + + trace "sending messages to peer", peer = peer.id, msgs + sent.add((id: 
peer.id, fut: peer.send(@[RPCMsg(messages: msgs)]))) + + var published: seq[string] + var failed: seq[string] + let futs = await allFinished(sent.mapIt(it.fut)) + for s in futs: + let f = sent.filterIt(it.fut == s) + if f.len > 0: + if s.failed: + trace "sending messages to peer failed", peer = f[0].id + failed.add(f[0].id) + else: + trace "sending messages to peer succeeded", peer = f[0].id + published.add(f[0].id) + + return (published, failed) + method publish*(p: PubSub, topic: string, data: seq[byte]): Future[int] {.base, async.} = diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 3b281ec80..96a139bce 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -48,8 +48,7 @@ type topics*: HashSet[string] sentRpcCache: TimedCache[string] # cache for already sent messages recvdRpcCache: TimedCache[string] # cache for already received messages - refs*: int # refcount of the connections this peer is handling - onConnect: AsyncEvent + onConnect*: AsyncEvent observers*: ref seq[PubSubObserver] # ref as in smart_ptr RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -69,6 +68,9 @@ proc `conn=`*(p: PubSubPeer, conn: Connection) = p.sendConn = conn p.onConnect.fire() +proc conn*(p: PubSubPeer): Connection = + p.sendConn + proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: @@ -113,10 +115,17 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "exiting pubsub peer read loop", peer = p.id await conn.close() + except CancelledError as exc: + raise exc except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg + raise exc proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = + logScope: + peer = p.id + msgs = $msgs + for m in msgs.items: trace "sending msgs to peer", toPeer = p.id, msgs = $msgs @@ -135,38 +144,29 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) continue - proc sendToRemote() {.async.} = - try: - trace "about to send message", peer = p.id, - encoded = digest - if not p.onConnect.isSet: - await p.onConnect.wait() + try: + trace "about to send message", peer = p.id, + encoded = digest + if p.connected: # this can happen if the remote disconnected + trace "sending encoded msgs to peer", peer = p.id, + encoded = encoded.buffer.shortLog + await p.sendConn.writeLp(encoded.buffer) + p.sentRpcCache.put(digest) - if p.connected: # this can happen if the remote disconnected - trace "sending encoded msgs to peer", peer = p.id, - encoded = encoded.buffer.shortLog - await p.sendConn.writeLp(encoded.buffer) - p.sentRpcCache.put(digest) + for m in msgs: + for mm in m.messages: + for t in mm.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) - for m in msgs: - for mm in m.messages: - for t in mm.topicIDs: - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [p.id, t]) + except CatchableError as exc: + trace "unable to send to remote", exc = exc.msg + if not(isNil(p.sendConn)): + await p.sendConn.close() + p.sendConn = nil + p.onConnect.clear() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "unable to send to remote", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil - p.onConnect.clear() - - # if no connection has been set, - # queue messages until a connection - # becomes 
available - asyncCheck sendToRemote() + raise exc proc sendMsg*(p: PubSubPeer, peerId: PeerID, @@ -185,6 +185,9 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string], peers: seq[PeerInfoMsg] = @[ trace "sending prune msg to peer", peer = p.id, topicID = topic await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic, peers: peers, backoff: backoff)])))]) +proc `$`*(p: PubSubPeer): string = + p.id + proc newPubSubPeer*(peerInfo: PeerInfo, proto: string): PubSubPeer = new result diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 2ab1b114e..38fe47b3a 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -57,7 +57,9 @@ method handshake(s: Secure, initiator: bool): Future[SecureConn] {.async, base.} = doAssert(false, "Not implemented!") -proc handleConn*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, gcsafe.} = +proc handleConn*(s: Secure, + conn: Connection, + initiator: bool): Future[Connection] {.async, gcsafe.} = var sconn = await s.handshake(conn, initiator) conn.closeEvent.wait() @@ -73,7 +75,8 @@ method init*(s: Secure) {.gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = trace "handling connection upgrade", proto try: - # We don't need the result but we definitely need to await the handshake + # We don't need the result but we + # definitely need to await the handshake discard await s.handleConn(conn, false) trace "connection secured" except CancelledError as exc: @@ -86,7 +89,10 @@ method init*(s: Secure) {.gcsafe.} = s.handler = handle -method secure*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, base, gcsafe.} = +method secure*(s: Secure, + conn: Connection, + initiator: bool): + Future[Connection] {.async, base, gcsafe.} = result = await s.handleConn(conn, initiator) method readOnce*(s: SecureConn, diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 4ff0d039c..312ae237e 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -7,6 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. 
+import oids import chronos, chronicles import connection, ../utility @@ -40,7 +41,6 @@ template withExceptions(body: untyped) = except TransportError: # TODO https://github.com/status-im/nim-chronos/pull/99 raise newLPStreamEOFError() - # raise (ref LPStreamError)(msg: exc.msg, parent: exc) method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = if s.atEof: @@ -73,11 +73,18 @@ method atEof*(s: ChronosStream): bool {.inline.} = method close*(s: ChronosStream) {.async.} = try: if not s.isClosed: - await procCall Connection(s).close() + trace "shutting down chronos stream", address = $s.client.remoteAddress(), + oid = s.oid - trace "shutting down chronos stream", address = $s.client.remoteAddress(), oid = s.oid + # TODO: the sequence here matters + # don't move it after the connection's + # close below if not s.client.closed(): await s.client.closeWait() + await procCall Connection(s).close() + + except CancelledError as exc: + raise exc except CatchableError as exc: trace "error closing chronosstream", exc = exc.msg diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 188a79f70..454b0581c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -12,7 +12,8 @@ import tables, options, strformat, sets, - algorithm + algorithm, + oids import chronos, chronicles, @@ -65,10 +66,6 @@ type muxer: Muxer handle: Future[void] - Maintainer = object - loopFut: Future[void] - sleepFut: Future[void] - Switch* = ref object of RootObj peerInfo*: PeerInfo connections*: Table[string, seq[ConnectionHolder]] @@ -81,10 +78,13 @@ type streamHandler*: StreamHandler secureManagers*: seq[Secure] pubSub*: Option[PubSub] - dialedPubSubPeers: HashSet[string] running: bool - maintainFuts: Table[string, Maintainer] dialLock: Table[string, AsyncLock] + cleanUpLock: Table[string, AsyncLock] + # gossip 1.1 related + maintaining: HashSet[PeerInfo] + maintainFut: Future[void] + maintainSleepFut: Future[void] proc newNoPubSubException(): ref NoPubSubException {.inline.} = result = newException(NoPubSubException, "no pubsub provided!") proc newTooManyConnections(): ref TooManyConnections {.inline.} = result = newException(TooManyConnections, "too many connections for peer") proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} -proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} +proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc selectConn(s: Switch, peerInfo: PeerInfo): Connection = ## select the "best" connection according to some criteria @@ -166,6 +166,9 @@ proc storeConn(s: Switch, newSeq[MuxerHolder]()) .add(MuxerHolder(muxer: muxer, handle: handle, dir: dir)) + trace "stored connection", connections = s.connections.len + libp2p_peers.set(s.connections.len.int64) + proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if s.secureManagers.len <= 0: raise newException(CatchableError, "No secure managers registered!") @@ -259,45 +262,56 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = if isNil(conn): return - defer: - await conn.close() - libp2p_peers.set(s.connections.len.int64) - if isNil(conn.peerInfo): return let id = conn.peerInfo.id - trace "cleaning up connection for peer", peerId = id - if id in s.muxed: - let muxerHolder = s.muxed[id] - .filterIt( - it.muxer.connection == conn - ) - - if muxerHolder.len > 0: - await muxerHolder[0].muxer.close() - if not(isNil(muxerHolder[0].handle)): - await muxerHolder[0].handle + let lock = s.cleanUpLock.mgetOrPut(id, 
newAsyncLock()) + try: + await lock.acquire() + trace "cleaning up connection for peer", peerId = id if id in s.muxed: + let muxerHolder = s.muxed[id] + .filterIt( + it.muxer.connection == conn + ) + + if muxerHolder.len > 0: + await muxerHolder[0].muxer.close() + if not(isNil(muxerHolder[0].handle)): + await muxerHolder[0].handle + + if id in s.muxed: + s.muxed[id].keepItIf( + it.muxer.connection != conn + ) + + if s.muxed[id].len == 0: + s.muxed.del(id) + + if s.pubSub.isSome: + await s.pubSub.get() + .unsubscribePeer(conn.peerInfo) + + if id in s.connections: + s.connections[id].keepItIf( + it.conn != conn ) - if s.muxed[id].len == 0: - s.muxed.del(id) + if s.connections[id].len == 0: + s.connections.del(id) - if id in s.connections: - s.connections[id].keepItIf( - it.conn != conn - ) + # TODO: Investigate cleanupConn() always called twice for one peer. + if not(conn.peerInfo.isClosed()): + conn.peerInfo.close() + finally: + await conn.close() - if s.connections[id].len == 0: - s.connections.del(id) + if lock.locked(): + lock.release() - # TODO: Investigate cleanupConn() always called twice for one peer. - if not(conn.peerInfo.isClosed()): - conn.peerInfo.close() + libp2p_peers.set(s.connections.len.int64) proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = let connections = s.connections.getOrDefault(peer.id) @@ -330,7 +344,6 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g raise newException(CatchableError, "unable to mux connection, stopping upgrade") - libp2p_peers.set(s.connections.len.int64) trace "succesfully upgraded outgoing connection", uoid = sconn.oid return sconn @@ -382,8 +395,8 @@ proc internalConnect(s: Switch, raise newException(CatchableError, "can't dial self!") let id = peer.id - let lock = s.dialLock.mgetOrPut(id, newAsyncLock()) var conn: Connection + let lock = s.dialLock.mgetOrPut(id, newAsyncLock()) defer: if lock.locked(): @@ -443,29 +456,48 @@ proc internalConnect(s: Switch, doAssert(conn.peerInfo.id in s.connections, "connection not tracked!") - trace "dial succesfull", oid = conn.oid - await s.subscribeToPeer(peer) + trace "dial successful", oid = $conn.oid, + peer = $conn.peerInfo + + await s.subscribePeer(peer) return conn proc connect*(s: Switch, peer: PeerInfo) {.async.} = - var conn = await s.internalConnect(peer) + discard await s.internalConnect(peer) proc dial*(s: Switch, peer: PeerInfo, proto: string): Future[Connection] {.async.} = - var conn = await s.internalConnect(peer) + let conn = await s.internalConnect(peer) let stream = await s.getMuxedStream(peer) - if isNil(stream): - await conn.close() - raise newException(CatchableError, "Couldn't get muxed stream") - trace "Attempting to select remote", proto = proto, oid = conn.oid - if not await s.ms.select(stream, proto): - await stream.close() - raise newException(CatchableError, "Unable to select sub-protocol " & proto) + proc cleanup() {.async.} = + if not(isNil(stream)): + await stream.close() - return stream + if not(isNil(conn)): + await conn.close() + + try: + if isNil(stream): + await conn.close() + raise newException(CatchableError, "Couldn't get muxed stream") + + trace "Attempting to select remote", proto = proto, oid = conn.oid + if not await s.ms.select(stream, proto): + await stream.close() + raise newException(CatchableError, "Unable to select sub-protocol " & proto) + + return stream + except CancelledError as exc: + trace "dial canceled" + await cleanup() + raise exc + except 
CatchableError as exc: + trace "error dialing" + await cleanup() + raise exc proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = if isNil(proto.handler): @@ -478,6 +510,8 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = s.ms.addHandler(proto.codec, proto) +proc maintainPeers(s: Switch) {.async, gcsafe.} + proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = trace "starting switch for peer", peerInfo = shortLog(s.peerInfo) @@ -502,6 +536,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = if s.pubSub.isSome: await s.pubSub.get().start() + s.maintainFut = maintainPeers(s) info "started libp2p node", peer = $s.peerInfo, addrs = s.peerInfo.addrs result = startFuts # listen for incoming connections @@ -511,26 +546,16 @@ proc stop*(s: Switch) {.async.} = s.running = false - # Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs) - # Cancel their sleep as it likely is running for 5 mins - # running is false so they should exit after that - # and so we just wait/ensure all has finished - # Maintain has tryAndWarn so we should not be priting any error here - # nevertheless use checkFutures! - # Notice.. this is ugly but we have no clean way to express a Chain of operations/futures - # and simply post a cancelation/stop from the root of the chain... - let - maintainers = toSeq(s.maintainFuts.values) - sleepFuts = maintainers.mapIt(it.sleepFut) - loopFuts = maintainers.mapIt(it.loopFut) - for f in sleepFuts: f.cancel() - checkFutures(await allFinished(sleepFuts)) - checkFutures(await allFinished(loopFuts)) - # we want to report errors but we do not want to fail # or crash here, cos we need to clean possibly MANY items # and any following conn/transport won't be cleaned up if s.pubSub.isSome: + # Stop explicit peering system (gossip 1.1 related, but useful even with other pubsubs) + if not isNil(s.maintainSleepFut): + s.maintainSleepFut.cancel() + if not isNil(s.maintainFut): + await s.maintainFut + await s.pubSub.get().stop() for conns in toSeq(s.connections.values): @@ -552,18 +577,20 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" -proc maintainPeer(s: Switch, peerInfo: PeerInfo) {.async.} = +proc maintainPeers(s: Switch) {.async.} = while s.running: - tryAndWarn "explicit peer maintain": - var conns = s.connections.getOrDefault(peerInfo.id) - if conns.len == 0: - # attempt re-connect in this case - trace "explicit peering, trying to re-connect", peer=peerInfo - await s.connect(peerInfo) + for peer in s.maintaining: + tryAndWarn "explicit peer maintain": + var conns = s.connections.getOrDefault(peer.id) + if conns.len == 0: + # attempt re-connect in this case + trace "explicit peering, trying to re-connect", peer + await s.connect(peer) - await sleepAsync(5.minutes) # spec recommended + s.maintainSleepFut = sleepAsync(5.minutes) # spec recommended + await s.maintainSleepFut # do this in order to cancel it -proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = +proc subscribePeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer if s.pubSub.isSome and not(s.pubSub.get().connected(peerInfo)): trace "about to subscribe to pubsub peer", peer = peerInfo.shortLog() @@ -585,21 +612,13 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = trace "unable to subscribe to peer", peer = peerInfo.shortLog return - s.dialedPubSubPeers.incl(peerInfo.id) - try: - if (await s.ms.select(stream, s.pubSub.get().codec)): - await 
s.pubSub.get().subscribeToPeer(stream) - else: + if not await s.ms.select(stream, s.pubSub.get().codec): + if not(isNil(stream)): await stream.close() - except CatchableError as exc: - trace "exception in subscribe to peer", peer = peerInfo.shortLog, exc = exc.msg - await stream.close() - finally: - s.dialedPubSubPeers.excl(peerInfo.id) - - if peerInfo.maintain: - s.maintainFuts[peerInfo.id].loopFut = maintainPeer(s, peerInfo) - s.maintainFuts[peerInfo.id].sleepFut = newFuture[void]() # stub until real one happens + return + + s.maintaining.incl(peerInfo) + s.pubSub.get().subscribePeer(stream) proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] = @@ -653,8 +672,6 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = if not(isNil(stream)): await stream.close() - trace "got new muxer" - try: # once we got a muxed connection, attempt to # identify it @@ -667,14 +684,15 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # store muxer and muxed connection await s.storeConn(muxer, Direction.In) - libp2p_peers.set(s.connections.len.int64) muxer.connection.closeEvent.wait() .addCallback do(udata: pointer): asyncCheck s.cleanupConn(muxer.connection) + trace "got new muxer", peer = $muxer.connection.peerInfo + # try establishing a pubsub connection - await s.subscribeToPeer(muxer.connection.peerInfo) + await s.subscribePeer(muxer.connection.peerInfo) except CancelledError as exc: await muxer.close() @@ -702,6 +720,7 @@ proc newSwitch*(peerInfo: PeerInfo, identity: identity, muxers: muxers, secureManagers: @secureManagers, + maintaining: initHashSet[PeerInfo]() ) let s = result # can't capture result diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index b2a688251..f6798976f 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -59,7 +59,7 @@ suite "FloodSub": await nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") - discard await nodes[0].publish("foobar", "Hello!".toBytes()) + check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 result = await completionFut.wait(5.seconds) @@ -90,7 +90,7 @@ suite "FloodSub": await nodes[0].subscribe("foobar", handler) await waitSub(nodes[1], nodes[0], "foobar") - discard await nodes[1].publish("foobar", "Hello!".toBytes()) + check (await nodes[1].publish("foobar", "Hello!".toBytes())) > 0 result = await completionFut.wait(5.seconds) @@ -125,7 +125,7 @@ suite "FloodSub": nodes[1].addValidator("foobar", validator) - discard await nodes[0].publish("foobar", "Hello!".toBytes()) + check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 check (await handlerFut) == true await allFuturesThrowing( @@ -197,8 +197,8 @@ suite "FloodSub": nodes[1].addValidator("foo", "bar", validator) - discard await nodes[0].publish("foo", "Hello!".toBytes()) - discard await nodes[0].publish("bar", "Hello!".toBytes()) + check (await nodes[0].publish("foo", "Hello!".toBytes())) > 0 + check (await nodes[0].publish("bar", "Hello!".toBytes())) > 0 await allFuturesThrowing( nodes[0].stop(), diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index ac33778f9..a1677e420 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -60,7 +60,7 @@ suite "GossipSub internal": let gossipSub = newPubSub(TestGossipSub, randomPeerInfo()) let topic = "foobar" - gossipSub.gossipsub[topic] = initHashSet[string]() + gossipSub.mesh[topic] = initHashSet[string]() gossipSub.topics[topic] = 
Topic() # has to be in topics to rebalance var conns = newSeq[Connection]() @@ -71,9 +71,9 @@ suite "GossipSub internal": conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].conn = conn - gossipSub.gossipsub[topic].incl(peerInfo.id) + gossipSub.mesh[topic].incl(peerInfo.id) - check gossipSub.gossipsub[topic].len == 15 + check gossipSub.mesh[topic].len == 15 await gossipSub.rebalanceMesh(topic) check gossipSub.mesh[topic].len == GossipSubD @@ -102,6 +102,7 @@ suite "GossipSub internal": conn.peerInfo = peerInfo gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec) gossipSub.peers[peerInfo.id].handler = handler + gossipSub.peers[peerInfo.id].topics &= topic gossipSub.gossipsub[topic].incl(peerInfo.id) check gossipSub.gossipsub[topic].len == 15 diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 833020eed..df43ffa64 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -90,7 +90,6 @@ suite "GossipSub": nodes[1].addValidator("foobar", validator) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - result = (await validatorFut) and (await handlerFut) await allFuturesThrowing( nodes[0].stop(), @@ -146,7 +145,6 @@ suite "GossipSub": awaiters.add((await nodes[1].start())) await subscribeNodes(nodes) - await nodes[1].subscribe("foo", handler) await nodes[1].subscribe("bar", handler) @@ -181,7 +179,8 @@ suite "GossipSub": var nodes: seq[Switch] = newSeq[Switch]() for i in 0..<2: - nodes.add newStandardSwitch(gossip = true, secureManagers = [SecureProtocol.Noise]) + nodes.add newStandardSwitch(gossip = true, + secureManagers = [SecureProtocol.Noise]) var awaitters: seq[Future[void]] for node in nodes: @@ -298,7 +297,7 @@ suite "GossipSub": await nodes[1].stop() await allFuturesThrowing(wait) - # result = observed == 2 + check observed == 2 result = true check: @@ -339,7 +338,9 @@ suite "GossipSub": var runs = 10 for i in 0..= runs: seenFut.complete() - subs.add(allFutures(dialer.subscribe("foobar", handler), - waitSub(nodes[0], dialer, "foobar"))) + subs &= dialer.subscribe("foobar", handler) await allFuturesThrowing(subs) @@ -388,10 +388,12 @@ suite "GossipSub": var runs = 10 for i in 0..
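A closing note on the new `hash` for `PeerInfo` (peerinfo.nim above), which is what lets `Switch` keep `maintaining: HashSet[PeerInfo]`: the hash is taken over the ref's address, so set membership means object identity, not peer identity. A small sketch of the resulting semantics (illustrative only; `pi` is a hypothetical PeerInfo instance obtained elsewhere):

  import sets, hashes
  import libp2p/peerinfo

  proc track(maintaining: var HashSet[PeerInfo], pi: PeerInfo) =
    # identity-based hashing: including the same ref twice keeps one entry,
    maintaining.incl(pi)
    maintaining.incl(pi)   # still a single member
    # but a second PeerInfo object describing the same peer would hash
    # differently and be stored as a separate member.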