diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 88c46cbec..52df181c3 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -31,6 +31,10 @@ logScope: ## directions are closed and when the reader of the channel has read the ## EOF marker +const + MaxWrites = 1024 ##\ + ## Maximum number of in-flight writes - after this, we disconnect the peer + type LPChannel* = ref object of BufferStream id*: uint64 # channel id @@ -42,6 +46,7 @@ type msgCode*: MessageType # cached in/out message code closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code + writes*: int # In-flight writes proc open*(s: LPChannel) {.async, gcsafe.} @@ -149,6 +154,10 @@ method readOnce*(s: LPChannel, pbytes: pointer, nbytes: int): Future[int] {.async.} = + ## Mplex relies on reading being done regularly from every channel, or all + ## channels are blocked - in particular, this means that reading from one + ## channel must not be done from within a callback / read handler of another + ## or the reads will lock each other. try: let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes) trace "readOnce", s, bytes @@ -160,12 +169,21 @@ method readOnce*(s: LPChannel, raise exc method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = + ## Write to mplex channel - there may be up to MaxWrite concurrent writes + ## pending after which the peer is disconencted if s.closedLocal or s.conn.closed: raise newLPStreamClosedError() if msg.len == 0: return + if s.writes >= MaxWrites: + debug "Closing connection, too many in-flight writes on channel", + s, conn = s.conn, writes = s.writes + await s.conn.close() + return + + s.writes += 1 try: if not s.isOpen: await s.open() @@ -179,6 +197,8 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = trace "exception in lpchannel write handler", s, msg = exc.msg await s.conn.close() raise exc + finally: + s.writes -= 1 proc init*( L: type LPChannel, diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index af2a40fbb..7a60ef4fb 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -530,7 +530,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if g.mesh.addPeer(topic, peer): g.grafted(peer, topic) grafting &= peer - trace "opportunistic grafting", peer = $peer + trace "opportunistic grafting", peer when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_gossipsub @@ -977,9 +977,9 @@ proc handleIHave(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant = if peer.score < g.parameters.gossipThreshold: - trace "ihave: ignoring low score peer", peer = $peer, score = peer.score + trace "ihave: ignoring low score peer", peer, score = peer.score elif peer.iHaveBudget == 0: - trace "ihave: ignoring out of budget peer", peer = $peer, score = peer.score + trace "ihave: ignoring out of budget peer", peer, score = peer.score else: dec peer.iHaveBudget for ihave in ihaves: @@ -995,7 +995,7 @@ proc handleIWant(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] = if peer.score < g.parameters.gossipThreshold: - trace "iwant: ignoring low score peer", peer = $peer, score = peer.score + trace "iwant: ignoring low score peer", peer, score = peer.score else: for iwant in iwants: for mid in iwant.messageIDs: @@ -1178,7 +1178,7 @@ method publish*(g: GossipSub, # but a peer's own messages will always be published to all known peers in the topic. for peer in g.gossipsub.getOrDefault(topic): if peer.score >= g.parameters.publishThreshold: - trace "publish: including flood/high score peer", peer = $peer + trace "publish: including flood/high score peer", peer peers.incl(peer) # add always direct peers diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index d784de1ed..ade6f4468 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[sequtils, strutils, tables, hashes, sets] +import std/[sequtils, strutils, tables, hashes] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], ../../peerid, @@ -17,6 +17,8 @@ import rpc/[messages, message, protobuf], ../../protobuf/minprotobuf, ../../utility +export peerid, connection + logScope: topics = "pubsubpeer" @@ -60,8 +62,6 @@ type RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe.} -chronicles.formatIt(PubSubPeer): $it.peerId - func hash*(p: PubSubPeer): Hash = # int is either 32/64, so intptr basically, pubsubpeer is a ref cast[pointer](p).hash @@ -75,6 +75,9 @@ proc connected*(p: PubSubPeer): bool = not p.sendConn.isNil and not (p.sendConn.closed or p.sendConn.atEof) +proc hasObservers(p: PubSubPeer): bool = + p.observers != nil and anyIt(p.observers[], it != nil) + proc recvObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not(isNil(p.observers)) and p.observers[].len > 0: @@ -90,7 +93,6 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = obs.onSend(p, msg) proc handle*(p: PubSubPeer, conn: Connection) {.async.} = - debug "starting pubsub read loop", conn, peer = p, closed = conn.closed try: @@ -176,50 +178,54 @@ proc connectImpl(p: PubSubPeer) {.async.} = proc connect*(p: PubSubPeer) = asyncSpawn connectImpl(p) -proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = - doAssert(not isNil(p), "pubsubpeer nil!") - - let conn = p.sendConn - if conn == nil: - trace "No send connection, skipping message", p, msg - return - - trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) - - # trigger send hooks - var mm = msg # hooks can modify the message - p.sendObservers(mm) - - let encoded = encodeRpcMsg(mm) - if encoded.len <= 0: - info "empty message, skipping" - return - +proc sendImpl(conn: Connection, encoded: seq[byte]) {.async.} = try: trace "sending encoded msgs to peer", conn, encoded = shortLog(encoded) await conn.writeLp(encoded) trace "sent pubsub message to remote", conn - when defined(libp2p_expensive_metrics): - for x in mm.messages: - for t in x.topicIDs: - # metrics - libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) - except CatchableError as exc: # Because we detach the send call from the currently executing task using # asyncSpawn, no exceptions may leak out of it - trace "Unable to send to remote", conn, exc = exc.msg + trace "Unable to send to remote", conn, msg = exc.msg # Next time sendConn is used, it will be have its close flag set and thus # will be recycled await conn.close() # This will clean up the send connection proc send*(p: PubSubPeer, msg: RPCMsg) = - asyncSpawn sendImpl(p, msg) + doAssert(not isNil(p), "pubsubpeer nil!") -proc `$`*(p: PubSubPeer): string = - $p.peerId + let conn = p.sendConn + if conn == nil or conn.closed(): + trace "No send connection, skipping message", p, msg + return + + trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg) + + let encoded = if p.hasObservers(): + # trigger send hooks + var mm = msg # hooks can modify the message + p.sendObservers(mm) + encodeRpcMsg(mm) + else: + # If there are no send hooks, we redundantly re-encode the message to + # protobuf for every peer - this could easily be improved! + encodeRpcMsg(msg) + + if encoded.len <= 0: + debug "empty message, skipping", p, msg + return + + # To limit the size of the closure, we only pass the encoded message and + # connection to the spawned send task + asyncSpawn sendImpl(conn, encoded) + + when defined(libp2p_expensive_metrics): + for x in mm.messages: + for t in x.topicIDs: + # metrics + libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t]) proc newPubSubPeer*(peerId: PeerID, getConn: GetConn, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index 28755cdcc..7a8c0742f 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -14,6 +14,8 @@ import messages, ../../../utility, ../../../protobuf/minprotobuf +{.push raises: [Defect].} + logScope: topics = "pubsubprotobuf" diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index a7ab8de14..311ef81e6 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -165,6 +165,7 @@ method readOnce*(s: BufferStream, if buf.len == 0 or s.isEof: # Another task might have set EOF! # No more data will arrive on read queue + trace "EOF", s s.isEof = true else: let remaining = min(buf.len, nbytes - rbytes)