wip: integrating secio
parent ea142f0e6d
commit b47dc89589
@@ -189,7 +189,7 @@ proc readMessage*(sconn: SecureConnection): Future[seq[byte]] {.async.} =
     if sconn.macCheckAndDecode(buf):
       result = buf
     else:
-      debug "Message MAC verification failed"
+      debug "Message MAC verification failed", buf = toHex(buf)
   else:
     debug "Received message header size is more then allowed",
           length = length, allowed_length = SecioMaxMessageSize
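The changed line above adds a hex dump of the offending payload when the MAC check fails. A minimal, self-contained sketch of that logging pattern, with macCheckAndDecode stubbed out and a hand-rolled hex helper standing in for the library's toHex (chronicles is assumed as the logging backend):

import sequtils, strutils
import chronicles

proc hex(buf: seq[byte]): string =
  # hand-rolled hex dump; stands in for toHex(buf) in the diff
  buf.mapIt(it.int.toHex(2)).join()

proc checkMacStub(buf: seq[byte]): bool =
  # placeholder for sconn.macCheckAndDecode; always fails to show the log path
  false

proc readDemo(buf: seq[byte]): seq[byte] =
  if checkMacStub(buf):
    result = buf
  else:
    debug "Message MAC verification failed", buf = hex(buf)

discard readDemo(@[byte 0xde, 0xad, 0xbe, 0xef])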
@@ -30,15 +30,15 @@ type
 
   Switch* = ref object of RootObj
     peerInfo*: PeerInfo
-    connections*: TableRef[string, Connection]
-    muxed*: TableRef[string, Muxer]
+    connections*: Table[string, Connection]
+    muxed*: Table[string, Muxer]
     transports*: seq[Transport]
     protocols*: seq[LPProtocol]
     muxers*: Table[string, MuxerProvider]
     ms*: MultisteamSelect
     identity*: Identify
     streamHandler*: StreamHandler
-    secureManagers*: seq[Secure]
+    secureManagers*: Table[string, Secure]
     pubSub*: Option[PubSub]
 
 proc newNoPubSubException(): ref Exception {.inline.} =
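For reference, a minimal sketch of what the TableRef to Table switch changes: Table is a value type that is copied on assignment, while a TableRef is shared:

import tables

var a = initTable[string, int]()   # value-type Table
a["x"] = 1
var b = a                          # assignment copies the whole table
b["x"] = 2
assert a["x"] == 1 and b["x"] == 2

var c = newTable[string, int]()    # heap-allocated TableRef
c["x"] = 1
let d = c                          # d aliases the same table
d["x"] = 2
assert c["x"] == 2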
@@ -48,15 +48,15 @@ proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
   ## secure the incoming connection
 
   # plaintext for now, doesn't do anything
-  let managers = s.secureManagers.mapIt(it.codec).deduplicate()
+  let managers = toSeq(s.secureManagers.keys)
   if managers.len == 0:
     raise newException(CatchableError, "No secure managers registered!")
 
-  if (await s.ms.select(conn, s.secureManagers.mapIt(it.codec))).len == 0:
+  let manager = await s.ms.select(conn, toSeq(s.secureManagers.values).mapIt(it.codec))
+  if manager.len == 0:
     raise newException(CatchableError, "Unable to negotiate a secure channel!")
 
-  var n = await s.secureManagers[0].secure(conn)
-  result = conn
+  result = await s.secureManagers[manager].secure(conn)
 
 proc identify*(s: Switch, conn: Connection) {.async, gcsafe.} =
   ## identify the connection
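The rewritten proc keeps the codec string returned by the multistream negotiation and uses it directly as the key into secureManagers. A rough sketch of that lookup, with ms.select replaced by a stub and Secure reduced to a plain object (both stand-ins, not the library types):

import tables, sequtils

type
  Secure = object
    codec: string

proc selectStub(offered: seq[string]): string =
  # stand-in for `await s.ms.select(conn, ...)`; pretend the peer picked the first codec
  if offered.len > 0: offered[0] else: ""

proc secureDemo(managers: Table[string, Secure]): string =
  if managers.len == 0:
    raise newException(CatchableError, "No secure managers registered!")

  let manager = selectStub(toSeq(managers.values).mapIt(it.codec))
  if manager.len == 0:
    raise newException(CatchableError, "Unable to negotiate a secure channel!")

  # the negotiated codec is also the table key, so the lookup is direct
  result = managers[manager].codec

var managers = initTable[string, Secure]()
managers["/secio/1.0.0"] = Secure(codec: "/secio/1.0.0")
echo secureDemo(managers)   # -> /secio/1.0.0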
@@ -193,7 +193,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
 proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
   let ms = newMultistream()
   if (await ms.select(conn)): # just handshake
-    for secure in s.secureManagers:
+    for secure in s.secureManagers.values:
       ms.addHandler(secure.codec, secure)
 
     await ms.handle(conn)
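On the incoming side the registration loop now iterates the table's values. A toy version of that loop, with the multistream registry replaced by a plain table (addHandlerStub and the codec string are made up for illustration):

import tables

type
  Secure = object
    codec: string

var registry = initTable[string, Secure]()

proc addHandlerStub(codec: string, secure: Secure) =
  # stand-in for ms.addHandler(secure.codec, secure)
  registry[codec] = secure

var secureManagers = initTable[string, Secure]()
secureManagers["/secio/1.0.0"] = Secure(codec: "/secio/1.0.0")

for secure in secureManagers.values:
  addHandlerStub(secure.codec, secure)

echo registry.len   # -> 1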
@@ -252,16 +252,17 @@ proc newSwitch*(peerInfo: PeerInfo,
                 transports: seq[Transport],
                 identity: Identify,
                 muxers: Table[string, MuxerProvider],
-                secureManagers: seq[Secure] = @[],
+                secureManagers: Table[string, Secure],
                 pubSub: Option[PubSub] = none(PubSub)): Switch =
   new result
   result.peerInfo = peerInfo
   result.ms = newMultistream()
   result.transports = transports
-  result.connections = newTable[string, Connection]()
-  result.muxed = newTable[string, Muxer]()
+  result.connections = initTable[string, Connection]()
+  result.muxed = initTable[string, Muxer]()
   result.identity = identity
   result.muxers = muxers
+  result.secureManagers = initTable[string, Secure]()
 
   let s = result # can't capture result
   result.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
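Because the field is now a value-type Table, the constructor initializes it in place with initTable instead of allocating a TableRef with newTable. A stripped-down sketch of that pattern (Switch and Secure reduced to stubs, codec string assumed):

import tables

type
  Secure = object
    codec: string
  Switch = ref object
    secureManagers: Table[string, Secure]

proc newSwitchDemo(): Switch =
  new result
  # value Table field: initialized in place, no newTable allocation needed
  result.secureManagers = initTable[string, Secure]()
  result.secureManagers["/plaintext/1.0.0"] = Secure(codec: "/plaintext/1.0.0")

echo newSwitchDemo().secureManagers.len   # -> 1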
@@ -276,16 +277,13 @@ proc newSwitch*(peerInfo: PeerInfo,
     let stream = await muxer.newStream()
     await s.identify(stream)
 
-  for s in secureManagers.deduplicate():
-    debug "adding secure manager ", codec = s.codec
-    result.secureManagers.add(s)
+  for k in secureManagers.keys:
+    debug "adding secure manager ", codec = secureManagers[k].codec
+    result.secureManagers[k] = secureManagers[k]
 
   if result.secureManagers.len == 0:
     # use plain text if no secure managers are provided
-    let manager = Secure(newPlainText())
-    result.secureManagers.add(manager)
-
-  result.secureManagers = result.secureManagers.deduplicate()
+    result.secureManagers[PlainTextCodec] = Secure(newPlainText())
 
   if pubSub.isSome:
     result.pubSub = pubSub
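The tail of the constructor now copies the caller-provided managers in by key and only inserts a plaintext manager when the table stays empty, which makes the old deduplicate pass unnecessary. A self-contained sketch of that fallback (the actual value of PlainTextCodec is an assumption here):

import tables

type
  Secure = object
    codec: string

const PlainTextCodecStub = "/plaintext/1.0.0"   # assumed value of PlainTextCodec

proc registerManagers(provided: Table[string, Secure]): Table[string, Secure] =
  result = initTable[string, Secure]()
  # copy the caller-provided managers, keyed by codec
  for k in provided.keys:
    result[k] = provided[k]

  # fall back to plaintext only when nothing was provided
  if result.len == 0:
    result[PlainTextCodecStub] = Secure(codec: PlainTextCodecStub)

echo registerManagers(initTable[string, Secure]()).len   # -> 1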