Copying definition of functions (#1621)

* Copying definition of functions
* Avoid loading when using React Native
This commit is contained in:
Kenneth Geisshirt 2018-01-16 10:11:30 +01:00 committed by GitHub
parent 91559c216b
commit 193314834e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 472 additions and 7 deletions

View File

@ -1,3 +1,18 @@
X.Y.Z Release notes
=============================================================
### Breaking changes
* None.
### Enhancements
* None.
### Bug fixes
* [Object Server] Added missing `Realm.Sync` listener functions.
### Internal
* None.
2.2.1 Release notes (2018-1-13)
=============================================================
### Breaking changes

View File

@ -119,4 +119,11 @@ if (!realmConstructor) {
require('./extensions')(realmConstructor);
if (realmConstructor.Sync) {
if (getContext() === 'nodejs') {
nodeRequire('./notifier')(realmConstructor);
Object.defineProperty(realmConstructor, 'Worker', {value: nodeRequire('./worker')});
}
}
module.exports = realmConstructor;

View File

@ -0,0 +1,61 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2017 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
'use strict';
const require_method = require;
// Prevent React Native packager from seeing modules required with this
function nodeRequire(module) {
return require_method(module);
}
const Realm = require('.');
let impl;
process.on('message', (m) => {
switch (m.message) {
case 'load':
impl = require(m.module);
break;
case 'available':
if (impl.onavailable) {
impl.onavailable(m.path);
}
process.send({});
break;
case 'change':
if (impl.onchange) {
const change = Realm.Sync._deserializeChangeSet(m.change);
if (!change.isEmpty) {
impl.onchange(change);
}
change.close();
}
process.send({change: m.change});
break;
case 'message':
if (impl.onmessage) {
impl.onmessage(m.body);
}
process.send({});
break;
case 'stop':
process.exit(0);
}
});

252
lib/notifier.js Normal file
View File

@ -0,0 +1,252 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2017 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
'use strict';
const require_method = require;
// Prevent React Native packager from seeing modules required with this
function nodeRequire(module) {
return require_method(module);
}
const Worker = nodeRequire('./worker');
class FunctionListener {
constructor(regex, regexStr, event, fn) {
this.regex = regex;
this.regexStr = regexStr;
this.event = event;
this.fn = fn;
this.seen = {};
}
stop() {
return Promise.resolve();
}
matches(regex, event, fn) {
return this.regexStr === regex && this.event === event && this.fn === fn;
}
onavailable(path) {
if (this.regex.test(path)) {
if (this.event === 'available' && !this.seen[path]) {
this.fn(path);
this.seen[path] = true;
}
return this.event === 'change';
}
return false;
}
onchange(changes) {
if (this.event !== 'change' || !this.regex.test(changes.path)) {
return;
}
if (changes.isEmpty) {
return;
}
this.fn(changes);
}
};
class OutOfProcListener {
constructor(regex, regexStr, worker) {
this.regex = regex;
this.regexStr = regexStr;
this.worker = worker;
this.seen = {};
}
stop() {
return this.worker.stop();
}
matches(regex, worker) {
return this.regexStr === regex && this.worker === worker;
}
onavailable(path) {
if (this.regex.test(path)) {
if (!this.seen[path]) {
this.worker.onavailable(path);
this.seen[path] = true;
}
return true;
}
return false;
}
onchange(changes) {
if (!this.regex.test(changes.path)) {
return;
}
this.worker.onchange(changes);
}
};
class Listener {
constructor(Sync, server, user) {
this.notifier = Sync._createNotifier(server, user, (event, arg) => this[event](arg));
this.initPromises = [];
this.callbacks = [];
}
// callbacks for C++ functions
downloadComplete() {
this.initComplete = true;
this._notifyDownloadComplete();
}
error(err) {
this.err = err;
this.initComplete = true;
this._notifyDownloadComplete();
}
change() {
const changes = this.notifier.next();
if (!changes) {
return;
}
for (const callback of this.callbacks) {
callback.onchange(changes);
}
if (!changes.refCount) {
changes.close();
}
}
available(virtualPath) {
let watch = false;
for (const callback of this.callbacks) {
if (callback.onavailable(virtualPath)) {
watch = true;
}
}
return watch;
}
// public API implementation
add(regexStr, event, fn) {
const regex = new RegExp(regexStr);
if (typeof fn === 'function') {
this.callbacks.push(new FunctionListener(regex, regexStr, event, fn));
}
else if (event instanceof Worker) {
this.callbacks.push(new OutOfProcListener(regex, regexStr, event));
}
else {
throw new Error(`Invalid arguments: must supply either event name and callback function or a Worker, got (${event}, ${fn})`);
}
const promise = new Promise((resolve, reject) => {
this.initPromises.push([resolve, reject]);
});
this._notifyDownloadComplete();
this.notifier.start();
return promise;
}
remove(regex, event, callback) {
for (let i = 0; i < this.callbacks.length; ++i) {
if (this.callbacks[i].matches(regex, event, callback)) {
const ret = this.callbacks[i].stop();
this.callbacks.splice(i, 1);
return ret;
}
}
return Promise.resolve();
}
removeAll() {
let ret = Promise.all(this.callbacks.map(c => c.stop()));
this.callbacks = [];
return ret;
}
get isEmpty() {
return this.callbacks.length === 0;
}
// helpers
_notifyDownloadComplete() {
if (!this.initComplete) {
return;
}
if (this.err) {
for (let [_, reject] of this.initPromises) {
setImmediate(() => reject(this.err));
}
}
else {
for (let [resolve, _] of this.initPromises) {
setImmediate(resolve);
}
}
this.initPromises = [];
}
};
let listener;
function addListener(server, user, regex, event, callback) {
if (!listener) {
listener = new Listener(this, server, user);
}
return listener.add(regex, event, callback);
}
function removeListener(regex, event, callback) {
if (!listener) {
return Promise.resolve();
}
let ret = listener.remove(regex, event, callback);
if (listener.isEmpty) {
listener.notifier.close();
listener = null;
}
return ret;
}
function removeAllListeners() {
if (!listener) {
return Promise.resolve();
}
let ret = listener.removeAll();
listener.notifier.close();
listener = null;
return ret;
}
function setListenerDirectory(dir) {
if (listener) {
throw new Error("The listener directory can't be changed when there are active listeners.");
}
this._setListenerDirectory(dir);
}
module.exports = function(Realm) {
Realm.Sync.setListenerDirectory = setListenerDirectory.bind(Realm.Sync);
Realm.Sync.addListener = addListener.bind(Realm.Sync);
Realm.Sync.removeListener = removeListener;
Realm.Sync.removeAllListeners = removeAllListeners;
}

130
lib/worker.js Normal file
View File

@ -0,0 +1,130 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2017 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
'use strict';
const require_method = require;
// Prevent React Native packager from seeing modules required with this
function nodeRequire(module) {
return require_method(module);
}
const cp = nodeRequire('child_process');
const os = nodeRequire('os');
class Worker {
constructor(modulePath, options={}) {
this.modulePath = modulePath;
this.maxWorkers = options.maxWorkers || os.cpus().length;
this.env = options.env || {};
this.execArgv = options.execArgv || {};
this._workers = [];
this._waiting = [];
this._workQueue = [];
this._changeObjects = {};
this._startWorker();
}
onavailable(path) {
this._push({message: 'available', path});
}
onchange(change) {
const serialized = change.serialize();
change.refCount = (change.refCount || 0) + 1;
this._changeObjects[serialized] = change;
this._push({message: 'change', change: serialized});
}
stop() {
this._stopping = true;
return new Promise((r) => {
this._shutdownComplete = r;
this._next();
});
}
_push(message) {
if (this._stopping) {
return;
}
this._workQueue.push(message);
this._next();
}
_startWorker() {
const child = cp.fork(__dirname + '/notification-worker.js', [], {
env: this.env,
execArgv: this.execArgv
});
child.on('message', (m) => {
if (m.change) {
const changeObj = this._changeObjects[m.change];
delete this._changeObjects[m.change];
if (--changeObj.refCount === 0) {
changeObj.close();
}
}
this._waiting.push(child);
this._next();
});
child.on('exit', (code, signal) => {
if (code !== 0) {
console.error(`Unexpected exit code from child: ${code} ${signal}`);
}
this._workers = this._workers.filter(c => c !== child);
this._next();
});
child.send({message: 'load', module: this.modulePath});
this._workers.push(child);
this._waiting.push(child);
}
_next() {
if (this._stopping && this._workQueue.length === 0) {
for (const worker of this._workers) {
if (!worker.stopping) {
worker.send({message: 'stop'});
worker.stopping = true;
}
}
if (this._workers.length === 0) {
this._shutdownComplete();
}
return;
}
if (this._workQueue.length === 0) {
return;
}
if (this._waiting.length === 0) {
if (this._workers.length < this.maxWorkers) {
this._startWorker();
}
return;
}
const worker = this._waiting.shift();
const message = this._workQueue.shift();
worker.send(message);
}
};
module.exports = Worker;