Gossipsub: Unsubscribe backoff (#488)

* Implement Unsubscribe backoff

* Add test to check that prune backoff time is used

* Update which backoff to use in TestGossipSubJoinTopic test

* Fix race in TestGossipSubLeaveTopic

* Wait for all the backoff checks, and check that we aren't missing too many

* Remove open question
Marco Munizaga 2022-06-02 20:46:56 -07:00 committed by GitHub
parent 06b5ba4763
commit 68cdae031b
2 changed files with 171 additions and 33 deletions

--- a/gossipsub.go
+++ b/gossipsub.go

@@ -45,6 +45,7 @@ var (
 	GossipSubFanoutTTL             = 60 * time.Second
 	GossipSubPrunePeers            = 16
 	GossipSubPruneBackoff          = time.Minute
+	GossipSubUnsubscribeBackoff    = 10 * time.Second
 	GossipSubConnectors            = 8
 	GossipSubMaxPendingConnections = 128
 	GossipSubConnectionTimeout     = 30 * time.Second
@@ -153,6 +154,11 @@ type GossipSubParams struct {
 	// before attempting to re-graft.
 	PruneBackoff time.Duration

+	// UnsubscribeBackoff controls the backoff time to use when unsubscribing
+	// from a topic. A peer should not resubscribe to this topic before this
+	// duration.
+	UnsubscribeBackoff time.Duration
+
 	// Connectors controls the number of active connection attempts for peers obtained through PX.
 	Connectors int
@@ -244,6 +250,7 @@ func DefaultGossipSubParams() GossipSubParams {
 		FanoutTTL:                 GossipSubFanoutTTL,
 		PrunePeers:                GossipSubPrunePeers,
 		PruneBackoff:              GossipSubPruneBackoff,
+		UnsubscribeBackoff:        GossipSubUnsubscribeBackoff,
 		Connectors:                GossipSubConnectors,
 		MaxPendingConnections:     GossipSubMaxPendingConnections,
 		ConnectionTimeout:         GossipSubConnectionTimeout,
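
For downstream users, the new parameter rides on the existing options. Below is a minimal usage sketch, not part of this commit: it assumes the public go-libp2p and go-libp2p-pubsub APIs, and the 5s value is purely illustrative.

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func main() {
	ctx := context.Background()

	// Any libp2p host will do for this sketch.
	host, err := libp2p.New()
	if err != nil {
		panic(err)
	}

	// Start from the defaults added in this commit and shorten the
	// unsubscribe backoff (illustrative value, not a recommendation).
	params := pubsub.DefaultGossipSubParams()
	params.UnsubscribeBackoff = 5 * time.Second

	ps, err := pubsub.NewGossipSub(ctx, host, pubsub.WithGossipSubParams(params))
	if err != nil {
		panic(err)
	}
	_ = ps
}

The split mirrors the two defaults above: PruneBackoff (one minute) guards against re-grafting peers that were pruned for cause, while UnsubscribeBackoff (ten seconds) only needs to cover a deliberate leave, so it can be much shorter.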
@@ -777,7 +784,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 			gs.score.AddPenalty(p, 1)
 		}
 		// refresh the backoff
-		gs.addBackoff(p, topic)
+		gs.addBackoff(p, topic, false)
 		prune = append(prune, topic)
 		continue
 	}
@@ -791,7 +798,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 		// but we won't PX to them
 		doPX = false
 		// add/refresh backoff so that we don't reGRAFT too early even if the score decays back up
-		gs.addBackoff(p, topic)
+		gs.addBackoff(p, topic, false)
 		continue
 	}
@@ -800,7 +807,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 	// mesh takeover attacks combined with love bombing
 	if len(peers) >= gs.params.Dhi && !gs.outbound[p] {
 		prune = append(prune, topic)
-		gs.addBackoff(p, topic)
+		gs.addBackoff(p, topic, false)
 		continue
 	}
@@ -815,7 +822,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
 	cprune := make([]*pb.ControlPrune, 0, len(prune))
 	for _, topic := range prune {
-		cprune = append(cprune, gs.makePrune(p, topic, doPX))
+		cprune = append(cprune, gs.makePrune(p, topic, doPX, false))
 	}

 	return cprune
@@ -839,7 +846,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
 		if backoff > 0 {
 			gs.doAddBackoff(p, topic, time.Duration(backoff)*time.Second)
 		} else {
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 		}

 		px := prune.GetPeers()
@@ -855,8 +862,12 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
 	}
 }

-func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
-	gs.doAddBackoff(p, topic, gs.params.PruneBackoff)
+func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string, isUnsubscribe bool) {
+	backoff := gs.params.PruneBackoff
+	if isUnsubscribe {
+		backoff = gs.params.UnsubscribeBackoff
+	}
+	gs.doAddBackoff(p, topic, backoff)
 }

 func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
@@ -1096,11 +1107,11 @@ func (gs *GossipSubRouter) Leave(topic string) {
 	for p := range gmap {
 		log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
 		gs.tracer.Prune(p, topic)
-		gs.sendPrune(p, topic)
+		gs.sendPrune(p, topic, true)
 		// Add a backoff to this peer to prevent us from eagerly
 		// re-grafting this peer into our mesh if we rejoin this
 		// topic before the backoff period ends.
-		gs.addBackoff(p, topic)
+		gs.addBackoff(p, topic, true)
 	}
 }
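
Leave is where the new flag originates, so the observable effect for applications is that a quick unsubscribe/resubscribe cycle is now gated by the 10-second UnsubscribeBackoff instead of the 1-minute PruneBackoff. A hedged sketch of that flow against the public API follows; the helper and topic name are hypothetical.

package main

import (
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// rejoinQuickly is a hypothetical helper: it drops a topic and resubscribes
// once the (shorter) unsubscribe backoff has elapsed.
func rejoinQuickly(ps *pubsub.PubSub) (*pubsub.Subscription, error) {
	topic, err := ps.Join("demo-topic")
	if err != nil {
		return nil, err
	}
	sub, err := topic.Subscribe()
	if err != nil {
		return nil, err
	}

	// Cancelling the last subscription drives the router's Leave path above:
	// mesh peers receive a PRUNE carrying the 10s UnsubscribeBackoff rather
	// than the 1-minute PruneBackoff.
	sub.Cancel()

	// Waiting out the backoff lets remote peers GRAFT us back right away.
	// Resubscribing earlier is legal, but per handleGraft above, peers whose
	// backoff is still running answer our GRAFTs with PRUNE and a penalty.
	time.Sleep(pubsub.GossipSubUnsubscribeBackoff)
	return topic.Subscribe()
}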
@@ -1110,8 +1121,8 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
 	gs.sendRPC(p, out)
 }

-func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
-	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX)}
+func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string, isUnsubscribe bool) {
+	prune := []*pb.ControlPrune{gs.makePrune(p, topic, gs.doPX, isUnsubscribe)}
 	out := rpcWithControl(nil, nil, nil, nil, prune)
 	gs.sendRPC(p, out)
 }
@@ -1368,7 +1379,7 @@ func (gs *GossipSubRouter) heartbeat() {
 		prunePeer := func(p peer.ID) {
 			gs.tracer.Prune(p, topic)
 			delete(peers, p)
-			gs.addBackoff(p, topic)
+			gs.addBackoff(p, topic, false)
 			topics := toprune[p]
 			toprune[p] = append(topics, topic)
 		}
@@ -1668,7 +1679,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
 			delete(toprune, p)
 			prune = make([]*pb.ControlPrune, 0, len(pruning))
 			for _, topic := range pruning {
-				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
+				prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
 			}
 		}
@@ -1679,7 +1690,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
 	for p, topics := range toprune {
 		prune := make([]*pb.ControlPrune, 0, len(topics))
 		for _, topic := range topics {
-			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
+			prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p], false))
 		}

 		out := rpcWithControl(nil, nil, nil, nil, prune)
@@ -1834,13 +1845,17 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
 	}
 }

-func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
+func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool, isUnsubscribe bool) *pb.ControlPrune {
 	if !gs.feature(GossipSubFeaturePX, gs.peers[p]) {
 		// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
 		return &pb.ControlPrune{TopicID: &topic}
 	}

 	backoff := uint64(gs.params.PruneBackoff / time.Second)
+	if isUnsubscribe {
+		backoff = uint64(gs.params.UnsubscribeBackoff / time.Second)
+	}
+
 	var px []*pb.PeerInfo
 	if doPX {
 		// select peers for Peer eXchange
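
One wire-format detail: makePrune encodes the backoff as whole seconds, and handlePrune (earlier in this diff) treats a zero value as "use my local default", so a sub-second UnsubscribeBackoff would silently truncate to zero on the wire. A tiny round-trip sketch using the same conversions as this diff:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Encode as in makePrune: whole seconds on the wire.
	wire := uint64(10 * time.Second / time.Second)

	// Decode as in handlePrune: positive values are honored verbatim.
	fmt.Println(time.Duration(wire) * time.Second) // 10s

	// Gotcha: anything under one second encodes as zero, which receivers
	// treat as "fall back to my own PruneBackoff".
	fmt.Println(uint64(500 * time.Millisecond / time.Second)) // 0
}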

--- a/gossipsub_test.go
+++ b/gossipsub_test.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"math/rand"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -581,6 +582,104 @@ func TestGossipsubPrune(t *testing.T) {
 	}
 }

+func TestGossipsubPruneBackoffTime(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	hosts := getNetHosts(t, ctx, 10)
+
+	// App specific score that we'll change later.
+	currentScoreForHost0 := int32(0)
+
+	params := DefaultGossipSubParams()
+	params.HeartbeatInitialDelay = time.Millisecond * 10
+	params.HeartbeatInterval = time.Millisecond * 100
+
+	psubs := getGossipsubs(ctx, hosts, WithGossipSubParams(params), WithPeerScore(
+		&PeerScoreParams{
+			AppSpecificScore: func(p peer.ID) float64 {
+				if p == hosts[0].ID() {
+					return float64(atomic.LoadInt32(&currentScoreForHost0))
+				} else {
+					return 0
+				}
+			},
+			AppSpecificWeight: 1,
+			DecayInterval:     time.Second,
+			DecayToZero:       0.01,
+		},
+		&PeerScoreThresholds{
+			GossipThreshold:   -1,
+			PublishThreshold:  -1,
+			GraylistThreshold: -1,
+		}))
+
+	var msgs []*Subscription
+	for _, ps := range psubs {
+		subch, err := ps.Subscribe("foobar")
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		msgs = append(msgs, subch)
+	}
+
+	connectAll(t, hosts)
+
+	// wait for heartbeats to build mesh
+	time.Sleep(time.Second)
+
+	pruneTime := time.Now()
+	// Flip the score. Host 0 should be pruned from everyone
+	atomic.StoreInt32(&currentScoreForHost0, -1000)
+
+	// wait for heartbeats to run and prune
+	time.Sleep(time.Second)
+
+	wg := sync.WaitGroup{}
+	var missingBackoffs uint32 = 0
+	for i := 1; i < 10; i++ {
+		wg.Add(1)
+		// Copy i so this func keeps the correct value in the closure.
+		var idx = i
+		// Run this check in the eval thunk so that we don't step over the heartbeat goroutine and trigger a race.
+		psubs[idx].rt.(*GossipSubRouter).p.eval <- func() {
+			defer wg.Done()
+			backoff, ok := psubs[idx].rt.(*GossipSubRouter).backoff["foobar"][hosts[0].ID()]
+			if !ok {
+				atomic.AddUint32(&missingBackoffs, 1)
+			}
+			if ok && backoff.Sub(pruneTime)-params.PruneBackoff > time.Second {
+				t.Errorf("backoff time should be equal to prune backoff (with some slack) was %v", backoff.Sub(pruneTime)-params.PruneBackoff)
+			}
+		}
+	}
+	wg.Wait()
+
+	// Sometimes not all the peers will have updated their backoffs by this point. If the majority haven't, we fail the test.
+	if missingBackoffs >= 5 {
+		t.Errorf("missing too many backoffs: %v", missingBackoffs)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
+
+		// Don't publish from host 0, since everyone should have pruned it.
+		owner := rand.Intn(len(psubs)-1) + 1
+
+		psubs[owner].Publish("foobar", msg)
+
+		for _, sub := range msgs[1:] {
+			got, err := sub.Next(ctx)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !bytes.Equal(msg, got.Data) {
+				t.Fatal("got wrong message!")
+			}
+		}
+	}
+}

 func TestGossipsubGraft(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -1839,27 +1938,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {
 	time.Sleep(time.Second)

-	psubs[0].rt.Leave("test")
-	time.Sleep(time.Second)
-	peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
-	if len(peerMap) != 1 {
-		t.Fatalf("No peer is populated in the backoff map for peer 0")
-	}
-	_, ok := peerMap[h[1].ID()]
-	if !ok {
-		t.Errorf("Expected peer does not exist in the backoff map")
-	}
+	leaveTime := time.Now()
+	done := make(chan struct{})
+
+	psubs[0].rt.(*GossipSubRouter).p.eval <- func() {
+		defer close(done)
+		psubs[0].rt.Leave("test")
+		time.Sleep(time.Second)
+		peerMap := psubs[0].rt.(*GossipSubRouter).backoff["test"]
+		if len(peerMap) != 1 {
+			t.Fatalf("No peer is populated in the backoff map for peer 0")
+		}
+		_, ok := peerMap[h[1].ID()]
+		if !ok {
+			t.Errorf("Expected peer does not exist in the backoff map")
+		}
+
+		backoffTime := peerMap[h[1].ID()].Sub(leaveTime)
+		// Check that the backoff time is roughly the UnsubscribeBackoff time (with a slack of 1s)
+		if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
+			t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
+		}
+	}
+	<-done
+
+	done = make(chan struct{})
 	// Ensure that remote peer 1 also applies the backoff appropriately
 	// for peer 0.
-	peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
-	if len(peerMap2) != 1 {
-		t.Fatalf("No peer is populated in the backoff map for peer 1")
-	}
-	_, ok = peerMap2[h[0].ID()]
-	if !ok {
-		t.Errorf("Expected peer does not exist in the backoff map")
-	}
+	psubs[1].rt.(*GossipSubRouter).p.eval <- func() {
+		defer close(done)
+		peerMap2 := psubs[1].rt.(*GossipSubRouter).backoff["test"]
+		if len(peerMap2) != 1 {
+			t.Fatalf("No peer is populated in the backoff map for peer 1")
+		}
+		_, ok := peerMap2[h[0].ID()]
+		if !ok {
+			t.Errorf("Expected peer does not exist in the backoff map")
+		}
+
+		backoffTime := peerMap2[h[0].ID()].Sub(leaveTime)
+		// Check that the backoff time is roughly the UnsubscribeBackoff time (with a slack of 1s)
+		if backoffTime-GossipSubUnsubscribeBackoff > time.Second {
+			t.Error("Backoff time should be set to GossipSubUnsubscribeBackoff.")
+		}
+	}
+	<-done
 }

 func TestGossipSubJoinTopic(t *testing.T) {
@@ -1880,7 +2003,7 @@ func TestGossipSubJoinTopic(t *testing.T) {
 	// Add in backoff for peer.
 	peerMap := make(map[peer.ID]time.Time)
-	peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)
+	peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff)
 	router0.backoff["test"] = peerMap