misc cleanups (#303)
This commit is contained in:
parent
d544b64010
commit
e655a510cd
|
@ -129,16 +129,16 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
|
||||||
|
|
||||||
if not handshaked and ms != Codec:
|
if not handshaked and ms != Codec:
|
||||||
error "expected handshake message", instead=ms
|
error "expected handshake message", instead=ms
|
||||||
raise newException(CatchableError,
|
raise newException(CatchableError,
|
||||||
"MultistreamSelect handling failed, invalid first message")
|
"MultistreamSelect handling failed, invalid first message")
|
||||||
|
|
||||||
trace "handle: got request for ", ms
|
trace "handle: got request", ms
|
||||||
if ms.len() <= 0:
|
if ms.len() <= 0:
|
||||||
trace "handle: invalid proto"
|
trace "handle: invalid proto"
|
||||||
await conn.write(Na)
|
await conn.write(Na)
|
||||||
|
|
||||||
if m.handlers.len() == 0:
|
if m.handlers.len() == 0:
|
||||||
trace "handle: sending `na` for protocol ", protocol = ms
|
trace "handle: sending `na` for protocol", protocol = ms
|
||||||
await conn.write(Na)
|
await conn.write(Na)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -159,11 +159,11 @@ proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.asy
|
||||||
else:
|
else:
|
||||||
for h in m.handlers:
|
for h in m.handlers:
|
||||||
if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
|
if (not isNil(h.match) and h.match(ms)) or ms == h.proto:
|
||||||
trace "found handler for", protocol = ms
|
trace "found handler", protocol = ms
|
||||||
await conn.writeLp((h.proto & "\n"))
|
await conn.writeLp((h.proto & "\n"))
|
||||||
await h.protocol.handler(conn, ms)
|
await h.protocol.handler(conn, ms)
|
||||||
return
|
return
|
||||||
debug "no handlers for ", protocol = ms
|
debug "no handlers", protocol = ms
|
||||||
await conn.write(Na)
|
await conn.write(Na)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
|
@ -81,7 +81,7 @@ template withEOFExceptions(body: untyped): untyped =
|
||||||
|
|
||||||
proc cleanupTimer(s: LPChannel) {.async.} =
|
proc cleanupTimer(s: LPChannel) {.async.} =
|
||||||
## cleanup timers
|
## cleanup timers
|
||||||
if not s.timerTaskFut.finished:
|
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
|
||||||
await s.timerTaskFut.cancelAndWait()
|
await s.timerTaskFut.cancelAndWait()
|
||||||
|
|
||||||
proc closeMessage(s: LPChannel) {.async.} =
|
proc closeMessage(s: LPChannel) {.async.} =
|
||||||
|
|
|
@ -106,10 +106,10 @@ method init*(p: Identify) =
|
||||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||||
try:
|
try:
|
||||||
defer:
|
defer:
|
||||||
trace "exiting identify handler", oid = conn.oid
|
trace "exiting identify handler", oid = $conn.oid
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
trace "handling identify request", oid = conn.oid
|
trace "handling identify request", oid = $conn.oid
|
||||||
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
|
var pb = encodeMsg(p.peerInfo, conn.observedAddr)
|
||||||
await conn.writeLp(pb.buffer)
|
await conn.writeLp(pb.buffer)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
|
|
|
@ -228,9 +228,9 @@ proc heartbeat(g: GossipSub) {.async.} =
|
||||||
|
|
||||||
let peers = g.getGossipPeers()
|
let peers = g.getGossipPeers()
|
||||||
var sent: seq[Future[void]]
|
var sent: seq[Future[void]]
|
||||||
for peer in peers.keys:
|
for peer, control in peers:
|
||||||
if peer in g.peers:
|
g.peers.withValue(peer, pubsubPeer) do:
|
||||||
sent &= g.peers[peer].send(RPCMsg(control: some(peers[peer])))
|
sent &= pubsubPeer[].send(RPCMsg(control: some(control)))
|
||||||
checkFutures(await allFinished(sent))
|
checkFutures(await allFinished(sent))
|
||||||
|
|
||||||
g.mcache.shift() # shift the cache
|
g.mcache.shift() # shift the cache
|
||||||
|
|
|
@ -184,9 +184,8 @@ method handleConn*(p: PubSub,
|
||||||
|
|
||||||
let peer = p.getOrCreatePeer(conn.peerInfo, proto)
|
let peer = p.getOrCreatePeer(conn.peerInfo, proto)
|
||||||
|
|
||||||
let topics = toSeq(p.topics.keys)
|
if p.topics.len > 0:
|
||||||
if topics.len > 0:
|
await p.sendSubs(peer, toSeq(p.topics.keys), true)
|
||||||
await p.sendSubs(peer, topics, true)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
peer.handler = handler
|
peer.handler = handler
|
||||||
|
|
|
@ -195,7 +195,7 @@ proc decodeMessage*(pb: ProtoBuffer): ProtoResult[Message] {.inline.} =
|
||||||
else:
|
else:
|
||||||
trace "decodeMessage: data is missing"
|
trace "decodeMessage: data is missing"
|
||||||
if ? pb.getField(3, msg.seqno):
|
if ? pb.getField(3, msg.seqno):
|
||||||
trace "decodeMessage: read seqno", seqno = msg.data.shortLog()
|
trace "decodeMessage: read seqno", seqno = msg.seqno
|
||||||
else:
|
else:
|
||||||
trace "decodeMessage: seqno is missing"
|
trace "decodeMessage: seqno is missing"
|
||||||
if ? pb.getRepeatedField(4, msg.topicIDs):
|
if ? pb.getRepeatedField(4, msg.topicIDs):
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
|
import oids
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
import connection
|
import connection
|
||||||
|
|
||||||
|
@ -73,7 +74,7 @@ method close*(s: ChronosStream) {.async.} =
|
||||||
try:
|
try:
|
||||||
if not s.isClosed:
|
if not s.isClosed:
|
||||||
trace "shutting down chronos stream", address = $s.client.remoteAddress(),
|
trace "shutting down chronos stream", address = $s.client.remoteAddress(),
|
||||||
oid = s.oid
|
oid = $s.oid
|
||||||
if not s.client.closed():
|
if not s.client.closed():
|
||||||
await s.client.closeWait()
|
await s.client.closeWait()
|
||||||
|
|
||||||
|
|
|
@ -8,13 +8,16 @@
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import hashes
|
import hashes
|
||||||
import chronos, metrics
|
import chronicles, chronos, metrics
|
||||||
import lpstream,
|
import lpstream,
|
||||||
../multiaddress,
|
../multiaddress,
|
||||||
../peerinfo
|
../peerinfo
|
||||||
|
|
||||||
export lpstream
|
export lpstream
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "connection"
|
||||||
|
|
||||||
const
|
const
|
||||||
ConnectionTrackerName* = "libp2p.connection"
|
ConnectionTrackerName* = "libp2p.connection"
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,9 @@ import ../varint,
|
||||||
|
|
||||||
declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
|
declareGauge(libp2p_open_streams, "open stream instances", labels = ["type"])
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "lpstream"
|
||||||
|
|
||||||
type
|
type
|
||||||
LPStream* = ref object of RootObj
|
LPStream* = ref object of RootObj
|
||||||
closeEvent*: AsyncEvent
|
closeEvent*: AsyncEvent
|
||||||
|
|
|
@ -167,12 +167,11 @@ proc mux(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||||
## mux incoming connection
|
## mux incoming connection
|
||||||
|
|
||||||
trace "muxing connection", peer = $conn
|
trace "muxing connection", peer = $conn
|
||||||
let muxers = toSeq(s.muxers.keys)
|
if s.muxers.len == 0:
|
||||||
if muxers.len == 0:
|
|
||||||
warn "no muxers registered, skipping upgrade flow"
|
warn "no muxers registered, skipping upgrade flow"
|
||||||
return
|
return
|
||||||
|
|
||||||
let muxerName = await s.ms.select(conn, muxers)
|
let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys()))
|
||||||
if muxerName.len == 0 or muxerName == "na":
|
if muxerName.len == 0 or muxerName == "na":
|
||||||
debug "no muxer available, early exit", peer = $conn
|
debug "no muxer available, early exit", peer = $conn
|
||||||
return
|
return
|
||||||
|
@ -387,7 +386,9 @@ proc dial*(s: Switch,
|
||||||
await conn.close()
|
await conn.close()
|
||||||
raise newException(CatchableError, "Couldn't get muxed stream")
|
raise newException(CatchableError, "Couldn't get muxed stream")
|
||||||
|
|
||||||
trace "Attempting to select remote", proto = proto, oid = conn.oid
|
trace "Attempting to select remote", proto = proto,
|
||||||
|
streamOid = $stream.oid,
|
||||||
|
oid = $conn.oid
|
||||||
if not await s.ms.select(stream, proto):
|
if not await s.ms.select(stream, proto):
|
||||||
await stream.close()
|
await stream.close()
|
||||||
raise newException(CatchableError, "Unable to select sub-protocol " & proto)
|
raise newException(CatchableError, "Unable to select sub-protocol " & proto)
|
||||||
|
@ -498,7 +499,7 @@ proc subscribePeerInternal(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
|
||||||
if not(isNil(stream)):
|
if not(isNil(stream)):
|
||||||
await stream.close()
|
await stream.close()
|
||||||
|
|
||||||
proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} =
|
proc pubsubMonitor(s: Switch, peer: PeerInfo) {.async.} =
|
||||||
## while peer connected maintain a
|
## while peer connected maintain a
|
||||||
## pubsub connection as well
|
## pubsub connection as well
|
||||||
##
|
##
|
||||||
|
@ -506,11 +507,11 @@ proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} =
|
||||||
var tries = 0
|
var tries = 0
|
||||||
var backoffFactor = 5 # up to ~10 mins
|
var backoffFactor = 5 # up to ~10 mins
|
||||||
var backoff = 1.seconds
|
var backoff = 1.seconds
|
||||||
while switch.isConnected(peer) and
|
while s.isConnected(peer) and
|
||||||
tries < MaxPubsubReconnectAttempts:
|
tries < MaxPubsubReconnectAttempts:
|
||||||
try:
|
try:
|
||||||
debug "subscribing to pubsub peer", peer = $peer
|
debug "subscribing to pubsub peer", peer = $peer
|
||||||
await switch.subscribePeerInternal(peer)
|
await s.subscribePeerInternal(peer)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
|
import oids
|
||||||
import chronos, chronicles, sequtils
|
import chronos, chronicles, sequtils
|
||||||
import transport,
|
import transport,
|
||||||
../errors,
|
../errors,
|
||||||
|
@ -71,7 +72,7 @@ proc connHandler*(t: TcpTransport,
|
||||||
proc cleanup() {.async.} =
|
proc cleanup() {.async.} =
|
||||||
try:
|
try:
|
||||||
await client.join()
|
await client.join()
|
||||||
trace "cleaning up client", addrs = $client.remoteAddress, connoid = conn.oid
|
trace "cleaning up client", addrs = $client.remoteAddress, connoid = $conn.oid
|
||||||
if not(isNil(conn)):
|
if not(isNil(conn)):
|
||||||
await conn.close()
|
await conn.close()
|
||||||
t.clients.keepItIf(it != client)
|
t.clients.keepItIf(it != client)
|
||||||
|
|
Loading…
Reference in New Issue