mirror of https://github.com/vacp2p/nim-libp2p.git
always allow new data be received if the recvWindow is > 0
This commit is contained in:
parent
b30b2656d5
commit
5cbb473d1b
|
@ -45,7 +45,7 @@ type
|
|||
Ping = 0x2
|
||||
GoAway = 0x3
|
||||
|
||||
MsgFlags {.size: 2.} = enum
|
||||
MsgFlags* {.size: 2.} = enum
|
||||
Syn
|
||||
Ack
|
||||
Fin
|
||||
|
@ -56,14 +56,14 @@ type
|
|||
ProtocolError = 0x1,
|
||||
InternalError = 0x2,
|
||||
|
||||
YamuxHeader = object
|
||||
YamuxHeader* = object
|
||||
version: uint8
|
||||
msgType: MsgType
|
||||
flags: set[MsgFlags]
|
||||
streamId: uint32
|
||||
length: uint32
|
||||
|
||||
proc readHeader(
|
||||
proc readHeader*(
|
||||
conn: LPStream
|
||||
): Future[YamuxHeader] {.async: (raises: [
|
||||
CancelledError, LPStreamError, MuxerError]).} =
|
||||
|
@ -92,7 +92,7 @@ proc encode(header: YamuxHeader): array[12, byte] =
|
|||
result[4..7] = toBytesBE(header.streamId)
|
||||
result[8..11] = toBytesBE(header.length)
|
||||
|
||||
proc write(
|
||||
proc write*(
|
||||
conn: LPStream,
|
||||
header: YamuxHeader
|
||||
): Future[void] {.async: (raises: [
|
||||
|
@ -116,7 +116,7 @@ proc goAway(T: type[YamuxHeader], status: GoAwayStatus): T =
|
|||
length: uint32(status)
|
||||
)
|
||||
|
||||
proc data(
|
||||
proc data*(
|
||||
T: type[YamuxHeader],
|
||||
streamId: uint32,
|
||||
length: uint32 = 0,
|
||||
|
@ -129,7 +129,7 @@ proc data(
|
|||
streamId: streamId
|
||||
)
|
||||
|
||||
proc windowUpdate(
|
||||
proc windowUpdate*(
|
||||
T: type[YamuxHeader],
|
||||
streamId: uint32,
|
||||
delta: uint32,
|
||||
|
@ -148,12 +148,12 @@ type
|
|||
sent: int
|
||||
fut: Future[void].Raising([CancelledError, LPStreamError])
|
||||
YamuxChannel* = ref object of Connection
|
||||
id: uint32
|
||||
id*: uint32
|
||||
recvWindow: int
|
||||
sendWindow: int
|
||||
maxRecvWindow: int
|
||||
maxSendQueueSize: int
|
||||
conn: Connection
|
||||
conn*: Connection
|
||||
isSrc: bool
|
||||
opened: bool
|
||||
isSending: bool
|
||||
|
@ -204,6 +204,7 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
|
|||
|
||||
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
|
||||
if not channel.closedLocally:
|
||||
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
|
||||
channel.closedLocally = true
|
||||
channel.isEof = true
|
||||
|
||||
|
@ -442,7 +443,7 @@ proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} =
|
|||
when defined(libp2p_yamux_metrics):
|
||||
libp2p_yamux_channels.set(
|
||||
m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId])
|
||||
if channel.isReset and channel.recvWindow > 0:
|
||||
if channel.recvWindow > 0:
|
||||
m.flushed[channel.id] = channel.recvWindow
|
||||
|
||||
proc createStream(
|
||||
|
|
Loading…
Reference in New Issue