diff --git a/.github/workflows/go-test-config.json b/.github/workflows/go-test-config.json index b0642fb..879d74a 100644 --- a/.github/workflows/go-test-config.json +++ b/.github/workflows/go-test-config.json @@ -1,4 +1,3 @@ { - "skipOSes": ["windows", "macos"], - "skipRace": true + "skipOSes": ["windows", "macos"] } diff --git a/backoff_test.go b/backoff_test.go index 4cedbe1..542aceb 100644 --- a/backoff_test.go +++ b/backoff_test.go @@ -96,11 +96,17 @@ func TestBackoff_Clean(t *testing.T) { if err != nil { t.Fatalf("unexpected error post update: %s", err) } + b.mu.Lock() b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry + b.mu.Unlock() } - if len(b.info) != size { - t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info)) + b.mu.Lock() + infoLen := len(b.info) + b.mu.Unlock() + + if infoLen != size { + t.Fatalf("info map size mismatch, expected: %d, got: %d", size, infoLen) } // waits for a cleanup loop to kick-in diff --git a/floodsub_test.go b/floodsub_test.go index 8efedaa..13c698f 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -268,8 +268,11 @@ func TestReconnects(t *testing.T) { t.Fatal("timed out waiting for B chan to be closed") } - nSubs := len(psubs[2].mySubs["cats"]) - if nSubs > 0 { + nSubs := make(chan int) + psubs[2].eval <- func() { + nSubs <- len(psubs[2].mySubs["cats"]) + } + if <-nSubs > 0 { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } @@ -866,9 +869,14 @@ func TestImproperlySignedMessageRejected(t *testing.T) { t.Fatal(err) } - var adversaryMessages []*Message + adversaryMessagesCh := make(chan []*Message) + adversaryContext, adversaryCancel := context.WithCancel(ctx) go func(ctx context.Context) { + var adversaryMessages []*Message + defer func() { + adversaryMessagesCh <- adversaryMessages + }() for { select { case <-ctx.Done(): @@ -885,6 +893,7 @@ func TestImproperlySignedMessageRejected(t *testing.T) { <-time.After(1 * time.Second) adversaryCancel() + adversaryMessages := <-adversaryMessagesCh // Ensure the adversary successfully publishes the incorrectly signed // message. If the adversary "sees" this, we successfully got through @@ -895,9 +904,13 @@ func TestImproperlySignedMessageRejected(t *testing.T) { // the honest peer's validation process will drop the message; // next will never furnish the incorrect message. - var honestPeerMessages []*Message + honestPeerMessagesCh := make(chan []*Message) honestPeerContext, honestPeerCancel := context.WithCancel(ctx) go func(ctx context.Context) { + var honestPeerMessages []*Message + defer func() { + honestPeerMessagesCh <- honestPeerMessages + }() for { select { case <-ctx.Done(): @@ -915,6 +928,7 @@ func TestImproperlySignedMessageRejected(t *testing.T) { <-time.After(1 * time.Second) honestPeerCancel() + honestPeerMessages := <-honestPeerMessagesCh if len(honestPeerMessages) != 1 { t.Fatalf("got %d messages, expected 1", len(honestPeerMessages)) } diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index 9f6f0f9..e1f16b6 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -797,7 +797,10 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) { // Checks we received some messages var expMid string var actMids []string + var mu sync.Mutex checkMsgs := func() { + mu.Lock() + defer mu.Unlock() if len(actMids) == 0 { t.Fatalf("Expected some messages when the maximum number of IDONTWANTs is reached") } @@ -822,6 +825,8 @@ func TestGossipsubAttackSpamIDONTWANT(t *testing.T) { }() newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { + mu.Lock() + defer mu.Unlock() // Each time the host receives a message for _, msg := range irpc.GetPublish() { actMids = append(actMids, msgID(msg)) diff --git a/gossipsub_test.go b/gossipsub_test.go index 9f450d8..d0f905d 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -1989,6 +1989,27 @@ func TestGossipSubLeaveTopic(t *testing.T) { <-done } +// withRouter is a race-free way of accessing state from the PubSubRouter. +// It runs the callback synchronously +func withRouter(p *PubSub, f func(r PubSubRouter)) { + done := make(chan struct{}) + p.eval <- func() { + defer close(done) + router := p.rt + f(router) + } + <-done +} + +// withGSRouter is a race-free way of accessing state from the GossipSubRouter. +// It runs the callback synchronously +func withGSRouter(p *PubSub, f func(r *GossipSubRouter)) { + withRouter(p, func(r PubSubRouter) { + router := p.rt.(*GossipSubRouter) + f(router) + }) +} + func TestGossipSubJoinTopic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2003,13 +2024,15 @@ func TestGossipSubJoinTopic(t *testing.T) { connect(t, h[0], h[1]) connect(t, h[0], h[2]) - router0 := psubs[0].rt.(*GossipSubRouter) - // Add in backoff for peer. peerMap := make(map[peer.ID]time.Time) - peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff) + withGSRouter(psubs[0], func(router0 *GossipSubRouter) { + peerMap[h[1].ID()] = time.Now().Add(router0.params.UnsubscribeBackoff) + }) - router0.backoff["test"] = peerMap + withGSRouter(psubs[0], func(router0 *GossipSubRouter) { + router0.backoff["test"] = peerMap + }) // Join all peers for _, ps := range psubs { @@ -2021,15 +2044,16 @@ func TestGossipSubJoinTopic(t *testing.T) { time.Sleep(time.Second) - meshMap := router0.mesh["test"] - if len(meshMap) != 1 { - t.Fatalf("Unexpect peer included in the mesh") - } - - _, ok := meshMap[h[1].ID()] - if ok { - t.Fatalf("Peer that was to be backed off is included in the mesh") - } + withGSRouter(psubs[0], func(router0 *GossipSubRouter) { + meshMap := router0.mesh["test"] + if len(meshMap) != 1 { + t.Fatalf("Unexpect peer included in the mesh") + } + _, ok := meshMap[h[1].ID()] + if ok { + t.Fatalf("Peer that was to be backed off is included in the mesh") + } + }) } type sybilSquatter struct { @@ -2697,10 +2721,10 @@ func TestGossipsubIdontwantSend(t *testing.T) { return base64.URLEncoding.EncodeToString(pmsg.Data) } - validated := false + var validated atomic.Bool validate := func(context.Context, peer.ID, *Message) bool { time.Sleep(100 * time.Millisecond) - validated = true + validated.Store(true) return true } @@ -2798,7 +2822,7 @@ func TestGossipsubIdontwantSend(t *testing.T) { for _, idonthave := range irpc.GetControl().GetIdontwant() { // If true, it means that, when we get IDONTWANT, the middle peer has done validation // already, which should not be the case - if validated { + if validated.Load() { t.Fatalf("IDONTWANT should be sent before doing validation") } for _, mid := range idonthave.GetMessageIDs() { @@ -3333,13 +3357,13 @@ func TestGossipsubIdontwantBeforeIwant(t *testing.T) { msgTimer := time.NewTimer(msgWaitMax) // Checks we received right messages - msgReceived := false - ihaveReceived := false + var msgReceived atomic.Bool + var ihaveReceived atomic.Bool checkMsgs := func() { - if msgReceived { + if msgReceived.Load() { t.Fatalf("Expected no messages received after IDONWANT") } - if !ihaveReceived { + if !ihaveReceived.Load() { t.Fatalf("Expected IHAVE received") } } @@ -3359,11 +3383,11 @@ func TestGossipsubIdontwantBeforeIwant(t *testing.T) { newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { // Check if it receives any message if len(irpc.GetPublish()) > 0 { - msgReceived = true + msgReceived.Store(true) } // The middle peer is supposed to send IHAVE for _, ihave := range irpc.GetControl().GetIhave() { - ihaveReceived = true + ihaveReceived.Store(true) mids := ihave.GetMessageIDs() writeMsg(&pb.RPC{ @@ -3437,9 +3461,9 @@ func TestGossipsubIdontwantClear(t *testing.T) { msgTimer := time.NewTimer(msgWaitMax) // Checks we received some message after the IDONTWANT is cleared - received := false + var received atomic.Bool checkMsgs := func() { - if !received { + if !received.Load() { t.Fatalf("Expected some message after the IDONTWANT is cleared") } } @@ -3459,7 +3483,7 @@ func TestGossipsubIdontwantClear(t *testing.T) { newMockGS(ctx, t, hosts[2], func(writeMsg func(*pb.RPC), irpc *pb.RPC) { // Check if it receives any message if len(irpc.GetPublish()) > 0 { - received = true + received.Store(true) } // When the middle peer connects it will send us its subscriptions for _, sub := range irpc.GetSubscriptions() { @@ -3544,13 +3568,15 @@ func TestGossipsubPruneMeshCorrectly(t *testing.T) { totalTimeToWait := params.HeartbeatInitialDelay + 2*params.HeartbeatInterval time.Sleep(totalTimeToWait) - meshPeers, ok := psubs[0].rt.(*GossipSubRouter).mesh[topic] - if !ok { - t.Fatal("mesh does not exist for topic") - } - if len(meshPeers) != params.D { - t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers)) - } + withGSRouter(psubs[0], func(rt *GossipSubRouter) { + meshPeers, ok := rt.mesh[topic] + if !ok { + t.Fatal("mesh does not exist for topic") + } + if len(meshPeers) != params.D { + t.Fatalf("mesh does not have the correct number of peers. Wanted %d but got %d", params.D, len(meshPeers)) + } + }) } func BenchmarkAllocDoDropRPC(b *testing.B) { diff --git a/pubsub_test.go b/pubsub_test.go index 245a69d..37fbbf2 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -40,7 +40,9 @@ func TestPubSubRemovesBlacklistedPeer(t *testing.T) { // Bad peer is blacklisted after it has connected. // Calling p.BlacklistPeer directly does the right thing but we should also clean // up the peer if it has been added the the blacklist by another means. - bl.Add(hosts[0].ID()) + withRouter(psubs1, func(r PubSubRouter) { + bl.Add(hosts[0].ID()) + }) _, err := psubs0.Subscribe("test") if err != nil { diff --git a/timecache/first_seen_cache.go b/timecache/first_seen_cache.go index 457391c..ea9dd2d 100644 --- a/timecache/first_seen_cache.go +++ b/timecache/first_seen_cache.go @@ -18,6 +18,10 @@ type FirstSeenCache struct { var _ TimeCache = (*FirstSeenCache)(nil) func newFirstSeenCache(ttl time.Duration) *FirstSeenCache { + return newFirstSeenCacheWithSweepInterval(ttl, backgroundSweepInterval) +} + +func newFirstSeenCacheWithSweepInterval(ttl time.Duration, sweepInterval time.Duration) *FirstSeenCache { tc := &FirstSeenCache{ m: make(map[string]time.Time), ttl: ttl, @@ -25,7 +29,7 @@ func newFirstSeenCache(ttl time.Duration) *FirstSeenCache { ctx, done := context.WithCancel(context.Background()) tc.done = done - go background(ctx, &tc.lk, tc.m) + go background(ctx, &tc.lk, tc.m, sweepInterval) return tc } diff --git a/timecache/first_seen_cache_test.go b/timecache/first_seen_cache_test.go index 59d2a59..10f69c9 100644 --- a/timecache/first_seen_cache_test.go +++ b/timecache/first_seen_cache_test.go @@ -17,9 +17,7 @@ func TestFirstSeenCacheFound(t *testing.T) { } func TestFirstSeenCacheExpire(t *testing.T) { - backgroundSweepInterval = time.Second - - tc := newFirstSeenCache(time.Second) + tc := newFirstSeenCacheWithSweepInterval(time.Second, time.Second) for i := 0; i < 10; i++ { tc.Add(fmt.Sprint(i)) time.Sleep(time.Millisecond * 100) @@ -34,9 +32,7 @@ func TestFirstSeenCacheExpire(t *testing.T) { } func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) { - backgroundSweepInterval = time.Second - - tc := newFirstSeenCache(time.Second) + tc := newFirstSeenCacheWithSweepInterval(time.Second, time.Second) tc.Add(fmt.Sprint(0)) time.Sleep(2 * time.Second) diff --git a/timecache/last_seen_cache.go b/timecache/last_seen_cache.go index 128c299..676e0ae 100644 --- a/timecache/last_seen_cache.go +++ b/timecache/last_seen_cache.go @@ -19,6 +19,10 @@ type LastSeenCache struct { var _ TimeCache = (*LastSeenCache)(nil) func newLastSeenCache(ttl time.Duration) *LastSeenCache { + return newLastSeenCacheWithSweepInterval(ttl, backgroundSweepInterval) +} + +func newLastSeenCacheWithSweepInterval(ttl time.Duration, sweepInterval time.Duration) *LastSeenCache { tc := &LastSeenCache{ m: make(map[string]time.Time), ttl: ttl, @@ -26,7 +30,7 @@ func newLastSeenCache(ttl time.Duration) *LastSeenCache { ctx, done := context.WithCancel(context.Background()) tc.done = done - go background(ctx, &tc.lk, tc.m) + go background(ctx, &tc.lk, tc.m, sweepInterval) return tc } diff --git a/timecache/last_seen_cache_test.go b/timecache/last_seen_cache_test.go index 4522026..a320093 100644 --- a/timecache/last_seen_cache_test.go +++ b/timecache/last_seen_cache_test.go @@ -17,8 +17,7 @@ func TestLastSeenCacheFound(t *testing.T) { } func TestLastSeenCacheExpire(t *testing.T) { - backgroundSweepInterval = time.Second - tc := newLastSeenCache(time.Second) + tc := newLastSeenCacheWithSweepInterval(time.Second, time.Second) for i := 0; i < 11; i++ { tc.Add(fmt.Sprint(i)) time.Sleep(time.Millisecond * 100) @@ -80,9 +79,7 @@ func TestLastSeenCacheSlideForward(t *testing.T) { } func TestLastSeenCacheNotFoundAfterExpire(t *testing.T) { - backgroundSweepInterval = time.Second - - tc := newLastSeenCache(time.Second) + tc := newLastSeenCacheWithSweepInterval(time.Second, time.Second) tc.Add(fmt.Sprint(0)) time.Sleep(2 * time.Second) diff --git a/timecache/util.go b/timecache/util.go index eaf92b3..5370572 100644 --- a/timecache/util.go +++ b/timecache/util.go @@ -6,10 +6,10 @@ import ( "time" ) -var backgroundSweepInterval = time.Minute +const backgroundSweepInterval = time.Minute -func background(ctx context.Context, lk sync.Locker, m map[string]time.Time) { - ticker := time.NewTicker(backgroundSweepInterval) +func background(ctx context.Context, lk sync.Locker, m map[string]time.Time, tickerDur time.Duration) { + ticker := time.NewTicker(tickerDur) defer ticker.Stop() for { diff --git a/validation_builtin_test.go b/validation_builtin_test.go index bca8774..ed57753 100644 --- a/validation_builtin_test.go +++ b/validation_builtin_test.go @@ -20,10 +20,23 @@ import ( pb "github.com/libp2p/go-libp2p-pubsub/pb" ) -var rng *rand.Rand +var rng *concurrentRNG + +type concurrentRNG struct { + mu sync.Mutex + rng *rand.Rand +} + +func (r *concurrentRNG) Intn(n int) int { + r.mu.Lock() + defer r.mu.Unlock() + return r.rng.Intn(n) +} func init() { - rng = rand.New(rand.NewSource(314159)) + rng = &concurrentRNG{ + rng: rand.New(rand.NewSource(314159)), + } } func TestBasicSeqnoValidator1(t *testing.T) {