diff --git a/gossipsub_attack_test.go b/gossipsub_attack_test.go index 3a2b4ab..cde6f35 100644 --- a/gossipsub_attack_test.go +++ b/gossipsub_attack_test.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "math/rand" + "strconv" "testing" "time" @@ -10,6 +11,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" pb "github.com/libp2p/go-libp2p-pubsub/pb" ) @@ -82,16 +84,9 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) { for _, sub := range irpc.GetSubscriptions() { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer - subs := []*pb.RPC_SubOpts{ - &pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}, - } - graft := []*pb.ControlGraft{ - &pb.ControlGraft{TopicID: sub.Topicid}, - } - writeMsg(&pb.RPC{ - Subscriptions: subs, - Control: &pb.ControlMessage{Graft: graft}, + Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}}, }) go func() { @@ -172,17 +167,15 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) { for _, sub := range irpc.GetSubscriptions() { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer - var subs []*pb.RPC_SubOpts - var graft []*pb.ControlGraft - subs = append(subs, &pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}) - graft = append(graft, &pb.ControlGraft{TopicID: sub.Topicid}) + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}}, + }) // Graft to the peer on a non-existent topic nonExistentTopic := "non-existent" - graft = append(graft, &pb.ControlGraft{TopicID: &nonExistentTopic}) - writeMsg(&pb.RPC{ - Control: &pb.ControlMessage{Graft: graft}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: &nonExistentTopic}}}, }) go func() { @@ -252,13 +245,9 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { for _, sub := range irpc.GetSubscriptions() { if sub.GetSubscribe() { // Reply by subcribing to the topic and grafting to the peer - var subs []*pb.RPC_SubOpts - var graft []*pb.ControlGraft - subs = append(subs, &pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}) - graft = append(graft, &pb.ControlGraft{TopicID: sub.Topicid}) - + graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}} writeMsg(&pb.RPC{ - Subscriptions: subs, + Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, Control: &pb.ControlMessage{Graft: graft}, }) @@ -325,6 +314,157 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) { <-ctx.Done() } +type gsAttackInvalidMsgTracer struct { + rejectCount int +} + +func (t *gsAttackInvalidMsgTracer) Trace(evt *pb.TraceEvent) { + // fmt.Printf(" %s %s\n", evt.Type, evt) + if evt.GetType() == pb.TraceEvent_REJECT_MESSAGE { + t.rejectCount++ + } +} + +// Test that when Gossipsub receives a lot of invalid messages from +// a peer it should graylist the peer +func TestGossipsubAttackInvalidMessageSpam(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create legitimate and attacker hosts + hosts := getNetHosts(t, ctx, 2) + legit := hosts[0] + attacker := hosts[1] + + mytopic := "mytopic" + + // Create parameters with reasonable default values + params := &PeerScoreParams{ + AppSpecificScore: func(peer.ID) float64 { return 0 }, + IPColocationFactorWeight: 0, + IPColocationFactorThreshold: 1, + DecayInterval: 5 * time.Second, + DecayToZero: 0.01, + RetainScore: 10 * time.Second, + Topics: make(map[string]*TopicScoreParams), + } + params.Topics[mytopic] = &TopicScoreParams{ + TopicWeight: 0.25, + TimeInMeshWeight: 0.0027, + TimeInMeshQuantum: time.Second, + TimeInMeshCap: 3600, + FirstMessageDeliveriesWeight: 0.664, + FirstMessageDeliveriesDecay: 0.9916, + FirstMessageDeliveriesCap: 1500, + MeshMessageDeliveriesWeight: -0.25, + MeshMessageDeliveriesDecay: 0.97, + MeshMessageDeliveriesCap: 400, + MeshMessageDeliveriesThreshold: 100, + MeshMessageDeliveriesActivation: 30 * time.Second, + MeshMessageDeliveriesWindow: 5 * time.Minute, + MeshFailurePenaltyWeight: -0.25, + MeshFailurePenaltyDecay: 0.997, + InvalidMessageDeliveriesWeight: -99, + InvalidMessageDeliveriesDecay: 0.9994, + } + thresholds := &PeerScoreThresholds{ + GossipThreshold: -100, + PublishThreshold: -200, + GraylistThreshold: -300, + AcceptPXThreshold: 0, + } + + // Set up gossipsub on the legit host + tracer := &gsAttackInvalidMsgTracer{} + ps, err := NewGossipSub(ctx, legit, + WithEventTracer(tracer), + WithPeerScore(params, thresholds), + ) + if err != nil { + t.Fatal(err) + } + + attackerScore := func() float64 { + return ps.rt.(*GossipSubRouter).score.Score(attacker.ID()) + } + + // Register a validator for the topic + err = ps.RegisterTopicValidator(mytopic, + func(ctx context.Context, from peer.ID, msg *Message) bool { + return true + }) + if err != nil { + t.Fatal(err) + } + + // Subscribe to mytopic on the legit host + _, err = ps.Subscribe(mytopic) + if err != nil { + t.Fatal(err) + } + + pruneCount := 0 + newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + // When the legit host connects it will send us its subscriptions + for _, sub := range irpc.GetSubscriptions() { + if sub.GetSubscribe() { + // Reply by subcribing to the topic and grafting to the peer + writeMsg(&pb.RPC{ + Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}}, + Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}}, + }) + + go func() { + defer cancel() + + // Attacker score should start at zero + if attackerScore() != 0 { + t.Fatalf("Expected attacker score to be zero but it's %f", attackerScore()) + } + + // Send a bunch of messages with no signature (these will + // fail validation and reduce the attacker's score) + for i := 0; i < 100; i++ { + msg := &pb.Message{ + Data: []byte("some data" + strconv.Itoa(i)), + TopicIDs: []string{mytopic}, + From: []byte(attacker.ID()), + Seqno: []byte{byte(i + 1)}, + } + writeMsg(&pb.RPC{ + Publish: []*pb.Message{msg}, + }) + } + + // Wait for the initial heartbeat, plus a bit of padding + time.Sleep(100*time.Millisecond + GossipSubHeartbeatInitialDelay) + + // The attackers score should now have fallen below zero + if attackerScore() > 0 { + t.Fatalf("Expected attacker score to be less than zero but it's %f", attackerScore()) + } + // There should be several rejected messages (because the signature was invalid) + if tracer.rejectCount == 0 { + t.Fatal("Expected message rejection but got none") + } + // The legit node should have sent a PRUNE message + if pruneCount == 0 { + t.Fatal("Expected attacker node to be PRUNED when score drops low enough") + } + }() + } + } + + if ctl := irpc.GetControl(); ctl != nil { + pruneCount += len(ctl.GetPrune()) + } + }) + + connect(t, hosts[0], hosts[1]) + + <-ctx.Done() +} + func turnOnPubsubDebug() { logging.SetLogLevel("pubsub", "debug") }