WebWorkers: implement tear down and terminate()

Summary:
public
Previously we did no cleanup when the executor was torn down and didn't expose a way to tear down the worker from JS. Now we do.

Termination happens synchronously and waits the the worker's MessageQueueThread to finish and join.

Reviewed By: mhorowitz

Differential Revision: D2815240

fb-gh-sync-id: 786beb30d4d64556586b91727f32e379c667a965
This commit is contained in:
Andy Street 2016-01-12 04:51:16 -08:00 committed by facebook-github-bot-4
parent b0519c8280
commit 0be36a2c80
8 changed files with 145 additions and 12 deletions

View File

@ -43,4 +43,11 @@ public interface MessageQueueThread {
* {@link AssertionError}) if the assertion fails. * {@link AssertionError}) if the assertion fails.
*/ */
void assertIsOnThread(); void assertIsOnThread();
/**
* Quits this MessageQueueThread. If called from this MessageQueueThread, this will be the last
* thing the thread runs. If called from a separate thread, this will block until the thread can
* be quit and joined.
*/
void quitSynchronous();
} }

View File

@ -101,6 +101,7 @@ public class MessageQueueThreadImpl implements MessageQueueThread {
* Quits this queue's Looper. If that Looper was running on a different Thread than the current * Quits this queue's Looper. If that Looper was running on a different Thread than the current
* Thread, also waits for the last message being processed to finish and the Thread to die. * Thread, also waits for the last message being processed to finish and the Thread to die.
*/ */
@Override
public void quitSynchronous() { public void quitSynchronous() {
mIsFinished = true; mIsFinished = true;
mLooper.quit(); mLooper.quit();

View File

@ -106,6 +106,7 @@ JSCExecutor::JSCExecutor(FlushImmediateCallback cb) :
installGlobalFunction(m_context, "nativePerformanceNow", nativePerformanceNow); installGlobalFunction(m_context, "nativePerformanceNow", nativePerformanceNow);
installGlobalFunction(m_context, "nativeStartWorker", nativeStartWorker); installGlobalFunction(m_context, "nativeStartWorker", nativeStartWorker);
installGlobalFunction(m_context, "nativePostMessageToWorker", nativePostMessageToWorker); installGlobalFunction(m_context, "nativePostMessageToWorker", nativePostMessageToWorker);
installGlobalFunction(m_context, "nativeTerminateWorker", nativeTerminateWorker);
#ifdef WITH_FB_JSC_TUNING #ifdef WITH_FB_JSC_TUNING
configureJSCForAndroid(); configureJSCForAndroid();
@ -123,6 +124,15 @@ JSCExecutor::JSCExecutor(FlushImmediateCallback cb) :
} }
JSCExecutor::~JSCExecutor() { JSCExecutor::~JSCExecutor() {
// terminateWebWorker mutates m_webWorkers so collect all the workers to terminate first
std::vector<int> workerIds;
for (auto it = m_webWorkers.begin(); it != m_webWorkers.end(); it++) {
workerIds.push_back(it->first);
}
for (int workerId : workerIds) {
terminateWebWorker(workerId);
}
s_globalContextRefToJSCExecutor.erase(m_context); s_globalContextRefToJSCExecutor.erase(m_context);
JSGlobalContextRelease(m_context); JSGlobalContextRelease(m_context);
} }
@ -268,6 +278,15 @@ void JSCExecutor::postMessageToWebWorker(int workerId, JSValueRef message, JSVal
worker.postMessage(message); worker.postMessage(message);
} }
void JSCExecutor::terminateWebWorker(int workerId) {
JSCWebWorker& worker = m_webWorkers.at(workerId);
worker.terminate();
m_webWorkers.erase(workerId);
m_webWorkerJSObjs.erase(workerId);
}
static JSValueRef createErrorString(JSContextRef ctx, const char *msg) { static JSValueRef createErrorString(JSContextRef ctx, const char *msg) {
return JSValueMakeString(ctx, String(msg)); return JSValueMakeString(ctx, String(msg));
} }
@ -361,6 +380,37 @@ JSValueRef JSCExecutor::nativePostMessageToWorker(
return JSValueMakeUndefined(ctx); return JSValueMakeUndefined(ctx);
} }
JSValueRef JSCExecutor::nativeTerminateWorker(
JSContextRef ctx,
JSObjectRef function,
JSObjectRef thisObject,
size_t argumentCount,
const JSValueRef arguments[],
JSValueRef *exception) {
if (argumentCount != 1) {
*exception = createErrorString(ctx, "Got wrong number of args");
return JSValueMakeUndefined(ctx);
}
double workerDouble = JSValueToNumber(ctx, arguments[0], exception);
if (workerDouble != workerDouble) {
*exception = createErrorString(ctx, "Got invalid worker id");
return JSValueMakeUndefined(ctx);
}
JSCExecutor *executor;
try {
executor = s_globalContextRefToJSCExecutor.at(JSContextGetGlobalContext(ctx));
} catch (std::out_of_range& e) {
*exception = createErrorString(ctx, "Global JS context didn't map to a valid executor");
return JSValueMakeUndefined(ctx);
}
executor->terminateWebWorker((int) workerDouble);
return JSValueMakeUndefined(ctx);
}
static JSValueRef nativeLoggingHook( static JSValueRef nativeLoggingHook(
JSContextRef ctx, JSContextRef ctx,
JSObjectRef function, JSObjectRef function,

View File

@ -62,6 +62,7 @@ private:
int addWebWorker(const std::string& script, JSValueRef workerRef); int addWebWorker(const std::string& script, JSValueRef workerRef);
void postMessageToWebWorker(int worker, JSValueRef message, JSValueRef *exn); void postMessageToWebWorker(int worker, JSValueRef message, JSValueRef *exn);
void terminateWebWorker(int worker);
static JSValueRef nativeStartWorker( static JSValueRef nativeStartWorker(
JSContextRef ctx, JSContextRef ctx,
@ -77,6 +78,13 @@ private:
size_t argumentCount, size_t argumentCount,
const JSValueRef arguments[], const JSValueRef arguments[],
JSValueRef *exception); JSValueRef *exception);
static JSValueRef nativeTerminateWorker(
JSContextRef ctx,
JSObjectRef function,
JSObjectRef thisObject,
size_t argumentCount,
const JSValueRef arguments[],
JSValueRef *exception);
}; };
} } } }

View File

@ -1,8 +1,8 @@
// Copyright 2004-present Facebook. All Rights Reserved. // Copyright 2004-present Facebook. All Rights Reserved.
#include <unistd.h> #include <unistd.h>
#include <condition_variable>
#include <mutex> #include <mutex>
#include <pthread.h>
#include <unordered_map> #include <unordered_map>
#include <fb/assert.h> #include <fb/assert.h>
@ -43,14 +43,14 @@ JSCWebWorker::JSCWebWorker(int id, JSCWebWorkerOwner *owner, std::string scriptS
} }
JSCWebWorker::~JSCWebWorker() { JSCWebWorker::~JSCWebWorker() {
// TODO(9604430): Implement tear down FBASSERTMSGF(isTerminated(), "Didn't terminate the web worker before releasing it!");
} }
void JSCWebWorker::postMessage(JSValueRef msg) { void JSCWebWorker::postMessage(JSValueRef msg) {
std::string msgString = Value(owner_->getContext(), msg).toJSONString(); std::string msgString = Value(owner_->getContext(), msg).toJSONString();
workerMessageQueueThread_->runOnQueue([this, msgString] () { workerMessageQueueThread_->runOnQueue([this, msgString] () {
if (isFinished()) { if (isTerminated()) {
return; return;
} }
@ -60,17 +60,46 @@ void JSCWebWorker::postMessage(JSValueRef msg) {
}); });
} }
void JSCWebWorker::finish() { void JSCWebWorker::terminate() {
isFinished_ = true; if (isTerminated()) {
// TODO(9604430): Implement tear down return;
}
isTerminated_.store(true, std::memory_order_release);
if (workerMessageQueueThread_->isOnThread()) {
terminateOnWorkerThread();
} else {
std::mutex signalMutex;
std::condition_variable signalCv;
bool terminationComplete = false;
workerMessageQueueThread_->runOnQueue([&] () mutable {
std::lock_guard<std::mutex> lock(signalMutex);
terminateOnWorkerThread();
terminationComplete = true;
signalCv.notify_one();
});
std::unique_lock<std::mutex> lock(signalMutex);
signalCv.wait(lock, [&terminationComplete] { return terminationComplete; });
}
} }
bool JSCWebWorker::isFinished() { void JSCWebWorker::terminateOnWorkerThread() {
return isFinished_; s_globalContextRefToJSCWebWorker.erase(context_);
JSGlobalContextRelease(context_);
context_ = nullptr;
workerMessageQueueThread_->quitSynchronous();
}
bool JSCWebWorker::isTerminated() {
return isTerminated_.load(std::memory_order_acquire);
} }
void JSCWebWorker::initJSVMAndLoadScript() { void JSCWebWorker::initJSVMAndLoadScript() {
FBASSERTMSGF(!isFinished(), "Worker was already finished!"); FBASSERTMSGF(!isTerminated(), "Worker was already finished!");
FBASSERTMSGF(!context_, "Worker JS VM was already created!"); FBASSERTMSGF(!context_, "Worker JS VM was already created!");
context_ = JSGlobalContextCreateInGroup( context_ = JSGlobalContextCreateInGroup(
@ -106,6 +135,11 @@ JSValueRef JSCWebWorker::nativePostMessage(
} }
JSValueRef msg = arguments[0]; JSValueRef msg = arguments[0];
JSCWebWorker *webWorker = s_globalContextRefToJSCWebWorker.at(JSContextGetGlobalContext(ctx)); JSCWebWorker *webWorker = s_globalContextRefToJSCWebWorker.at(JSContextGetGlobalContext(ctx));
if (webWorker->isTerminated()) {
return JSValueMakeUndefined(ctx);
}
webWorker->postMessageToOwner(msg); webWorker->postMessageToOwner(msg);
return JSValueMakeUndefined(ctx); return JSValueMakeUndefined(ctx);

View File

@ -1,5 +1,6 @@
// Copyright 2004-present Facebook. All Rights Reserved. // Copyright 2004-present Facebook. All Rights Reserved.
#include <atomic>
#include <functional> #include <functional>
#include <mutex> #include <mutex>
#include <string> #include <string>
@ -56,17 +57,26 @@ public:
* ownerMessageQueueThread_. * ownerMessageQueueThread_.
*/ */
void postMessage(JSValueRef msg); void postMessage(JSValueRef msg);
void finish();
bool isFinished(); /**
* Synchronously quits the current worker and cleans up its VM.
*/
void terminate();
/**
* Whether terminate() has been called on this worker.
*/
bool isTerminated();
static Object createMessageObject(JSContextRef context, const std::string& msgData); static Object createMessageObject(JSContextRef context, const std::string& msgData);
private: private:
void initJSVMAndLoadScript(); void initJSVMAndLoadScript();
void postRunnableToEventLoop(std::function<void()>&& runnable); void postRunnableToEventLoop(std::function<void()>&& runnable);
void postMessageToOwner(JSValueRef result); void postMessageToOwner(JSValueRef result);
void terminateOnWorkerThread();
int id_; int id_;
bool isFinished_ = false; std::atomic_bool isTerminated_ = ATOMIC_VAR_INIT(false);
std::string scriptName_; std::string scriptName_;
JSCWebWorkerOwner *owner_ = nullptr; JSCWebWorkerOwner *owner_ = nullptr;
std::shared_ptr<JMessageQueueThread> ownerMessageQueueThread_; std::shared_ptr<JMessageQueueThread> ownerMessageQueueThread_;

View File

@ -21,6 +21,18 @@ void JMessageQueueThread::runOnQueue(std::function<void()>&& runnable) {
method(m_jobj, JNativeRunnable::newObjectCxxArgs(runnable).get()); method(m_jobj, JNativeRunnable::newObjectCxxArgs(runnable).get());
} }
bool JMessageQueueThread::isOnThread() {
static auto method = MessageQueueThread::javaClassStatic()->
getMethod<jboolean()>("isOnThread");
return method(m_jobj);
}
void JMessageQueueThread::quitSynchronous() {
static auto method = MessageQueueThread::javaClassStatic()->
getMethod<void()>("quitSynchronous");
method(m_jobj);
}
/* static */ /* static */
std::unique_ptr<JMessageQueueThread> JMessageQueueThread::currentMessageQueueThread() { std::unique_ptr<JMessageQueueThread> JMessageQueueThread::currentMessageQueueThread() {
static auto method = MessageQueueThreadRegistry::javaClassStatic()-> static auto method = MessageQueueThreadRegistry::javaClassStatic()->

View File

@ -25,6 +25,17 @@ public:
*/ */
void runOnQueue(std::function<void()>&& runnable); void runOnQueue(std::function<void()>&& runnable);
/**
* Returns whether the currently executing thread is this MessageQueueThread.
*/
bool isOnThread();
/**
* Synchronously quits the current MessageQueueThread. Can be called from any thread, but will
* block if not called on this MessageQueueThread.
*/
void quitSynchronous();
MessageQueueThread::javaobject jobj() { MessageQueueThread::javaobject jobj() {
return m_jobj.get(); return m_jobj.get();
} }