fix: fix pipe usage

This commit is contained in:
Franck Royer 2022-06-24 16:02:49 +10:00 committed by fryorcraken.eth
parent f3833564f2
commit f768686e51
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
6 changed files with 37 additions and 25 deletions

1
package-lock.json generated
View File

@ -26,6 +26,7 @@
"debug": "^4.3.4",
"dns-query": "^0.11.1",
"hi-base32": "^0.5.1",
"it-all": "^1.0.6",
"it-length-prefixed": "^7.0.1",
"it-pipe": "^2.0.3",
"js-sha3": "^0.8.0",

View File

@ -83,6 +83,7 @@
"debug": "^4.3.4",
"dns-query": "^0.11.1",
"hi-base32": "^0.5.1",
"it-all": "^1.0.6",
"it-length-prefixed": "^7.0.1",
"it-pipe": "^2.0.3",
"js-sha3": "^0.8.0",

View File

@ -22,14 +22,14 @@ describe("Waku Filter", () => {
beforeEach(async function () {
this.timeout(10000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
await nwaku.start({ filter: true, lightpush: true });
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]);
await waku.waitForRemotePeer([Protocols.Filter, Protocols.LightPush]);
});
it("creates a subscription", async function () {
@ -48,7 +48,7 @@ describe("Waku Filter", () => {
messageText,
TestContentTopic
);
await waku.relay.send(message);
await waku.lightPush.push(message);
while (messageCount === 0) {
await delay(250);
}
@ -64,10 +64,10 @@ describe("Waku Filter", () => {
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"Filtering still works!",
TestContentTopic
@ -87,7 +87,7 @@ describe("Waku Filter", () => {
const unsubscribe = await waku.filter.subscribe(callback, [
TestContentTopic,
]);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should be received",
TestContentTopic
@ -95,7 +95,7 @@ describe("Waku Filter", () => {
);
await delay(100);
await unsubscribe();
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should not be received",
TestContentTopic

View File

@ -1,6 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
@ -49,7 +50,9 @@ export class WakuFilter {
constructor(public libp2p: Libp2p) {
this.subscriptions = new Map();
this.decryptionKeys = new Map();
this.libp2p.handle(FilterCodec, this.onRequest.bind(this));
this.libp2p
.handle(FilterCodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
}
/**
@ -84,7 +87,15 @@ export class WakuFilter {
const stream = await this.newStream(peer);
try {
await pipe([request.encode()], lp.encode(), stream);
const res = await pipe(
[request.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
log("response", res);
} catch (e) {
log(
"Error subscribing to peer ",
@ -109,14 +120,21 @@ export class WakuFilter {
private onRequest({ stream }: any): void {
log("Receiving message push");
try {
pipe(stream.source, lp.decode(), async (source) => {
pipe(stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const res = FilterRPC.decode(bytes.slice());
if (res.requestId && res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
}
});
}).then(
() => {
log("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
}
);
} catch (e) {
log("Error decoding message", e);
}

View File

@ -1,5 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
@ -71,17 +72,12 @@ export class WakuLightPush {
? opts.pubSubTopic
: this.pubSubTopic;
const query = PushRPC.createRequest(message, pubSubTopic);
const res: Uint8Array[] = [];
await pipe(
const res = await pipe(
[query.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => {
for await (const chunk of source) {
res.push(chunk.slice());
}
}
async (source) => await all(source)
);
try {
const bytes = concat(res);

View File

@ -1,6 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
@ -205,17 +206,12 @@ export class WakuStore {
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
dbg("Querying store peer", connections[0].remoteAddr.toString());
const res: Uint8Array[] = [];
await pipe(
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => {
for await (const chunk of source) {
res.push(chunk.slice());
}
}
async (source) => await all(source)
);
const bytes = concat(res);
const reply = historyRpcQuery.decode(bytes);