wip: integrating and testing secio

This commit is contained in:
Dmitriy Ryajov 2019-09-14 07:55:52 -06:00
parent 9d93301a3a
commit 9bb892de69
7 changed files with 116 additions and 56 deletions

View File

@ -46,39 +46,39 @@ proc newMultistream*(): MultisteamSelect =
proc select*(m: MultisteamSelect, proc select*(m: MultisteamSelect,
conn: Connection, conn: Connection,
proto: seq[string]): proto: seq[string]):
Future[string] {.async.} = Future[string] {.async.} =
debug "select: initiating handshake", codec = m.codec debug "initiating handshake", codec = m.codec
## select a remote protocol ## select a remote protocol
await conn.write(m.codec) # write handshake await conn.write(m.codec) # write handshake
if proto.len() > 0: if proto.len() > 0:
debug "select: selecting proto", proto = proto info "selecting proto", proto = proto
await conn.writeLp((proto[0] & "\n")) # select proto await conn.writeLp((proto[0] & "\n")) # select proto
result = cast[string](await conn.readLp()) # read ms header result = cast[string](await conn.readLp()) # read ms header
result.removeSuffix("\n") result.removeSuffix("\n")
if result != Codec: if result != Codec:
debug "select: handshake failed", codec = result debug "handshake failed", codec = result
return "" return ""
if proto.len() == 0: # no protocols, must be a handshake call if proto.len() == 0: # no protocols, must be a handshake call
return return
result = cast[string](await conn.readLp()) # read the first proto result = cast[string](await conn.readLp()) # read the first proto
debug "select: reading first requested proto" info "reading first requested proto"
result.removeSuffix("\n") result.removeSuffix("\n")
if result == proto[0]: if result == proto[0]:
debug "select: succesfully selected ", proto = proto debug "succesfully selected ", proto = proto
return return
if not result.len > 0: if not result.len > 0:
debug "select: selecting one of several protos" info "selecting one of several protos"
for p in proto[1..<proto.len()]: for p in proto[1..<proto.len()]:
await conn.writeLp((p & "\n")) # select proto await conn.writeLp((p & "\n")) # select proto
result = cast[string](await conn.readLp()) # read the first proto result = cast[string](await conn.readLp()) # read the first proto
result.removeSuffix("\n") result.removeSuffix("\n")
if result == p: if result == p:
debug "select: selected protocol", protocol = result debug "selected protocol", protocol = result
break break
proc select*(m: MultisteamSelect, proc select*(m: MultisteamSelect,
@ -109,24 +109,24 @@ proc list*(m: MultisteamSelect,
result = list result = list
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
debug "handle: starting multistream handling" info "handle: starting multistream handling"
while not conn.closed: while not conn.closed:
var ms = cast[string](await conn.readLp()) var ms = cast[string](await conn.readLp())
ms.removeSuffix("\n") ms.removeSuffix("\n")
debug "handle: got request for ", ms info "handle: got request for ", ms
if ms.len() <= 0: if ms.len() <= 0:
debug "handle: invalid proto" info "handle: invalid proto"
await conn.write(m.na) await conn.write(m.na)
if m.handlers.len() == 0: if m.handlers.len() == 0:
debug "handle: sending `na` for protocol ", protocol = ms info "handle: sending `na` for protocol ", protocol = ms
await conn.write(m.na) await conn.write(m.na)
continue continue
case ms: case ms:
of "ls": of "ls":
debug "handle: listing protos" info "handle: listing protos"
var protos = "" var protos = ""
for h in m.handlers: for h in m.handlers:
protos &= (h.proto & "\n") protos &= (h.proto & "\n")
@ -136,21 +136,42 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
else: else:
for h in m.handlers: for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or ms == h.proto: if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
debug "handle: found handler for", protocol = ms info "found handler for", protocol = ms
await conn.writeLp((h.proto & "\n")) await conn.writeLp((h.proto & "\n"))
try: try:
await h.protocol.handler(conn, ms) await h.protocol.handler(conn, ms)
return return
except Exception as exc: except Exception as exc:
debug "handle: exception while handling ", msg = exc.msg warn "exception while handling ", msg = exc.msg
debug "handle: no handlers for ", protocol = ms warn "no handlers for ", protocol = ms
await conn.write(m.na) await conn.write(m.na)
proc addHandler*[T: LPProtocol](m: MultisteamSelect, proc addHandler*[T: LPProtocol](m: MultisteamSelect,
codec: string, codec: string,
protocol: T, protocol: T,
matcher: Matcher = nil) = matcher: Matcher = nil) =
## register a handler for the protocol ## register a protocol
# TODO: This is a bug in chronicles,
# it break if I uncoment this line.
# Which is almost the same as the
# one on the next override of addHandler
#
# info "registering protocol", codec = codec
m.handlers.add(HandlerHolder(proto: codec,
protocol: protocol,
match: matcher))
proc addHandler*[T: LPProtoHandler](m: MultisteamSelect,
codec: string,
handler: T,
matcher: Matcher = nil) =
## helper to allow registering pure handlers
info "registering proto handler", codec = codec
let protocol = new LPProtocol
protocol.codec = codec
protocol.handler = handler
m.handlers.add(HandlerHolder(proto: codec, m.handlers.add(HandlerHolder(proto: codec,
protocol: protocol, protocol: protocol,
match: matcher)) match: matcher))

View File

@ -55,7 +55,7 @@ proc newChannel*(id: uint,
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
# writes should happen in sequence # writes should happen in sequence
await chan.asyncLock.acquire() await chan.asyncLock.acquire()
debug "writeHandler: sending data ", data, id = chan.id info "writeHandler: sending data ", data = data.toHex(), id = chan.id
await conn.writeMsg(chan.id, chan.msgCode, data) # write header await conn.writeMsg(chan.id, chan.msgCode, data) # write header
chan.asyncLock.release() chan.asyncLock.release()

View File

@ -0,0 +1,29 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
import secure,
../../connection
const PlainTextCodec* = "/plaintext/1.0.0"
type
PlainText* = ref object of Secure
method init(p: PlainText) {.gcsafe.} =
proc handle(conn: Connection, proto: string)
{.async, gcsafe.} = discard
## plain text doesn't do anything
p.codec = PlainTextCodec
p.handler = handle
proc newPlainText*(): PlainText =
new result
result.init()

View File

@ -400,20 +400,26 @@ proc handshake*(s: Secio, conn: Connection): Future[SecureConnection] {.async.}
else: else:
debug "Secure handshake succeeded" debug "Secure handshake succeeded"
proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async.} = proc readLoop(sconn: SecureConnection, stream: BufferStream) {.async.} =
var sconn = await s.handshake(conn) while not sconn.conn.closed:
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
await sconn.writeMessage(data)
var stream = newBufferStream(writeHandler)
result = newConnection(stream)
while not conn.closed:
let msg = await sconn.readMessage() let msg = await sconn.readMessage()
await stream.pushTo(msg) await stream.pushTo(msg)
proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async.} =
var sconn = await s.handshake(conn)
proc writeHandler(data: seq[byte]) {.async, gcsafe.} =
debug "sending encrypted bytes", bytes = data.toHex()
await sconn.writeMessage(data)
var stream = newBufferStream(writeHandler)
asyncCheck readLoop(sconn, stream)
result = newConnection(stream)
method init(s: Secio) {.gcsafe.} = method init(s: Secio) {.gcsafe.} =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
asyncCheck s.handleConn(conn) debug "handling connection"
discard await s.handleConn(conn)
debug "connection secured"
s.codec = SecioCodec s.codec = SecioCodec
s.handler = handle s.handler = handle

View File

@ -8,26 +8,12 @@
## those terms. ## those terms.
import chronos import chronos
import ../protocol import ../protocol,
import ../../connection ../../connection
const PlainTextCodec* = "/plaintext/1.0.0"
type type
Secure* = ref object of LPProtocol # base type for secure managers Secure* = ref object of LPProtocol # base type for secure managers
PlainText* = ref object of Secure
method init(p: PlainText) {.gcsafe.} =
proc handle(conn: Connection, proto: string)
{.async, gcsafe.} = discard
## plain text doesn't do anything
p.codec = PlainTextCodec
p.handler = handle
method secure*(p: Secure, conn: Connection): Future[Connection] method secure*(p: Secure, conn: Connection): Future[Connection]
{.base, async, gcsafe.} = discard {.base, async, gcsafe.} =
result = conn
proc newPlainText*(): PlainText =
new result
result.init()

View File

@ -14,7 +14,8 @@ import connection,
stream/lpstream, stream/lpstream,
multistream, multistream,
protocols/protocol, protocols/protocol,
protocols/secure/secure, # for plain text protocols/secure/secure,
protocols/secure/plaintext, # for plain text
peerinfo, peerinfo,
multiaddress, multiaddress,
protocols/identify, protocols/identify,
@ -94,6 +95,7 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
## mux incoming connection ## mux incoming connection
let muxers = toSeq(s.muxers.keys) let muxers = toSeq(s.muxers.keys)
if muxers.len == 0: if muxers.len == 0:
debug "no muxers registered"
return return
let muxerName = await s.ms.select(conn, muxers) let muxerName = await s.ms.select(conn, muxers)
@ -194,17 +196,31 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
s.ms.addHandler(proto.codec, proto) s.ms.addHandler(proto.codec, proto)
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
debug "upgrading incoming connection"
let ms = newMultistream() let ms = newMultistream()
if (await ms.select(conn)): # just handshake
for secure in s.secureManagers.values:
ms.addHandler(secure.codec, secure)
await ms.handle(conn)
for muxer in s.muxers.values: # secure incoming connections
ms.addHandler(muxer.codec, muxer) proc securedHandler (conn: Connection,
proto: string)
{.async, gcsafe, closure.} =
debug "Securing connection"
let secure = s.secureManagers[proto]
let sconn = await secure.secure(conn)
if not isNil(sconn):
# add the muxer
for muxer in s.muxers.values:
ms.addHandler(muxer.codec, muxer)
await ms.handle(conn) # handle subsequent requests
await ms.handle(sconn)
if (await ms.select(conn)): # just handshake
# add the secure handlers
for k in s.secureManagers.keys:
ms.addHandler(k, securedHandler)
# handle secured connections
await ms.handle(conn)
proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =

View File

@ -24,6 +24,8 @@ type
method init(p: TestProto) {.gcsafe.} = method init(p: TestProto) {.gcsafe.} =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
let msg = cast[string](await conn.readLp()) let msg = cast[string](await conn.readLp())
echo "GOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOTTTTTTTTTTTTTTT"
echo "msg"
check "Hello!" == msg check "Hello!" == msg
await conn.writeLp("Hello!") await conn.writeLp("Hello!")
await conn.close() await conn.close()
@ -46,8 +48,8 @@ suite "Switch":
let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(newTransport(TcpTransport))] let transports = @[Transport(newTransport(TcpTransport))]
let muxers = [(MplexCodec, mplexProvider)].toTable() let muxers = [(MplexCodec, mplexProvider)].toTable()
# let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable() let secureManagers = [(SecioCodec, Secure(newSecio(seckey)))].toTable()
let switch = newSwitch(peerInfo, transports, identify, muxers) let switch = newSwitch(peerInfo, transports, identify, muxers, secureManagers)
result = (switch, peerInfo) result = (switch, peerInfo)
proc testSwitch(): Future[bool] {.async, gcsafe.} = proc testSwitch(): Future[bool] {.async, gcsafe.} =