2
0
mirror of https://github.com/status-im/status-go.git synced 2025-01-12 07:35:02 +00:00

Change Loop API to prevent it from accepting tasks when the loop is closed ()

This commit is contained in:
Donnie Adams 2017-12-28 13:50:36 -07:00 committed by Ivan Daniluk
parent 97c473ad7a
commit 373fc86d1e
6 changed files with 211 additions and 33 deletions

@ -13,6 +13,8 @@ import (
"github.com/status-im/status-go/geth/jail/internal/vm" "github.com/status-im/status-go/geth/jail/internal/vm"
) )
const timeout = 5 * time.Second
// Cell represents a single jail cell, which is basically a JavaScript VM. // Cell represents a single jail cell, which is basically a JavaScript VM.
type Cell struct { type Cell struct {
*vm.VM *vm.VM
@ -86,11 +88,26 @@ func (c *Cell) Stop() error {
// event queue loop and schedules for immediate execution. // event queue loop and schedules for immediate execution.
// Intended to be used by any cell user that want's to run // Intended to be used by any cell user that want's to run
// async call, like callback. // async call, like callback.
func (c *Cell) CallAsync(fn otto.Value, args ...interface{}) { func (c *Cell) CallAsync(fn otto.Value, args ...interface{}) error {
task := looptask.NewCallTask(fn, args...) task := looptask.NewCallTask(fn, args...)
// Add a task to the queue. errChan := make(chan error)
c.loop.Add(task)
// And run the task immediately. go func() {
// It's a blocking operation. defer close(errChan)
c.loop.Ready(task) err := c.loop.AddAndExecute(task)
if err != nil {
errChan <- err
}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-errChan:
return err
case <-timer.C:
return errors.New("Timeout")
}
} }

@ -94,6 +94,7 @@ func createSendAsyncHandler(jail *Jail, cell *Cell) func(call otto.FunctionCall)
return return
} }
// nolint: errcheck
if err != nil { if err != nil {
cell.CallAsync(callback, vm.MakeCustomError("Error", err.Error())) cell.CallAsync(callback, vm.MakeCustomError("Error", err.Error()))
} else { } else {

@ -129,10 +129,14 @@ func DefineWithHandler(vm *vm.VM, l *loop.Loop, h http.Handler) error {
cb: cb, cb: cb,
} }
l.Add(t) // If err is non-nil, then the loop is closed
// and we shouldn't do anymore with it.
if err := l.Add(t); err != nil {
return otto.UndefinedValue()
}
go func() { go func() {
defer l.Ready(t) defer l.Ready(t) // nolint: errcheck
req, rqErr := http.NewRequest(method, urlStr, body) req, rqErr := http.NewRequest(method, urlStr, body)
if rqErr != nil { if rqErr != nil {

@ -2,12 +2,17 @@ package loop
import ( import (
"context" "context"
"errors"
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/status-im/status-go/geth/jail/internal/vm" "github.com/status-im/status-go/geth/jail/internal/vm"
) )
// ErrClosed represents the error returned when we try to add or ready
// a task on a closed loop.
var ErrClosed = errors.New("The loop is closed and no longer accepting tasks")
// Task represents something that the event loop can schedule and run. // Task represents something that the event loop can schedule and run.
// //
// Task describes two operations that will almost always be boilerplate, // Task describes two operations that will almost always be boilerplate,
@ -27,19 +32,21 @@ type Task interface {
// Loop encapsulates the event loop's state. This includes the vm on which the // Loop encapsulates the event loop's state. This includes the vm on which the
// loop operates, a monotonically incrementing event id, a map of tasks that // loop operates, a monotonically incrementing event id, a map of tasks that
// aren't ready yet, keyed by their ID, and a channel of tasks that are ready // aren't ready yet, keyed by their ID, a channel of tasks that are ready
// to finalise on the VM. The channel holding the tasks pending finalising can // to finalise on the VM, and a boolean that indicates if the loop is still
// be buffered or unbuffered. // accepting tasks. The channel holding the tasks pending finalising can be
// buffered or unbuffered.
// //
// Warning: id must be the first field in this struct as it's accessed atomically. // Warning: id must be the first field in this struct as it's accessed atomically.
// Otherwise, on ARM and x86-32 it will panic. // Otherwise, on ARM and x86-32 it will panic.
// More information: https://golang.org/pkg/sync/atomic/#pkg-note-BUG. // More information: https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
type Loop struct { type Loop struct {
id int64 id int64
vm *vm.VM vm *vm.VM
lock sync.RWMutex lock sync.RWMutex
tasks map[int64]Task tasks map[int64]Task
ready chan Task ready chan Task
accepting bool
} }
// New creates a new Loop with an unbuffered ready queue on a specific VM. // New creates a new Loop with an unbuffered ready queue on a specific VM.
@ -51,9 +58,20 @@ func New(vm *vm.VM) *Loop {
// queue, the capacity of which being specified by the backlog argument. // queue, the capacity of which being specified by the backlog argument.
func NewWithBacklog(vm *vm.VM, backlog int) *Loop { func NewWithBacklog(vm *vm.VM, backlog int) *Loop {
return &Loop{ return &Loop{
vm: vm, vm: vm,
tasks: make(map[int64]Task), tasks: make(map[int64]Task),
ready: make(chan Task, backlog), ready: make(chan Task, backlog),
accepting: true,
}
}
// close the loop so that it no longer accepts tasks.
func (l *Loop) close() {
l.lock.Lock()
defer l.lock.Unlock()
if l.accepting {
l.accepting = false
close(l.ready)
} }
} }
@ -65,11 +83,15 @@ func (l *Loop) VM() *vm.VM {
// Add puts a task into the loop. This signals to the loop that this task is // Add puts a task into the loop. This signals to the loop that this task is
// doing something outside of the JavaScript environment, and that at some // doing something outside of the JavaScript environment, and that at some
// point, it will become ready for finalising. // point, it will become ready for finalising.
func (l *Loop) Add(t Task) { func (l *Loop) Add(t Task) error {
l.lock.Lock() l.lock.Lock()
defer l.lock.Unlock()
if !l.accepting {
return ErrClosed
}
t.SetID(atomic.AddInt64(&l.id, 1)) t.SetID(atomic.AddInt64(&l.id, 1))
l.tasks[t.GetID()] = t l.tasks[t.GetID()] = t
l.lock.Unlock() return nil
} }
// Remove takes a task out of the loop. This should not be called if a task // Remove takes a task out of the loop. This should not be called if a task
@ -77,7 +99,7 @@ func (l *Loop) Add(t Task) {
// broken. // broken.
func (l *Loop) Remove(t Task) { func (l *Loop) Remove(t Task) {
l.remove(t) l.remove(t)
go l.Ready(nil) go l.Ready(nil) // nolint: errcheck
} }
func (l *Loop) remove(t Task) { func (l *Loop) remove(t Task) {
@ -101,8 +123,26 @@ func (l *Loop) removeAll() {
// Ready signals to the loop that a task is ready to be finalised. This might // Ready signals to the loop that a task is ready to be finalised. This might
// block if the "ready channel" in the loop is at capacity. // block if the "ready channel" in the loop is at capacity.
func (l *Loop) Ready(t Task) { func (l *Loop) Ready(t Task) error {
l.ready <- t var err error
l.lock.RLock()
if !l.accepting {
t.Cancel()
err = ErrClosed
}
l.lock.RUnlock()
if err == nil {
l.ready <- t
}
return err
}
// AddAndExecute combines Add and Ready for immediate execution.
func (l *Loop) AddAndExecute(t Task) error {
if err := l.Add(t); err != nil {
return err
}
return l.Ready(t)
} }
// Eval executes some code in the VM associated with the loop and returns an // Eval executes some code in the VM associated with the loop and returns an
@ -131,11 +171,13 @@ func (l *Loop) processTask(t Task) error {
// Run handles the task scheduling and finalisation. // Run handles the task scheduling and finalisation.
// It runs infinitely waiting for new tasks. // It runs infinitely waiting for new tasks.
func (l *Loop) Run(ctx context.Context) error { func (l *Loop) Run(ctx context.Context) error {
defer l.close()
defer l.removeAll()
for { for {
select { select {
case t := <-l.ready: case t := <-l.ready:
if ctx.Err() != nil { if ctx.Err() != nil {
l.removeAll()
return ctx.Err() return ctx.Err()
} }
@ -147,12 +189,10 @@ func (l *Loop) Run(ctx context.Context) error {
if err != nil { if err != nil {
// TODO(divan): do we need to report // TODO(divan): do we need to report
// errors up to the caller? // errors up to the caller?
// Ignoring for now, as loop // Ignoring for now.
// should keep running.
continue continue
} }
case <-ctx.Done(): case <-ctx.Done():
l.removeAll()
return ctx.Err() return ctx.Err()
} }
} }

@ -0,0 +1,104 @@
package loop
import (
"context"
"testing"
"time"
"github.com/status-im/status-go/geth/jail/internal/vm"
"github.com/stretchr/testify/suite"
)
// DummyTask is something that satisfies the loop.Task interface for testing.
type DummyTask struct {
canceled bool
executed bool
}
func (*DummyTask) SetID(int64) {}
func (*DummyTask) GetID() int64 { return 1 }
func (d *DummyTask) Cancel() { d.canceled = true }
func (d *DummyTask) Execute(*vm.VM, *Loop) error {
d.executed = true
return nil
}
func TestLoopSuite(t *testing.T) {
suite.Run(t, new(LoopSuite))
}
type LoopSuite struct {
suite.Suite
loop *Loop
cancel context.CancelFunc
task *DummyTask
}
func (s *LoopSuite) SetupTest() {
s.task = &DummyTask{}
vm := vm.New()
s.loop = New(vm)
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
go s.loop.Run(ctx)
}
func (s *LoopSuite) TestAddAndReady() {
err := s.loop.Add(s.task)
s.NoError(err)
s.False(s.task.canceled)
err = s.loop.Ready(s.task)
s.NoError(err)
// Wait to process task
time.Sleep(100 * time.Millisecond)
s.True(s.task.executed)
s.cancel()
}
func (s *LoopSuite) TestLoopErrorWhenClosed() {
s.cancel()
// Wait for the context to cancel and loop to close
time.Sleep(100 * time.Millisecond)
err := s.loop.Add(s.task)
s.Error(err)
err = s.loop.Ready(s.task)
s.Error(err)
s.True(s.task.canceled)
}
func (s *LoopSuite) TestImmediateExecution() {
err := s.loop.AddAndExecute(s.task)
// Wait for the task to execute
time.Sleep(100 * time.Millisecond)
s.NoError(err)
s.True(s.task.executed)
s.False(s.task.canceled)
s.cancel()
}
func (s *LoopSuite) TestImmediateExecutionErrorWhenClosed() {
s.cancel()
// Wait for the context to cancel and loop to close
time.Sleep(100 * time.Millisecond)
err := s.loop.AddAndExecute(s.task)
s.Error(err)
s.False(s.task.executed)
}

@ -34,10 +34,15 @@ func Define(vm *vm.VM, l *loop.Loop) error {
call: call, call: call,
interval: interval, interval: interval,
} }
l.Add(t)
// If err is non-nil, then the loop is closed and should not
// be used anymore.
if err := l.Add(t); err != nil {
return otto.UndefinedValue()
}
t.timer = time.AfterFunc(t.duration, func() { t.timer = time.AfterFunc(t.duration, func() {
l.Ready(t) l.Ready(t) // nolint: errcheck
}) })
value, newTimerErr := call.Otto.ToValue(t) value, newTimerErr := call.Otto.ToValue(t)
@ -64,10 +69,15 @@ func Define(vm *vm.VM, l *loop.Loop) error {
duration: time.Millisecond, duration: time.Millisecond,
call: call, call: call,
} }
l.Add(t)
// If err is non-nil, then the loop is closed and should not
// be used anymore.
if err := l.Add(t); err != nil {
return otto.UndefinedValue()
}
t.timer = time.AfterFunc(t.duration, func() { t.timer = time.AfterFunc(t.duration, func() {
l.Ready(t) l.Ready(t) // nolint: errcheck
}) })
value, setImmediateErr := call.Otto.ToValue(t) value, setImmediateErr := call.Otto.ToValue(t)
@ -139,7 +149,9 @@ func (t *timerTask) Execute(vm *vm.VM, l *loop.Loop) error {
if t.interval && !t.stopped { if t.interval && !t.stopped {
t.timer.Reset(t.duration) t.timer.Reset(t.duration)
l.Add(t) if err := l.Add(t); err != nil {
return err
}
} }
return nil return nil