From 373fc86d1e1110ac2b48996a7d3e50b2fad63190 Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Thu, 28 Dec 2017 13:50:36 -0700 Subject: [PATCH] Change Loop API to prevent it from accepting tasks when the loop is closed #415 (#448) --- geth/jail/cell.go | 29 ++++++-- geth/jail/handlers.go | 1 + geth/jail/internal/fetch/fetch.go | 8 ++- geth/jail/internal/loop/loop.go | 80 +++++++++++++++------ geth/jail/internal/loop/loop_test.go | 104 +++++++++++++++++++++++++++ geth/jail/internal/timers/timers.go | 22 ++++-- 6 files changed, 211 insertions(+), 33 deletions(-) create mode 100644 geth/jail/internal/loop/loop_test.go diff --git a/geth/jail/cell.go b/geth/jail/cell.go index 54735dda9..82a040e95 100644 --- a/geth/jail/cell.go +++ b/geth/jail/cell.go @@ -13,6 +13,8 @@ import ( "github.com/status-im/status-go/geth/jail/internal/vm" ) +const timeout = 5 * time.Second + // Cell represents a single jail cell, which is basically a JavaScript VM. type Cell struct { *vm.VM @@ -86,11 +88,26 @@ func (c *Cell) Stop() error { // event queue loop and schedules for immediate execution. // Intended to be used by any cell user that want's to run // async call, like callback. -func (c *Cell) CallAsync(fn otto.Value, args ...interface{}) { +func (c *Cell) CallAsync(fn otto.Value, args ...interface{}) error { task := looptask.NewCallTask(fn, args...) - // Add a task to the queue. - c.loop.Add(task) - // And run the task immediately. - // It's a blocking operation. - c.loop.Ready(task) + errChan := make(chan error) + + go func() { + defer close(errChan) + err := c.loop.AddAndExecute(task) + if err != nil { + errChan <- err + } + }() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case err := <-errChan: + return err + + case <-timer.C: + return errors.New("Timeout") + } } diff --git a/geth/jail/handlers.go b/geth/jail/handlers.go index cbf961b5e..1b9d9f736 100644 --- a/geth/jail/handlers.go +++ b/geth/jail/handlers.go @@ -94,6 +94,7 @@ func createSendAsyncHandler(jail *Jail, cell *Cell) func(call otto.FunctionCall) return } + // nolint: errcheck if err != nil { cell.CallAsync(callback, vm.MakeCustomError("Error", err.Error())) } else { diff --git a/geth/jail/internal/fetch/fetch.go b/geth/jail/internal/fetch/fetch.go index 325316363..b3c82fb19 100644 --- a/geth/jail/internal/fetch/fetch.go +++ b/geth/jail/internal/fetch/fetch.go @@ -129,10 +129,14 @@ func DefineWithHandler(vm *vm.VM, l *loop.Loop, h http.Handler) error { cb: cb, } - l.Add(t) + // If err is non-nil, then the loop is closed + // and we shouldn't do anymore with it. + if err := l.Add(t); err != nil { + return otto.UndefinedValue() + } go func() { - defer l.Ready(t) + defer l.Ready(t) // nolint: errcheck req, rqErr := http.NewRequest(method, urlStr, body) if rqErr != nil { diff --git a/geth/jail/internal/loop/loop.go b/geth/jail/internal/loop/loop.go index f37752c6a..3e593edd4 100644 --- a/geth/jail/internal/loop/loop.go +++ b/geth/jail/internal/loop/loop.go @@ -2,12 +2,17 @@ package loop import ( "context" + "errors" "sync" "sync/atomic" "github.com/status-im/status-go/geth/jail/internal/vm" ) +// ErrClosed represents the error returned when we try to add or ready +// a task on a closed loop. +var ErrClosed = errors.New("The loop is closed and no longer accepting tasks") + // Task represents something that the event loop can schedule and run. // // Task describes two operations that will almost always be boilerplate, @@ -27,19 +32,21 @@ type Task interface { // Loop encapsulates the event loop's state. This includes the vm on which the // loop operates, a monotonically incrementing event id, a map of tasks that -// aren't ready yet, keyed by their ID, and a channel of tasks that are ready -// to finalise on the VM. The channel holding the tasks pending finalising can -// be buffered or unbuffered. +// aren't ready yet, keyed by their ID, a channel of tasks that are ready +// to finalise on the VM, and a boolean that indicates if the loop is still +// accepting tasks. The channel holding the tasks pending finalising can be +// buffered or unbuffered. // // Warning: id must be the first field in this struct as it's accessed atomically. // Otherwise, on ARM and x86-32 it will panic. // More information: https://golang.org/pkg/sync/atomic/#pkg-note-BUG. type Loop struct { - id int64 - vm *vm.VM - lock sync.RWMutex - tasks map[int64]Task - ready chan Task + id int64 + vm *vm.VM + lock sync.RWMutex + tasks map[int64]Task + ready chan Task + accepting bool } // New creates a new Loop with an unbuffered ready queue on a specific VM. @@ -51,9 +58,20 @@ func New(vm *vm.VM) *Loop { // queue, the capacity of which being specified by the backlog argument. func NewWithBacklog(vm *vm.VM, backlog int) *Loop { return &Loop{ - vm: vm, - tasks: make(map[int64]Task), - ready: make(chan Task, backlog), + vm: vm, + tasks: make(map[int64]Task), + ready: make(chan Task, backlog), + accepting: true, + } +} + +// close the loop so that it no longer accepts tasks. +func (l *Loop) close() { + l.lock.Lock() + defer l.lock.Unlock() + if l.accepting { + l.accepting = false + close(l.ready) } } @@ -65,11 +83,15 @@ func (l *Loop) VM() *vm.VM { // Add puts a task into the loop. This signals to the loop that this task is // doing something outside of the JavaScript environment, and that at some // point, it will become ready for finalising. -func (l *Loop) Add(t Task) { +func (l *Loop) Add(t Task) error { l.lock.Lock() + defer l.lock.Unlock() + if !l.accepting { + return ErrClosed + } t.SetID(atomic.AddInt64(&l.id, 1)) l.tasks[t.GetID()] = t - l.lock.Unlock() + return nil } // Remove takes a task out of the loop. This should not be called if a task @@ -77,7 +99,7 @@ func (l *Loop) Add(t Task) { // broken. func (l *Loop) Remove(t Task) { l.remove(t) - go l.Ready(nil) + go l.Ready(nil) // nolint: errcheck } func (l *Loop) remove(t Task) { @@ -101,8 +123,26 @@ func (l *Loop) removeAll() { // Ready signals to the loop that a task is ready to be finalised. This might // block if the "ready channel" in the loop is at capacity. -func (l *Loop) Ready(t Task) { - l.ready <- t +func (l *Loop) Ready(t Task) error { + var err error + l.lock.RLock() + if !l.accepting { + t.Cancel() + err = ErrClosed + } + l.lock.RUnlock() + if err == nil { + l.ready <- t + } + return err +} + +// AddAndExecute combines Add and Ready for immediate execution. +func (l *Loop) AddAndExecute(t Task) error { + if err := l.Add(t); err != nil { + return err + } + return l.Ready(t) } // Eval executes some code in the VM associated with the loop and returns an @@ -131,11 +171,13 @@ func (l *Loop) processTask(t Task) error { // Run handles the task scheduling and finalisation. // It runs infinitely waiting for new tasks. func (l *Loop) Run(ctx context.Context) error { + defer l.close() + defer l.removeAll() + for { select { case t := <-l.ready: if ctx.Err() != nil { - l.removeAll() return ctx.Err() } @@ -147,12 +189,10 @@ func (l *Loop) Run(ctx context.Context) error { if err != nil { // TODO(divan): do we need to report // errors up to the caller? - // Ignoring for now, as loop - // should keep running. + // Ignoring for now. continue } case <-ctx.Done(): - l.removeAll() return ctx.Err() } } diff --git a/geth/jail/internal/loop/loop_test.go b/geth/jail/internal/loop/loop_test.go new file mode 100644 index 000000000..2e2ae7074 --- /dev/null +++ b/geth/jail/internal/loop/loop_test.go @@ -0,0 +1,104 @@ +package loop + +import ( + "context" + "testing" + "time" + + "github.com/status-im/status-go/geth/jail/internal/vm" + "github.com/stretchr/testify/suite" +) + +// DummyTask is something that satisfies the loop.Task interface for testing. +type DummyTask struct { + canceled bool + executed bool +} + +func (*DummyTask) SetID(int64) {} +func (*DummyTask) GetID() int64 { return 1 } +func (d *DummyTask) Cancel() { d.canceled = true } +func (d *DummyTask) Execute(*vm.VM, *Loop) error { + d.executed = true + return nil +} + +func TestLoopSuite(t *testing.T) { + suite.Run(t, new(LoopSuite)) +} + +type LoopSuite struct { + suite.Suite + + loop *Loop + cancel context.CancelFunc + + task *DummyTask +} + +func (s *LoopSuite) SetupTest() { + s.task = &DummyTask{} + + vm := vm.New() + s.loop = New(vm) + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + go s.loop.Run(ctx) +} + +func (s *LoopSuite) TestAddAndReady() { + + err := s.loop.Add(s.task) + s.NoError(err) + s.False(s.task.canceled) + + err = s.loop.Ready(s.task) + s.NoError(err) + + // Wait to process task + time.Sleep(100 * time.Millisecond) + s.True(s.task.executed) + + s.cancel() +} + +func (s *LoopSuite) TestLoopErrorWhenClosed() { + s.cancel() + + // Wait for the context to cancel and loop to close + time.Sleep(100 * time.Millisecond) + + err := s.loop.Add(s.task) + s.Error(err) + + err = s.loop.Ready(s.task) + s.Error(err) + s.True(s.task.canceled) +} + +func (s *LoopSuite) TestImmediateExecution() { + err := s.loop.AddAndExecute(s.task) + + // Wait for the task to execute + time.Sleep(100 * time.Millisecond) + + s.NoError(err) + s.True(s.task.executed) + s.False(s.task.canceled) + + s.cancel() +} + +func (s *LoopSuite) TestImmediateExecutionErrorWhenClosed() { + s.cancel() + + // Wait for the context to cancel and loop to close + time.Sleep(100 * time.Millisecond) + + err := s.loop.AddAndExecute(s.task) + + s.Error(err) + s.False(s.task.executed) + +} diff --git a/geth/jail/internal/timers/timers.go b/geth/jail/internal/timers/timers.go index d19470048..b8a4aad9c 100644 --- a/geth/jail/internal/timers/timers.go +++ b/geth/jail/internal/timers/timers.go @@ -34,10 +34,15 @@ func Define(vm *vm.VM, l *loop.Loop) error { call: call, interval: interval, } - l.Add(t) + + // If err is non-nil, then the loop is closed and should not + // be used anymore. + if err := l.Add(t); err != nil { + return otto.UndefinedValue() + } t.timer = time.AfterFunc(t.duration, func() { - l.Ready(t) + l.Ready(t) // nolint: errcheck }) value, newTimerErr := call.Otto.ToValue(t) @@ -64,10 +69,15 @@ func Define(vm *vm.VM, l *loop.Loop) error { duration: time.Millisecond, call: call, } - l.Add(t) + + // If err is non-nil, then the loop is closed and should not + // be used anymore. + if err := l.Add(t); err != nil { + return otto.UndefinedValue() + } t.timer = time.AfterFunc(t.duration, func() { - l.Ready(t) + l.Ready(t) // nolint: errcheck }) value, setImmediateErr := call.Otto.ToValue(t) @@ -139,7 +149,9 @@ func (t *timerTask) Execute(vm *vm.VM, l *loop.Loop) error { if t.interval && !t.stopped { t.timer.Reset(t.duration) - l.Add(t) + if err := l.Add(t); err != nil { + return err + } } return nil