diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 3edd2ff00..8d101146a 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -122,7 +122,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} = ## await conn.join() - trace "triggering connection cleanup" + trace "triggering connection cleanup", peer = $conn.peerInfo await c.cleanupConn(conn) proc selectConn*(c: ConnManager, @@ -145,6 +145,8 @@ proc selectConn*(c: ConnManager, peerId: PeerID): Connection = var conn = c.selectConn(peerId, Direction.Out) if isNil(conn): conn = c.selectConn(peerId, Direction.In) + if isNil(conn): + trace "connection not found", peerId return conn @@ -157,6 +159,8 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer = if conn in c.muxed: return c.muxed[conn].muxer + else: + debug "no muxer for connection", conn = $conn proc storeConn*(c: ConnManager, conn: Connection) = ## store a connection @@ -171,8 +175,7 @@ proc storeConn*(c: ConnManager, conn: Connection) = let peerId = conn.peerInfo.peerId if c.conns.getOrDefault(peerId).len > c.maxConns: trace "too many connections", peer = $peerId, - conns = c.conns - .getOrDefault(peerId).len + conns = c.conns.getOrDefault(peerId).len raise newTooManyConnections() @@ -185,6 +188,8 @@ proc storeConn*(c: ConnManager, conn: Connection) = asyncCheck c.onClose(conn) libp2p_peers.set(c.conns.len.int64) + trace "stored connection", connections = c.conns.len, peer = peerId + proc storeOutgoing*(c: ConnManager, conn: Connection) = conn.dir = Direction.Out c.storeConn(conn) @@ -209,7 +214,7 @@ proc storeMuxer*(c: ConnManager, muxer: muxer, handle: handle) - trace "stored connection", connections = c.conns.len + trace "stored muxer", connections = c.conns.len proc getMuxedStream*(c: ConnManager, peerId: PeerID, diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index c74a81fd8..9bc120bf7 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -174,25 +174,20 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy finally: trace "leaving multistream loop" -proc addHandler*[T: LPProtocol](m: MultistreamSelect, - codec: string, - protocol: T, - matcher: Matcher = nil) = +proc addHandler*(m: MultistreamSelect, + codec: string, + protocol: LPProtocol, + matcher: Matcher = nil) = ## register a protocol - # TODO: This is a bug in chronicles, - # it break if I uncomment this line. - # Which is almost the same as the - # one on the next override of addHandler - # - # trace "registering protocol", codec = codec + trace "registering protocol", codec = codec m.handlers.add(HandlerHolder(proto: codec, protocol: protocol, match: matcher)) -proc addHandler*[T: LPProtoHandler](m: MultistreamSelect, - codec: string, - handler: T, - matcher: Matcher = nil) = +proc addHandler*(m: MultistreamSelect, + codec: string, + handler: LPProtoHandler, + matcher: Matcher = nil) = ## helper to allow registering pure handlers trace "registering proto handler", codec = codec diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 613c95710..9195207c0 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -103,8 +103,10 @@ proc replenishFanout(g: GossipSub, topic: string) = proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = logScope: topic + mesh = g.mesh.peers(topic) + gossipsub = g.gossipsub.peers(topic) - trace "about to rebalance mesh" + trace "rebalancing mesh" # create a mesh topic that we're subscribing to @@ -119,16 +121,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) ) - logScope: - meshPeers = g.mesh.peers(topic) - grafts = grafts.len - shuffle(grafts) # Graft peers so we reach a count of D grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic))) - trace "getting peers", topic, peers = grafts.len + trace "grafting", grafts = grafts.len for peer in grafts: if g.mesh.addPeer(topic, peer): @@ -140,7 +138,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = shuffle(prunes) prunes.setLen(prunes.len - GossipSubD) # .. down to D peers - trace "about to prune mesh", prunes = prunes.len + trace "pruning", prunes = prunes.len for peer in prunes: g.mesh.removePeer(topic, peer) @@ -154,13 +152,15 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(topic).int64, labelValues = [topic]) - # Send changes to peers after table updates to avoid stale state - let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) - discard await g.broadcast(grafts, graft, DefaultSendTimeout) - discard await g.broadcast(prunes, prune, DefaultSendTimeout) + trace "mesh balanced" - trace "mesh balanced, got peers", peers = g.mesh.peers(topic) + # Send changes to peers after table updates to avoid stale state + if grafts.len > 0: + let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) + discard await g.broadcast(grafts, graft, DefaultSendTimeout) + if prunes.len > 0: + let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + discard await g.broadcast(prunes, prune, DefaultSendTimeout) proc dropFanoutPeers(g: GossipSub) = # drop peers that we haven't published to in @@ -506,10 +506,10 @@ method publish*(g: GossipSub, discard await procCall PubSub(g).publish(topic, data, timeout) trace "publishing message on topic", topic, data = data.shortLog - var peers: HashSet[PubSubPeer] if topic.len <= 0: # data could be 0/empty return 0 + var peers: HashSet[PubSubPeer] if topic in g.topics: # if we're subscribed use the mesh peers = g.mesh.getOrDefault(topic) else: # not subscribed, send to fanout peers @@ -537,14 +537,18 @@ method publish*(g: GossipSub, if msgId notin g.mcache: g.mcache.put(msgId, msg) - let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout) - when defined(libp2p_expensive_metrics): - if published > 0: - libp2p_pubsub_messages_published.inc(labelValues = [topic]) + if peers.len > 0: + let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout) + when defined(libp2p_expensive_metrics): + if published > 0: + libp2p_pubsub_messages_published.inc(labelValues = [topic]) - trace "published message to peers", peers = published, - msg = msg.shortLog() - return published + trace "published message to peers", peers = published, + msg = msg.shortLog() + return published + else: + debug "No peers for gossip message", topic, msg = msg.shortLog() + return 0 method start*(g: GossipSub) {.async.} = trace "gossipsub start" diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a3a9dfeca..17e0203e8 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -80,13 +80,14 @@ proc send*( ## send to remote peer ## - trace "sending pubsub message to peer", peer = $peer, msg = msg + trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg) try: await peer.send(msg, timeout) except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception sending pubsub message to peer", peer = $peer, msg = msg + trace "exception sending pubsub message to peer", + peer = $peer, msg = shortLog(msg) p.unsubscribePeer(peer.peerId) raise exc @@ -98,11 +99,11 @@ proc broadcast*( ## send messages and cleanup failed peers ## - trace "broadcasting messages to peers", peers = sendPeers.len, message = msg + trace "broadcasting messages to peers", + peers = sendPeers.len, message = shortLog(msg) let sent = await allFinished( sendPeers.mapIt( p.send(it, msg, timeout) )) return sent.filterIt( it.finished and it.error.isNil ).len - trace "messages broadcasted to peers", peers = sent.len proc sendSubs*(p: PubSub, peer: PubSubPeer, diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 95e0e8105..3359e3344 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -77,7 +77,7 @@ method initStream*(s: Connection) = s.timeoutHandler = proc() {.async.} = await s.close() - trace "timeout set at", timeout = $s.timeout.millis + trace "timeout set at", timeout = s.timeout.millis doAssert(isNil(s.timerTaskFut)) # doAssert(s.timeout > 0.millis) if s.timeout > 0.millis: diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index efd9f9440..c308b3b62 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -7,7 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import oids +import std/oids +import stew/byteutils import chronicles, chronos, metrics import ../varint, ../vbuffer, @@ -16,6 +17,8 @@ import ../varint, declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"]) +export oids + logScope: topics = "lpstream" @@ -182,6 +185,9 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} = await s.readExactly(addr res[0], res.len) return res +method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} = + doAssert(false, "not implemented!") + proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} = ## write length prefixed var buf = initVBuffer() @@ -189,14 +195,11 @@ proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} = buf.finish() s.write(buf.buffer) -method write*(s: LPStream, msg: seq[byte]) {.base, async.} = - doAssert(false, "not implemented!") - proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} = s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1))) proc write*(s: LPStream, msg: string): Future[void] = - s.write(@(toOpenArrayByte(msg, 0, msg.high))) + s.write(msg.toBytes()) # TODO: split `close` into `close` and `dispose/destroy` method close*(s: LPStream) {.base, async.} = diff --git a/tests/testmplex.nim b/tests/testmplex.nim index c17ca1c6b..0ea9131fd 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -15,7 +15,7 @@ import ../libp2p/[errors, import ./helpers -when defined(nimHasUsed): {.used.} +{.used.} suite "Mplex": teardown: @@ -28,8 +28,7 @@ suite "Mplex": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("000873747265616d2031") - let stream = newBufferStream(encHandler) - let conn = stream + let conn = newBufferStream(encHandler) await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes) await conn.close() @@ -40,8 +39,7 @@ suite "Mplex": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("88010873747265616d2031") - let stream = newBufferStream(encHandler) - let conn = stream + let conn = newBufferStream(encHandler) await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes) await conn.close() @@ -52,8 +50,7 @@ suite "Mplex": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("020873747265616d2031") - let stream = newBufferStream(encHandler) - let conn = stream + let conn = newBufferStream(encHandler) await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes) await conn.close() @@ -64,8 +61,7 @@ suite "Mplex": proc encHandler(msg: seq[byte]) {.async.} = check msg == fromHex("8a010873747265616d2031") - let stream = newBufferStream(encHandler) - let conn = stream + let conn = newBufferStream(encHandler) await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes) await conn.close()