Enable observers to filter by contentTopic

This commit is contained in:
Franck Royer 2021-05-10 14:54:08 +10:00
parent de3dcd9e22
commit ba6b67cc53
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
4 changed files with 85 additions and 14 deletions

View File

@ -42,12 +42,15 @@ export default async function startChat(): Promise<void> {
console.log(`Hi, ${nick}!`); console.log(`Hi, ${nick}!`);
waku.relay.addObserver((message) => { waku.relay.addObserver(
(message) => {
if (message.payload) { if (message.payload) {
const chatMsg = ChatMessage.decode(message.payload); const chatMsg = ChatMessage.decode(message.payload);
console.log(formatMessage(chatMsg)); console.log(formatMessage(chatMsg));
} }
}); },
[ChatContentTopic]
);
if (opts.staticNode) { if (opts.staticNode) {
console.log(`Dialing ${opts.staticNode}`); console.log(`Dialing ${opts.staticNode}`);

View File

@ -97,7 +97,7 @@ export default function App() {
.then(() => console.log('Waku init done')) .then(() => console.log('Waku init done'))
.catch((e) => console.log('Waku init failed ', e)); .catch((e) => console.log('Waku init failed ', e));
} else { } else {
stateWaku.relay.addObserver(handleRelayMessage); stateWaku.relay.addObserver(handleRelayMessage, [ChatContentTopic]);
stateWaku.libp2p.peerStore.on( stateWaku.libp2p.peerStore.on(
'change:protocols', 'change:protocols',

View File

@ -87,6 +87,42 @@ describe('Waku Relay', () => {
expect(receivedMsg.payloadAsUtf8).to.eq(messageText); expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
}); });
it('Filter on content topics', async function () {
this.timeout(10000);
const fooMessageText = 'Published on content topic foo';
const barMessageText = 'Published on content topic bar';
const fooMessage = WakuMessage.fromUtf8String(fooMessageText, 'foo');
const barMessage = WakuMessage.fromUtf8String(barMessageText, 'bar');
const receivedBarMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve, ['bar']);
}
);
const allMessages: WakuMessage[] = [];
waku2.relay.addObserver((wakuMsg) => {
allMessages.push(wakuMsg);
});
await waku1.relay.send(fooMessage);
await waku1.relay.send(barMessage);
const receivedBarMsg = await receivedBarMsgPromise;
expect(receivedBarMsg.contentTopic).to.eq(barMessage.contentTopic);
expect(receivedBarMsg.version).to.eq(barMessage.version);
expect(receivedBarMsg.payloadAsUtf8).to.eq(barMessageText);
expect(allMessages.length).to.eq(2);
expect(allMessages[0].contentTopic).to.eq(fooMessage.contentTopic);
expect(allMessages[0].version).to.eq(fooMessage.version);
expect(allMessages[0].payloadAsUtf8).to.eq(fooMessageText);
expect(allMessages[1].contentTopic).to.eq(barMessage.contentTopic);
expect(allMessages[1].version).to.eq(barMessage.version);
expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText);
});
describe('Interop: Nim', function () { describe('Interop: Nim', function () {
describe('Nim connects to js', function () { describe('Nim connects to js', function () {
let waku: Waku; let waku: Waku;

View File

@ -56,7 +56,13 @@ interface GossipOptions {
*/ */
export class WakuRelay extends Gossipsub implements Pubsub { export class WakuRelay extends Gossipsub implements Pubsub {
heartbeat: RelayHeartbeat; heartbeat: RelayHeartbeat;
public observers: Array<(message: WakuMessage) => void>; /**
* observers called when receiving new message.
* Observers under key "" are always called.
*/
public observers: {
[contentTopic: string]: Array<(message: WakuMessage) => void>;
};
/** /**
* *
@ -73,7 +79,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
); );
this.heartbeat = new RelayHeartbeat(this); this.heartbeat = new RelayHeartbeat(this);
this.observers = []; this.observers = {};
const multicodecs = [constants.RelayCodec]; const multicodecs = [constants.RelayCodec];
@ -90,9 +96,18 @@ export class WakuRelay extends Gossipsub implements Pubsub {
public start(): void { public start(): void {
this.on(constants.RelayDefaultTopic, (event) => { this.on(constants.RelayDefaultTopic, (event) => {
const wakuMsg = WakuMessage.decode(event.data); const wakuMsg = WakuMessage.decode(event.data);
this.observers.forEach((callbackFn) => { if (this.observers['']) {
this.observers[''].forEach((callbackFn) => {
callbackFn(wakuMsg); callbackFn(wakuMsg);
}); });
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
}); });
super.start(); super.start();
@ -114,10 +129,27 @@ export class WakuRelay extends Gossipsub implements Pubsub {
* Register an observer of new messages received via waku relay * Register an observer of new messages received via waku relay
* *
* @param callback called when a new message is received via waku relay * @param callback called when a new message is received via waku relay
* @param contentTopics Content Topics for which the callback with be called,
* all of them if undefined, [] or ["",..] is passed.
* @returns {void} * @returns {void}
*/ */
addObserver(callback: (message: WakuMessage) => void): void { addObserver(
this.observers.push(callback); callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): void {
if (contentTopics.length === 0) {
if (!this.observers['']) {
this.observers[''] = [];
}
this.observers[''].push(callback);
} else {
contentTopics.forEach((contentTopic) => {
if (!this.observers[contentTopic]) {
this.observers[contentTopic] = [];
}
this.observers[contentTopic].push(callback);
});
}
} }
/** /**