Replace table by seq for storing muxers (#741)
Replace table by seq for stocking muxers
This commit is contained in:
parent
78a65eebcc
commit
912873f8b3
|
@ -19,7 +19,7 @@ runnableExamples:
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
options, tables, chronos, chronicles,
|
||||
options, tables, chronos, chronicles, sequtils,
|
||||
switch, peerid, peerinfo, stream/connection, multiaddress,
|
||||
crypto/crypto, transports/[transport, tcptransport],
|
||||
muxers/[muxer, mplex/mplex, yamux/yamux],
|
||||
|
@ -38,15 +38,11 @@ type
|
|||
Noise,
|
||||
Secio {.deprecated.}
|
||||
|
||||
MuxerBuilder = object
|
||||
codec: string
|
||||
newMuxer: MuxerConstructor
|
||||
|
||||
SwitchBuilder* = ref object
|
||||
privKey: Option[PrivateKey]
|
||||
addresses: seq[MultiAddress]
|
||||
secureManagers: seq[SecureProtocol]
|
||||
muxers: seq[MuxerBuilder]
|
||||
muxers: seq[MuxerProvider]
|
||||
transports: seq[TransportProvider]
|
||||
rng: ref HmacDrbgContext
|
||||
maxConnections: int
|
||||
|
@ -119,13 +115,15 @@ proc withMplex*(
|
|||
outTimeout,
|
||||
maxChannCount)
|
||||
|
||||
b.muxers.add(MuxerBuilder(codec: MplexCodec, newMuxer: newMuxer))
|
||||
assert b.muxers.countIt(it.codec == MplexCodec) == 0, "Mplex build multiple times"
|
||||
b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec))
|
||||
b
|
||||
|
||||
proc withYamux*(b: SwitchBuilder): SwitchBuilder =
|
||||
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn)
|
||||
|
||||
b.muxers.add(MuxerBuilder(codec: YamuxCodec, newMuxer: newMuxer))
|
||||
assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
|
||||
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))
|
||||
b
|
||||
|
||||
proc withNoise*(b: SwitchBuilder): SwitchBuilder {.public.} =
|
||||
|
@ -212,18 +210,11 @@ proc build*(b: SwitchBuilder): Switch
|
|||
protoVersion = b.protoVersion,
|
||||
agentVersion = b.agentVersion)
|
||||
|
||||
let
|
||||
muxers = block:
|
||||
var muxers: Table[string, MuxerProvider]
|
||||
for m in b.muxers:
|
||||
muxers[m.codec] = MuxerProvider.new(m.newMuxer, m.codec)
|
||||
muxers
|
||||
|
||||
let
|
||||
identify = Identify.new(peerInfo, b.sendSignedPeerRecord)
|
||||
connManager = ConnManager.new(b.maxConnsPerPeer, b.maxConnections, b.maxIn, b.maxOut)
|
||||
ms = MultistreamSelect.new()
|
||||
muxedUpgrade = MuxedUpgrade.new(identify, muxers, secureManagerInstances, connManager, ms)
|
||||
muxedUpgrade = MuxedUpgrade.new(identify, b.muxers, secureManagerInstances, connManager, ms)
|
||||
|
||||
let
|
||||
transports = block:
|
||||
|
@ -248,7 +239,6 @@ proc build*(b: SwitchBuilder): Switch
|
|||
peerInfo = peerInfo,
|
||||
transports = transports,
|
||||
identity = identify,
|
||||
muxers = muxers,
|
||||
secureManagers = secureManagerInstances,
|
||||
connManager = connManager,
|
||||
ms = ms,
|
||||
|
|
|
@ -322,7 +322,6 @@ proc start*(s: Switch) {.async, gcsafe, public.} =
|
|||
proc newSwitch*(peerInfo: PeerInfo,
|
||||
transports: seq[Transport],
|
||||
identity: Identify,
|
||||
muxers: Table[string, MuxerProvider],
|
||||
secureManagers: openArray[Secure] = [],
|
||||
connManager: ConnManager,
|
||||
ms: MultistreamSelect,
|
||||
|
|
|
@ -22,9 +22,14 @@ logScope:
|
|||
|
||||
type
|
||||
MuxedUpgrade* = ref object of Upgrade
|
||||
muxers*: Table[string, MuxerProvider]
|
||||
muxers*: seq[MuxerProvider]
|
||||
streamHandler*: StreamHandler
|
||||
|
||||
proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider =
|
||||
for m in self.muxers:
|
||||
if muxerName in m.codecs:
|
||||
return m
|
||||
|
||||
proc identify*(
|
||||
self: MuxedUpgrade,
|
||||
muxer: Muxer) {.async, gcsafe.} =
|
||||
|
@ -50,7 +55,7 @@ proc mux*(
|
|||
warn "no muxers registered, skipping upgrade flow", conn
|
||||
return
|
||||
|
||||
let muxerName = await self.ms.select(conn, toSeq(self.muxers.keys()))
|
||||
let muxerName = await self.ms.select(conn, self.muxers.mapIt(it.codec))
|
||||
if muxerName.len == 0 or muxerName == "na":
|
||||
debug "no muxer available, early exit", conn
|
||||
return
|
||||
|
@ -58,7 +63,7 @@ proc mux*(
|
|||
trace "Found a muxer", conn, muxerName
|
||||
|
||||
# create new muxer for connection
|
||||
let muxer = self.muxers[muxerName].newMuxer(conn)
|
||||
let muxer = self.getMuxerByCodec(muxerName).newMuxer(conn)
|
||||
|
||||
# install stream handler
|
||||
muxer.streamHandler = self.streamHandler
|
||||
|
@ -127,7 +132,7 @@ method upgradeIncoming*(
|
|||
|
||||
cconn = sconn
|
||||
# add the muxer
|
||||
for muxer in self.muxers.values:
|
||||
for muxer in self.muxers:
|
||||
ms.addHandler(muxer.codecs, muxer)
|
||||
|
||||
# handle subsequent secure requests
|
||||
|
@ -197,7 +202,7 @@ proc muxerHandler(
|
|||
proc new*(
|
||||
T: type MuxedUpgrade,
|
||||
identity: Identify,
|
||||
muxers: Table[string, MuxerProvider],
|
||||
muxers: seq[MuxerProvider],
|
||||
secureManagers: openArray[Secure] = [],
|
||||
connManager: ConnManager,
|
||||
ms: MultistreamSelect): T =
|
||||
|
|
|
@ -61,7 +61,7 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc
|
|||
let
|
||||
identify = Identify.new(peerInfo)
|
||||
mplexProvider = MuxerProvider.new(createMplex, MplexCodec)
|
||||
muxers = [(MplexCodec, mplexProvider)].toTable()
|
||||
muxers = @[mplexProvider]
|
||||
secureManagers = if secio:
|
||||
[Secure(Secio.new(rng, privateKey))]
|
||||
else:
|
||||
|
@ -75,7 +75,6 @@ proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switc
|
|||
peerInfo,
|
||||
transports,
|
||||
identify,
|
||||
muxers,
|
||||
secureManagers,
|
||||
connManager,
|
||||
ms)
|
||||
|
|
Loading…
Reference in New Issue