From 701e048ee66bb04d22926485d6975fe7c1184a1a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sun, 8 Sep 2019 00:34:08 -0600 Subject: [PATCH] add debug logging --- libp2p/helpers/debug.nim | 10 +++++----- libp2p/muxers/mplex/channel.nim | 5 ++++- libp2p/muxers/mplex/coder.nim | 8 ++++++-- libp2p/muxers/mplex/mplex.nim | 21 +++++++++++++++------ libp2p/switch.nim | 15 +++++++++------ 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/libp2p/helpers/debug.nim b/libp2p/helpers/debug.nim index b257fca..72aff52 100644 --- a/libp2p/helpers/debug.nim +++ b/libp2p/helpers/debug.nim @@ -39,21 +39,21 @@ when defined(debugout) and not defined(release): system.addQuitProc(resetAttributes) enableTrueColors() - var matches {.threadvar.} : OrderedTableRef[string, Match] + var context {.threadvar.} : OrderedTableRef[string, Match] proc initDebugCtx() = - matches = newOrderedTable[string, Match]() + context = newOrderedTable[string, Match]() var patrns = @[".*"] if debugout != "true": patrns = debugout.split(re"[,\s]").filterIt(it.len > 0) randomize() for p in patrns: - matches[p] = Match(pattern: re(p), color: $rand(0..272)) # 256 ansi colors + context[p] = Match(pattern: re(p), color: $rand(0..272)) # 256 ansi colors proc doDebug(data: string): void {.gcsafe.} = - if isNil(matches): + if isNil(context): initDebugCtx() - for m in matches.values: + for m in context.values: if data.match(m.pattern).isSome: stderr.writeLine("\u001b[38;5;" & m.color & "m " & alignLeft(data, 4) & "\e[0m") return diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/channel.nim index 13a8737..8b37f92 100644 --- a/libp2p/muxers/mplex/channel.nim +++ b/libp2p/muxers/mplex/channel.nim @@ -12,7 +12,9 @@ import ../../stream/bufferstream, ../../stream/lpstream, ../../connection, nimcrypto/utils, - types, coder + types, + coder, + ../../helpers/debug const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb @@ -50,6 +52,7 @@ proc newChannel*(id: uint, proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = # writes should happen in sequence await chan.asyncLock.acquire() + debug &"writeHandler: sending data {data} from {chan.id}" await conn.writeMsg(chan.id, chan.msgCode, data) # write header chan.asyncLock.release() diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 6e44bd0..11ba341 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -7,13 +7,14 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, options, sequtils +import chronos, options, sequtils, strformat import types, ../../connection, ../../varint, ../../vbuffer, ../../stream/lpstream, - nimcrypto/utils + nimcrypto/utils, + ../../helpers/debug type Msg* = tuple @@ -44,10 +45,13 @@ proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = let headerVarint = await conn.readMplexVarint() if headerVarint.isNone: return + + debug &"readMsg: read header varint {$headerVarint}" let dataLenVarint = await conn.readMplexVarint() var data: seq[byte] if dataLenVarint.isSome and dataLenVarint.get() > 0.uint: + debug &"readMsg: read size varint {$dataLenVarint}" data = await conn.read(dataLenVarint.get().int) let header = headerVarint.get() diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index c3aaa9d..9c8f1d7 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -25,7 +25,8 @@ import coder, types, channel, ../../protocols/protocol, ../../stream/bufferstream, ../../stream/lpstream, - ../muxer + ../muxer, + ../../helpers/debug type Mplex* = ref object of Muxer @@ -70,31 +71,39 @@ method handle*(m: Mplex) {.async, gcsafe.} = if MessageType(msgType) != MessageType.New: let channels = m.getChannelList(initiator) if not channels.contains(id): - raise newMplexNoSuchChannel(id, msgType) + debug &"handle: Channel with id {id} message type {msgType} not found" + continue channel = channels[id] case msgType: of MessageType.New: - channel = await m.newStreamInternal(false, id, cast[string](data)) + let name = cast[string](data) + channel = await m.newStreamInternal(false, id, name) + debug &"handle: created channel with id {$id} and name {name}" if not isNil(m.streamHandler): let handlerFut = m.streamHandler(newConnection(channel)) + + # TODO: don't use a closure? + # channel cleanup routine proc cleanUpChan(udata: pointer) {.gcsafe.} = if handlerFut.finished: channel.close().addCallback( proc(udata: pointer) = # TODO: is waitFor() OK here? channel.cleanUp() - .addCallback(proc(udata: pointer) = - echo &"cleaned up channel {$id}") - ) + .addCallback(proc(udata: pointer) + = debug &"handle: cleaned up channel {$id}")) handlerFut.addCallback(cleanUpChan) continue of MessageType.MsgIn, MessageType.MsgOut: + debug &"handle: pushing data to channel {$id} type {msgType}" await channel.pushTo(data) of MessageType.CloseIn, MessageType.CloseOut: + debug &"handle: closing channel {$id} type {msgType}" await channel.closedByRemote() m.getChannelList(initiator).del(id) of MessageType.ResetIn, MessageType.ResetOut: + debug &"handle: resetting channel {$id} type {msgType}" await channel.resetByRemote() break else: raise newMplexUnknownMsgError() diff --git a/libp2p/switch.nim b/libp2p/switch.nim index d6dddf8..afdcf09 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -19,7 +19,8 @@ import connection, multiaddress, protocols/identify, muxers/muxer, - peer + peer, + helpers/debug type Switch* = ref object of RootObj @@ -61,9 +62,9 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = peerInfo.addrs = info.addrs peerInfo.protocols = info.protos except IdentityInvalidMsgError as exc: - echo exc.msg # TODO: Loging + debug exc.msg except IdentityNoMatchError as exc: - echo exc.msg # TODO: Loging + debug exc.msg proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = ## mux incoming connection @@ -79,8 +80,9 @@ proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = # do identify first, so that we have a # PeerInfo in case we didn't before result = await muxer.newStream() - # await s.identify(result) - + asyncCheck muxer.handle() + await s.identify(result) + # store it in muxed connections if we have a peer for it # TODO: We should make sure that this are cleaned up properly # on exit even if there is no peer for it. This shouldn't @@ -126,9 +128,10 @@ proc dial*(s: Switch, result = await s.handleConn(result) if s.muxed.contains(peer.peerId.pretty): result = await s.muxed[peer.peerId.pretty].newStream() - if (await s.ms.select(result, proto)): + if not (await s.ms.select(result, proto)): raise newException(CatchableError, &"Unable to select protocol: {proto}") + await s.muxed[peer.peerId.pretty].handle() proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = if isNil(proto.handler):