mirror of https://github.com/vacp2p/nim-libp2p.git
gossipsub fixes (#276)
* graft up to D peers * fix logging so it's clear who is grafting/pruning who * clear fanout when grafting
This commit is contained in:
parent
c76152f2c1
commit
170685f9c6
|
@ -99,49 +99,48 @@ proc replenishFanout(g: GossipSub, topic: string) =
|
||||||
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
|
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
|
||||||
|
|
||||||
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
|
logScope:
|
||||||
|
topic
|
||||||
|
|
||||||
trace "about to rebalance mesh"
|
trace "about to rebalance mesh"
|
||||||
|
|
||||||
# create a mesh topic that we're subscribing to
|
# create a mesh topic that we're subscribing to
|
||||||
|
|
||||||
var
|
var
|
||||||
grafts, prunes: seq[PubSubPeer]
|
grafts, prunes: seq[PubSubPeer]
|
||||||
|
|
||||||
if g.mesh.peers(topic) < GossipSubDlo:
|
if g.mesh.peers(topic) < GossipSubDlo:
|
||||||
trace "replenishing mesh", topic, peers = g.mesh.peers(topic)
|
trace "replenishing mesh", peers = g.mesh.peers(topic)
|
||||||
# replenish the mesh if we're below GossipSubDlo
|
# replenish the mesh if we're below Dlo
|
||||||
var newPeers = toSeq(
|
grafts = toSeq(
|
||||||
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||||
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||||
)
|
)
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topic = topic
|
|
||||||
meshPeers = g.mesh.peers(topic)
|
meshPeers = g.mesh.peers(topic)
|
||||||
newPeers = newPeers.len
|
grafts = grafts.len
|
||||||
|
|
||||||
shuffle(newPeers)
|
shuffle(grafts)
|
||||||
|
|
||||||
trace "getting peers", topic, peers = newPeers.len
|
# Graft peers so we reach a count of D
|
||||||
|
grafts.setLen(min(grafts.len, GossipSubD - g.mesh.peers(topic)))
|
||||||
|
|
||||||
for peer in newPeers:
|
trace "getting peers", topic, peers = grafts.len
|
||||||
# send a graft message to the peer
|
|
||||||
grafts.add peer
|
for peer in grafts:
|
||||||
discard g.mesh.addPeer(topic, peer)
|
if g.mesh.addPeer(topic, peer):
|
||||||
trace "got peer", peer = $peer
|
g.fanout.removePeer(topic, peer)
|
||||||
|
|
||||||
if g.mesh.peers(topic) > GossipSubDhi:
|
if g.mesh.peers(topic) > GossipSubDhi:
|
||||||
# prune peers if we've gone over
|
# prune peers if we've gone over Dhi
|
||||||
var mesh = toSeq(g.mesh[topic])
|
prunes = toSeq(g.mesh[topic])
|
||||||
shuffle(mesh)
|
shuffle(prunes)
|
||||||
|
prunes.setLen(prunes.len - GossipSubD) # .. down to D peers
|
||||||
|
|
||||||
trace "about to prune mesh", mesh = mesh.len
|
trace "about to prune mesh", prunes = prunes.len
|
||||||
for peer in mesh:
|
for peer in prunes:
|
||||||
if g.mesh.peers(topic) <= GossipSubD:
|
|
||||||
break
|
|
||||||
|
|
||||||
trace "pruning peers", peers = g.mesh.peers(topic)
|
|
||||||
# send a graft message to the peer
|
|
||||||
g.mesh.removePeer(topic, peer)
|
g.mesh.removePeer(topic, peer)
|
||||||
prunes.add(peer)
|
|
||||||
|
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
|
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
|
||||||
|
@ -158,8 +157,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
for p in prunes:
|
for p in prunes:
|
||||||
await p.sendPrune(@[topic])
|
await p.sendPrune(@[topic])
|
||||||
|
|
||||||
trace "mesh balanced, got peers", peers = g.mesh.peers(topic),
|
trace "mesh balanced, got peers", peers = g.mesh.peers(topic)
|
||||||
topicId = topic
|
|
||||||
|
|
||||||
proc dropFanoutPeers(g: GossipSub) =
|
proc dropFanoutPeers(g: GossipSub) =
|
||||||
# drop peers that we haven't published to in
|
# drop peers that we haven't published to in
|
||||||
|
@ -275,17 +273,21 @@ method subscribeTopic*(g: GossipSub,
|
||||||
peerId: string) {.gcsafe, async.} =
|
peerId: string) {.gcsafe, async.} =
|
||||||
await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
|
await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
peer = peerId
|
||||||
|
topic
|
||||||
|
|
||||||
let peer = g.peers.getOrDefault(peerId)
|
let peer = g.peers.getOrDefault(peerId)
|
||||||
if peer == nil:
|
if peer == nil:
|
||||||
debug "subscribeTopic on a nil peer!"
|
debug "subscribeTopic on a nil peer!"
|
||||||
return
|
return
|
||||||
|
|
||||||
if subscribe:
|
if subscribe:
|
||||||
trace "adding subscription for topic", peer = peerId, name = topic
|
trace "peer subscribed to topic"
|
||||||
# subscribe remote peer to the topic
|
# subscribe remote peer to the topic
|
||||||
discard g.gossipsub.addPeer(topic, peer)
|
discard g.gossipsub.addPeer(topic, peer)
|
||||||
else:
|
else:
|
||||||
trace "removing subscription for topic", peer = peerId, name = topic
|
trace "peer unsubscribed from topic"
|
||||||
# unsubscribe remote peer from the topic
|
# unsubscribe remote peer from the topic
|
||||||
g.gossipsub.removePeer(topic, peer)
|
g.gossipsub.removePeer(topic, peer)
|
||||||
g.mesh.removePeer(topic, peer)
|
g.mesh.removePeer(topic, peer)
|
||||||
|
@ -310,7 +312,11 @@ proc handleGraft(g: GossipSub,
|
||||||
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
||||||
for graft in grafts:
|
for graft in grafts:
|
||||||
let topic = graft.topicID
|
let topic = graft.topicID
|
||||||
trace "processing graft message", topic, peer = $peer
|
logScope:
|
||||||
|
peer = peer.id
|
||||||
|
topic
|
||||||
|
|
||||||
|
trace "peer grafted topic"
|
||||||
|
|
||||||
# If they send us a graft before they send us a subscribe, what should
|
# If they send us a graft before they send us a subscribe, what should
|
||||||
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
||||||
|
@ -323,19 +329,21 @@ proc handleGraft(g: GossipSub,
|
||||||
if g.mesh.addPeer(topic, peer):
|
if g.mesh.addPeer(topic, peer):
|
||||||
g.fanout.removePeer(topic, peer)
|
g.fanout.removePeer(topic, peer)
|
||||||
else:
|
else:
|
||||||
trace "Peer already in mesh", topic, peer = $peer
|
trace "peer already in mesh"
|
||||||
else:
|
else:
|
||||||
result.add(ControlPrune(topicID: topic))
|
result.add(ControlPrune(topicID: topic))
|
||||||
else:
|
else:
|
||||||
|
debug "peer grafting topic we're not interested in"
|
||||||
result.add(ControlPrune(topicID: topic))
|
result.add(ControlPrune(topicID: topic))
|
||||||
|
|
||||||
libp2p_gossipsub_peers_per_topic_mesh
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||||
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
|
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||||
|
|
||||||
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||||
for prune in prunes:
|
for prune in prunes:
|
||||||
trace "processing prune message", peer = $peer,
|
trace "peer pruned topic", peer = peer.id, topic = prune.topicID
|
||||||
topicID = prune.topicID
|
|
||||||
|
|
||||||
g.mesh.removePeer(prune.topicID, peer)
|
g.mesh.removePeer(prune.topicID, peer)
|
||||||
libp2p_gossipsub_peers_per_topic_mesh
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
|
@ -345,9 +353,8 @@ proc handleIHave(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
ihaves: seq[ControlIHave]): ControlIWant =
|
ihaves: seq[ControlIHave]): ControlIWant =
|
||||||
for ihave in ihaves:
|
for ihave in ihaves:
|
||||||
trace "processing ihave message", peer = $peer,
|
trace "peer sent ihave",
|
||||||
topicID = ihave.topicID,
|
peer = peer.id, topic = ihave.topicID, msgs = ihave.messageIDs
|
||||||
msgs = ihave.messageIDs
|
|
||||||
|
|
||||||
if ihave.topicID in g.mesh:
|
if ihave.topicID in g.mesh:
|
||||||
for m in ihave.messageIDs:
|
for m in ihave.messageIDs:
|
||||||
|
@ -359,8 +366,7 @@ proc handleIWant(g: GossipSub,
|
||||||
iwants: seq[ControlIWant]): seq[Message] =
|
iwants: seq[ControlIWant]): seq[Message] =
|
||||||
for iwant in iwants:
|
for iwant in iwants:
|
||||||
for mid in iwant.messageIDs:
|
for mid in iwant.messageIDs:
|
||||||
trace "processing iwant message", peer = $peer,
|
trace "peer sent iwant", peer = peer.id, messageID = mid
|
||||||
messageID = mid
|
|
||||||
let msg = g.mcache.get(mid)
|
let msg = g.mcache.get(mid)
|
||||||
if msg.isSome:
|
if msg.isSome:
|
||||||
result.add(msg.get())
|
result.add(msg.get())
|
||||||
|
@ -462,8 +468,8 @@ method publish*(g: GossipSub,
|
||||||
data: seq[byte]): Future[int] {.async.} =
|
data: seq[byte]): Future[int] {.async.} =
|
||||||
# base returns always 0
|
# base returns always 0
|
||||||
discard await procCall PubSub(g).publish(topic, data)
|
discard await procCall PubSub(g).publish(topic, data)
|
||||||
trace "about to publish message on topic", name = topic,
|
trace "publishing message on topic", topic, data = data.shortLog
|
||||||
data = data.shortLog
|
|
||||||
var peers: HashSet[PubSubPeer]
|
var peers: HashSet[PubSubPeer]
|
||||||
if topic.len <= 0: # data could be 0/empty
|
if topic.len <= 0: # data could be 0/empty
|
||||||
return 0
|
return 0
|
||||||
|
@ -490,9 +496,8 @@ method publish*(g: GossipSub,
|
||||||
msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign)
|
msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign)
|
||||||
msgId = g.msgIdProvider(msg)
|
msgId = g.msgIdProvider(msg)
|
||||||
|
|
||||||
trace "created new message", msg
|
trace "created new message", msg, topic, peers = peers.len
|
||||||
|
|
||||||
trace "publishing on topic", topic, peers = peers.len
|
|
||||||
if msgId notin g.mcache:
|
if msgId notin g.mcache:
|
||||||
g.mcache.put(msgId, msg)
|
g.mcache.put(msgId, msg)
|
||||||
|
|
||||||
|
|
|
@ -210,9 +210,9 @@ method unsubscribe*(p: PubSub,
|
||||||
if p.topics[t.topic].handler.len <= 0:
|
if p.topics[t.topic].handler.len <= 0:
|
||||||
p.topics.del(t.topic)
|
p.topics.del(t.topic)
|
||||||
|
|
||||||
method unsubscribe*(p: PubSub,
|
proc unsubscribe*(p: PubSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
handler: TopicHandler): Future[void] {.base.} =
|
handler: TopicHandler): Future[void] =
|
||||||
## unsubscribe from a ``topic`` string
|
## unsubscribe from a ``topic`` string
|
||||||
p.unsubscribe(@[(topic, handler)])
|
p.unsubscribe(@[(topic, handler)])
|
||||||
|
|
||||||
|
|
|
@ -97,28 +97,30 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) =
|
||||||
obs.onSend(p, msg)
|
obs.onSend(p, msg)
|
||||||
|
|
||||||
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||||
trace "handling pubsub rpc", peer = p.id, closed = conn.closed
|
logScope:
|
||||||
|
peer = p.id
|
||||||
|
debug "starting pubsub read loop for peer", closed = conn.closed
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
p.refs.inc()
|
p.refs.inc()
|
||||||
while not conn.closed:
|
while not conn.closed:
|
||||||
trace "waiting for data", peer = p.id, closed = conn.closed
|
trace "waiting for data", closed = conn.closed
|
||||||
let data = await conn.readLp(64 * 1024)
|
let data = await conn.readLp(64 * 1024)
|
||||||
let digest = $(sha256.digest(data))
|
let digest = $(sha256.digest(data))
|
||||||
trace "read data from peer", peer = p.id, data = data.shortLog
|
trace "read data from peer", data = data.shortLog
|
||||||
if digest in p.recvdRpcCache:
|
if digest in p.recvdRpcCache:
|
||||||
libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
|
libp2p_pubsub_skipped_received_messages.inc(labelValues = [p.id])
|
||||||
trace "message already received, skipping", peer = p.id
|
trace "message already received, skipping"
|
||||||
continue
|
continue
|
||||||
|
|
||||||
var rmsg = decodeRpcMsg(data)
|
var rmsg = decodeRpcMsg(data)
|
||||||
if rmsg.isErr():
|
if rmsg.isErr():
|
||||||
notice "failed to decode msg from peer", peer = p.id
|
notice "failed to decode msg from peer"
|
||||||
break
|
break
|
||||||
|
|
||||||
var msg = rmsg.get()
|
var msg = rmsg.get()
|
||||||
|
|
||||||
trace "decoded msg from peer", peer = p.id, msg = msg.shortLog
|
trace "decoded msg from peer", msg = msg.shortLog
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
p.recvObservers(msg)
|
p.recvObservers(msg)
|
||||||
|
|
||||||
|
@ -130,7 +132,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||||
await p.handler(p, @[msg])
|
await p.handler(p, @[msg])
|
||||||
p.recvdRpcCache.put(digest)
|
p.recvdRpcCache.put(digest)
|
||||||
finally:
|
finally:
|
||||||
trace "exiting pubsub peer read loop", peer = p.id
|
debug "exiting pubsub peer read loop"
|
||||||
await conn.close()
|
await conn.close()
|
||||||
|
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
|
@ -196,12 +198,12 @@ proc sendSubOpts*(p: PubSubPeer, topics: seq[string], subscribe: bool): Future[v
|
||||||
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
|
subscriptions: topics.mapIt(SubOpts(subscribe: subscribe, topic: it))))
|
||||||
|
|
||||||
proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] =
|
proc sendGraft*(p: PubSubPeer, topics: seq[string]): Future[void] =
|
||||||
trace "sending graft msg to peer", peer = p.id, topicIDs = topics
|
trace "sending graft to peer", peer = p.id, topicIDs = topics
|
||||||
p.send(RPCMsg(control: some(
|
p.send(RPCMsg(control: some(
|
||||||
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
|
ControlMessage(graft: topics.mapIt(ControlGraft(topicID: it))))))
|
||||||
|
|
||||||
proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
|
proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
|
||||||
trace "sending prune msg to peer", peer = p.id, topicIDs = topics
|
trace "sending prune to peer", peer = p.id, topicIDs = topics
|
||||||
p.send(RPCMsg(control: some(
|
p.send(RPCMsg(control: some(
|
||||||
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
|
ControlMessage(prune: topics.mapIt(ControlPrune(topicID: it))))))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue