chore: cleanups (#1092)

* remove cruft
* remove redundant error handling (reduces warnings)
* remove redundant copying
This commit is contained in:
Jacek Sieka 2024-05-08 14:33:26 +02:00 committed by GitHub
parent 88e233db81
commit 21cbe3a91a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 47 additions and 48 deletions

View File

@ -7,7 +7,6 @@ if dirExists("nimbledeps/pkgs2"):
switch("warning", "CaseTransition:off") switch("warning", "CaseTransition:off")
switch("warning", "ObservableStores:off") switch("warning", "ObservableStores:off")
switch("warning", "LockLevel:off") switch("warning", "LockLevel:off")
--define:chronosStrictException
--styleCheck:usages --styleCheck:usages
switch("warningAsError", "UseBase:on") switch("warningAsError", "UseBase:on")
--styleCheck:error --styleCheck:error

View File

@ -13,8 +13,8 @@
{.push public.} {.push public.}
import pkg/chronos, chronicles import pkg/chronos, chronicles
import std/[nativesockets, hashes] import std/[nativesockets, net, hashes]
import tables, strutils, sets, stew/shims/net import tables, strutils, sets
import multicodec, multihash, multibase, transcoder, vbuffer, peerid, import multicodec, multihash, multibase, transcoder, vbuffer, peerid,
protobuf/minprotobuf, errors, utility protobuf/minprotobuf, errors, utility
import stew/[base58, base32, endians2, results] import stew/[base58, base32, endians2, results]

View File

@ -266,6 +266,7 @@ proc addHandler*[E](
proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} = proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])` # Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([])`
# TODO https://github.com/nim-lang/Nim/issues/23445
var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len) var futs = newSeqOfCap[Future[void].Raising([CancelledError])](m.handlers.len)
for it in m.handlers: for it in m.handlers:
futs.add it.protocol.start() futs.add it.protocol.start()
@ -278,7 +279,7 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
doAssert m.handlers.len == futs.len, "Handlers modified while starting" doAssert m.handlers.len == futs.len, "Handlers modified while starting"
for i, fut in futs: for i, fut in futs:
if not fut.finished: if not fut.finished:
pending.add noCancel fut.cancelAndWait() pending.add fut.cancelAndWait()
elif fut.completed: elif fut.completed:
pending.add m.handlers[i].protocol.stop() pending.add m.handlers[i].protocol.stop()
else: else:
@ -286,7 +287,6 @@ proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
await noCancel allFutures(pending) await noCancel allFutures(pending)
raise exc raise exc
proc stop*(m: MultistreamSelect) {.async: (raises: []).} = proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])` # Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len) var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)

View File

@ -30,7 +30,7 @@ declareGauge(libp2p_gossipsub_healthy_peers_topics, "number of topics in mesh wi
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"]) declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"]) declareGauge(libp2p_gossipsub_received_iwants, "received iwants", labels = ["kind"])
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [].} = proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
g.withPeerStats(p.peerId) do (stats: var PeerStats): g.withPeerStats(p.peerId) do (stats: var PeerStats):
var info = stats.topicInfos.getOrDefault(topic) var info = stats.topicInfos.getOrDefault(topic)
info.graftTime = Moment.now() info.graftTime = Moment.now()
@ -46,7 +46,7 @@ proc pruned*(g: GossipSub,
p: PubSubPeer, p: PubSubPeer,
topic: string, topic: string,
setBackoff: bool = true, setBackoff: bool = true,
backoff = none(Duration)) {.raises: [].} = backoff = none(Duration)) =
if setBackoff: if setBackoff:
let let
backoffDuration = backoff.get(g.parameters.pruneBackoff) backoffDuration = backoff.get(g.parameters.pruneBackoff)
@ -70,7 +70,7 @@ proc pruned*(g: GossipSub,
trace "pruned", peer=p, topic trace "pruned", peer=p, topic
proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} = proc handleBackingOff*(t: var BackoffTable, topic: string) =
let now = Moment.now() let now = Moment.now()
var expired = toSeq(t.getOrDefault(topic).pairs()) var expired = toSeq(t.getOrDefault(topic).pairs())
expired.keepIf do (pair: tuple[peer: PeerId, expire: Moment]) -> bool: expired.keepIf do (pair: tuple[peer: PeerId, expire: Moment]) -> bool:
@ -79,7 +79,7 @@ proc handleBackingOff*(t: var BackoffTable, topic: string) {.raises: [].} =
t.withValue(topic, v): t.withValue(topic, v):
v[].del(peer) v[].del(peer)
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: [].} = proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
if not g.parameters.enablePX: if not g.parameters.enablePX:
return @[] return @[]
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq() var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
@ -100,7 +100,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises:
proc handleGraft*(g: GossipSub, proc handleGraft*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
grafts: seq[ControlGraft]): seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows grafts: seq[ControlGraft]): seq[ControlPrune] =
var prunes: seq[ControlPrune] var prunes: seq[ControlPrune]
for graft in grafts: for graft in grafts:
let topic = graft.topicID let topic = graft.topicID
@ -205,7 +205,7 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe
routingRecords routingRecords
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [].} = proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes: for prune in prunes:
let topic = prune.topicID let topic = prune.topicID
@ -239,7 +239,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r
proc handleIHave*(g: GossipSub, proc handleIHave*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
ihaves: seq[ControlIHave]): ControlIWant {.raises: [].} = ihaves: seq[ControlIHave]): ControlIWant =
var res: ControlIWant var res: ControlIWant
if peer.score < g.parameters.gossipThreshold: if peer.score < g.parameters.gossipThreshold:
trace "ihave: ignoring low score peer", peer, score = peer.score trace "ihave: ignoring low score peer", peer, score = peer.score
@ -273,7 +273,7 @@ proc handleIDontWant*(g: GossipSub,
proc handleIWant*(g: GossipSub, proc handleIWant*(g: GossipSub,
peer: PubSubPeer, peer: PubSubPeer,
iwants: seq[ControlIWant]): seq[Message] {.raises: [].} = iwants: seq[ControlIWant]): seq[Message] =
var var
messages: seq[Message] messages: seq[Message]
invalidRequests = 0 invalidRequests = 0
@ -299,7 +299,7 @@ proc handleIWant*(g: GossipSub,
messages.add(msg) messages.add(msg)
return messages return messages
proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} = proc commitMetrics(metrics: var MeshMetrics) =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics) libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics) libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
@ -308,7 +308,7 @@ proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"]) libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"]) libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) {.raises: [].} = proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
logScope: logScope:
topic topic
mesh = g.mesh.peers(topic) mesh = g.mesh.peers(topic)
@ -538,7 +538,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
backoff: g.parameters.pruneBackoff.seconds.uint64)]))) backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune, isHighPriority = true) g.broadcast(prunes, prune, isHighPriority = true)
proc dropFanoutPeers*(g: GossipSub) {.raises: [].} = proc dropFanoutPeers*(g: GossipSub) =
# drop peers that we haven't published to in # drop peers that we haven't published to in
# GossipSubFanoutTTL seconds # GossipSubFanoutTTL seconds
let now = Moment.now() let now = Moment.now()
@ -551,7 +551,7 @@ proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
for topic in drops: for topic in drops:
g.lastFanoutPubSub.del topic g.lastFanoutPubSub.del topic
proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} = proc replenishFanout*(g: GossipSub, topic: string) =
## get fanout peers for a topic ## get fanout peers for a topic
logScope: topic logScope: topic
trace "about to replenish fanout" trace "about to replenish fanout"
@ -567,7 +567,7 @@ proc replenishFanout*(g: GossipSub, topic: string) {.raises: [].} =
trace "fanout replenished with peers", peers = g.fanout.peers(topic) trace "fanout replenished with peers", peers = g.fanout.peers(topic)
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: [].} = proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
## gossip iHave messages to peers ## gossip iHave messages to peers
## ##
@ -620,17 +620,16 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises:
g.rng.shuffle(allPeers) g.rng.shuffle(allPeers)
allPeers.setLen(target) allPeers.setLen(target)
let msgIdsAsSet = ihave.messageIDs.toHashSet()
for peer in allPeers: for peer in allPeers:
control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave) control.mgetOrPut(peer, ControlMessage()).ihave.add(ihave)
peer.sentIHaves[^1].incl(msgIdsAsSet) for msgId in ihave.messageIDs:
peer.sentIHaves[^1].incl(msgId)
libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64)
return control return control
proc onHeartbeat(g: GossipSub) {.raises: [].} = proc onHeartbeat(g: GossipSub) =
# reset IWANT budget # reset IWANT budget
# reset IHAVE cap # reset IHAVE cap
block: block:
@ -694,8 +693,6 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
g.mcache.shift() # shift the cache g.mcache.shift() # shift the cache
# {.pop.} # raises []
proc heartbeat*(g: GossipSub) {.async.} = proc heartbeat*(g: GossipSub) {.async.} =
heartbeat "GossipSub", g.parameters.heartbeatInterval: heartbeat "GossipSub", g.parameters.heartbeatInterval:
trace "running heartbeat", instance = cast[int](g) trace "running heartbeat", instance = cast[int](g)

View File

@ -87,8 +87,6 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
else: else:
0.0 0.0
{.pop.}
proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} = proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
try: try:
await g.switch.disconnect(peer.peerId) await g.switch.disconnect(peer.peerId)

View File

@ -9,50 +9,57 @@
{.push raises: [].} {.push raises: [].}
import std/[sets, tables, options] import std/[sets, tables]
import rpc/[messages] import rpc/[messages]
import results
export sets, tables, messages, options export sets, tables, messages, results
type type
CacheEntry* = object CacheEntry* = object
mid*: MessageId msgId*: MessageId
topic*: string topic*: string
MCache* = object of RootObj MCache* = object of RootObj
msgs*: Table[MessageId, Message] msgs*: Table[MessageId, Message]
history*: seq[seq[CacheEntry]] history*: seq[seq[CacheEntry]]
pos*: int
windowSize*: Natural windowSize*: Natural
func get*(c: MCache, mid: MessageId): Option[Message] = func get*(c: MCache, msgId: MessageId): Opt[Message] =
if mid in c.msgs: if msgId in c.msgs:
try: some(c.msgs[mid]) try: Opt.some(c.msgs[msgId])
except KeyError: raiseAssert "checked" except KeyError: raiseAssert "checked"
else: else:
none(Message) Opt.none(Message)
func contains*(c: MCache, mid: MessageId): bool = func contains*(c: MCache, msgId: MessageId): bool =
mid in c.msgs msgId in c.msgs
func put*(c: var MCache, msgId: MessageId, msg: Message) = func put*(c: var MCache, msgId: MessageId, msg: Message) =
if not c.msgs.hasKeyOrPut(msgId, msg): if not c.msgs.hasKeyOrPut(msgId, msg):
# Only add cache entry if the message was not already in the cache # Only add cache entry if the message was not already in the cache
c.history[0].add(CacheEntry(mid: msgId, topic: msg.topic)) c.history[c.pos].add(CacheEntry(msgId: msgId, topic: msg.topic))
func window*(c: MCache, topic: string): HashSet[MessageId] = func window*(c: MCache, topic: string): HashSet[MessageId] =
let let
len = min(c.windowSize, c.history.len) len = min(c.windowSize, c.history.len)
for i in 0..<len: for i in 0..<len:
for entry in c.history[i]: # Work backwards from `pos` in the circular buffer
for entry in c.history[(c.pos + c.history.len - i) mod c.history.len]:
if entry.topic == topic: if entry.topic == topic:
result.incl(entry.mid) result.incl(entry.msgId)
func shift*(c: var MCache) = func shift*(c: var MCache) =
for entry in c.history.pop(): # Shift circular buffer to write to a new position, clearing it from past
c.msgs.del(entry.mid) # iterations
c.pos = (c.pos + 1) mod c.history.len
c.history.insert(@[]) for entry in c.history[c.pos]:
c.msgs.del(entry.msgId)
reset(c.history[c.pos])
func init*(T: type MCache, window, history: Natural): T = func init*(T: type MCache, window, history: Natural): T =
T( T(

View File

@ -30,7 +30,6 @@ import ./errors as pubsub_errors,
../../errors, ../../errors,
../../utility ../../utility
import metrics
import stew/results import stew/results
export results export results

View File

@ -9,8 +9,8 @@
{.push raises: [].} {.push raises: [].}
import options, sequtils, sugar import options, sequtils
import "../../.."/[ import ../../../[
peerid, peerid,
routing_record, routing_record,
utility utility

View File

@ -270,4 +270,4 @@ method dial*(
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address): if procCall Transport(t).handles(address):
if address.protocols.isOk: if address.protocols.isOk:
return TCP.match(address) return TCP.match(address)

View File

@ -1,12 +1,11 @@
{.used.} {.used.}
import unittest2, options, sets, sequtils import unittest2, sequtils
import stew/byteutils import stew/byteutils
import ../../libp2p/[peerid, import ../../libp2p/[peerid,
crypto/crypto, crypto/crypto,
protocols/pubsub/mcache, protocols/pubsub/mcache,
protocols/pubsub/rpc/messages] protocols/pubsub/rpc/message]
import ./utils
var rng = newRng() var rng = newRng()