fix: switch, with identify and mplex
This commit is contained in:
parent
701e048ee6
commit
e31966b6f8
|
@ -103,11 +103,12 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
|
|||
await s.readExactly(addr buffer[0], int(size))
|
||||
except TransportIncompleteError:
|
||||
buffer.setLen(0)
|
||||
raise newLPStreamIncompleteError()
|
||||
|
||||
except AsyncStreamIncompleteError:
|
||||
buffer.setLen(0)
|
||||
|
||||
result = buffer
|
||||
|
||||
proc writeLp*(s: Connection, msg: string | seq[byte]) {.async, gcsafe.} =
|
||||
proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} =
|
||||
## write lenght prefixed
|
||||
var buf = initVBuffer()
|
||||
buf.writeSeq(msg)
|
||||
|
|
|
@ -1,68 +0,0 @@
|
|||
## Nim-LibP2P
|
||||
## Copyright (c) 2018 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
## Small debug module that's enabled through a ``-d:debugout``
|
||||
## flag. It's inspired by the Nodejs ``debug`` module that
|
||||
## allows printing colorized output based on patterns. This
|
||||
## module is powered by the standard ``nre`` module and as such
|
||||
## it supports the same set of regexp expressions.
|
||||
##
|
||||
## To enable debug output, pass the ``debugout`` flag during build
|
||||
## time with ``-d`` flag. In addition, the debugout flag takes a comma
|
||||
## separated list of patters that will narow the debug output to
|
||||
## only those matched by the patterns. By default however, all
|
||||
## debug statements are outputed to the stderr.
|
||||
|
||||
when defined(debugout) and not defined(release):
|
||||
import random,
|
||||
tables,
|
||||
nre,
|
||||
strutils,
|
||||
times,
|
||||
terminal,
|
||||
sequtils
|
||||
|
||||
const debugout {.strdefine.}: string = ".*"
|
||||
|
||||
type
|
||||
Match = object
|
||||
pattern: Regex
|
||||
color: string
|
||||
|
||||
var matches: OrderedTable[string, Match] = initOrderedTable[string, Match]()
|
||||
var patrns = @[".*"]
|
||||
if debugout != "true":
|
||||
patrns = debugout.split(re"[,\s]").filterIt(it.len > 0)
|
||||
|
||||
randomize()
|
||||
for p in patrns:
|
||||
matches[p] = Match(pattern: re(p), color: $rand(0..272)) # 256 ansi colors
|
||||
# matches["*"] = Match(pattern: re(".*"), color: $rand(0..272)) # 256 ansi colors
|
||||
|
||||
if isTrueColorSupported():
|
||||
system.addQuitProc(resetAttributes)
|
||||
enableTrueColors()
|
||||
|
||||
proc doDebug(data: string): void {.gcsafe.} =
|
||||
for m in matches.values:
|
||||
if data.match(m.pattern).isSome:
|
||||
stderr.writeLine("\u001b[38;5;" & m.color & "m " & alignLeft(data, 4) & "\e[0m")
|
||||
return
|
||||
|
||||
template debug*(data: string) =
|
||||
let module = instantiationInfo()
|
||||
let line = "$# $#:$# - $#" %
|
||||
[now().format("yyyy-MM-dd HH:mm:ss:fffffffff"),
|
||||
module.filename[0 .. ^5],
|
||||
$module.line,
|
||||
data]
|
||||
doDebug(line)
|
||||
|
||||
else:
|
||||
template debug*(data: string) = discard
|
|
@ -46,41 +46,49 @@ proc select*(m: MultisteamSelect,
|
|||
conn: Connection,
|
||||
proto: seq[string]):
|
||||
Future[string] {.async.} =
|
||||
debug &"select: initiating handshake"
|
||||
## select a remote protocol
|
||||
await conn.write(m.codec) # write handshake
|
||||
if proto.len() > 0:
|
||||
debug &"select: selecting proto {proto}"
|
||||
await conn.writeLp((proto[0] & "\n")) # select proto
|
||||
|
||||
result = cast[string](await conn.readLp()) # read ms header
|
||||
result.removeSuffix("\n")
|
||||
if result != Codec:
|
||||
debug &"select: handshake failed"
|
||||
return ""
|
||||
|
||||
if proto.len() == 0: # no protocols, must be a handshake call
|
||||
return
|
||||
|
||||
result = cast[string](await conn.readLp()) # read the first proto
|
||||
debug &"select: reading first requested proto"
|
||||
result.removeSuffix("\n")
|
||||
if result == proto[0]:
|
||||
debug &"select: succesfully selected {proto}"
|
||||
return
|
||||
|
||||
if not result.len > 0:
|
||||
debug &"select: selecting one of several protos"
|
||||
for p in proto[1..<proto.len()]:
|
||||
await conn.writeLp((p & "\n")) # select proto
|
||||
result = cast[string](await conn.readLp()) # read the first proto
|
||||
result.removeSuffix("\n")
|
||||
if result == p:
|
||||
debug &"select: selected {result}"
|
||||
break
|
||||
|
||||
proc select*(m: MultisteamSelect,
|
||||
conn: Connection,
|
||||
proto: string): Future[bool] {.async.} =
|
||||
result = if proto.len > 0:
|
||||
(await m.select(conn, @[proto])) == proto
|
||||
else:
|
||||
(await m.select(conn, @[])) == Codec
|
||||
if proto.len > 0:
|
||||
result = (await m.select(conn, @[proto])) == proto
|
||||
else:
|
||||
result = (await m.select(conn, @[])) == Codec
|
||||
|
||||
proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = m.select(conn, "")
|
||||
proc select*(m: MultisteamSelect, conn: Connection): Future[bool] =
|
||||
m.select(conn, "")
|
||||
|
||||
proc list*(m: MultisteamSelect,
|
||||
conn: Connection): Future[seq[string]] {.async.} =
|
||||
|
@ -99,19 +107,25 @@ proc list*(m: MultisteamSelect,
|
|||
result = list
|
||||
|
||||
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
|
||||
debug &"select: starting multistream handling"
|
||||
while not conn.closed:
|
||||
block main:
|
||||
var ms = cast[string](await conn.readLp())
|
||||
ms.removeSuffix("\n")
|
||||
|
||||
debug &"select: got request for {ms}"
|
||||
if ms.len() <= 0:
|
||||
debug &"select: invalid proto"
|
||||
await conn.write(m.na)
|
||||
|
||||
if m.handlers.len() == 0:
|
||||
debug &"select: {ms} is na"
|
||||
await conn.write(m.na)
|
||||
continue
|
||||
|
||||
case ms:
|
||||
of "ls":
|
||||
debug &"select: listing protos"
|
||||
var protos = ""
|
||||
for h in m.handlers:
|
||||
protos &= (h.proto & "\n")
|
||||
|
@ -121,12 +135,14 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
|
|||
else:
|
||||
for h in m.handlers:
|
||||
if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
|
||||
debug &"select: found handler for {ms}"
|
||||
await conn.writeLp((h.proto & "\n"))
|
||||
try:
|
||||
await h.protocol.handler(conn, ms)
|
||||
break main
|
||||
except Exception as exc:
|
||||
debug exc.msg # TODO: Logging
|
||||
debug &"select: no handlers for {ms}"
|
||||
await conn.write(m.na)
|
||||
|
||||
proc addHandler*[T: LPProtocol](m: MultisteamSelect,
|
||||
|
|
|
@ -7,12 +7,16 @@
|
|||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import options
|
||||
import options, strformat
|
||||
import chronos
|
||||
import ../protobuf/minprotobuf, ../peerinfo,
|
||||
protocol as proto, ../connection,
|
||||
../peer, ../crypto/crypto,
|
||||
../multiaddress
|
||||
import ../protobuf/minprotobuf,
|
||||
../peerinfo,
|
||||
../connection,
|
||||
../peer,
|
||||
../crypto/crypto,
|
||||
../multiaddress,
|
||||
../protocols/protocol,
|
||||
../helpers/debug
|
||||
|
||||
const IdentifyCodec* = "/ipfs/id/1.0.0"
|
||||
const IdentifyPushCodec* = "/ipfs/id/push/1.0.0"
|
||||
|
@ -99,12 +103,15 @@ proc identify*(p: Identify,
|
|||
Future[IdentifyInfo] {.async.} =
|
||||
var message = await conn.readLp()
|
||||
if len(message) == 0:
|
||||
debug "identify: Invalid or empty message received!"
|
||||
raise newException(IdentityInvalidMsgError,
|
||||
"Invalid or empty message received!")
|
||||
|
||||
result = decodeMsg(message)
|
||||
debug &"identify: Identify for remote peer succeded"
|
||||
if remotePeerInfo.isSome and
|
||||
remotePeerInfo.get().peerId.publicKey != result.pubKey:
|
||||
debug "identify: Peer's remote public key doesn't match"
|
||||
raise newException(IdentityNoMatchError,
|
||||
"Peer's remote public key doesn't match")
|
||||
|
||||
|
|
|
@ -61,12 +61,13 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
|
|||
peerInfo.peerId = PeerID.init(info.pubKey) # we might not have a peerId at all
|
||||
peerInfo.addrs = info.addrs
|
||||
peerInfo.protocols = info.protos
|
||||
debug &"identify: identified remote peer {peerInfo.peerId.pretty}"
|
||||
except IdentityInvalidMsgError as exc:
|
||||
debug exc.msg
|
||||
except IdentityNoMatchError as exc:
|
||||
debug exc.msg
|
||||
|
||||
proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||
proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
|
||||
## mux incoming connection
|
||||
let muxers = toSeq(s.muxers.keys)
|
||||
let muxerName = await s.ms.select(conn, muxers)
|
||||
|
@ -79,9 +80,17 @@ proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
|||
|
||||
# do identify first, so that we have a
|
||||
# PeerInfo in case we didn't before
|
||||
result = await muxer.newStream()
|
||||
asyncCheck muxer.handle()
|
||||
await s.identify(result)
|
||||
let stream = await muxer.newStream()
|
||||
let handlerFut = muxer.handle()
|
||||
|
||||
# add muxer handler cleanup proc
|
||||
handlerFut.addCallback(
|
||||
proc(udata: pointer = nil) {.gcsafe.} =
|
||||
if handlerFut.finished:
|
||||
debug &"Muxer handler completed for peer {conn.peerInfo.get().peerId.pretty}"
|
||||
)
|
||||
await s.identify(stream)
|
||||
await stream.close() # close idenity stream
|
||||
|
||||
# store it in muxed connections if we have a peer for it
|
||||
# TODO: We should make sure that this are cleaned up properly
|
||||
|
@ -105,7 +114,7 @@ proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe
|
|||
s.connections[id] = result
|
||||
|
||||
result = await s.secure(conn) # secure the connection
|
||||
result = await s.mux(result) # mux it if possible
|
||||
await s.mux(result) # mux it if possible
|
||||
|
||||
proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||
let id = if conn.peerInfo.isSome: conn.peerInfo.get().peerId.pretty else: ""
|
||||
|
@ -126,12 +135,17 @@ proc dial*(s: Switch,
|
|||
result = await t.dial(a)
|
||||
result.peerInfo = some(peer)
|
||||
result = await s.handleConn(result)
|
||||
|
||||
# if there is a muxer for the connection
|
||||
# use it instead to create a muxed stream
|
||||
if s.muxed.contains(peer.peerId.pretty):
|
||||
result = await s.muxed[peer.peerId.pretty].newStream()
|
||||
|
||||
debug &"dial: attempting to select remote proto {proto}"
|
||||
if not (await s.ms.select(result, proto)):
|
||||
debug &"dial: Unable to select protocol: {proto}"
|
||||
raise newException(CatchableError,
|
||||
&"Unable to select protocol: {proto}")
|
||||
await s.muxed[peer.peerId.pretty].handle()
|
||||
|
||||
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
|
||||
if isNil(proto.handler):
|
||||
|
|
|
@ -12,7 +12,8 @@ import ../libp2p/switch,
|
|||
../libp2p/protocols/protocol,
|
||||
../libp2p/muxers/muxer,
|
||||
../libp2p/muxers/mplex/mplex,
|
||||
../libp2p/muxers/mplex/types
|
||||
../libp2p/muxers/mplex/types,
|
||||
../libp2p/helpers/debug
|
||||
|
||||
const TestCodec = "/test/proto/1.0.0"
|
||||
|
||||
|
@ -22,9 +23,9 @@ type
|
|||
method init(p: TestProto) {.gcsafe.} =
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
|
||||
let msg = cast[string](await conn.readLp())
|
||||
echo msg
|
||||
check "Hello!" == msg
|
||||
await conn.writeLp("Hello!")
|
||||
await conn.close()
|
||||
|
||||
p.codec = TestCodec
|
||||
p.handler = handle
|
||||
|
@ -64,13 +65,9 @@ suite "Switch":
|
|||
(switch2, peerInfo2) = createSwitch(ma2)
|
||||
await switch2.start()
|
||||
let conn = await switch2.dial(peerInfo1, TestCodec)
|
||||
echo "DIALED???"
|
||||
echo conn.repr
|
||||
debug "TEST SWITCH: dial succesful"
|
||||
await conn.writeLp("Hello!")
|
||||
echo "WROTE FROM TEST"
|
||||
echo conn.repr
|
||||
let msg = cast[string](await conn.readLp())
|
||||
echo msg
|
||||
check "Hello!" == msg
|
||||
|
||||
result = true
|
||||
|
|
Loading…
Reference in New Issue