Passing correct dir to Yamux during dcutr
This commit is contained in:
parent
a0e8c796c5
commit
18061be605
|
@ -111,7 +111,7 @@ proc withMplex*(
|
||||||
maxChannCount = 200): SwitchBuilder {.public.} =
|
maxChannCount = 200): SwitchBuilder {.public.} =
|
||||||
## | Uses `Mplex <https://docs.libp2p.io/concepts/stream-multiplexing/#mplex>`_ as a multiplexer
|
## | Uses `Mplex <https://docs.libp2p.io/concepts/stream-multiplexing/#mplex>`_ as a multiplexer
|
||||||
## | `Timeout` is the duration after which a inactive connection will be closed
|
## | `Timeout` is the duration after which a inactive connection will be closed
|
||||||
proc newMuxer(conn: Connection): Muxer =
|
proc newMuxer(conn: Connection, direction: Opt[Direction] = Opt.none(Direction)): Muxer =
|
||||||
Mplex.new(
|
Mplex.new(
|
||||||
conn,
|
conn,
|
||||||
inTimeout,
|
inTimeout,
|
||||||
|
@ -123,7 +123,7 @@ proc withMplex*(
|
||||||
b
|
b
|
||||||
|
|
||||||
proc withYamux*(b: SwitchBuilder): SwitchBuilder =
|
proc withYamux*(b: SwitchBuilder): SwitchBuilder =
|
||||||
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn)
|
proc newMuxer(conn: Connection, dir: Opt[Direction] = Opt.none(Direction)): Muxer = Yamux.new(conn, dir)
|
||||||
|
|
||||||
assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
|
assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
|
||||||
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
|
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
|
||||||
|
|
|
@ -75,9 +75,7 @@ proc dialAndUpgrade(
|
||||||
|
|
||||||
let mux =
|
let mux =
|
||||||
try:
|
try:
|
||||||
let m = await transport.upgrade(dialed, upgradeDir, peerId)
|
await transport.upgrade(dialed, upgradeDir, peerId)
|
||||||
m.connection.dir = upgradeDir
|
|
||||||
m
|
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
# If we failed to establish the connection through one transport,
|
# If we failed to establish the connection through one transport,
|
||||||
# we won't succeeded through another - no use in trying again
|
# we won't succeeded through another - no use in trying again
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
|
import stew/results
|
||||||
import ../stream/connection,
|
import ../stream/connection,
|
||||||
../errors
|
../errors
|
||||||
|
|
||||||
|
@ -32,7 +33,7 @@ type
|
||||||
connection*: Connection
|
connection*: Connection
|
||||||
|
|
||||||
# user provider proc that returns a constructed Muxer
|
# user provider proc that returns a constructed Muxer
|
||||||
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].}
|
MuxerConstructor* = proc(conn: Connection, direction: Opt[Direction] = Opt.none(Direction)): Muxer {.gcsafe, closure, raises: [].}
|
||||||
|
|
||||||
# this wraps a creator proc that knows how to make muxers
|
# this wraps a creator proc that knows how to make muxers
|
||||||
MuxerProvider* = object
|
MuxerProvider* = object
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import sequtils, std/[tables]
|
import sequtils, std/[tables]
|
||||||
import chronos, chronicles, metrics, stew/[endians2, byteutils, objects]
|
import chronos, chronicles, metrics, stew/[endians2, byteutils, objects, results]
|
||||||
import ../muxer,
|
import ../muxer,
|
||||||
../../stream/connection
|
../../stream/connection
|
||||||
|
|
||||||
|
@ -389,14 +389,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
|
||||||
closedRemotely: newFuture[void]()
|
closedRemotely: newFuture[void]()
|
||||||
)
|
)
|
||||||
result.objName = "YamuxStream"
|
result.objName = "YamuxStream"
|
||||||
result.dir =
|
result.dir = if isSrc: Direction.Out else: Direction.In
|
||||||
if isSrc:
|
|
||||||
if m.connection.dir == Direction.In:
|
|
||||||
Direction.In
|
|
||||||
else:
|
|
||||||
Direction.Out
|
|
||||||
else:
|
|
||||||
Direction.In
|
|
||||||
result.timeoutHandler = proc(): Future[void] {.gcsafe.} =
|
result.timeoutHandler = proc(): Future[void] {.gcsafe.} =
|
||||||
trace "Idle timeout expired, resetting YamuxChannel"
|
trace "Idle timeout expired, resetting YamuxChannel"
|
||||||
result.reset()
|
result.reset()
|
||||||
|
@ -531,9 +524,14 @@ method newStream*(
|
||||||
await stream.open()
|
await stream.open()
|
||||||
return stream
|
return stream
|
||||||
|
|
||||||
proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T =
|
proc new*(T: type[Yamux], conn: Connection, direction: Opt[Direction] = Opt.none(Direction), maxChannCount: int = MaxChannelCount): T =
|
||||||
|
let dir =
|
||||||
|
block:
|
||||||
|
direction.withValue(d):
|
||||||
|
d
|
||||||
|
else: conn.dir
|
||||||
T(
|
T(
|
||||||
connection: conn,
|
connection: conn,
|
||||||
currentId: if conn.dir == Out: 1 else: 2,
|
currentId: if dir == Out: 1 else: 2,
|
||||||
maxChannCount: maxChannCount
|
maxChannCount: maxChannCount
|
||||||
)
|
)
|
||||||
|
|
|
@ -52,7 +52,7 @@ proc mux*(
|
||||||
trace "Found a muxer", conn, muxerName
|
trace "Found a muxer", conn, muxerName
|
||||||
|
|
||||||
# create new muxer for connection
|
# create new muxer for connection
|
||||||
let muxer = self.getMuxerByCodec(muxerName).newMuxer(conn)
|
let muxer = self.getMuxerByCodec(muxerName).newMuxer(conn, Opt.some(direction))
|
||||||
|
|
||||||
# install stream handler
|
# install stream handler
|
||||||
muxer.streamHandler = self.streamHandler
|
muxer.streamHandler = self.streamHandler
|
||||||
|
|
Loading…
Reference in New Issue