139: Provide API to process waku messages over relay r=D4nte a=D4nte

Resolves #54

Co-authored-by: Franck Royer <franck@status.im>
This commit is contained in:
bors[bot] 2021-05-10 05:44:39 +00:00 committed by GitHub
commit 330caa2525
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 189 additions and 87 deletions

View File

@ -6,7 +6,6 @@ import { multiaddr, Multiaddr } from 'multiaddr';
import { ChatMessage } from 'waku/chat_message';
import Waku from 'waku/waku';
import { WakuMessage } from 'waku/waku_message';
import { RelayDefaultTopic } from 'waku/waku_relay';
import { StoreCodec } from 'waku/waku_store';
const ChatContentTopic = 'dingpu';
@ -43,15 +42,15 @@ export default async function startChat(): Promise<void> {
console.log(`Hi, ${nick}!`);
// TODO: Bubble event to waku, infer topic, decode msg
// Tracked with https://github.com/status-im/js-waku/issues/19
waku.libp2p.pubsub.on(RelayDefaultTopic, (event) => {
const wakuMsg = WakuMessage.decode(event.data);
if (wakuMsg.payload) {
const chatMsg = ChatMessage.decode(wakuMsg.payload);
console.log(formatMessage(chatMsg));
}
});
waku.relay.addObserver(
(message) => {
if (message.payload) {
const chatMsg = ChatMessage.decode(message.payload);
console.log(formatMessage(chatMsg));
}
},
[ChatContentTopic]
);
if (opts.staticNode) {
console.log(`Dialing ${opts.staticNode}`);

View File

@ -5,7 +5,6 @@ import './App.css';
import { ChatMessage } from './ChatMessage';
import { ChatMessage as WakuChatMessage } from 'waku/chat_message';
import { WakuMessage } from 'waku/waku_message';
import { RelayDefaultTopic } from 'waku/waku_relay';
import { StoreCodec } from 'waku/waku_store';
import handleCommand from './command';
import Room from './Room';
@ -52,10 +51,14 @@ export default function App() {
let [nick, setNick] = useState<string>(generate());
useEffect(() => {
const handleRelayMessage = (event: { data: Uint8Array }) => {
const chatMsg = decodeWakuMessage(event.data);
if (chatMsg) {
setNewMessages([chatMsg]);
const handleRelayMessage = (wakuMsg: WakuMessage) => {
if (wakuMsg.payload) {
const chatMsg = ChatMessage.fromWakuChatMessage(
WakuChatMessage.decode(wakuMsg.payload)
);
if (chatMsg) {
setNewMessages([chatMsg]);
}
}
};
@ -94,7 +97,7 @@ export default function App() {
.then(() => console.log('Waku init done'))
.catch((e) => console.log('Waku init failed ', e));
} else {
stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleRelayMessage);
stateWaku.relay.addObserver(handleRelayMessage, [ChatContentTopic]);
stateWaku.libp2p.peerStore.on(
'change:protocols',
@ -103,10 +106,6 @@ export default function App() {
// To clean up listener when component unmounts
return () => {
stateWaku?.libp2p.pubsub.removeListener(
RelayDefaultTopic,
handleRelayMessage
);
stateWaku?.libp2p.peerStore.removeListener(
'change:protocols',
handleProtocolChange.bind({}, stateWaku)
@ -173,13 +172,3 @@ async function initWaku(setter: (waku: Waku) => void) {
console.log('Issue starting waku ', e);
}
}
function decodeWakuMessage(data: Uint8Array): null | ChatMessage {
const wakuMsg = WakuMessage.decode(data);
if (!wakuMsg.payload) {
return null;
}
return ChatMessage.fromWakuChatMessage(
WakuChatMessage.decode(wakuMsg.payload)
);
}

View File

@ -4,8 +4,8 @@ import { Reader } from 'protobufjs/minimal';
// Protecting the user from protobuf oddities
import * as proto from '../proto/waku/v2/message';
export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto';
const DEFAULT_VERSION = 0;
export const DefaultContentTopic = '/waku/2/default-content/proto';
const DefaultVersion = 0;
export class WakuMessage {
public constructor(public proto: proto.WakuMessage) {}
@ -18,12 +18,12 @@ export class WakuMessage {
*/
static fromUtf8String(
utf8: string,
contentTopic: string = DEFAULT_CONTENT_TOPIC
contentTopic: string = DefaultContentTopic
): WakuMessage {
const payload = Buffer.from(utf8, 'utf-8');
return new WakuMessage({
payload,
version: DEFAULT_VERSION,
version: DefaultVersion,
contentTopic,
});
}
@ -36,11 +36,11 @@ export class WakuMessage {
*/
static fromBytes(
payload: Uint8Array,
contentTopic: string = DEFAULT_CONTENT_TOPIC
contentTopic: string = DefaultContentTopic
): WakuMessage {
return new WakuMessage({
payload,
version: DEFAULT_VERSION,
version: DefaultVersion,
contentTopic,
});
}

View File

@ -1,5 +1,4 @@
import { expect } from 'chai';
import Pubsub from 'libp2p-interfaces/src/pubsub';
import TCP from 'libp2p-tcp';
import {
@ -72,19 +71,56 @@ describe('Waku Relay', () => {
it('Publish', async function () {
this.timeout(10000);
const message = WakuMessage.fromUtf8String('JS to JS communication works');
const messageText = 'JS to JS communication works';
const message = WakuMessage.fromUtf8String(messageText);
const receivedPromise = waitForNextData(waku2.libp2p.pubsub);
const receivedMsgPromise: Promise<WakuMessage> = new Promise((resolve) => {
waku2.relay.addObserver(resolve);
});
await waku1.relay.send(message);
const receivedMsg = await receivedPromise;
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
});
const payload = Buffer.from(receivedMsg.payload!);
expect(Buffer.compare(payload, message.payload!)).to.eq(0);
it('Filter on content topics', async function () {
this.timeout(10000);
const fooMessageText = 'Published on content topic foo';
const barMessageText = 'Published on content topic bar';
const fooMessage = WakuMessage.fromUtf8String(fooMessageText, 'foo');
const barMessage = WakuMessage.fromUtf8String(barMessageText, 'bar');
const receivedBarMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve, ['bar']);
}
);
const allMessages: WakuMessage[] = [];
waku2.relay.addObserver((wakuMsg) => {
allMessages.push(wakuMsg);
});
await waku1.relay.send(fooMessage);
await waku1.relay.send(barMessage);
const receivedBarMsg = await receivedBarMsgPromise;
expect(receivedBarMsg.contentTopic).to.eq(barMessage.contentTopic);
expect(receivedBarMsg.version).to.eq(barMessage.version);
expect(receivedBarMsg.payloadAsUtf8).to.eq(barMessageText);
expect(allMessages.length).to.eq(2);
expect(allMessages[0].contentTopic).to.eq(fooMessage.contentTopic);
expect(allMessages[0].version).to.eq(fooMessage.version);
expect(allMessages[0].payloadAsUtf8).to.eq(fooMessageText);
expect(allMessages[1].contentTopic).to.eq(barMessage.contentTopic);
expect(allMessages[1].version).to.eq(barMessage.version);
expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText);
});
describe('Interop: Nim', function () {
@ -126,7 +162,8 @@ describe('Waku Relay', () => {
it('Js publishes to nim', async function () {
this.timeout(5000);
const message = WakuMessage.fromUtf8String('This is a message');
const messageText = 'This is a message';
const message = WakuMessage.fromUtf8String(messageText);
await waku.relay.send(message);
@ -139,26 +176,27 @@ describe('Waku Relay', () => {
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
const payload = Buffer.from(msgs[0].payload!);
expect(Buffer.compare(payload, message.payload!)).to.equal(0);
expect(msgs[0].payloadAsUtf8).to.equal(messageText);
});
it('Nim publishes to js', async function () {
this.timeout(5000);
const message = WakuMessage.fromUtf8String('Here is another message.');
const messageText = 'Here is another message.';
const message = WakuMessage.fromUtf8String(messageText);
const receivedPromise = waitForNextData(waku.libp2p.pubsub);
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
);
await nimWaku.sendMessage(message);
const receivedMsg = await receivedPromise;
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version);
const payload = Buffer.from(receivedMsg.payload!);
expect(Buffer.compare(payload, message.payload!)).to.eq(0);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
});
});
@ -209,7 +247,8 @@ describe('Waku Relay', () => {
it('Js publishes to nim', async function () {
this.timeout(30000);
const message = WakuMessage.fromUtf8String('This is a message');
const messageText = 'This is a message';
const message = WakuMessage.fromUtf8String(messageText);
await delay(1000);
await waku.relay.send(message);
@ -223,27 +262,28 @@ describe('Waku Relay', () => {
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
const payload = Buffer.from(msgs[0].payload!);
expect(Buffer.compare(payload, message.payload!)).to.equal(0);
expect(msgs[0].payloadAsUtf8).to.equal(messageText);
});
it('Nim publishes to js', async function () {
await delay(200);
const message = WakuMessage.fromUtf8String('Here is another message.');
const messageText = 'Here is another message.';
const message = WakuMessage.fromUtf8String(messageText);
const receivedPromise = waitForNextData(waku.libp2p.pubsub);
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
);
await nimWaku.sendMessage(message);
const receivedMsg = await receivedPromise;
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version);
const payload = Buffer.from(receivedMsg.payload!);
expect(Buffer.compare(payload, message.payload!)).to.eq(0);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
});
});
@ -313,21 +353,18 @@ describe('Waku Relay', () => {
const msgStr = 'Hello there!';
const message = WakuMessage.fromUtf8String(msgStr);
const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub);
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve);
}
);
await waku1.relay.send(message);
console.log('Waiting for message');
const waku2ReceivedMsg = await waku2ReceivedPromise;
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr);
});
});
});
});
async function waitForNextData(pubsub: Pubsub): Promise<WakuMessage> {
const msg = (await new Promise((resolve) => {
pubsub.once(RelayDefaultTopic, resolve);
})) as Pubsub.InMessage;
return WakuMessage.decode(msg.data);
}

View File

@ -12,7 +12,7 @@ import {
messageIdToString,
shuffle,
} from 'libp2p-gossipsub/src/utils';
import { InMessage } from 'libp2p-interfaces/src/pubsub';
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import PeerId from 'peer-id';
@ -26,7 +26,7 @@ export * from './constants';
export * from './relay_heartbeat';
/**
* See GossipOptions from libp2p-gossipsub
* See {GossipOptions} from libp2p-gossipsub
*/
interface GossipOptions {
emitSelf: boolean;
@ -48,8 +48,21 @@ interface GossipOptions {
Dlazy: number;
}
export class WakuRelay extends Gossipsub {
/**
* Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}.
* Must be passed as a `pubsub` module to a {Libp2p} instance.
*
* @implements {Pubsub}
*/
export class WakuRelay extends Gossipsub implements Pubsub {
heartbeat: RelayHeartbeat;
/**
* observers called when receiving new message.
* Observers under key "" are always called.
*/
public observers: {
[contentTopic: string]: Array<(message: WakuMessage) => void>;
};
/**
*
@ -66,6 +79,7 @@ export class WakuRelay extends Gossipsub {
);
this.heartbeat = new RelayHeartbeat(this);
this.observers = {};
const multicodecs = [constants.RelayCodec];
@ -74,28 +88,76 @@ export class WakuRelay extends Gossipsub {
/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to the default topic
* and subscribes to the default topic.
*
* @override
* @returns {void}
*/
start(): void {
public start(): void {
this.on(constants.RelayDefaultTopic, (event) => {
const wakuMsg = WakuMessage.decode(event.data);
if (this.observers['']) {
this.observers[''].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
});
super.start();
super.subscribe(constants.RelayDefaultTopic);
}
/**
* Send Waku messages under default topic
* @override
* Send Waku message.
*
* @param {WakuMessage} message
* @returns {Promise<void>}
*/
async send(message: WakuMessage): Promise<void> {
public async send(message: WakuMessage): Promise<void> {
const msg = message.encode();
await super.publish(constants.RelayDefaultTopic, Buffer.from(msg));
}
/**
* Join topic
* Register an observer of new messages received via waku relay
*
* @param callback called when a new message is received via waku relay
* @param contentTopics Content Topics for which the callback with be called,
* all of them if undefined, [] or ["",..] is passed.
* @returns {void}
*/
addObserver(
callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): void {
if (contentTopics.length === 0) {
if (!this.observers['']) {
this.observers[''] = [];
}
this.observers[''].push(callback);
} else {
contentTopics.forEach((contentTopic) => {
if (!this.observers[contentTopic]) {
this.observers[contentTopic] = [];
}
this.observers[contentTopic].push(callback);
});
}
}
/**
* Join pubsub topic.
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @param {string} topic
* @returns {void}
* @override
@ -152,8 +214,11 @@ export class WakuRelay extends Gossipsub {
}
/**
* Publish messages
* Publish messages.
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @override
* @param {InMessage} msg
* @returns {void}
@ -222,7 +287,13 @@ export class WakuRelay extends Gossipsub {
}
/**
* Emits gossip to peers in a particular topic
* Emits gossip to peers in a particular topic.
*
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @override
* @param {string} topic
* @param {Set<string>} exclude peers to exclude
* @returns {void}
@ -300,7 +371,12 @@ export class WakuRelay extends Gossipsub {
}
/**
* Make a PRUNE control message for a peer in a topic
* Make a PRUNE control message for a peer in a topic.
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @override
* @param {string} id
* @param {string} topic
* @param {boolean} doPX

View File

@ -2,14 +2,14 @@ import { Reader } from 'protobufjs/minimal';
import { v4 as uuid } from 'uuid';
import * as proto from '../../proto/waku/v2/store';
import { DEFAULT_CONTENT_TOPIC } from '../waku_message';
import { DefaultContentTopic } from '../waku_message';
import { RelayDefaultTopic } from '../waku_relay';
export class HistoryRPC {
public constructor(public proto: proto.HistoryRPC) {}
static createQuery(
contentTopics: string[] = [DEFAULT_CONTENT_TOPIC],
contentTopics: string[] = [DefaultContentTopic],
cursor?: proto.Index,
pubsubTopic: string = RelayDefaultTopic
): HistoryRPC {

View File

@ -9,6 +9,7 @@ import PeerId from 'peer-id';
import { WakuMessage } from '../lib/waku_message';
import { RelayDefaultTopic } from '../lib/waku_relay';
import * as proto from '../proto/waku/v2/message';
import { existsAsync, mkdirAsync, openAsync } from './async_fs';
import waitForLine from './log_file';
@ -167,9 +168,9 @@ export class NimWaku {
async messages(): Promise<WakuMessage[]> {
this.checkProcess();
return this.rpcCall<WakuMessage[]>('get_waku_v2_relay_v1_messages', [
return this.rpcCall<proto.WakuMessage[]>('get_waku_v2_relay_v1_messages', [
RelayDefaultTopic,
]);
]).then((msgs) => msgs.map((protoMsg) => new WakuMessage(protoMsg)));
}
async getPeerId(): Promise<PeerId> {