adding chronicles logging

This commit is contained in:
Dmitriy Ryajov 2019-09-09 11:33:32 -06:00
parent 5bc8e7e7b1
commit 435c69633f
10 changed files with 68 additions and 148 deletions

View File

@ -1,77 +0,0 @@
## Nim-LibP2P
## Copyright (c) 2018 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
## Small debug module that's only runs in non release builds.
## It's inspired by the Nodejs ``debug`` module which allows printing
## colorized output based on matched patterns. This module is powered
## by the standard ``nre`` module and as such it supports the same set
## of regexp expressions.
##
## The output is controled through two environment variables ``DEBUG``
## and ``DEBUG_COLOR``. By default, it will print everything to stderr
## but if only some output is of interested, ``DEBUG`` accepts a coma
## separated list of regexp expressions to match the output against.
## Each match gets assigned a different color to help differentiate
## the output lines.
##
## Colorization is done with 256 ANSI colors, and each run gets a random
## color, if more than one match is specified, each one gets a random color
## to disable this behavior use ``DEBUG_COLOR`` with a valid ANSI color code.
## This will also disable individual colorization for each matched line.
##
when not defined(release):
import random,
tables,
nre,
strutils,
times,
terminal,
sequtils,
os
type
Match = object
pattern: Regex
color: string
# if isTrueColorSupported():
# system.addQuitProc(resetAttributes)
# enableTrueColors()
var context {.threadvar.} : OrderedTableRef[string, Match]
proc initDebugCtx() =
let debugout = getEnv("DEBUG", ".*")
let debugColor = getEnv("DEBUG_COLOR")
let isDebugColor = debugColor.len > 0
context = newOrderedTable[string, Match]()
var patrns: seq[string] = @[]
patrns = debugout.split(re"[,\s]").filterIt(it.len > 0)
randomize()
for p in patrns:
context[p] = Match(pattern: re(p), color: if isDebugColor: debugColor else: $rand(0..272)) # 256 ansi colors
proc doDebug(data: string, line: string): void {.gcsafe.} =
if isNil(context):
initDebugCtx()
for m in context.values:
if data.contains(m.pattern):
stderr.writeLine("\u001b[38;5;" & m.color & "m " & alignLeft(line & data, 4) & "\e[0m")
return
template debug*(data: string) =
let module = instantiationInfo()
let line = "$# $#:$# - " %
[now().format("yyyy-MM-dd HH:mm:ss:fffffffff"),
module.filename[0..^5],
$module.line]
doDebug(data, line)
else:
template debug*(data: string) = discard

View File

@ -8,12 +8,11 @@
## those terms. ## those terms.
import sequtils, strutils, strformat import sequtils, strutils, strformat
import chronos import chronos, chronicles
import connection, import connection,
varint, varint,
vbuffer, vbuffer,
protocols/protocol, protocols/protocol
helpers/debug
const MsgSize* = 64*1024 const MsgSize* = 64*1024
const Codec* = "/multistream/1.0.0" const Codec* = "/multistream/1.0.0"
@ -46,37 +45,37 @@ proc select*(m: MultisteamSelect,
conn: Connection, conn: Connection,
proto: seq[string]): proto: seq[string]):
Future[string] {.async.} = Future[string] {.async.} =
debug &"select: initiating handshake" debug "select: initiating handshake"
## select a remote protocol ## select a remote protocol
await conn.write(m.codec) # write handshake await conn.write(m.codec) # write handshake
if proto.len() > 0: if proto.len() > 0:
debug &"select: selecting proto {proto}" debug "select: selecting proto", proto = proto
await conn.writeLp((proto[0] & "\n")) # select proto await conn.writeLp((proto[0] & "\n")) # select proto
result = cast[string](await conn.readLp()) # read ms header result = cast[string](await conn.readLp()) # read ms header
result.removeSuffix("\n") result.removeSuffix("\n")
if result != Codec: if result != Codec:
debug &"select: handshake failed" debug "select: handshake failed"
return "" return ""
if proto.len() == 0: # no protocols, must be a handshake call if proto.len() == 0: # no protocols, must be a handshake call
return return
result = cast[string](await conn.readLp()) # read the first proto result = cast[string](await conn.readLp()) # read the first proto
debug &"select: reading first requested proto" debug "select: reading first requested proto"
result.removeSuffix("\n") result.removeSuffix("\n")
if result == proto[0]: if result == proto[0]:
debug &"select: succesfully selected {proto}" debug "select: succesfully selected ", proto = proto
return return
if not result.len > 0: if not result.len > 0:
debug &"select: selecting one of several protos" debug "select: selecting one of several protos"
for p in proto[1..<proto.len()]: for p in proto[1..<proto.len()]:
await conn.writeLp((p & "\n")) # select proto await conn.writeLp((p & "\n")) # select proto
result = cast[string](await conn.readLp()) # read the first proto result = cast[string](await conn.readLp()) # read the first proto
result.removeSuffix("\n") result.removeSuffix("\n")
if result == p: if result == p:
debug &"select: selected {result}" debug "select: selected protocol", protocol = result
break break
proc select*(m: MultisteamSelect, proc select*(m: MultisteamSelect,
@ -107,25 +106,25 @@ proc list*(m: MultisteamSelect,
result = list result = list
proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
debug &"select: starting multistream handling" debug "handle: starting multistream handling"
while not conn.closed: while not conn.closed:
block main: block main:
var ms = cast[string](await conn.readLp()) var ms = cast[string](await conn.readLp())
ms.removeSuffix("\n") ms.removeSuffix("\n")
debug &"select: got request for {ms}" debug "handle: got request for ", ms
if ms.len() <= 0: if ms.len() <= 0:
debug &"select: invalid proto" debug "handle: invalid proto"
await conn.write(m.na) await conn.write(m.na)
if m.handlers.len() == 0: if m.handlers.len() == 0:
debug &"select: {ms} is na" debug "handle: sending `na` for protocol ", protocol = ms
await conn.write(m.na) await conn.write(m.na)
continue continue
case ms: case ms:
of "ls": of "ls":
debug &"select: listing protos" debug "handle: listing protos"
var protos = "" var protos = ""
for h in m.handlers: for h in m.handlers:
protos &= (h.proto & "\n") protos &= (h.proto & "\n")
@ -135,14 +134,14 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
else: else:
for h in m.handlers: for h in m.handlers:
if (not isNil(h.match) and h.match(ms)) or ms == h.proto: if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
debug &"select: found handler for {ms}" debug "handle: found handler for", protocol = ms
await conn.writeLp((h.proto & "\n")) await conn.writeLp((h.proto & "\n"))
try: try:
await h.protocol.handler(conn, ms) await h.protocol.handler(conn, ms)
break main break main
except Exception as exc: except Exception as exc:
debug exc.msg # TODO: Logging debug "handle: exception while handling ", msg = exc.msg # TODO: Logging
debug &"select: no handlers for {ms}" debug "handle: no handlers for ", protocol = ms
await conn.write(m.na) await conn.write(m.na)
proc addHandler*[T: LPProtocol](m: MultisteamSelect, proc addHandler*[T: LPProtocol](m: MultisteamSelect,

View File

@ -7,14 +7,14 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import chronos, strformat import strformat
import chronos, chronicles
import types, import types,
coder, coder,
nimcrypto/utils, nimcrypto/utils,
../../stream/bufferstream, ../../stream/bufferstream,
../../stream/lpstream, ../../stream/lpstream,
../../connection, ../../connection
../../helpers/debug
const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb const DefaultChannelSize* = DefaultBufferSize * 64 # 64kb
@ -52,7 +52,7 @@ proc newChannel*(id: uint,
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
# writes should happen in sequence # writes should happen in sequence
await chan.asyncLock.acquire() await chan.asyncLock.acquire()
debug &"writeHandler: sending data {data} from {chan.id}" debug "writeHandler: sending data ", data, id = chan.id
await conn.writeMsg(chan.id, chan.msgCode, data) # write header await conn.writeMsg(chan.id, chan.msgCode, data) # write header
chan.asyncLock.release() chan.asyncLock.release()

View File

@ -8,13 +8,12 @@
## those terms. ## those terms.
import chronos, options, sequtils, strformat import chronos, options, sequtils, strformat
import nimcrypto/utils, chronicles
import types, import types,
../../connection, ../../connection,
../../varint, ../../varint,
../../vbuffer, ../../vbuffer,
../../stream/lpstream, ../../stream/lpstream
nimcrypto/utils,
../../helpers/debug
type type
Msg* = tuple Msg* = tuple
@ -38,7 +37,7 @@ proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} =
if res != VarintStatus.Success: if res != VarintStatus.Success:
buffer.setLen(0) buffer.setLen(0)
return return
except TransportIncompleteError, AsyncStreamIncompleteError: except LPStreamIncompleteError:
buffer.setLen(0) buffer.setLen(0)
proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} =
@ -46,12 +45,12 @@ proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} =
if headerVarint.isNone: if headerVarint.isNone:
return return
debug &"readMsg: read header varint {$headerVarint}" debug "readMsg: read header varint ", varint = headerVarint
let dataLenVarint = await conn.readMplexVarint() let dataLenVarint = await conn.readMplexVarint()
var data: seq[byte] var data: seq[byte]
if dataLenVarint.isSome and dataLenVarint.get() > 0.uint: if dataLenVarint.isSome and dataLenVarint.get() > 0.uint:
debug &"readMsg: read size varint {$dataLenVarint}" debug "readMsg: read size varint ", varint = dataLenVarint
data = await conn.read(dataLenVarint.get().int) data = await conn.read(dataLenVarint.get().int)
let header = headerVarint.get() let header = headerVarint.get()

View File

@ -17,16 +17,15 @@
## here to not forget that this needs to be fixed ASAP. ## here to not forget that this needs to be fixed ASAP.
import tables, sequtils, options, strformat import tables, sequtils, options, strformat
import chronos import chronos, chronicles
import coder, types, channel, import coder, types, channel,
../muxer,
../../varint, ../../varint,
../../connection, ../../connection,
../../vbuffer, ../../vbuffer,
../../protocols/protocol, ../../protocols/protocol,
../../stream/bufferstream, ../../stream/bufferstream,
../../stream/lpstream, ../../stream/lpstream
../muxer,
../../helpers/debug
type type
Mplex* = ref object of Muxer Mplex* = ref object of Muxer
@ -35,9 +34,6 @@ type
currentId*: uint currentId*: uint
maxChannels*: uint maxChannels*: uint
proc newMplexNoSuchChannel(id: uint, msgType: MessageType): ref MplexNoSuchChannel =
result = newException(MplexNoSuchChannel, &"No such channel id {$id} and message {$msgType}")
proc newMplexUnknownMsgError(): ref MplexUnknownMsgError = proc newMplexUnknownMsgError(): ref MplexUnknownMsgError =
result = newException(MplexUnknownMsgError, "Unknown mplex message type") result = newException(MplexUnknownMsgError, "Unknown mplex message type")
@ -71,7 +67,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
if MessageType(msgType) != MessageType.New: if MessageType(msgType) != MessageType.New:
let channels = m.getChannelList(initiator) let channels = m.getChannelList(initiator)
if not channels.contains(id): if not channels.contains(id):
debug &"handle: Channel with id {id} message type {msgType} not found" # debug "handle: Channel with id and msg type ", id = id, msg = msgType
continue continue
channel = channels[id] channel = channels[id]
@ -79,7 +75,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
of MessageType.New: of MessageType.New:
let name = cast[string](data) let name = cast[string](data)
channel = await m.newStreamInternal(false, id, name) channel = await m.newStreamInternal(false, id, name)
debug &"handle: created channel with id {$id} and name {name}" # debug "handle: created channel ", id = id, name = name
if not isNil(m.streamHandler): if not isNil(m.streamHandler):
let handlerFut = m.streamHandler(newConnection(channel)) let handlerFut = m.streamHandler(newConnection(channel))
@ -91,19 +87,19 @@ method handle*(m: Mplex) {.async, gcsafe.} =
proc(udata: pointer) = proc(udata: pointer) =
# TODO: is waitFor() OK here? # TODO: is waitFor() OK here?
channel.cleanUp() channel.cleanUp()
.addCallback(proc(udata: pointer) .addCallback(proc(udata: pointer) =
= debug &"handle: cleaned up channel {$id}")) debug "handle: cleaned up channel ", id = id))
handlerFut.addCallback(cleanUpChan) handlerFut.addCallback(cleanUpChan)
continue continue
of MessageType.MsgIn, MessageType.MsgOut: of MessageType.MsgIn, MessageType.MsgOut:
debug &"handle: pushing data to channel {$id} type {msgType}" debug "handle: pushing data to channel ", id = id, msgType = msgType
await channel.pushTo(data) await channel.pushTo(data)
of MessageType.CloseIn, MessageType.CloseOut: of MessageType.CloseIn, MessageType.CloseOut:
debug &"handle: closing channel {$id} type {msgType}" debug "handle: closing channel ", id = id, msgType = msgType
await channel.closedByRemote() await channel.closedByRemote()
m.getChannelList(initiator).del(id) m.getChannelList(initiator).del(id)
of MessageType.ResetIn, MessageType.ResetOut: of MessageType.ResetIn, MessageType.ResetOut:
debug &"handle: resetting channel {$id} type {msgType}" debug "handle: resetting channel ", id = id
await channel.resetByRemote() await channel.resetByRemote()
break break
else: raise newMplexUnknownMsgError() else: raise newMplexUnknownMsgError()

View File

@ -8,15 +8,14 @@
## those terms. ## those terms.
import options, strformat import options, strformat
import chronos import chronos, chronicles
import ../protobuf/minprotobuf, import ../protobuf/minprotobuf,
../peerinfo, ../peerinfo,
../connection, ../connection,
../peer, ../peer,
../crypto/crypto, ../crypto/crypto,
../multiaddress, ../multiaddress,
../protocols/protocol, ../protocols/protocol
../helpers/debug
const IdentifyCodec* = "/ipfs/id/1.0.0" const IdentifyCodec* = "/ipfs/id/1.0.0"
const IdentifyPushCodec* = "/ipfs/id/push/1.0.0" const IdentifyPushCodec* = "/ipfs/id/push/1.0.0"
@ -108,7 +107,7 @@ proc identify*(p: Identify,
"Invalid or empty message received!") "Invalid or empty message received!")
result = decodeMsg(message) result = decodeMsg(message)
debug &"identify: Identify for remote peer succeded" debug "identify: Identify for remote peer succeded"
if remotePeerInfo.isSome and if remotePeerInfo.isSome and
remotePeerInfo.get().peerId.publicKey != result.pubKey: remotePeerInfo.get().peerId.publicKey != result.pubKey:
debug "identify: Peer's remote public key doesn't match" debug "identify: Peer's remote public key doesn't match"

View File

@ -8,19 +8,19 @@
## those terms. ## those terms.
import tables, sequtils, options, strformat import tables, sequtils, options, strformat
import chronos import chronos, chronicles
import connection, import connection,
transports/transport, transports/transport,
stream/lpstream, stream/lpstream,
multistream, multistream,
protocols/protocol, protocols/protocol,
protocols/secure, protocols/secure/secure, # for plain text
protocols/secure/secio,
peerinfo, peerinfo,
multiaddress, multiaddress,
protocols/identify, protocols/identify,
muxers/muxer, muxers/muxer,
peer, peer
helpers/debug
type type
Switch* = ref object of RootObj Switch* = ref object of RootObj
@ -33,13 +33,13 @@ type
ms*: MultisteamSelect ms*: MultisteamSelect
identity*: Identify identity*: Identify
streamHandler*: StreamHandler streamHandler*: StreamHandler
secureManager*: Secure secureManagers*: seq[Secure]
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
## secure the incoming connection ## secure the incoming connection
# plaintext for now, doesn't do anything # plaintext for now, doesn't do anything
if not (await s.ms.select(conn, s.secureManager.codec)): if (await s.ms.select(conn, s.secureManagers.mapIt(it.codec))).len == 0:
raise newException(CatchableError, "Unable to negotiate a secure channel!") raise newException(CatchableError, "Unable to negotiate a secure channel!")
result = conn result = conn
@ -61,11 +61,11 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} =
peerInfo.peerId = PeerID.init(info.pubKey) # we might not have a peerId at all peerInfo.peerId = PeerID.init(info.pubKey) # we might not have a peerId at all
peerInfo.addrs = info.addrs peerInfo.addrs = info.addrs
peerInfo.protocols = info.protos peerInfo.protocols = info.protos
debug &"identify: identified remote peer {peerInfo.peerId.pretty}" debug "identify: identified remote peer ", peer = peerInfo.peerId.pretty
except IdentityInvalidMsgError as exc: except IdentityInvalidMsgError as exc:
debug exc.msg debug "identify: invalid message", msg = exc.msg
except IdentityNoMatchError as exc: except IdentityNoMatchError as exc:
debug exc.msg debug "identify: peer's public keys don't match ", msg = exc.msg
proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
## mux incoming connection ## mux incoming connection
@ -87,7 +87,7 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
handlerFut.addCallback( handlerFut.addCallback(
proc(udata: pointer = nil) {.gcsafe.} = proc(udata: pointer = nil) {.gcsafe.} =
if handlerFut.finished: if handlerFut.finished:
debug &"Muxer handler completed for peer {conn.peerInfo.get().peerId.pretty}" debug "mux: Muxer handler completed for peer ", peer = conn.peerInfo.get().peerId.pretty
) )
await s.identify(stream) await s.identify(stream)
await stream.close() # close idenity stream await stream.close() # close idenity stream
@ -141,9 +141,9 @@ proc dial*(s: Switch,
if s.muxed.contains(peer.peerId.pretty): if s.muxed.contains(peer.peerId.pretty):
result = await s.muxed[peer.peerId.pretty].newStream() result = await s.muxed[peer.peerId.pretty].newStream()
debug &"dial: attempting to select remote proto {proto}" debug "dial: attempting to select remote ", proto = proto
if not (await s.ms.select(result, proto)): if not (await s.ms.select(result, proto)):
debug &"dial: Unable to select protocol: {proto}" debug "dial: Unable to select protocol: ", proto = proto
raise newException(CatchableError, raise newException(CatchableError,
&"Unable to select protocol: {proto}") &"Unable to select protocol: {proto}")
@ -177,7 +177,8 @@ proc stop*(s: Switch) {.async.} =
proc newSwitch*(peerInfo: PeerInfo, proc newSwitch*(peerInfo: PeerInfo,
transports: seq[Transport], transports: seq[Transport],
identity: Identify, identity: Identify,
muxers: Table[string, MuxerProvider]): Switch = muxers: Table[string, MuxerProvider],
secureManagers: seq[Secure] = @[]): Switch =
new result new result
result.peerInfo = peerInfo result.peerInfo = peerInfo
result.ms = newMultistream() result.ms = newMultistream()
@ -199,5 +200,10 @@ proc newSwitch*(peerInfo: PeerInfo,
val.streamHandler = result.streamHandler val.streamHandler = result.streamHandler
result.mount(val) result.mount(val)
result.secureManager = Secure(newPlainText()) for s in secureManagers:
result.mount(result.secureManager) result.secureManagers.add(s)
result.mount(s)
if result.secureManagers.len == 0:
# use plain text if no secure managers are provided
result.mount(Secure(newPlainText()))

View File

@ -1,5 +1,5 @@
import unittest, sequtils, sugar, strformat, options import unittest, sequtils, sugar, strformat, options, strformat
import chronos, nimcrypto/utils import chronos, nimcrypto/utils, chronicles
import ../libp2p/connection, import ../libp2p/connection,
../libp2p/stream/lpstream, ../libp2p/stream/lpstream,
../libp2p/stream/bufferstream, ../libp2p/stream/bufferstream,
@ -10,8 +10,7 @@ import ../libp2p/connection,
../libp2p/muxers/mplex/mplex, ../libp2p/muxers/mplex/mplex,
../libp2p/muxers/mplex/coder, ../libp2p/muxers/mplex/coder,
../libp2p/muxers/mplex/types, ../libp2p/muxers/mplex/types,
../libp2p/muxers/mplex/channel, ../libp2p/muxers/mplex/channel
../libp2p/helpers/debug
suite "Mplex": suite "Mplex":
test "encode header with channel id 0": test "encode header with channel id 0":

View File

@ -2,4 +2,4 @@ import unittest
import testvarint, testbase32, testbase58, testbase64 import testvarint, testbase32, testbase58, testbase64
import testrsa, testecnist, tested25519, testsecp256k1, testcrypto import testrsa, testecnist, tested25519, testsecp256k1, testcrypto
import testmultibase, testmultihash, testmultiaddress, testcid, testpeer import testmultibase, testmultihash, testmultiaddress, testcid, testpeer
import testidentify, testtransport, testmultistream, testbufferstream, testmplex import testidentify, testtransport, testmultistream, testbufferstream, testmplex, testswitch

View File

@ -1,5 +1,5 @@
import unittest, tables import unittest, tables
import chronos import chronos, chronicles
import ../libp2p/switch, import ../libp2p/switch,
../libp2p/multistream, ../libp2p/multistream,
../libp2p/protocols/identify, ../libp2p/protocols/identify,
@ -12,8 +12,7 @@ import ../libp2p/switch,
../libp2p/protocols/protocol, ../libp2p/protocols/protocol,
../libp2p/muxers/muxer, ../libp2p/muxers/muxer,
../libp2p/muxers/mplex/mplex, ../libp2p/muxers/mplex/mplex,
../libp2p/muxers/mplex/types, ../libp2p/muxers/mplex/types
../libp2p/helpers/debug
const TestCodec = "/test/proto/1.0.0" const TestCodec = "/test/proto/1.0.0"