diff --git a/geth/jail/internal/loop/loop.go b/geth/jail/internal/loop/loop.go index 3e593edd4..e0523c9d4 100644 --- a/geth/jail/internal/loop/loop.go +++ b/geth/jail/internal/loop/loop.go @@ -41,12 +41,13 @@ type Task interface { // Otherwise, on ARM and x86-32 it will panic. // More information: https://golang.org/pkg/sync/atomic/#pkg-note-BUG. type Loop struct { - id int64 - vm *vm.VM - lock sync.RWMutex - tasks map[int64]Task - ready chan Task - accepting bool + id int64 + vm *vm.VM + lock sync.RWMutex + tasks map[int64]Task + ready chan Task + closer sync.Once + closedChan chan struct{} } // New creates a new Loop with an unbuffered ready queue on a specific VM. @@ -58,21 +59,18 @@ func New(vm *vm.VM) *Loop { // queue, the capacity of which being specified by the backlog argument. func NewWithBacklog(vm *vm.VM, backlog int) *Loop { return &Loop{ - vm: vm, - tasks: make(map[int64]Task), - ready: make(chan Task, backlog), - accepting: true, + vm: vm, + tasks: make(map[int64]Task), + ready: make(chan Task, backlog), + closedChan: make(chan struct{}), } } // close the loop so that it no longer accepts tasks. func (l *Loop) close() { - l.lock.Lock() - defer l.lock.Unlock() - if l.accepting { - l.accepting = false - close(l.ready) - } + l.closer.Do(func() { + close(l.closedChan) + }) } // VM gets the JavaScript interpreter associated with the loop. @@ -84,11 +82,13 @@ func (l *Loop) VM() *vm.VM { // doing something outside of the JavaScript environment, and that at some // point, it will become ready for finalising. func (l *Loop) Add(t Task) error { + select { + case <-l.closedChan: + return ErrClosed + default: + } l.lock.Lock() defer l.lock.Unlock() - if !l.accepting { - return ErrClosed - } t.SetID(atomic.AddInt64(&l.id, 1)) l.tasks[t.GetID()] = t return nil @@ -124,17 +124,13 @@ func (l *Loop) removeAll() { // Ready signals to the loop that a task is ready to be finalised. This might // block if the "ready channel" in the loop is at capacity. func (l *Loop) Ready(t Task) error { - var err error - l.lock.RLock() - if !l.accepting { + select { + case <-l.closedChan: t.Cancel() - err = ErrClosed + return ErrClosed + case l.ready <- t: + return nil } - l.lock.RUnlock() - if err == nil { - l.ready <- t - } - return err } // AddAndExecute combines Add and Ready for immediate execution. diff --git a/geth/jail/internal/loop/loop_test.go b/geth/jail/internal/loop/loop_test.go index 2e2ae7074..a552d6bad 100644 --- a/geth/jail/internal/loop/loop_test.go +++ b/geth/jail/internal/loop/loop_test.go @@ -2,6 +2,7 @@ package loop import ( "context" + "sync/atomic" "testing" "time" @@ -11,18 +12,26 @@ import ( // DummyTask is something that satisfies the loop.Task interface for testing. type DummyTask struct { - canceled bool - executed bool + canceled int32 + executed int32 } func (*DummyTask) SetID(int64) {} func (*DummyTask) GetID() int64 { return 1 } -func (d *DummyTask) Cancel() { d.canceled = true } +func (d *DummyTask) Cancel() { atomic.StoreInt32(&d.canceled, 1) } func (d *DummyTask) Execute(*vm.VM, *Loop) error { - d.executed = true + atomic.StoreInt32(&d.executed, 1) return nil } +func (d *DummyTask) Canceled() bool { + return atomic.LoadInt32(&d.canceled) == 1 +} + +func (d *DummyTask) Executed() bool { + return atomic.LoadInt32(&d.executed) == 1 +} + func TestLoopSuite(t *testing.T) { suite.Run(t, new(LoopSuite)) } @@ -51,14 +60,14 @@ func (s *LoopSuite) TestAddAndReady() { err := s.loop.Add(s.task) s.NoError(err) - s.False(s.task.canceled) + s.False(s.task.Canceled()) err = s.loop.Ready(s.task) s.NoError(err) // Wait to process task time.Sleep(100 * time.Millisecond) - s.True(s.task.executed) + s.True(s.task.Executed()) s.cancel() } @@ -74,7 +83,7 @@ func (s *LoopSuite) TestLoopErrorWhenClosed() { err = s.loop.Ready(s.task) s.Error(err) - s.True(s.task.canceled) + s.True(s.task.Canceled()) } func (s *LoopSuite) TestImmediateExecution() { @@ -84,8 +93,8 @@ func (s *LoopSuite) TestImmediateExecution() { time.Sleep(100 * time.Millisecond) s.NoError(err) - s.True(s.task.executed) - s.False(s.task.canceled) + s.True(s.task.Executed()) + s.False(s.task.Canceled()) s.cancel() } @@ -99,6 +108,6 @@ func (s *LoopSuite) TestImmediateExecutionErrorWhenClosed() { err := s.loop.AddAndExecute(s.task) s.Error(err) - s.False(s.task.executed) + s.False(s.task.Executed()) }