diff --git a/embark-ui/src/actions/index.js b/embark-ui/src/actions/index.js index 02c1d74d..7cc92cd2 100644 --- a/embark-ui/src/actions/index.js +++ b/embark-ui/src/actions/index.js @@ -97,6 +97,27 @@ export const contractProfile = { failure: (error) => action(CONTRACT_PROFILE[FAILURE], {error}) }; +export const MESSAGE_VERSION = createRequestTypes('MESSAGE_VERSION'); +export const messageVersion = { + request: () => action(MESSAGE_VERSION[REQUEST]), + success: (messageVersion) => action(MESSAGE_VERSION[SUCCESS], {messageVersion}), + failure: (error) => action(MESSAGE_VERSION[FAILURE], {error}) +}; + +export const MESSAGE_SEND = createRequestTypes('MESSAGE_SEND'); +export const messageSend = { + request: (body) => action(MESSAGE_SEND[REQUEST], {body}), + success: () => action(MESSAGE_SEND[SUCCESS]), + failure: (error) => action(MESSAGE_SEND[FAILURE], {error}) +}; + +export const MESSAGE_LISTEN = createRequestTypes('MESSAGE_LISTEN'); +export const messageListen = { + request: (messageChannel) => action(MESSAGE_LISTEN[REQUEST], {messageChannels: [messageChannel]}), + success: (messages) => action(MESSAGE_LISTEN[SUCCESS], {messages}), + failure: (error) => action(MESSAGE_LISTEN[FAILURE], {error}) +}; + // Web Socket export const WATCH_NEW_PROCESS_LOGS = 'WATCH_NEW_PROCESS_LOGS'; export const INIT_BLOCK_HEADER = 'INIT_BLOCK_HEADER'; diff --git a/embark-ui/src/api/index.js b/embark-ui/src/api/index.js index d30b7b3b..48dbbda1 100644 --- a/embark-ui/src/api/index.js +++ b/embark-ui/src/api/index.js @@ -1,7 +1,6 @@ import axios from "axios"; import constants from '../constants'; - function get(path, params) { return axios.get(constants.httpEndpoint + path, params) .then((response) => { @@ -65,6 +64,18 @@ export function fetchContract(payload) { return get(`/contract/${payload.contractName}`); } +export function communicationVersion() { + return get(`/communication/version`); +} + +export function sendMessage(payload) { + return post(`/communication/sendMessage`, payload.body); +} + +export function listenToChannel(channel) { + return new WebSocket(`${constants.wsEndpoint}/communication/listenTo/${channel}`); +} + export function fetchContractProfile(payload) { return get(`/profiler/${payload.contractName}`); } diff --git a/embark-ui/src/components/Communication.js b/embark-ui/src/components/Communication.js new file mode 100644 index 00000000..3ecc897f --- /dev/null +++ b/embark-ui/src/components/Communication.js @@ -0,0 +1,107 @@ +import PropTypes from "prop-types"; +import React, {Component} from 'react'; +import {Button, Form, Card, Grid, List} from 'tabler-react'; + +class Communication extends Component { + constructor(props) { + super(props); + + this.state = { + listenTo: '', + channel: '', + message: '', + messageList: [] + }; + } + + handleChange(e, name) { + this.setState({ + [name]: e.target.value + }); + } + + sendMessage(e) { + e.preventDefault(); + this.props.sendMessage(this.state.channel, this.state.message); + } + + listenToChannel(e) { + e.preventDefault(); + this.props.listenToMessages(this.state.listenTo); + } + + render() { + return ( + +

Listen To channel

+ + + this.handleChange(e, 'listenTo')}/> + + + + + {this.props.subscriptions && this.props.subscriptions.length > 0 && +
+

Subscribed channels:

+ + {this.props.subscriptions.map((item, i) => {item})} + +
+ } + + {this.props.channels && Boolean(Object.keys(this.props.channels).length) && + +

Messages received:

+ + + {Object.keys(this.props.channels).map((channelName, i) => { + return ( + + + {this.props.channels[channelName].map((data, f) => { + return

{data.message}

; + })} +
+
+
); + })} +
+
+ } + +

Send Message

+ + + + this.handleChange(e, 'channel')}/> + + + this.handleChange(e, 'message')}/> + + + + +
+ ); + } +} + +Communication.propTypes = { + sendMessage: PropTypes.func, + listenToMessages: PropTypes.func, + subscriptions: PropTypes.array, + channels: PropTypes.object +}; + +export default Communication; + diff --git a/embark-ui/src/components/ExplorerLayout.js b/embark-ui/src/components/ExplorerLayout.js index bb7299c7..e98f7586 100644 --- a/embark-ui/src/components/ExplorerLayout.js +++ b/embark-ui/src/components/ExplorerLayout.js @@ -10,9 +10,12 @@ import AccountsContainer from '../containers/AccountsContainer'; import AccountContainer from '../containers/AccountContainer'; import BlocksContainer from '../containers/BlocksContainer'; import BlockContainer from '../containers/BlockContainer'; +import CommunicationContainer from '../containers/CommunicationContainer'; import TransactionsContainer from '../containers/TransactionsContainer'; import TransactionContainer from '../containers/TransactionContainer'; +const className = "d-flex align-items-center"; + const ExplorerLayout = () => ( @@ -20,7 +23,7 @@ const ExplorerLayout = () => (
( Accounts ( Blocks Transactions + + Communication +
@@ -52,6 +63,7 @@ const ExplorerLayout = () => ( + diff --git a/embark-ui/src/containers/CommunicationContainer.js b/embark-ui/src/containers/CommunicationContainer.js new file mode 100644 index 00000000..389c23dc --- /dev/null +++ b/embark-ui/src/containers/CommunicationContainer.js @@ -0,0 +1,74 @@ +import PropTypes from "prop-types"; +import React, {Component} from 'react'; +import connect from "react-redux/es/connect/connect"; +import {Alert, Loader, Page} from 'tabler-react'; +import {messageSend, messageListen, messageVersion} from "../actions"; +import Communication from "../components/Communication"; +import Loading from "../components/Loading"; +import {getMessageVersion, getMessages, getMessageChannels} from "../reducers/selectors"; + +class CommunicationContainer extends Component { + componentDidMount() { + this.props.communicationVersion(); + } + + sendMessage(topic, message) { + this.props.messageSend({topic, message}); + } + + listenToChannel(channel) { + this.props.messageListen(channel); + } + + render() { + let isEnabledMessage = ''; + if (this.props.messageVersion === undefined || this.props.messageVersion === null) { + isEnabledMessage = + Checking Whisper support, please wait; + } else if (!this.props.messageVersion) { + isEnabledMessage = The node you are using does not support Whisper; + } else if (this.props.messageVersion === -1) { + isEnabledMessage = The node uses an unsupported version of Whisper; + } + + if (!this.props.messages) { + return ; + } + return ( + + {isEnabledMessage} + this.listenToChannel(channel)} + sendMessage={(channel, message) => this.sendMessage(channel, message)} + channels={this.props.messages} + subscriptions={this.props.messageChannels}/> + + ); + } +} + +CommunicationContainer.propTypes = { + messageSend: PropTypes.func, + messageListen: PropTypes.func, + communicationVersion: PropTypes.func, + messageVersion: PropTypes.number, + messages: PropTypes.object, + messageChannels: PropTypes.array +}; + +function mapStateToProps(state) { + return { + messages: getMessages(state), + messageChannels: getMessageChannels(state), + messageVersion: getMessageVersion(state) + }; +} + +export default connect( + mapStateToProps, + { + messageSend: messageSend.request, + messageListen: messageListen.request, + communicationVersion: messageVersion.request + } +)(CommunicationContainer); + diff --git a/embark-ui/src/reducers/index.js b/embark-ui/src/reducers/index.js index 1caea355..e613daaa 100644 --- a/embark-ui/src/reducers/index.js +++ b/embark-ui/src/reducers/index.js @@ -11,7 +11,10 @@ const entitiesDefaultState = { processLogs: [], contracts: [], contractProfiles: [], - commands: [] + commands: [], + messages: [], + messageChannels: [], + messageVersion: null }; const sorter = { @@ -23,6 +26,9 @@ const sorter = { }, processLogs: function(a, b) { return a.timestamp - b.timestamp; + }, + messages: function(a, b) { + return a.time - b.time; } }; @@ -50,7 +56,10 @@ function entities(state = entitiesDefaultState, action) { for (let name of Object.keys(state)) { let filter = filtrer[name] || (() => true); let sort = sorter[name] || (() => true); - if (action[name] && action[name].length > 1) { + if (action[name] && !Array.isArray(action[name])) { + return {...state, [name]: action[name]}; + } + if (action[name] && (!Array.isArray(action[name]) || action[name].length > 1)) { return {...state, [name]: [...action[name], ...state[name]].filter(filter).sort(sort)}; } if (action[name] && action[name].length === 1) { diff --git a/embark-ui/src/reducers/selectors.js b/embark-ui/src/reducers/selectors.js index 81cd9f9b..bac61e4b 100644 --- a/embark-ui/src/reducers/selectors.js +++ b/embark-ui/src/reducers/selectors.js @@ -57,3 +57,22 @@ export function getContract(state, contractName) { export function getContractProfile(state, contractName) { return state.entities.contractProfiles.find((contractProfile => contractProfile.name === contractName)); } + +export function getMessageVersion(state) { + return state.entities.messageVersion; +} + +export function getMessageChannels(state) { + return state.entities.messageChannels; +} + +export function getMessages(state) { + const messages = {}; + state.entities.messages.forEach(message => { + if (!messages[message.channel]) { + messages[message.channel] = [] + } + messages[message.channel].push(message); + }); + return messages; +} diff --git a/embark-ui/src/sagas/index.js b/embark-ui/src/sagas/index.js index 70a4f145..6929c0c5 100644 --- a/embark-ui/src/sagas/index.js +++ b/embark-ui/src/sagas/index.js @@ -4,7 +4,7 @@ import {eventChannel} from 'redux-saga'; import {all, call, fork, put, takeEvery, take} from 'redux-saga/effects'; const {account, accounts, block, blocks, transaction, transactions, processes, commands, processLogs, - contracts, contract, contractProfile} = actions; + contracts, contract, contractProfile, messageSend, messageVersion, messageListen} = actions; function *doRequest(entity, apiFn, payload) { const {response, error} = yield call(apiFn, payload); @@ -114,6 +114,31 @@ export function *watchListenToProcessLogs() { yield takeEvery(actions.WATCH_NEW_PROCESS_LOGS, listenToProcessLogs); } +export const sendMessage = doRequest.bind(null, messageSend, api.sendMessage); + +export function *watchSendMessage() { + yield takeEvery(actions.MESSAGE_SEND[actions.REQUEST], sendMessage); +} + +export function *listenToMessages(action) { + const socket = api.listenToChannel(action.messageChannels[0]); + const channel = yield call(createChannel, socket); + while (true) { + const message = yield take(channel); + yield put(messageListen.success([{channel: action.messageChannels[0], message: message.data, time: message.time}])); + } +} + +export function *watchListenToMessages() { + yield takeEvery(actions.MESSAGE_LISTEN[actions.REQUEST], listenToMessages); +} + +export const fetchCommunicationVersion = doRequest.bind(null, messageVersion, api.communicationVersion); + +export function *watchCommunicationVersion() { + yield takeEvery(actions.MESSAGE_VERSION[actions.REQUEST], fetchCommunicationVersion); +} + export default function *root() { yield all([ fork(watchInitBlockHeader), @@ -124,14 +149,15 @@ export default function *root() { fork(watchListenToProcessLogs), fork(watchFetchBlock), fork(watchFetchTransactions), - fork(watchFetchTransaction), fork(watchPostCommand), + fork(watchCommunicationVersion), fork(watchFetchBlocks), fork(watchFetchContracts), + fork(watchListenToMessages), + fork(watchSendMessage), fork(watchFetchContract), fork(watchFetchTransaction), - fork(watchFetchContractProfile), - fork(watchFetchTransactions) + fork(watchFetchContractProfile) ]); } diff --git a/lib/modules/whisper/index.js b/lib/modules/whisper/index.js index e6c53c28..ad63d051 100644 --- a/lib/modules/whisper/index.js +++ b/lib/modules/whisper/index.js @@ -1,6 +1,9 @@ let utils = require('../../utils/utils.js'); let fs = require('../../core/fs.js'); let Web3 = require('web3'); +const {parallel} = require('async'); +const {sendMessage, listenTo} = require('./js/communicationFunctions'); +const messageEvents = require('./js/message_events'); const {canonicalHost, defaultHost} = require('../../utils/host'); @@ -12,6 +15,7 @@ class Whisper { this.communicationConfig = embark.config.communicationConfig; this.web3 = new Web3(); this.embark = embark; + this.web3Ready = false; if (!this.communicationConfig.enabled) { return; @@ -21,6 +25,9 @@ class Whisper { this.setServiceCheck(); this.addWhisperToEmbarkJS(); this.addSetProvider(); + this.waitForWeb3Ready(() => { + this.registerAPICalls(); + }); } connectToProvider() { @@ -29,13 +36,25 @@ class Whisper { this.web3.setProvider(new Web3.providers.WebsocketProvider(web3Endpoint, {headers: {Origin: "embark"}})); } + waitForWeb3Ready(cb) { + if (this.web3Ready) { + return cb(); + } + if (this.web3.currentProvider.connection.readyState !== 1) { + return setTimeout(this.waitForWeb3Ready.bind(this, cb), 50); + } + this.web3Ready = true; + cb(); + } + setServiceCheck() { const self = this; - self.events.request("services:register", 'Whisper', function (cb) { + self.events.request("services:register", 'Whisper', function(cb) { if (!self.web3.currentProvider || self.web3.currentProvider.connection.readyState !== 1) { return self.connectToProvider(); } - self.web3.shh.getVersion(function (err, version) { + self.web3.shh.getVersion(function(err, version) { + self.version = version; if (err || version === "2") { return cb({name: 'Whisper', status: 'off'}); } else { @@ -61,9 +80,11 @@ class Whisper { code += "\n" + fs.readFileSync(utils.joinPath(__dirname, 'js', 'message_events.js')).toString(); if (web3Version[0] === "0") { + self.isOldWeb3 = true; code += "\n" + fs.readFileSync(utils.joinPath(__dirname, 'js', 'embarkjs_old_web3.js')).toString(); code += "\nEmbarkJS.Messages.registerProvider('whisper', __embarkWhisperOld);"; } else { + code += "\n" + fs.readFileSync(utils.joinPath(__dirname, 'js', 'communicationFunctions.js')).toString(); code += "\n" + fs.readFileSync(utils.joinPath(__dirname, 'js', 'embarkjs.js')).toString(); code += "\nEmbarkJS.Messages.registerProvider('whisper', __embarkWhisperNewWeb3);"; } @@ -73,7 +94,7 @@ class Whisper { addSetProvider() { let connection = this.communicationConfig.connection || {}; - + // todo: make the add code a function as well let config = JSON.stringify({ server: canonicalHost(connection.host || defaultHost), @@ -90,6 +111,83 @@ class Whisper { this.embark.addProviderInit('communication', code, shouldInit); } + registerAPICalls() { + const self = this; + if (self.apiCallsRegistered) { + return; + } + self.apiCallsRegistered = true; + let symKeyID, sig; + parallel([ + function(paraCb) { + self.web3.shh.newSymKey((err, id) => { + symKeyID = id; + paraCb(err); + }); + }, + function(paraCb) { + self.web3.shh.newKeyPair((err, id) => { + sig = id; + paraCb(err); + }); + } + ], (err) => { + if (err) { + self.logger.error('Error getting Whisper keys:', err.message || err); + return; + } + self.embark.registerAPICall( + 'post', + '/embark-api/communication/sendMessage', + (req, res) => { + sendMessage({ + topic: req.body.topic, + data: req.body.message, + sig, + symKeyID, + fromAscii: self.web3.utils.asciiToHex, + toHex: self.web3.utils.toHex, + post: self.web3.shh.post + }, (err, result) => { + if (err) { + return res.status(500).send({error: err}); + } + res.send(result); + }); + }); + + self.embark.registerAPICall( + 'ws', + '/embark-api/communication/listenTo/:topic', + (ws, req) => { + self.webSocketsChannels[req.params.topic] = listenTo({ + topic: req.params.topic, + messageEvents, + toHex: self.web3.utils.toHex, + toAscii: self.web3.utils.hexToAscii, + sig, + symKeyID, + subscribe: self.web3.shh.subscribe + }, (err, result) => { + if (ws.readyState === ws.CLOSED) { + return; + } + if (err) { + return ws.status(500).send(JSON.stringify({error: err})); + } + ws.send(JSON.stringify(result)); + }); + }); + + self.embark.registerAPICall( + 'get', + '/embark-api/communication/version', + (req, res) => { + res.send(self.isOldWeb3 ? -1 : self.version || 0); + } + ); + }); + } } module.exports = Whisper; diff --git a/lib/modules/whisper/js/communicationFunctions.js b/lib/modules/whisper/js/communicationFunctions.js new file mode 100644 index 00000000..c647bbed --- /dev/null +++ b/lib/modules/whisper/js/communicationFunctions.js @@ -0,0 +1,105 @@ +function sendMessage(options, callback) { + let topics, ttl, payload; + topics = options.topic; + const data = options.data; + ttl = options.ttl || 100; + const powTime = options.powTime || 3; + const powTarget = options.powTarget || 0.5; + const sig = options.sig; + const fromAscii = options.fromAscii; + const toHex = options.toHex; + const symKeyID = options.symKeyID; + const post = options.post; + + if (topics) { + topics = toHex(topics).slice(0, 10); + } + + payload = JSON.stringify(data); + + let message = { + sig: sig, // signs the message using the keyPair ID + ttl: ttl, + payload: fromAscii(payload), + powTime: powTime, + powTarget: powTarget + }; + + if (topics) { + message.topic = topics; + } + + if (options.pubKey) { + message.pubKey = options.pubKey; // encrypt using a given pubKey + } else if(options.symKeyID) { + message.symKeyID = options.symKeyID; // encrypts using given sym key ID + } else { + message.symKeyID = symKeyID; // encrypts using the sym key ID + } + + if (topics === undefined && message.symKeyID && !message.pubKey) { + return callback("missing option: topic"); + } + + post(message, callback); +} + +function listenTo(options, callback) { + let topics = options.topic; + const messageEvents = options.messageEvents; + const toHex = options.toHex; + const toAscii = options.toAscii; + const sig = options.sig; + const symKeyID = options.symKeyID; + const subscribe = options.subscribe; + + let promise = new messageEvents(); + + let subOptions = {}; + + if(topics){ + if (typeof topics === 'string') { + topics = [toHex(topics).slice(0, 10)]; + } else { + topics = topics.map((t) => toHex(t).slice(0, 10)); + } + subOptions.topics = topics; + } + + if (options.minPow) { + subOptions.minPow = options.minPow; + } + + if (options.usePrivateKey === true) { + subOptions.privateKeyID = options.privateKeyID || sig; + } else { + subOptions.symKeyID = symKeyID; + } + + promise.filter = subscribe("messages", subOptions) + .on('data', function (result) { + var payload = JSON.parse(toAscii(result.payload)); + var data; + data = { + topic: toAscii(result.topic), + data: payload, + //from: result.from, + time: result.timestamp + }; + + if (callback) { + return callback(null, data); + } + promise.cb(payload, data, result); + }) + .catch(callback); + + return promise; +} + +if (typeof module !== 'undefined' && module.exports) { + module.exports = { + sendMessage, + listenTo + }; +} diff --git a/lib/modules/whisper/js/embarkjs.js b/lib/modules/whisper/js/embarkjs.js index ea72b9d0..b9a57df7 100644 --- a/lib/modules/whisper/js/embarkjs.js +++ b/lib/modules/whisper/js/embarkjs.js @@ -1,4 +1,4 @@ -/*global EmbarkJS, Web3, __MessageEvents */ +/*global EmbarkJS, Web3, __MessageEvents, sendMessage, listenTo*/ // for the whisper v5 and web3.js 1.0 let __embarkWhisperNewWeb3 = {}; @@ -30,106 +30,37 @@ __embarkWhisperNewWeb3.setProvider = function (options) { }); }; -__embarkWhisperNewWeb3.sendMessage = function (options) { - var topics, data, ttl, payload; - topics = options.topic; - data = options.data || options.payload; - ttl = options.ttl || 100; - var powTime = options.powTime || 3; - var powTarget = options.powTarget || 0.5; - - if (data === undefined) { +__embarkWhisperNewWeb3.sendMessage = function(options) { + const data = options.data || options.payload; + if (!data) { throw new Error("missing option: data"); } + Object.assign(options, { + sig: this.sig, + fromAscii: EmbarkJS.Utils.fromAscii, + toHex: this.web3.utils.toHex, + symKeyID: options.symKeyID || this.symKeyID, + post: this.web3.shh.post, + data + }); - if (topics) { - topics = this.web3.utils.toHex(topics).slice(0, 10); - } - - payload = JSON.stringify(data); - - let message = { - sig: this.sig, // signs the message using the keyPair ID - ttl: ttl, - payload: EmbarkJS.Utils.fromAscii(payload), - powTime: powTime, - powTarget: powTarget - }; - - if (topics) { - message.topic = topics; - } - - if (options.pubKey) { - message.pubKey = options.pubKey; // encrypt using a given pubKey - } else if(options.symKeyID) { - message.symKeyID = options.symKeyID; // encrypts using given sym key ID - } else { - message.symKeyID = this.symKeyID; // encrypts using the sym key ID - } - - if (topics === undefined && message.symKeyID && !message.pubKey) { - throw new Error("missing option: topic"); - } - - this.web3.shh.post(message, function () { + sendMessage(options, (err) => { + if (err) { + throw new Error(err); + } }); }; __embarkWhisperNewWeb3.listenTo = function (options, callback) { - var topics = options.topic; - - let promise = new __MessageEvents(); - - let subOptions = {}; - - if(topics){ - if (typeof topics === 'string') { - topics = [this.web3.utils.toHex(topics).slice(0, 10)]; - } else { - topics = topics.map((t) => this.web3.utils.toHex(t).slice(0, 10)); - } - subOptions.topics = topics; - } - - if (options.minPow) { - subOptions.minPow = options.minPow; - } - - if (options.usePrivateKey === true) { - if (options.privateKeyID) { - subOptions.privateKeyID = options.privateKeyID; - } else { - subOptions.privateKeyID = this.sig; - } - } else { - if (options.symKeyID) { - subOptions.symKeyID = options.symKeyID; - } else { - subOptions.symKeyID = this.symKeyID; - } - } - - let filter = this.web3.shh.subscribe("messages", subOptions) - .on('data', function (result) { - var payload = JSON.parse(EmbarkJS.Utils.toAscii(result.payload)); - var data; - data = { - topic: EmbarkJS.Utils.toAscii(result.topic), - data: payload, - //from: result.from, - time: result.timestamp - }; - - if (callback) { - return callback(null, data); - } - promise.cb(payload, data, result); + Object.assign(options, { + sig: this.sig, + toAscii: EmbarkJS.Utils.toAscii, + toHex: this.web3.utils.toHex, + symKeyID: options.symKeyID || this.symKeyID, + messageEvents: __MessageEvents, + subscribe: this.web3.shh.subscribe }); - - promise.filter = filter; - - return promise; + listenTo(options, callback); }; __embarkWhisperNewWeb3.getWhisperVersion = function (cb) { diff --git a/lib/modules/whisper/js/message_events.js b/lib/modules/whisper/js/message_events.js index 302d62be..8f11cd3b 100644 --- a/lib/modules/whisper/js/message_events.js +++ b/lib/modules/whisper/js/message_events.js @@ -15,3 +15,6 @@ __MessageEvents.prototype.stop = function() { this.filter.stopWatching(); }; +if (typeof module !== 'undefined' && module.exports) { + module.exports = __MessageEvents; +}