From 1e4e6e4758e002e30cbac5c37ab329a06d24706e Mon Sep 17 00:00:00 2001 From: emizzle Date: Thu, 2 May 2019 19:52:13 +1000 Subject: [PATCH] feat(@embark/storage): Add command `service ipfs on/off` Add support for ability to start and stop IPFS via command `service ipfs on/off`. Add support for `ProcessState.Errored` process state in the `ProcessManager`. This allows for processes being launched and stopped to trigger an error and the resulting process will be in an errored state. Processes in errored states can still attempt to be started and stopped. `service ipfs on` - starts an IPFS node if not already started. Shows an error if the node is already starting or started. `service ipfs off` - kills the running IPFS node as long as Embark has started the IPFS process. If the IPFS process was started externally, an error is shown. return early --- packages/embark-core/constants.json | 3 +- packages/embark/src/lib/core/engine.js | 4 +- .../src/lib/core/processes/processManager.js | 17 +++-- .../blockchainProcessLauncher.js | 2 +- packages/embark/src/lib/modules/ipfs/index.js | 73 +++++++++++++++---- .../embark/src/lib/modules/ipfs/process.js | 1 + .../storage/storageProcessesLauncher.js | 18 +++++ 7 files changed, 93 insertions(+), 25 deletions(-) diff --git a/packages/embark-core/constants.json b/packages/embark-core/constants.json index c054f39c8..0f66b00e7 100644 --- a/packages/embark-core/constants.json +++ b/packages/embark-core/constants.json @@ -64,7 +64,8 @@ "storage": { "init": "init", "initiated": "initiated", - "restart": "restart" + "restart": "restart", + "exit": "storageExit" }, "codeGenerator": { "gasLimit": 6000000 diff --git a/packages/embark/src/lib/core/engine.js b/packages/embark/src/lib/core/engine.js index 481aaf847..a9d94ccea 100644 --- a/packages/embark/src/lib/core/engine.js +++ b/packages/embark/src/lib/core/engine.js @@ -278,14 +278,14 @@ class Engine { if (!this.config.storageConfig.available_providers.includes("ipfs")) { return next(); } - this.events.on("ipfs:process:started", next); + this.events.once("ipfs:process:started", next); this.registerModule('ipfs'); }, (next) => { if (!this.config.storageConfig.available_providers.includes("swarm")) { return next(); } - this.events.on("swarm:process:started", next); + this.events.once("swarm:process:started", next); this.registerModule('swarm'); } ], (err) => { diff --git a/packages/embark/src/lib/core/processes/processManager.js b/packages/embark/src/lib/core/processes/processManager.js index 790b8fa5c..7186a29e4 100644 --- a/packages/embark/src/lib/core/processes/processManager.js +++ b/packages/embark/src/lib/core/processes/processManager.js @@ -2,7 +2,8 @@ const ProcessState = { Stopped: 'stopped', Starting: 'starting', Running: 'running', - Stopping: 'stopping' + Stopping: 'stopping', + Errored: 'errored' }; class ProcessManager { @@ -111,14 +112,17 @@ class ProcessManager { self.events.setCommandHandler('processes:launch', (name, cb) => { cb = cb || function noop() {}; let process = self.processes[name]; - if (process.state !== ProcessState.Stopped) { + if (![ProcessState.Stopped, ProcessState.Errored].includes(process.state)) { return cb(__(`The ${name} process is already ${process.state.toLowerCase()}.`)); } process.state = ProcessState.Starting; if(!process.afterLaunchFn) process.afterLaunchFn = cb; process.cb.apply(process.cb, [ (...args) => { - process.state = ProcessState.Running; + if(args[0]) { + process.state = ProcessState.Errored; + } + else process.state = ProcessState.Running; cb.apply(cb, args); } ]); @@ -127,13 +131,16 @@ class ProcessManager { self.events.setCommandHandler('processes:stop', (name, cb) => { let process = self.processes[name]; cb = cb || function noop() {}; - if (process.state !== ProcessState.Running) { + if (![ProcessState.Running, ProcessState.Errored].includes(process.state)) { return cb(__(`The ${name} process is already ${process.state.toLowerCase()}.`)); } process.state = ProcessState.Stopping; process.stopFn.apply(process.stopFn, [ (...args) => { - process.state = ProcessState.Stopped; + if(args[0]) { + process.state = ProcessState.Errored; + } + else process.state = ProcessState.Stopped; cb.apply(cb, args); } ]); diff --git a/packages/embark/src/lib/modules/blockchain_process/blockchainProcessLauncher.js b/packages/embark/src/lib/modules/blockchain_process/blockchainProcessLauncher.js index 15c87b22f..771fb74cb 100644 --- a/packages/embark/src/lib/modules/blockchain_process/blockchainProcessLauncher.js +++ b/packages/embark/src/lib/modules/blockchain_process/blockchainProcessLauncher.js @@ -71,7 +71,7 @@ class BlockchainProcessLauncher { stopBlockchainNode(cb) { if(this.blockchainProcess) { - this.events.on(constants.blockchain.blockchainExit, cb); + this.events.once(constants.blockchain.blockchainExit, cb); this.blockchainProcess.exitCallback = () => {}; // don't show error message as the process was killed on purpose this.blockchainProcess.send('exit'); } diff --git a/packages/embark/src/lib/modules/ipfs/index.js b/packages/embark/src/lib/modules/ipfs/index.js index e3ce408c1..07e169eda 100644 --- a/packages/embark/src/lib/modules/ipfs/index.js +++ b/packages/embark/src/lib/modules/ipfs/index.js @@ -16,6 +16,11 @@ class IPFS { this.namesystemConfig = embark.config.namesystemConfig; this.embark = embark; this.fs = embark.fs; + this.isServiceRegistered = false; + this.addedToEmbarkJs = false; + this.addedToConsole = false; + this.storageProcessesLauncher = null; + this.usingRunningNode = false; this.webServerConfig = embark.config.webServerConfig; this.blockchainConfig = embark.config.blockchainConfig; @@ -28,10 +33,32 @@ class IPFS { this.registerUploadCommand(); this.listenToCommands(); this.registerConsoleCommands(); - this.startProcess((err, newProcessStarted) => { - this.addStorageProviderToEmbarkJS(); - this.addObjectToConsole(); - this.events.emit("ipfs:process:started", err, newProcessStarted); + this.events.request("processes:register", "ipfs", { + launchFn: (cb) => { + if(this.usingRunningNode) { + return cb(__("IPFS process is running in a separate process and cannot be started by Embark.")); + } + this.startProcess((err, newProcessStarted) => { + this.addStorageProviderToEmbarkJS(); + this.addObjectToConsole(); + this.events.emit("ipfs:process:started", err, newProcessStarted); + cb(); + }); + }, + stopFn: (cb) => { + if(this.usingRunningNode) { + return cb(__("IPFS process is running in a separate process and cannot be stopped by Embark.")); + } + this.stopProcess(cb); + } + }); + this.events.request("processes:launch", "ipfs", (err, msg) => { + if (err) { + return this.logger.error(err); + } + if (msg) { + this.logger.info(msg); + } }); } @@ -50,14 +77,16 @@ class IPFS { } setServiceCheck() { + if (this.isServiceRegistered) return; + this.isServiceRegistered = true; let self = this; self.events.on('check:backOnline:IPFS', function () { - self.logger.info(__('IPFS node detected') + '..'); + self.logger.info(__('IPFS node detected') + '...'); }); self.events.on('check:wentOffline:IPFS', function () { - self.logger.info(__('IPFS node is offline') + '..'); + self.logger.info(__('IPFS node is offline') + '...'); }); self.events.request("services:register", 'IPFS', function (cb) { @@ -98,6 +127,8 @@ class IPFS { } addStorageProviderToEmbarkJS() { + if(this.addedToEmbarkJs) return; + this.addedToEmbarkJs = true; this.events.request('version:downloadIfNeeded', 'ipfs-api', (err, location) => { if (err) { this.logger.error(__('Error downloading IPFS API')); @@ -124,6 +155,9 @@ class IPFS { } addObjectToConsole() { + if(this.addedToConsole) return; + this.addedToConsole = true; + const {host, port} = this._getNodeUrlConfig(); let ipfs = IpfsApi(host, port); this.events.emit("runcode:register", "ipfs", ipfs); @@ -132,26 +166,33 @@ class IPFS { startProcess(callback) { this._checkService((err) => { if (!err) { + this.usingRunningNode = true; this.logger.info("IPFS node found, using currently running node"); return callback(null, false); } this.logger.info("IPFS node not found, attempting to start own node"); let self = this; - const storageProcessesLauncher = new StorageProcessesLauncher({ - logger: self.logger, - events: self.events, - storageConfig: self.storageConfig, - webServerConfig: self.webServerConfig, - blockchainConfig: self.blockchainConfig, - corsParts: self.embark.config.corsParts, - embark: self.embark - }); + if(this.storageProcessesLauncher === null) { + this.storageProcessesLauncher = new StorageProcessesLauncher({ + logger: self.logger, + events: self.events, + storageConfig: self.storageConfig, + webServerConfig: self.webServerConfig, + blockchainConfig: self.blockchainConfig, + corsParts: self.embark.config.corsParts, + embark: self.embark + }); + } self.logger.trace(`Storage module: Launching ipfs process...`); - return storageProcessesLauncher.launchProcess('ipfs', (err) => { + return this.storageProcessesLauncher.launchProcess('ipfs', (err) => { callback(err, true); }); }); } + stopProcess(cb) { + if(!this.storageProcessesLauncher) return cb(); + this.storageProcessesLauncher.stopProcess("ipfs", cb); + } registerUploadCommand() { const self = this; diff --git a/packages/embark/src/lib/modules/ipfs/process.js b/packages/embark/src/lib/modules/ipfs/process.js index ac3c8990f..36156eb71 100644 --- a/packages/embark/src/lib/modules/ipfs/process.js +++ b/packages/embark/src/lib/modules/ipfs/process.js @@ -184,6 +184,7 @@ class IPFSProcess extends ProcessWrapper { kill() { if (this.child) { this.child.kill(); + ipfsProcess.send({result: constants.storage.exit}); } } } diff --git a/packages/embark/src/lib/modules/storage/storageProcessesLauncher.js b/packages/embark/src/lib/modules/storage/storageProcessesLauncher.js index 8e39e06e7..3a75edb00 100644 --- a/packages/embark/src/lib/modules/storage/storageProcessesLauncher.js +++ b/packages/embark/src/lib/modules/storage/storageProcessesLauncher.js @@ -21,6 +21,7 @@ class StorageProcessesLauncher { this.processes = {}; this.corsParts = options.corsParts || []; this.restartCalled = false; + this.manualExit = false; this.cors = this.buildCors(); @@ -81,6 +82,10 @@ class StorageProcessesLauncher { } processExited(storageName, code) { + if (this.manualExit) { + this.manualExit = false; + return; + } if(this.restartCalled){ this.restartCalled = false; return this.launchProcess(storageName, () => {}); @@ -156,6 +161,12 @@ class StorageProcessesLauncher { delete this.processes[storageName]; }); + self.processes[storageName].on('result', constants.storage.exit, (_msg) => { + self.processes[storageName].kill(); + delete this.processes[storageName]; + this.events.emit(constants.storage.exit); + }); + self.events.on('logs:swarm:enable', () => { self.processes[storageName].silent = false; }); @@ -166,6 +177,13 @@ class StorageProcessesLauncher { }); } + stopProcess(storageName, cb) { + if(this.processes[storageName]) { + this.manualExit = true; + this.events.once(constants.storage.exit, cb); + this.processes[storageName].send('exit'); + } + } } module.exports = StorageProcessesLauncher;