From 3536508a9d6914cd46b9154e4c050085d9e2821a Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Fri, 27 Dec 2024 02:02:14 +0800 Subject: [PATCH] Fix the Router's Ability to Prune the Mesh Periodically (#589) When a new peer wants to graft us into their mesh, we check our current mesh size to determine whether we can add any more new peers to it. This is done to prevent our mesh size from being greater than `Dhi` and prevent mesh takeover attacks here: https://github.com/libp2p/go-libp2p-pubsub/blob/c06df2f9a38e9382e644b241adf0e96e5ca00955/gossipsub.go#L943 During every heartbeat we check our mesh size and if it is **greater** than `Dhi` then we will prune our mesh back down to `D`. https://github.com/libp2p/go-libp2p-pubsub/blob/c06df2f9a38e9382e644b241adf0e96e5ca00955/gossipsub.go#L1608 However if you look closely at both lines there is a problematic end result. Since we only stop grafting new peers into our mesh if our current mesh size is **greater than or equal to** `Dhi`, but we only prune peers if the current mesh size is strictly greater than `Dhi`, the mesh ends up in a state of stasis at `Dhi`. Rather than floating between `D` and `Dhi`, the mesh stagnates at `Dhi`. This would end up increasing the target degree of the node to `Dhi` from `D`. This has been observed on Ethereum mainnet by recording mesh interactions and message fulfillment from those peers. This PR fixes it by adding an equality check to the conditional so that it can be periodically pruned. The PR also adds a regression test for this particular case. --- gossipsub.go | 2 +- gossipsub_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/gossipsub.go b/gossipsub.go index 222c71a..849f2fa 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1605,7 +1605,7 @@ func (gs *GossipSubRouter) heartbeat() { } // do we have too many peers? 
- if len(peers) > gs.params.Dhi { + if len(peers) >= gs.params.Dhi { plst := peerMapToList(peers) // sort by score (but shuffle first for the case we don't use the score) diff --git a/gossipsub_test.go b/gossipsub_test.go index d515654..93edeec 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -3176,6 +3176,53 @@ func TestGossipsubIdontwantClear(t *testing.T) { <-ctx.Done() } +func TestGossipsubPruneMeshCorrectly(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + hosts := getDefaultHosts(t, 9) + + msgID := func(pmsg *pb.Message) string { + // silly content-based test message-ID: just use the data as whole + return base64.URLEncoding.EncodeToString(pmsg.Data) + } + + params := DefaultGossipSubParams() + params.Dhi = 8 + + psubs := make([]*PubSub, 9) + for i := 0; i < 9; i++ { + psubs[i] = getGossipsub(ctx, hosts[i], + WithGossipSubParams(params), + WithMessageIdFn(msgID)) + } + + topic := "foobar" + for _, ps := range psubs { + _, err := ps.Subscribe(topic) + if err != nil { + t.Fatal(err) + } + } + + // Connect first peer with the rest of the 8 other + // peers. + for i := 1; i < 9; i++ { + connect(t, hosts[0], hosts[i]) + } + + // Wait for 2 heartbeats to be able to prune excess peers back down to D. + totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval + time.Sleep(totalTimeToWait) + + meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic] + if !ok { + t.Fatal("mesh does not exist for topic") + } + if len(meshPeers) != params.D { + t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers)) + } +} + func BenchmarkAllocDoDropRPC(b *testing.B) { gs := GossipSubRouter{tracer: &pubsubTracer{}}