realm-js/lib/worker.js

129 lines
3.7 KiB
JavaScript
Raw Normal View History

////////////////////////////////////////////////////////////////////////////
//
// Copyright 2017 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
'use strict';
const require_method = require;
// Prevent React Native packager from seeing modules required with this
function nodeRequire(module) {
return require_method(module);
}
const cp = nodeRequire('child_process');
const os = nodeRequire('os');
class Worker {
constructor(modulePath, options={}) {
this.modulePath = modulePath;
this.maxWorkers = options.maxWorkers || os.cpus().length;
this.env = options.env || {};
this.execArgv = options.execArgv || {};
this._workers = [];
this._waiting = [];
this._workQueue = [];
this._changeObjects = {};
this._startWorker();
}
onavailable(path) {
this._push({message: 'available', path});
}
onchange(change) {
const serialized = change.serialize();
change.refCount = (change.refCount || 0) + 1;
this._changeObjects[serialized] = change;
this._push({message: 'change', change: serialized});
}
stop() {
this._stopping = true;
return new Promise((r) => {
this._shutdownComplete = r;
this._next();
});
}
_push(message) {
if (this._stopping) {
return;
}
this._workQueue.push(message);
this._next();
}
_startWorker() {
const child = cp.fork(__dirname + '/notification-worker.js', [], {
env: this.env,
execArgv: this.execArgv
});
child.on('message', (m) => {
if (m.change) {
const changeObj = this._changeObjects[m.change];
delete this._changeObjects[m.change];
changeObj.release();
}
this._waiting.push(child);
this._next();
});
child.on('exit', (code, signal) => {
if (code !== 0) {
console.error(`Unexpected exit code from child: ${code} ${signal}`);
}
this._workers = this._workers.filter(c => c !== child);
this._next();
});
child.send({message: 'load', module: this.modulePath});
this._workers.push(child);
this._waiting.push(child);
}
_next() {
if (this._stopping && this._workQueue.length === 0) {
for (const worker of this._workers) {
if (!worker.stopping) {
worker.send({message: 'stop'});
worker.stopping = true;
}
}
if (this._workers.length === 0) {
this._shutdownComplete();
}
return;
}
if (this._workQueue.length === 0) {
return;
}
if (this._waiting.length === 0) {
if (this._workers.length < this.maxWorkers) {
this._startWorker();
}
return;
}
const worker = this._waiting.shift();
const message = this._workQueue.shift();
worker.send(message);
}
}
module.exports = Worker;