diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 36a9c39f9..cc52b1f97 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -118,7 +118,7 @@ proc internalCleanup(p: PubSub, conn: Connection) {.async.} = var peer = p.getPeer(conn.peerInfo, p.codec) await conn.closeEvent.wait() - trace "connection closed, cleaning up peer", peer = conn.peerInfo.id + trace "pubsub conn closed, cleaning up peer", peer = conn.peerInfo.id await p.cleanUpHelper(peer) method handleConn*(p: PubSub, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 713bf0772..d2f6273c1 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -78,52 +78,42 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = await conn.close() proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = - try: - for m in msgs.items: - trace "sending msgs to peer", toPeer = p.id, msgs = msgs - let encoded = encodeRpcMsg(m) - # trigger hooks - if not(isNil(p.observers)) and p.observers[].len > 0: - var mm = m - for obs in p.observers[]: - obs.onSend(p, mm) + for m in msgs.items: + trace "sending msgs to peer", toPeer = p.id, msgs = msgs + let encoded = encodeRpcMsg(m) + # trigger hooks + if not(isNil(p.observers)) and p.observers[].len > 0: + var mm = m + for obs in p.observers[]: + obs.onSend(p, mm) - if encoded.buffer.len <= 0: - trace "empty message, skipping", peer = p.id - return + if encoded.buffer.len <= 0: + trace "empty message, skipping", peer = p.id + return - let digest = $(sha256.digest(encoded.buffer)) - if digest in p.sentRpcCache: - trace "message already sent to peer, skipping", peer = p.id - continue + let digest = $(sha256.digest(encoded.buffer)) + if digest in p.sentRpcCache: + trace "message already sent to peer, skipping", peer = p.id + continue - proc sendToRemote() {.async.} = - trace "about send message", peer = p.id, - encoded = digest + proc sendToRemote() {.async.} = + try: + trace "about to send message", peer = p.id, + encoded = digest await p.onConnect.wait() - try: - trace "sending encoded msgs to peer", peer = p.id, - encoded = encoded.buffer.shortLog - await p.sendConn.writeLp(encoded.buffer) - p.sentRpcCache.put(digest) - except CatchableError as exc: - trace "unable to send to remote", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil - p.onConnect.clear() + trace "sending encoded msgs to peer", peer = p.id, + encoded = encoded.buffer.shortLog + await p.sendConn.writeLp(encoded.buffer) + p.sentRpcCache.put(digest) + except CatchableError as exc: + trace "unable to send to remote", exc = exc.msg + p.sendConn = nil + p.onConnect.clear() - # if no connection has been set, - # queue messages untill a connection - # becomes available - asyncCheck sendToRemote() - - except CatchableError as exc: - trace "Exception occurred in PubSubPeer.send", exc = exc.msg - if not(isNil(p.sendConn)): - await p.sendConn.close() - p.sendConn = nil - p.onConnect.clear() + # if no connection has been set, + # queue messages untill a connection + # becomes available + asyncCheck sendToRemote() proc sendMsg*(p: PubSubPeer, peerId: PeerID, diff --git a/libp2p/protocols/pubsub/rpc/protobuf.nim b/libp2p/protocols/pubsub/rpc/protobuf.nim index b5435b183..78a422cd1 100644 --- a/libp2p/protocols/pubsub/rpc/protobuf.nim +++ b/libp2p/protocols/pubsub/rpc/protobuf.nim @@ -244,7 +244,6 @@ proc encodeRpcMsg*(msg: RPCMsg): ProtoBuffer {.gcsafe.} = proc decodeRpcMsg*(msg: seq[byte]): RPCMsg {.gcsafe.} = var pb = initProtoBuffer(msg) - result.subscriptions = newSeq[SubOpts]() while true: # decode SubOpts array var field = pb.enterSubMessage()