diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2142c7c3b..d707bf285 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -339,7 +339,7 @@ proc validateAndRelay(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = toSendPeers.len, msgId, peer + trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIds: if topic notin g.topics: continue diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index cea5107ef..31fbbe7a6 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -68,12 +68,18 @@ type appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - when defined(libp2p_agents_metrics): - shortAgent*: string - RPCHandler* = proc(peer: PubSubPeer, msg: RPCMsg): Future[void] {.gcsafe, raises: [Defect].} +when defined(libp2p_agents_metrics): + func shortAgent*(p: PubSubPeer): string = + if p.sendConn.isNil or p.sendConn.getWrapped().isNil: + "unknown" + else: + #TODO the sendConn is setup before identify, + #so we have to read the parents short agent.. + p.sendConn.getWrapped().shortAgent + func hash*(p: PubSubPeer): Hash = p.peerId.hash diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index 681ec48a5..9bcfed1fc 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -14,7 +14,7 @@ else: import std/[tables] -import chronos/timer +import chronos/timer, stew/results const Timeout* = 10.seconds # default timeout in ms @@ -22,6 +22,7 @@ type TimedEntry*[K] = ref object of RootObj key: K addedAt: Moment + expiresAt: Moment next, prev: TimedEntry[K] TimedCache*[K] = object of RootObj @@ -30,15 +31,14 @@ type timeout: Duration func expire*(t: var TimedCache, now: Moment = Moment.now()) = - let expirationLimit = now - t.timeout - while t.head != nil and t.head.addedAt < expirationLimit: + while t.head != nil and t.head.expiresAt < now: t.entries.del(t.head.key) t.head.prev = nil t.head = t.head.next if t.head == nil: t.tail = nil -func del*[K](t: var TimedCache[K], key: K): bool = - # Removes existing key from cache, returning false if it was not present +func del*[K](t: var TimedCache[K], key: K): Opt[TimedEntry[K]] = + # Removes existing key from cache, returning the previous value if present var item: TimedEntry[K] if t.entries.pop(key, item): if t.head == item: t.head = item.next @@ -46,9 +46,9 @@ func del*[K](t: var TimedCache[K], key: K): bool = if item.next != nil: item.next.prev = item.prev if item.prev != nil: item.prev.next = item.next - true + Opt.some(item) else: - false + Opt.none(TimedEntry[K]) func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = # Puts k in cache, returning true if the item was already present and false @@ -56,9 +56,13 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = # refreshed. t.expire(now) - var res = t.del(k) # Refresh existing item + var previous = t.del(k) # Refresh existing item - let node = TimedEntry[K](key: k, addedAt: now) + let addedAt = + if previous.isSome: previous.get().addedAt + else: now + + let node = TimedEntry[K](key: k, addedAt: addedAt, expiresAt: now + t.timeout) if t.head == nil: t.tail = node @@ -66,7 +70,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = else: # search from tail because typically that's where we add when now grows var cur = t.tail - while cur != nil and node.addedAt < cur.addedAt: + while cur != nil and node.expiresAt < cur.expiresAt: cur = cur.prev if cur == nil: @@ -82,7 +86,7 @@ func put*[K](t: var TimedCache[K], k: K, now = Moment.now()): bool = t.entries[k] = node - res + previous.isSome() func contains*[K](t: TimedCache[K], k: K): bool = k in t.entries diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 1000394ae..15050b10f 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -63,5 +63,5 @@ when defined(libp2p_agents_metrics): err("toLowerAscii failed") const - KnownLibP2PAgents* {.strdefine.} = "" + KnownLibP2PAgents* {.strdefine.} = "nim-libp2p" KnownLibP2PAgentsSeq* = KnownLibP2PAgents.safeToLowerAscii().tryGet().split(",") diff --git a/tests/helpers.nim b/tests/helpers.nim index 42533a1c7..ec7657500 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -85,6 +85,8 @@ type method write*(s: TestBufferStream, msg: seq[byte]): Future[void] = s.writeHandler(msg) +method getWrapped*(s: TestBufferStream): Connection = nil + proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T = let testBufferStream = T(writeHandler: writeHandler) testBufferStream.initStream() diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 53303bce6..27cd1fb92 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -19,7 +19,9 @@ import utils, protocols/pubsub/pubsub, protocols/pubsub/floodsub, protocols/pubsub/rpc/messages, - protocols/pubsub/peertable] + protocols/pubsub/peertable, + protocols/pubsub/pubsubpeer + ] import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers @@ -62,6 +64,14 @@ suite "FloodSub": check (await nodes[0].publish("foobar", "Hello!".toBytes())) > 0 check (await completionFut.wait(5.seconds)) == true + when defined(libp2p_agents_metrics): + let + agentA = nodes[0].peers[nodes[1].switch.peerInfo.peerId].shortAgent + agentB = nodes[1].peers[nodes[0].switch.peerInfo.peerId].shortAgent + check: + agentA == "nim-libp2p" + agentB == "nim-libp2p" + await allFuturesThrowing( nodes[0].switch.stop(), nodes[1].switch.stop()