finished integrating secio

This commit is contained in:
Dmitriy Ryajov 2019-09-14 09:55:58 -06:00
parent 3eb0cdd5f7
commit 011df568b7
5 changed files with 32 additions and 17 deletions

View File

@ -52,7 +52,7 @@ proc select*(m: MultisteamSelect,
## select a remote protocol ## select a remote protocol
await conn.write(m.codec) # write handshake await conn.write(m.codec) # write handshake
if proto.len() > 0: if proto.len() > 0:
info "selecting proto", proto = proto trace "selecting proto", proto = proto
await conn.writeLp((proto[0] & "\n")) # select proto await conn.writeLp((proto[0] & "\n")) # select proto
result = cast[string](await conn.readLp()) # read ms header result = cast[string](await conn.readLp()) # read ms header
@ -65,14 +65,14 @@ proc select*(m: MultisteamSelect,
return return
result = cast[string](await conn.readLp()) # read the first proto result = cast[string](await conn.readLp()) # read the first proto
info "reading first requested proto" trace "reading first requested proto"
result.removeSuffix("\n") result.removeSuffix("\n")
if result == proto[0]: if result == proto[0]:
debug "succesfully selected ", proto = proto debug "succesfully selected ", proto = proto
return return
if not result.len > 0: if not result.len > 0:
info "selecting one of several protos" trace "selecting one of several protos"
for p in proto[1..<proto.len()]: for p in proto[1..<proto.len()]:
await conn.writeLp((p & "\n")) # select proto await conn.writeLp((p & "\n")) # select proto
result = cast[string](await conn.readLp()) # read the first proto result = cast[string](await conn.readLp()) # read the first proto
@ -109,24 +109,24 @@ proc list*(m: MultisteamSelect,
result = list result = list
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
info "handle: starting multistream handling" trace "handle: starting multistream handling"
while not conn.closed: while not conn.closed:
var ms = cast[string](await conn.readLp()) var ms = cast[string](await conn.readLp())
ms.removeSuffix("\n") ms.removeSuffix("\n")
info "handle: got request for ", ms trace "handle: got request for ", ms
if ms.len() <= 0: if ms.len() <= 0:
info "handle: invalid proto" trace "handle: invalid proto"
await conn.write(m.na) await conn.write(m.na)
if m.handlers.len() == 0: if m.handlers.len() == 0:
info "handle: sending `na` for protocol ", protocol = ms trace "handle: sending `na` for protocol ", protocol = ms
await conn.write(m.na) await conn.write(m.na)
continue continue
case ms: case ms:
of "ls": of "ls":
info "handle: listing protos" trace "handle: listing protos"
var protos = "" var protos = ""
for h in m.handlers: for h in m.handlers:
protos &= (h.proto & "\n") protos &= (h.proto & "\n")
@ -136,7 +136,7 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
else: else:
for h in m.handlers: for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or ms == h.proto: if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
info "found handler for", protocol = ms trace "found handler for", protocol = ms
await conn.writeLp((h.proto & "\n")) await conn.writeLp((h.proto & "\n"))
try: try:
await h.protocol.handler(conn, ms) await h.protocol.handler(conn, ms)
@ -156,7 +156,7 @@ proc addHandler*[T: LPProtocol](m: MultisteamSelect,
# Which is almost the same as the # Which is almost the same as the
# one on the next override of addHandler # one on the next override of addHandler
# #
# info "registering protocol", codec = codec # trace "registering protocol", codec = codec
m.handlers.add(HandlerHolder(proto: codec, m.handlers.add(HandlerHolder(proto: codec,
protocol: protocol, protocol: protocol,
match: matcher)) match: matcher))
@ -167,7 +167,7 @@ proc addHandler*[T: LPProtoHandler](m: MultisteamSelect,
matcher: Matcher = nil) = matcher: Matcher = nil) =
## helper to allow registering pure handlers ## helper to allow registering pure handlers
info "registering proto handler", codec = codec trace "registering proto handler", codec = codec
let protocol = new LPProtocol let protocol = new LPProtocol
protocol.codec = codec protocol.codec = codec
protocol.handler = handler protocol.handler = handler

View File

@ -122,7 +122,7 @@ proc identify*(p: Identify,
"Invalid or empty message received!") "Invalid or empty message received!")
result = decodeMsg(message) result = decodeMsg(message)
debug "identify: Identify for remote peer succeded" debug "Identify for remote peer succeded"
# TODO: To enable the blow code, the private and public # TODO: To enable the blow code, the private and public
# keys in PeerID need to be wrapped with Option[T] # keys in PeerID need to be wrapped with Option[T]

View File

@ -413,7 +413,9 @@ proc handleConn(s: Secio, conn: Connection): Future[Connection] {.async.} =
var stream = newBufferStream(writeHandler) var stream = newBufferStream(writeHandler)
asyncCheck readLoop(sconn, stream) asyncCheck readLoop(sconn, stream)
result = newConnection(stream) var secured = newConnection(stream)
secured.peerInfo = sconn.conn.peerInfo
result = secured
method init(s: Secio) {.gcsafe.} = method init(s: Secio) {.gcsafe.} =
proc handle(conn: Connection, proto: string) {.async, gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} =

View File

@ -95,19 +95,19 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
## mux incoming connection ## mux incoming connection
let muxers = toSeq(s.muxers.keys) let muxers = toSeq(s.muxers.keys)
if muxers.len == 0: if muxers.len == 0:
debug "no muxers registered" trace "no muxers registered, skipping upgrade flow"
return return
let muxerName = await s.ms.select(conn, muxers) let muxerName = await s.ms.select(conn, muxers)
if muxerName.len == 0 or muxerName == "na": if muxerName.len == 0 or muxerName == "na":
return return
# create new muxer for connection
let muxer = s.muxers[muxerName].newMuxer(conn) let muxer = s.muxers[muxerName].newMuxer(conn)
# install stream handler # install stream handler
muxer.streamHandler = s.streamHandler muxer.streamHandler = s.streamHandler
# do identify first, so that we have a # new stream for identify
# PeerInfo in case we didn't before
let stream = await muxer.newStream() let stream = await muxer.newStream()
let handlerFut = muxer.handle() let handlerFut = muxer.handle()
@ -117,15 +117,25 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
debug "mux: Muxer handler completed for peer ", debug "mux: Muxer handler completed for peer ",
peer = conn.peerInfo.peerId.get().pretty peer = conn.peerInfo.peerId.get().pretty
) )
# do identify first, so that we have a
# PeerInfo in case we didn't before
await s.identify(stream) await s.identify(stream)
# update main connection with refreshed info
if stream.peerInfo.peerId.isSome:
conn.peerInfo = stream.peerInfo
await stream.close() # close idenity stream await stream.close() # close idenity stream
trace "connection's peerInfo", peerInfo = conn.peerInfo.peerId
# store it in muxed connections if we have a peer for it # store it in muxed connections if we have a peer for it
# TODO: We should make sure that this are cleaned up properly # TODO: We should make sure that this are cleaned up properly
# on exit even if there is no peer for it. This shouldn't # on exit even if there is no peer for it. This shouldn't
# happen once secio is in place, but still something to keep # happen once secio is in place, but still something to keep
# in mind # in mind
if conn.peerInfo.peerId.isSome: if conn.peerInfo.peerId.isSome:
trace "adding muxer for peer", peer = conn.peerInfo.peerId.get().pretty
s.muxed[conn.peerInfo.peerId.get().pretty] = muxer s.muxed[conn.peerInfo.peerId.get().pretty] = muxer
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
@ -158,6 +168,7 @@ proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Option[Connection]] {
# if there is a muxer for the connection # if there is a muxer for the connection
# use it instead to create a muxed stream # use it instead to create a muxed stream
if s.muxed.contains(peerInfo.peerId.get().pretty): if s.muxed.contains(peerInfo.peerId.get().pretty):
trace "connection is muxed, retriving muxer and setting up a stream"
let muxer = s.muxed[peerInfo.peerId.get().pretty] let muxer = s.muxed[peerInfo.peerId.get().pretty]
let conn = await muxer.newStream() let conn = await muxer.newStream()
result = some(conn) result = some(conn)
@ -166,6 +177,7 @@ proc dial*(s: Switch,
peer: PeerInfo, peer: PeerInfo,
proto: string = ""): proto: string = ""):
Future[Connection] {.async.} = Future[Connection] {.async.} =
trace "dialing peer", peer = peer.peerId.get().pretty
for t in s.transports: # for each transport for t in s.transports: # for each transport
for a in peer.addrs: # for each address for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it if t.handles(a): # check if it can dial it
@ -176,6 +188,7 @@ proc dial*(s: Switch,
let stream = await s.getMuxedStream(peer) let stream = await s.getMuxedStream(peer)
if stream.isSome: if stream.isSome:
trace "connection is muxed, return muxed stream"
result = stream.get() result = stream.get()
debug "dial: attempting to select remote ", proto = proto debug "dial: attempting to select remote ", proto = proto

View File

@ -55,7 +55,7 @@ suite "Switch":
proc testSwitch(): Future[bool] {.async, gcsafe.} = proc testSwitch(): Future[bool] {.async, gcsafe.} =
let ma1: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53370") let ma1: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53370")
let ma2: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53371") let ma2: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53371")
var peerInfo1, peerInfo2: PeerInfo var peerInfo1, peerInfo2: PeerInfo
var switch1, switch2: Switch var switch1, switch2: Switch
(switch1, peerInfo1) = createSwitch(ma1) (switch1, peerInfo1) = createSwitch(ma1)