Remove unused CxxMessageQueue
Differential Revision: D4713064 fbshipit-source-id: 511b782279b89076228f00290e78ed155e2e723e
This commit is contained in:
parent
4797701b66
commit
314ec87269
|
@ -172,7 +172,6 @@
|
|||
13F887581E2971D400C3C7A1 /* Demangle.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 13F887521E2971C500C3C7A1 /* Demangle.cpp */; };
|
||||
13F887591E2971D400C3C7A1 /* StringBase.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 13F887531E2971C500C3C7A1 /* StringBase.cpp */; };
|
||||
13F8875A1E2971D400C3C7A1 /* Unicode.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 13F887541E2971C500C3C7A1 /* Unicode.cpp */; };
|
||||
13F8876D1E29726200C3C7A1 /* CxxMessageQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0A51E03699D0018521A /* CxxMessageQueue.cpp */; };
|
||||
13F8876E1E29726200C3C7A1 /* CxxNativeModule.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0A81E03699D0018521A /* CxxNativeModule.cpp */; };
|
||||
13F887701E29726200C3C7A1 /* Instance.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0AE1E03699D0018521A /* Instance.cpp */; };
|
||||
13F887711E29726200C3C7A1 /* JSBundleType.cpp in Sources */ = {isa = PBXBuildFile; fileRef = AC70D2EB1DE48A22002E6351 /* JSBundleType.cpp */; };
|
||||
|
@ -191,7 +190,6 @@
|
|||
13F8877E1E29726200C3C7A1 /* NativeToJsBridge.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0CF1E03699D0018521A /* NativeToJsBridge.cpp */; };
|
||||
13F8877F1E29726200C3C7A1 /* Platform.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0D11E03699D0018521A /* Platform.cpp */; };
|
||||
13F887801E29726200C3C7A1 /* SampleCxxModule.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0D31E03699D0018521A /* SampleCxxModule.cpp */; };
|
||||
13F887811E29726300C3C7A1 /* CxxMessageQueue.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0A51E03699D0018521A /* CxxMessageQueue.cpp */; };
|
||||
13F887821E29726300C3C7A1 /* CxxNativeModule.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0A81E03699D0018521A /* CxxNativeModule.cpp */; };
|
||||
13F887841E29726300C3C7A1 /* Instance.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0AE1E03699D0018521A /* Instance.cpp */; };
|
||||
13F887851E29726300C3C7A1 /* JSCExecutor.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 3D92B0B21E03699D0018521A /* JSCExecutor.cpp */; };
|
||||
|
@ -229,7 +227,6 @@
|
|||
191E3EBE1C29D9AF00C180A6 /* RCTRefreshControlManager.m in Sources */ = {isa = PBXBuildFile; fileRef = 191E3EBD1C29D9AF00C180A6 /* RCTRefreshControlManager.m */; };
|
||||
191E3EC11C29DC3800C180A6 /* RCTRefreshControl.m in Sources */ = {isa = PBXBuildFile; fileRef = 191E3EC01C29DC3800C180A6 /* RCTRefreshControl.m */; };
|
||||
19DED2291E77E29200F089BB /* systemJSCWrapper.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 19DED2281E77E29200F089BB /* systemJSCWrapper.cpp */; };
|
||||
27595AA31E575C7800CCE2B1 /* CxxMessageQueue.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A61E03699D0018521A /* CxxMessageQueue.h */; };
|
||||
27595AA41E575C7800CCE2B1 /* CxxModule.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A71E03699D0018521A /* CxxModule.h */; };
|
||||
27595AA51E575C7800CCE2B1 /* CxxNativeModule.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A91E03699D0018521A /* CxxNativeModule.h */; };
|
||||
27595AA61E575C7800CCE2B1 /* Executor.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0AB1E03699D0018521A /* Executor.h */; };
|
||||
|
@ -256,7 +253,6 @@
|
|||
27595ABB1E575C7800CCE2B1 /* Platform.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0D21E03699D0018521A /* Platform.h */; };
|
||||
27595ABC1E575C7800CCE2B1 /* SampleCxxModule.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0D41E03699D0018521A /* SampleCxxModule.h */; };
|
||||
27595ABD1E575C7800CCE2B1 /* SystraceSection.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0D51E03699D0018521A /* SystraceSection.h */; };
|
||||
27595ABE1E575C7800CCE2B1 /* CxxMessageQueue.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A61E03699D0018521A /* CxxMessageQueue.h */; };
|
||||
27595ABF1E575C7800CCE2B1 /* CxxModule.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A71E03699D0018521A /* CxxModule.h */; };
|
||||
27595AC01E575C7800CCE2B1 /* CxxNativeModule.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A91E03699D0018521A /* CxxNativeModule.h */; };
|
||||
27595AC11E575C7800CCE2B1 /* Executor.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0AB1E03699D0018521A /* Executor.h */; };
|
||||
|
@ -744,7 +740,6 @@
|
|||
3D8ED92C1E5B120100D83D20 /* libcxxreact.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 3D3CD9321DE5FBEE00167DC4 /* libcxxreact.a */; };
|
||||
3D8ED92D1E5B120100D83D20 /* libyoga.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 3D3C06751DE3340C00C268FA /* libyoga.a */; };
|
||||
3DA9819E1E5B0DBB004F2374 /* NSDataBigString.h in Headers */ = {isa = PBXBuildFile; fileRef = 3D7454B31E54786200E74ADD /* NSDataBigString.h */; };
|
||||
3DA9819F1E5B0E34004F2374 /* CxxMessageQueue.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A61E03699D0018521A /* CxxMessageQueue.h */; };
|
||||
3DA981A01E5B0E34004F2374 /* CxxModule.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A71E03699D0018521A /* CxxModule.h */; };
|
||||
3DA981A11E5B0E34004F2374 /* CxxNativeModule.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A91E03699D0018521A /* CxxNativeModule.h */; };
|
||||
3DA981A21E5B0E34004F2374 /* Executor.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0AB1E03699D0018521A /* Executor.h */; };
|
||||
|
@ -891,7 +886,6 @@
|
|||
3DA982361E5B0F7F004F2374 /* RCTWrapperViewController.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 13B080231A694A8400A75B9A /* RCTWrapperViewController.h */; };
|
||||
3DA982381E5B0F7F004F2374 /* UIView+React.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 13E067531A70F44B002CDEE1 /* UIView+React.h */; };
|
||||
3DA982391E5B0F8A004F2374 /* UIView+Private.h in Headers */ = {isa = PBXBuildFile; fileRef = 83F15A171B7CC46900F10295 /* UIView+Private.h */; };
|
||||
3DA9823A1E5B1053004F2374 /* CxxMessageQueue.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A61E03699D0018521A /* CxxMessageQueue.h */; };
|
||||
3DA9823B1E5B1053004F2374 /* CxxModule.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A71E03699D0018521A /* CxxModule.h */; };
|
||||
3DA9823C1E5B1053004F2374 /* CxxNativeModule.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0A91E03699D0018521A /* CxxNativeModule.h */; };
|
||||
3DA9823D1E5B1053004F2374 /* Executor.h in Copy Headers */ = {isa = PBXBuildFile; fileRef = 3D92B0AB1E03699D0018521A /* Executor.h */; };
|
||||
|
@ -1201,7 +1195,6 @@
|
|||
dstPath = include/cxxreact;
|
||||
dstSubfolderSpec = 16;
|
||||
files = (
|
||||
3DA9823A1E5B1053004F2374 /* CxxMessageQueue.h in Copy Headers */,
|
||||
3DA9823B1E5B1053004F2374 /* CxxModule.h in Copy Headers */,
|
||||
3DA9823C1E5B1053004F2374 /* CxxNativeModule.h in Copy Headers */,
|
||||
3DA9823D1E5B1053004F2374 /* Executor.h in Copy Headers */,
|
||||
|
@ -1400,7 +1393,6 @@
|
|||
dstPath = include/cxxreact;
|
||||
dstSubfolderSpec = 16;
|
||||
files = (
|
||||
3DA9819F1E5B0E34004F2374 /* CxxMessageQueue.h in Copy Headers */,
|
||||
3DA981A01E5B0E34004F2374 /* CxxModule.h in Copy Headers */,
|
||||
3DA981A11E5B0E34004F2374 /* CxxNativeModule.h in Copy Headers */,
|
||||
3DA981A21E5B0E34004F2374 /* Executor.h in Copy Headers */,
|
||||
|
@ -1720,8 +1712,6 @@
|
|||
3D7A27DE1DE32541002E3F95 /* JSCWrapper.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = JSCWrapper.h; sourceTree = "<group>"; };
|
||||
3D7A27E11DE325B7002E3F95 /* RCTJSCErrorHandling.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = RCTJSCErrorHandling.mm; sourceTree = "<group>"; };
|
||||
3D7AA9C31E548CD5001955CF /* NSDataBigString.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = NSDataBigString.mm; sourceTree = "<group>"; };
|
||||
3D92B0A51E03699D0018521A /* CxxMessageQueue.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = CxxMessageQueue.cpp; sourceTree = "<group>"; };
|
||||
3D92B0A61E03699D0018521A /* CxxMessageQueue.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CxxMessageQueue.h; sourceTree = "<group>"; };
|
||||
3D92B0A71E03699D0018521A /* CxxModule.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CxxModule.h; sourceTree = "<group>"; };
|
||||
3D92B0A81E03699D0018521A /* CxxNativeModule.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = CxxNativeModule.cpp; sourceTree = "<group>"; };
|
||||
3D92B0A91E03699D0018521A /* CxxNativeModule.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CxxNativeModule.h; sourceTree = "<group>"; };
|
||||
|
@ -2416,8 +2406,6 @@
|
|||
AC70D2EA1DE489FC002E6351 /* cxxreact */ = {
|
||||
isa = PBXGroup;
|
||||
children = (
|
||||
3D92B0A51E03699D0018521A /* CxxMessageQueue.cpp */,
|
||||
3D92B0A61E03699D0018521A /* CxxMessageQueue.h */,
|
||||
3D92B0A71E03699D0018521A /* CxxModule.h */,
|
||||
3D92B0A81E03699D0018521A /* CxxNativeModule.cpp */,
|
||||
3D92B0A91E03699D0018521A /* CxxNativeModule.h */,
|
||||
|
@ -2660,7 +2648,6 @@
|
|||
3D3030221DF8294C00D6DDAE /* JSBundleType.h in Headers */,
|
||||
27595ACA1E575C7800CCE2B1 /* JSCMemory.h in Headers */,
|
||||
3D74547D1E54758900E74ADD /* JSBigString.h in Headers */,
|
||||
27595ABE1E575C7800CCE2B1 /* CxxMessageQueue.h in Headers */,
|
||||
27595AC71E575C7800CCE2B1 /* JSCExecutor.h in Headers */,
|
||||
27595ACD1E575C7800CCE2B1 /* JSCSamplingProfiler.h in Headers */,
|
||||
27595ABF1E575C7800CCE2B1 /* CxxModule.h in Headers */,
|
||||
|
@ -2732,7 +2719,6 @@
|
|||
3D3CD9471DE5FC7800167DC4 /* oss-compat-util.h in Headers */,
|
||||
27595AAF1E575C7800CCE2B1 /* JSCMemory.h in Headers */,
|
||||
3D74547C1E54758900E74ADD /* JSBigString.h in Headers */,
|
||||
27595AA31E575C7800CCE2B1 /* CxxMessageQueue.h in Headers */,
|
||||
27595AAC1E575C7800CCE2B1 /* JSCExecutor.h in Headers */,
|
||||
27595AB21E575C7800CCE2B1 /* JSCSamplingProfiler.h in Headers */,
|
||||
27595AA41E575C7800CCE2B1 /* CxxModule.h in Headers */,
|
||||
|
@ -3436,7 +3422,6 @@
|
|||
13F8877F1E29726200C3C7A1 /* Platform.cpp in Sources */,
|
||||
13F887701E29726200C3C7A1 /* Instance.cpp in Sources */,
|
||||
13F8877E1E29726200C3C7A1 /* NativeToJsBridge.cpp in Sources */,
|
||||
13F8876D1E29726200C3C7A1 /* CxxMessageQueue.cpp in Sources */,
|
||||
13F887761E29726200C3C7A1 /* JSCNativeModules.cpp in Sources */,
|
||||
13F887801E29726200C3C7A1 /* SampleCxxModule.cpp in Sources */,
|
||||
);
|
||||
|
@ -3460,7 +3445,6 @@
|
|||
3D80D9181DF6F7A80028D040 /* JSBundleType.cpp in Sources */,
|
||||
13F8878F1E29726300C3C7A1 /* MethodCall.cpp in Sources */,
|
||||
13F887921E29726300C3C7A1 /* Platform.cpp in Sources */,
|
||||
13F887811E29726300C3C7A1 /* CxxMessageQueue.cpp in Sources */,
|
||||
13F887911E29726300C3C7A1 /* NativeToJsBridge.cpp in Sources */,
|
||||
13F887821E29726300C3C7A1 /* CxxNativeModule.cpp in Sources */,
|
||||
13F887891E29726300C3C7A1 /* JSCNativeModules.cpp in Sources */,
|
||||
|
|
|
@ -5,7 +5,6 @@ include $(CLEAR_VARS)
|
|||
LOCAL_MODULE := libreactnativefb
|
||||
|
||||
LOCAL_SRC_FILES := \
|
||||
CxxMessageQueue.cpp \
|
||||
CxxNativeModule.cpp \
|
||||
Instance.cpp \
|
||||
JSCExecutor.cpp \
|
||||
|
|
|
@ -127,7 +127,6 @@ cxx_library(
|
|||
)
|
||||
|
||||
CXXREACT_PUBLIC_HEADERS = [
|
||||
"CxxMessageQueue.h",
|
||||
"CxxNativeModule.h",
|
||||
"Executor.h",
|
||||
"ExecutorToken.h",
|
||||
|
|
|
@ -1,320 +0,0 @@
|
|||
// Copyright 2004-present Facebook. All Rights Reserved.
|
||||
|
||||
#include "CxxMessageQueue.h"
|
||||
|
||||
#include <folly/AtomicIntrusiveLinkedList.h>
|
||||
|
||||
#include <unordered_map>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
namespace facebook {
|
||||
namespace react {
|
||||
|
||||
using detail::BinarySemaphore;
|
||||
using detail::EventFlag;
|
||||
|
||||
using clock = std::chrono::steady_clock;
|
||||
using time_point = clock::time_point;
|
||||
static_assert(std::is_same<time_point, EventFlag::time_point>::value, "");
|
||||
|
||||
namespace {
|
||||
time_point now() {
|
||||
return clock::now();
|
||||
}
|
||||
|
||||
class Task {
|
||||
public:
|
||||
static Task* create(std::function<void()>&& func) {
|
||||
return new Task{std::move(func), false, time_point()};
|
||||
}
|
||||
|
||||
static Task* createSync(std::function<void()>&& func) {
|
||||
return new Task{std::move(func), true, time_point()};
|
||||
}
|
||||
|
||||
static Task* createDelayed(std::function<void()>&& func, time_point startTime) {
|
||||
return new Task{std::move(func), false, startTime};
|
||||
}
|
||||
|
||||
std::function<void()> func;
|
||||
// This flag is just to mark that the task is expected to be synchronous. If
|
||||
// a synchronous task races with stopping the queue, the thread waiting on
|
||||
// the synchronous task might never resume. We use this flag to detect this
|
||||
// case and throw an error.
|
||||
bool sync;
|
||||
time_point startTime;
|
||||
|
||||
folly::AtomicIntrusiveLinkedListHook<Task> hook;
|
||||
|
||||
// Should this sort consider id also?
|
||||
struct Compare {
|
||||
bool operator()(const Task* a, const Task* b) {
|
||||
return a->startTime > b->startTime;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
class DelayedTaskQueue {
|
||||
public:
|
||||
~DelayedTaskQueue() {
|
||||
while (!queue_.empty()) {
|
||||
delete queue_.top();
|
||||
queue_.pop();
|
||||
}
|
||||
}
|
||||
|
||||
void process() {
|
||||
while (!queue_.empty()) {
|
||||
Task* d = queue_.top();
|
||||
if (now() < d->startTime) {
|
||||
break;
|
||||
}
|
||||
auto owned = std::unique_ptr<Task>(queue_.top());
|
||||
queue_.pop();
|
||||
owned->func();
|
||||
}
|
||||
}
|
||||
|
||||
void push(Task* t) {
|
||||
queue_.push(t);
|
||||
}
|
||||
|
||||
bool empty() {
|
||||
return queue_.empty();
|
||||
}
|
||||
|
||||
time_point nextTime() {
|
||||
return queue_.top()->startTime;
|
||||
}
|
||||
private:
|
||||
std::priority_queue<Task*, std::vector<Task*>, Task::Compare> queue_;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
class CxxMessageQueue::QueueRunner {
|
||||
public:
|
||||
~QueueRunner() {
|
||||
queue_.sweep([] (Task* t) {
|
||||
delete t;
|
||||
});
|
||||
}
|
||||
|
||||
void enqueue(std::function<void()>&& func) {
|
||||
enqueueTask(Task::create(std::move(func)));
|
||||
}
|
||||
|
||||
void enqueueDelayed(std::function<void()>&& func, uint64_t delayMs) {
|
||||
if (delayMs) {
|
||||
enqueueTask(Task::createDelayed(std::move(func), now() + std::chrono::milliseconds(delayMs)));
|
||||
} else {
|
||||
enqueue(std::move(func));
|
||||
}
|
||||
}
|
||||
|
||||
void enqueueSync(std::function<void()>&& func) {
|
||||
EventFlag done;
|
||||
enqueueTask(Task::createSync([&] () mutable {
|
||||
func();
|
||||
done.set();
|
||||
}));
|
||||
if (stopped_) {
|
||||
// If this queue is stopped_, the sync task might never actually run.
|
||||
throw std::runtime_error("Stopped within enqueueSync.");
|
||||
}
|
||||
done.wait();
|
||||
}
|
||||
|
||||
void stop() {
|
||||
stopped_ = true;
|
||||
pending_.set();
|
||||
}
|
||||
|
||||
bool isStopped() {
|
||||
return stopped_;
|
||||
}
|
||||
|
||||
void quitSynchronous() {
|
||||
stop();
|
||||
finished_.wait();
|
||||
}
|
||||
|
||||
void run() {
|
||||
// If another thread stops this one, then the acquire-release on pending_
|
||||
// ensures that we read stopped some time after it was set (and other
|
||||
// threads just have to deal with the fact that we might run a task "after"
|
||||
// they stop us).
|
||||
//
|
||||
// If we are stopped on this thread, then memory order doesn't really
|
||||
// matter reading stopped_.
|
||||
while (!stopped_.load(std::memory_order_relaxed)) {
|
||||
sweep();
|
||||
if (delayed_.empty()) {
|
||||
pending_.wait();
|
||||
} else {
|
||||
pending_.wait_until(delayed_.nextTime());
|
||||
}
|
||||
}
|
||||
// This sweep is just to catch erroneous enqueueSync. That is, there could
|
||||
// be a task marked sync that another thread is waiting for, but we'll
|
||||
// never actually run it.
|
||||
sweep();
|
||||
finished_.set();
|
||||
}
|
||||
|
||||
// We are processing two queues, the posted tasks (queue_) and the delayed
|
||||
// tasks (delayed_). Delayed tasks first go into posted tasks, and then are
|
||||
// moved to the delayed queue if we pop them before the time they are
|
||||
// scheduled for.
|
||||
// As we pop things from queue_, before dealing with that thing, we run any
|
||||
// delayed tasks whose scheduled time has arrived.
|
||||
void sweep() {
|
||||
queue_.sweep([this] (Task* t) {
|
||||
std::unique_ptr<Task> owned(t);
|
||||
if (stopped_.load(std::memory_order_relaxed)) {
|
||||
if (t->sync) {
|
||||
throw std::runtime_error("Sync task posted while stopped.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
delayed_.process();
|
||||
if (t->startTime != time_point() && now() <= t->startTime) {
|
||||
delayed_.push(owned.release());
|
||||
} else {
|
||||
t->func();
|
||||
}
|
||||
});
|
||||
delayed_.process();
|
||||
}
|
||||
|
||||
void bindToThisThread() {
|
||||
// TODO: handle nested runloops (either allow them or throw an exception).
|
||||
if (tid_ != std::thread::id{}) {
|
||||
throw std::runtime_error("Message queue already bound to thread.");
|
||||
}
|
||||
tid_ = std::this_thread::get_id();
|
||||
}
|
||||
|
||||
bool isOnQueue() {
|
||||
return std::this_thread::get_id() == tid_;
|
||||
}
|
||||
|
||||
private:
|
||||
void enqueueTask(Task* task) {
|
||||
if (queue_.insertHead(task)) {
|
||||
pending_.set();
|
||||
}
|
||||
}
|
||||
|
||||
std::thread::id tid_;
|
||||
|
||||
folly::AtomicIntrusiveLinkedList<Task, &Task::hook> queue_;
|
||||
|
||||
std::atomic_bool stopped_{false};
|
||||
DelayedTaskQueue delayed_;
|
||||
|
||||
BinarySemaphore pending_;
|
||||
EventFlag finished_;
|
||||
};
|
||||
|
||||
|
||||
CxxMessageQueue::CxxMessageQueue() : qr_(new QueueRunner()) {
|
||||
|
||||
}
|
||||
|
||||
CxxMessageQueue::~CxxMessageQueue() {
|
||||
// TODO(cjhopman): Add detach() so that the queue doesn't have to be
|
||||
// explicitly stopped.
|
||||
if (!qr_->isStopped()) {
|
||||
LOG(FATAL) << "Queue not stopped.";
|
||||
}
|
||||
}
|
||||
|
||||
void CxxMessageQueue::runOnQueue(std::function<void()>&& func) {
|
||||
qr_->enqueue(std::move(func));
|
||||
}
|
||||
|
||||
void CxxMessageQueue::runOnQueueDelayed(std::function<void()>&& func, uint64_t delayMs) {
|
||||
qr_->enqueueDelayed(std::move(func), delayMs);
|
||||
}
|
||||
|
||||
void CxxMessageQueue::runOnQueueSync(std::function<void()>&& func) {
|
||||
if (isOnQueue()) {
|
||||
func();
|
||||
return;
|
||||
}
|
||||
qr_->enqueueSync(std::move(func));
|
||||
}
|
||||
|
||||
void CxxMessageQueue::quitSynchronous() {
|
||||
if (isOnQueue()) {
|
||||
qr_->stop();
|
||||
} else {
|
||||
qr_->quitSynchronous();
|
||||
}
|
||||
}
|
||||
|
||||
bool CxxMessageQueue::isOnQueue() {
|
||||
return qr_->isOnQueue();
|
||||
}
|
||||
|
||||
namespace {
|
||||
struct MQRegistry {
|
||||
std::weak_ptr<CxxMessageQueue> find(std::thread::id tid) {
|
||||
std::lock_guard<std::mutex> g(lock_);
|
||||
auto iter = registry_.find(tid);
|
||||
if (iter == registry_.end()) return std::weak_ptr<CxxMessageQueue>();
|
||||
return iter->second;
|
||||
}
|
||||
|
||||
void registerQueue(std::thread::id tid, std::weak_ptr<CxxMessageQueue> mq) {
|
||||
std::lock_guard<std::mutex> g(lock_);
|
||||
registry_[tid] = mq;
|
||||
}
|
||||
|
||||
void unregister(std::thread::id tid) {
|
||||
std::lock_guard<std::mutex> g(lock_);
|
||||
registry_.erase(tid);
|
||||
}
|
||||
private:
|
||||
std::mutex lock_;
|
||||
std::unordered_map<std::thread::id, std::weak_ptr<CxxMessageQueue>> registry_;
|
||||
};
|
||||
|
||||
MQRegistry& getMQRegistry() {
|
||||
static MQRegistry* mq_registry = new MQRegistry();
|
||||
return *mq_registry;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<CxxMessageQueue> CxxMessageQueue::current() {
|
||||
auto tid = std::this_thread::get_id();
|
||||
return getMQRegistry().find(tid).lock();
|
||||
}
|
||||
|
||||
std::function<void()> CxxMessageQueue::getUnregisteredRunLoop() {
|
||||
return [capture=qr_] {
|
||||
capture->bindToThisThread();
|
||||
capture->run();
|
||||
};
|
||||
}
|
||||
|
||||
std::function<void()> CxxMessageQueue::getRunLoop(std::shared_ptr<CxxMessageQueue> mq) {
|
||||
return [capture=mq->qr_, weakMq=std::weak_ptr<CxxMessageQueue>(mq)] {
|
||||
capture->bindToThisThread();
|
||||
auto tid = std::this_thread::get_id();
|
||||
|
||||
getMQRegistry().registerQueue(tid, weakMq);
|
||||
capture->run();
|
||||
getMQRegistry().unregister(tid);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
|
||||
} // namespace react
|
||||
} // namespace facebook
|
|
@ -1,83 +0,0 @@
|
|||
// Copyright 2004-present Facebook. All Rights Reserved.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
#include <cxxreact/MessageQueueThread.h>
|
||||
|
||||
namespace facebook {
|
||||
namespace react {
|
||||
|
||||
namespace detail {
|
||||
template<bool clearOnWait>
|
||||
class CVFlag {
|
||||
public:
|
||||
using time_point = std::chrono::steady_clock::time_point;
|
||||
void set() {
|
||||
std::lock_guard<std::mutex> lk(mtx_);
|
||||
flag_ = true;
|
||||
cv_.notify_one();
|
||||
}
|
||||
|
||||
void wait() {
|
||||
std::unique_lock<std::mutex> lk(mtx_);
|
||||
cv_.wait(lk, [this] { return flag_; });
|
||||
if (clearOnWait) flag_ = false;
|
||||
}
|
||||
|
||||
bool wait_until(time_point d) {
|
||||
std::unique_lock<std::mutex> lk(mtx_);
|
||||
bool res = cv_.wait_until(lk, d, [this] { return flag_; });
|
||||
if (clearOnWait && res) flag_ = false;
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
bool flag_{false};
|
||||
std::condition_variable cv_;
|
||||
std::mutex mtx_;
|
||||
};
|
||||
|
||||
using BinarySemaphore = CVFlag<true>;
|
||||
using EventFlag = CVFlag<false>;
|
||||
}
|
||||
|
||||
class CxxMessageQueue : public MessageQueueThread {
|
||||
public:
|
||||
CxxMessageQueue();
|
||||
virtual ~CxxMessageQueue() override;
|
||||
virtual void runOnQueue(std::function<void()>&&) override;
|
||||
void runOnQueueDelayed(std::function<void()>&&, uint64_t delayMs);
|
||||
// runOnQueueSync and quitSynchronous are dangerous. They should only be
|
||||
// used for initialization and cleanup.
|
||||
virtual void runOnQueueSync(std::function<void()>&&) override;
|
||||
// Once quitSynchronous() returns, no further work should run on the queue.
|
||||
virtual void quitSynchronous() override;
|
||||
|
||||
bool isOnQueue();
|
||||
|
||||
// If this getRunLoop is used, current() will not work.
|
||||
std::function<void()> getUnregisteredRunLoop();
|
||||
|
||||
// This returns a function that will actually run the runloop.
|
||||
// This runloop will return some time after quitSynchronous (or after this is destroyed).
|
||||
//
|
||||
// When running the runloop, it is important to ensure that no frames in the
|
||||
// current stack have a strong reference to the queue.
|
||||
//
|
||||
// Only one thread should run the runloop.
|
||||
static std::function<void()> getRunLoop(std::shared_ptr<CxxMessageQueue> mq);
|
||||
|
||||
static std::shared_ptr<CxxMessageQueue> current();
|
||||
private:
|
||||
class QueueRunner;
|
||||
std::shared_ptr<QueueRunner> qr_;
|
||||
};
|
||||
|
||||
}}
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
#include "Instance.h"
|
||||
|
||||
#include "CxxMessageQueue.h"
|
||||
#include "Executor.h"
|
||||
#include "MethodCall.h"
|
||||
#include "RecoverableError.h"
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
TEST_SRCS = [
|
||||
"CxxMessageQueueTest.cpp",
|
||||
"RecoverableErrorTest.cpp",
|
||||
"jsarg_helpers.cpp",
|
||||
"jsbigstring.cpp",
|
||||
|
|
|
@ -1,188 +0,0 @@
|
|||
// Copyright 2004-present Facebook. All Rights Reserved.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <cxxreact/CxxMessageQueue.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
using namespace facebook::react;
|
||||
using detail::EventFlag;
|
||||
using time_point = EventFlag::time_point;
|
||||
|
||||
using std::chrono::milliseconds;
|
||||
|
||||
namespace {
|
||||
time_point now() {
|
||||
return std::chrono::steady_clock::now();
|
||||
}
|
||||
|
||||
std::shared_ptr<CxxMessageQueue> createAndStartQueue(EventFlag& finishedFlag) {
|
||||
auto q = std::make_shared<CxxMessageQueue>();
|
||||
std::thread t([q, &finishedFlag] () mutable {
|
||||
auto loop = CxxMessageQueue::getRunLoop(q);
|
||||
// Note: make sure that no stack frames above loop() have a strong reference to q.
|
||||
q.reset();
|
||||
loop();
|
||||
finishedFlag.set();
|
||||
});
|
||||
t.detach();
|
||||
return q;
|
||||
}
|
||||
|
||||
std::unique_ptr<CxxMessageQueue> createAndStartUnregisteredQueue(
|
||||
EventFlag& finishedFlag) {
|
||||
auto q = std::make_unique<CxxMessageQueue>();
|
||||
std::thread t([loop=q->getUnregisteredRunLoop(), &finishedFlag] {
|
||||
loop();
|
||||
finishedFlag.set();
|
||||
});
|
||||
t.detach();
|
||||
return q;
|
||||
}
|
||||
|
||||
// This is just used to start up a queue for a test and make sure that it is
|
||||
// actually shut down after the test.
|
||||
struct QueueWithThread {
|
||||
QueueWithThread() {
|
||||
queue = createAndStartQueue(done);
|
||||
}
|
||||
|
||||
~QueueWithThread() {
|
||||
queue->quitSynchronous();
|
||||
queue.reset();
|
||||
EXPECT_TRUE(done.wait_until(now() + milliseconds(300))) << "Queue did not exit";
|
||||
}
|
||||
|
||||
EventFlag done;
|
||||
std::shared_ptr<CxxMessageQueue> queue;
|
||||
};
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestQuit) {
|
||||
EventFlag done;
|
||||
auto q = createAndStartQueue(done);
|
||||
q->quitSynchronous();
|
||||
EXPECT_TRUE(done.wait_until(now() + milliseconds(300)))
|
||||
<< "Queue did not exit runloop after quitSynchronous";
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestPostTask) {
|
||||
QueueWithThread qt;
|
||||
auto q = qt.queue;
|
||||
|
||||
EventFlag flag;
|
||||
q->runOnQueue([&] {
|
||||
flag.set();
|
||||
});
|
||||
flag.wait();
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestPostUnregistered) {
|
||||
EventFlag qdone;
|
||||
auto q = createAndStartUnregisteredQueue(qdone);
|
||||
|
||||
EventFlag tflag;
|
||||
q->runOnQueue([&] {
|
||||
tflag.set();
|
||||
});
|
||||
tflag.wait();
|
||||
|
||||
q->quitSynchronous();
|
||||
q.reset();
|
||||
EXPECT_TRUE(qdone.wait_until(now() + milliseconds(300))) << "Queue did not exit";
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestPostCurrent) {
|
||||
QueueWithThread qt;
|
||||
auto q = qt.queue;
|
||||
|
||||
EventFlag flag;
|
||||
q->runOnQueue([&] {
|
||||
CxxMessageQueue::current()->runOnQueue([&] {
|
||||
flag.set();
|
||||
});
|
||||
});
|
||||
flag.wait();
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestPostTaskMultiple) {
|
||||
QueueWithThread qt;
|
||||
auto q = qt.queue;
|
||||
|
||||
std::vector<EventFlag> vec(10);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
q->runOnQueue([&, i] {
|
||||
vec[i].set();
|
||||
});
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
vec[i].wait();
|
||||
}
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestQueuedTaskOrdering) {
|
||||
QueueWithThread qt;
|
||||
auto q = qt.queue;
|
||||
|
||||
// Block the runloop so we can get some queued tasks.
|
||||
EventFlag wait;
|
||||
q->runOnQueue([&] {
|
||||
wait.wait();
|
||||
});
|
||||
|
||||
// These tasks should run in order.
|
||||
int failed = -1;
|
||||
int i = 0;
|
||||
for (int j = 0; j < 10; j++) {
|
||||
q->runOnQueue([&, j] {
|
||||
if (i != j) {
|
||||
failed = j;
|
||||
}
|
||||
i++;
|
||||
});
|
||||
}
|
||||
wait.set();
|
||||
|
||||
// Flush the queue.
|
||||
q->runOnQueueSync([&] {});
|
||||
|
||||
ASSERT_EQ(failed, -1);
|
||||
ASSERT_EQ(i, 10);
|
||||
}
|
||||
|
||||
TEST(CxxMessageQueue, TestDelayedTaskOrdering) {
|
||||
QueueWithThread qt;
|
||||
auto q = qt.queue;
|
||||
|
||||
// Block the runloop so we can get some queued tasks.
|
||||
EventFlag wait;
|
||||
q->runOnQueue([&] {
|
||||
wait.wait();
|
||||
});
|
||||
|
||||
int ids[] = {8, 4, 6, 1, 3, 2, 9, 5, 0, 7};
|
||||
|
||||
int failed = -1;
|
||||
int i = 0;
|
||||
EventFlag done;
|
||||
// If this loop actually takes longer than the difference between delays, the
|
||||
// ordering could get screwed up :/
|
||||
for (int j = 0; j < 10; j++) {
|
||||
q->runOnQueueDelayed([&, j] {
|
||||
if (i != ids[j]) {
|
||||
failed = j;
|
||||
}
|
||||
i++;
|
||||
if (ids[j] == 9) {
|
||||
done.set();
|
||||
}
|
||||
}, 50 + 10 * ids[j]);
|
||||
}
|
||||
wait.set();
|
||||
done.wait();
|
||||
|
||||
ASSERT_EQ(failed, -1);
|
||||
ASSERT_EQ(i, 10);
|
||||
}
|
Loading…
Reference in New Issue