diff --git a/libp2p/connection.nim b/libp2p/connection.nim
index 0590111..4361bb0 100644
--- a/libp2p/connection.nim
+++ b/libp2p/connection.nim
@@ -127,7 +127,12 @@ method close*(s: Connection) {.async, gcsafe.} =
     if not isNil(s.stream) and not s.stream.closed:
       trace "closing child stream", closed = s.closed,
                                     conn = $s
-      await s.stream.close()
+      try:
+        await s.stream.close()
+      except CancelledError as exc:
+        raise exc
+      except CatchableError as exc:
+        debug "Error while closing child stream", err = exc.msg
 
     s.closeEvent.fire()
diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim
index 41ada6e..30cc60d 100644
--- a/libp2p/multistream.nim
+++ b/libp2p/multistream.nim
@@ -61,7 +61,7 @@ proc select*(m: MultistreamSelect,
   result = cast[string]((await conn.readLp(1024))) # read ms header
   result.removeSuffix("\n")
   if result != Codec:
-    error "handshake failed", codec = result.toHex()
+    notice "handshake failed", codec = result.toHex()
     raise newMultistreamHandshakeException()
 
   if proto.len() == 0: # no protocols, must be a handshake call
diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim
index f64391a..b6152fa 100644
--- a/libp2p/muxers/mplex/coder.nim
+++ b/libp2p/muxers/mplex/coder.nim
@@ -50,26 +50,34 @@ proc writeMsg*(conn: Connection,
   trace "sending data over mplex", id,
                                    msgType,
                                    data = data.len
-  var
+  try:
+    var
       left = data.len
       offset = 0
-  while left > 0 or data.len == 0:
-    let
-      chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
-      chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data
-    ## write lenght prefixed
-    var buf = initVBuffer()
-    buf.writePBVarint(id shl 3 or ord(msgType).uint64)
-    buf.writePBVarint(chunkSize.uint64) # size should be always sent
-    buf.finish()
-    left = left - chunkSize
-    offset = offset + chunkSize
-    try:
+    while left > 0 or data.len == 0:
+      let
+        chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left
+        chunk = if chunkSize > 0: data[offset..(offset + chunkSize - 1)] else: data
+      ## write length prefixed
+      var buf = initVBuffer()
+      buf.writePBVarint(id shl 3 or ord(msgType).uint64)
+      buf.writePBVarint(chunkSize.uint64) # size should be always sent
+      buf.finish()
+      left = left - chunkSize
+      offset = offset + chunkSize
       await conn.write(buf.buffer & chunk)
-    except LPStreamIncompleteError as exc:
-      trace "unable to send message", exc = exc.msg
-    if data.len == 0:
-      return
+
+      if data.len == 0:
+        return
+  except LPStreamEOFError:
+    trace "Ignoring EOF while writing"
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    # TODO these exceptions are ignored since it's likely that if writes
+    # are failing, the underlying connection is already closed - this needs
+    # more cleanup though
+    debug "Could not write to connection", msg = exc.msg
 
 proc writeMsg*(conn: Connection,
               id: uint64,
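The writeMsg loop above frames every mplex chunk the same way: a varint header encoding (id shl 3) or msgType, a varint payload length (sent even for empty payloads), then the chunk bytes. A standalone sketch of that layout, using plain LEB128 varints rather than the real VBuffer API (all names here are illustrative):

  proc putUVarint(buf: var seq[byte], x: uint64) =
    # LEB128: 7 bits per byte, high bit set while more bytes follow
    var v = x
    while v >= 0x80:
      buf.add(byte(v and 0x7f) or 0x80'u8)
      v = v shr 7
    buf.add(byte(v))

  proc encodeFrame(id, msgType: uint64, chunk: seq[byte]): seq[byte] =
    putUVarint(result, id shl 3 or msgType) # header: stream id and message type
    putUVarint(result, chunk.len.uint64)    # size should be always sent
    result.add chunk                        # payload

  when isMainModule:
    # channel 1, message type 2, payload "hi" => header 0x0a, length 0x02
    let frame = encodeFrame(1, 2, cast[seq[byte]]("hi"))
    assert frame[0] == 0x0a and frame[1] == 0x02
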
diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim
index 2bbb900..1a0fb07 100644
--- a/libp2p/muxers/mplex/lpchannel.nim
+++ b/libp2p/muxers/mplex/lpchannel.nim
@@ -92,14 +92,7 @@ proc open*(s: LPChannel): Future[void] =
 
 method close*(s: LPChannel) {.async, gcsafe.} =
   s.closedLocal = true
-  # If remote is closed
-  # EOF will happepn here
-  # We can safely ignore in that case
-  # s.closed won't be true sadly
-  try:
-    await s.closeMessage()
-  except LPStreamEOFError:
-    discard
+  await s.closeMessage()
 
 proc resetMessage(s: LPChannel) {.async.} =
   await s.conn.writeMsg(s.id, s.resetCode)
@@ -163,13 +156,11 @@ method readOnce*(s: LPChannel,
   result = await procCall readOnce(BufferStream(s), pbytes, nbytes)
   await s.tryCleanup()
 
-template writePrefix: untyped =
+method write*(s: LPChannel, msg: seq[byte]) {.async.} =
   if s.closedLocal or s.isReset:
     raise newLPStreamEOFError()
 
   if s.isLazy and not s.isOpen:
     await s.open()
 
-method write*(s: LPChannel, msg: seq[byte]) {.async.} =
-  writePrefix()
   await procCall write(BufferStream(s), msg)
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 3bfb1cc..8c3fb2c 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -201,7 +201,17 @@ method publish*(p: PubSub,
   if p.triggerSelf and topic in p.topics:
     for h in p.topics[topic].handler:
       trace "triggering handler", topicID = topic
-      await h(topic, data)
+      try:
+        await h(topic, data)
+      except LPStreamEOFError:
+        trace "Ignoring EOF while writing"
+      except CancelledError as exc:
+        raise exc
+      except CatchableError as exc:
+        # TODO these exceptions are ignored since it's likely that if writes
+        # are failing, the underlying connection is already closed - this needs
+        # more cleanup though
+        debug "Could not write to pubsub connection", msg = exc.msg
 
 method initPubSub*(p: PubSub) {.base.} =
   ## perform pubsub initializaion
diff --git a/libp2p/protocols/secure/noise.nim b/libp2p/protocols/secure/noise.nim
index e152e26..46d3d69 100644
--- a/libp2p/protocols/secure/noise.nim
+++ b/libp2p/protocols/secure/noise.nim
@@ -455,8 +455,15 @@ method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] {.async.} =
       outbuf &= besize
       outbuf &= cipher
     await sconn.stream.write(outbuf)
-  except AsyncStreamWriteError:
-    trace "Could not write to connection"
+  except LPStreamEOFError:
+    trace "Ignoring EOF while writing"
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    # TODO these exceptions are ignored since it's likely that if writes
+    # are failing, the underlying connection is already closed - this needs
+    # more cleanup though
+    debug "Could not write to connection", msg = exc.msg
 
 method handshake*(p: Noise, conn: Connection, initiator: bool = false): Future[SecureConn] {.async.} =
   trace "Starting Noise handshake", initiator
diff --git a/libp2p/protocols/secure/secio.nim b/libp2p/protocols/secure/secio.nim
index 44e1553..da57353 100644
--- a/libp2p/protocols/secure/secio.nim
+++ b/libp2p/protocols/secure/secio.nim
@@ -236,8 +236,15 @@ method write*(sconn: SecioConn, message: seq[byte]) {.async.} =
       trace "Writing message", message = msg.shortLog, left, offset
       await sconn.stream.write(msg)
-  except AsyncStreamWriteError:
-    trace "Could not write to connection"
+  except LPStreamEOFError:
+    trace "Ignoring EOF while writing"
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    # TODO these exceptions are ignored since it's likely that if writes
+    # are failing, the underlying connection is already closed - this needs
+    # more cleanup though
+    debug "Could not write to connection", msg = exc.msg
 
 proc newSecioConn(conn: Connection,
                   hash: string,
diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim
index fe98d42..fce18d4 100644
--- a/libp2p/stream/chronosstream.nim
+++ b/libp2p/stream/chronosstream.nim
@@ -67,6 +67,12 @@ method close*(s: ChronosStream) {.async.} =
   if not s.closed:
     trace "shutting chronos stream", address = $s.client.remoteAddress()
     if not s.client.closed():
-      await s.client.closeWait()
+      try:
+        await s.client.closeWait()
+      except CancelledError as exc:
+        raise exc
+      except CatchableError as exc:
+        # Shouldn't happen, but we can't be sure
+        warn "error while closing connection", msg = exc.msg
 
     s.closeEvent.fire()
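Every write path in this patch converges on the same try/except shape: EOF while writing to an already-closing connection is expected and ignored, CancelledError is re-raised so cancellation keeps propagating, and any other CatchableError is logged and dropped. A hypothetical helper showing that shape in isolation (safeWrite and failing are stand-ins, not library APIs; the libp2p-specific LPStreamEOFError branch is omitted here):

  import chronos

  proc safeWrite(write: proc(): Future[void] {.gcsafe.}) {.async.} =
    try:
      await write()
    except CancelledError as exc:
      raise exc # never swallow cancellation
    except CatchableError as exc:
      # if writes are failing, the connection is most likely already closed
      echo "could not write to connection: ", exc.msg

  when isMainModule:
    proc failing(): Future[void] {.async.} =
      raise newException(IOError, "broken pipe")
    waitFor safeWrite(failing)
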
diff --git a/libp2p/switch.nim b/libp2p/switch.nim
index de1c67c..cf32bcc 100644
--- a/libp2p/switch.nim
+++ b/libp2p/switch.nim
@@ -167,19 +167,25 @@ proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Connection] {.async, gcsafe.} =
   result = conn
 
 proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
-  trace "handling connection", conn = $conn
-  result = conn
+  try:
+    trace "handling connection", conn = $conn
+    result = conn
 
-  # don't mux/secure twise
-  if conn.peerInfo.id in s.muxed:
-    return
+    # don't mux/secure twice
+    if conn.peerInfo.id in s.muxed:
+      return
 
-  result = await s.secure(result) # secure the connection
-  if isNil(result):
-    return
+    result = await s.secure(result) # secure the connection
+    if isNil(result):
+      return
 
-  await s.mux(result) # mux it if possible
-  s.connections[conn.peerInfo.id] = result
+    await s.mux(result) # mux it if possible
+    s.connections[conn.peerInfo.id] = result
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    debug "Couldn't upgrade outgoing connection", msg = exc.msg
+    return nil
 
 proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
   trace "upgrading incoming connection", conn = $conn
@@ -189,25 +195,37 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
 
   proc securedHandler (conn: Connection,
                        proto: string)
                        {.async, gcsafe, closure.} =
-    trace "Securing connection"
-    let secure = s.secureManagers[proto]
-    let sconn = await secure.secure(conn, false)
-    if not isNil(sconn):
+    try:
+      trace "Securing connection"
+      let secure = s.secureManagers[proto]
+      let sconn = await secure.secure(conn, false)
+      if sconn.isNil:
+        return
+
       # add the muxer
       for muxer in s.muxers.values:
         ms.addHandler(muxer.codec, muxer)
 
-    # handle subsequent requests
-    await ms.handle(sconn)
-    await sconn.close()
+      # handle subsequent requests
+      await ms.handle(sconn)
+      await sconn.close()
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      debug "ending secured handler", err = exc.msg
 
   if (await ms.select(conn)): # just handshake
     # add the secure handlers
     for k in s.secureManagers.keys:
       ms.addHandler(k, securedHandler)
 
-    # handle secured connections
-    await ms.handle(conn)
+    try:
+      # handle secured connections
+      await ms.handle(conn)
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      debug "ending multistream", err = exc.msg
 
 proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
 
@@ -223,6 +241,8 @@ proc internalConnect(s: Switch,
         trace "Dialing address", address = $a
         try:
           conn = await t.dial(a)
+        except CancelledError as exc:
+          raise exc
         except CatchableError as exc:
           trace "couldn't dial peer, transport failed", exc = exc.msg, address = a
           continue
@@ -288,6 +308,8 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
   proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
     try:
       await s.upgradeIncoming(conn) # perform upgrade on incoming connection
+    except CancelledError as exc:
+      raise exc
     except CatchableError as exc:
       trace "Exception occurred in Switch.start", exc = exc.msg
     finally:
@@ -333,6 +355,8 @@ proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
       s.dialedPubSubPeers.incl(peerInfo.id)
      let conn = await s.dial(peerInfo, s.pubSub.get().codec)
       await s.pubSub.get().subscribeToPeer(conn)
+    except CancelledError as exc:
+      raise exc
     except CatchableError as exc:
       warn "unable to initiate pubsub", exc = exc.msg
     finally:
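upgradeOutgoing now reports any upgrade failure as a nil result, so callers only need an isNil check instead of wrapping every dial in their own try/except. A self-contained sketch of that contract with stand-in types (Conn, secure, mux and upgrade below are illustrative, not the real API):

  import chronos

  type Conn = ref object
    secured: bool

  proc secure(c: Conn): Future[Conn] {.async.} =
    c.secured = true # a real implementation would run a handshake
    return c

  proc mux(c: Conn) {.async.} =
    discard # a real implementation would negotiate a muxer

  proc upgrade(c: Conn): Future[Conn] {.async.} =
    # secure first, then mux; surface any failure as nil
    try:
      let sconn = await secure(c)
      if sconn.isNil:
        return nil
      await mux(sconn)
      return sconn
    except CancelledError as exc:
      raise exc
    except CatchableError:
      return nil

  when isMainModule:
    let conn = waitFor upgrade(Conn())
    assert not conn.isNil and conn.secured
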
diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim
index 4b80ee7..c9c8866 100644
--- a/libp2p/transports/tcptransport.nim
+++ b/libp2p/transports/tcptransport.nim
@@ -64,7 +64,7 @@ proc cleanup(t: Transport, conn: Connection) {.async.} =
 proc connHandler*(t: TcpTransport,
                   client: StreamTransport,
                   initiator: bool): Connection =
-  trace "handling connection for", address = $client.remoteAddress
+  trace "handling connection", address = $client.remoteAddress
   let conn: Connection = newConnection(newChronosStream(client))
   conn.observedAddrs = MultiAddress.init(client.remoteAddress)
   if not initiator:
@@ -78,11 +78,24 @@ proc connHandler*(t: TcpTransport,
 
 proc connCb(server: StreamServer,
             client: StreamTransport) {.async, gcsafe.} =
-  trace "incomming connection for", address = $client.remoteAddress
-  let t = cast[TcpTransport](server.udata)
-  # we don't need result connection in this case
-  # as it's added inside connHandler
-  discard t.connHandler(client, false)
+  trace "incoming connection", address = $client.remoteAddress
+  try:
+    let t = cast[TcpTransport](server.udata)
+    # we don't need result connection in this case
+    # as it's added inside connHandler
+    discard t.connHandler(client, false)
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as err:
+    debug "Connection setup failed", err = err.msg
+    if not client.closed:
+      try:
+        client.close()
+      except CancelledError as err:
+        raise err
+      except CatchableError as err:
+        # shouldn't happen but..
+        warn "Error closing connection", err = err.msg
 
 method init*(t: TcpTransport) =
   t.multicodec = multiCodec("tcp")
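connCb is the accept callback handed to chronos' StreamServer; trapping errors inside it keeps one misbehaving client from tearing down the whole accept loop. A minimal sketch of that wiring using only public chronos calls (the handler body and address are placeholders):

  import chronos

  proc onClient(server: StreamServer, client: StreamTransport) {.async, gcsafe.} =
    try:
      echo "incoming connection: ", $client.remoteAddress()
      # hand the client off to connection setup here
    except CancelledError as exc:
      raise exc
    except CatchableError as exc:
      echo "connection setup failed: ", exc.msg
      if not client.closed:
        client.close()

  when isMainModule:
    let server = createStreamServer(initTAddress("127.0.0.1:0"), onClient, {ReuseAddr})
    server.start()
    echo "accepting on ", $server.local
    server.stop() # a real node would join() here instead of stopping
    server.close()
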
diff --git a/tests/testinterop.nim b/tests/testinterop.nim
index 362d96f..99a3535 100644
--- a/tests/testinterop.nim
+++ b/tests/testinterop.nim
@@ -105,14 +105,14 @@ proc testPubSubDaemonPublish(gossip: bool = false,
   let awaiters = nativeNode.start()
   let nativePeer = nativeNode.peerInfo
 
-  var handlerFuture = newFuture[bool]()
+  var finished = false
   var times = 0
   proc nativeHandler(topic: string, data: seq[byte]) {.async.} =
     let smsg = cast[string](data)
     check smsg == pubsubData
     times.inc()
-    if times >= count and not handlerFuture.finished:
-      handlerFuture.complete(true)
+    if times >= count and not finished:
+      finished = true
 
   await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
                                                daemonPeer.addresses))
@@ -126,12 +126,16 @@ proc testPubSubDaemonPublish(gossip: bool = false,
   asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
   await nativeNode.subscribe(testTopic, nativeHandler)
 
-  while times < count:
-    await sleepAsync(1.seconds)
-    await daemonNode.pubsubPublish(testTopic, msgData)
-    await sleepAsync(100.millis)
+  await sleepAsync(1.seconds)
 
-  result = await handlerFuture
+  proc publisher() {.async.} =
+    while not finished:
+      await daemonNode.pubsubPublish(testTopic, msgData)
+      await sleepAsync(100.millis)
+
+  await wait(publisher(), 1.minutes) # should be plenty of time
+
+  result = true
   await nativeNode.stop()
   await allFutures(awaiters)
   await daemonNode.close()
@@ -152,7 +156,6 @@ proc testPubSubNodePublish(gossip: bool = false,
   let awaiters = nativeNode.start()
   let nativePeer = nativeNode.peerInfo
 
-  var handlerFuture = newFuture[bool]()
   await nativeNode.connect(NativePeerInfo.init(daemonPeer.peer,
                                                daemonPeer.addresses))
 
@@ -160,26 +163,30 @@ proc testPubSubNodePublish(gossip: bool = false,
   await daemonNode.connect(nativePeer.peerId, nativePeer.addrs)
 
   var times = 0
+  var finished = false
   proc pubsubHandler(api: DaemonAPI,
                      ticket: PubsubTicket,
                      message: PubSubMessage): Future[bool] {.async.} =
     let smsg = cast[string](message.data)
     check smsg == pubsubData
     times.inc()
-    if times >= count:
-      handlerFuture.complete(true)
+    if times >= count and not finished:
+      finished = true
     result = true # don't cancel subscription
 
   discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler)
   proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard
   await nativeNode.subscribe(testTopic, nativeHandler)
   await sleepAsync(1.seconds)
-  while times < count:
-    await sleepAsync(1.seconds)
-    await nativeNode.publish(testTopic, msgData)
-    await sleepAsync(100.millis)
-
-  result = await handlerFuture
+  proc publisher() {.async.} =
+    while not finished:
+      await nativeNode.publish(testTopic, msgData)
+      await sleepAsync(100.millis)
+
+  await wait(publisher(), 1.minutes) # should be plenty of time
+
+  result = finished
   await nativeNode.stop()
   await allFutures(awaiters)
   await daemonNode.close()
@@ -385,7 +392,7 @@ suite "Interop":
         check test == cast[string](line)
         inc(count2)
 
-      result = 10 == (await wait(testFuture, 10.secs))
+      result = 10 == (await wait(testFuture, 1.minutes))
       await stream.close()
       await nativeNode.stop()
       await allFutures(awaiters)
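The reworked interop tests drop the one-shot Future in favour of a plain bool flag plus a polling publisher, and bound the whole exchange with wait() so a stuck test times out instead of hanging CI. The same pattern reduced to a sketch (demo, deliver and publisher are stand-ins for the real pubsub round trip):

  import chronos

  proc demo() {.async.} =
    var finished = false
    var times = 0

    proc deliver() {.async.} = # stands in for a pubsub message delivery
      times.inc()
      if times >= 10 and not finished:
        finished = true

    proc publisher() {.async.} = # keep publishing until the handler is done
      while not finished:
        await deliver()
        await sleepAsync(10.milliseconds)

    await wait(publisher(), 1.minutes) # raises AsyncTimeoutError on a hang

  when isMainModule:
    waitFor demo()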