diff --git a/embark-ui/src/actions/index.js b/embark-ui/src/actions/index.js
index b540a998..322e5c58 100644
--- a/embark-ui/src/actions/index.js
+++ b/embark-ui/src/actions/index.js
@@ -9,6 +9,9 @@ export const RECEIVE_PROCESSES_ERROR = 'RECEIVE_PROCESSES_ERROR';
// Process logs
export const FETCH_PROCESS_LOGS = 'FETCH_PROCESS_LOGS';
export const RECEIVE_PROCESS_LOGS = 'RECEIVE_PROCESS_LOGS';
+export const WATCH_NEW_PROCESS_LOGS = 'WATCH_NEW_PROCESS_LOGS';
+export const IS_LISTENING_PROCESS_LOG = 'IS_LISTENING_PROCESS_LOG';
+export const RECEIVE_NEW_PROCESS_LOG = 'RECEIVE_NEW_PROCESS_LOG';
export const RECEIVE_PROCESS_LOGS_ERROR = 'RECEIVE_PROCESS_LOGS_ERROR';
// Blocks
export const FETCH_BLOCKS = 'FETCH_BLOCKS';
@@ -67,6 +70,13 @@ export function fetchProcessLogs(processName) {
};
}
+export function listenToProcessLogs(processName) {
+ return {
+ type: WATCH_NEW_PROCESS_LOGS,
+ processName
+ };
+}
+
export function receiveProcessLogs(processName, logs) {
return {
type: RECEIVE_PROCESS_LOGS,
diff --git a/embark-ui/src/api/index.js b/embark-ui/src/api/index.js
index ab46e38f..550eb7fe 100644
--- a/embark-ui/src/api/index.js
+++ b/embark-ui/src/api/index.js
@@ -21,6 +21,10 @@ export function fetchProcessLogs(processName) {
return axios.get(`${constants.httpEndpoint}/process-logs/${processName}`);
}
+export function webSocketProcess(processName) {
+ return new WebSocket(constants.wsEndpoint + '/process-logs/' + processName);
+}
+
export function webSocketBlockHeader() {
return new WebSocket(`${constants.wsEndpoint}/blockchain/blockHeader`);
}
diff --git a/embark-ui/src/components/Process.js b/embark-ui/src/components/Process.js
index 18000340..02bb6bce 100644
--- a/embark-ui/src/components/Process.js
+++ b/embark-ui/src/components/Process.js
@@ -1,66 +1,15 @@
import React, {Component} from 'react';
-import connect from "react-redux/es/connect/connect";
-import {fetchProcessLogs} from "../actions";
-import constants from '../constants';
import PropTypes from 'prop-types';
class Process extends Component {
- constructor(props) {
- super(props);
- this.state = {
- logs: []
- };
- this.gotOriginalLogs = false;
- }
-
- componentDidMount() {
- const self = this;
-
- this.props.fetchProcessLogs(self.props.processName);
-
- this.ws = new WebSocket(constants.wsEndpoint + '/process-logs/' + self.props.processName);
-
- this.ws.onmessage = function(evt) {
- const log = JSON.parse(evt.data);
- const logs = self.state.logs;
- logs.push(log);
- self.setState({
- logs
- });
- };
-
- this.ws.onclose = function() {
- console.log(self.props.processName + "Log process connection is closed");
- };
-
- window.onbeforeunload = function(_event) {
- this.ws.close();
- };
- }
-
- shouldComponentUpdate(nextProps, _nextState) {
- if (!this.gotOriginalLogs && nextProps.logs && nextProps.logs[this.props.processName]) {
- const logs = nextProps.logs[this.props.processName].concat(this.state.logs);
- this.gotOriginalLogs = true;
- this.setState({
- logs
- });
- }
- return true;
- }
-
- componentWillUnmount() {
- this.ws.close();
- this.ws = null;
- }
-
render() {
+ const logs = this.props.logs || [];
return (
State: {this.props.state}
{
- this.state.logs.map((item, i) =>
{item.msg_clear || item.msg}
)
+ logs.map((item, i) =>
{item.msg_clear || item.msg}
)
}
);
@@ -70,17 +19,7 @@ class Process extends Component {
Process.propTypes = {
processName: PropTypes.string.isRequired,
state: PropTypes.string.isRequired,
- fetchProcessLogs: PropTypes.func,
- logs: PropTypes.object
+ logs: PropTypes.array
};
-function mapStateToProps(state) {
- return {logs: state.processes.logs};
-}
-
-export default connect(
- mapStateToProps,
- {
- fetchProcessLogs
- }
-)(Process);
+export default Process;
diff --git a/embark-ui/src/containers/ProcessesContainer.js b/embark-ui/src/containers/ProcessesContainer.js
index c98bc026..681b3bce 100644
--- a/embark-ui/src/containers/ProcessesContainer.js
+++ b/embark-ui/src/containers/ProcessesContainer.js
@@ -2,8 +2,7 @@ import React, {Component} from 'react';
import {connect} from 'react-redux';
import {Tabs, Tab} from 'tabler-react';
import PropTypes from 'prop-types';
-
-import {fetchProcesses} from '../actions';
+import {fetchProcesses, fetchProcessLogs, listenToProcessLogs} from '../actions';
import Loading from '../components/Loading';
import "./css/processContainer.css";
@@ -14,6 +13,23 @@ class ProcessesContainer extends Component {
this.props.fetchProcesses();
}
+ shouldComponentUpdate(nextProps, _nextState) {
+ if (!this.islistening && nextProps.processes && nextProps.processes.data) {
+ this.islistening = true;
+ Object.keys(nextProps.processes.data).forEach(processName => {
+ this.props.fetchProcessLogs(processName);
+ // Only start watching if we are not already watching
+ if (!this.props.processes.data ||
+ !this.props.processes.data[processName] ||
+ !this.props.processes.data[processName].isListening
+ ) {
+ this.props.listenToProcessLogs(processName);
+ }
+ });
+ }
+ return true;
+ }
+
render() {
const {processes} = this.props;
if (!processes.data) {
@@ -30,7 +46,9 @@ class ProcessesContainer extends Component {
{processNames && processNames.length &&
{processNames.map(processName => {
return (
-
+
);
})}
}
@@ -42,7 +60,9 @@ class ProcessesContainer extends Component {
ProcessesContainer.propTypes = {
processes: PropTypes.object,
- fetchProcesses: PropTypes.func
+ fetchProcesses: PropTypes.func,
+ fetchProcessLogs: PropTypes.func,
+ listenToProcessLogs: PropTypes.func
};
function mapStateToProps(state) {
@@ -52,6 +72,8 @@ function mapStateToProps(state) {
export default connect(
mapStateToProps,
{
- fetchProcesses
+ fetchProcesses,
+ fetchProcessLogs,
+ listenToProcessLogs
}
)(ProcessesContainer);
diff --git a/embark-ui/src/reducers/processesReducer.js b/embark-ui/src/reducers/processesReducer.js
index a7ddae2e..397f8254 100644
--- a/embark-ui/src/reducers/processesReducer.js
+++ b/embark-ui/src/reducers/processesReducer.js
@@ -1,4 +1,11 @@
-import {RECEIVE_PROCESSES, RECEIVE_PROCESSES_ERROR, RECEIVE_PROCESS_LOGS, RECEIVE_PROCESS_LOGS_ERROR} from "../actions";
+import {
+ RECEIVE_PROCESSES,
+ RECEIVE_PROCESSES_ERROR,
+ RECEIVE_PROCESS_LOGS,
+ RECEIVE_PROCESS_LOGS_ERROR,
+ RECEIVE_NEW_PROCESS_LOG,
+ IS_LISTENING_PROCESS_LOG
+} from "../actions";
export default function processes(state = {}, action) {
switch (action.type) {
@@ -9,11 +16,40 @@ export default function processes(state = {}, action) {
case RECEIVE_PROCESS_LOGS:
return {
...state,
- logs: {
- ...state.logs,
- [action.processName]: action.logs.data
+ data: {
+ ...state.data,
+ [action.processName]: {
+ ...state.data[action.processName],
+ logs: action.logs.data
+ }
}
};
+ case RECEIVE_NEW_PROCESS_LOG: {
+ const logs = state.data[action.processName].logs || [];
+ logs.push(action.log);
+ return {
+ ...state,
+ data: {
+ ...state.data,
+ [action.processName]: {
+ ...state.data[action.processName],
+ logs: logs
+ }
+ }
+ };
+ }
+ case IS_LISTENING_PROCESS_LOG: {
+ return {
+ ...state,
+ data: {
+ ...state.data,
+ [action.processName]: {
+ ...state.data[action.processName],
+ isListening: true
+ }
+ }
+ };
+ }
case RECEIVE_PROCESS_LOGS_ERROR:
return Object.assign({}, state, {error: action.error});
default:
diff --git a/embark-ui/src/sagas/index.js b/embark-ui/src/sagas/index.js
index b1868d72..abea8747 100644
--- a/embark-ui/src/sagas/index.js
+++ b/embark-ui/src/sagas/index.js
@@ -93,12 +93,28 @@ export function *watchInitBlockHeader() {
yield takeEvery(actions.INIT_BLOCK_HEADER, initBlockHeader);
}
+export function *listenToProcessLogs(action) {
+ console.log('WATCH', action.processName);
+ yield put({type: actions.IS_LISTENING_PROCESS_LOG, processName: action.processName});
+ const socket = api.webSocketProcess(action.processName);
+ const channel = yield call(createChannel, socket);
+ while (true) {
+ const log = yield take(channel);
+ yield put({type: actions.RECEIVE_NEW_PROCESS_LOG, processName: action.processName, log});
+ }
+}
+
+export function *watchListenToProcessLogs() {
+ yield takeEvery(actions.WATCH_NEW_PROCESS_LOGS, listenToProcessLogs);
+}
+
export default function *root() {
yield all([
fork(watchInitBlockHeader),
fork(watchFetchAccounts),
fork(watchFetchProcesses),
fork(watchFetchProcessLogs),
+ fork(watchListenToProcessLogs),
fork(watchFetchBlocks),
fork(watchFetchTransactions)
]);