Add support for uploading and downloading all changes (#2124)
This commit is contained in:
parent
05a65ce49c
commit
a680084788
|
@ -1,7 +1,8 @@
|
|||
x.x.x Release notes (yyyy-MM-dd)
|
||||
=============================================================
|
||||
### Enhancements
|
||||
* Adds support for setting a custom User-Agent string using `Realm.Sync.setUserAgent(...)`. This string will be sent to the server when creating a connection. ([#XXX]())
|
||||
* Adds support for setting a custom User-Agent string using `Realm.Sync.setUserAgent(...)`. This string will be sent to the server when creating a connection. ([#2102](https://github.com/realm/realm-js/issues/2102))
|
||||
* Adds support for uploading and downloading changes using `Realm.Sync.Session.uploadAllLocalChanges(timeout)` and `Realm.Sync.Session.downloadAllRemoteChanges(timeout)`. ([#2122](https://github.com/realm/realm-js/issues/2122))
|
||||
|
||||
### Fixed
|
||||
* Tokens are refreshed ahead of time. If the lifetime of the token is lower than the threshold for refreshing it will cause the client to continously refresh, spamming the server with refresh requests. A lower bound of 10 seconds has been introduced. ([#2115](https://github.com/realm/realm-js/issues/2115), since v1.0.2)
|
||||
|
|
|
@ -203,6 +203,7 @@ def doDockerBuild(target, postStep = null) {
|
|||
def doMacBuild(target, postStep = null) {
|
||||
return {
|
||||
node('osx_vegas') {
|
||||
env.DEVELOPER_DIR = "/Applications/Xcode-9.4.app/Contents/Developer"
|
||||
doInside("./scripts/test.sh", target, postStep)
|
||||
}
|
||||
}
|
||||
|
|
23
docs/sync.js
23
docs/sync.js
|
@ -764,6 +764,29 @@ class Session {
|
|||
*/
|
||||
pause() {}
|
||||
|
||||
/**
|
||||
* This method returns a promise that does not resolve successfully until all known local changes have been uploaded
|
||||
* to the server or the specified timeout is hit in which case it will be rejected. If the method times out, the upload
|
||||
* will still continue in the background.
|
||||
*
|
||||
* This method cannot be called before the Realm has been opened.
|
||||
*
|
||||
* @param timeout maximum amount of time to wait in milliseconds before the promise is rejected. If no timeout
|
||||
* is specified the method will wait forever.
|
||||
*/
|
||||
uploadAllLocalChanges(timeoutMs) {}
|
||||
|
||||
/**
|
||||
* This method returns a promise that does not resolve successfully until all known remote changes have been
|
||||
* downloaded and applied to the Realm or the specified timeout is hit in which case it will be rejected. If the method
|
||||
* times out, the download will still continue in the background.
|
||||
*
|
||||
* This method cannot be called before the Realm has been opened.
|
||||
*
|
||||
* @param timeout maximum amount of time to wait in milliseconds before the promise will be rejected. If no timeout
|
||||
* is specified the method will wait forever.
|
||||
*/
|
||||
downloadAllServerChanges(timeoutMs) {}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -70,6 +70,29 @@ function addSchemaIfNeeded(schemaList, schemaObj) {
|
|||
}
|
||||
}
|
||||
|
||||
function waitForCompletion(session, fn, timeout, timeoutErrorMessage) {
|
||||
const waiter = new Promise((resolve, reject) => {
|
||||
fn.call(session, (error) => {
|
||||
if (error === undefined) {
|
||||
setTimeout(() => resolve(), 1);
|
||||
} else {
|
||||
setTimeout(() => reject(error), 1);
|
||||
}
|
||||
});
|
||||
});
|
||||
if (timeout === undefined) {
|
||||
return waiter;
|
||||
}
|
||||
return Promise.race([
|
||||
waiter,
|
||||
new Promise((resolve, reject) => {
|
||||
setTimeout(() => {
|
||||
reject(timeoutErrorMessage);
|
||||
}, timeout)
|
||||
})
|
||||
]);
|
||||
}
|
||||
|
||||
module.exports = function(realmConstructor) {
|
||||
// Add the specified Array methods to the Collection prototype.
|
||||
Object.defineProperties(realmConstructor.Collection.prototype, require('./collection-methods'));
|
||||
|
@ -238,6 +261,14 @@ module.exports = function(realmConstructor) {
|
|||
}
|
||||
}
|
||||
|
||||
realmConstructor.Sync.Session.prototype.uploadAllLocalChanges = function(timeout) {
|
||||
return waitForCompletion(this, this._waitForUploadCompletion, timeout, `Uploading changes did not complete in ${timeout} ms.`);
|
||||
};
|
||||
|
||||
realmConstructor.Sync.Session.prototype.downloadAllServerChanges = function(timeout) {
|
||||
return waitForCompletion(this, this._waitForDownloadCompletion, timeout, `Downloading changes did not complete ${timeout} ms.`);
|
||||
};
|
||||
|
||||
// Keep these value in sync with subscription_state.hpp
|
||||
realmConstructor.Sync.SubscriptionState = {
|
||||
Error: -1, // An error occurred while creating or processing the partial sync subscription.
|
||||
|
|
|
@ -489,6 +489,9 @@ declare namespace Realm.Sync {
|
|||
|
||||
resume(): void;
|
||||
pause(): void;
|
||||
|
||||
downloadAllServerChanges(timeoutMs?: number): Promise<void>;
|
||||
uploadAllLocalChanges(timeoutMs?: number): Promise<void>;
|
||||
}
|
||||
|
||||
type SubscriptionNotificationCallback = (subscription: Subscription, state: number) => void;
|
||||
|
|
|
@ -210,6 +210,7 @@ public:
|
|||
using ProgressHandler = void(uint64_t transferred_bytes, uint64_t transferrable_bytes);
|
||||
using StateHandler = void(SyncSession::PublicState old_state, SyncSession::PublicState new_state);
|
||||
using ConnectionHandler = void(SyncSession::ConnectionState new_state, SyncSession::ConnectionState old_state);
|
||||
using DownloadUploadCompletionHandler = void(std::error_code error);
|
||||
|
||||
static FunctionType create_constructor(ContextType);
|
||||
|
||||
|
@ -231,6 +232,9 @@ public:
|
|||
static void resume(ContextType ctx, ObjectType this_object, Arguments &, ReturnValue &);
|
||||
static void pause(ContextType ctx, ObjectType this_object, Arguments &, ReturnValue &);
|
||||
static void override_server(ContextType, ObjectType, Arguments &, ReturnValue &);
|
||||
static void wait_for_download_completion(ContextType, ObjectType, Arguments &, ReturnValue &);
|
||||
static void wait_for_upload_completion(ContextType, ObjectType, Arguments &, ReturnValue &);
|
||||
|
||||
|
||||
PropertyMap<T> const properties = {
|
||||
{"config", {wrap<get_config>, nullptr}},
|
||||
|
@ -244,6 +248,8 @@ public:
|
|||
{"_simulateError", wrap<simulate_error>},
|
||||
{"_refreshAccessToken", wrap<refresh_access_token>},
|
||||
{"_overrideServer", wrap<override_server>},
|
||||
{"_waitForDownloadCompletion", wrap<wait_for_download_completion>},
|
||||
{"_waitForUploadCompletion", wrap<wait_for_upload_completion>},
|
||||
{"addProgressNotification", wrap<add_progress_notification>},
|
||||
{"removeProgressNotification", wrap<remove_progress_notification>},
|
||||
{"addConnectionNotification", wrap<add_connection_notification>},
|
||||
|
@ -254,7 +260,9 @@ public:
|
|||
};
|
||||
|
||||
private:
|
||||
enum Direction { Upload, Download };
|
||||
static std::string get_connection_state_value(SyncSession::ConnectionState state);
|
||||
static void wait_for_completion(Direction direction, ContextType ctx, ObjectType this_object, Arguments& args);
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
|
@ -679,6 +687,57 @@ void SessionClass<T>::override_server(ContextType ctx, ObjectType this_object, A
|
|||
}
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void SessionClass<T>::wait_for_completion(Direction direction, ContextType ctx, ObjectType this_object, Arguments &args) {
|
||||
args.validate_count(1);
|
||||
if (auto session = get_internal<T, SessionClass<T>>(this_object)->lock()) {
|
||||
auto callback_function = Value::validated_to_function(ctx, args[0]);
|
||||
Protected<FunctionType> protected_callback(ctx, callback_function);
|
||||
Protected<ObjectType> protected_this(ctx, this_object);
|
||||
Protected<typename T::GlobalContext> protected_ctx(Context<T>::get_global_context(ctx));
|
||||
|
||||
EventLoopDispatcher<DownloadUploadCompletionHandler> completion_handler([=](std::error_code error) {
|
||||
HANDLESCOPE
|
||||
ValueType callback_arguments[1];
|
||||
if (error) {
|
||||
ObjectType error_object = Object::create_empty(protected_ctx);
|
||||
Object::set_property(protected_ctx, error_object, "message", Value::from_string(protected_ctx, error.message()));
|
||||
Object::set_property(protected_ctx, error_object, "errorCode", Value::from_number(protected_ctx, error.value()));
|
||||
callback_arguments[0] = error_object;
|
||||
} else {
|
||||
callback_arguments[0] = Value::from_undefined(ctx);
|
||||
}
|
||||
Function<T>::callback(protected_ctx, protected_callback, typename T::Object(), 1, callback_arguments);
|
||||
});
|
||||
|
||||
bool callback_registered;
|
||||
switch(direction) {
|
||||
case Upload:
|
||||
callback_registered = session->wait_for_upload_completion(std::move(completion_handler));
|
||||
break;
|
||||
case Download:
|
||||
callback_registered = session->wait_for_download_completion(std::move(completion_handler));
|
||||
break;
|
||||
}
|
||||
if (!callback_registered) {
|
||||
throw new logic_error("Could not register upload/download completion handler");
|
||||
}
|
||||
auto syncSession = create_object<T, SessionClass<T>>(ctx, new WeakSession(session));
|
||||
PropertyAttributes attributes = ReadOnly | DontEnum | DontDelete;
|
||||
Object::set_property(ctx, callback_function, "_syncSession", syncSession, attributes);
|
||||
}
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void SessionClass<T>::wait_for_upload_completion(ContextType ctx, ObjectType this_object, Arguments &args, ReturnValue&) {
|
||||
wait_for_completion(Direction::Upload, ctx, this_object, args);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void SessionClass<T>::wait_for_download_completion(ContextType ctx, ObjectType this_object, Arguments &args, ReturnValue&) {
|
||||
wait_for_completion(Direction::Download, ctx, this_object, args);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
class Subscription : public partial_sync::Subscription {
|
||||
public:
|
||||
|
|
|
@ -1296,4 +1296,109 @@ module.exports = {
|
|||
})
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
testUploadDownloadAllChanges() {
|
||||
if(!isNodeProccess) {
|
||||
return;
|
||||
}
|
||||
|
||||
const AUTH_URL = 'http://localhost:9080';
|
||||
const REALM_URL = 'realm://localhost:9080/completion_realm';
|
||||
const schema = {
|
||||
'name': 'CompletionHandlerObject',
|
||||
properties: {
|
||||
'name': { type: 'string'}
|
||||
}
|
||||
};
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
let admin2Realm;
|
||||
Realm.Sync.User.login(AUTH_URL, Realm.Sync.Credentials.nickname("admin1", true))
|
||||
.then((admin1) => {
|
||||
const admin1Config = admin1.createConfiguration({
|
||||
schema: [schema],
|
||||
sync: {
|
||||
url: REALM_URL,
|
||||
fullSynchronization: true
|
||||
}
|
||||
});
|
||||
return Realm.open(admin1Config);
|
||||
})
|
||||
.then((admin1Realm) => {
|
||||
admin1Realm.write(() => { admin1Realm.create('CompletionHandlerObject', { 'name': 'foo'}); });
|
||||
return admin1Realm.syncSession.uploadAllLocalChanges();
|
||||
})
|
||||
.then(() => {
|
||||
return Realm.Sync.User.login(AUTH_URL, Realm.Sync.Credentials.nickname("admin2", true));
|
||||
})
|
||||
.then((admin2) => {
|
||||
const admin2Config = admin2.createConfiguration({
|
||||
schema: [schema],
|
||||
sync: {
|
||||
url: REALM_URL,
|
||||
fullSynchronization: true
|
||||
}
|
||||
});
|
||||
admin2Realm = new Realm(admin2Config);
|
||||
return admin2Realm.syncSession.downloadAllServerChanges();
|
||||
})
|
||||
.then(() => {
|
||||
TestCase.assertEqual(1, admin2Realm.objects('CompletionHandlerObject').length);
|
||||
resolve();
|
||||
})
|
||||
.catch(e => reject(e));
|
||||
});
|
||||
},
|
||||
|
||||
testDownloadAllServerChangesTimeout() {
|
||||
if(!isNodeProccess) {
|
||||
return;
|
||||
}
|
||||
|
||||
const AUTH_URL = 'http://localhost:9080';
|
||||
const REALM_URL = 'realm://localhost:9080/timeout_download_realm';
|
||||
return new Promise((resolve, reject) => {
|
||||
Realm.Sync.User.login(AUTH_URL, Realm.Sync.Credentials.nickname("admin", true))
|
||||
.then((admin1) => {
|
||||
const admin1Config = admin1.createConfiguration({
|
||||
sync: {
|
||||
url: REALM_URL,
|
||||
fullSynchronization: true
|
||||
}
|
||||
});
|
||||
let realm = new Realm(admin1Config);
|
||||
realm.syncSession.downloadAllServerChanges(1).then(() => {
|
||||
reject("Download did not time out");
|
||||
}).catch(e => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
testUploadAllLocalChangesTimeout() {
|
||||
if(!isNodeProccess) {
|
||||
return;
|
||||
}
|
||||
|
||||
const AUTH_URL = 'http://localhost:9080';
|
||||
const REALM_URL = 'realm://localhost:9080/timeout_upload_realm';
|
||||
return new Promise((resolve, reject) => {
|
||||
Realm.Sync.User.login(AUTH_URL, Realm.Sync.Credentials.nickname("admin", true))
|
||||
.then((admin1) => {
|
||||
const admin1Config = admin1.createConfiguration({
|
||||
sync: {
|
||||
url: REALM_URL,
|
||||
fullSynchronization: true
|
||||
}
|
||||
});
|
||||
let realm = new Realm(admin1Config);
|
||||
realm.syncSession.uploadAllLocalChanges(1).then(() => {
|
||||
reject("Upload did not time out");
|
||||
}).catch(e => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue