Gossip improvements (#476)

* add more traces, remove async from rebalance

* more traces

* avoid computing scores when weight is 0.0

* debug colocation, fix an indent in unsubpeer (minor)

* add full ValidationResult coverage

* store in cache only after validation

* gossip 1.0 fixes

* fix typo

* gossip 1.0 internal test fixes

* test fixing

* refactor peerstats usages

* populate tables if missing when scoring (see the sketch after the change summary below)
Giovanni Petrantoni 2020-12-15 10:25:22 +09:00 committed by GitHub
parent 4224f12503
commit f8f0bc1bd8
5 changed files with 171 additions and 72 deletions
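Several hunks below replace direct peerStats[peer] indexing with Nim's withValue template so that missing per-peer or per-topic entries are created instead of raising KeyError ("populate tables if missing when scoring"). A minimal, self-contained sketch of that pattern, using std/tables and simplified stand-in types rather than libp2p's real PeerStats and TopicInfo:

import std/tables

type
  TopicInfo = object
    invalidMessageDeliveries: float64
  PeerStats = object
    topicInfos: Table[string, TopicInfo]

var peerStats: Table[string, PeerStats]   # keyed by a peer id string in this sketch

proc punish(peerId, topic: string) =
  # Bump the invalid-delivery counter, creating missing entries along the way
  # instead of raising KeyError on an unknown peer or topic.
  peerStats.withValue(peerId, stats):
    stats[].topicInfos.withValue(topic, tstats):
      tstats[].invalidMessageDeliveries += 1
    do:
      stats[].topicInfos[topic] = TopicInfo(invalidMessageDeliveries: 1)
  do:
    var fresh = PeerStats()
    fresh.topicInfos[topic] = TopicInfo(invalidMessageDeliveries: 1)
    peerStats[peerId] = fresh

punish("peerA", "foobar")
punish("peerA", "foobar")
echo peerStats["peerA"].topicInfos["foobar"].invalidMessageDeliveries   # 2.0

The nested do: branches are the fallbacks taken when the outer or inner key is absent, which is exactly the shape used in punishPeer and rpcHandler below.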

Changed file 1 of 5:

@@ -318,7 +318,7 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
discard
proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
g.peerStats.withValue(p, stats) do:
g.peerStats.withValue(p, stats):
var info = stats.topicInfos.getOrDefault(topic)
info.graftTime = Moment.now()
info.meshTime = 0.seconds
@@ -335,7 +335,7 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
g.grafted(p, topic)
proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
g.peerStats.withValue(p, stats) do:
g.peerStats.withValue(p, stats):
when not defined(release):
g.prunedPeers.incl(p)
@@ -400,7 +400,7 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent
procCall FloodSub(p).onPubSubPeerEvent(peer, event)
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
proc rebalanceMesh(g: GossipSub, topic: string) =
logScope:
topic
mesh = g.mesh.peers(topic)
@@ -529,6 +529,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "pruning", prunes = prunes.len
for peer in prunes:
trace "pruning peer on rebalance", peer, score = peer.score
g.pruned(peer, topic)
g.mesh.removePeer(topic, peer)
@@ -659,6 +660,7 @@ func `/`(a, b: Duration): float64 =
proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
if peer.connections.len == 0:
debug "colocationFactor, no connections", peer
0.0
else:
let
@@ -666,6 +668,7 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
ipPeers = g.peersInIP.getOrDefault(address)
len = ipPeers.len.float64
if len > g.parameters.ipColocationFactorThreshold:
debug "colocationFactor over threshold", peer, address, len
let over = len - g.parameters.ipColocationFactorThreshold
over * over
else:
@@ -697,42 +700,44 @@ proc updateScores(g: GossipSub) = # avoid async
var info = stats.topicInfos.getOrDefault(topic)
inc n_topics
# Scoring
var topicScore = 0'f64
# if weight is 0.0 avoid wasting time
if topicParams.topicWeight != 0.0:
# Scoring
var topicScore = 0'f64
if info.inMesh:
inc is_grafted
info.meshTime = now - info.graftTime
if info.meshTime > topicParams.meshMessageDeliveriesActivation:
info.meshMessageDeliveriesActive = true
if info.inMesh:
inc is_grafted
info.meshTime = now - info.graftTime
if info.meshTime > topicParams.meshMessageDeliveriesActivation:
info.meshMessageDeliveriesActive = true
var p1 = info.meshTime / topicParams.timeInMeshQuantum
if p1 > topicParams.timeInMeshCap:
p1 = topicParams.timeInMeshCap
trace "p1", peer, p1
topicScore += p1 * topicParams.timeInMeshWeight
else:
info.meshMessageDeliveriesActive = false
var p1 = info.meshTime / topicParams.timeInMeshQuantum
if p1 > topicParams.timeInMeshCap:
p1 = topicParams.timeInMeshCap
trace "p1", peer, p1, topic, topicScore
topicScore += p1 * topicParams.timeInMeshWeight
else:
info.meshMessageDeliveriesActive = false
topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
trace "p2", peer, p2 = info.firstMessageDeliveries
topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
trace "p2", peer, p2 = info.firstMessageDeliveries, topic, topicScore
if info.meshMessageDeliveriesActive:
if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
let p3 = deficit * deficit
trace "p3", peer, p3
topicScore += p3 * topicParams.meshMessageDeliveriesWeight
if info.meshMessageDeliveriesActive:
if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
let p3 = deficit * deficit
trace "p3", peer, p3, topic, topicScore
topicScore += p3 * topicParams.meshMessageDeliveriesWeight
topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
trace "p3b", peer, p3b = info.meshFailurePenalty
topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
trace "p3b", peer, p3b = info.meshFailurePenalty, topic, topicScore
topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries
topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
trace "updated peer topic's scores", peer, topic, info, topicScore
trace "updated peer topic's scores", peer, topic, info, topicScore
peer.score += topicScore * topicParams.topicWeight
peer.score += topicScore * topicParams.topicWeight
# Score decay
info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
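The hunk above wraps the per-topic score terms in an if topicParams.topicWeight != 0.0 guard: the topic score is later multiplied by topicWeight, so with a zero weight the P1..P4 work would be thrown away anyway. A small stand-alone sketch of the guarded P1 (time in mesh) term, with an assumed simplified parameter object rather than the real TopicParams:

type
  ScoreParams = object
    topicWeight, timeInMeshWeight, timeInMeshCap: float64
    timeInMeshQuantumSecs: float64

proc timeInMeshTerm(params: ScoreParams, meshTimeSecs: float64): float64 =
  # P1: time spent in the mesh, divided by the quantum and capped at timeInMeshCap.
  if params.topicWeight == 0.0:
    return 0.0                        # avoid computing scores when weight is 0.0
  var p1 = meshTimeSecs / params.timeInMeshQuantumSecs
  if p1 > params.timeInMeshCap:
    p1 = params.timeInMeshCap
  p1 * params.timeInMeshWeight * params.topicWeight

let params = ScoreParams(topicWeight: 1.0, timeInMeshWeight: 0.01,
                         timeInMeshCap: 3600.0, timeInMeshQuantumSecs: 1.0)
echo timeInMeshTerm(params, 120.0)    # 1.2

With topicWeight set to 0.0 the proc returns immediately, which is the shortcut this commit adds for every P term.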
@@ -811,6 +816,7 @@ proc heartbeat(g: GossipSub) {.async.} =
var prunes: seq[PubSubPeer]
for peer in meshPeers:
if peer.score < 0.0:
trace "pruning negative score peer", peer, score = peer.score
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
prunes &= peer
@@ -821,7 +827,7 @@ proc heartbeat(g: GossipSub) {.async.} =
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(prunes, prune)
await g.rebalanceMesh(t)
g.rebalanceMesh(t)
libp2p_gossipsub_peers_mesh_sum.set(totalMeshPeers.int64)
libp2p_gossipsub_peers_gossipsub_sum.set(totalGossipPeers.int64)
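The heartbeat above drops the await in front of rebalanceMesh: the proc no longer suspends anywhere, so the {.async.} pragma and the Future it allocates on every call can go. A toy chronos illustration of the same conversion, with made-up Mesh bookkeeping instead of the real GossipSub state:

import chronos

type Mesh = ref object
  peers: int

proc rebalanceMesh(m: Mesh) =        # previously {.async.} and awaited by callers
  if m.peers < 3:                    # pure in-memory bookkeeping, nothing suspends
    inc m.peers

proc heartbeat(m: Mesh) {.async.} =
  for _ in 0 ..< 5:
    m.rebalanceMesh()                # was: await m.rebalanceMesh()
    await sleepAsync(10.milliseconds)

let m = Mesh()
waitFor heartbeat(m)
echo m.peers                         # 3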
@@ -834,7 +840,7 @@ proc heartbeat(g: GossipSub) {.async.} =
let peers = g.getGossipPeers()
for peer, control in peers:
g.peers.withValue(peer.peerId, pubsubPeer) do:
g.peers.withValue(peer.peerId, pubsubPeer):
g.send(
pubsubPeer[],
RPCMsg(control: some(control)))
@@ -864,7 +870,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
# remove from peer IPs collection too
if pubSubPeer.connections.len > 0:
g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s) do:
g.peersInIP.withValue(pubSubPeer.connections[0].observedAddr, s):
s[].excl(pubSubPeer)
for t in toSeq(g.gossipsub.keys):
@@ -877,6 +883,7 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
.set(g.gossipsub.peers(t).int64, labelValues = [t])
for t in toSeq(g.mesh.keys):
trace "pruning unsubscribing peer", pubSubPeer, score = pubSubPeer.score
g.pruned(pubSubPeer, t)
g.mesh.removePeer(t, pubSubPeer)
@@ -891,8 +898,9 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
g.peerStats[pubSubPeer].expire = Moment.now() + g.parameters.retainScore
for topic, info in g.peerStats[pubSubPeer].topicInfos.mpairs:
g.peerStats.withValue(pubSubPeer, stats):
stats[].expire = Moment.now() + g.parameters.retainScore
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0
procCall FloodSub(g).unsubscribePeer(peer)
@@ -942,9 +950,18 @@ proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
# ensure we init a new topic if unknown
let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
# update stats
var tstats = g.peerStats[peer].topicInfos.getOrDefault(t)
tstats.invalidMessageDeliveries += 1
g.peerStats[peer].topicInfos[t] = tstats
g.peerStats.withValue(peer, stats):
stats[].topicInfos.withValue(t, tstats):
tstats[].invalidMessageDeliveries += 1
do: # if we have no stats populate!
stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
do: # if we have no stats populate!
g.peerStats[peer] =
block:
var stats = PeerStats()
stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
stats
proc handleGraft(g: GossipSub,
peer: PubSubPeer,
@@ -1007,6 +1024,7 @@ proc handleGraft(g: GossipSub,
else:
trace "peer already in mesh"
else:
trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic)
result.add(ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
@@ -1032,6 +1050,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
if backoff > current:
g.backingOff[peer.peerId] = backoff
trace "pruning rpc received peer", peer, score = peer.score
g.pruned(peer, prune.topicID)
g.mesh.removePeer(prune.topicID, peer)
@@ -1104,19 +1123,26 @@ method rpcHandler*(g: GossipSub,
for t in msg.topicIDs: # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score
var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
if stats.inMesh:
stats.meshMessageDeliveries += 1
if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
# commit back to the table
g.peerStats[peer].topicInfos[t] = stats
g.peerStats.withValue(peer, pstats):
pstats[].topicInfos.withValue(t, stats):
if stats[].inMesh:
# TODO: take into account meshMessageDeliveriesWindow
# score only if messages are not too old.
stats[].meshMessageDeliveries += 1
if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
do: # make sure we don't lose this information
pstats[].topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
do: # make sure we don't lose this information
g.peerStats[peer] =
block:
var stats = PeerStats()
stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
stats
# onto the next message
continue
g.mcache.put(msgId, msg)
if (msg.signature.len > 0 or g.verifySignature) and not msg.verify():
# always validate if signature is present or required
debug "Dropping message due to failed signature verification", msgId, peer
@@ -1144,24 +1170,33 @@ method rpcHandler*(g: GossipSub,
of ValidationResult.Accept:
discard
# store in cache only after validation
g.mcache.put(msgId, msg)
var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# contribute to peer score first delivery
var stats = g.peerStats[peer].topicInfos.getOrDefault(t)
stats.firstMessageDeliveries += 1
if stats.firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
stats.firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
g.peerStats.withValue(peer, pstats):
pstats[].topicInfos.withValue(t, stats):
# contribute to peer score first delivery
stats[].firstMessageDeliveries += 1
if stats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
stats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap
# if in mesh add more delivery score
if stats.inMesh:
stats.meshMessageDeliveries += 1
if stats.meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats.meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
# commit back to the table
g.peerStats[peer].topicInfos[t] = stats
# if in mesh add more delivery score
if stats[].inMesh:
stats[].meshMessageDeliveries += 1
if stats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
stats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
do: # make sure we don't lose this information
pstats[].topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
do: # make sure we don't lose this information
g.peerStats[peer] =
block:
var stats = PeerStats()
stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)
stats
g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])
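In the Accept branch above, g.mcache.put(msgId, msg) now runs only after the validators have accepted the message (the earlier put ahead of signature checking is removed), so rejected or ignored messages never reach the cache that feeds IHAVE gossip. A reduced sketch of that flow, with assumed stand-in types instead of libp2p's MCache and validator pipeline:

import std/[tables, sets]

type ValidationResult = enum Accept, Reject, Ignore

var mcache: Table[string, string]    # msgId -> payload, feeds gossip (IHAVE)
var seen: HashSet[string]

proc validate(payload: string): ValidationResult =
  # Toy rule standing in for the registered topic validators.
  if payload.len == 0:
    return Reject
  Accept

proc handleMessage(msgId, payload: string) =
  if msgId in seen: return           # duplicate, already handled
  seen.incl msgId
  case validate(payload)
  of Reject, Ignore:
    discard                          # never cached, never re-gossiped
  of Accept:
    mcache[msgId] = payload          # store in cache only after validation

handleMessage("m1", "hello")
handleMessage("m2", "")
echo mcache.len                      # 1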
@@ -1200,7 +1235,7 @@ method subscribe*(g: GossipSub,
if topic in g.fanout:
g.fanout.del(topic)
await g.rebalanceMesh(topic)
g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async.} =
@@ -1214,6 +1249,7 @@ method unsubscribe*(g: GossipSub,
g.mesh.del(topic)
g.topicParams.del(topic)
for peer in peers:
trace "pruning unsubscribe call peer", peer, score = peer.score
g.pruned(peer, topic)
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
@@ -1229,6 +1265,7 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
let peers = g.mesh.getOrDefault(topic)
g.mesh.del(topic)
for peer in peers:
trace "pruning unsubscribeAll call peer", peer, score = peer.score
g.pruned(peer, topic)
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(

Changed file 2 of 5:

@@ -157,7 +157,7 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent
procCall FloodSub(p).onPubSubPeerEvent(peer, event)
proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
proc rebalanceMesh(g: GossipSub, topic: string) =
logScope:
topic
mesh = g.mesh.peers(topic)
@@ -277,7 +277,7 @@ proc heartbeat(g: GossipSub) {.async.} =
trace "running heartbeat"
for t in toSeq(g.topics.keys):
await g.rebalanceMesh(t)
g.rebalanceMesh(t)
g.dropFanoutPeers()
@@ -508,7 +508,7 @@ method subscribe*(g: GossipSub,
topic: string,
handler: TopicHandler) {.async.} =
await procCall PubSub(g).subscribe(topic, handler)
await g.rebalanceMesh(topic)
g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) {.async.} =

Changed file 3 of 5:

@@ -55,7 +55,7 @@ suite "GossipSub internal":
gossipSub.gossipsub[topic].incl(peer)
check gossipSub.peers.len == 15
await gossipSub.rebalanceMesh(topic)
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == gossipSub.parameters.d # + 2 # account opportunistic grafts
await allFuturesThrowing(conns.mapIt(it.close()))
@@ -82,7 +82,7 @@ suite "GossipSub internal":
gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 15
await gossipSub.rebalanceMesh(topic)
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == gossipSub.parameters.d + gossipSub.parameters.dScore
await allFuturesThrowing(conns.mapIt(it.close()))

Changed file 4 of 5:

@@ -49,7 +49,7 @@ suite "GossipSub internal":
gossipSub.mesh[topic].incl(peer)
check gossipSub.peers.len == 15
await gossipSub.rebalanceMesh(topic)
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))
@@ -74,7 +74,7 @@ suite "GossipSub internal":
gossipSub.mesh[topic].incl(peer)
check gossipSub.mesh[topic].len == 15
await gossipSub.rebalanceMesh(topic)
gossipSub.rebalanceMesh(topic)
check gossipSub.mesh[topic].len == GossipSubD
await allFuturesThrowing(conns.mapIt(it.close()))

Changed file 5 of 5:

@@ -133,7 +133,7 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation should fail":
asyncTest "GossipSub validation should fail (reject)":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
@@ -195,6 +195,68 @@ suite "GossipSub":
await allFuturesThrowing(nodesFut.concat())
asyncTest "GossipSub validation should fail (ignore)":
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
check false # if we get here, it should fail
let
nodes = generateNodes(2, gossip = true)
# start switches
nodesFut = await allFinished(
nodes[0].switch.start(),
nodes[1].switch.start(),
)
# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
let gossip1 = GossipSub(nodes[0])
let gossip2 = GossipSub(nodes[1])
check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
var validatorFut = newFuture[bool]()
proc validator(topic: string,
message: Message):
Future[ValidationResult] {.async.} =
result = ValidationResult.Ignore
validatorFut.complete(true)
nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
check (await validatorFut) == true
await allFuturesThrowing(
nodes[0].switch.stop(),
nodes[1].switch.stop()
)
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)
await allFuturesThrowing(nodesFut.concat())
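The new test above exercises ValidationResult.Ignore, so Accept, Reject and Ignore each have coverage. Under the gossipsub v1.1 scoring rules the practical difference is that a rejected message counts against the sender (invalid message deliveries, the P4 term) while an ignored one is simply dropped without penalty; a tiny sketch of that distinction, with assumed simplified counters rather than the real peer-stats plumbing:

type ValidationResult = enum Accept, Reject, Ignore

var invalidDeliveries = 0            # would feed the P4 penalty term
var forwarded: seq[string]

proc onValidated(msgId: string, res: ValidationResult) =
  case res
  of Reject:
    inc invalidDeliveries            # counts against the sender's score
  of Ignore:
    discard                          # dropped silently, no penalty
  of Accept:
    forwarded.add msgId              # cached and forwarded only when accepted

onValidated("a", ValidationResult.Accept)
onValidated("b", ValidationResult.Reject)
onValidated("c", ValidationResult.Ignore)
echo forwarded.len, " ", invalidDeliveries   # 1 1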
asyncTest "GossipSub validation one fails and one succeeds":
var handlerFut = newFuture[bool]()
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =