From e31966b6f8574eaa95d6dfa489d19fcd7bc55578 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sun, 8 Sep 2019 01:43:33 -0600 Subject: [PATCH] fix: switch, with identify and mplex --- libp2p/connection.nim | 7 ++-- libp2p/debug.nim | 68 ----------------------------------- libp2p/multistream.nim | 26 +++++++++++--- libp2p/protocols/identify.nim | 17 ++++++--- libp2p/switch.nim | 26 ++++++++++---- tests/testswitch.nim | 11 +++--- 6 files changed, 61 insertions(+), 94 deletions(-) delete mode 100644 libp2p/debug.nim diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 91be78e..36c1517 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -103,11 +103,12 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = await s.readExactly(addr buffer[0], int(size)) except TransportIncompleteError: buffer.setLen(0) - raise newLPStreamIncompleteError() - + except AsyncStreamIncompleteError: + buffer.setLen(0) + result = buffer -proc writeLp*(s: Connection, msg: string | seq[byte]) {.async, gcsafe.} = +proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} = ## write lenght prefixed var buf = initVBuffer() buf.writeSeq(msg) diff --git a/libp2p/debug.nim b/libp2p/debug.nim deleted file mode 100644 index a04632d..0000000 --- a/libp2p/debug.nim +++ /dev/null @@ -1,68 +0,0 @@ -## Nim-LibP2P -## Copyright (c) 2018 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -## Small debug module that's enabled through a ``-d:debugout`` -## flag. It's inspired by the Nodejs ``debug`` module that -## allows printing colorized output based on patterns. This -## module is powered by the standard ``nre`` module and as such -## it supports the same set of regexp expressions. -## -## To enable debug output, pass the ``debugout`` flag during build -## time with ``-d`` flag. In addition, the debugout flag takes a comma -## separated list of patters that will narow the debug output to -## only those matched by the patterns. By default however, all -## debug statements are outputed to the stderr. - -when defined(debugout) and not defined(release): - import random, - tables, - nre, - strutils, - times, - terminal, - sequtils - - const debugout {.strdefine.}: string = ".*" - - type - Match = object - pattern: Regex - color: string - - var matches: OrderedTable[string, Match] = initOrderedTable[string, Match]() - var patrns = @[".*"] - if debugout != "true": - patrns = debugout.split(re"[,\s]").filterIt(it.len > 0) - - randomize() - for p in patrns: - matches[p] = Match(pattern: re(p), color: $rand(0..272)) # 256 ansi colors - # matches["*"] = Match(pattern: re(".*"), color: $rand(0..272)) # 256 ansi colors - - if isTrueColorSupported(): - system.addQuitProc(resetAttributes) - enableTrueColors() - - proc doDebug(data: string): void {.gcsafe.} = - for m in matches.values: - if data.match(m.pattern).isSome: - stderr.writeLine("\u001b[38;5;" & m.color & "m " & alignLeft(data, 4) & "\e[0m") - return - - template debug*(data: string) = - let module = instantiationInfo() - let line = "$# $#:$# - $#" % - [now().format("yyyy-MM-dd HH:mm:ss:fffffffff"), - module.filename[0 .. ^5], - $module.line, - data] - doDebug(line) - -else: - template debug*(data: string) = discard \ No newline at end of file diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index 1ccc173..3cfaf52 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -46,41 +46,49 @@ proc select*(m: MultisteamSelect, conn: Connection, proto: seq[string]): Future[string] {.async.} = + debug &"select: initiating handshake" ## select a remote protocol await conn.write(m.codec) # write handshake if proto.len() > 0: + debug &"select: selecting proto {proto}" await conn.writeLp((proto[0] & "\n")) # select proto result = cast[string](await conn.readLp()) # read ms header result.removeSuffix("\n") if result != Codec: + debug &"select: handshake failed" return "" if proto.len() == 0: # no protocols, must be a handshake call return result = cast[string](await conn.readLp()) # read the first proto + debug &"select: reading first requested proto" result.removeSuffix("\n") if result == proto[0]: + debug &"select: succesfully selected {proto}" return if not result.len > 0: + debug &"select: selecting one of several protos" for p in proto[1.. 0: - (await m.select(conn, @[proto])) == proto - else: - (await m.select(conn, @[])) == Codec + if proto.len > 0: + result = (await m.select(conn, @[proto])) == proto + else: + result = (await m.select(conn, @[])) == Codec -proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = m.select(conn, "") +proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = + m.select(conn, "") proc list*(m: MultisteamSelect, conn: Connection): Future[seq[string]] {.async.} = @@ -99,19 +107,25 @@ proc list*(m: MultisteamSelect, result = list proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = + debug &"select: starting multistream handling" while not conn.closed: block main: var ms = cast[string](await conn.readLp()) ms.removeSuffix("\n") + + debug &"select: got request for {ms}" if ms.len() <= 0: + debug &"select: invalid proto" await conn.write(m.na) if m.handlers.len() == 0: + debug &"select: {ms} is na" await conn.write(m.na) continue case ms: of "ls": + debug &"select: listing protos" var protos = "" for h in m.handlers: protos &= (h.proto & "\n") @@ -121,12 +135,14 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = else: for h in m.handlers: if (not isNil(h.match) and h.match(ms)) or ms == h.proto: + debug &"select: found handler for {ms}" await conn.writeLp((h.proto & "\n")) try: await h.protocol.handler(conn, ms) break main except Exception as exc: debug exc.msg # TODO: Logging + debug &"select: no handlers for {ms}" await conn.write(m.na) proc addHandler*[T: LPProtocol](m: MultisteamSelect, diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index db41774..17afeb3 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -7,12 +7,16 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import options +import options, strformat import chronos -import ../protobuf/minprotobuf, ../peerinfo, - protocol as proto, ../connection, - ../peer, ../crypto/crypto, - ../multiaddress +import ../protobuf/minprotobuf, + ../peerinfo, + ../connection, + ../peer, + ../crypto/crypto, + ../multiaddress, + ../protocols/protocol, + ../helpers/debug const IdentifyCodec* = "/ipfs/id/1.0.0" const IdentifyPushCodec* = "/ipfs/id/push/1.0.0" @@ -99,12 +103,15 @@ proc identify*(p: Identify, Future[IdentifyInfo] {.async.} = var message = await conn.readLp() if len(message) == 0: + debug "identify: Invalid or empty message received!" raise newException(IdentityInvalidMsgError, "Invalid or empty message received!") result = decodeMsg(message) + debug &"identify: Identify for remote peer succeded" if remotePeerInfo.isSome and remotePeerInfo.get().peerId.publicKey != result.pubKey: + debug "identify: Peer's remote public key doesn't match" raise newException(IdentityNoMatchError, "Peer's remote public key doesn't match") diff --git a/libp2p/switch.nim b/libp2p/switch.nim index afdcf09..d11911d 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -61,12 +61,13 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = peerInfo.peerId = PeerID.init(info.pubKey) # we might not have a peerId at all peerInfo.addrs = info.addrs peerInfo.protocols = info.protos + debug &"identify: identified remote peer {peerInfo.peerId.pretty}" except IdentityInvalidMsgError as exc: debug exc.msg except IdentityNoMatchError as exc: debug exc.msg -proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = +proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = ## mux incoming connection let muxers = toSeq(s.muxers.keys) let muxerName = await s.ms.select(conn, muxers) @@ -79,9 +80,17 @@ proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = # do identify first, so that we have a # PeerInfo in case we didn't before - result = await muxer.newStream() - asyncCheck muxer.handle() - await s.identify(result) + let stream = await muxer.newStream() + let handlerFut = muxer.handle() + + # add muxer handler cleanup proc + handlerFut.addCallback( + proc(udata: pointer = nil) {.gcsafe.} = + if handlerFut.finished: + debug &"Muxer handler completed for peer {conn.peerInfo.get().peerId.pretty}" + ) + await s.identify(stream) + await stream.close() # close idenity stream # store it in muxed connections if we have a peer for it # TODO: We should make sure that this are cleaned up properly @@ -105,7 +114,7 @@ proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe s.connections[id] = result result = await s.secure(conn) # secure the connection - result = await s.mux(result) # mux it if possible + await s.mux(result) # mux it if possible proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = let id = if conn.peerInfo.isSome: conn.peerInfo.get().peerId.pretty else: "" @@ -126,12 +135,17 @@ proc dial*(s: Switch, result = await t.dial(a) result.peerInfo = some(peer) result = await s.handleConn(result) + + # if there is a muxer for the connection + # use it instead to create a muxed stream if s.muxed.contains(peer.peerId.pretty): result = await s.muxed[peer.peerId.pretty].newStream() + + debug &"dial: attempting to select remote proto {proto}" if not (await s.ms.select(result, proto)): + debug &"dial: Unable to select protocol: {proto}" raise newException(CatchableError, &"Unable to select protocol: {proto}") - await s.muxed[peer.peerId.pretty].handle() proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = if isNil(proto.handler): diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 29ba714..8735ca0 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -12,7 +12,8 @@ import ../libp2p/switch, ../libp2p/protocols/protocol, ../libp2p/muxers/muxer, ../libp2p/muxers/mplex/mplex, - ../libp2p/muxers/mplex/types + ../libp2p/muxers/mplex/types, + ../libp2p/helpers/debug const TestCodec = "/test/proto/1.0.0" @@ -22,9 +23,9 @@ type method init(p: TestProto) {.gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = let msg = cast[string](await conn.readLp()) - echo msg check "Hello!" == msg await conn.writeLp("Hello!") + await conn.close() p.codec = TestCodec p.handler = handle @@ -64,13 +65,9 @@ suite "Switch": (switch2, peerInfo2) = createSwitch(ma2) await switch2.start() let conn = await switch2.dial(peerInfo1, TestCodec) - echo "DIALED???" - echo conn.repr + debug "TEST SWITCH: dial succesful" await conn.writeLp("Hello!") - echo "WROTE FROM TEST" - echo conn.repr let msg = cast[string](await conn.readLp()) - echo msg check "Hello!" == msg result = true