118: Clean up WakuMessage API r=D4nte a=D4nte



121: Always trigger store query at connection r=D4nte a=D4nte

If no new messages are received, the rendering does not change
as dupe messages are filtered out.

122: Use provided API r=D4nte a=D4nte



Co-authored-by: Franck Royer <franck@status.im>
This commit is contained in:
bors[bot] 2021-05-04 00:26:12 +00:00 committed by GitHub
commit 3ae782d3b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 72 additions and 78 deletions

View File

@ -2,7 +2,7 @@ syntax = "proto3";
package waku.v2; package waku.v2;
message WakuMessageProto { message WakuMessage {
optional bytes payload = 1; optional bytes payload = 1;
optional string content_topic = 2; optional string content_topic = 2;
optional uint32 version = 3; optional uint32 version = 3;

View File

@ -32,7 +32,7 @@ message HistoryQuery {
} }
message HistoryResponse { message HistoryResponse {
repeated WakuMessageProto messages = 1; repeated WakuMessage messages = 1;
PagingInfo paging_info = 2; PagingInfo paging_info = 2;
} }

View File

@ -102,7 +102,7 @@ export default class Waku {
await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); await this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
} }
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { addPeerToAddressBook(peerId: PeerId, multiaddr: Multiaddr[]) {
this.libp2p.peerStore.addressBook.set(peerId, multiaddr); this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
} }

View File

@ -1,3 +1,4 @@
import { expect } from 'chai';
import fc from 'fast-check'; import fc from 'fast-check';
import { WakuMessage } from './waku_message'; import { WakuMessage } from './waku_message';
@ -7,10 +8,10 @@ describe('Waku Message', function () {
fc.assert( fc.assert(
fc.property(fc.string(), (s) => { fc.property(fc.string(), (s) => {
const msg = WakuMessage.fromUtf8String(s); const msg = WakuMessage.fromUtf8String(s);
const binary = msg.toBinary(); const binary = msg.encode();
const actual = WakuMessage.decode(binary); const actual = WakuMessage.decode(binary);
return actual.isEqualTo(msg); expect(actual).to.deep.equal(msg);
}) })
); );
}); });
@ -19,7 +20,7 @@ describe('Waku Message', function () {
fc.assert( fc.assert(
fc.property(fc.string(), (s) => { fc.property(fc.string(), (s) => {
const msg = WakuMessage.fromUtf8String(s); const msg = WakuMessage.fromUtf8String(s);
const utf8 = msg.utf8Payload(); const utf8 = msg.payloadAsUtf8;
return utf8 === s; return utf8 === s;
}) })

View File

@ -2,31 +2,30 @@
import { Reader } from 'protobufjs/minimal'; import { Reader } from 'protobufjs/minimal';
// Protecting the user from protobuf oddities // Protecting the user from protobuf oddities
import { WakuMessageProto } from '../proto/waku/v2/message'; import * as proto from '../proto/waku/v2/message';
export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto'; export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto';
const DEFAULT_VERSION = 0; const DEFAULT_VERSION = 0;
export class WakuMessage { export class WakuMessage {
// TODO: Adopt similar design to HistoryRPC public constructor(public proto: proto.WakuMessage) {}
private constructor(
public payload?: Uint8Array,
public contentTopic?: string,
public version?: number
) {}
static fromProto(proto: WakuMessageProto) {
return new WakuMessage(proto.payload, proto.contentTopic, proto.version);
}
/** /**
* Create Message with a utf-8 string as payload * Create Message with a utf-8 string as payload
* @param payload * @param utf8
* @param contentTopic
* @returns {WakuMessage} * @returns {WakuMessage}
*/ */
static fromUtf8String(payload: string): WakuMessage { static fromUtf8String(
const buf = Buffer.from(payload, 'utf-8'); utf8: string,
return new WakuMessage(buf, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION); contentTopic: string = DEFAULT_CONTENT_TOPIC
): WakuMessage {
const payload = Buffer.from(utf8, 'utf-8');
return new WakuMessage({
payload,
version: DEFAULT_VERSION,
contentTopic,
});
} }
/** /**
@ -39,50 +38,43 @@ export class WakuMessage {
payload: Uint8Array, payload: Uint8Array,
contentTopic: string = DEFAULT_CONTENT_TOPIC contentTopic: string = DEFAULT_CONTENT_TOPIC
): WakuMessage { ): WakuMessage {
return new WakuMessage(payload, contentTopic, DEFAULT_VERSION); return new WakuMessage({
payload,
version: DEFAULT_VERSION,
contentTopic,
});
} }
static decode(bytes: Uint8Array): WakuMessage { static decode(bytes: Uint8Array): WakuMessage {
const wakuMsg = WakuMessageProto.decode(Reader.create(bytes)); const wakuMsg = proto.WakuMessage.decode(Reader.create(bytes));
return new WakuMessage( return new WakuMessage(wakuMsg);
wakuMsg.payload,
wakuMsg.contentTopic,
wakuMsg.version
);
} }
toBinary(): Uint8Array { encode(): Uint8Array {
return WakuMessageProto.encode({ return proto.WakuMessage.encode(this.proto).finish();
payload: this.payload,
version: this.version,
contentTopic: this.contentTopic,
}).finish();
} }
utf8Payload(): string { get payloadAsUtf8(): string {
if (!this.payload) { if (!this.proto.payload) {
return ''; return '';
} }
return Array.from(this.payload) return Array.from(this.proto.payload)
.map((char) => { .map((char) => {
return String.fromCharCode(char); return String.fromCharCode(char);
}) })
.join(''); .join('');
} }
// Purely for tests purposes. get payload(): Uint8Array | undefined {
// We do consider protobuf field when checking equality return this.proto.payload;
// As the content is held by the other fields. }
isEqualTo(other: WakuMessage) {
const payloadsAreEqual = get contentTopic(): string | undefined {
this.payload && other.payload return this.proto.contentTopic;
? Buffer.compare(this.payload, other.payload) === 0 }
: !(this.payload || other.payload);
return ( get version(): number | undefined {
payloadsAreEqual && return this.proto.version;
this.contentTopic === other.contentTopic &&
this.version === other.version
);
} }
} }

View File

@ -32,7 +32,10 @@ describe('Waku Relay', () => {
}), }),
]); ]);
await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); await waku1.addPeerToAddressBook(
waku2.libp2p.peerId,
waku2.libp2p.multiaddrs
);
await Promise.all([ await Promise.all([
new Promise((resolve) => new Promise((resolve) =>
@ -319,7 +322,7 @@ describe('Waku Relay', () => {
console.log('Waiting for message'); console.log('Waiting for message');
const waku2ReceivedMsg = await waku2ReceivedPromise; const waku2ReceivedMsg = await waku2ReceivedPromise;
expect(waku2ReceivedMsg.utf8Payload()).to.eq(msgStr); expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr);
}); });
}); });
}); });

View File

@ -90,7 +90,7 @@ export class WakuRelay extends Gossipsub {
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async send(message: WakuMessage) { async send(message: WakuMessage) {
const msg = message.toBinary(); const msg = message.encode();
await super.publish(constants.RelayDefaultTopic, Buffer.from(msg)); await super.publish(constants.RelayDefaultTopic, Buffer.from(msg));
} }

View File

@ -41,7 +41,7 @@ describe('Waku Store', () => {
expect(messages?.length).eq(2); expect(messages?.length).eq(2);
const result = messages?.findIndex((msg) => { const result = messages?.findIndex((msg) => {
return msg.utf8Payload() === 'Message 0'; return msg.payloadAsUtf8 === 'Message 0';
}); });
expect(result).to.not.eq(-1); expect(result).to.not.eq(-1);
}); });
@ -75,7 +75,7 @@ describe('Waku Store', () => {
for (let index = 0; index < 2; index++) { for (let index = 0; index < 2; index++) {
expect( expect(
messages?.findIndex((msg) => { messages?.findIndex((msg) => {
return msg.utf8Payload() === `Message ${index}`; return msg.payloadAsUtf8 === `Message ${index}`;
}) })
).to.eq(index); ).to.eq(index);
} }

View File

@ -65,7 +65,7 @@ export class WakuStore {
} }
response.messages.map((protoMsg) => { response.messages.map((protoMsg) => {
messages.push(WakuMessage.fromProto(protoMsg)); messages.push(new WakuMessage(protoMsg));
}); });
const responsePageSize = response.pagingInfo?.pageSize; const responsePageSize = response.pagingInfo?.pageSize;

View File

@ -4,18 +4,18 @@ import _m0 from 'protobufjs/minimal';
export const protobufPackage = 'waku.v2'; export const protobufPackage = 'waku.v2';
export interface WakuMessageProto { export interface WakuMessage {
payload?: Uint8Array | undefined; payload?: Uint8Array | undefined;
contentTopic?: string | undefined; contentTopic?: string | undefined;
version?: number | undefined; version?: number | undefined;
timestamp?: number | undefined; timestamp?: number | undefined;
} }
const baseWakuMessageProto: object = {}; const baseWakuMessage: object = {};
export const WakuMessageProto = { export const WakuMessage = {
encode( encode(
message: WakuMessageProto, message: WakuMessage,
writer: _m0.Writer = _m0.Writer.create() writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer { ): _m0.Writer {
if (message.payload !== undefined) { if (message.payload !== undefined) {
@ -33,10 +33,10 @@ export const WakuMessageProto = {
return writer; return writer;
}, },
decode(input: _m0.Reader | Uint8Array, length?: number): WakuMessageProto { decode(input: _m0.Reader | Uint8Array, length?: number): WakuMessage {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length; let end = length === undefined ? reader.len : reader.pos + length;
const message = { ...baseWakuMessageProto } as WakuMessageProto; const message = { ...baseWakuMessage } as WakuMessage;
while (reader.pos < end) { while (reader.pos < end) {
const tag = reader.uint32(); const tag = reader.uint32();
switch (tag >>> 3) { switch (tag >>> 3) {
@ -60,8 +60,8 @@ export const WakuMessageProto = {
return message; return message;
}, },
fromJSON(object: any): WakuMessageProto { fromJSON(object: any): WakuMessage {
const message = { ...baseWakuMessageProto } as WakuMessageProto; const message = { ...baseWakuMessage } as WakuMessage;
if (object.payload !== undefined && object.payload !== null) { if (object.payload !== undefined && object.payload !== null) {
message.payload = bytesFromBase64(object.payload); message.payload = bytesFromBase64(object.payload);
} }
@ -83,7 +83,7 @@ export const WakuMessageProto = {
return message; return message;
}, },
toJSON(message: WakuMessageProto): unknown { toJSON(message: WakuMessage): unknown {
const obj: any = {}; const obj: any = {};
message.payload !== undefined && message.payload !== undefined &&
(obj.payload = (obj.payload =
@ -97,8 +97,8 @@ export const WakuMessageProto = {
return obj; return obj;
}, },
fromPartial(object: DeepPartial<WakuMessageProto>): WakuMessageProto { fromPartial(object: DeepPartial<WakuMessage>): WakuMessage {
const message = { ...baseWakuMessageProto } as WakuMessageProto; const message = { ...baseWakuMessage } as WakuMessage;
if (object.payload !== undefined && object.payload !== null) { if (object.payload !== undefined && object.payload !== null) {
message.payload = object.payload; message.payload = object.payload;
} else { } else {

View File

@ -1,7 +1,7 @@
/* eslint-disable */ /* eslint-disable */
import Long from 'long'; import Long from 'long';
import _m0 from 'protobufjs/minimal'; import _m0 from 'protobufjs/minimal';
import { WakuMessageProto } from '../../waku/v2/message'; import { WakuMessage } from '../../waku/v2/message';
export const protobufPackage = 'waku.v2'; export const protobufPackage = 'waku.v2';
@ -65,7 +65,7 @@ export interface HistoryQuery {
} }
export interface HistoryResponse { export interface HistoryResponse {
messages: WakuMessageProto[]; messages: WakuMessage[];
pagingInfo: PagingInfo | undefined; pagingInfo: PagingInfo | undefined;
} }
@ -453,7 +453,7 @@ export const HistoryResponse = {
writer: _m0.Writer = _m0.Writer.create() writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer { ): _m0.Writer {
for (const v of message.messages) { for (const v of message.messages) {
WakuMessageProto.encode(v!, writer.uint32(10).fork()).ldelim(); WakuMessage.encode(v!, writer.uint32(10).fork()).ldelim();
} }
if (message.pagingInfo !== undefined) { if (message.pagingInfo !== undefined) {
PagingInfo.encode(message.pagingInfo, writer.uint32(18).fork()).ldelim(); PagingInfo.encode(message.pagingInfo, writer.uint32(18).fork()).ldelim();
@ -470,9 +470,7 @@ export const HistoryResponse = {
const tag = reader.uint32(); const tag = reader.uint32();
switch (tag >>> 3) { switch (tag >>> 3) {
case 1: case 1:
message.messages.push( message.messages.push(WakuMessage.decode(reader, reader.uint32()));
WakuMessageProto.decode(reader, reader.uint32())
);
break; break;
case 2: case 2:
message.pagingInfo = PagingInfo.decode(reader, reader.uint32()); message.pagingInfo = PagingInfo.decode(reader, reader.uint32());
@ -490,7 +488,7 @@ export const HistoryResponse = {
message.messages = []; message.messages = [];
if (object.messages !== undefined && object.messages !== null) { if (object.messages !== undefined && object.messages !== null) {
for (const e of object.messages) { for (const e of object.messages) {
message.messages.push(WakuMessageProto.fromJSON(e)); message.messages.push(WakuMessage.fromJSON(e));
} }
} }
if (object.pagingInfo !== undefined && object.pagingInfo !== null) { if (object.pagingInfo !== undefined && object.pagingInfo !== null) {
@ -505,7 +503,7 @@ export const HistoryResponse = {
const obj: any = {}; const obj: any = {};
if (message.messages) { if (message.messages) {
obj.messages = message.messages.map((e) => obj.messages = message.messages.map((e) =>
e ? WakuMessageProto.toJSON(e) : undefined e ? WakuMessage.toJSON(e) : undefined
); );
} else { } else {
obj.messages = []; obj.messages = [];
@ -522,7 +520,7 @@ export const HistoryResponse = {
message.messages = []; message.messages = [];
if (object.messages !== undefined && object.messages !== null) { if (object.messages !== undefined && object.messages !== null) {
for (const e of object.messages) { for (const e of object.messages) {
message.messages.push(WakuMessageProto.fromPartial(e)); message.messages.push(WakuMessage.fromPartial(e));
} }
} }
if (object.pagingInfo !== undefined && object.pagingInfo !== null) { if (object.pagingInfo !== undefined && object.pagingInfo !== null) {

View File

@ -86,7 +86,7 @@ export default function App() {
} else { } else {
stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleNewMessages); stateWaku.libp2p.pubsub.on(RelayDefaultTopic, handleNewMessages);
stateWaku.libp2p.peerStore.once( stateWaku.libp2p.peerStore.on(
'change:protocols', 'change:protocols',
handleProtocolChange.bind({}, stateWaku) handleProtocolChange.bind({}, stateWaku)
); );

View File

@ -42,7 +42,7 @@ function connect(peer: string | undefined, waku: Waku | undefined): string[] {
if (!peerId) { if (!peerId) {
return ['Peer Id needed to dial']; return ['Peer Id needed to dial'];
} }
waku.libp2p.peerStore.addressBook.add(PeerId.createFromB58String(peerId), [ waku.addPeerToAddressBook(PeerId.createFromB58String(peerId), [
peerMultiaddr, peerMultiaddr,
]); ]);
return [ return [