mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 13:53:06 +00:00
add test for opportunistic grafting
This commit is contained in:
parent
ca7b1f3dbf
commit
bac5d5910c
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
@ -11,11 +12,14 @@ import (
|
||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
|
||||
bhost "github.com/libp2p/go-libp2p-blankhost"
|
||||
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||
|
||||
ggio "github.com/gogo/protobuf/io"
|
||||
)
|
||||
|
||||
func getGossipsub(ctx context.Context, h host.Host, opts ...Option) *PubSub {
|
||||
@ -1269,3 +1273,149 @@ func TestGossipsubPiggybackControl(t *testing.T) {
|
||||
t.Fatal("expected test3 as prune topic ID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGossipsubOpportunisticGrafting(t *testing.T) {
|
||||
originalGossipSubOpportunisticGraftTicks := GossipSubOpportunisticGraftTicks
|
||||
GossipSubOpportunisticGraftTicks = 2
|
||||
defer func() {
|
||||
GossipSubOpportunisticGraftTicks = originalGossipSubOpportunisticGraftTicks
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
hosts := getNetHosts(t, ctx, 50)
|
||||
// pubsubs for the first 10 hosts
|
||||
psubs := getGossipsubs(ctx, hosts[:10],
|
||||
WithFloodPublish(true),
|
||||
WithPeerScore(
|
||||
&PeerScoreParams{
|
||||
AppSpecificScore: func(peer.ID) float64 { return 0 },
|
||||
AppSpecificWeight: 0,
|
||||
DecayInterval: time.Second,
|
||||
DecayToZero: 0.01,
|
||||
Topics: map[string]*TopicScoreParams{
|
||||
"test": &TopicScoreParams{
|
||||
TopicWeight: 1,
|
||||
TimeInMeshWeight: 0.0002777,
|
||||
TimeInMeshQuantum: time.Second,
|
||||
TimeInMeshCap: 3600,
|
||||
FirstMessageDeliveriesWeight: 1,
|
||||
FirstMessageDeliveriesDecay: 0.9997,
|
||||
FirstMessageDeliveriesCap: 100,
|
||||
InvalidMessageDeliveriesDecay: 0.99997,
|
||||
},
|
||||
},
|
||||
},
|
||||
&PeerScoreThresholds{
|
||||
GossipThreshold: -10,
|
||||
PublishThreshold: -100,
|
||||
GraylistThreshold: -10000,
|
||||
OpportunisticGraftThreshold: 1,
|
||||
}))
|
||||
|
||||
// connect the real hosts with degree 5
|
||||
connectSome(t, hosts[:10], 5)
|
||||
|
||||
// sybil squatters for the remaining 40 hosts
|
||||
squatters := make([]*sybilSquatter, 0, 40)
|
||||
for _, h := range hosts[10:] {
|
||||
squatter := &sybilSquatter{h: h}
|
||||
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
|
||||
squatters = append(squatters, squatter)
|
||||
}
|
||||
|
||||
// connect all squatters to every real host
|
||||
for _, squatter := range hosts[10:] {
|
||||
for _, real := range hosts[:10] {
|
||||
connect(t, squatter, real)
|
||||
}
|
||||
}
|
||||
|
||||
// wait a bit for the connections to propagate events to the pubsubs
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// ask the real pubsus to join the topic
|
||||
for _, ps := range psubs {
|
||||
sub, err := ps.Subscribe("test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// consume the messages
|
||||
go func(sub *Subscription) {
|
||||
for {
|
||||
_, err := sub.Next(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(sub)
|
||||
}
|
||||
|
||||
// publish a bunch of messages from the real hosts
|
||||
for i := 0; i < 1000; i++ {
|
||||
msg := []byte(fmt.Sprintf("message %d", i))
|
||||
psubs[i%10].Publish("test", msg)
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
|
||||
// now wait a few of oppgraft cycles
|
||||
time.Sleep(7 * time.Second)
|
||||
|
||||
// check the honest peer meshes, they should have at least 3 honest peers each
|
||||
res := make(chan int, 1)
|
||||
for _, ps := range psubs {
|
||||
ps.eval <- func() {
|
||||
gs := ps.rt.(*GossipSubRouter)
|
||||
count := 0
|
||||
for _, h := range hosts[:10] {
|
||||
_, ok := gs.mesh["test"][h.ID()]
|
||||
if ok {
|
||||
count++
|
||||
}
|
||||
}
|
||||
res <- count
|
||||
}
|
||||
|
||||
count := <-res
|
||||
if count < 3 {
|
||||
t.Fatalf("expected at least 3 honest peers, got %d", count)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type sybilSquatter struct {
|
||||
h host.Host
|
||||
}
|
||||
|
||||
func (sq *sybilSquatter) handleStream(s network.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
os, err := sq.h.NewStream(context.Background(), s.Conn().RemotePeer(), GossipSubID_v10)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// send a subscription for test in the output stream to become candidate for GRAFT
|
||||
// and then just read and ignore the incoming RPCs
|
||||
r := ggio.NewDelimitedReader(s, 1<<20)
|
||||
w := ggio.NewDelimitedWriter(os)
|
||||
truth := true
|
||||
topic := "test"
|
||||
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var rpc pb.RPC
|
||||
for {
|
||||
rpc.Reset()
|
||||
err = r.ReadMsg(&rpc)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
s.Reset()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user