GossipSub: TimedEntry & shortAgent fixes (#858)

This commit is contained in:
Tanguy 2023-04-03 11:05:01 +02:00 committed by GitHub
parent 6b61ce8c91
commit 4aa615c44c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 39 additions and 17 deletions

View File

@ -339,7 +339,7 @@ proc validateAndRelay(g: GossipSub,
# In theory, if topics are the same in all messages, we could batch - we'd # In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages # also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds: for topic in msg.topicIds:
if topic notin g.topics: continue if topic notin g.topics: continue

View File

@ -68,12 +68,18 @@ type
appScore*: float64 # application specific score appScore*: float64 # application specific score
behaviourPenalty*: float64 # the eventual penalty score behaviourPenalty*: float64 # the eventual penalty score
when defined(libp2p_agents_metrics):
shortAgent*: string
RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void]
{.gcsafe, raises: [Defect].} {.gcsafe, raises: [Defect].}
when defined(libp2p_agents_metrics):
func shortAgent*(p: PubSubPeer): string =
if p.sendConn.isNil or p.sendConn.getWrapped().isNil:
"unknown"
else:
#TODO the sendConn is setup before identify,
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent
func hash*(p: PubSubPeer): Hash = func hash*(p: PubSubPeer): Hash =
p.peerId.hash p.peerId.hash

View File

@ -14,7 +14,7 @@ else:
import std/[tables] import std/[tables]
import chronos/timer import chronos/timer, stew/results
const Timeout* = 10.seconds # default timeout in ms const Timeout* = 10.seconds # default timeout in ms
@ -22,6 +22,7 @@ type
TimedEntry*[K] = ref object of RootObj TimedEntry*[K] = ref object of RootObj
key: K key: K
addedAt: Moment addedAt: Moment
expiresAt: Moment
next, prev: TimedEntry[K] next, prev: TimedEntry[K]
TimedCache*[K] = object of RootObj TimedCache*[K] = object of RootObj
@ -30,15 +31,14 @@ type
timeout: Duration timeout: Duration
func expire*(t: var TimedCache, now: Moment = Moment.now()) = func expire*(t: var TimedCache, now: Moment = Moment.now()) =
let expirationLimit = now - t.timeout while t.head != nil and t.head.expiresAt < now:
while t.head != nil and t.head.addedAt < expirationLimit:
t.entries.del(t.head.key) t.entries.del(t.head.key)
t.head.prev = nil t.head.prev = nil
t.head = t.head.next t.head = t.head.next
if t.head == nil: t.tail = nil if t.head == nil: t.tail = nil
func del*[K](t: var TimedCache[K], key: K): bool = func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] =
# Removes existing key from cache, returning false if it was not present # Removes existing key from cache, returning the previous value if present
var item: TimedEntry[K] var item: TimedEntry[K]
if t.entries.pop(key, item): if t.entries.pop(key, item):
if t.head == item: t.head = item.next if t.head == item: t.head = item.next
@ -46,9 +46,9 @@ func del*[K](t: var TimedCache[K], key: K): bool =
if item.next != nil: item.next.prev = item.prev if item.next != nil: item.next.prev = item.prev
if item.prev != nil: item.prev.next = item.next if item.prev != nil: item.prev.next = item.next
true Opt.some(item)
else: else:
false Opt.none(TimedEntry[K])
func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# Puts k in cache, returning true if the item was already present and false # Puts k in cache, returning true if the item was already present and false
@ -56,9 +56,13 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
# refreshed. # refreshed.
t.expire(now) t.expire(now)
var res = t.del(k) # Refresh existing item var previous = t.del(k) # Refresh existing item
let node = TimedEntry[K](key: k, addedAt: now) let addedAt =
if previous.isSome: previous.get().addedAt
else: now
let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout)
if t.head == nil: if t.head == nil:
t.tail = node t.tail = node
@ -66,7 +70,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
else: else:
# search from tail because typically that's where we add when now grows # search from tail because typically that's where we add when now grows
var cur = t.tail var cur = t.tail
while cur != nil and node.addedAt < cur.addedAt: while cur != nil and node.expiresAt < cur.expiresAt:
cur = cur.prev cur = cur.prev
if cur == nil: if cur == nil:
@ -82,7 +86,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool =
t.entries[k] = node t.entries[k] = node
res previous.isSome()
func contains*[K](t: TimedCache[K], k: K): bool = func contains*[K](t: TimedCache[K], k: K): bool =
k in t.entries k in t.entries

View File

@ -63,5 +63,5 @@ when defined(libp2p_agents_metrics):
err("toLowerAscii failed") err("toLowerAscii failed")
const const
KnownLibP2PAgents* {.strdefine.} = "" KnownLibP2PAgents* {.strdefine.} = "nim-libp2p"
KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",") KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",")

View File

@ -85,6 +85,8 @@ type
method write*(s: TestBufferStream, msg: seq[byte]): Future[void] = method write*(s: TestBufferStream, msg: seq[byte]): Future[void] =
s.writeHandler(msg) s.writeHandler(msg)
method getWrapped*(s: TestBufferStream): Connection = nil
proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T = proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T =
let testBufferStream = T(writeHandler: writeHandler) let testBufferStream = T(writeHandler: writeHandler)
testBufferStream.initStream() testBufferStream.initStream()

View File

@ -19,7 +19,9 @@ import utils,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
protocols/pubsub/floodsub, protocols/pubsub/floodsub,
protocols/pubsub/rpc/messages, protocols/pubsub/rpc/messages,
protocols/pubsub/peertable] protocols/pubsub/peertable,
protocols/pubsub/pubsubpeer
]
import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../../libp2p/protocols/pubsub/errors as pubsub_errors
import ../helpers import ../helpers
@ -62,6 +64,14 @@ suite "FloodSub":
check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0
check (await completionFut.wait(5.seconds)) == true check (await completionFut.wait(5.seconds)) == true
when defined(libp2p_agents_metrics):
let
agentA = nodes[0].peers[nodes[1].switch.peerInfo.peerId].shortAgent
agentB = nodes[1].peers[nodes[0].switch.peerInfo.peerId].shortAgent
check:
agentA == "nim-libp2p"
agentB == "nim-libp2p"
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].switch.stop(), nodes[0].switch.stop(),
nodes[1].switch.stop() nodes[1].switch.stop()