diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index e3c8eb44e..8135259ff 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -451,6 +451,7 @@ method publish*(g: GossipSub, data = data.shortLog var peers: HashSet[string] + # TODO: we probably don't need to try multiple times if data.len > 0 and topic.len > 0: for _ in 0..<5: # try to get peers up to 5 times if peers.len > 0: @@ -470,14 +471,18 @@ method publish*(g: GossipSub, await sleepAsync(1.seconds) let msg = newMessage(g.peerInfo, data, topic, g.sign) + trace "created new message", msg var sent: seq[Future[void]] for p in peers: if p == g.peerInfo.id: continue trace "publishing on topic", name = topic - g.mcache.put(msg) - sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) + if msg.msgId notin g.mcache: + g.mcache.put(msg) + + if p in g.peers: + sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) checkFutures(await allFinished(sent)) libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/mcache.nim b/libp2p/protocols/pubsub/mcache.nim index 31b18d1cf..06157c942 100644 --- a/libp2p/protocols/pubsub/mcache.nim +++ b/libp2p/protocols/pubsub/mcache.nim @@ -22,6 +22,14 @@ type historySize*: Natural windowSize*: Natural +proc get*(c: MCache, mid: string): Option[Message] = + result = none(Message) + if mid in c.msgs: + result = some(c.msgs[mid]) + +proc contains*(c: MCache, mid: string): bool = + c.get(mid).isSome + proc put*(c: MCache, msg: Message) = proc handler(key: string, val: Message) {.gcsafe.} = ## make sure we remove the message from history @@ -30,13 +38,9 @@ proc put*(c: MCache, msg: Message) = it.filterIt(it.mid != msg.msgId) ) - c.msgs.put(msg.msgId, msg, handler = handler) - c.history[0].add(CacheEntry(mid: msg.msgId, msg: msg)) - -proc get*(c: MCache, mid: string): Option[Message] = - result = none(Message) - if mid in c.msgs: - result = some(c.msgs[mid]) + if msg.msgId notin c.msgs: + c.msgs.put(msg.msgId, msg, handler = handler) + c.history[0].add(CacheEntry(mid: msg.msgId, msg: msg)) proc window*(c: MCache, topic: string): HashSet[string] = result = initHashSet[string]() diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 0f085000a..18d4dfac8 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -379,17 +379,16 @@ proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer if s.pubSub.isSome and (peerInfo.id notin s.dialedPubSubPeers): let conn = await s.getMuxedStream(peerInfo) - try: - if isNil(conn): - trace "unable to subscribe to peer", peer = peerInfo.shortLog - return + if isNil(conn): + trace "unable to subscribe to peer", peer = peerInfo.shortLog + return - s.dialedPubSubPeers.incl(peerInfo.id) + s.dialedPubSubPeers.incl(peerInfo.id) + try: if (await s.ms.select(conn, s.pubSub.get().codec)): await s.pubSub.get().subscribeToPeer(conn) else: await conn.close() - except CatchableError as exc: trace "exception in subscribe to peer", peer = peerInfo.shortLog, exc = exc.msg await conn.close()