feat(@embark/storage): Add command service ipfs on/off

Add support for ability to start and stop IPFS via command `service ipfs on/off`.

Add support for `ProcessState.Errored` process state in the `ProcessManager`. This allows for processes being launched and stopped to trigger an error and the resulting process will be in an errored state. Processes in errored states can still attempt to be started and stopped.

`service ipfs on` - starts an IPFS node if not already started. Shows an error if the node is already starting or started.

`service ipfs off` - kills the running IPFS node as long as Embark has started the IPFS process. If the IPFS process was started externally, an error is shown.

return early
This commit is contained in:
emizzle 2019-05-02 19:52:13 +10:00 committed by Iuri Matias
parent dcecad5501
commit 1e4e6e4758
7 changed files with 93 additions and 25 deletions

View File

@ -64,7 +64,8 @@
"storage": { "storage": {
"init": "init", "init": "init",
"initiated": "initiated", "initiated": "initiated",
"restart": "restart" "restart": "restart",
"exit": "storageExit"
}, },
"codeGenerator": { "codeGenerator": {
"gasLimit": 6000000 "gasLimit": 6000000

View File

@ -278,14 +278,14 @@ class Engine {
if (!this.config.storageConfig.available_providers.includes("ipfs")) { if (!this.config.storageConfig.available_providers.includes("ipfs")) {
return next(); return next();
} }
this.events.on("ipfs:process:started", next); this.events.once("ipfs:process:started", next);
this.registerModule('ipfs'); this.registerModule('ipfs');
}, },
(next) => { (next) => {
if (!this.config.storageConfig.available_providers.includes("swarm")) { if (!this.config.storageConfig.available_providers.includes("swarm")) {
return next(); return next();
} }
this.events.on("swarm:process:started", next); this.events.once("swarm:process:started", next);
this.registerModule('swarm'); this.registerModule('swarm');
} }
], (err) => { ], (err) => {

View File

@ -2,7 +2,8 @@ const ProcessState = {
Stopped: 'stopped', Stopped: 'stopped',
Starting: 'starting', Starting: 'starting',
Running: 'running', Running: 'running',
Stopping: 'stopping' Stopping: 'stopping',
Errored: 'errored'
}; };
class ProcessManager { class ProcessManager {
@ -111,14 +112,17 @@ class ProcessManager {
self.events.setCommandHandler('processes:launch', (name, cb) => { self.events.setCommandHandler('processes:launch', (name, cb) => {
cb = cb || function noop() {}; cb = cb || function noop() {};
let process = self.processes[name]; let process = self.processes[name];
if (process.state !== ProcessState.Stopped) { if (![ProcessState.Stopped, ProcessState.Errored].includes(process.state)) {
return cb(__(`The ${name} process is already ${process.state.toLowerCase()}.`)); return cb(__(`The ${name} process is already ${process.state.toLowerCase()}.`));
} }
process.state = ProcessState.Starting; process.state = ProcessState.Starting;
if(!process.afterLaunchFn) process.afterLaunchFn = cb; if(!process.afterLaunchFn) process.afterLaunchFn = cb;
process.cb.apply(process.cb, [ process.cb.apply(process.cb, [
(...args) => { (...args) => {
process.state = ProcessState.Running; if(args[0]) {
process.state = ProcessState.Errored;
}
else process.state = ProcessState.Running;
cb.apply(cb, args); cb.apply(cb, args);
} }
]); ]);
@ -127,13 +131,16 @@ class ProcessManager {
self.events.setCommandHandler('processes:stop', (name, cb) => { self.events.setCommandHandler('processes:stop', (name, cb) => {
let process = self.processes[name]; let process = self.processes[name];
cb = cb || function noop() {}; cb = cb || function noop() {};
if (process.state !== ProcessState.Running) { if (![ProcessState.Running, ProcessState.Errored].includes(process.state)) {
return cb(__(`The ${name} process is already ${process.state.toLowerCase()}.`)); return cb(__(`The ${name} process is already ${process.state.toLowerCase()}.`));
} }
process.state = ProcessState.Stopping; process.state = ProcessState.Stopping;
process.stopFn.apply(process.stopFn, [ process.stopFn.apply(process.stopFn, [
(...args) => { (...args) => {
process.state = ProcessState.Stopped; if(args[0]) {
process.state = ProcessState.Errored;
}
else process.state = ProcessState.Stopped;
cb.apply(cb, args); cb.apply(cb, args);
} }
]); ]);

View File

@ -71,7 +71,7 @@ class BlockchainProcessLauncher {
stopBlockchainNode(cb) { stopBlockchainNode(cb) {
if(this.blockchainProcess) { if(this.blockchainProcess) {
this.events.on(constants.blockchain.blockchainExit, cb); this.events.once(constants.blockchain.blockchainExit, cb);
this.blockchainProcess.exitCallback = () => {}; // don't show error message as the process was killed on purpose this.blockchainProcess.exitCallback = () => {}; // don't show error message as the process was killed on purpose
this.blockchainProcess.send('exit'); this.blockchainProcess.send('exit');
} }

View File

@ -16,6 +16,11 @@ class IPFS {
this.namesystemConfig = embark.config.namesystemConfig; this.namesystemConfig = embark.config.namesystemConfig;
this.embark = embark; this.embark = embark;
this.fs = embark.fs; this.fs = embark.fs;
this.isServiceRegistered = false;
this.addedToEmbarkJs = false;
this.addedToConsole = false;
this.storageProcessesLauncher = null;
this.usingRunningNode = false;
this.webServerConfig = embark.config.webServerConfig; this.webServerConfig = embark.config.webServerConfig;
this.blockchainConfig = embark.config.blockchainConfig; this.blockchainConfig = embark.config.blockchainConfig;
@ -28,10 +33,32 @@ class IPFS {
this.registerUploadCommand(); this.registerUploadCommand();
this.listenToCommands(); this.listenToCommands();
this.registerConsoleCommands(); this.registerConsoleCommands();
this.startProcess((err, newProcessStarted) => { this.events.request("processes:register", "ipfs", {
this.addStorageProviderToEmbarkJS(); launchFn: (cb) => {
this.addObjectToConsole(); if(this.usingRunningNode) {
this.events.emit("ipfs:process:started", err, newProcessStarted); return cb(__("IPFS process is running in a separate process and cannot be started by Embark."));
}
this.startProcess((err, newProcessStarted) => {
this.addStorageProviderToEmbarkJS();
this.addObjectToConsole();
this.events.emit("ipfs:process:started", err, newProcessStarted);
cb();
});
},
stopFn: (cb) => {
if(this.usingRunningNode) {
return cb(__("IPFS process is running in a separate process and cannot be stopped by Embark."));
}
this.stopProcess(cb);
}
});
this.events.request("processes:launch", "ipfs", (err, msg) => {
if (err) {
return this.logger.error(err);
}
if (msg) {
this.logger.info(msg);
}
}); });
} }
@ -50,14 +77,16 @@ class IPFS {
} }
setServiceCheck() { setServiceCheck() {
if (this.isServiceRegistered) return;
this.isServiceRegistered = true;
let self = this; let self = this;
self.events.on('check:backOnline:IPFS', function () { self.events.on('check:backOnline:IPFS', function () {
self.logger.info(__('IPFS node detected') + '..'); self.logger.info(__('IPFS node detected') + '...');
}); });
self.events.on('check:wentOffline:IPFS', function () { self.events.on('check:wentOffline:IPFS', function () {
self.logger.info(__('IPFS node is offline') + '..'); self.logger.info(__('IPFS node is offline') + '...');
}); });
self.events.request("services:register", 'IPFS', function (cb) { self.events.request("services:register", 'IPFS', function (cb) {
@ -98,6 +127,8 @@ class IPFS {
} }
addStorageProviderToEmbarkJS() { addStorageProviderToEmbarkJS() {
if(this.addedToEmbarkJs) return;
this.addedToEmbarkJs = true;
this.events.request('version:downloadIfNeeded', 'ipfs-api', (err, location) => { this.events.request('version:downloadIfNeeded', 'ipfs-api', (err, location) => {
if (err) { if (err) {
this.logger.error(__('Error downloading IPFS API')); this.logger.error(__('Error downloading IPFS API'));
@ -124,6 +155,9 @@ class IPFS {
} }
addObjectToConsole() { addObjectToConsole() {
if(this.addedToConsole) return;
this.addedToConsole = true;
const {host, port} = this._getNodeUrlConfig(); const {host, port} = this._getNodeUrlConfig();
let ipfs = IpfsApi(host, port); let ipfs = IpfsApi(host, port);
this.events.emit("runcode:register", "ipfs", ipfs); this.events.emit("runcode:register", "ipfs", ipfs);
@ -132,26 +166,33 @@ class IPFS {
startProcess(callback) { startProcess(callback) {
this._checkService((err) => { this._checkService((err) => {
if (!err) { if (!err) {
this.usingRunningNode = true;
this.logger.info("IPFS node found, using currently running node"); this.logger.info("IPFS node found, using currently running node");
return callback(null, false); return callback(null, false);
} }
this.logger.info("IPFS node not found, attempting to start own node"); this.logger.info("IPFS node not found, attempting to start own node");
let self = this; let self = this;
const storageProcessesLauncher = new StorageProcessesLauncher({ if(this.storageProcessesLauncher === null) {
logger: self.logger, this.storageProcessesLauncher = new StorageProcessesLauncher({
events: self.events, logger: self.logger,
storageConfig: self.storageConfig, events: self.events,
webServerConfig: self.webServerConfig, storageConfig: self.storageConfig,
blockchainConfig: self.blockchainConfig, webServerConfig: self.webServerConfig,
corsParts: self.embark.config.corsParts, blockchainConfig: self.blockchainConfig,
embark: self.embark corsParts: self.embark.config.corsParts,
}); embark: self.embark
});
}
self.logger.trace(`Storage module: Launching ipfs process...`); self.logger.trace(`Storage module: Launching ipfs process...`);
return storageProcessesLauncher.launchProcess('ipfs', (err) => { return this.storageProcessesLauncher.launchProcess('ipfs', (err) => {
callback(err, true); callback(err, true);
}); });
}); });
} }
stopProcess(cb) {
if(!this.storageProcessesLauncher) return cb();
this.storageProcessesLauncher.stopProcess("ipfs", cb);
}
registerUploadCommand() { registerUploadCommand() {
const self = this; const self = this;

View File

@ -184,6 +184,7 @@ class IPFSProcess extends ProcessWrapper {
kill() { kill() {
if (this.child) { if (this.child) {
this.child.kill(); this.child.kill();
ipfsProcess.send({result: constants.storage.exit});
} }
} }
} }

View File

@ -21,6 +21,7 @@ class StorageProcessesLauncher {
this.processes = {}; this.processes = {};
this.corsParts = options.corsParts || []; this.corsParts = options.corsParts || [];
this.restartCalled = false; this.restartCalled = false;
this.manualExit = false;
this.cors = this.buildCors(); this.cors = this.buildCors();
@ -81,6 +82,10 @@ class StorageProcessesLauncher {
} }
processExited(storageName, code) { processExited(storageName, code) {
if (this.manualExit) {
this.manualExit = false;
return;
}
if(this.restartCalled){ if(this.restartCalled){
this.restartCalled = false; this.restartCalled = false;
return this.launchProcess(storageName, () => {}); return this.launchProcess(storageName, () => {});
@ -156,6 +161,12 @@ class StorageProcessesLauncher {
delete this.processes[storageName]; delete this.processes[storageName];
}); });
self.processes[storageName].on('result', constants.storage.exit, (_msg) => {
self.processes[storageName].kill();
delete this.processes[storageName];
this.events.emit(constants.storage.exit);
});
self.events.on('logs:swarm:enable', () => { self.events.on('logs:swarm:enable', () => {
self.processes[storageName].silent = false; self.processes[storageName].silent = false;
}); });
@ -166,6 +177,13 @@ class StorageProcessesLauncher {
}); });
} }
stopProcess(storageName, cb) {
if(this.processes[storageName]) {
this.manualExit = true;
this.events.once(constants.storage.exit, cb);
this.processes[storageName].send('exit');
}
}
} }
module.exports = StorageProcessesLauncher; module.exports = StorageProcessesLauncher;