From cf51da00b2bb393141a512648c8f5aca47df2f26 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Fri, 26 Jun 2020 12:31:08 +0900 Subject: [PATCH] take into account explicit direct peers in publish --- libp2p/protocols/pubsub/gossipsub.nim | 45 ++++++++++++-------------- libp2p/protocols/pubsub/pubsubpeer.nim | 9 ++++++ 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index d234d0b50..2df4c21b5 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -524,40 +524,37 @@ method publish*(g: GossipSub, await procCall PubSub(g).publish(topic, data) trace "about to publish message on topic", name = topic, data = data.shortLog + # directly copy explicit peers + # as we will always publish to those + var peers = g.explicitPeers - var peers: HashSet[string] - # TODO: we probably don't need to try multiple times if data.len > 0 and topic.len > 0: - for _ in 0..<5: # try to get peers up to 5 times - if peers.len > 0: - break - - if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh - await g.rebalanceMesh(topic) - peers = g.mesh.getOrDefault(topic) - else: # send to fanout peers - await g.replenishFanout(topic) - if topic in g.fanout: - peers = g.fanout.getOrDefault(topic) - # set the fanout expiry time - g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) - - # wait a second between tries - await sleepAsync(1.seconds) + if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh + await g.rebalanceMesh(topic) + peers.incl(g.mesh.getOrDefault(topic)) + else: # send to fanout peers + await g.replenishFanout(topic) + if topic in g.fanout: + peers.incl(g.fanout.getOrDefault(topic)) + # set the fanout expiry time + g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) let msg = newMessage(g.peerInfo, data, topic, g.sign) trace "created new message", msg + + trace "publishing on topic", name = topic + if msg.msgId notin g.mcache: + g.mcache.put(msg) + var sent: seq[Future[void]] for p in peers: + # avoid sending to self if p == g.peerInfo.id: continue - - trace "publishing on topic", name = topic - if msg.msgId notin g.mcache: - g.mcache.put(msg) - - if p in g.peers: + let peer = g.peers.getOrDefault(p) + if not isNil(peer.peerInfo): sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) + checkFutures(await allFinished(sent)) libp2p_pubsub_messages_published.inc(labelValues = [topic]) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index af5222c67..9912c157a 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -27,6 +27,15 @@ declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skip declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"]) type + TopicScoreParams = object + topicWeight: float + timeInMeshWeight: float + timeInMeshQuantum: Duration + timeInMeshCap: float + + PeerScoreParams = object + topics: Table[string, TopicScoreParams] + PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}