fix(yamux): set EoF when remote peer half closes the stream in yamux (#1086)

This commit is contained in:
diegomrsantos 2024-05-24 14:11:27 +02:00 committed by GitHub
parent 0911cb20f4
commit 2fa2c4425f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 26 additions and 8 deletions

View File

@ -164,7 +164,6 @@ type
closedRemotely: Future[void].Raising([]) closedRemotely: Future[void].Raising([])
closedLocally: bool closedLocally: bool
receivedData: AsyncEvent receivedData: AsyncEvent
returnedEof: bool
proc `$`(channel: YamuxChannel): string = proc `$`(channel: YamuxChannel): string =
result = if channel.conn.dir == Out: "=> " else: "<= " result = if channel.conn.dir == Out: "=> " else: "<= "
@ -204,8 +203,8 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} = method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedLocally: if not channel.closedLocally:
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
channel.closedLocally = true channel.closedLocally = true
channel.isEof = true
if not channel.isReset and channel.sendQueue.len == 0: if not channel.isReset and channel.sendQueue.len == 0:
try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin})) try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
@ -273,7 +272,7 @@ method readOnce*(
newLPStreamClosedError() newLPStreamClosedError()
else: else:
newLPStreamConnDownError() newLPStreamConnDownError()
if channel.returnedEof: if channel.isEof:
raise newLPStreamRemoteClosedError() raise newLPStreamRemoteClosedError()
if channel.recvQueue.len == 0: if channel.recvQueue.len == 0:
channel.receivedData.clear() channel.receivedData.clear()
@ -281,9 +280,8 @@ method readOnce*(
discard await race(channel.closedRemotely, channel.receivedData.wait()) discard await race(channel.closedRemotely, channel.receivedData.wait())
except ValueError: raiseAssert("Futures list is not empty") except ValueError: raiseAssert("Futures list is not empty")
if channel.closedRemotely.completed() and channel.recvQueue.len == 0: if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
channel.returnedEof = true
channel.isEof = true channel.isEof = true
return 0 return 0 # we return 0 to indicate that the channel is closed for reading from now on
let toRead = min(channel.recvQueue.len, nbytes) let toRead = min(channel.recvQueue.len, nbytes)

View File

@ -56,7 +56,7 @@ method init*(p: Ping) =
trace "handling ping", conn trace "handling ping", conn
var buf: array[PingSize, byte] var buf: array[PingSize, byte]
await conn.readExactly(addr buf[0], PingSize) await conn.readExactly(addr buf[0], PingSize)
trace "echoing ping", conn trace "echoing ping", conn, pingData = @buf
await conn.write(@buf) await conn.write(@buf)
if not isNil(p.pingHandler): if not isNil(p.pingHandler):
await p.pingHandler(conn.peerId) await p.pingHandler(conn.peerId)

View File

@ -315,7 +315,6 @@ suite "Circuit Relay V2":
await sleepAsync(chronos.timer.seconds(ttl + 1)) await sleepAsync(chronos.timer.seconds(ttl + 1))
expect(DialFailedError): expect(DialFailedError):
check: conn.atEof()
await conn.close() await conn.close()
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec) conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)

View File

@ -377,3 +377,24 @@ suite "Yamux":
expect LPStreamClosedError: discard await streamA.readLp(100) expect LPStreamClosedError: discard await streamA.readLp(100)
blocker.complete() blocker.complete()
await streamA.close() await streamA.close()
asyncTest "Peer must be able to read from stream after closing it for writing":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
except CancelledError, LPStreamError:
return
try:
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
await streamA.close()
check (await streamA.readLp(100)) == fromHex("5678")

View File

@ -11,6 +11,6 @@ COPY . nim-libp2p/
RUN \ RUN \
cd nim-libp2p && \ cd nim-libp2p && \
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN --threads:off ./tests/transport-interop/main.nim nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim
ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"] ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]