diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3562c84e8..b581b6a66 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -7,16 +7,17 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import sequtils, tables, sets, strutils +import std/[sequtils, sets, tables] import chronos, chronicles, metrics -import pubsub, - pubsubpeer, - timedcache, - peertable, - rpc/[messages, message], +import ./pubsub, + ./pubsubpeer, + ./timedcache, + ./peertable, + ./rpc/[message, messages], ../../stream/connection, ../../peerid, - ../../peerinfo + ../../peerinfo, + ../../utility logScope: topics = "floodsub" @@ -49,15 +50,13 @@ method subscribeTopic*(f: FloodSub, method unsubscribePeer*(f: FloodSub, peer: PeerID) = ## handle peer disconnects ## - trace "unsubscribing floodsub peer", peer = $peer let pubSubPeer = f.peers.getOrDefault(peer) if pubSubPeer.isNil: return - for t in toSeq(f.floodsub.keys): - if t in f.floodsub: - f.floodsub[t].excl(pubSubPeer) + for _, v in f.floodsub.mpairs(): + v.excl(pubSubPeer) procCall PubSub(f).unsubscribePeer(peer) @@ -66,35 +65,34 @@ method rpcHandler*(f: FloodSub, rpcMsg: RPCMsg) {.async.} = await procCall PubSub(f).rpcHandler(peer, rpcMsg) - if rpcMsg.messages.len > 0: # if there are any messages + for msg in rpcMsg.messages: # for every message + let msgId = f.msgIdProvider(msg) + logScope: + msgId + peer = peer.id + + if f.seen.put(msgId): + trace "Dropping already-seen message" + continue + + if f.verifySignature and not msg.verify(peer.peerId): + debug "Dropping message due to failed signature verification" + continue + + if not (await f.validate(msg)): + trace "Dropping message due to failed validation" + continue + var toSendPeers = initHashSet[PubSubPeer]() - for msg in rpcMsg.messages: # for every message - let msgId = f.msgIdProvider(msg) - logScope: msgId + for t in msg.topicIDs: # for every topic in the message + f.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) - if msgId notin f.seen: - f.seen.put(msgId) # add the message to the seen cache + await handleData(f, t, msg.data) - if f.verifySignature and not msg.verify(peer.peerId): - trace "dropping message due to failed signature verification" - continue - - if not (await f.validate(msg)): - trace "dropping message due to failed validation" - continue - - for t in msg.topicIDs: # for every topic in the message - if t in f.floodsub: - toSendPeers.incl(f.floodsub[t]) # get all the peers interested in this topic - - await handleData(f, t, msg.data) - - # forward the message to all peers interested in it - f.broadcast( - toSeq(toSendPeers), - RPCMsg(messages: rpcMsg.messages)) - - trace "forwared message to peers", peers = toSendPeers.len + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) + trace "Forwared message to peers", peers = toSendPeers.len method init*(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -114,30 +112,41 @@ method publish*(f: FloodSub, # base returns always 0 discard await procCall PubSub(f).publish(topic, data) - if data.len <= 0 or topic.len <= 0: - trace "topic or data missing, skipping publish" + logScope: topic + trace "Publishing message on topic", data = data.shortLog + + if topic.len <= 0: # data could be 0/empty + debug "Empty topic, skipping publish" return 0 - if topic notin f.floodsub: - trace "missing peers for topic, skipping publish" - return + let peers = toSeq(f.floodsub.getOrDefault(topic)) + + if peers.len == 0: + debug "No peers for topic, skipping publish" + return 0 - trace "publishing on topic", name = topic inc f.msgSeqno let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign) - peers = toSeq(f.floodsub.getOrDefault(topic)) + msgId = f.msgIdProvider(msg) - # start the future but do not wait yet - f.broadcast( - peers, - RPCMsg(messages: @[msg])) + logScope: msgId + + trace "Created new message", msg = shortLog(msg), peers = peers.len + + if f.seen.put(msgId): + # custom msgid providers might cause this + trace "Dropping already-seen message" + return 0 + + # Try to send to all peers that are known to be interested + f.broadcast(peers, RPCMsg(messages: @[msg])) when defined(libp2p_expensive_metrics): libp2p_pubsub_messages_published.inc(labelValues = [topic]) - trace "published message to peers", peers = peers.len, - msg = msg.shortLog() + trace "Published message to peers" + return peers.len method unsubscribe*(f: FloodSub, @@ -156,5 +165,5 @@ method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.floodsub = initTable[string, HashSet[PubSubPeer]]() - f.seen = newTimedCache[string](2.minutes) + f.seen = TimedCache[string].init(2.minutes) f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 3714cfe7d..173e91dd5 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -7,20 +7,19 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[tables, sets, options, sequtils, random] +import std/[options, random, sequtils, sets, tables] import chronos, chronicles, metrics -import pubsub, - floodsub, - pubsubpeer, - peertable, - mcache, - timedcache, - rpc/[messages, message], +import ./pubsub, + ./floodsub, + ./pubsubpeer, + ./peertable, + ./mcache, + ./timedcache, + ./rpc/[messages, message], ../protocol, - ../../peerinfo, ../../stream/connection, + ../../peerinfo, ../../peerid, - ../../errors, ../../utility logScope: @@ -379,47 +378,36 @@ method rpcHandler*(g: GossipSub, rpcMsg: RPCMsg) {.async.} = await procCall PubSub(g).rpcHandler(peer, rpcMsg) - if rpcMsg.messages.len > 0: # if there are any messages - var toSendPeers: HashSet[PubSubPeer] - for msg in rpcMsg.messages: # for every message - let msgId = g.msgIdProvider(msg) - logScope: msgId + for msg in rpcMsg.messages: # for every message + let msgId = g.msgIdProvider(msg) + logScope: + msgId + peer = peer.id - if msgId in g.seen: - trace "message already processed, skipping" - continue + if g.seen.put(msgId): + trace "Dropping already-seen message" + continue - trace "processing message" + g.mcache.put(msgId, msg) - g.seen.put(msgId) # add the message to the seen cache + if g.verifySignature and not msg.verify(peer.peerId): + debug "Dropping message due to failed signature verification" + continue - if g.verifySignature and not msg.verify(peer.peerId): - trace "dropping message due to failed signature verification" - continue + if not (await g.validate(msg)): + trace "Dropping message due to failed validation" + continue - if not (await g.validate(msg)): - trace "dropping message due to failed validation" - continue + var toSendPeers = initHashSet[PubSubPeer]() + for t in msg.topicIDs: # for every topic in the message + g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) + g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) - # this shouldn't happen - if g.peerInfo.peerId == msg.fromPeer: - trace "skipping messages from self" - continue - - for t in msg.topicIDs: # for every topic in the message - if t in g.floodsub: - toSendPeers.incl(g.floodsub[t]) # get all floodsub peers for topic - - if t in g.mesh: - toSendPeers.incl(g.mesh[t]) # get all mesh peers for topic - - await handleData(g, t, msg.data) - - # forward the message to all peers interested in it - g.broadcast( - toSeq(toSendPeers), - RPCMsg(messages: rpcMsg.messages)) + await handleData(g, t, msg.data) + # In theory, if topics are the same in all messages, we could batch - we'd + # also have to be careful to only include validated messages + g.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) trace "forwared message to peers", peers = toSendPeers.len if rpcMsg.control.isSome: @@ -451,12 +439,13 @@ method unsubscribe*(g: GossipSub, for (topic, handler) in topics: # delete from mesh only if no handlers are left - if g.topics[topic].handler.len <= 0: + if topic notin g.topics: if topic in g.mesh: - let peers = g.mesh.getOrDefault(topic) + let peers = g.mesh[topic] g.mesh.del(topic) - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + let prune = RPCMsg( + control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) g.broadcast(toSeq(peers), prune) method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = @@ -474,9 +463,12 @@ method publish*(g: GossipSub, data: seq[byte]): Future[int] {.async.} = # base returns always 0 discard await procCall PubSub(g).publish(topic, data) - trace "publishing message on topic", topic, data = data.shortLog + + logScope: topic + trace "Publishing message on topic", data = data.shortLog if topic.len <= 0: # data could be 0/empty + debug "Empty topic, skipping publish" return 0 var peers: HashSet[PubSubPeer] @@ -497,29 +489,35 @@ method publish*(g: GossipSub, # time g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) + if peers.len == 0: + debug "No peers for topic, skipping publish" + return 0 + inc g.msgSeqno let msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign) msgId = g.msgIdProvider(msg) - trace "created new message", msg, topic, peers = peers.len + logScope: msgId - if msgId notin g.mcache: - g.mcache.put(msgId, msg) + trace "Created new message", msg = shortLog(msg), peers = peers.len - if peers.len > 0: - g.broadcast(toSeq(peers), RPCMsg(messages: @[msg])) - when defined(libp2p_expensive_metrics): - if peers.len > 0: - libp2p_pubsub_messages_published.inc(labelValues = [topic]) - - trace "published message to peers", peers = peers.len, - msg = msg.shortLog() - return peers.len - else: - debug "No peers for gossip message", topic, msg = msg.shortLog() + if g.seen.put(msgId): + # custom msgid providers might cause this + trace "Dropping already-seen message" return 0 + g.mcache.put(msgId, msg) + + g.broadcast(toSeq(peers), RPCMsg(messages: @[msg])) + when defined(libp2p_expensive_metrics): + if peers.len > 0: + libp2p_pubsub_messages_published.inc(labelValues = [topic]) + + trace "Published message to peers" + + return peers.len + method start*(g: GossipSub) {.async.} = trace "gossipsub start" @@ -556,7 +554,7 @@ method initPubSub*(g: GossipSub) = procCall FloodSub(g).initPubSub() randomize() - g.mcache = newMCache(GossipSubHistoryGossip, GossipSubHistoryLength) + g.mcache = MCache.init(GossipSubHistoryGossip, GossipSubHistoryLength) g.mesh = initTable[string, HashSet[PubSubPeer]]() # meshes - topic to peer g.fanout = initTable[string, HashSet[PubSubPeer]]() # fanout - topic to peer g.gossipsub = initTable[string, HashSet[PubSubPeer]]()# topic to peer map of all gossipsub peers diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim index 82231f550..172e8bc62 100644 --- a/libp2p/protocols/pubsub/mcache.nim +++ b/libp2p/protocols/pubsub/mcache.nim @@ -7,69 +7,57 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, chronicles -import tables, options, sets, sequtils -import rpc/[messages], timedcache +import std/[sets, tables, options] +import rpc/[messages] + +export sets, tables, messages, options type CacheEntry* = object mid*: string - msg*: Message + topicIDs*: seq[string] - MCache* = ref object of RootObj - msgs*: TimedCache[Message] + MCache* = object of RootObj + msgs*: Table[string, Message] history*: seq[seq[CacheEntry]] historySize*: Natural windowSize*: Natural -proc get*(c: MCache, mid: string): Option[Message] = +func get*(c: MCache, mid: string): Option[Message] = result = none(Message) if mid in c.msgs: result = some(c.msgs[mid]) -proc contains*(c: MCache, mid: string): bool = +func contains*(c: MCache, mid: string): bool = c.get(mid).isSome -proc put*(c: MCache, msgId: string, msg: Message) = - proc handler(key: string, val: Message) {.gcsafe.} = - ## make sure we remove the message from history - ## to keep things consisten - c.history.applyIt( - it.filterIt(it.mid != msgId) - ) - +func put*(c: var MCache, msgId: string, msg: Message) = if msgId notin c.msgs: - c.msgs.put(msgId, msg, handler = handler) - c.history[0].add(CacheEntry(mid: msgId, msg: msg)) + c.msgs[msgId] = msg + c.history[0].add(CacheEntry(mid: msgId, topicIDs: msg.topicIDs)) -proc window*(c: MCache, topic: string): HashSet[string] = +func window*(c: MCache, topic: string): HashSet[string] = result = initHashSet[string]() - let len = - if c.windowSize > c.history.len: - c.history.len - else: - c.windowSize + let + len = min(c.windowSize, c.history.len) - if c.history.len > 0: - for slot in c.history[0.. c.historySize: - for entry in c.history.pop(): - c.msgs.del(entry.mid) +func shift*(c: var MCache) = + for entry in c.history.pop(): + c.msgs.del(entry.mid) c.history.insert(@[]) -proc newMCache*(window: Natural, history: Natural): MCache = - new result - result.historySize = history - result.windowSize = window - result.history = newSeq[seq[CacheEntry]]() - result.history.add(@[]) # initialize with empty slot - result.msgs = newTimedCache[Message](2.minutes) +func init*(T: type MCache, window, history: Natural): T = + T( + history: newSeq[seq[CacheEntry]](history), + historySize: history, + windowSize: window + ) diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index e3c242010..6946ca463 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -201,16 +201,17 @@ method unsubscribe*(p: PubSub, topics: seq[TopicPair]) {.base, async.} = ## unsubscribe from a list of ``topic`` strings for t in topics: - for i, h in p.topics[t.topic].handler: - if h == t.handler: - p.topics[t.topic].handler.del(i) + p.topics.withValue(t.topic, subs): + for i, h in subs[].handler: + if h == t.handler: + subs[].handler.del(i) - # make sure we delete the topic if - # no more handlers are left - if p.topics[t.topic].handler.len <= 0: - p.topics.del(t.topic) - # metrics - libp2p_pubsub_topics.set(p.topics.len.int64) + # make sure we delete the topic if + # no more handlers are left + if subs.handler.len <= 0: + p.topics.del(t.topic) # careful, invalidates subs + # metrics + libp2p_pubsub_topics.set(p.topics.len.int64) proc unsubscribe*(p: PubSub, topic: string, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 1d8a0ef5c..8cf50aeba 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -10,8 +10,6 @@ import std/[hashes, options, strutils, tables] import chronos, chronicles, nimcrypto/sha2, metrics import rpc/[messages, message, protobuf], - timedcache, - ../../switch, ../../peerid, ../../peerinfo, ../../stream/connection, @@ -42,8 +40,6 @@ type connections*: seq[Connection] # connections to this peer peerId*: PeerID handler*: RPCHandler - sentRpcCache: TimedCache[string] # cache for already sent messages - recvdRpcCache: TimedCache[string] # cache for already received messages observers*: ref seq[PubSubObserver] # ref as in smart_ptr dialLock: AsyncLock @@ -87,33 +83,24 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = while not conn.atEof: trace "waiting for data" let data = await conn.readLp(64 * 1024) - let digest = $(sha256.digest(data)) trace "read data from peer", data = data.shortLog - if digest in p.recvdRpcCache: - when defined(libp2p_expensive_metrics): - libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id]) - trace "message already received, skipping" - continue var rmsg = decodeRpcMsg(data) if rmsg.isErr(): notice "failed to decode msg from peer" break - var msg = rmsg.get() - - trace "decoded msg from peer", msg = msg.shortLog + trace "decoded msg from peer", msg = rmsg.get().shortLog # trigger hooks - p.recvObservers(msg) + p.recvObservers(rmsg.get()) when defined(libp2p_expensive_metrics): - for m in msg.messages: + for m in rmsg.get().messages: for t in m.topicIDs: # metrics libp2p_pubsub_received_messages.inc(labelValues = [p.id, t]) - await p.handler(p, msg) - p.recvdRpcCache.put(digest) + await p.handler(p, rmsg.get()) finally: await conn.close() @@ -227,13 +214,6 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = logScope: encoded = shortLog(encoded) - let digest = $(sha256.digest(encoded)) - if digest in p.sentRpcCache: - trace "message already sent to peer, skipping" - when defined(libp2p_expensive_metrics): - libp2p_pubsub_skipped_sent_messages.inc(labelValues = [p.id]) - return - var conn = await p.getSendConn() try: trace "about to send message" @@ -243,8 +223,6 @@ proc sendImpl(p: PubSubPeer, msg: RPCMsg) {.async.} = trace "sending encoded msgs to peer", connId = $conn.oid await conn.writeLp(encoded) - - p.sentRpcCache.put(digest) trace "sent pubsub message to remote", connId = $conn.oid when defined(libp2p_expensive_metrics): @@ -282,6 +260,4 @@ proc newPubSubPeer*(peerId: PeerID, result.getConn = getConn result.codec = codec result.peerId = peerId - result.sentRpcCache = newTimedCache[string](2.minutes) - result.recvdRpcCache = newTimedCache[string](2.minutes) result.dialLock = newAsyncLock() diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim index e7d08f3b2..35a12aed3 100644 --- a/libp2p/protocols/pubsub/timedcache.nim +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -7,73 +7,59 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import tables -import chronos, chronicles +import std/[heapqueue, sets] -logScope: - topics = "timedcache" +import chronos/timer const Timeout* = 10.seconds # default timeout in ms type - ExpireHandler*[V] = proc(key: string, val: V) {.gcsafe.} - TimedEntry*[V] = object of RootObj - val: V - handler: ExpireHandler[V] + TimedEntry*[K] = ref object of RootObj + key: K + expiresAt: Moment - TimedCache*[V] = ref object of RootObj - cache*: Table[string, TimedEntry[V]] - onExpire*: ExpireHandler[V] - timeout*: Duration + TimedCache*[K] = object of RootObj + expiries: HeapQueue[TimedEntry[K]] + entries: HashSet[K] + timeout: Duration -# TODO: This belong in chronos, temporary left here until chronos is updated -proc addTimer*(at: Duration, cb: CallbackFunc, udata: pointer = nil) = - ## Arrange for the callback ``cb`` to be called at the given absolute - ## timestamp ``at``. You can also pass ``udata`` to callback. - addTimer(Moment.fromNow(at), cb, udata) +func `<`*(a, b: TimedEntry): bool = + a.expiresAt < b.expiresAt -proc put*[V](t: TimedCache[V], - key: string, - val: V = "", - timeout: Duration, - handler: ExpireHandler[V] = nil) = - trace "adding entry to timed cache", key = key - t.cache[key] = TimedEntry[V](val: val, handler: handler) +func expire*(t: var TimedCache, now: Moment = Moment.now()) = + while t.expiries.len() > 0 and t.expiries[0].expiresAt < now: + t.entries.excl(t.expiries.pop().key) - addTimer( - timeout, - proc (arg: pointer = nil) {.gcsafe.} = - trace "deleting expired entry from timed cache", key = key - if key in t.cache: - let entry = t.cache[key] - t.cache.del(key) - if not isNil(entry.handler): - entry.handler(key, entry.val) +func del*[K](t: var TimedCache[K], key: K): bool = + # Removes existing key from cache, returning false if it was not present + if not t.entries.missingOrExcl(key): + for i in 0..