Initial support for upload completion and upload progress callbacks

This commit is contained in:
blagoev 2017-07-14 01:03:20 +03:00
parent fb3979110d
commit c98d48e04e
2 changed files with 201 additions and 0 deletions

View File

@ -173,6 +173,7 @@ public:
static void write(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
static void add_listener(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
static void wait_for_download_completion(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
static void wait_for_upload_completion(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
static void remove_listener(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
static void remove_all_listeners(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
static void close(ContextType, FunctionType, ObjectType, size_t, const ValueType[], ReturnValue &);
@ -205,6 +206,7 @@ public:
{"clearTestState", wrap<clear_test_state>},
{"copyBundledRealmFiles", wrap<copy_bundled_realm_files>},
{"_waitForDownload", wrap<wait_for_download_completion>},
{"_waitForUpload", wrap<wait_for_upload_completion>},
};
PropertyMap<T> const static_properties = {
@ -643,6 +645,113 @@ void RealmClass<T>::wait_for_download_completion(ContextType ctx, FunctionType,
Function<T>::callback(ctx, callback_function, this_object, 1, callback_arguments);
}
template<typename T>
void RealmClass<T>::wait_for_upload_completion(ContextType ctx, FunctionType, ObjectType this_object, size_t argc, const ValueType arguments[], ReturnValue &return_value) {
validate_argument_count(argc, 2);
auto config_object = Value::validated_to_object(ctx, arguments[0]);
auto callback_function = Value::validated_to_function(ctx, arguments[1]);
#if REALM_ENABLE_SYNC
ValueType sync_config_value = Object::get_property(ctx, config_object, "sync");
if (!Value::is_undefined(ctx, sync_config_value)) {
realm::Realm::Config config;
config.cache = false;
static const String encryption_key_string = "encryptionKey";
ValueType encryption_key_value = Object::get_property(ctx, config_object, encryption_key_string);
if (!Value::is_undefined(ctx, encryption_key_value)) {
auto encryption_key = Value::validated_to_binary(ctx, encryption_key_value, "encryptionKey");
config.encryption_key.assign(encryption_key.data(), encryption_key.data() + encryption_key.size());
}
Protected<ObjectType> thiz(ctx, this_object);
SyncClass<T>::populate_sync_config(ctx, thiz, config_object, config);
Protected<FunctionType> protected_callback(ctx, callback_function);
Protected<ObjectType> protected_this(ctx, this_object);
Protected<typename T::GlobalContext> protected_ctx(Context<T>::get_global_context(ctx));
EventLoopDispatcher<WaitHandler> wait_handler([=](std::error_code error_code) {
HANDLESCOPE
if (!error_code) {
//success
Function<T>::callback(protected_ctx, protected_callback, protected_this, 0, nullptr);
}
else {
//fail
ObjectType object = Object::create_empty(protected_ctx);
Object::set_property(protected_ctx, object, "message", Value::from_string(protected_ctx, error_code.message()));
Object::set_property(protected_ctx, object, "errorCode", Value::from_number(protected_ctx, error_code.value()));
ValueType callback_arguments[1];
callback_arguments[0] = object;
Function<T>::callback(protected_ctx, protected_callback, protected_this, 1, callback_arguments);
}
});
std::function<WaitHandler> waitFunc = std::move(wait_handler);
std::function<ProgressHandler> progressFunc;
auto realm = realm::Realm::get_shared_realm(config);
if (auto sync_config = config.sync_config)
{
static const String progressFuncName = "_onUploadProgress";
bool progressFuncDefined = false;
if (!Value::is_boolean(ctx, sync_config_value) && !Value::is_undefined(ctx, sync_config_value))
{
auto sync_config_object = Value::validated_to_object(ctx, sync_config_value);
ValueType progressFuncValue = Object::get_property(ctx, sync_config_object, progressFuncName);
progressFuncDefined = !Value::is_undefined(ctx, progressFuncValue);
if (progressFuncDefined)
{
Protected<FunctionType> protected_progressCallback(protected_ctx, Value::validated_to_function(protected_ctx, progressFuncValue));
EventLoopDispatcher<ProgressHandler> progress_handler([=](uint64_t transferred_bytes, uint64_t transferrable_bytes) {
HANDLESCOPE
ValueType callback_arguments[2];
callback_arguments[0] = Value::from_number(protected_ctx, transferred_bytes);
callback_arguments[1] = Value::from_number(protected_ctx, transferrable_bytes);
Function<T>::callback(protected_ctx, protected_progressCallback, protected_this, 2, callback_arguments);
});
progressFunc = std::move(progress_handler);
}
}
std::shared_ptr<SyncUser> user = sync_config->user;
if (user && user->state() != SyncUser::State::Error) {
if (auto session = user->session_for_on_disk_path(config.path)) {
if (progressFuncDefined) {
session->register_progress_notifier(std::move(progressFunc), SyncSession::NotifierType::upload, false);
}
session->wait_for_upload_completion([=](std::error_code error_code) {
realm->close(); //capture and keep realm instance for until here
waitFunc(error_code);
});
return;
}
}
else {
ObjectType object = Object::create_empty(protected_ctx);
Object::set_property(protected_ctx, object, "message", Value::from_string(protected_ctx, "Cannot wait for upload completion, because the associated session previously experienced a fatal error."));
Object::set_property(protected_ctx, object, "errorCode", Value::from_number(protected_ctx, 1));
ValueType callback_arguments[1];
callback_arguments[0] = object;
Function<T>::callback(protected_ctx, protected_callback, protected_this, 1, callback_arguments);
return;
}
}
}
#endif
ValueType callback_arguments[1];
callback_arguments[0] = Value::from_null(ctx);
Function<T>::callback(ctx, callback_function, this_object, 1, callback_arguments);
}
template<typename T>
void RealmClass<T>::objects(ContextType ctx, FunctionType, ObjectType this_object, size_t argc, const ValueType arguments[], ReturnValue &return_value) {
validate_argument_count(argc, 1);

View File

@ -427,6 +427,98 @@ module.exports = {
});
},
testWaitForUploadNotification() {
if (!isNodeProccess) {
return Promise.resolve();
}
const realmName = uuid();
return promisifiedRegister('http://localhost:9080', uuid(), 'password')
.then(user => {
let config = {
sync: { user, url: `realm://localhost:9080/~/${realmName}` },
schema: [{ name: 'Dog', properties: { name: 'string' } }],
};
return Realm.open(config)
.then(realm => {
return new Promise((resolve, reject) => {
realm.write(() => {
for (let i = 1; i <= 30; i++) {
realm.create('Dog', { name: `Lassy ${i}` });
}
});
Realm._waitForUpload(config, (error) => {
if (error) {
reject(error);
}
//FIXME: RN hangs here. Remove when node's makeCallback alternative is implemented
setTimeout(() => {
resolve();
}, 1);
});
});
});
});
},
testUploadProgress() {
if (!isNodeProccess) {
return Promise.resolve();
}
const realmName = uuid();
return promisifiedRegister('http://localhost:9080', uuid(), 'password')
.then(user => {
let progressNotificationCalled = false;
let config = {
sync: {
user,
url: `realm://localhost:9080/~/${realmName}`,
_onUploadProgress: (transferred, total) => {
progressNotificationCalled = true
},
},
schema: [{ name: 'Dog', properties: { name: 'string' } }],
};
return Realm.open(config)
.then(realm => {
return new Promise((resolve, reject) => {
realm.write(() => {
for (let i = 1; i <= 3; i++) {
realm.create('Dog', { name: `Lassy ${i}` });
}
});
Realm._waitForUpload(config, (error) => {
if (error) {
reject(error);
}
//FIXME: RN hangs here. Remove when node's makeCallback alternative is implemented
try {
TestCase.assertTrue(progressNotificationCalled, "Upload progress callback not called");
}
catch(ex) {
reject(ex);
}
resolve();
});
});
});
});
},
testErrorHandling() {
return promisifiedRegister('http://localhost:9080', uuid(), 'password').then(user => {
return new Promise((resolve, _reject) => {