go-libp2p-pubsub/gossipsub_spam_test.go

999 lines
28 KiB
Go
Raw Permalink Normal View History

package pubsub
import (
"context"
2024-08-05 19:43:28 +02:00
"crypto/rand"
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
"encoding/base64"
"fmt"
"strconv"
2020-04-20 13:03:17 +03:00
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
2024-08-05 19:51:38 +02:00
//lint:ignore SA1019 "github.com/libp2p/go-msgio/protoio" is deprecated
"github.com/libp2p/go-msgio/protoio"
)
// Test that when Gossipsub receives too many IWANT messages from a peer
// for the same message ID, it cuts off the peer
func TestGossipsubAttackSpamIWANT(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create legitimate and attacker hosts
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit)
if err != nil {
t.Fatal(err)
}
// Subscribe to mytopic on the legit host
mytopic := "mytopic"
_, err = ps.Subscribe(mytopic)
if err != nil {
t.Fatal(err)
}
// Used to publish a message with random data
publishMsg := func() {
data := make([]byte, 16)
rand.Read(data)
if err = ps.Publish(mytopic, data); err != nil {
t.Fatal(err)
}
}
// Wait a bit after the last message before checking we got the
// right number of messages
msgWaitMax := time.Second
msgCount := 0
msgTimer := time.NewTimer(msgWaitMax)
// Checks we received the right number of messages
checkMsgCount := func() {
// After the original message from the legit host, we keep sending
// IWANT until it stops replying. So the number of messages is
// <original message> + GossipSubGossipRetransmission
exp := 1 + GossipSubGossipRetransmission
if msgCount != exp {
t.Fatalf("Expected %d messages, got %d", exp, msgCount)
}
}
// Wait for the timer to expire
go func() {
select {
case <-msgTimer.C:
checkMsgCount()
cancel()
return
case <-ctx.Done():
checkMsgCount()
}
}()
newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
})
go func() {
// Wait for a short interval to make sure the legit host
// received and processed the subscribe + graft
time.Sleep(100 * time.Millisecond)
// Publish a message from the legit host
publishMsg()
}()
}
}
// Each time the legit host sends a message
for _, msg := range irpc.GetPublish() {
// Increment the number of messages and reset the timer
msgCount++
msgTimer.Reset(msgWaitMax)
// Shouldn't get more than the expected number of messages
exp := 1 + GossipSubGossipRetransmission
if msgCount > exp {
cancel()
t.Fatal("Received too many responses")
}
// Send an IWANT with the message ID, causing the legit host
// to send another message (until it cuts off the attacker for
// being spammy)
iwantlst := []string{DefaultMsgIdFn(msg)}
iwant := []*pb.ControlIWant{{MessageIDs: iwantlst}}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
orpc := rpcWithControl(nil, nil, iwant, nil, nil, nil)
writeMsg(&orpc.RPC)
}
})
connect(t, hosts[0], hosts[1])
<-ctx.Done()
}
2020-04-15 11:21:23 -04:00
// Test that Gossipsub only responds to IHAVE with IWANT once per heartbeat
func TestGossipsubAttackSpamIHAVE(t *testing.T) {
originalGossipSubIWantFollowupTime := GossipSubIWantFollowupTime
GossipSubIWantFollowupTime = 10 * time.Second
defer func() {
GossipSubIWantFollowupTime = originalGossipSubIWantFollowupTime
}()
2020-04-15 11:21:23 -04:00
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create legitimate and attacker hosts
hosts := getDefaultHosts(t, 2)
2020-04-15 11:21:23 -04:00
legit := hosts[0]
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit,
WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(peer.ID) float64 { return 0 },
BehaviourPenaltyWeight: -1,
BehaviourPenaltyDecay: ScoreParameterDecay(time.Minute),
DecayInterval: DefaultDecayInterval,
DecayToZero: DefaultDecayToZero,
},
&PeerScoreThresholds{
GossipThreshold: -100,
PublishThreshold: -500,
GraylistThreshold: -1000,
}))
2020-04-15 11:21:23 -04:00
if err != nil {
t.Fatal(err)
}
// Subscribe to mytopic on the legit host
mytopic := "mytopic"
_, err = ps.Subscribe(mytopic)
if err != nil {
t.Fatal(err)
}
iWantCount := 0
2020-04-20 13:03:17 +03:00
iWantCountMx := sync.Mutex{}
getIWantCount := func() int {
iWantCountMx.Lock()
defer iWantCountMx.Unlock()
return iWantCount
}
addIWantCount := func(i int) {
iWantCountMx.Lock()
defer iWantCountMx.Unlock()
iWantCount += i
}
2020-04-15 11:21:23 -04:00
newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
2020-04-15 11:21:23 -04:00
})
sub := sub
2020-04-15 11:21:23 -04:00
go func() {
defer cancel()
// Wait for a short interval to make sure the legit host
// received and processed the subscribe + graft
time.Sleep(20 * time.Millisecond)
// Send a bunch of IHAVEs
2020-04-20 13:03:17 +03:00
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
2020-04-15 11:21:23 -04:00
ihavelst := []string{"someid" + strconv.Itoa(i)}
ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
orpc := rpcWithControl(nil, ihave, nil, nil, nil, nil)
2020-04-15 11:21:23 -04:00
writeMsg(&orpc.RPC)
}
select {
case <-ctx.Done():
return
case <-time.After(GossipSubHeartbeatInterval):
}
2020-04-15 11:21:23 -04:00
// Should have hit the maximum number of IWANTs per peer
// per heartbeat
2020-04-20 13:03:17 +03:00
iwc := getIWantCount()
if iwc > GossipSubMaxIHaveLength {
t.Errorf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc)
return // cannot call t.Fatalf in a non-test goroutine
2020-04-15 11:21:23 -04:00
}
2020-04-20 13:03:17 +03:00
firstBatchCount := iwc
2020-04-15 11:21:23 -04:00
// the score should still be 0 because we haven't broken any promises yet
score := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score != 0 {
t.Errorf("Expected 0 score, but got %f", score)
return // cannot call t.Fatalf in a non-test goroutine
}
2020-04-15 11:21:23 -04:00
// Send a bunch of IHAVEs
2020-04-20 13:03:17 +03:00
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
2020-04-15 11:21:23 -04:00
ihavelst := []string{"someid" + strconv.Itoa(i+100)}
ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
orpc := rpcWithControl(nil, ihave, nil, nil, nil, nil)
2020-04-15 11:21:23 -04:00
writeMsg(&orpc.RPC)
}
select {
case <-ctx.Done():
return
case <-time.After(GossipSubHeartbeatInterval):
}
2020-04-15 11:21:23 -04:00
// Should have sent more IWANTs after the heartbeat
2020-04-20 13:03:17 +03:00
iwc = getIWantCount()
2020-04-20 15:02:58 +03:00
if iwc == firstBatchCount {
t.Error("Expecting to receive more IWANTs after heartbeat but did not")
return // cannot call t.Fatalf in a non-test goroutine
2020-04-15 11:21:23 -04:00
}
// Should not be more than the maximum per heartbeat
2020-04-20 15:02:58 +03:00
if iwc-firstBatchCount > GossipSubMaxIHaveLength {
t.Errorf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
return // cannot call t.Fatalf in a non-test goroutine
2020-04-15 11:21:23 -04:00
}
select {
case <-ctx.Done():
return
case <-time.After(GossipSubIWantFollowupTime):
}
// The score should now be negative because of broken promises
score = ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score >= 0 {
t.Errorf("Expected negative score, but got %f", score)
return // cannot call t.Fatalf in a non-test goroutine
}
2020-04-15 11:21:23 -04:00
}()
}
}
// Record the count of received IWANT messages
if ctl := irpc.GetControl(); ctl != nil {
2020-04-20 13:03:17 +03:00
addIWantCount(len(ctl.GetIwant()))
2020-04-15 11:21:23 -04:00
}
})
connect(t, hosts[0], hosts[1])
<-ctx.Done()
}
// Test that when Gossipsub receives GRAFT for an unknown topic, it ignores
// the request
func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create legitimate and attacker hosts
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit)
if err != nil {
t.Fatal(err)
}
// Subscribe to mytopic on the legit host
mytopic := "mytopic"
_, err = ps.Subscribe(mytopic)
if err != nil {
t.Fatal(err)
}
// Checks that we haven't received any PRUNE message
pruneCount := 0
checkForPrune := func() {
// We send a GRAFT for a non-existent topic so we shouldn't
// receive a PRUNE in response
if pruneCount != 0 {
t.Fatalf("Got %d unexpected PRUNE messages", pruneCount)
}
}
newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
})
// Graft to the peer on a non-existent topic
nonExistentTopic := "non-existent"
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: &nonExistentTopic}}},
})
go func() {
// Wait for a short interval to make sure the legit host
// received and processed the subscribe + graft
time.Sleep(100 * time.Millisecond)
// We shouldn't get any prune messages becaue the topic
// doesn't exist
checkForPrune()
cancel()
}()
}
}
// Record the count of received PRUNE messages
if ctl := irpc.GetControl(); ctl != nil {
pruneCount += len(ctl.GetPrune())
}
})
connect(t, hosts[0], hosts[1])
<-ctx.Done()
}
// Test that when Gossipsub receives GRAFT for a peer that has been PRUNED,
// it penalizes through P7 and eventually graylists and ignores the requests if the
// GRAFTs are coming too fast
func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
originalGossipSubPruneBackoff := GossipSubPruneBackoff
2020-04-20 12:46:20 +03:00
GossipSubPruneBackoff = 200 * time.Millisecond
originalGossipSubGraftFloodThreshold := GossipSubGraftFloodThreshold
GossipSubGraftFloodThreshold = 100 * time.Millisecond
defer func() {
GossipSubPruneBackoff = originalGossipSubPruneBackoff
2020-04-20 12:46:20 +03:00
GossipSubGraftFloodThreshold = originalGossipSubGraftFloodThreshold
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create legitimate and attacker hosts
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
// Set up gossipsub on the legit host
ps, err := NewGossipSub(ctx, legit,
WithPeerScore(
&PeerScoreParams{
AppSpecificScore: func(peer.ID) float64 { return 0 },
BehaviourPenaltyWeight: -100,
BehaviourPenaltyDecay: ScoreParameterDecay(time.Minute),
DecayInterval: DefaultDecayInterval,
DecayToZero: DefaultDecayToZero,
},
&PeerScoreThresholds{
GossipThreshold: -100,
PublishThreshold: -500,
GraylistThreshold: -1000,
}))
if err != nil {
t.Fatal(err)
}
// Subscribe to mytopic on the legit host
mytopic := "mytopic"
_, err = ps.Subscribe(mytopic)
if err != nil {
t.Fatal(err)
}
pruneCount := 0
2020-04-20 13:03:17 +03:00
pruneCountMx := sync.Mutex{}
getPruneCount := func() int {
pruneCountMx.Lock()
defer pruneCountMx.Unlock()
return pruneCount
}
addPruneCount := func(i int) {
pruneCountMx.Lock()
defer pruneCountMx.Unlock()
pruneCount += i
}
newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
graft := []*pb.ControlGraft{{TopicID: sub.Topicid}}
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: graft},
})
sub := sub
go func() {
2020-04-20 12:46:20 +03:00
defer cancel()
// Wait for a short interval to make sure the legit host
// received and processed the subscribe + graft
time.Sleep(20 * time.Millisecond)
// No PRUNE should have been sent at this stage
2020-04-20 13:03:17 +03:00
pc := getPruneCount()
if pc != 0 {
t.Errorf("Expected %d PRUNE messages but got %d", 0, pc)
return // cannot call t.Fatalf in a non-test goroutine
2020-04-20 12:46:20 +03:00
}
// Send a PRUNE to remove the attacker node from the legit
// host's mesh
var prune []*pb.ControlPrune
prune = append(prune, &pb.ControlPrune{TopicID: sub.Topicid})
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Prune: prune},
})
select {
case <-ctx.Done():
return
case <-time.After(20 * time.Millisecond):
}
// No PRUNE should have been sent at this stage
2020-04-20 13:03:17 +03:00
pc = getPruneCount()
if pc != 0 {
t.Errorf("Expected %d PRUNE messages but got %d", 0, pc)
return // cannot call t.Fatalf in a non-test goroutine
2020-04-20 12:46:20 +03:00
}
// wait for the GossipSubGraftFloodThreshold to pass before attempting another graft
time.Sleep(GossipSubGraftFloodThreshold + time.Millisecond)
// Send a GRAFT to attempt to rejoin the mesh
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Graft: graft},
})
select {
case <-ctx.Done():
return
case <-time.After(20 * time.Millisecond):
}
// We should have been peanalized by the peer for sending before the backoff has expired
// but should still receive a PRUNE because we haven't dropped below GraylistThreshold
// yet.
2020-04-20 13:03:17 +03:00
pc = getPruneCount()
if pc != 1 {
t.Errorf("Expected %d PRUNE messages but got %d", 1, pc)
return // cannot call t.Fatalf in a non-test goroutine
2020-04-20 12:46:20 +03:00
}
score1 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score1 >= 0 {
t.Errorf("Expected negative score, but got %f", score1)
return // cannot call t.Fatalf in a non-test goroutine
}
// Send a GRAFT again to attempt to rejoin the mesh
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Graft: graft},
})
select {
case <-ctx.Done():
return
case <-time.After(20 * time.Millisecond):
}
// we are before the flood threshold so we should be penalized twice, but still get
// a PRUNE because we are before the flood threshold
2020-04-20 13:03:17 +03:00
pc = getPruneCount()
if pc != 2 {
t.Errorf("Expected %d PRUNE messages but got %d", 2, pc)
return // cannot call t.Fatalf in a non-test goroutine
}
score2 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score2 >= score1 {
t.Errorf("Expected score below %f, but got %f", score1, score2)
return // cannot call t.Fatalf in a non-test goroutine
}
// Send another GRAFT; this should get us a PRUNE, but penalize us below the graylist threshold
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Graft: graft},
})
select {
case <-ctx.Done():
return
case <-time.After(20 * time.Millisecond):
}
pc = getPruneCount()
if pc != 3 {
t.Errorf("Expected %d PRUNE messages but got %d", 3, pc)
return // cannot call t.Fatalf in a non-test goroutine
}
score3 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
if score3 >= score2 {
t.Errorf("Expected score below %f, but got %f", score2, score3)
return // cannot call t.Fatalf in a non-test goroutine
}
if score3 >= -1000 {
t.Errorf("Expected score below %f, but got %f", -1000.0, score3)
return // cannot call t.Fatalf in a non-test goroutine
}
// Wait for the PRUNE backoff to expire and try again; this time we should fail
// because we are below the graylist threshold, so our RPC should be ignored and
// we should get no PRUNE back
select {
case <-ctx.Done():
return
case <-time.After(GossipSubPruneBackoff + time.Millisecond):
}
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Graft: graft},
})
select {
case <-ctx.Done():
return
case <-time.After(20 * time.Millisecond):
}
pc = getPruneCount()
if pc != 3 {
t.Errorf("Expected %d PRUNE messages but got %d", 3, pc)
return // cannot call t.Fatalf in a non-test goroutine
2020-04-20 12:46:20 +03:00
}
2020-04-20 16:03:44 +03:00
// make sure we are _not_ in the mesh
2020-04-20 16:03:44 +03:00
res := make(chan bool)
ps.eval <- func() {
mesh := ps.rt.(*GossipSubRouter).mesh[mytopic]
_, inMesh := mesh[attacker.ID()]
res <- inMesh
}
inMesh := <-res
if inMesh {
t.Error("Expected to not be in the mesh of the legitimate host")
return // cannot call t.Fatal in a non-test goroutine
2020-04-20 16:03:44 +03:00
}
}()
}
}
if ctl := irpc.GetControl(); ctl != nil {
2020-04-20 13:03:17 +03:00
addPruneCount(len(ctl.GetPrune()))
}
})
connect(t, hosts[0], hosts[1])
<-ctx.Done()
}
type gsAttackInvalidMsgTracer struct {
rejectCount int
}
func (t *gsAttackInvalidMsgTracer) Trace(evt *pb.TraceEvent) {
// fmt.Printf(" %s %s\n", evt.Type, evt)
if evt.GetType() == pb.TraceEvent_REJECT_MESSAGE {
t.rejectCount++
}
}
// Test that when Gossipsub receives a lot of invalid messages from
// a peer it should graylist the peer
func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create legitimate and attacker hosts
hosts := getDefaultHosts(t, 2)
legit := hosts[0]
attacker := hosts[1]
mytopic := "mytopic"
// Create parameters with reasonable default values
params := &PeerScoreParams{
AppSpecificScore: func(peer.ID) float64 { return 0 },
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 1,
DecayInterval: 5 * time.Second,
DecayToZero: 0.01,
RetainScore: 10 * time.Second,
Topics: make(map[string]*TopicScoreParams),
}
params.Topics[mytopic] = &TopicScoreParams{
TopicWeight: 0.25,
TimeInMeshWeight: 0.0027,
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 3600,
FirstMessageDeliveriesWeight: 0.664,
FirstMessageDeliveriesDecay: 0.9916,
FirstMessageDeliveriesCap: 1500,
MeshMessageDeliveriesWeight: -0.25,
MeshMessageDeliveriesDecay: 0.97,
MeshMessageDeliveriesCap: 400,
MeshMessageDeliveriesThreshold: 100,
MeshMessageDeliveriesActivation: 30 * time.Second,
MeshMessageDeliveriesWindow: 5 * time.Minute,
MeshFailurePenaltyWeight: -0.25,
MeshFailurePenaltyDecay: 0.997,
InvalidMessageDeliveriesWeight: -99,
InvalidMessageDeliveriesDecay: 0.9994,
}
thresholds := &PeerScoreThresholds{
GossipThreshold: -100,
PublishThreshold: -200,
GraylistThreshold: -300,
AcceptPXThreshold: 0,
}
// Set up gossipsub on the legit host
tracer := &gsAttackInvalidMsgTracer{}
ps, err := NewGossipSub(ctx, legit,
WithEventTracer(tracer),
WithPeerScore(params, thresholds),
)
if err != nil {
t.Fatal(err)
}
attackerScore := func() float64 {
return ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
}
// Subscribe to mytopic on the legit host
_, err = ps.Subscribe(mytopic)
if err != nil {
t.Fatal(err)
}
pruneCount := 0
2020-04-20 13:03:17 +03:00
pruneCountMx := sync.Mutex{}
getPruneCount := func() int {
pruneCountMx.Lock()
defer pruneCountMx.Unlock()
return pruneCount
}
addPruneCount := func(i int) {
pruneCountMx.Lock()
defer pruneCountMx.Unlock()
pruneCount += i
}
newMockGS(ctx, t, attacker, func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
// When the legit host connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
})
go func() {
defer cancel()
// Attacker score should start at zero
if attackerScore() != 0 {
t.Errorf("Expected attacker score to be zero but it's %f", attackerScore())
return // cannot call t.Fatalf in a non-test goroutine
}
// Send a bunch of messages with no signature (these will
// fail validation and reduce the attacker's score)
for i := 0; i < 100; i++ {
msg := &pb.Message{
2020-09-29 18:05:54 +03:00
Data: []byte("some data" + strconv.Itoa(i)),
Topic: &mytopic,
From: []byte(attacker.ID()),
Seqno: []byte{byte(i + 1)},
}
writeMsg(&pb.RPC{
Publish: []*pb.Message{msg},
})
}
// Wait for the initial heartbeat, plus a bit of padding
select {
case <-ctx.Done():
return
case <-time.After(100*time.Millisecond + GossipSubHeartbeatInitialDelay):
}
// The attackers score should now have fallen below zero
2020-05-08 20:07:05 +03:00
if attackerScore() >= 0 {
t.Errorf("Expected attacker score to be less than zero but it's %f", attackerScore())
return // cannot call t.Fatalf in a non-test goroutine
}
// There should be several rejected messages (because the signature was invalid)
if tracer.rejectCount == 0 {
t.Error("Expected message rejection but got none")
return // cannot call t.Fatal in a non-test goroutine
}
// The legit node should have sent a PRUNE message
2020-04-20 13:03:17 +03:00
pc := getPruneCount()
if pc == 0 {
t.Error("Expected attacker node to be PRUNED when score drops low enough")
return // cannot call t.Fatal in a non-test goroutine
}
}()
}
}
if ctl := irpc.GetControl(); ctl != nil {
2020-04-20 13:03:17 +03:00
addPruneCount(len(ctl.GetPrune()))
}
})
connect(t, hosts[0], hosts[1])
<-ctx.Done()
}
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
// Test that when Gossipsub receives too many IDONTWANT messages from a peer
func TestGossipsubAttackSpamIDONTWANT(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 3)
msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}
psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))
topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}
// Wait a bit after the last message before checking the result
msgWaitMax := time.Second + GossipSubHeartbeatInterval
msgTimer := time.NewTimer(msgWaitMax)
// Checks we received some messages
var expMid string
var actMids []string
var mu sync.Mutex
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
checkMsgs := func() {
mu.Lock()
defer mu.Unlock()
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
if len(actMids) == 0 {
t.Fatalf("Expected some messages when the maximum number of IDONTWANTs is reached")
}
if actMids[0] != expMid {
t.Fatalf("The expected message is incorrect")
}
if len(actMids) > 1 {
t.Fatalf("The spam prevention should be reset after the heartbeat")
}
}
// Wait for the timer to expire
go func() {
select {
case <-msgTimer.C:
checkMsgs()
cancel()
return
case <-ctx.Done():
checkMsgs()
}
}()
newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) {
mu.Lock()
defer mu.Unlock()
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
// Each time the host receives a message
for _, msg := range irpc.GetPublish() {
actMids = append(actMids, msgID(msg))
}
// When the middle peer connects it will send us its subscriptions
for _, sub := range irpc.GetSubscriptions() {
if sub.GetSubscribe() {
// Reply by subcribing to the topic and grafting to the middle peer
writeMsg(&pb.RPC{
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
})
go func() {
// Wait for a short interval to make sure the middle peer
// received and processed the subscribe + graft
time.Sleep(100 * time.Millisecond)
// Generate a message and send IDONTWANT to the middle peer
data := make([]byte, 16)
var mid string
for i := 0; i < 1+GossipSubMaxIDontWantMessages; i++ {
rand.Read(data)
mid = msgID(&pb.Message{Data: data})
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: []string{mid}}}},
})
}
// The host should receives this message id because the maximum was reached
expMid = mid
// Wait for a short interval to make sure the middle peer
// received and processed the IDONTWANTs
time.Sleep(100 * time.Millisecond)
// Publish the message from the first peer
if err := psubs[0].Publish(topic, data); err != nil {
t.Error(err)
return // cannot call t.Fatal in a non-test goroutine
}
// Wait for the next heartbeat so that the prevention will be reset
select {
case <-ctx.Done():
return
case <-time.After(GossipSubHeartbeatInterval):
}
// Test IDONTWANT again to see that it now works again
rand.Read(data)
mid = msgID(&pb.Message{Data: data})
writeMsg(&pb.RPC{
Control: &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: []string{mid}}}},
})
time.Sleep(100 * time.Millisecond)
if err := psubs[0].Publish(topic, data); err != nil {
t.Error(err)
return // cannot call t.Fatal in a non-test goroutine
}
}()
}
}
})
connect(t, hosts[0], hosts[1])
connect(t, hosts[1], hosts[2])
<-ctx.Done()
}
func TestGossipsubHandleIDontwantSpam(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getDefaultHosts(t, 2)
msgID := func(pmsg *pb.Message) string {
// silly content-based test message-ID: just use the data as whole
return base64.URLEncoding.EncodeToString(pmsg.Data)
}
psubs := make([]*PubSub, 2)
psubs[0] = getGossipsub(ctx, hosts[0], WithMessageIdFn(msgID))
psubs[1] = getGossipsub(ctx, hosts[1], WithMessageIdFn(msgID))
connect(t, hosts[0], hosts[1])
topic := "foobar"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}
exceededIDWLength := GossipSubMaxIDontWantLength + 1
var idwIds []string
for i := 0; i < exceededIDWLength; i++ {
idwIds = append(idwIds, fmt.Sprintf("idontwant-%d", i))
}
rPid := hosts[1].ID()
ctrlMessage := &pb.ControlMessage{Idontwant: []*pb.ControlIDontWant{{MessageIDs: idwIds}}}
grt := psubs[0].rt.(*GossipSubRouter)
grt.handleIDontWant(rPid, ctrlMessage)
if grt.peerdontwant[rPid] != 1 {
t.Errorf("Wanted message count of %d but received %d", 1, grt.peerdontwant[rPid])
}
mid := fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength-1)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; !ok {
t.Errorf("Desired message id was not stored in the unwanted map: %s", mid)
}
mid = fmt.Sprintf("idontwant-%d", GossipSubMaxIDontWantLength)
if _, ok := grt.unwanted[rPid][computeChecksum(mid)]; ok {
t.Errorf("Unwanted message id was stored in the unwanted map: %s", mid)
}
}
type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)
func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg mockGSOnRead) {
GossipSub v1.2: IDONTWANT control message and priority queue. (#553) ## GossipSub v1.2 implementation Specification: libp2p/specs#548 ### Work Summary Sending IDONTWANT Implement a smart queue Add priorities to the smart queue Put IDONTWANT packets into the smart priority queue as soon as the node gets the packets Handling IDONTWANT Use a map to remember the message ids whose IDONTWANT packets have been received Implement max_idontwant_messages (ignore the IDONWANT packets if the max is reached) Clear the message IDs from the cache after 3 heartbeats Hash the message IDs before putting them into the cache. More requested features Add a feature test to not send IDONTWANT if the other side doesnt support it ### Commit Summary * Replace sending channel with the smart rpcQueue Since we want to implement a priority queue later, we need to replace the normal sending channels with the new smart structures first. * Implement UrgentPush in the smart rpcQueue UrgentPush allows you to push an rpc packet to the front of the queue so that it will be popped out fast. * Add IDONTWANT to rpc.proto and trace.proto * Send IDONTWANT right before validation step Most importantly, this commit adds a new method called PreValidation to the interface PubSubRouter, which will be called right before validating the gossipsub message. In GossipSubRouter, PreValidation will send the IDONTWANT controll messages to all the mesh peers of the topics of the received messages. * Test GossipSub IDONWANT sending * Send IDONWANT only for large messages * Handle IDONTWANT control messages When receiving IDONTWANTs, the host should remember the message ids contained in IDONTWANTs using a hash map. When receiving messages with those ids, it shouldn't forward them to the peers who already sent the IDONTWANTs. When the maximum number of IDONTWANTs is reached for any particular peer, the host should ignore any excessive IDONTWANTs from that peer. * Clear expired message IDs from the IDONTWANT cache If the messages IDs received from IDONTWANTs are older than 3 heartbeats, they should be removed from the IDONTWANT cache. * Keep the hashes of IDONTWANT message ids instead Rather than keeping the raw message ids, keep their hashes instead to save memory and protect again memory DoS attacks. * Increase GossipSubMaxIHaveMessages to 1000 * fixup! Clear expired message IDs from the IDONTWANT cache * Not send IDONTWANT if the receiver doesn't support * fixup! Replace sending channel with the smart rpcQueue * Not use pointers in rpcQueue * Simply rcpQueue by using only one mutex * Check ctx error in rpc sending worker Co-authored-by: Steven Allen <steven@stebalien.com> * fixup! Simply rcpQueue by using only one mutex * fixup! Keep the hashes of IDONTWANT message ids instead * Use AfterFunc instead implementing our own * Fix misc lint errors * fixup! Fix misc lint errors * Revert "Increase GossipSubMaxIHaveMessages to 1000" This reverts commit 6fabcdd068a5f5238c5280a3460af9c3998418ec. * Increase GossipSubMaxIDontWantMessages to 1000 * fixup! Handle IDONTWANT control messages * Skip TestGossipsubConnTagMessageDeliveries * Skip FuzzAppendOrMergeRPC * Revert "Skip FuzzAppendOrMergeRPC" This reverts commit f141e13234de0960d139339acb636a1afea9e219. * fixup! Send IDONWANT only for large messages * fixup! fixup! Keep the hashes of IDONTWANT message ids instead * fixup! Implement UrgentPush in the smart rpcQueue * fixup! Use AfterFunc instead implementing our own --------- Co-authored-by: Steven Allen <steven@stebalien.com>
2024-08-16 22:16:35 +07:00
newMockGSWithVersion(ctx, t, attacker, protocol.ID("/meshsub/1.2.0"), onReadMsg)
}
func newMockGSWithVersion(ctx context.Context, t *testing.T, attacker host.Host, gossipSubID protocol.ID, onReadMsg mockGSOnRead) {
// Listen on the gossipsub protocol
const maxMessageSize = 1024 * 1024
attacker.SetStreamHandler(gossipSubID, func(stream network.Stream) {
// When an incoming stream is opened, set up an outgoing stream
p := stream.Conn().RemotePeer()
ostream, err := attacker.NewStream(ctx, p, gossipSubID)
if err != nil {
t.Fatal(err)
}
r := protoio.NewDelimitedReader(stream, maxMessageSize)
w := protoio.NewDelimitedWriter(ostream)
var irpc pb.RPC
writeMsg := func(rpc *pb.RPC) {
if err = w.WriteMsg(rpc); err != nil {
t.Fatalf("error writing RPC: %s", err)
}
}
// Keep reading messages and responding
for {
// Bail out when the test finishes
if ctx.Err() != nil {
return
}
irpc.Reset()
err := r.ReadMsg(&irpc)
// Bail out when the test finishes
if ctx.Err() != nil {
return
}
if err != nil {
t.Fatal(err)
}
onReadMsg(writeMsg, &irpc)
}
})
}