From edc86d982ba2ede5e6618854caf4c6df889585a1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 6 Feb 2022 11:14:41 +0200 Subject: [PATCH] fix flaky resource manager tests --- itest/echo.go | 19 +++++++++++++++++++ itest/rcmgr_test.go | 34 +++++++++++++++++++++++----------- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/itest/echo.go b/itest/echo.go index 96c5efa1..90acec56 100644 --- a/itest/echo.go +++ b/itest/echo.go @@ -30,6 +30,7 @@ type Echo struct { status EchoStatus beforeReserve, beforeRead, beforeWrite, beforeDone func() error + done func() } type EchoStatus struct { @@ -81,6 +82,13 @@ func (e *Echo) BeforeDone(f func() error) { e.beforeDone = f } +func (e *Echo) Done(f func()) { + e.mx.Lock() + defer e.mx.Unlock() + + e.done = f +} + func (e *Echo) getBeforeReserve() func() error { e.mx.Lock() defer e.mx.Unlock() @@ -109,9 +117,20 @@ func (e *Echo) getBeforeDone() func() error { return e.beforeDone } +func (e *Echo) getDone() func() { + e.mx.Lock() + defer e.mx.Unlock() + + return e.done +} + func (e *Echo) handleStream(s network.Stream) { defer s.Close() + if done := e.getDone(); done != nil { + defer done() + } + e.mx.Lock() e.status.StreamsIn++ e.mx.Unlock() diff --git a/itest/rcmgr_test.go b/itest/rcmgr_test.go index 9523b1a8..b2162f71 100644 --- a/itest/rcmgr_test.go +++ b/itest/rcmgr_test.go @@ -109,9 +109,6 @@ func TestResourceManagerServiceInbound(t *testing.T) { defer closeEchos(echos) defer closeRcmgrs(echos) - ready := make(chan struct{}) - echos[0].BeforeDone(waitForChannel(ready, time.Minute)) - for i := 1; i < 5; i++ { err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) if err != nil { @@ -120,9 +117,16 @@ func TestResourceManagerServiceInbound(t *testing.T) { time.Sleep(10 * time.Millisecond) } + ready := make(chan struct{}) + echos[0].BeforeDone(waitForChannel(ready, time.Minute)) + + var eg sync.WaitGroup + echos[0].Done(eg.Done) + var once sync.Once var wg sync.WaitGroup for i := 1; i < 5; i++ { + eg.Add(1) wg.Add(1) go func(i int) { defer wg.Done() @@ -137,6 +141,7 @@ func TestResourceManagerServiceInbound(t *testing.T) { }(i) } wg.Wait() + eg.Wait() checkEchoStatus(t, echos[0], EchoStatus{ StreamsIn: 4, @@ -157,11 +162,6 @@ func TestResourceManagerServicePeerInbound(t *testing.T) { defer closeEchos(echos) defer closeRcmgrs(echos) - count := new(int32) - ready := make(chan struct{}) - *count = 4 - echos[0].BeforeDone(waitForBarrier(count, ready, time.Minute)) - for i := 1; i < 5; i++ { err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()}) if err != nil { @@ -170,8 +170,14 @@ func TestResourceManagerServicePeerInbound(t *testing.T) { time.Sleep(10 * time.Millisecond) } + echos[0].BeforeDone(waitForBarrier(4, time.Minute)) + + var eg sync.WaitGroup + echos[0].Done(eg.Done) + var wg sync.WaitGroup for i := 1; i < 5; i++ { + eg.Add(1) wg.Add(1) go func(i int) { defer wg.Done() @@ -183,6 +189,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) { }(i) } wg.Wait() + eg.Wait() checkEchoStatus(t, echos[0], EchoStatus{ StreamsIn: 4, @@ -191,11 +198,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) { ResourceServiceErrors: 0, }) - ready = make(chan struct{}) + ready := make(chan struct{}) echos[0].BeforeDone(waitForChannel(ready, time.Minute)) var once sync.Once for i := 0; i < 3; i++ { + eg.Add(1) wg.Add(1) go func() { defer wg.Done() @@ -210,6 +218,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) { }() } wg.Wait() + eg.Wait() checkEchoStatus(t, echos[0], EchoStatus{ StreamsIn: 7, @@ -219,9 +228,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) { }) } -func waitForBarrier(count *int32, ready chan struct{}, timeout time.Duration) func() error { +func waitForBarrier(count int32, timeout time.Duration) func() error { + ready := make(chan struct{}) + wait := new(int32) + *wait = count return func() error { - if atomic.AddInt32(count, -1) == 0 { + if atomic.AddInt32(wait, -1) == 0 { close(ready) }