conflict in saga and action

This commit is contained in:
Jonathan Rainville 2018-08-08 12:35:01 -04:00 committed by Pascal Precht
parent 9208778089
commit 11bf667ad4
No known key found for this signature in database
GPG Key ID: 0EE28D8D6FD85D7D
7 changed files with 180 additions and 28 deletions

View File

@ -76,6 +76,21 @@ export const processLogs = {
failure: (error) => action(PROCESS_LOGS[FAILURE], {error}) failure: (error) => action(PROCESS_LOGS[FAILURE], {error})
}; };
export const MESSAGE_LISTEN = createRequestTypes('MESSAGE_LISTEN');
export const messageListen = {
request: (channel) => action(MESSAGE_LISTEN[REQUEST], {channel}),
success: (message) => action(MESSAGE_LISTEN[SUCCESS], {message}),
failure: (error) => action(MESSAGE_LISTEN[FAILURE], {error})
};
export const MESSAGE_SEND = createRequestTypes('MESSAGE_SEND');
export const messageSend = {
request: (body) => action(MESSAGE_SEND[REQUEST], {body}),
success: () => action(MESSAGE_SEND[SUCCESS]),
failure: (error) => action(MESSAGE_SEND[FAILURE], {error})
};
export const CONTRACTS = createRequestTypes('CONTRACTS'); export const CONTRACTS = createRequestTypes('CONTRACTS');
export const contracts = { export const contracts = {
request: () => action(CONTRACTS[REQUEST]), request: () => action(CONTRACTS[REQUEST]),

View File

@ -1,7 +1,6 @@
import axios from "axios"; import axios from "axios";
import constants from '../constants'; import constants from '../constants';
function get(path, params) { function get(path, params) {
return axios.get(constants.httpEndpoint + path, params) return axios.get(constants.httpEndpoint + path, params)
.then((response) => { .then((response) => {
@ -65,6 +64,14 @@ export function fetchContract(payload) {
return get(`/contract/${payload.contractName}`); return get(`/contract/${payload.contractName}`);
} }
export function sendMessage(payload) {
return post(`/communication/sendMessage`, payload);
}
export function listenToChannel(channel) {
return new WebSocket(`${constants.wsEndpoint}/communication/listenTo/${channel}`);
}
export function fetchContractProfile(payload) { export function fetchContractProfile(payload) {
return get(`/profiler/${payload.contractName}`); return get(`/profiler/${payload.contractName}`);
} }

View File

@ -1,5 +1,8 @@
import PropTypes from "prop-types";
import React, {Component} from 'react'; import React, {Component} from 'react';
import connect from "react-redux/es/connect/connect";
import {Alert, Button, Form, Icon} from 'tabler-react'; import {Alert, Button, Form, Icon} from 'tabler-react';
import {messageSend, messageListen} from "../actions";
class CommunicationContainer extends Component { class CommunicationContainer extends Component {
constructor(props) { constructor(props) {
@ -23,8 +26,7 @@ class CommunicationContainer extends Component {
sendMessage(e) { sendMessage(e) {
e.preventDefault(); e.preventDefault();
// TODO send message via API this.props.messageSend({topic: this.state.channel, message: this.state.message});
console.log('Send', this.state.message);
this.addToLog("EmbarkJS.Messages.sendMessage({topic: '" + this.state.channel + "', data: '" + this.state.message + "'})"); this.addToLog("EmbarkJS.Messages.sendMessage({topic: '" + this.state.channel + "', data: '" + this.state.message + "'})");
} }
@ -37,20 +39,7 @@ class CommunicationContainer extends Component {
subscribedChannels subscribedChannels
}); });
console.log('Listen to', this.state.listenTo); this.props.messageListen(this.state.listenTo);
// TODO listen to channel via API
/*EmbarkJS.Messages.listenTo({topic: [this.state.listenTo]}, (error, message) => {
const messageList = this.state.messageList;
if (error) {
messageList.push(<span className="alert-danger">Error: {error}</span>);
} else {
messageList.push(<span>Channel: <b>{message.topic}</b> | Message: <b>{message.data}</b></span>);
}
this.setState({
messageList
});
});*/
this.addToLog("EmbarkJS.Messages.listenTo({topic: ['" + this.state.listenTo + "']}).then(function(message) {})"); this.addToLog("EmbarkJS.Messages.listenTo({topic: ['" + this.state.listenTo + "']}).then(function(message) {})");
} }
@ -85,16 +74,25 @@ class CommunicationContainer extends Component {
<div id="subscribeList"> <div id="subscribeList">
{this.state.subscribedChannels.map((item, i) => <p key={i}>{item}</p>)} {this.state.subscribedChannels.map((item, i) => <p key={i}>{item}</p>)}
</div> </div>
<p>Messages received:</p> {this.props.messages && this.props.messages.channels && this.props.messages.channels.length &&
<div id="messagesList"> <React.Fragment>
{this.state.messageList.map((item, i) => <p key={i}>{item}</p>)} <p>Messages received:</p>
</div> <div id="messagesList">
{Object.keys(this.props.messages.channels).map((channelName, i) => {
return (<React.Fragment key={'channel-' + i}>
<p><b>{channelName}</b></p>
{this.props.messages.channels[channelName].messages.map((message, f) => {
return <p key={`${message}-${i}-${f}`}>{message}</p>;
})}
</React.Fragment>);
})}
</div>
</React.Fragment>
}
</Form.FieldSet> </Form.FieldSet>
<h3>Send Message</h3> <h3>Send Message</h3>
<Form.FieldSet> <Form.FieldSet>
<Form.Group label="Whisper channel" isRequired> <Form.Group label="Whisper channel" isRequired>
<Form.Input name="text-input" <Form.Input name="text-input"
@ -124,4 +122,21 @@ class CommunicationContainer extends Component {
} }
} }
export default CommunicationContainer; CommunicationContainer.propTypes = {
messageSend: PropTypes.func,
messageListen: PropTypes.func,
messages: PropTypes.object
};
function mapStateToProps(state) {
return {messages: state.messages};
}
export default connect(
mapStateToProps,
{
messageSend: messageSend.request,
messageListen: messageListen.request
}
)(CommunicationContainer);

View File

@ -0,0 +1,22 @@
import * as actions from "../actions";
export default function messages(state = {channels: {}}, action) {
switch (action.type) {
case actions.MESSAGE_LISTEN[actions.SUCCESS]: {
const messages = state.channels[action.channel] ? state.channels[action.channel].messages : [];
messages.push(action.message.data);
return {
...state,
channels: {
...state.channels,
[action.channel]: {
...state.channels[action.channel],
messages: messages
}
}
};
}
default:
return state;
}
}

View File

@ -4,7 +4,7 @@ import {eventChannel} from 'redux-saga';
import {all, call, fork, put, takeEvery, take} from 'redux-saga/effects'; import {all, call, fork, put, takeEvery, take} from 'redux-saga/effects';
const {account, accounts, block, blocks, transaction, transactions, processes, commands, processLogs, const {account, accounts, block, blocks, transaction, transactions, processes, commands, processLogs,
contracts, contract, contractProfile} = actions; contracts, contract, contractProfile, messageSend} = actions;
function *doRequest(entity, apiFn, payload) { function *doRequest(entity, apiFn, payload) {
const {response, error} = yield call(apiFn, payload); const {response, error} = yield call(apiFn, payload);
@ -114,6 +114,25 @@ export function *watchListenToProcessLogs() {
yield takeEvery(actions.WATCH_NEW_PROCESS_LOGS, listenToProcessLogs); yield takeEvery(actions.WATCH_NEW_PROCESS_LOGS, listenToProcessLogs);
} }
export const sendMessage = doRequest.bind(null, messageSend, api.sendMessage);
export function *watchSendMessage() {
yield takeEvery(actions.MESSAGE_SEND[actions.REQUEST], sendMessage);
}
export function *listenToMessages(action) {
const socket = api.listenToChannel(action.channel);
const channel = yield call(createChannel, socket);
while (true) {
const message = yield take(channel);
yield put({type: actions.MESSAGE_LISTEN[actions.SUCCESS], channel: action.channel, message});
}
}
export function *watchListenToMessages() {
yield takeEvery(actions.MESSAGE_LISTEN[actions.REQUEST], listenToMessages);
}
export default function *root() { export default function *root() {
yield all([ yield all([
fork(watchInitBlockHeader), fork(watchInitBlockHeader),
@ -124,14 +143,14 @@ export default function *root() {
fork(watchListenToProcessLogs), fork(watchListenToProcessLogs),
fork(watchFetchBlock), fork(watchFetchBlock),
fork(watchFetchTransactions), fork(watchFetchTransactions),
fork(watchFetchTransaction),
fork(watchPostCommand), fork(watchPostCommand),
fork(watchFetchBlocks), fork(watchFetchBlocks),
fork(watchFetchContracts), fork(watchFetchContracts),
fork(watchListenToMessages),
fork(watchSendMessage),
fork(watchFetchContract), fork(watchFetchContract),
fork(watchFetchTransaction), fork(watchFetchTransaction),
fork(watchFetchContractProfile), fork(watchFetchContractProfile)
fork(watchFetchTransactions)
]); ]);
} }

View File

@ -1,6 +1,9 @@
let utils = require('../../utils/utils.js'); let utils = require('../../utils/utils.js');
let fs = require('../../core/fs.js'); let fs = require('../../core/fs.js');
let Web3 = require('web3'); let Web3 = require('web3');
const {parallel} = require('async');
const {sendMessage, listenTo} = require('./js/communicationFunctions');
const messageEvents = require('./js/message_events');
const {canonicalHost, defaultHost} = require('../../utils/host'); const {canonicalHost, defaultHost} = require('../../utils/host');
@ -113,6 +116,74 @@ class Whisper {
this.embark.addConsoleProviderInit('communication', consoleCode, shouldInit); this.embark.addConsoleProviderInit('communication', consoleCode, shouldInit);
} }
registerAPICalls() {
const self = this;
if (self.apiCallsRegistered) {
return;
}
self.apiCallsRegistered = true;
let symKeyID, sig;
parallel([
function(paraCb) {
self.web3.shh.newSymKey((err, id) => {
symKeyID = id;
paraCb(err);
});
},
function(paraCb) {
self.web3.shh.newKeyPair((err, id) => {
sig = id;
paraCb(err);
});
}
], (err) => {
if (err) {
self.logger.error('Error getting Whisper keys:', err.message || err);
return;
}
self.embark.registerAPICall(
'post',
'/embark-api/communication/sendMessage',
(req, res) => {
sendMessage({
topic: req.body.topic,
data: req.body.message,
sig,
symKeyID,
fromAscii: self.web3.utils.asciiToHex,
toHex: self.web3.utils.toHex,
post: self.web3.shh.post
}, (err, result) => {
if (err) {
return res.status(500).send({error: err});
}
res.send(result);
});
});
self.embark.registerAPICall(
'ws',
// FIXME channel name
'/embark-api/communication/listenTo/jorain',
(ws, _req) => {
console.log('Listen to', 'jorain');
listenTo({
topic: 'jorain',
messageEvents,
toHex: self.web3.utils.toHex,
toAscii: self.web3.utils.hexToAscii,
sig,
symKeyID,
subscribe: self.web3.shh.subscribe
}, (err, result) => {
if (err) {
return ws.status(500).send(JSON.stringify({error: err}));
}
ws.send(JSON.stringify(result));
});
});
});
}
} }
module.exports = Whisper; module.exports = Whisper;

View File

@ -14,3 +14,6 @@ __MessageEvents.prototype.stop = function() {
this.filter.stopWatching(); this.filter.stopWatching();
}; };
if (typeof module !== 'undefined' && module.exports) {
module.exports = __MessageEvents;
}