Fix all data races in jail package (#632)

* Rework how loop is stopped

* Use atomic int32 for DummyTask

* Close loop channel only once

* Rename closeChan to closedChan
This commit is contained in:
Dmitry Shulyak 2018-02-08 15:03:04 +02:00 committed by GitHub
parent 70912ab8f6
commit 4cddc362ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 38 deletions

View File

@ -46,7 +46,8 @@ type Loop struct {
lock sync.RWMutex lock sync.RWMutex
tasks map[int64]Task tasks map[int64]Task
ready chan Task ready chan Task
accepting bool closer sync.Once
closedChan chan struct{}
} }
// New creates a new Loop with an unbuffered ready queue on a specific VM. // New creates a new Loop with an unbuffered ready queue on a specific VM.
@ -61,18 +62,15 @@ func NewWithBacklog(vm *vm.VM, backlog int) *Loop {
vm: vm, vm: vm,
tasks: make(map[int64]Task), tasks: make(map[int64]Task),
ready: make(chan Task, backlog), ready: make(chan Task, backlog),
accepting: true, closedChan: make(chan struct{}),
} }
} }
// close the loop so that it no longer accepts tasks. // close the loop so that it no longer accepts tasks.
func (l *Loop) close() { func (l *Loop) close() {
l.lock.Lock() l.closer.Do(func() {
defer l.lock.Unlock() close(l.closedChan)
if l.accepting { })
l.accepting = false
close(l.ready)
}
} }
// VM gets the JavaScript interpreter associated with the loop. // VM gets the JavaScript interpreter associated with the loop.
@ -84,11 +82,13 @@ func (l *Loop) VM() *vm.VM {
// doing something outside of the JavaScript environment, and that at some // doing something outside of the JavaScript environment, and that at some
// point, it will become ready for finalising. // point, it will become ready for finalising.
func (l *Loop) Add(t Task) error { func (l *Loop) Add(t Task) error {
select {
case <-l.closedChan:
return ErrClosed
default:
}
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock() defer l.lock.Unlock()
if !l.accepting {
return ErrClosed
}
t.SetID(atomic.AddInt64(&l.id, 1)) t.SetID(atomic.AddInt64(&l.id, 1))
l.tasks[t.GetID()] = t l.tasks[t.GetID()] = t
return nil return nil
@ -124,17 +124,13 @@ func (l *Loop) removeAll() {
// Ready signals to the loop that a task is ready to be finalised. This might // Ready signals to the loop that a task is ready to be finalised. This might
// block if the "ready channel" in the loop is at capacity. // block if the "ready channel" in the loop is at capacity.
func (l *Loop) Ready(t Task) error { func (l *Loop) Ready(t Task) error {
var err error select {
l.lock.RLock() case <-l.closedChan:
if !l.accepting {
t.Cancel() t.Cancel()
err = ErrClosed return ErrClosed
case l.ready <- t:
return nil
} }
l.lock.RUnlock()
if err == nil {
l.ready <- t
}
return err
} }
// AddAndExecute combines Add and Ready for immediate execution. // AddAndExecute combines Add and Ready for immediate execution.

View File

@ -2,6 +2,7 @@ package loop
import ( import (
"context" "context"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -11,18 +12,26 @@ import (
// DummyTask is something that satisfies the loop.Task interface for testing. // DummyTask is something that satisfies the loop.Task interface for testing.
type DummyTask struct { type DummyTask struct {
canceled bool canceled int32
executed bool executed int32
} }
func (*DummyTask) SetID(int64) {} func (*DummyTask) SetID(int64) {}
func (*DummyTask) GetID() int64 { return 1 } func (*DummyTask) GetID() int64 { return 1 }
func (d *DummyTask) Cancel() { d.canceled = true } func (d *DummyTask) Cancel() { atomic.StoreInt32(&d.canceled, 1) }
func (d *DummyTask) Execute(*vm.VM, *Loop) error { func (d *DummyTask) Execute(*vm.VM, *Loop) error {
d.executed = true atomic.StoreInt32(&d.executed, 1)
return nil return nil
} }
func (d *DummyTask) Canceled() bool {
return atomic.LoadInt32(&d.canceled) == 1
}
func (d *DummyTask) Executed() bool {
return atomic.LoadInt32(&d.executed) == 1
}
func TestLoopSuite(t *testing.T) { func TestLoopSuite(t *testing.T) {
suite.Run(t, new(LoopSuite)) suite.Run(t, new(LoopSuite))
} }
@ -51,14 +60,14 @@ func (s *LoopSuite) TestAddAndReady() {
err := s.loop.Add(s.task) err := s.loop.Add(s.task)
s.NoError(err) s.NoError(err)
s.False(s.task.canceled) s.False(s.task.Canceled())
err = s.loop.Ready(s.task) err = s.loop.Ready(s.task)
s.NoError(err) s.NoError(err)
// Wait to process task // Wait to process task
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
s.True(s.task.executed) s.True(s.task.Executed())
s.cancel() s.cancel()
} }
@ -74,7 +83,7 @@ func (s *LoopSuite) TestLoopErrorWhenClosed() {
err = s.loop.Ready(s.task) err = s.loop.Ready(s.task)
s.Error(err) s.Error(err)
s.True(s.task.canceled) s.True(s.task.Canceled())
} }
func (s *LoopSuite) TestImmediateExecution() { func (s *LoopSuite) TestImmediateExecution() {
@ -84,8 +93,8 @@ func (s *LoopSuite) TestImmediateExecution() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
s.NoError(err) s.NoError(err)
s.True(s.task.executed) s.True(s.task.Executed())
s.False(s.task.canceled) s.False(s.task.Canceled())
s.cancel() s.cancel()
} }
@ -99,6 +108,6 @@ func (s *LoopSuite) TestImmediateExecutionErrorWhenClosed() {
err := s.loop.AddAndExecute(s.task) err := s.loop.AddAndExecute(s.task)
s.Error(err) s.Error(err)
s.False(s.task.executed) s.False(s.task.Executed())
} }