From d1d53ff369206bf8efbb02e288d66779579ed8e5 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 21 Jun 2024 12:11:18 +0200 Subject: [PATCH] chore(yamux): change closedRemotely from Future into AsyncEvent (#1133) --- libp2p/muxers/yamux/yamux.nim | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index 08acab38b..667f60fba 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -155,7 +155,7 @@ type recvQueue: seq[byte] isReset: bool remoteReset: bool - closedRemotely: Future[void].Raising([]) + closedRemotely: AsyncEvent closedLocally: bool receivedData: AsyncEvent @@ -163,7 +163,7 @@ proc `$`(channel: YamuxChannel): string = result = if channel.conn.dir == Out: "=> " else: "<= " result &= $channel.id var s: seq[string] = @[] - if channel.closedRemotely.completed(): + if channel.closedRemotely.isSet(): s.add("ClosedRemotely") if channel.closedLocally: s.add("ClosedLocally") @@ -198,12 +198,12 @@ proc lengthSendQueueWithLimit(channel: YamuxChannel): int = proc actuallyClose(channel: YamuxChannel) {.async: (raises: []).} = if channel.closedLocally and channel.sendQueue.len == 0 and - channel.closedRemotely.completed(): + channel.closedRemotely.isSet(): await procCall Connection(channel).closeImpl() proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} = - if not channel.closedRemotely.completed(): - channel.closedRemotely.complete() + if not channel.closedRemotely.isSet(): + channel.closedRemotely.fire() await channel.actuallyClose() method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} = @@ -239,7 +239,7 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).} except CancelledError, LPStreamError: discard await channel.close() - if not channel.closedRemotely.completed(): + if not channel.closedRemotely.isSet(): await channel.remoteClosed() channel.receivedData.fire() if not isLocal: @@ -280,10 +280,10 @@ method readOnce*( if channel.recvQueue.len == 0: channel.receivedData.clear() try: # https://github.com/status-im/nim-chronos/issues/516 - discard await race(channel.closedRemotely, channel.receivedData.wait()) + discard await race(channel.closedRemotely.wait(), channel.receivedData.wait()) except ValueError: raiseAssert("Futures list is not empty") - if channel.closedRemotely.completed() and channel.recvQueue.len == 0: + if channel.closedRemotely.isSet() and channel.recvQueue.len == 0: channel.isEof = true return 0 # we return 0 to indicate that the channel is closed for reading from now on @@ -460,9 +460,6 @@ proc createStream( # that the initial recvWindow is 256k. # To solve this contradiction, no updateWindow will be sent until # recvWindow is less than maxRecvWindow - proc newClosedRemotelyFut(): Future[void] {.async: (raises: [], raw: true).} = - newFuture[void]() - var stream = YamuxChannel( id: id, maxRecvWindow: recvWindow, @@ -473,7 +470,7 @@ proc createStream( isSrc: isSrc, conn: m.connection, receivedData: newAsyncEvent(), - closedRemotely: newClosedRemotelyFut(), + closedRemotely: newAsyncEvent(), ) stream.objName = "YamuxStream" if isSrc: