mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 12:53:09 +00:00
Add in Backoff Check
This commit is contained in:
parent
e02b3472aa
commit
aeb30a2ac1
16
gossipsub.go
16
gossipsub.go
@ -1036,10 +1036,12 @@ func (gs *GossipSubRouter) Join(topic string) {
|
||||
|
||||
gmap, ok = gs.fanout[topic]
|
||||
if ok {
|
||||
backoff := gs.backoff[topic]
|
||||
// these peers have a score above the publish threshold, which may be negative
|
||||
// so drop the ones with a negative score
|
||||
for p := range gmap {
|
||||
if gs.score.Score(p) < 0 {
|
||||
_, doBackOff := backoff[p]
|
||||
if gs.score.Score(p) < 0 || doBackOff {
|
||||
delete(gmap, p)
|
||||
}
|
||||
}
|
||||
@ -1047,10 +1049,12 @@ func (gs *GossipSubRouter) Join(topic string) {
|
||||
if len(gmap) < gs.params.D {
|
||||
// we need more peers; eager, as this would get fixed in the next heartbeat
|
||||
more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool {
|
||||
// filter our current peers, direct peers, and peers with negative scores
|
||||
// filter our current peers, direct peers, peers we are backing off, and
|
||||
// peers with negative scores
|
||||
_, inMesh := gmap[p]
|
||||
_, direct := gs.direct[p]
|
||||
return !inMesh && !direct && gs.score.Score(p) >= 0
|
||||
_, doBackOff := backoff[p]
|
||||
return !inMesh && !direct && !doBackOff && gs.score.Score(p) >= 0
|
||||
})
|
||||
for _, p := range more {
|
||||
gmap[p] = struct{}{}
|
||||
@ -1060,10 +1064,12 @@ func (gs *GossipSubRouter) Join(topic string) {
|
||||
delete(gs.fanout, topic)
|
||||
delete(gs.lastpub, topic)
|
||||
} else {
|
||||
backoff := gs.backoff[topic]
|
||||
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
|
||||
// filter direct peers and peers with negative score
|
||||
// filter direct peers, peers we are backing off and peers with negative score
|
||||
_, direct := gs.direct[p]
|
||||
return !direct && gs.score.Score(p) >= 0
|
||||
_, doBackOff := backoff[p]
|
||||
return !direct && !doBackOff && gs.score.Score(p) >= 0
|
||||
})
|
||||
gmap = peerListToMap(peers)
|
||||
gs.mesh[topic] = gmap
|
||||
|
||||
@ -1862,6 +1862,51 @@ func TestGossipSubLeaveTopic(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGossipSubJoinTopic(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
h := getNetHosts(t, ctx, 3)
|
||||
psubs := []*PubSub{
|
||||
getGossipsub(ctx, h[0]),
|
||||
getGossipsub(ctx, h[1]),
|
||||
getGossipsub(ctx, h[2]),
|
||||
}
|
||||
|
||||
connect(t, h[0], h[1])
|
||||
connect(t, h[0], h[2])
|
||||
|
||||
router0 := psubs[0].rt.(*GossipSubRouter)
|
||||
|
||||
// Add in backoff for peer.
|
||||
peerMap := make(map[peer.ID]time.Time)
|
||||
peerMap[h[1].ID()] = time.Now().Add(router0.params.PruneBackoff)
|
||||
|
||||
router0.backoff["test"] = peerMap
|
||||
|
||||
// Join all peers
|
||||
var subs []*Subscription
|
||||
for _, ps := range psubs {
|
||||
sub, err := ps.Subscribe("test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
meshMap := router0.mesh["test"]
|
||||
if len(meshMap) != 1 {
|
||||
t.Fatalf("Unexpect peer included in the mesh")
|
||||
}
|
||||
|
||||
_, ok := meshMap[h[1].ID()]
|
||||
if ok {
|
||||
t.Fatalf("Peer that was to be backed off is included in the mesh")
|
||||
}
|
||||
}
|
||||
|
||||
type sybilSquatter struct {
|
||||
h host.Host
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user