Add c++-runloop-backed message queue
Reviewed By: mhorowitz Differential Revision: D3250498 fbshipit-source-id: 4e32153bcf07f6362f55fa558c22551140b34451
This commit is contained in:
parent
9e9536c50b
commit
b2d4c2e255
|
@ -108,6 +108,7 @@ react_library(
|
||||||
header_namespace = 'cxxreact',
|
header_namespace = 'cxxreact',
|
||||||
force_static = True,
|
force_static = True,
|
||||||
srcs = [
|
srcs = [
|
||||||
|
'CxxMessageQueue.cpp',
|
||||||
'Instance.cpp',
|
'Instance.cpp',
|
||||||
'JSCExecutor.cpp',
|
'JSCExecutor.cpp',
|
||||||
'JSCHelpers.cpp',
|
'JSCHelpers.cpp',
|
||||||
|
@ -131,6 +132,7 @@ react_library(
|
||||||
'JSCTracing.h',
|
'JSCTracing.h',
|
||||||
],
|
],
|
||||||
exported_headers = [
|
exported_headers = [
|
||||||
|
'CxxMessageQueue.h',
|
||||||
'Executor.h',
|
'Executor.h',
|
||||||
'ExecutorToken.h',
|
'ExecutorToken.h',
|
||||||
'ExecutorTokenFactory.h',
|
'ExecutorTokenFactory.h',
|
||||||
|
|
|
@ -0,0 +1,313 @@
|
||||||
|
// Copyright 2004-present Facebook. All Rights Reserved.
|
||||||
|
|
||||||
|
#include "CxxMessageQueue.h"
|
||||||
|
|
||||||
|
#include <folly/AtomicIntrusiveLinkedList.h>
|
||||||
|
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
#include <glog/logging.h>
|
||||||
|
|
||||||
|
namespace facebook {
|
||||||
|
namespace react {
|
||||||
|
|
||||||
|
using detail::BinarySemaphore;
|
||||||
|
using detail::EventFlag;
|
||||||
|
|
||||||
|
using clock = std::chrono::steady_clock;
|
||||||
|
using time_point = clock::time_point;
|
||||||
|
static_assert(std::is_same<time_point, EventFlag::time_point>::value, "");
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
time_point now() {
|
||||||
|
return clock::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
class Task {
|
||||||
|
public:
|
||||||
|
static Task* create(std::function<void()>&& func) {
|
||||||
|
return new Task{std::move(func), false, time_point()};
|
||||||
|
}
|
||||||
|
|
||||||
|
static Task* createSync(std::function<void()>&& func) {
|
||||||
|
return new Task{std::move(func), true, time_point()};
|
||||||
|
}
|
||||||
|
|
||||||
|
static Task* createDelayed(std::function<void()>&& func, time_point startTime) {
|
||||||
|
return new Task{std::move(func), false, startTime};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::function<void()> func;
|
||||||
|
// This flag is just to mark that the task is expected to be synchronous. If
|
||||||
|
// a synchronous task races with stopping the queue, the thread waiting on
|
||||||
|
// the synchronous task might never resume. We use this flag to detect this
|
||||||
|
// case and throw an error.
|
||||||
|
bool sync;
|
||||||
|
time_point startTime;
|
||||||
|
|
||||||
|
folly::AtomicIntrusiveLinkedListHook<Task> hook;
|
||||||
|
|
||||||
|
// Should this sort consider id also?
|
||||||
|
struct Compare {
|
||||||
|
bool operator()(const Task* a, const Task* b) {
|
||||||
|
return a->startTime > b->startTime;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
class DelayedTaskQueue {
|
||||||
|
public:
|
||||||
|
~DelayedTaskQueue() {
|
||||||
|
while (!queue_.empty()) {
|
||||||
|
delete queue_.top();
|
||||||
|
queue_.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void process() {
|
||||||
|
while (!queue_.empty()) {
|
||||||
|
Task* d = queue_.top();
|
||||||
|
if (now() < d->startTime) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
auto owned = std::unique_ptr<Task>(queue_.top());
|
||||||
|
queue_.pop();
|
||||||
|
owned->func();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void push(Task* t) {
|
||||||
|
queue_.push(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool empty() {
|
||||||
|
return queue_.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
time_point nextTime() {
|
||||||
|
return queue_.top()->startTime;
|
||||||
|
}
|
||||||
|
private:
|
||||||
|
std::priority_queue<Task*, std::vector<Task*>, Task::Compare> queue_;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class CxxMessageQueue::QueueRunner {
|
||||||
|
public:
|
||||||
|
~QueueRunner() {
|
||||||
|
queue_.sweep([] (Task* t) {
|
||||||
|
delete t;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void enqueue(std::function<void()>&& func) {
|
||||||
|
enqueueTask(Task::create(std::move(func)));
|
||||||
|
}
|
||||||
|
|
||||||
|
void enqueueDelayed(std::function<void()>&& func, uint64_t delayMs) {
|
||||||
|
if (delayMs) {
|
||||||
|
enqueueTask(Task::createDelayed(std::move(func), now() + std::chrono::milliseconds(delayMs)));
|
||||||
|
} else {
|
||||||
|
enqueue(std::move(func));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void enqueueSync(std::function<void()>&& func) {
|
||||||
|
EventFlag done;
|
||||||
|
enqueueTask(Task::createSync([&] () mutable {
|
||||||
|
func();
|
||||||
|
done.set();
|
||||||
|
}));
|
||||||
|
if (stopped_) {
|
||||||
|
// If this queue is stopped_, the sync task might never actually run.
|
||||||
|
throw std::runtime_error("Stopped within enqueueSync.");
|
||||||
|
}
|
||||||
|
done.wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
void stop() {
|
||||||
|
stopped_ = true;
|
||||||
|
pending_.set();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isStopped() {
|
||||||
|
return stopped_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void quitSynchronous() {
|
||||||
|
stop();
|
||||||
|
finished_.wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
void run() {
|
||||||
|
// If another thread stops this one, then the acquire-release on pending_
|
||||||
|
// ensures that we read stopped some time after it was set (and other
|
||||||
|
// threads just have to deal with the fact that we might run a task "after"
|
||||||
|
// they stop us).
|
||||||
|
//
|
||||||
|
// If we are stopped on this thread, then memory order doesn't really
|
||||||
|
// matter reading stopped_.
|
||||||
|
while (!stopped_.load(std::memory_order_relaxed)) {
|
||||||
|
sweep();
|
||||||
|
if (delayed_.empty()) {
|
||||||
|
pending_.wait();
|
||||||
|
} else {
|
||||||
|
pending_.wait_until(delayed_.nextTime());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// This sweep is just to catch erroneous enqueueSync. That is, there could
|
||||||
|
// be a task marked sync that another thread is waiting for, but we'll
|
||||||
|
// never actually run it.
|
||||||
|
sweep();
|
||||||
|
finished_.set();
|
||||||
|
}
|
||||||
|
|
||||||
|
// We are processing two queues, the posted tasks (queue_) and the delayed
|
||||||
|
// tasks (delayed_). Delayed tasks first go into posted tasks, and then are
|
||||||
|
// moved to the delayed queue if we pop them before the time they are
|
||||||
|
// scheduled for.
|
||||||
|
// As we pop things from queue_, before dealing with that thing, we run any
|
||||||
|
// delayed tasks whose scheduled time has arrived.
|
||||||
|
void sweep() {
|
||||||
|
queue_.sweep([this] (Task* t) {
|
||||||
|
std::unique_ptr<Task> owned(t);
|
||||||
|
if (stopped_.load(std::memory_order_relaxed)) {
|
||||||
|
if (t->sync) {
|
||||||
|
throw std::runtime_error("Sync task posted while stopped.");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
delayed_.process();
|
||||||
|
if (t->startTime != time_point() && now() <= t->startTime) {
|
||||||
|
delayed_.push(owned.release());
|
||||||
|
} else {
|
||||||
|
t->func();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
delayed_.process();
|
||||||
|
}
|
||||||
|
|
||||||
|
void bindToThisThread() {
|
||||||
|
if (tid_ != std::thread::id{}) {
|
||||||
|
throw std::runtime_error("Message queue already bound to thread.");
|
||||||
|
}
|
||||||
|
tid_ = std::this_thread::get_id();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isOnQueue() {
|
||||||
|
return std::this_thread::get_id() == tid_;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void enqueueTask(Task* task) {
|
||||||
|
if (queue_.insertHead(task)) {
|
||||||
|
pending_.set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::thread::id tid_;
|
||||||
|
|
||||||
|
folly::AtomicIntrusiveLinkedList<Task, &Task::hook> queue_;
|
||||||
|
|
||||||
|
std::atomic_bool stopped_{false};
|
||||||
|
DelayedTaskQueue delayed_;
|
||||||
|
|
||||||
|
BinarySemaphore pending_;
|
||||||
|
EventFlag finished_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
CxxMessageQueue::CxxMessageQueue() : qr_(new QueueRunner()) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
CxxMessageQueue::~CxxMessageQueue() {
|
||||||
|
// TODO(cjhopman): Add detach() so that the queue doesn't have to be
|
||||||
|
// explicitly stopped.
|
||||||
|
if (!qr_->isStopped()) {
|
||||||
|
LOG(FATAL) << "Queue not stopped.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void CxxMessageQueue::runOnQueue(std::function<void()>&& func) {
|
||||||
|
qr_->enqueue(std::move(func));
|
||||||
|
}
|
||||||
|
|
||||||
|
void CxxMessageQueue::runOnQueueDelayed(std::function<void()>&& func, uint64_t delayMs) {
|
||||||
|
qr_->enqueueDelayed(std::move(func), delayMs);
|
||||||
|
}
|
||||||
|
|
||||||
|
void CxxMessageQueue::runOnQueueSync(std::function<void()>&& func) {
|
||||||
|
if (isOnQueue()) {
|
||||||
|
func();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
qr_->enqueueSync(std::move(func));
|
||||||
|
}
|
||||||
|
|
||||||
|
void CxxMessageQueue::quitSynchronous() {
|
||||||
|
if (isOnQueue()) {
|
||||||
|
qr_->stop();
|
||||||
|
} else {
|
||||||
|
qr_->quitSynchronous();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CxxMessageQueue::isOnQueue() {
|
||||||
|
return qr_->isOnQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
struct MQRegistry {
|
||||||
|
std::weak_ptr<CxxMessageQueue> find(std::thread::id tid) {
|
||||||
|
std::lock_guard<std::mutex> g(lock_);
|
||||||
|
auto iter = registry_.find(tid);
|
||||||
|
if (iter == registry_.end()) return std::weak_ptr<CxxMessageQueue>();
|
||||||
|
return iter->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerQueue(std::thread::id tid, std::weak_ptr<CxxMessageQueue> mq) {
|
||||||
|
std::lock_guard<std::mutex> g(lock_);
|
||||||
|
registry_[tid] = mq;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unregister(std::thread::id tid) {
|
||||||
|
std::lock_guard<std::mutex> g(lock_);
|
||||||
|
registry_.erase(tid);
|
||||||
|
}
|
||||||
|
private:
|
||||||
|
std::mutex lock_;
|
||||||
|
std::unordered_map<std::thread::id, std::weak_ptr<CxxMessageQueue>> registry_;
|
||||||
|
};
|
||||||
|
|
||||||
|
MQRegistry& getMQRegistry() {
|
||||||
|
static MQRegistry* mq_registry = new MQRegistry();
|
||||||
|
return *mq_registry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::weak_ptr<CxxMessageQueue> CxxMessageQueue::current() {
|
||||||
|
auto tid = std::this_thread::get_id();
|
||||||
|
return getMQRegistry().find(tid);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::function<void()> CxxMessageQueue::getRunLoop(std::shared_ptr<CxxMessageQueue> mq) {
|
||||||
|
return [capture=mq->qr_, weakMq=std::weak_ptr<CxxMessageQueue>(mq)] {
|
||||||
|
capture->bindToThisThread();
|
||||||
|
auto tid = std::this_thread::get_id();
|
||||||
|
|
||||||
|
// TODO: handle nested runloops (either allow them or throw an exception).
|
||||||
|
getMQRegistry().registerQueue(tid, weakMq);
|
||||||
|
capture->run();
|
||||||
|
getMQRegistry().unregister(tid);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
} // namespace react
|
||||||
|
} // namespace facebook
|
|
@ -0,0 +1,80 @@
|
||||||
|
// Copyright 2004-present Facebook. All Rights Reserved.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "MessageQueueThread.h"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <functional>
|
||||||
|
#include <chrono>
|
||||||
|
#include <mutex>
|
||||||
|
#include <thread>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
namespace facebook {
|
||||||
|
namespace react {
|
||||||
|
|
||||||
|
namespace detail {
|
||||||
|
template<bool clearOnWait>
|
||||||
|
class CVFlag {
|
||||||
|
public:
|
||||||
|
using time_point = std::chrono::steady_clock::time_point;
|
||||||
|
void set() {
|
||||||
|
std::lock_guard<std::mutex> lk(mtx_);
|
||||||
|
flag_ = true;
|
||||||
|
cv_.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void wait() {
|
||||||
|
std::unique_lock<std::mutex> lk(mtx_);
|
||||||
|
cv_.wait(lk, [this] { return flag_; });
|
||||||
|
if (clearOnWait) flag_ = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool wait_until(time_point d) {
|
||||||
|
std::unique_lock<std::mutex> lk(mtx_);
|
||||||
|
bool res = cv_.wait_until(lk, d, [this] { return flag_; });
|
||||||
|
if (clearOnWait && res) flag_ = false;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool flag_{false};
|
||||||
|
std::condition_variable cv_;
|
||||||
|
std::mutex mtx_;
|
||||||
|
};
|
||||||
|
|
||||||
|
using BinarySemaphore = CVFlag<true>;
|
||||||
|
using EventFlag = CVFlag<false>;
|
||||||
|
}
|
||||||
|
|
||||||
|
class CxxMessageQueue : public MessageQueueThread {
|
||||||
|
public:
|
||||||
|
CxxMessageQueue();
|
||||||
|
virtual ~CxxMessageQueue() override;
|
||||||
|
virtual void runOnQueue(std::function<void()>&&) override;
|
||||||
|
void runOnQueueDelayed(std::function<void()>&&, uint64_t delayMs);
|
||||||
|
// runOnQueueSync and quitSynchronous are dangerous. They should only be
|
||||||
|
// used for initialization and cleanup.
|
||||||
|
virtual void runOnQueueSync(std::function<void()>&&) override;
|
||||||
|
// Once quitSynchronous() returns, no further work should run on the queue.
|
||||||
|
virtual void quitSynchronous() override;
|
||||||
|
|
||||||
|
bool isOnQueue();
|
||||||
|
|
||||||
|
// This returns a function that will actually run the runloop.
|
||||||
|
// This runloop will return some time after quitSynchronous (or after this is destroyed).
|
||||||
|
//
|
||||||
|
// When running the runloop, it is important to ensure that no frames in the
|
||||||
|
// current stack have a strong reference to the queue.
|
||||||
|
//
|
||||||
|
// Only one thread should run the runloop.
|
||||||
|
static std::function<void()> getRunLoop(std::shared_ptr<CxxMessageQueue> mq);
|
||||||
|
|
||||||
|
static std::weak_ptr<CxxMessageQueue> current();
|
||||||
|
private:
|
||||||
|
class QueueRunner;
|
||||||
|
std::shared_ptr<QueueRunner> qr_;
|
||||||
|
};
|
||||||
|
|
||||||
|
}}
|
|
@ -0,0 +1,19 @@
|
||||||
|
include_defs('//ReactAndroid/DEFS')
|
||||||
|
include_defs('//ReactAndroid/TEST_DEFS')
|
||||||
|
|
||||||
|
jni_instrumentation_test_lib(
|
||||||
|
name = 'tests',
|
||||||
|
class_under_test = 'com/facebook/react/XplatBridgeTest',
|
||||||
|
soname = 'libxplat-bridge.so',
|
||||||
|
srcs = [
|
||||||
|
'CxxMessageQueueTest.cpp',
|
||||||
|
],
|
||||||
|
compiler_flags = [
|
||||||
|
'-fexceptions',
|
||||||
|
],
|
||||||
|
deps = [
|
||||||
|
'//xplat/third-party/gmock:gtest',
|
||||||
|
react_native_xplat_target('cxxreact:bridge'),
|
||||||
|
],
|
||||||
|
visibility = ['//instrumentation_tests/...'],
|
||||||
|
)
|
|
@ -0,0 +1,150 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <cxxreact/CxxMessageQueue.h>
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
|
||||||
|
using namespace facebook::react;
|
||||||
|
using detail::EventFlag;
|
||||||
|
using time_point = EventFlag::time_point;
|
||||||
|
|
||||||
|
using std::chrono::milliseconds;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
time_point now() {
|
||||||
|
return std::chrono::steady_clock::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<CxxMessageQueue> createAndStartQueue(EventFlag& finishedFlag) {
|
||||||
|
auto q = std::make_shared<CxxMessageQueue>();
|
||||||
|
std::thread t([q, &finishedFlag] () mutable {
|
||||||
|
auto loop = CxxMessageQueue::getRunLoop(q);
|
||||||
|
// Note: make sure that no stack frames above loop() have a strong reference to q.
|
||||||
|
q.reset();
|
||||||
|
loop();
|
||||||
|
finishedFlag.set();
|
||||||
|
});
|
||||||
|
t.detach();
|
||||||
|
return q;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is just used to start up a queue for a test and make sure that it is
|
||||||
|
// actually shut down after the test.
|
||||||
|
struct QueueWithThread {
|
||||||
|
QueueWithThread() {
|
||||||
|
queue = createAndStartQueue(done);
|
||||||
|
}
|
||||||
|
|
||||||
|
~QueueWithThread() {
|
||||||
|
queue->quitSynchronous();
|
||||||
|
queue.reset();
|
||||||
|
if (!done.wait_until(now() + milliseconds(300))) {
|
||||||
|
ADD_FAILURE() << "Queue did not exit";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EventFlag done;
|
||||||
|
std::shared_ptr<CxxMessageQueue> queue;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(CxxMessageQueue, TestQuit) {
|
||||||
|
EventFlag done;
|
||||||
|
auto q = createAndStartQueue(done);
|
||||||
|
q->quitSynchronous();
|
||||||
|
if (!done.wait_until(now() + milliseconds(300))) {
|
||||||
|
FAIL() << "Queue did not exit runloop after quitSynchronous";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(CxxMessageQueue, TestPostTask) {
|
||||||
|
QueueWithThread qt;
|
||||||
|
auto q = qt.queue;
|
||||||
|
|
||||||
|
EventFlag flag;
|
||||||
|
q->runOnQueue([&] {
|
||||||
|
flag.set();
|
||||||
|
});
|
||||||
|
flag.wait();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(CxxMessageQueue, TestPostTaskMultiple) {
|
||||||
|
QueueWithThread qt;
|
||||||
|
auto q = qt.queue;
|
||||||
|
|
||||||
|
std::vector<EventFlag> vec(10);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
q->runOnQueue([&, i] {
|
||||||
|
vec[i].set();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
vec[i].wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(CxxMessageQueue, TestQueuedTaskOrdering) {
|
||||||
|
QueueWithThread qt;
|
||||||
|
auto q = qt.queue;
|
||||||
|
|
||||||
|
// Block the runloop so we can get some queued tasks.
|
||||||
|
EventFlag wait;
|
||||||
|
q->runOnQueue([&] {
|
||||||
|
wait.wait();
|
||||||
|
});
|
||||||
|
|
||||||
|
// These tasks should run in order.
|
||||||
|
int failed = -1;
|
||||||
|
int i = 0;
|
||||||
|
for (int j = 0; j < 10; j++) {
|
||||||
|
q->runOnQueue([&, j] {
|
||||||
|
if (i != j) {
|
||||||
|
failed = j;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
wait.set();
|
||||||
|
|
||||||
|
// Flush the queue.
|
||||||
|
q->runOnQueueSync([&] {});
|
||||||
|
|
||||||
|
ASSERT_EQ(failed, -1);
|
||||||
|
ASSERT_EQ(i, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(CxxMessageQueue, TestDelayedTaskOrdering) {
|
||||||
|
QueueWithThread qt;
|
||||||
|
auto q = qt.queue;
|
||||||
|
|
||||||
|
// Block the runloop so we can get some queued tasks.
|
||||||
|
EventFlag wait;
|
||||||
|
q->runOnQueue([&] {
|
||||||
|
wait.wait();
|
||||||
|
});
|
||||||
|
|
||||||
|
int ids[] = {8, 4, 6, 1, 3, 2, 9, 5, 0, 7};
|
||||||
|
|
||||||
|
int failed = -1;
|
||||||
|
int i = 0;
|
||||||
|
EventFlag done;
|
||||||
|
// If this loop actually takes longer than the difference between delays, the
|
||||||
|
// ordering could get screwed up :/
|
||||||
|
for (int j = 0; j < 10; j++) {
|
||||||
|
q->runOnQueueDelayed([&, j] {
|
||||||
|
if (i != ids[j]) {
|
||||||
|
failed = j;
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
if (ids[j] == 9) {
|
||||||
|
done.set();
|
||||||
|
}
|
||||||
|
}, 50 + 10 * ids[j]);
|
||||||
|
}
|
||||||
|
wait.set();
|
||||||
|
done.wait();
|
||||||
|
|
||||||
|
ASSERT_EQ(failed, -1);
|
||||||
|
ASSERT_EQ(i, 10);
|
||||||
|
}
|
Loading…
Reference in New Issue