logging cleanups and small fixes (#331)
This commit is contained in:
parent
397f9edfd4
commit
ab864fc747
|
@ -122,7 +122,7 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
|
||||||
##
|
##
|
||||||
|
|
||||||
await conn.join()
|
await conn.join()
|
||||||
trace "triggering connection cleanup"
|
trace "triggering connection cleanup", peer = $conn.peerInfo
|
||||||
await c.cleanupConn(conn)
|
await c.cleanupConn(conn)
|
||||||
|
|
||||||
proc selectConn*(c: ConnManager,
|
proc selectConn*(c: ConnManager,
|
||||||
|
@ -145,6 +145,8 @@ proc selectConn*(c: ConnManager, peerId: PeerID): Connection =
|
||||||
var conn = c.selectConn(peerId, Direction.Out)
|
var conn = c.selectConn(peerId, Direction.Out)
|
||||||
if isNil(conn):
|
if isNil(conn):
|
||||||
conn = c.selectConn(peerId, Direction.In)
|
conn = c.selectConn(peerId, Direction.In)
|
||||||
|
if isNil(conn):
|
||||||
|
trace "connection not found", peerId
|
||||||
|
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
|
@ -157,6 +159,8 @@ proc selectMuxer*(c: ConnManager, conn: Connection): Muxer =
|
||||||
|
|
||||||
if conn in c.muxed:
|
if conn in c.muxed:
|
||||||
return c.muxed[conn].muxer
|
return c.muxed[conn].muxer
|
||||||
|
else:
|
||||||
|
debug "no muxer for connection", conn = $conn
|
||||||
|
|
||||||
proc storeConn*(c: ConnManager, conn: Connection) =
|
proc storeConn*(c: ConnManager, conn: Connection) =
|
||||||
## store a connection
|
## store a connection
|
||||||
|
@ -171,8 +175,7 @@ proc storeConn*(c: ConnManager, conn: Connection) =
|
||||||
let peerId = conn.peerInfo.peerId
|
let peerId = conn.peerInfo.peerId
|
||||||
if c.conns.getOrDefault(peerId).len > c.maxConns:
|
if c.conns.getOrDefault(peerId).len > c.maxConns:
|
||||||
trace "too many connections", peer = $peerId,
|
trace "too many connections", peer = $peerId,
|
||||||
conns = c.conns
|
conns = c.conns.getOrDefault(peerId).len
|
||||||
.getOrDefault(peerId).len
|
|
||||||
|
|
||||||
raise newTooManyConnections()
|
raise newTooManyConnections()
|
||||||
|
|
||||||
|
@ -185,6 +188,8 @@ proc storeConn*(c: ConnManager, conn: Connection) =
|
||||||
asyncCheck c.onClose(conn)
|
asyncCheck c.onClose(conn)
|
||||||
libp2p_peers.set(c.conns.len.int64)
|
libp2p_peers.set(c.conns.len.int64)
|
||||||
|
|
||||||
|
trace "stored connection", connections = c.conns.len, peer = peerId
|
||||||
|
|
||||||
proc storeOutgoing*(c: ConnManager, conn: Connection) =
|
proc storeOutgoing*(c: ConnManager, conn: Connection) =
|
||||||
conn.dir = Direction.Out
|
conn.dir = Direction.Out
|
||||||
c.storeConn(conn)
|
c.storeConn(conn)
|
||||||
|
@ -209,7 +214,7 @@ proc storeMuxer*(c: ConnManager,
|
||||||
muxer: muxer,
|
muxer: muxer,
|
||||||
handle: handle)
|
handle: handle)
|
||||||
|
|
||||||
trace "stored connection", connections = c.conns.len
|
trace "stored muxer", connections = c.conns.len
|
||||||
|
|
||||||
proc getMuxedStream*(c: ConnManager,
|
proc getMuxedStream*(c: ConnManager,
|
||||||
peerId: PeerID,
|
peerId: PeerID,
|
||||||
|
|
|
@ -174,24 +174,19 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
|
||||||
finally:
|
finally:
|
||||||
trace "leaving multistream loop"
|
trace "leaving multistream loop"
|
||||||
|
|
||||||
proc addHandler*[T: LPProtocol](m: MultistreamSelect,
|
proc addHandler*(m: MultistreamSelect,
|
||||||
codec: string,
|
codec: string,
|
||||||
protocol: T,
|
protocol: LPProtocol,
|
||||||
matcher: Matcher = nil) =
|
matcher: Matcher = nil) =
|
||||||
## register a protocol
|
## register a protocol
|
||||||
# TODO: This is a bug in chronicles,
|
trace "registering protocol", codec = codec
|
||||||
# it break if I uncomment this line.
|
|
||||||
# Which is almost the same as the
|
|
||||||
# one on the next override of addHandler
|
|
||||||
#
|
|
||||||
# trace "registering protocol", codec = codec
|
|
||||||
m.handlers.add(HandlerHolder(proto: codec,
|
m.handlers.add(HandlerHolder(proto: codec,
|
||||||
protocol: protocol,
|
protocol: protocol,
|
||||||
match: matcher))
|
match: matcher))
|
||||||
|
|
||||||
proc addHandler*[T: LPProtoHandler](m: MultistreamSelect,
|
proc addHandler*(m: MultistreamSelect,
|
||||||
codec: string,
|
codec: string,
|
||||||
handler: T,
|
handler: LPProtoHandler,
|
||||||
matcher: Matcher = nil) =
|
matcher: Matcher = nil) =
|
||||||
## helper to allow registering pure handlers
|
## helper to allow registering pure handlers
|
||||||
|
|
||||||
|
|
|
@ -103,8 +103,10 @@ proc replenishFanout(g: GossipSub, topic: string) =
|
||||||
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
logScope:
|
logScope:
|
||||||
topic
|
topic
|
||||||
|
mesh = g.mesh.peers(topic)
|
||||||
|
gossipsub = g.gossipsub.peers(topic)
|
||||||
|
|
||||||
trace "about to rebalance mesh"
|
trace "rebalancing mesh"
|
||||||
|
|
||||||
# create a mesh topic that we're subscribing to
|
# create a mesh topic that we're subscribing to
|
||||||
|
|
||||||
|
@ -119,16 +121,12 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||||
)
|
)
|
||||||
|
|
||||||
logScope:
|
|
||||||
meshPeers = g.mesh.peers(topic)
|
|
||||||
grafts = grafts.len
|
|
||||||
|
|
||||||
shuffle(grafts)
|
shuffle(grafts)
|
||||||
|
|
||||||
# Graft peers so we reach a count of D
|
# Graft peers so we reach a count of D
|
||||||
grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
|
grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
|
||||||
|
|
||||||
trace "getting peers", topic, peers = grafts.len
|
trace "grafting", grafts = grafts.len
|
||||||
|
|
||||||
for peer in grafts:
|
for peer in grafts:
|
||||||
if g.mesh.addPeer(topic, peer):
|
if g.mesh.addPeer(topic, peer):
|
||||||
|
@ -140,7 +138,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
shuffle(prunes)
|
shuffle(prunes)
|
||||||
prunes.setLen(prunes.len - GossipSubD) # .. down to D peers
|
prunes.setLen(prunes.len - GossipSubD) # .. down to D peers
|
||||||
|
|
||||||
trace "about to prune mesh", prunes = prunes.len
|
trace "pruning", prunes = prunes.len
|
||||||
for peer in prunes:
|
for peer in prunes:
|
||||||
g.mesh.removePeer(topic, peer)
|
g.mesh.removePeer(topic, peer)
|
||||||
|
|
||||||
|
@ -154,13 +152,15 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
libp2p_gossipsub_peers_per_topic_mesh
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||||
|
|
||||||
# Send changes to peers after table updates to avoid stale state
|
trace "mesh balanced"
|
||||||
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
|
|
||||||
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
|
|
||||||
discard await g.broadcast(grafts, graft, DefaultSendTimeout)
|
|
||||||
discard await g.broadcast(prunes, prune, DefaultSendTimeout)
|
|
||||||
|
|
||||||
trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
|
# Send changes to peers after table updates to avoid stale state
|
||||||
|
if grafts.len > 0:
|
||||||
|
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
|
||||||
|
discard await g.broadcast(grafts, graft, DefaultSendTimeout)
|
||||||
|
if prunes.len > 0:
|
||||||
|
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
|
||||||
|
discard await g.broadcast(prunes, prune, DefaultSendTimeout)
|
||||||
|
|
||||||
proc dropFanoutPeers(g: GossipSub) =
|
proc dropFanoutPeers(g: GossipSub) =
|
||||||
# drop peers that we haven't published to in
|
# drop peers that we haven't published to in
|
||||||
|
@ -506,10 +506,10 @@ method publish*(g: GossipSub,
|
||||||
discard await procCall PubSub(g).publish(topic, data, timeout)
|
discard await procCall PubSub(g).publish(topic, data, timeout)
|
||||||
trace "publishing message on topic", topic, data = data.shortLog
|
trace "publishing message on topic", topic, data = data.shortLog
|
||||||
|
|
||||||
var peers: HashSet[PubSubPeer]
|
|
||||||
if topic.len <= 0: # data could be 0/empty
|
if topic.len <= 0: # data could be 0/empty
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
var peers: HashSet[PubSubPeer]
|
||||||
if topic in g.topics: # if we're subscribed use the mesh
|
if topic in g.topics: # if we're subscribed use the mesh
|
||||||
peers = g.mesh.getOrDefault(topic)
|
peers = g.mesh.getOrDefault(topic)
|
||||||
else: # not subscribed, send to fanout peers
|
else: # not subscribed, send to fanout peers
|
||||||
|
@ -537,6 +537,7 @@ method publish*(g: GossipSub,
|
||||||
if msgId notin g.mcache:
|
if msgId notin g.mcache:
|
||||||
g.mcache.put(msgId, msg)
|
g.mcache.put(msgId, msg)
|
||||||
|
|
||||||
|
if peers.len > 0:
|
||||||
let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
|
let published = await g.broadcast(toSeq(peers), RPCMsg(messages: @[msg]), timeout)
|
||||||
when defined(libp2p_expensive_metrics):
|
when defined(libp2p_expensive_metrics):
|
||||||
if published > 0:
|
if published > 0:
|
||||||
|
@ -545,6 +546,9 @@ method publish*(g: GossipSub,
|
||||||
trace "published message to peers", peers = published,
|
trace "published message to peers", peers = published,
|
||||||
msg = msg.shortLog()
|
msg = msg.shortLog()
|
||||||
return published
|
return published
|
||||||
|
else:
|
||||||
|
debug "No peers for gossip message", topic, msg = msg.shortLog()
|
||||||
|
return 0
|
||||||
|
|
||||||
method start*(g: GossipSub) {.async.} =
|
method start*(g: GossipSub) {.async.} =
|
||||||
trace "gossipsub start"
|
trace "gossipsub start"
|
||||||
|
|
|
@ -80,13 +80,14 @@ proc send*(
|
||||||
## send to remote peer
|
## send to remote peer
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "sending pubsub message to peer", peer = $peer, msg = msg
|
trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg)
|
||||||
try:
|
try:
|
||||||
await peer.send(msg, timeout)
|
await peer.send(msg, timeout)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "exception sending pubsub message to peer", peer = $peer, msg = msg
|
trace "exception sending pubsub message to peer",
|
||||||
|
peer = $peer, msg = shortLog(msg)
|
||||||
p.unsubscribePeer(peer.peerId)
|
p.unsubscribePeer(peer.peerId)
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
|
@ -98,11 +99,11 @@ proc broadcast*(
|
||||||
## send messages and cleanup failed peers
|
## send messages and cleanup failed peers
|
||||||
##
|
##
|
||||||
|
|
||||||
trace "broadcasting messages to peers", peers = sendPeers.len, message = msg
|
trace "broadcasting messages to peers",
|
||||||
|
peers = sendPeers.len, message = shortLog(msg)
|
||||||
let sent = await allFinished(
|
let sent = await allFinished(
|
||||||
sendPeers.mapIt( p.send(it, msg, timeout) ))
|
sendPeers.mapIt( p.send(it, msg, timeout) ))
|
||||||
return sent.filterIt( it.finished and it.error.isNil ).len
|
return sent.filterIt( it.finished and it.error.isNil ).len
|
||||||
trace "messages broadcasted to peers", peers = sent.len
|
|
||||||
|
|
||||||
proc sendSubs*(p: PubSub,
|
proc sendSubs*(p: PubSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
|
|
|
@ -77,7 +77,7 @@ method initStream*(s: Connection) =
|
||||||
s.timeoutHandler = proc() {.async.} =
|
s.timeoutHandler = proc() {.async.} =
|
||||||
await s.close()
|
await s.close()
|
||||||
|
|
||||||
trace "timeout set at", timeout = $s.timeout.millis
|
trace "timeout set at", timeout = s.timeout.millis
|
||||||
doAssert(isNil(s.timerTaskFut))
|
doAssert(isNil(s.timerTaskFut))
|
||||||
# doAssert(s.timeout > 0.millis)
|
# doAssert(s.timeout > 0.millis)
|
||||||
if s.timeout > 0.millis:
|
if s.timeout > 0.millis:
|
||||||
|
|
|
@ -7,7 +7,8 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import oids
|
import std/oids
|
||||||
|
import stew/byteutils
|
||||||
import chronicles, chronos, metrics
|
import chronicles, chronos, metrics
|
||||||
import ../varint,
|
import ../varint,
|
||||||
../vbuffer,
|
../vbuffer,
|
||||||
|
@ -16,6 +17,8 @@ import ../varint,
|
||||||
|
|
||||||
declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
|
declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
|
||||||
|
|
||||||
|
export oids
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "lpstream"
|
topics = "lpstream"
|
||||||
|
|
||||||
|
@ -182,6 +185,9 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, gcsafe.} =
|
||||||
await s.readExactly(addr res[0], res.len)
|
await s.readExactly(addr res[0], res.len)
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base.} =
|
||||||
|
doAssert(false, "not implemented!")
|
||||||
|
|
||||||
proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} =
|
proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} =
|
||||||
## write length prefixed
|
## write length prefixed
|
||||||
var buf = initVBuffer()
|
var buf = initVBuffer()
|
||||||
|
@ -189,14 +195,11 @@ proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} =
|
||||||
buf.finish()
|
buf.finish()
|
||||||
s.write(buf.buffer)
|
s.write(buf.buffer)
|
||||||
|
|
||||||
method write*(s: LPStream, msg: seq[byte]) {.base, async.} =
|
|
||||||
doAssert(false, "not implemented!")
|
|
||||||
|
|
||||||
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
|
proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecated: "seq".} =
|
||||||
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
|
s.write(@(toOpenArray(cast[ptr UncheckedArray[byte]](pbytes), 0, nbytes - 1)))
|
||||||
|
|
||||||
proc write*(s: LPStream, msg: string): Future[void] =
|
proc write*(s: LPStream, msg: string): Future[void] =
|
||||||
s.write(@(toOpenArrayByte(msg, 0, msg.high)))
|
s.write(msg.toBytes())
|
||||||
|
|
||||||
# TODO: split `close` into `close` and `dispose/destroy`
|
# TODO: split `close` into `close` and `dispose/destroy`
|
||||||
method close*(s: LPStream) {.base, async.} =
|
method close*(s: LPStream) {.base, async.} =
|
||||||
|
|
|
@ -15,7 +15,7 @@ import ../libp2p/[errors,
|
||||||
|
|
||||||
import ./helpers
|
import ./helpers
|
||||||
|
|
||||||
when defined(nimHasUsed): {.used.}
|
{.used.}
|
||||||
|
|
||||||
suite "Mplex":
|
suite "Mplex":
|
||||||
teardown:
|
teardown:
|
||||||
|
@ -28,8 +28,7 @@ suite "Mplex":
|
||||||
proc encHandler(msg: seq[byte]) {.async.} =
|
proc encHandler(msg: seq[byte]) {.async.} =
|
||||||
check msg == fromHex("000873747265616d2031")
|
check msg == fromHex("000873747265616d2031")
|
||||||
|
|
||||||
let stream = newBufferStream(encHandler)
|
let conn = newBufferStream(encHandler)
|
||||||
let conn = stream
|
|
||||||
await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
|
await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
|
@ -40,8 +39,7 @@ suite "Mplex":
|
||||||
proc encHandler(msg: seq[byte]) {.async.} =
|
proc encHandler(msg: seq[byte]) {.async.} =
|
||||||
check msg == fromHex("88010873747265616d2031")
|
check msg == fromHex("88010873747265616d2031")
|
||||||
|
|
||||||
let stream = newBufferStream(encHandler)
|
let conn = newBufferStream(encHandler)
|
||||||
let conn = stream
|
|
||||||
await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
|
await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
|
@ -52,8 +50,7 @@ suite "Mplex":
|
||||||
proc encHandler(msg: seq[byte]) {.async.} =
|
proc encHandler(msg: seq[byte]) {.async.} =
|
||||||
check msg == fromHex("020873747265616d2031")
|
check msg == fromHex("020873747265616d2031")
|
||||||
|
|
||||||
let stream = newBufferStream(encHandler)
|
let conn = newBufferStream(encHandler)
|
||||||
let conn = stream
|
|
||||||
await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
|
await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
|
@ -64,8 +61,7 @@ suite "Mplex":
|
||||||
proc encHandler(msg: seq[byte]) {.async.} =
|
proc encHandler(msg: seq[byte]) {.async.} =
|
||||||
check msg == fromHex("8a010873747265616d2031")
|
check msg == fromHex("8a010873747265616d2031")
|
||||||
|
|
||||||
let stream = newBufferStream(encHandler)
|
let conn = newBufferStream(encHandler)
|
||||||
let conn = stream
|
|
||||||
await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
|
await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue