diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 3c7e27ab3..36a9c39f9 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -135,24 +135,27 @@ method handleConn*(p: PubSub, ## that we're interested in ## - if isNil(conn.peerInfo): - trace "no valid PeerId for peer" - await conn.close() - return + try: + if isNil(conn.peerInfo): + trace "no valid PeerId for peer" + await conn.close() + return - proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = - # call pubsub rpc handler - await p.rpcHandler(peer, msgs) + proc handler(peer: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = + # call pubsub rpc handler + await p.rpcHandler(peer, msgs) - let peer = p.getPeer(conn.peerInfo, proto) - let topics = toSeq(p.topics.keys) - if topics.len > 0: - await p.sendSubs(peer, topics, true) + let peer = p.getPeer(conn.peerInfo, proto) + let topics = toSeq(p.topics.keys) + if topics.len > 0: + await p.sendSubs(peer, topics, true) - peer.handler = handler - await peer.handle(conn) # spawn peer read loop - trace "pubsub peer handler ended, cleaning up" - await p.internalCleanup(conn) + peer.handler = handler + await peer.handle(conn) # spawn peer read loop + trace "pubsub peer handler ended, cleaning up" + await p.internalClenaup(conn) + except CatchableError as exc: + trace "exception ocurred in pubsub handle", exc = exc.msg method subscribeToPeer*(p: PubSub, conn: Connection) {.base, async.} = diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim index aef219154..f83bac42b 100644 --- a/libp2p/protocols/secure/noise.nim +++ b/libp2p/protocols/secure/noise.nim @@ -436,35 +436,24 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async. if message.len == 0: return - try: + var + left = message.len + offset = 0 + while left > 0: + let + chunkSize = if left > MaxPlainSize: MaxPlainSize else: left + packed = packNoisePayload(message.toOpenArray(offset, offset + chunkSize - 1)) + cipher = sconn.writeCs.encryptWithAd([], packed) + left = left - chunkSize + offset = offset + chunkSize var - left = message.len - offset = 0 - while left > 0: - let - chunkSize = if left > MaxPlainSize: MaxPlainSize else: left - packed = packNoisePayload(message.toOpenArray(offset, offset + chunkSize - 1)) - cipher = sconn.writeCs.encryptWithAd([], packed) - left = left - chunkSize - offset = offset + chunkSize - var - lesize = cipher.len.uint16 - besize = lesize.toBytesBE - outbuf = newSeqOfCap[byte](cipher.len + 2) - trace "sendEncryptedMessage", size = lesize, peer = $sconn.peerInfo, left, offset - outbuf &= besize - outbuf &= cipher - await sconn.stream.write(outbuf) - except LPStreamEOFError: - trace "Ignoring EOF while writing" - except CancelledError as exc: - raise exc - except CatchableError as exc: - # TODO these exceptions are ignored since it's likely that if writes are - # are failing, the underlying connection is already closed - this needs - # more cleanup though - debug "Could not write to connection", error = exc.name - trace "Could not write to connection - verbose", msg = exc.msg + lesize = cipher.len.uint16 + besize = lesize.toBytesBE + outbuf = newSeqOfCap[byte](cipher.len + 2) + trace "sendEncryptedMessage", size = lesize, peer = $sconn.peerInfo, left, offset + outbuf &= besize + outbuf &= cipher + await sconn.stream.write(outbuf) method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[SecureConn] {.async.} = trace "Starting Noise handshake", initiator diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim index b5ca97906..599ff8295 100644 --- a/libp2p/protocols/secure/secio.nim +++ b/libp2p/protocols/secure/secio.nim @@ -212,40 +212,29 @@ method write*(sconn: SecioConn, message: seq[byte]) {.async.} = if message.len == 0: return - try: - var - left = message.len - offset = 0 - while left > 0: - let - chunkSize = if left > SecioMaxMessageSize - 64: SecioMaxMessageSize - 64 else: left - macsize = sconn.writerMac.sizeDigest() - length = chunkSize + macsize + var + left = message.len + offset = 0 + while left > 0: + let + chunkSize = if left > SecioMaxMessageSize - 64: SecioMaxMessageSize - 64 else: left + macsize = sconn.writerMac.sizeDigest() + length = chunkSize + macsize - var msg = newSeq[byte](chunkSize + 4 + macsize) - msg[0..<4] = uint32(length).toBytesBE() + var msg = newSeq[byte](chunkSize + 4 + macsize) + msg[0..<4] = uint32(length).toBytesBE() - sconn.writerCoder.encrypt(message.toOpenArray(offset, offset + chunkSize - 1), - msg.toOpenArray(4, 4 + chunkSize - 1)) - left = left - chunkSize - offset = offset + chunkSize - let mo = 4 + chunkSize - sconn.writerMac.update(msg.toOpenArray(4, 4 + chunkSize - 1)) - sconn.writerMac.finish(msg.toOpenArray(mo, mo + macsize - 1)) - sconn.writerMac.reset() + sconn.writerCoder.encrypt(message.toOpenArray(offset, offset + chunkSize - 1), + msg.toOpenArray(4, 4 + chunkSize - 1)) + left = left - chunkSize + offset = offset + chunkSize + let mo = 4 + chunkSize + sconn.writerMac.update(msg.toOpenArray(4, 4 + chunkSize - 1)) + sconn.writerMac.finish(msg.toOpenArray(mo, mo + macsize - 1)) + sconn.writerMac.reset() - trace "Writing message", message = msg.shortLog, left, offset - await sconn.stream.write(msg) - except LPStreamEOFError: - trace "Ignoring EOF while writing" - except CancelledError as exc: - raise exc - except CatchableError as exc: - # TODO these exceptions are ignored since it's likely that if writes are - # are failing, the underlying connection is already closed - this needs - # more cleanup though - debug "Could not write to connection", error = exc.name - trace "Could not write to connection - verbose", msg = exc.msg + trace "Writing message", message = msg.shortLog, left, offset + await sconn.stream.write(msg) proc newSecioConn(conn: Connection, hash: string,