feat: use IReceiver with toSubscriptionIterator

This commit is contained in:
Sasha 2023-05-15 10:50:43 +02:00 committed by GitHub
commit 31510dac38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 95 deletions

132
package-lock.json generated
View File

@ -15,7 +15,7 @@
"@stablelib/random": "^1.0.2",
"@stablelib/sha256": "^1.0.1",
"@stablelib/x25519": "^1.0.1",
"@waku/core": "0.0.16",
"@waku/core": "0.0.17",
"@waku/proto": "0.0.4",
"bn.js": "^5.2.1",
"eventemitter3": "^5.0.0",
@ -39,7 +39,7 @@
"@types/uuid": "^8.3.0",
"@typescript-eslint/eslint-plugin": "^5.8.1",
"@typescript-eslint/parser": "^5.8.1",
"@waku/interfaces": "0.0.11",
"@waku/interfaces": "0.0.12",
"app-root-path": "^3.0.0",
"chai": "^4.3.4",
"cspell": "^5.14.0",
@ -1263,11 +1263,11 @@
}
},
"node_modules/@libp2p/interface-pubsub": {
"version": "3.0.6",
"resolved": "https://registry.npmjs.org/@libp2p/interface-pubsub/-/interface-pubsub-3.0.6.tgz",
"integrity": "sha512-c1aVHAhxmEh9IpLBgJyCsMscVDl7YUeP1Iq6ILEQoWiPJhNpQqdfmqyk7ZfrzuBU19VFe1EqH0bLuLDbtfysTQ==",
"version": "3.0.7",
"resolved": "https://registry.npmjs.org/@libp2p/interface-pubsub/-/interface-pubsub-3.0.7.tgz",
"integrity": "sha512-+c74EVUBTfw2sx1GE/z/IjsYO6dhur+ukF0knAppeZsRQ1Kgg6K5R3eECtT28fC6dBWLjFpAvW/7QGfiDAL4RA==",
"dependencies": {
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-connection": "^4.0.0",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interfaces": "^3.0.0",
"it-pushable": "^3.0.0",
@ -1278,6 +1278,22 @@
"npm": ">=7.0.0"
}
},
"node_modules/@libp2p/interface-pubsub/node_modules/@libp2p/interface-connection": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/@libp2p/interface-connection/-/interface-connection-4.0.0.tgz",
"integrity": "sha512-6xx/NmEc84HX7QmsjSC3hHredQYjHv4Dkf4G27adAPf+qN+vnPxmQ7gaTnk243a0++DOFTbZ2gKX/15G2B6SRg==",
"dependencies": {
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^12.0.0",
"it-stream-types": "^1.0.4",
"uint8arraylist": "^2.1.2"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@libp2p/interface-pubsub/node_modules/@libp2p/interface-peer-id": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@libp2p/interface-peer-id/-/interface-peer-id-2.0.1.tgz",
@ -2708,18 +2724,19 @@
"dev": true
},
"node_modules/@waku/core": {
"version": "0.0.16",
"resolved": "https://registry.npmjs.org/@waku/core/-/core-0.0.16.tgz",
"integrity": "sha512-XrCvgw5lq45K29XNCVCPOh31fmMQXYOo5JGyzMyyYDFlPjMls5D4EQMPJhn5/JcksjhHgp4HCSmxP+3T68SJ+A==",
"version": "0.0.17",
"resolved": "https://registry.npmjs.org/@waku/core/-/core-0.0.17.tgz",
"integrity": "sha512-Sx8nlRivsGyOPD4E0ZL3A0CD+pUtUg4MRtcuPjyz/YiS7POFTeztxTG+EK3aG3nPIHZvYXoVGRm+ZbfmjGh8xA==",
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^6.1.0",
"@libp2p/interface-pubsub": "^3.0.7",
"@noble/hashes": "^1.3.0",
"@waku/interfaces": "0.0.11",
"@waku/interfaces": "0.0.12",
"@waku/proto": "0.0.4",
"@waku/utils": "0.0.4",
"@waku/utils": "0.0.5",
"debug": "^4.3.4",
"it-all": "^2.0.0",
"it-length-prefixed": "^8.0.4",
"it-all": "^3.0.1",
"it-length-prefixed": "^9.0.1",
"it-pipe": "^2.0.5",
"p-event": "^5.0.1",
"uint8arraylist": "^2.4.3",
@ -2789,6 +2806,22 @@
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/@chainsafe/libp2p-gossipsub/node_modules/it-length-prefixed": {
"version": "8.0.4",
"resolved": "https://registry.npmjs.org/it-length-prefixed/-/it-length-prefixed-8.0.4.tgz",
"integrity": "sha512-5OJ1lxH+IaqJB7lxe8IAIwt9UfSfsmjKJoAI/RO9djYoBDt1Jfy9PeVHUmOfqhqyu/4kJvWBFAJUaG1HhLQ12A==",
"dependencies": {
"err-code": "^3.0.1",
"it-stream-types": "^1.0.4",
"uint8-varint": "^1.0.1",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/@libp2p/interface-peer-id": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@libp2p/interface-peer-id/-/interface-peer-id-2.0.1.tgz",
@ -2880,22 +2913,6 @@
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/@libp2p/pubsub/node_modules/it-length-prefixed": {
"version": "9.0.0",
"resolved": "https://registry.npmjs.org/it-length-prefixed/-/it-length-prefixed-9.0.0.tgz",
"integrity": "sha512-LCne3R3wxxLv94GTA8ywIeopdyA+2oKXiWWo7g58sQHiD7d1A6WMuWCrwP+xv4i7CmSuR3aeHo66SJUgArLOyA==",
"dependencies": {
"err-code": "^3.0.1",
"it-stream-types": "^1.0.5",
"uint8-varint": "^1.0.1",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/@libp2p/pubsub/node_modules/it-pipe": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/it-pipe/-/it-pipe-3.0.0.tgz",
@ -2924,6 +2941,49 @@
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/@libp2p/topology/node_modules/it-all": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/it-all/-/it-all-2.0.1.tgz",
"integrity": "sha512-9UuJcCRZsboz+HBQTNOau80Dw+ryGaHYFP/cPYzFBJBFcfDathMYnhHk4t52en9+fcyDGPTdLB+lFc1wzQIroA==",
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/it-all": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/it-all/-/it-all-3.0.2.tgz",
"integrity": "sha512-ujqWETXhsDbF6C+6X6fvRw5ohlowRoy/o/h9BC8D+R3JQ13oLQ153w9gSWkWupOY7omZFQbJiAL1aJo5Gwe2yw==",
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/it-length-prefixed": {
"version": "9.0.1",
"resolved": "https://registry.npmjs.org/it-length-prefixed/-/it-length-prefixed-9.0.1.tgz",
"integrity": "sha512-ZBD8ZFLERj8d1q9CeBtk0eJ4EpeI3qwnkmWtemBSm3ZI2dM8PUweNVk5haZ2vw3EIq2uYQiabV9YwNm6EASM4A==",
"dependencies": {
"err-code": "^3.0.1",
"it-stream-types": "^2.0.1",
"uint8-varint": "^1.0.1",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/it-length-prefixed/node_modules/it-stream-types": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/it-stream-types/-/it-stream-types-2.0.1.tgz",
"integrity": "sha512-6DmOs5r7ERDbvS4q8yLKENcj6Yecr7QQTqWApbZdfAUTEC947d+PEha7PCqhm//9oxaLYL7TWRekwhoXl2s6fg==",
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@waku/core/node_modules/it-merge": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/it-merge/-/it-merge-3.0.0.tgz",
@ -2990,9 +3050,9 @@
}
},
"node_modules/@waku/interfaces": {
"version": "0.0.11",
"resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.11.tgz",
"integrity": "sha512-15J0GckXRtpJ5MPyfKeb86fhTlC7i4YV8nDt/4vy9sHYy2wRKDidpKQuqkvQBn2Rlf/Jz+Z4lfLtZFqhGp6ILw==",
"version": "0.0.12",
"resolved": "https://registry.npmjs.org/@waku/interfaces/-/interfaces-0.0.12.tgz",
"integrity": "sha512-Y5Kwa5bMU1YbZkS3tCR3fhGsYiZdLgK/FKKPbMeeRtax3t58tWbP8IdUj4OD3n56EwSC4ivGB7uTKOQAzKmwvw==",
"engines": {
"node": ">=16"
}
@ -3053,9 +3113,9 @@
}
},
"node_modules/@waku/utils": {
"version": "0.0.4",
"resolved": "https://registry.npmjs.org/@waku/utils/-/utils-0.0.4.tgz",
"integrity": "sha512-13YeA1ACZ3g+cRSC+p2nrqm+FjGdTIkopxrhNdYH8l4lvsLwXRufCIhDN6YrQ/QrUIhTNQOmJe1f0TpxVBG9oA==",
"version": "0.0.5",
"resolved": "https://registry.npmjs.org/@waku/utils/-/utils-0.0.5.tgz",
"integrity": "sha512-/o4dKDG6utY3boiIWLHLd2xlrYN9PKK+K1ozrUFtisWzyhSVpvJRfv9ExaYGcYIhOW7Mr2fn4HKXd3/2tbnAoA==",
"dependencies": {
"debug": "^4.3.4",
"uint8arrays": "^4.0.3"
@ -7109,6 +7169,7 @@
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/it-all/-/it-all-2.0.1.tgz",
"integrity": "sha512-9UuJcCRZsboz+HBQTNOau80Dw+ryGaHYFP/cPYzFBJBFcfDathMYnhHk4t52en9+fcyDGPTdLB+lFc1wzQIroA==",
"peer": true,
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
@ -7175,6 +7236,7 @@
"version": "8.0.4",
"resolved": "https://registry.npmjs.org/it-length-prefixed/-/it-length-prefixed-8.0.4.tgz",
"integrity": "sha512-5OJ1lxH+IaqJB7lxe8IAIwt9UfSfsmjKJoAI/RO9djYoBDt1Jfy9PeVHUmOfqhqyu/4kJvWBFAJUaG1HhLQ12A==",
"peer": true,
"dependencies": {
"err-code": "^3.0.1",
"it-stream-types": "^1.0.4",

View File

@ -59,7 +59,7 @@
"@types/uuid": "^8.3.0",
"@typescript-eslint/eslint-plugin": "^5.8.1",
"@typescript-eslint/parser": "^5.8.1",
"@waku/interfaces": "0.0.11",
"@waku/interfaces": "0.0.12",
"app-root-path": "^3.0.0",
"chai": "^4.3.4",
"cspell": "^5.14.0",
@ -121,7 +121,7 @@
"@stablelib/random": "^1.0.2",
"@stablelib/sha256": "^1.0.1",
"@stablelib/x25519": "^1.0.1",
"@waku/core": "0.0.16",
"@waku/core": "0.0.17",
"@waku/proto": "0.0.4",
"bn.js": "^5.2.1",
"eventemitter3": "^5.0.0",

View File

@ -14,7 +14,7 @@ import {
StepHandshakeParameters,
} from "./handshake.js";
import { MessageNametagBuffer } from "./messagenametag.js";
import { InitiatorParameters, Responder, ResponderParameters, WakuPairing } from "./pairing.js";
import { InitiatorParameters, ResponderParameters, WakuPairing } from "./pairing.js";
import {
HandshakePattern,
MessageDirection,
@ -49,4 +49,4 @@ export { ChaChaPolyCipherState, NoisePublicKey };
export { MessageNametagBuffer };
export { NoiseHandshakeDecoder, NoiseHandshakeEncoder, NoiseSecureTransferDecoder, NoiseSecureTransferEncoder };
export { QR };
export { InitiatorParameters, ResponderParameters, Responder, WakuPairing };
export { InitiatorParameters, ResponderParameters, WakuPairing };

View File

@ -1,6 +1,6 @@
import { HMACDRBG } from "@stablelib/hmac-drbg";
import { randomBytes } from "@stablelib/random";
import type { IDecoder, IEncoder, IMessage, IProtoMessage, ISender } from "@waku/interfaces";
import type { IDecoder, IEncoder, IMessage, IProtoMessage, IReceiver, ISender } from "@waku/interfaces";
import { expect } from "chai";
import { EventEmitter } from "eventemitter3";
import { pEvent } from "p-event";
@ -43,24 +43,26 @@ describe("js-noise: pairing object", () => {
};
},
};
const decoderMap: { [key: string]: IDecoder<NoiseHandshakeMessage> } = {};
const responder = {
subscribe(decoder: IDecoder<NoiseHandshakeMessage>): Promise<void> {
return new Promise((resolve) => {
decoderMap[decoder.contentTopic] = decoder;
resolve();
});
toSubscriptionIterator(decoder: IDecoder<NoiseHandshakeMessage>) {
return {
iterator: {
async next() {
const msg = await pEvent(msgEmitter, decoder.contentTopic);
const decodedMessage = await decoder.fromProtoObj(PUBSUB_TOPIC, msg);
return {
value: decodedMessage,
done: false,
};
},
},
stop() {
// Do nothing. This is just a simulation
console.debug("stopping subscription to", decoder.contentTopic);
},
};
},
async nextMessage(contentTopic: string): Promise<NoiseHandshakeMessage> {
const msg = await pEvent(msgEmitter, contentTopic);
const decodedMessage = await decoderMap[contentTopic].fromProtoObj(PUBSUB_TOPIC, msg);
return decodedMessage!;
},
async stop(contentTopic: string): Promise<void> {
// Do nothing. This is just a simulation
console.debug("stopping subscription to", contentTopic);
},
};
} as any as IReceiver;
// =================
it("should pair", async function () {

View File

@ -1,6 +1,6 @@
import { HMACDRBG } from "@stablelib/hmac-drbg";
import { randomBytes } from "@stablelib/random";
import type { IDecoder, IMetaSetter, ISender } from "@waku/interfaces";
import type { IMetaSetter, IReceiver, ISender } from "@waku/interfaces";
import debug from "debug";
import { EventEmitter } from "eventemitter3";
import { pEvent } from "p-event";
@ -23,32 +23,6 @@ import { QR } from "./qr.js";
const log = debug("waku:noise:pairing");
/**
* Responder interface than an object must implement so the pairing object can receive noise messages
*/
export interface Responder {
/**
* subscribe to receive the messages from a content topic
* @param decoder Decoder to use to decrypt the NoiseHandshakeMessages
*/
subscribe(decoder: IDecoder<NoiseHandshakeMessage>): Promise<void>;
/**
* should return messages received in a content topic
* messages should be kept in a queue, meaning that nextMessage
* will call pop in the queue to remove the oldest message received
* (it's important to maintain order of received messages)
* @param contentTopic content topic to get the next message from
*/
nextMessage(contentTopic: string): Promise<NoiseHandshakeMessage>;
/**
* Stop the subscription to the content topic
* @param contentTopic
*/
stop(contentTopic: string): Promise<void>;
}
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
@ -117,7 +91,7 @@ export class WakuPairing {
*/
constructor(
private sender: ISender,
private responder: Responder,
private responder: IReceiver,
private myStaticKey: KeyPair,
pairingParameters: InitiatorParameters | ResponderParameters,
private myEphemeralKey: KeyPair = generateX25519KeyPair(),
@ -203,8 +177,8 @@ export class WakuPairing {
}
private async executeReadStepWithNextMessage(
contentTopic: string,
messageNametag: Uint8Array
messageNametag: Uint8Array,
iterator: AsyncIterator<NoiseHandshakeMessage>
): Promise<HandshakeStepResult> {
// TODO: create test unit for this function
let stopLoop = false;
@ -219,9 +193,14 @@ export class WakuPairing {
while (!stopLoop) {
try {
const hsMessage = await this.responder.nextMessage(contentTopic);
const item = await iterator.next();
if (!item.value) {
throw Error("Received no message");
}
const step = this.handshake.stepHandshake({
readPayloadV2: hsMessage.payloadV2,
readPayloadV2: item.value.payloadV2,
messageNametag,
});
return step;
@ -240,7 +219,7 @@ export class WakuPairing {
private async initiatorHandshake(): Promise<[NoiseSecureTransferEncoder, NoiseSecureTransferDecoder]> {
// Subscribe to the contact content topic
const decoder = new NoiseHandshakeDecoder(this.contentTopic);
await this.responder.subscribe(decoder);
const subscriptionIterator = await this.responder.toSubscriptionIterator(decoder);
// The handshake initiator writes a Waku2 payload v2 containing the handshake message
// and the (encrypted) transport message
@ -270,9 +249,12 @@ export class WakuPairing {
// 2nd step
// <- sB, eAsB {r}
hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.handshake.hs.toMessageNametag());
hsStep = await this.executeReadStepWithNextMessage(
this.handshake.hs.toMessageNametag(),
subscriptionIterator.iterator
);
await this.responder.stop(this.contentTopic);
await subscriptionIterator.stop();
if (!this.handshake.hs.rs) throw new Error("invalid handshake state");
@ -306,11 +288,11 @@ export class WakuPairing {
private async responderHandshake(): Promise<[NoiseSecureTransferEncoder, NoiseSecureTransferDecoder]> {
// Subscribe to the contact content topic
const decoder = new NoiseHandshakeDecoder(this.contentTopic);
await this.responder.subscribe(decoder);
const subscriptionIterator = await this.responder.toSubscriptionIterator(decoder);
// the received reads the initiator's payloads, and returns the (decrypted) transport message the initiator sent
// Note that the received verifies if the received payloadV2 has the expected messageNametag set
let hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.qrMessageNameTag);
let hsStep = await this.executeReadStepWithNextMessage(this.qrMessageNameTag, subscriptionIterator.iterator);
const initiatorCommittedStaticKey = new Uint8Array(hsStep.transportMessage);
@ -340,9 +322,12 @@ export class WakuPairing {
// -> sA, sAeB, sAsB {s}
// The responder reads the initiator's payload sent by the initiator
hsStep = await this.executeReadStepWithNextMessage(this.contentTopic, this.handshake.hs.toMessageNametag());
hsStep = await this.executeReadStepWithNextMessage(
this.handshake.hs.toMessageNametag(),
subscriptionIterator.iterator
);
await this.responder.stop(this.contentTopic);
await subscriptionIterator.stop();
if (!this.handshake.hs.rs) throw new Error("invalid handshake state");