add debug logging
This commit is contained in:
parent
54d740949e
commit
701e048ee6
|
@ -39,21 +39,21 @@ when defined(debugout) and not defined(release):
|
||||||
system.addQuitProc(resetAttributes)
|
system.addQuitProc(resetAttributes)
|
||||||
enableTrueColors()
|
enableTrueColors()
|
||||||
|
|
||||||
var matches {.threadvar.} : OrderedTableRef[string, Match]
|
var context {.threadvar.} : OrderedTableRef[string, Match]
|
||||||
proc initDebugCtx() =
|
proc initDebugCtx() =
|
||||||
matches = newOrderedTable[string, Match]()
|
context = newOrderedTable[string, Match]()
|
||||||
var patrns = @[".*"]
|
var patrns = @[".*"]
|
||||||
if debugout != "true":
|
if debugout != "true":
|
||||||
patrns = debugout.split(re"[,\s]").filterIt(it.len > 0)
|
patrns = debugout.split(re"[,\s]").filterIt(it.len > 0)
|
||||||
|
|
||||||
randomize()
|
randomize()
|
||||||
for p in patrns:
|
for p in patrns:
|
||||||
matches[p] = Match(pattern: re(p), color: $rand(0..272)) # 256 ansi colors
|
context[p] = Match(pattern: re(p), color: $rand(0..272)) # 256 ansi colors
|
||||||
|
|
||||||
proc doDebug(data: string): void {.gcsafe.} =
|
proc doDebug(data: string): void {.gcsafe.} =
|
||||||
if isNil(matches):
|
if isNil(context):
|
||||||
initDebugCtx()
|
initDebugCtx()
|
||||||
for m in matches.values:
|
for m in context.values:
|
||||||
if data.match(m.pattern).isSome:
|
if data.match(m.pattern).isSome:
|
||||||
stderr.writeLine("\u001b[38;5;" & m.color & "m " & alignLeft(data, 4) & "\e[0m")
|
stderr.writeLine("\u001b[38;5;" & m.color & "m " & alignLeft(data, 4) & "\e[0m")
|
||||||
return
|
return
|
||||||
|
|
|
@ -12,7 +12,9 @@ import ../../stream/bufferstream,
|
||||||
../../stream/lpstream,
|
../../stream/lpstream,
|
||||||
../../connection,
|
../../connection,
|
||||||
nimcrypto/utils,
|
nimcrypto/utils,
|
||||||
types, coder
|
types,
|
||||||
|
coder,
|
||||||
|
../../helpers/debug
|
||||||
|
|
||||||
const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb
|
const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb
|
||||||
|
|
||||||
|
@ -50,6 +52,7 @@ proc newChannel*(id: uint,
|
||||||
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
|
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
|
||||||
# writes should happen in sequence
|
# writes should happen in sequence
|
||||||
await chan.asyncLock.acquire()
|
await chan.asyncLock.acquire()
|
||||||
|
debug &"writeHandler: sending data {data} from {chan.id}"
|
||||||
await conn.writeMsg(chan.id, chan.msgCode, data) # write header
|
await conn.writeMsg(chan.id, chan.msgCode, data) # write header
|
||||||
chan.asyncLock.release()
|
chan.asyncLock.release()
|
||||||
|
|
||||||
|
|
|
@ -7,13 +7,14 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import chronos, options, sequtils
|
import chronos, options, sequtils, strformat
|
||||||
import types,
|
import types,
|
||||||
../../connection,
|
../../connection,
|
||||||
../../varint,
|
../../varint,
|
||||||
../../vbuffer,
|
../../vbuffer,
|
||||||
../../stream/lpstream,
|
../../stream/lpstream,
|
||||||
nimcrypto/utils
|
nimcrypto/utils,
|
||||||
|
../../helpers/debug
|
||||||
|
|
||||||
type
|
type
|
||||||
Msg* = tuple
|
Msg* = tuple
|
||||||
|
@ -45,9 +46,12 @@ proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} =
|
||||||
if headerVarint.isNone:
|
if headerVarint.isNone:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
debug &"readMsg: read header varint {$headerVarint}"
|
||||||
|
|
||||||
let dataLenVarint = await conn.readMplexVarint()
|
let dataLenVarint = await conn.readMplexVarint()
|
||||||
var data: seq[byte]
|
var data: seq[byte]
|
||||||
if dataLenVarint.isSome and dataLenVarint.get() > 0.uint:
|
if dataLenVarint.isSome and dataLenVarint.get() > 0.uint:
|
||||||
|
debug &"readMsg: read size varint {$dataLenVarint}"
|
||||||
data = await conn.read(dataLenVarint.get().int)
|
data = await conn.read(dataLenVarint.get().int)
|
||||||
|
|
||||||
let header = headerVarint.get()
|
let header = headerVarint.get()
|
||||||
|
|
|
@ -25,7 +25,8 @@ import coder, types, channel,
|
||||||
../../protocols/protocol,
|
../../protocols/protocol,
|
||||||
../../stream/bufferstream,
|
../../stream/bufferstream,
|
||||||
../../stream/lpstream,
|
../../stream/lpstream,
|
||||||
../muxer
|
../muxer,
|
||||||
|
../../helpers/debug
|
||||||
|
|
||||||
type
|
type
|
||||||
Mplex* = ref object of Muxer
|
Mplex* = ref object of Muxer
|
||||||
|
@ -70,31 +71,39 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
if MessageType(msgType) != MessageType.New:
|
if MessageType(msgType) != MessageType.New:
|
||||||
let channels = m.getChannelList(initiator)
|
let channels = m.getChannelList(initiator)
|
||||||
if not channels.contains(id):
|
if not channels.contains(id):
|
||||||
raise newMplexNoSuchChannel(id, msgType)
|
debug &"handle: Channel with id {id} message type {msgType} not found"
|
||||||
|
continue
|
||||||
channel = channels[id]
|
channel = channels[id]
|
||||||
|
|
||||||
case msgType:
|
case msgType:
|
||||||
of MessageType.New:
|
of MessageType.New:
|
||||||
channel = await m.newStreamInternal(false, id, cast[string](data))
|
let name = cast[string](data)
|
||||||
|
channel = await m.newStreamInternal(false, id, name)
|
||||||
|
debug &"handle: created channel with id {$id} and name {name}"
|
||||||
if not isNil(m.streamHandler):
|
if not isNil(m.streamHandler):
|
||||||
let handlerFut = m.streamHandler(newConnection(channel))
|
let handlerFut = m.streamHandler(newConnection(channel))
|
||||||
|
|
||||||
|
# TODO: don't use a closure?
|
||||||
|
# channel cleanup routine
|
||||||
proc cleanUpChan(udata: pointer) {.gcsafe.} =
|
proc cleanUpChan(udata: pointer) {.gcsafe.} =
|
||||||
if handlerFut.finished:
|
if handlerFut.finished:
|
||||||
channel.close().addCallback(
|
channel.close().addCallback(
|
||||||
proc(udata: pointer) =
|
proc(udata: pointer) =
|
||||||
# TODO: is waitFor() OK here?
|
# TODO: is waitFor() OK here?
|
||||||
channel.cleanUp()
|
channel.cleanUp()
|
||||||
.addCallback(proc(udata: pointer) =
|
.addCallback(proc(udata: pointer)
|
||||||
echo &"cleaned up channel {$id}")
|
= debug &"handle: cleaned up channel {$id}"))
|
||||||
)
|
|
||||||
handlerFut.addCallback(cleanUpChan)
|
handlerFut.addCallback(cleanUpChan)
|
||||||
continue
|
continue
|
||||||
of MessageType.MsgIn, MessageType.MsgOut:
|
of MessageType.MsgIn, MessageType.MsgOut:
|
||||||
|
debug &"handle: pushing data to channel {$id} type {msgType}"
|
||||||
await channel.pushTo(data)
|
await channel.pushTo(data)
|
||||||
of MessageType.CloseIn, MessageType.CloseOut:
|
of MessageType.CloseIn, MessageType.CloseOut:
|
||||||
|
debug &"handle: closing channel {$id} type {msgType}"
|
||||||
await channel.closedByRemote()
|
await channel.closedByRemote()
|
||||||
m.getChannelList(initiator).del(id)
|
m.getChannelList(initiator).del(id)
|
||||||
of MessageType.ResetIn, MessageType.ResetOut:
|
of MessageType.ResetIn, MessageType.ResetOut:
|
||||||
|
debug &"handle: resetting channel {$id} type {msgType}"
|
||||||
await channel.resetByRemote()
|
await channel.resetByRemote()
|
||||||
break
|
break
|
||||||
else: raise newMplexUnknownMsgError()
|
else: raise newMplexUnknownMsgError()
|
||||||
|
|
|
@ -19,7 +19,8 @@ import connection,
|
||||||
multiaddress,
|
multiaddress,
|
||||||
protocols/identify,
|
protocols/identify,
|
||||||
muxers/muxer,
|
muxers/muxer,
|
||||||
peer
|
peer,
|
||||||
|
helpers/debug
|
||||||
|
|
||||||
type
|
type
|
||||||
Switch* = ref object of RootObj
|
Switch* = ref object of RootObj
|
||||||
|
@ -61,9 +62,9 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||||
peerInfo.addrs = info.addrs
|
peerInfo.addrs = info.addrs
|
||||||
peerInfo.protocols = info.protos
|
peerInfo.protocols = info.protos
|
||||||
except IdentityInvalidMsgError as exc:
|
except IdentityInvalidMsgError as exc:
|
||||||
echo exc.msg # TODO: Loging
|
debug exc.msg
|
||||||
except IdentityNoMatchError as exc:
|
except IdentityNoMatchError as exc:
|
||||||
echo exc.msg # TODO: Loging
|
debug exc.msg
|
||||||
|
|
||||||
proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||||
## mux incoming connection
|
## mux incoming connection
|
||||||
|
@ -79,7 +80,8 @@ proc mux(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
|
||||||
# do identify first, so that we have a
|
# do identify first, so that we have a
|
||||||
# PeerInfo in case we didn't before
|
# PeerInfo in case we didn't before
|
||||||
result = await muxer.newStream()
|
result = await muxer.newStream()
|
||||||
# await s.identify(result)
|
asyncCheck muxer.handle()
|
||||||
|
await s.identify(result)
|
||||||
|
|
||||||
# store it in muxed connections if we have a peer for it
|
# store it in muxed connections if we have a peer for it
|
||||||
# TODO: We should make sure that this are cleaned up properly
|
# TODO: We should make sure that this are cleaned up properly
|
||||||
|
@ -126,9 +128,10 @@ proc dial*(s: Switch,
|
||||||
result = await s.handleConn(result)
|
result = await s.handleConn(result)
|
||||||
if s.muxed.contains(peer.peerId.pretty):
|
if s.muxed.contains(peer.peerId.pretty):
|
||||||
result = await s.muxed[peer.peerId.pretty].newStream()
|
result = await s.muxed[peer.peerId.pretty].newStream()
|
||||||
if (await s.ms.select(result, proto)):
|
if not (await s.ms.select(result, proto)):
|
||||||
raise newException(CatchableError,
|
raise newException(CatchableError,
|
||||||
&"Unable to select protocol: {proto}")
|
&"Unable to select protocol: {proto}")
|
||||||
|
await s.muxed[peer.peerId.pretty].handle()
|
||||||
|
|
||||||
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
|
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
|
||||||
if isNil(proto.handler):
|
if isNil(proto.handler):
|
||||||
|
|
Loading…
Reference in New Issue