mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-11 01:03:11 +00:00
cleanup: fix vet and staticcheck failures (#435)
* cleanup: fix vet failures and most staticcheck failures * Fix remaining staticcheck failures * Give test goroutines chance to exit early when context is canceled
This commit is contained in:
parent
37ebe34352
commit
2efd313b83
@ -292,6 +292,7 @@ func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//lint:ignore U1000 used only by skipped tests at present
|
||||||
func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) {
|
func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) {
|
||||||
done := false
|
done := false
|
||||||
doneCh := make(chan bool, 1)
|
doneCh := make(chan bool, 1)
|
||||||
|
|||||||
@ -18,12 +18,10 @@ func TestBrokenPromises(t *testing.T) {
|
|||||||
peerB := peer.ID("B")
|
peerB := peer.ID("B")
|
||||||
peerC := peer.ID("C")
|
peerC := peer.ID("C")
|
||||||
|
|
||||||
var msgs []*pb.Message
|
|
||||||
var mids []string
|
var mids []string
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
m := makeTestMessage(i)
|
m := makeTestMessage(i)
|
||||||
m.From = []byte(peerA)
|
m.From = []byte(peerA)
|
||||||
msgs = append(msgs, m)
|
|
||||||
mid := DefaultMsgIdFn(m)
|
mid := DefaultMsgIdFn(m)
|
||||||
mids = append(mids, mid)
|
mids = append(mids, mid)
|
||||||
}
|
}
|
||||||
@ -97,5 +95,4 @@ func TestNoBrokenPromises(t *testing.T) {
|
|||||||
if brokenPromises != nil {
|
if brokenPromises != nil {
|
||||||
t.Fatal("expected no broken promises")
|
t.Fatal("expected no broken promises")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
12
gossipsub.go
12
gossipsub.go
@ -210,8 +210,8 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
|||||||
outbound: make(map[peer.ID]bool),
|
outbound: make(map[peer.ID]bool),
|
||||||
connect: make(chan connectInfo, params.MaxPendingConnections),
|
connect: make(chan connectInfo, params.MaxPendingConnections),
|
||||||
mcache: NewMessageCache(params.HistoryGossip, params.HistoryLength),
|
mcache: NewMessageCache(params.HistoryGossip, params.HistoryLength),
|
||||||
protos: GossipSubDefaultProtocols,
|
protos: GossipSubDefaultProtocols,
|
||||||
feature: GossipSubDefaultFeatures,
|
feature: GossipSubDefaultFeatures,
|
||||||
tagTracer: newTagTracer(h.ConnManager()),
|
tagTracer: newTagTracer(h.ConnManager()),
|
||||||
params: params,
|
params: params,
|
||||||
}
|
}
|
||||||
@ -668,7 +668,7 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
|
|||||||
|
|
||||||
gs.gossipTracer.AddPromise(p, iwantlst)
|
gs.gossipTracer.AddPromise(p, iwantlst)
|
||||||
|
|
||||||
return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
|
return []*pb.ControlIWant{{MessageIDs: iwantlst}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
|
func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
|
||||||
@ -901,7 +901,6 @@ func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
|
|||||||
case gs.connect <- ci:
|
case gs.connect <- ci:
|
||||||
default:
|
default:
|
||||||
log.Debugf("ignoring peer connection attempt; too many pending connections")
|
log.Debugf("ignoring peer connection attempt; too many pending connections")
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1078,7 +1077,7 @@ func (gs *GossipSubRouter) Leave(topic string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
|
func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
|
||||||
graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: &topic}}
|
graft := []*pb.ControlGraft{{TopicID: &topic}}
|
||||||
out := rpcWithControl(nil, nil, nil, graft, nil)
|
out := rpcWithControl(nil, nil, nil, graft, nil)
|
||||||
gs.sendRPC(p, out)
|
gs.sendRPC(p, out)
|
||||||
}
|
}
|
||||||
@ -1297,7 +1296,7 @@ func (gs *GossipSubRouter) heartbeatTimer() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) heartbeat() {
|
func (gs *GossipSubRouter) heartbeat() {
|
||||||
defer log.EventBegin(gs.p.ctx, "heartbeat").Done()
|
defer log.Infow("heartbeat")
|
||||||
|
|
||||||
gs.heartbeatTicks++
|
gs.heartbeatTicks++
|
||||||
|
|
||||||
@ -1650,7 +1649,6 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
|
|||||||
out := rpcWithControl(nil, nil, nil, nil, prune)
|
out := rpcWithControl(nil, nil, nil, nil, prune)
|
||||||
gs.sendRPC(p, out)
|
gs.sendRPC(p, out)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// emitGossip emits IHAVE gossip advertising items in the message cache window
|
// emitGossip emits IHAVE gossip advertising items in the message cache window
|
||||||
|
|||||||
@ -79,11 +79,9 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
|
|||||||
|
|
||||||
// sybil squatters to be connected later
|
// sybil squatters to be connected later
|
||||||
sybilHosts := getNetHosts(t, ctx, nSquatter)
|
sybilHosts := getNetHosts(t, ctx, nSquatter)
|
||||||
squatters := make([]*sybilSquatter, 0, nSquatter)
|
|
||||||
for _, h := range sybilHosts {
|
for _, h := range sybilHosts {
|
||||||
squatter := &sybilSquatter{h: h}
|
squatter := &sybilSquatter{h: h}
|
||||||
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
|
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
|
||||||
squatters = append(squatters, squatter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// connect the honest hosts
|
// connect the honest hosts
|
||||||
@ -97,14 +95,11 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
|
|||||||
|
|
||||||
// subscribe everyone to the topic
|
// subscribe everyone to the topic
|
||||||
topic := "test"
|
topic := "test"
|
||||||
var msgs []*Subscription
|
|
||||||
for _, ps := range psubs {
|
for _, ps := range psubs {
|
||||||
subch, err := ps.Subscribe(topic)
|
_, err := ps.Subscribe(topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgs = append(msgs, subch)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// sleep to allow meshes to form
|
// sleep to allow meshes to form
|
||||||
|
|||||||
@ -37,11 +37,7 @@ func TestGossipSubCustomProtocols(t *testing.T) {
|
|||||||
customsub := protocol.ID("customsub/1.0.0")
|
customsub := protocol.ID("customsub/1.0.0")
|
||||||
protos := []protocol.ID{customsub, FloodSubID}
|
protos := []protocol.ID{customsub, FloodSubID}
|
||||||
features := func(feat GossipSubFeature, proto protocol.ID) bool {
|
features := func(feat GossipSubFeature, proto protocol.ID) bool {
|
||||||
if proto == customsub {
|
return proto == customsub
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|||||||
@ -15,7 +15,6 @@ import (
|
|||||||
|
|
||||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log"
|
|
||||||
"github.com/libp2p/go-msgio/protoio"
|
"github.com/libp2p/go-msgio/protoio"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -88,8 +87,8 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) {
|
|||||||
if sub.GetSubscribe() {
|
if sub.GetSubscribe() {
|
||||||
// Reply by subcribing to the topic and grafting to the peer
|
// Reply by subcribing to the topic and grafting to the peer
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
||||||
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}},
|
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -120,7 +119,7 @@ func TestGossipsubAttackSpamIWANT(t *testing.T) {
|
|||||||
// to send another message (until it cuts off the attacker for
|
// to send another message (until it cuts off the attacker for
|
||||||
// being spammy)
|
// being spammy)
|
||||||
iwantlst := []string{DefaultMsgIdFn(msg)}
|
iwantlst := []string{DefaultMsgIdFn(msg)}
|
||||||
iwant := []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
|
iwant := []*pb.ControlIWant{{MessageIDs: iwantlst}}
|
||||||
orpc := rpcWithControl(nil, nil, iwant, nil, nil)
|
orpc := rpcWithControl(nil, nil, iwant, nil, nil)
|
||||||
writeMsg(&orpc.RPC)
|
writeMsg(&orpc.RPC)
|
||||||
}
|
}
|
||||||
@ -192,8 +191,8 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
|
|||||||
if sub.GetSubscribe() {
|
if sub.GetSubscribe() {
|
||||||
// Reply by subcribing to the topic and grafting to the peer
|
// Reply by subcribing to the topic and grafting to the peer
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
||||||
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}},
|
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -206,53 +205,70 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
|
|||||||
// Send a bunch of IHAVEs
|
// Send a bunch of IHAVEs
|
||||||
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
|
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
|
||||||
ihavelst := []string{"someid" + strconv.Itoa(i)}
|
ihavelst := []string{"someid" + strconv.Itoa(i)}
|
||||||
ihave := []*pb.ControlIHave{&pb.ControlIHave{TopicID: sub.Topicid, MessageIDs: ihavelst}}
|
ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}}
|
||||||
orpc := rpcWithControl(nil, ihave, nil, nil, nil)
|
orpc := rpcWithControl(nil, ihave, nil, nil, nil)
|
||||||
writeMsg(&orpc.RPC)
|
writeMsg(&orpc.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(GossipSubHeartbeatInterval)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(GossipSubHeartbeatInterval):
|
||||||
|
}
|
||||||
|
|
||||||
// Should have hit the maximum number of IWANTs per peer
|
// Should have hit the maximum number of IWANTs per peer
|
||||||
// per heartbeat
|
// per heartbeat
|
||||||
iwc := getIWantCount()
|
iwc := getIWantCount()
|
||||||
if iwc > GossipSubMaxIHaveLength {
|
if iwc > GossipSubMaxIHaveLength {
|
||||||
t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc)
|
t.Errorf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
firstBatchCount := iwc
|
firstBatchCount := iwc
|
||||||
|
|
||||||
// the score should still be 0 because we haven't broken any promises yet
|
// the score should still be 0 because we haven't broken any promises yet
|
||||||
score := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
score := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
||||||
if score != 0 {
|
if score != 0 {
|
||||||
t.Fatalf("Expected 0 score, but got %f", score)
|
t.Errorf("Expected 0 score, but got %f", score)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a bunch of IHAVEs
|
// Send a bunch of IHAVEs
|
||||||
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
|
for i := 0; i < 3*GossipSubMaxIHaveLength; i++ {
|
||||||
ihavelst := []string{"someid" + strconv.Itoa(i+100)}
|
ihavelst := []string{"someid" + strconv.Itoa(i+100)}
|
||||||
ihave := []*pb.ControlIHave{&pb.ControlIHave{TopicID: sub.Topicid, MessageIDs: ihavelst}}
|
ihave := []*pb.ControlIHave{{TopicID: sub.Topicid, MessageIDs: ihavelst}}
|
||||||
orpc := rpcWithControl(nil, ihave, nil, nil, nil)
|
orpc := rpcWithControl(nil, ihave, nil, nil, nil)
|
||||||
writeMsg(&orpc.RPC)
|
writeMsg(&orpc.RPC)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(GossipSubHeartbeatInterval)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(GossipSubHeartbeatInterval):
|
||||||
|
}
|
||||||
|
|
||||||
// Should have sent more IWANTs after the heartbeat
|
// Should have sent more IWANTs after the heartbeat
|
||||||
iwc = getIWantCount()
|
iwc = getIWantCount()
|
||||||
if iwc == firstBatchCount {
|
if iwc == firstBatchCount {
|
||||||
t.Fatal("Expecting to receive more IWANTs after heartbeat but did not")
|
t.Error("Expecting to receive more IWANTs after heartbeat but did not")
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
// Should not be more than the maximum per heartbeat
|
// Should not be more than the maximum per heartbeat
|
||||||
if iwc-firstBatchCount > GossipSubMaxIHaveLength {
|
if iwc-firstBatchCount > GossipSubMaxIHaveLength {
|
||||||
t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
|
t.Errorf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(GossipSubIWantFollowupTime)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(GossipSubIWantFollowupTime):
|
||||||
|
}
|
||||||
|
|
||||||
// The score should now be negative because of broken promises
|
// The score should now be negative because of broken promises
|
||||||
score = ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
score = ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
||||||
if score >= 0 {
|
if score >= 0 {
|
||||||
t.Fatalf("Expected negative score, but got %f", score)
|
t.Errorf("Expected negative score, but got %f", score)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -309,14 +325,14 @@ func TestGossipsubAttackGRAFTNonExistentTopic(t *testing.T) {
|
|||||||
if sub.GetSubscribe() {
|
if sub.GetSubscribe() {
|
||||||
// Reply by subcribing to the topic and grafting to the peer
|
// Reply by subcribing to the topic and grafting to the peer
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
||||||
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}},
|
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
// Graft to the peer on a non-existent topic
|
// Graft to the peer on a non-existent topic
|
||||||
nonExistentTopic := "non-existent"
|
nonExistentTopic := "non-existent"
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: &nonExistentTopic}}},
|
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: &nonExistentTopic}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -408,9 +424,9 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
for _, sub := range irpc.GetSubscriptions() {
|
for _, sub := range irpc.GetSubscriptions() {
|
||||||
if sub.GetSubscribe() {
|
if sub.GetSubscribe() {
|
||||||
// Reply by subcribing to the topic and grafting to the peer
|
// Reply by subcribing to the topic and grafting to the peer
|
||||||
graft := []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}
|
graft := []*pb.ControlGraft{{TopicID: sub.Topicid}}
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
||||||
Control: &pb.ControlMessage{Graft: graft},
|
Control: &pb.ControlMessage{Graft: graft},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -424,7 +440,8 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
// No PRUNE should have been sent at this stage
|
// No PRUNE should have been sent at this stage
|
||||||
pc := getPruneCount()
|
pc := getPruneCount()
|
||||||
if pc != 0 {
|
if pc != 0 {
|
||||||
t.Fatalf("Expected %d PRUNE messages but got %d", 0, pc)
|
t.Errorf("Expected %d PRUNE messages but got %d", 0, pc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a PRUNE to remove the attacker node from the legit
|
// Send a PRUNE to remove the attacker node from the legit
|
||||||
@ -435,12 +452,18 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
Control: &pb.ControlMessage{Prune: prune},
|
Control: &pb.ControlMessage{Prune: prune},
|
||||||
})
|
})
|
||||||
|
|
||||||
time.Sleep(20 * time.Millisecond)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
// No PRUNE should have been sent at this stage
|
// No PRUNE should have been sent at this stage
|
||||||
pc = getPruneCount()
|
pc = getPruneCount()
|
||||||
if pc != 0 {
|
if pc != 0 {
|
||||||
t.Fatalf("Expected %d PRUNE messages but got %d", 0, pc)
|
t.Errorf("Expected %d PRUNE messages but got %d", 0, pc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait for the GossipSubGraftFloodThreshold to pass before attempting another graft
|
// wait for the GossipSubGraftFloodThreshold to pass before attempting another graft
|
||||||
@ -451,19 +474,25 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
Control: &pb.ControlMessage{Graft: graft},
|
Control: &pb.ControlMessage{Graft: graft},
|
||||||
})
|
})
|
||||||
|
|
||||||
time.Sleep(20 * time.Millisecond)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
// We should have been peanalized by the peer for sending before the backoff has expired
|
// We should have been peanalized by the peer for sending before the backoff has expired
|
||||||
// but should still receive a PRUNE because we haven't dropped below GraylistThreshold
|
// but should still receive a PRUNE because we haven't dropped below GraylistThreshold
|
||||||
// yet.
|
// yet.
|
||||||
pc = getPruneCount()
|
pc = getPruneCount()
|
||||||
if pc != 1 {
|
if pc != 1 {
|
||||||
t.Fatalf("Expected %d PRUNE messages but got %d", 1, pc)
|
t.Errorf("Expected %d PRUNE messages but got %d", 1, pc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
score1 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
score1 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
||||||
if score1 >= 0 {
|
if score1 >= 0 {
|
||||||
t.Fatalf("Expected negative score, but got %f", score1)
|
t.Errorf("Expected negative score, but got %f", score1)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a GRAFT again to attempt to rejoin the mesh
|
// Send a GRAFT again to attempt to rejoin the mesh
|
||||||
@ -471,18 +500,24 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
Control: &pb.ControlMessage{Graft: graft},
|
Control: &pb.ControlMessage{Graft: graft},
|
||||||
})
|
})
|
||||||
|
|
||||||
time.Sleep(20 * time.Millisecond)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
// we are before the flood threshold so we should be penalized twice, but still get
|
// we are before the flood threshold so we should be penalized twice, but still get
|
||||||
// a PRUNE because we are before the flood threshold
|
// a PRUNE because we are before the flood threshold
|
||||||
pc = getPruneCount()
|
pc = getPruneCount()
|
||||||
if pc != 2 {
|
if pc != 2 {
|
||||||
t.Fatalf("Expected %d PRUNE messages but got %d", 2, pc)
|
t.Errorf("Expected %d PRUNE messages but got %d", 2, pc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
score2 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
score2 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
||||||
if score2 >= score1 {
|
if score2 >= score1 {
|
||||||
t.Fatalf("Expected score below %f, but got %f", score1, score2)
|
t.Errorf("Expected score below %f, but got %f", score1, score2)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send another GRAFT; this should get us a PRUNE, but penalize us below the graylist threshold
|
// Send another GRAFT; this should get us a PRUNE, but penalize us below the graylist threshold
|
||||||
@ -490,35 +525,51 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
Control: &pb.ControlMessage{Graft: graft},
|
Control: &pb.ControlMessage{Graft: graft},
|
||||||
})
|
})
|
||||||
|
|
||||||
time.Sleep(20 * time.Millisecond)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
pc = getPruneCount()
|
pc = getPruneCount()
|
||||||
if pc != 3 {
|
if pc != 3 {
|
||||||
t.Fatalf("Expected %d PRUNE messages but got %d", 3, pc)
|
t.Errorf("Expected %d PRUNE messages but got %d", 3, pc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
score3 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
score3 := ps.rt.(*GossipSubRouter).score.Score(attacker.ID())
|
||||||
if score3 >= score2 {
|
if score3 >= score2 {
|
||||||
t.Fatalf("Expected score below %f, but got %f", score2, score3)
|
t.Errorf("Expected score below %f, but got %f", score2, score3)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
if score3 >= -1000 {
|
if score3 >= -1000 {
|
||||||
t.Fatalf("Expected score below %f, but got %f", -1000.0, score3)
|
t.Errorf("Expected score below %f, but got %f", -1000.0, score3)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the PRUNE backoff to expire and try again; this time we should fail
|
// Wait for the PRUNE backoff to expire and try again; this time we should fail
|
||||||
// because we are below the graylist threshold, so our RPC should be ignored and
|
// because we are below the graylist threshold, so our RPC should be ignored and
|
||||||
// we should get no PRUNE back
|
// we should get no PRUNE back
|
||||||
time.Sleep(GossipSubPruneBackoff + time.Millisecond)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(GossipSubPruneBackoff + time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Control: &pb.ControlMessage{Graft: graft},
|
Control: &pb.ControlMessage{Graft: graft},
|
||||||
})
|
})
|
||||||
|
|
||||||
time.Sleep(20 * time.Millisecond)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(20 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
pc = getPruneCount()
|
pc = getPruneCount()
|
||||||
if pc != 3 {
|
if pc != 3 {
|
||||||
t.Fatalf("Expected %d PRUNE messages but got %d", 3, pc)
|
t.Errorf("Expected %d PRUNE messages but got %d", 3, pc)
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure we are _not_ in the mesh
|
// make sure we are _not_ in the mesh
|
||||||
@ -531,7 +582,8 @@ func TestGossipsubAttackGRAFTDuringBackoff(t *testing.T) {
|
|||||||
|
|
||||||
inMesh := <-res
|
inMesh := <-res
|
||||||
if inMesh {
|
if inMesh {
|
||||||
t.Fatal("Expected to not be in the mesh of the legitimate host")
|
t.Error("Expected to not be in the mesh of the legitimate host")
|
||||||
|
return // cannot call t.Fatal in a non-test goroutine
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -646,8 +698,8 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
|
|||||||
if sub.GetSubscribe() {
|
if sub.GetSubscribe() {
|
||||||
// Reply by subcribing to the topic and grafting to the peer
|
// Reply by subcribing to the topic and grafting to the peer
|
||||||
writeMsg(&pb.RPC{
|
writeMsg(&pb.RPC{
|
||||||
Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
Subscriptions: []*pb.RPC_SubOpts{{Subscribe: sub.Subscribe, Topicid: sub.Topicid}},
|
||||||
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: sub.Topicid}}},
|
Control: &pb.ControlMessage{Graft: []*pb.ControlGraft{{TopicID: sub.Topicid}}},
|
||||||
})
|
})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -655,7 +707,8 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
|
|||||||
|
|
||||||
// Attacker score should start at zero
|
// Attacker score should start at zero
|
||||||
if attackerScore() != 0 {
|
if attackerScore() != 0 {
|
||||||
t.Fatalf("Expected attacker score to be zero but it's %f", attackerScore())
|
t.Errorf("Expected attacker score to be zero but it's %f", attackerScore())
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a bunch of messages with no signature (these will
|
// Send a bunch of messages with no signature (these will
|
||||||
@ -673,20 +726,27 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Wait for the initial heartbeat, plus a bit of padding
|
// Wait for the initial heartbeat, plus a bit of padding
|
||||||
time.Sleep(100*time.Millisecond + GossipSubHeartbeatInitialDelay)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(100*time.Millisecond + GossipSubHeartbeatInitialDelay):
|
||||||
|
}
|
||||||
|
|
||||||
// The attackers score should now have fallen below zero
|
// The attackers score should now have fallen below zero
|
||||||
if attackerScore() >= 0 {
|
if attackerScore() >= 0 {
|
||||||
t.Fatalf("Expected attacker score to be less than zero but it's %f", attackerScore())
|
t.Errorf("Expected attacker score to be less than zero but it's %f", attackerScore())
|
||||||
|
return // cannot call t.Fatalf in a non-test goroutine
|
||||||
}
|
}
|
||||||
// There should be several rejected messages (because the signature was invalid)
|
// There should be several rejected messages (because the signature was invalid)
|
||||||
if tracer.rejectCount == 0 {
|
if tracer.rejectCount == 0 {
|
||||||
t.Fatal("Expected message rejection but got none")
|
t.Error("Expected message rejection but got none")
|
||||||
|
return // cannot call t.Fatal in a non-test goroutine
|
||||||
}
|
}
|
||||||
// The legit node should have sent a PRUNE message
|
// The legit node should have sent a PRUNE message
|
||||||
pc := getPruneCount()
|
pc := getPruneCount()
|
||||||
if pc == 0 {
|
if pc == 0 {
|
||||||
t.Fatal("Expected attacker node to be PRUNED when score drops low enough")
|
t.Error("Expected attacker node to be PRUNED when score drops low enough")
|
||||||
|
return // cannot call t.Fatal in a non-test goroutine
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -702,10 +762,6 @@ func TestGossipsubAttackInvalidMessageSpam(t *testing.T) {
|
|||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func turnOnPubsubDebug() {
|
|
||||||
logging.SetLogLevel("pubsub", "debug")
|
|
||||||
}
|
|
||||||
|
|
||||||
type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)
|
type mockGSOnRead func(writeMsg func(*pb.RPC), irpc *pb.RPC)
|
||||||
|
|
||||||
func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg mockGSOnRead) {
|
func newMockGS(ctx context.Context, t *testing.T, attacker host.Host, onReadMsg mockGSOnRead) {
|
||||||
|
|||||||
@ -1126,8 +1126,8 @@ func TestGossipsubDirectPeers(t *testing.T) {
|
|||||||
h := getNetHosts(t, ctx, 3)
|
h := getNetHosts(t, ctx, 3)
|
||||||
psubs := []*PubSub{
|
psubs := []*PubSub{
|
||||||
getGossipsub(ctx, h[0], WithDirectConnectTicks(2)),
|
getGossipsub(ctx, h[0], WithDirectConnectTicks(2)),
|
||||||
getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[2].ID(), h[2].Addrs()}}), WithDirectConnectTicks(2)),
|
getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}}), WithDirectConnectTicks(2)),
|
||||||
getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[1].ID(), h[1].Addrs()}}), WithDirectConnectTicks(2)),
|
getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{{ID: h[1].ID(), Addrs: h[1].Addrs()}}), WithDirectConnectTicks(2)),
|
||||||
}
|
}
|
||||||
|
|
||||||
connect(t, h[0], h[1])
|
connect(t, h[0], h[1])
|
||||||
@ -1191,8 +1191,8 @@ func TestGossipsubDirectPeersFanout(t *testing.T) {
|
|||||||
h := getNetHosts(t, ctx, 3)
|
h := getNetHosts(t, ctx, 3)
|
||||||
psubs := []*PubSub{
|
psubs := []*PubSub{
|
||||||
getGossipsub(ctx, h[0]),
|
getGossipsub(ctx, h[0]),
|
||||||
getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[2].ID(), h[2].Addrs()}})),
|
getGossipsub(ctx, h[1], WithDirectPeers([]peer.AddrInfo{{ID: h[2].ID(), Addrs: h[2].Addrs()}})),
|
||||||
getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{peer.AddrInfo{h[1].ID(), h[1].Addrs()}})),
|
getGossipsub(ctx, h[2], WithDirectPeers([]peer.AddrInfo{{ID: h[1].ID(), Addrs: h[1].Addrs()}})),
|
||||||
}
|
}
|
||||||
|
|
||||||
connect(t, h[0], h[1])
|
connect(t, h[0], h[1])
|
||||||
@ -1313,13 +1313,11 @@ func TestGossipsubEnoughPeers(t *testing.T) {
|
|||||||
hosts := getNetHosts(t, ctx, 20)
|
hosts := getNetHosts(t, ctx, 20)
|
||||||
psubs := getGossipsubs(ctx, hosts)
|
psubs := getGossipsubs(ctx, hosts)
|
||||||
|
|
||||||
var subs []*Subscription
|
|
||||||
for _, ps := range psubs {
|
for _, ps := range psubs {
|
||||||
sub, err := ps.Subscribe("test")
|
_, err := ps.Subscribe("test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
subs = append(subs, sub)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// at this point we have no connections and no mesh, so EnoughPeers should return false
|
// at this point we have no connections and no mesh, so EnoughPeers should return false
|
||||||
@ -1482,7 +1480,7 @@ func TestGossipsubScoreValidatorEx(t *testing.T) {
|
|||||||
DecayInterval: time.Second,
|
DecayInterval: time.Second,
|
||||||
DecayToZero: 0.01,
|
DecayToZero: 0.01,
|
||||||
Topics: map[string]*TopicScoreParams{
|
Topics: map[string]*TopicScoreParams{
|
||||||
"test": &TopicScoreParams{
|
"test": {
|
||||||
TopicWeight: 1,
|
TopicWeight: 1,
|
||||||
TimeInMeshQuantum: time.Second,
|
TimeInMeshQuantum: time.Second,
|
||||||
InvalidMessageDeliveriesWeight: -1,
|
InvalidMessageDeliveriesWeight: -1,
|
||||||
@ -1579,8 +1577,8 @@ func TestGossipsubPiggybackControl(t *testing.T) {
|
|||||||
|
|
||||||
rpc := &RPC{RPC: pb.RPC{}}
|
rpc := &RPC{RPC: pb.RPC{}}
|
||||||
gs.piggybackControl(blah, rpc, &pb.ControlMessage{
|
gs.piggybackControl(blah, rpc, &pb.ControlMessage{
|
||||||
Graft: []*pb.ControlGraft{&pb.ControlGraft{TopicID: &test1}, &pb.ControlGraft{TopicID: &test2}, &pb.ControlGraft{TopicID: &test3}},
|
Graft: []*pb.ControlGraft{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}},
|
||||||
Prune: []*pb.ControlPrune{&pb.ControlPrune{TopicID: &test1}, &pb.ControlPrune{TopicID: &test2}, &pb.ControlPrune{TopicID: &test3}},
|
Prune: []*pb.ControlPrune{{TopicID: &test1}, {TopicID: &test2}, {TopicID: &test3}},
|
||||||
})
|
})
|
||||||
res <- rpc
|
res <- rpc
|
||||||
}
|
}
|
||||||
@ -1642,7 +1640,7 @@ func TestGossipsubMultipleGraftTopics(t *testing.T) {
|
|||||||
// Send multiple GRAFT messages to second peer from
|
// Send multiple GRAFT messages to second peer from
|
||||||
// 1st peer
|
// 1st peer
|
||||||
p1Router.sendGraftPrune(map[peer.ID][]string{
|
p1Router.sendGraftPrune(map[peer.ID][]string{
|
||||||
secondPeer: []string{firstTopic, secondTopic, thirdTopic},
|
secondPeer: {firstTopic, secondTopic, thirdTopic},
|
||||||
}, map[peer.ID][]string{}, map[peer.ID]bool{})
|
}, map[peer.ID][]string{}, map[peer.ID]bool{})
|
||||||
|
|
||||||
time.Sleep(time.Second * 1)
|
time.Sleep(time.Second * 1)
|
||||||
@ -1689,7 +1687,7 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) {
|
|||||||
DecayInterval: time.Second,
|
DecayInterval: time.Second,
|
||||||
DecayToZero: 0.01,
|
DecayToZero: 0.01,
|
||||||
Topics: map[string]*TopicScoreParams{
|
Topics: map[string]*TopicScoreParams{
|
||||||
"test": &TopicScoreParams{
|
"test": {
|
||||||
TopicWeight: 1,
|
TopicWeight: 1,
|
||||||
TimeInMeshWeight: 0.0002777,
|
TimeInMeshWeight: 0.0002777,
|
||||||
TimeInMeshQuantum: time.Second,
|
TimeInMeshQuantum: time.Second,
|
||||||
@ -1712,11 +1710,9 @@ func TestGossipsubOpportunisticGrafting(t *testing.T) {
|
|||||||
connectSome(t, hosts[:10], 5)
|
connectSome(t, hosts[:10], 5)
|
||||||
|
|
||||||
// sybil squatters for the remaining 40 hosts
|
// sybil squatters for the remaining 40 hosts
|
||||||
squatters := make([]*sybilSquatter, 0, 40)
|
|
||||||
for _, h := range hosts[10:] {
|
for _, h := range hosts[10:] {
|
||||||
squatter := &sybilSquatter{h: h}
|
squatter := &sybilSquatter{h: h}
|
||||||
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
|
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
|
||||||
squatters = append(squatters, squatter)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// connect all squatters to every real host
|
// connect all squatters to every real host
|
||||||
@ -1796,7 +1792,7 @@ func (sq *sybilSquatter) handleStream(s network.Stream) {
|
|||||||
w := protoio.NewDelimitedWriter(os)
|
w := protoio.NewDelimitedWriter(os)
|
||||||
truth := true
|
truth := true
|
||||||
topic := "test"
|
topic := "test"
|
||||||
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
|
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -1826,7 +1822,7 @@ func TestGossipsubPeerScoreInspect(t *testing.T) {
|
|||||||
WithPeerScore(
|
WithPeerScore(
|
||||||
&PeerScoreParams{
|
&PeerScoreParams{
|
||||||
Topics: map[string]*TopicScoreParams{
|
Topics: map[string]*TopicScoreParams{
|
||||||
"test": &TopicScoreParams{
|
"test": {
|
||||||
TopicWeight: 1,
|
TopicWeight: 1,
|
||||||
TimeInMeshQuantum: time.Second,
|
TimeInMeshQuantum: time.Second,
|
||||||
FirstMessageDeliveriesWeight: 1,
|
FirstMessageDeliveriesWeight: 1,
|
||||||
@ -1851,13 +1847,11 @@ func TestGossipsubPeerScoreInspect(t *testing.T) {
|
|||||||
|
|
||||||
connect(t, hosts[0], hosts[1])
|
connect(t, hosts[0], hosts[1])
|
||||||
|
|
||||||
var subs []*Subscription
|
|
||||||
for _, ps := range psubs {
|
for _, ps := range psubs {
|
||||||
sub, err := ps.Subscribe("test")
|
_, err := ps.Subscribe("test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
subs = append(subs, sub)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
@ -1887,7 +1881,7 @@ func TestGossipsubPeerScoreResetTopicParams(t *testing.T) {
|
|||||||
WithPeerScore(
|
WithPeerScore(
|
||||||
&PeerScoreParams{
|
&PeerScoreParams{
|
||||||
Topics: map[string]*TopicScoreParams{
|
Topics: map[string]*TopicScoreParams{
|
||||||
"test": &TopicScoreParams{
|
"test": {
|
||||||
TopicWeight: 1,
|
TopicWeight: 1,
|
||||||
TimeInMeshQuantum: time.Second,
|
TimeInMeshQuantum: time.Second,
|
||||||
FirstMessageDeliveriesWeight: 1,
|
FirstMessageDeliveriesWeight: 1,
|
||||||
@ -2028,7 +2022,10 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
|
|||||||
w := protoio.NewDelimitedWriter(os)
|
w := protoio.NewDelimitedWriter(os)
|
||||||
truth := true
|
truth := true
|
||||||
topic := "test"
|
topic := "test"
|
||||||
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{&pb.RPC_SubOpts{Subscribe: &truth, Topicid: &topic}}})
|
err = w.WriteMsg(&pb.RPC{Subscriptions: []*pb.RPC_SubOpts{{Subscribe: &truth, Topicid: &topic}}})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
var rpc pb.RPC
|
var rpc pb.RPC
|
||||||
for {
|
for {
|
||||||
|
|||||||
@ -160,13 +160,11 @@ func TestRandomsubEnoughPeers(t *testing.T) {
|
|||||||
|
|
||||||
connectSome(t, hosts, 12)
|
connectSome(t, hosts, 12)
|
||||||
|
|
||||||
var subs []*Subscription
|
|
||||||
for _, ps := range psubs {
|
for _, ps := range psubs {
|
||||||
sub, err := ps.Subscribe("test")
|
_, err := ps.Subscribe("test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
subs = append(subs, sub)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|||||||
10
score.go
10
score.go
@ -117,8 +117,10 @@ const (
|
|||||||
deliveryThrottled // we can't tell if it is valid because validation throttled
|
deliveryThrottled // we can't tell if it is valid because validation throttled
|
||||||
)
|
)
|
||||||
|
|
||||||
type PeerScoreInspectFn = func(map[peer.ID]float64)
|
type (
|
||||||
type ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot)
|
PeerScoreInspectFn = func(map[peer.ID]float64)
|
||||||
|
ExtendedPeerScoreInspectFn = func(map[peer.ID]*PeerScoreSnapshot)
|
||||||
|
)
|
||||||
|
|
||||||
type PeerScoreSnapshot struct {
|
type PeerScoreSnapshot struct {
|
||||||
Score float64
|
Score float64
|
||||||
@ -700,7 +702,7 @@ func (ps *peerScore) DeliverMessage(msg *Message) {
|
|||||||
|
|
||||||
// defensive check that this is the first delivery trace -- delivery status should be unknown
|
// defensive check that this is the first delivery trace -- delivery status should be unknown
|
||||||
if drec.status != deliveryUnknown {
|
if drec.status != deliveryUnknown {
|
||||||
log.Debugf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status)
|
log.Debugf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(drec.firstSeen), drec.status)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -751,7 +753,7 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
|
|||||||
|
|
||||||
// defensive check that this is the first rejection trace -- delivery status should be unknown
|
// defensive check that this is the first rejection trace -- delivery status should be unknown
|
||||||
if drec.status != deliveryUnknown {
|
if drec.status != deliveryUnknown {
|
||||||
log.Debugf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status)
|
log.Debugf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(drec.firstSeen), drec.status)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -237,6 +237,7 @@ func getTagValue(mgr connmgri.ConnManager, p peer.ID, tag string) int {
|
|||||||
return val
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//lint:ignore U1000 used only by skipped tests at present
|
||||||
func tagExists(mgr connmgri.ConnManager, p peer.ID, tag string) bool {
|
func tagExists(mgr connmgri.ConnManager, p peer.ID, tag string) bool {
|
||||||
info := mgr.GetTagInfo(p)
|
info := mgr.GetTagInfo(p)
|
||||||
if info == nil {
|
if info == nil {
|
||||||
|
|||||||
@ -167,7 +167,7 @@ func TestTopicReuse(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if bytes.Compare(msg.GetData(), firstMsg) != 0 {
|
if !bytes.Equal(msg.GetData(), firstMsg) {
|
||||||
t.Fatal("received incorrect message")
|
t.Fatal("received incorrect message")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +194,7 @@ func TestTopicReuse(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if bytes.Compare(msg.GetData(), illegalSend) != 0 {
|
if !bytes.Equal(msg.GetData(), illegalSend) {
|
||||||
t.Fatal("received incorrect message from illegal topic")
|
t.Fatal("received incorrect message from illegal topic")
|
||||||
}
|
}
|
||||||
t.Fatal("received message sent by illegal topic")
|
t.Fatal("received message sent by illegal topic")
|
||||||
@ -213,11 +213,11 @@ func TestTopicReuse(t *testing.T) {
|
|||||||
|
|
||||||
timeoutCtx, timeoutCancel = context.WithTimeout(ctx, time.Second*2)
|
timeoutCtx, timeoutCancel = context.WithTimeout(ctx, time.Second*2)
|
||||||
defer timeoutCancel()
|
defer timeoutCancel()
|
||||||
msg, err = sub.Next(ctx)
|
msg, err = sub.Next(timeoutCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if bytes.Compare(msg.GetData(), secondMsg) != 0 {
|
if !bytes.Equal(msg.GetData(), secondMsg) {
|
||||||
t.Fatal("received incorrect message")
|
t.Fatal("received incorrect message")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -143,7 +143,7 @@ func (v *validation) AddValidator(req *addValReq) {
|
|||||||
|
|
||||||
_, ok := v.topicVals[topic]
|
_, ok := v.topicVals[topic]
|
||||||
if ok {
|
if ok {
|
||||||
req.resp <- fmt.Errorf("Duplicate validator for topic %s", topic)
|
req.resp <- fmt.Errorf("duplicate validator for topic %s", topic)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -170,7 +170,7 @@ func (v *validation) AddValidator(req *addValReq) {
|
|||||||
validator = v
|
validator = v
|
||||||
|
|
||||||
default:
|
default:
|
||||||
req.resp <- fmt.Errorf("Unknown validator type for topic %s; must be an instance of Validator or ValidatorEx", topic)
|
req.resp <- fmt.Errorf("unknown validator type for topic %s; must be an instance of Validator or ValidatorEx", topic)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,7 +206,7 @@ func (v *validation) RemoveValidator(req *rmValReq) {
|
|||||||
delete(v.topicVals, topic)
|
delete(v.topicVals, topic)
|
||||||
req.resp <- nil
|
req.resp <- nil
|
||||||
} else {
|
} else {
|
||||||
req.resp <- fmt.Errorf("No validator for topic %s", topic)
|
req.resp <- fmt.Errorf("no validator for topic %s", topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,7 +384,7 @@ func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
// BUG: this would be an internal programming error, so a panic seems appropiate.
|
// BUG: this would be an internal programming error, so a panic seems appropiate.
|
||||||
panic(fmt.Errorf("Unexpected validation result: %d", result))
|
panic(fmt.Errorf("unexpected validation result: %d", result))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user