Mirror of https://github.com/logos-messaging/go-libp2p-pubsub.git (synced 2026-01-08 07:43:11 +00:00)
Fix the Router's Ability to Prune the Mesh Periodically (#589)
When a new peer wants to graft us into their mesh, we check our current mesh size to determine whether we can add any more peers to it. This is done to keep the mesh from growing beyond `Dhi` and to prevent mesh takeover attacks: c06df2f9a3/gossipsub.go (L943)

During every heartbeat we check our mesh size, and if it is **greater** than `Dhi` we prune the mesh back down to `D`: c06df2f9a3/gossipsub.go (L1608)

However, looking closely at both lines reveals a problematic end result: we stop grafting new peers into our mesh once its size is **greater than or equal to** `Dhi`, but we only prune peers once its size is **greater than** `Dhi`. As a result the mesh settles into stasis at `Dhi`. Rather than floating between `D` and `Dhi`, the mesh stagnates at `Dhi`, which effectively raises the node's target degree from `D` to `Dhi`. This has been observed on Ethereum mainnet by recording mesh interactions and message fulfillment from those peers.

This PR fixes the issue by adding an equality check to the heartbeat conditional so that the mesh can be periodically pruned. It also adds a regression test for this particular case.
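To make the boundary interaction concrete, here is a minimal, self-contained sketch of the two checks. It is not the router's actual code: `acceptGraft`, `heartbeatPrune`, and the hard-coded `D`/`Dhi` values are illustrative stand-ins for the logic at the two lines referenced above.

```go
package main

import "fmt"

// Illustrative constants only; the real values come from GossipSubParams.
const (
	D   = 6
	Dhi = 12
)

// acceptGraft mirrors the GRAFT-handling check: refuse new peers once the
// mesh has already reached Dhi.
func acceptGraft(meshSize int) bool {
	return meshSize < Dhi
}

// heartbeatPrune mirrors the heartbeat check. With the old condition
// (size > Dhi) a mesh sitting exactly at Dhi is never pruned; with the
// fixed condition (size >= Dhi) it is pruned back down to D.
func heartbeatPrune(meshSize int, fixed bool) int {
	tooMany := meshSize > Dhi
	if fixed {
		tooMany = meshSize >= Dhi
	}
	if tooMany {
		return D
	}
	return meshSize
}

func main() {
	for _, fixed := range []bool{false, true} {
		size := D
		// Peers keep grafting us until the GRAFT check starts refusing them.
		for acceptGraft(size) {
			size++
		}
		size = heartbeatPrune(size, fixed)
		fmt.Printf("fixed=%v: mesh size after heartbeat = %d\n", fixed, size)
	}
	// Prints:
	// fixed=false: mesh size after heartbeat = 12  (stuck at Dhi)
	// fixed=true:  mesh size after heartbeat = 6   (pruned back to D)
}
```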
parent c06df2f9a3
commit 3536508a9d
```diff
@@ -1605,7 +1605,7 @@ func (gs *GossipSubRouter) heartbeat() {
 		}
 
 		// do we have too many peers?
-		if len(peers) > gs.params.Dhi {
+		if len(peers) >= gs.params.Dhi {
 			plst := peerMapToList(peers)
 
 			// sort by score (but shuffle first for the case we don't use the score)
```
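For context, the branch guarded by the changed condition is the one that selects which peers to keep when the mesh is oversized. Below is a simplified sketch of that selection using made-up `peer` and score types; the real heartbeat also honours peer scores from the score function and an outbound-peer quota, so this is not the actual implementation.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// peer is a stand-in for the router's peer bookkeeping.
type peer struct {
	id    string
	score float64
}

const D = 6 // target mesh degree

// pruneToD keeps the D highest-scoring peers. Shuffling first matches the
// comment in the diff: when scoring is disabled and all scores are equal,
// the peers to prune are chosen at random rather than by iteration order.
func pruneToD(peers []peer) (keep, prune []peer) {
	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
	sort.SliceStable(peers, func(i, j int) bool { return peers[i].score > peers[j].score })
	if len(peers) <= D {
		return peers, nil
	}
	return peers[:D], peers[D:]
}

func main() {
	var mesh []peer
	for i := 0; i < 12; i++ {
		mesh = append(mesh, peer{id: fmt.Sprintf("p%d", i), score: float64(i % 3)})
	}
	keep, prune := pruneToD(mesh)
	fmt.Printf("kept %d peers, pruned %d\n", len(keep), len(prune))
}
```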
```diff
@@ -3176,6 +3176,53 @@ func TestGossipsubIdontwantClear(t *testing.T) {
 	<-ctx.Done()
 }
 
+func TestGossipsubPruneMeshCorrectly(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	hosts := getDefaultHosts(t, 9)
+
+	msgID := func(pmsg *pb.Message) string {
+		// silly content-based test message-ID: just use the data as whole
+		return base64.URLEncoding.EncodeToString(pmsg.Data)
+	}
+
+	params := DefaultGossipSubParams()
+	params.Dhi = 8
+
+	psubs := make([]*PubSub, 9)
+	for i := 0; i < 9; i++ {
+		psubs[i] = getGossipsub(ctx, hosts[i],
+			WithGossipSubParams(params),
+			WithMessageIdFn(msgID))
+	}
+
+	topic := "foobar"
+	for _, ps := range psubs {
+		_, err := ps.Subscribe(topic)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Connect first peer with the rest of the 8 other
+	// peers.
+	for i := 1; i < 9; i++ {
+		connect(t, hosts[0], hosts[i])
+	}
+
+	// Wait for 2 heartbeats to be able to prune excess peers back down to D.
+	totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval
+	time.Sleep(totalTimeToWait)
+
+	meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic]
+	if !ok {
+		t.Fatal("mesh does not exist for topic")
+	}
+	if len(meshPeers) != params.D {
+		t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers))
+	}
+}
+
 func BenchmarkAllocDoDropRPC(b *testing.B) {
 	gs := GossipSubRouter{tracer: &pubsubTracer{}}
```
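The regression test exercises the bug by lowering `Dhi` to 8 and connecting the first host to the eight others, so that host's mesh can fill to exactly `Dhi`. With the old strict `>` comparison the heartbeat would never prune and the mesh would stay pinned at 8; with the `>=` check it is pruned back down to `params.D` within the two heartbeats the test waits for, which is what the final assertion checks.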