fix flaky resource manager tests
This commit is contained in:
parent
c681541243
commit
edc86d982b
|
@ -30,6 +30,7 @@ type Echo struct {
|
|||
status EchoStatus
|
||||
|
||||
beforeReserve, beforeRead, beforeWrite, beforeDone func() error
|
||||
done func()
|
||||
}
|
||||
|
||||
type EchoStatus struct {
|
||||
|
@ -81,6 +82,13 @@ func (e *Echo) BeforeDone(f func() error) {
|
|||
e.beforeDone = f
|
||||
}
|
||||
|
||||
func (e *Echo) Done(f func()) {
|
||||
e.mx.Lock()
|
||||
defer e.mx.Unlock()
|
||||
|
||||
e.done = f
|
||||
}
|
||||
|
||||
func (e *Echo) getBeforeReserve() func() error {
|
||||
e.mx.Lock()
|
||||
defer e.mx.Unlock()
|
||||
|
@ -109,9 +117,20 @@ func (e *Echo) getBeforeDone() func() error {
|
|||
return e.beforeDone
|
||||
}
|
||||
|
||||
func (e *Echo) getDone() func() {
|
||||
e.mx.Lock()
|
||||
defer e.mx.Unlock()
|
||||
|
||||
return e.done
|
||||
}
|
||||
|
||||
func (e *Echo) handleStream(s network.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
if done := e.getDone(); done != nil {
|
||||
defer done()
|
||||
}
|
||||
|
||||
e.mx.Lock()
|
||||
e.status.StreamsIn++
|
||||
e.mx.Unlock()
|
||||
|
|
|
@ -109,9 +109,6 @@ func TestResourceManagerServiceInbound(t *testing.T) {
|
|||
defer closeEchos(echos)
|
||||
defer closeRcmgrs(echos)
|
||||
|
||||
ready := make(chan struct{})
|
||||
echos[0].BeforeDone(waitForChannel(ready, time.Minute))
|
||||
|
||||
for i := 1; i < 5; i++ {
|
||||
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
|
||||
if err != nil {
|
||||
|
@ -120,9 +117,16 @@ func TestResourceManagerServiceInbound(t *testing.T) {
|
|||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
ready := make(chan struct{})
|
||||
echos[0].BeforeDone(waitForChannel(ready, time.Minute))
|
||||
|
||||
var eg sync.WaitGroup
|
||||
echos[0].Done(eg.Done)
|
||||
|
||||
var once sync.Once
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i < 5; i++ {
|
||||
eg.Add(1)
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
@ -137,6 +141,7 @@ func TestResourceManagerServiceInbound(t *testing.T) {
|
|||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
eg.Wait()
|
||||
|
||||
checkEchoStatus(t, echos[0], EchoStatus{
|
||||
StreamsIn: 4,
|
||||
|
@ -157,11 +162,6 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
|
|||
defer closeEchos(echos)
|
||||
defer closeRcmgrs(echos)
|
||||
|
||||
count := new(int32)
|
||||
ready := make(chan struct{})
|
||||
*count = 4
|
||||
echos[0].BeforeDone(waitForBarrier(count, ready, time.Minute))
|
||||
|
||||
for i := 1; i < 5; i++ {
|
||||
err := echos[i].Host.Connect(context.Background(), peer.AddrInfo{ID: echos[0].Host.ID()})
|
||||
if err != nil {
|
||||
|
@ -170,8 +170,14 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
|
|||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
echos[0].BeforeDone(waitForBarrier(4, time.Minute))
|
||||
|
||||
var eg sync.WaitGroup
|
||||
echos[0].Done(eg.Done)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 1; i < 5; i++ {
|
||||
eg.Add(1)
|
||||
wg.Add(1)
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
@ -183,6 +189,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
|
|||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
eg.Wait()
|
||||
|
||||
checkEchoStatus(t, echos[0], EchoStatus{
|
||||
StreamsIn: 4,
|
||||
|
@ -191,11 +198,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
|
|||
ResourceServiceErrors: 0,
|
||||
})
|
||||
|
||||
ready = make(chan struct{})
|
||||
ready := make(chan struct{})
|
||||
echos[0].BeforeDone(waitForChannel(ready, time.Minute))
|
||||
|
||||
var once sync.Once
|
||||
for i := 0; i < 3; i++ {
|
||||
eg.Add(1)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
@ -210,6 +218,7 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
eg.Wait()
|
||||
|
||||
checkEchoStatus(t, echos[0], EchoStatus{
|
||||
StreamsIn: 7,
|
||||
|
@ -219,9 +228,12 @@ func TestResourceManagerServicePeerInbound(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func waitForBarrier(count *int32, ready chan struct{}, timeout time.Duration) func() error {
|
||||
func waitForBarrier(count int32, timeout time.Duration) func() error {
|
||||
ready := make(chan struct{})
|
||||
wait := new(int32)
|
||||
*wait = count
|
||||
return func() error {
|
||||
if atomic.AddInt32(count, -1) == 0 {
|
||||
if atomic.AddInt32(wait, -1) == 0 {
|
||||
close(ready)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue