feat: `WakuLightPush.push` and `WakuRelay.send` returns `SendResult` with the list of recipients.

This commit is contained in:
fryorcraken.eth 2022-09-20 14:18:32 +10:00
parent ae46640ba8
commit 49e16de396
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 20 additions and 14 deletions

View File

@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- `WakuRelay.addObserver` now returns a function to delete the observer. - `WakuRelay.addObserver` now returns a function to delete the observer.
- `WakuLightPush.push` and `WakuRelay.send` returns `SendResult` with the list of recipients.
### Changed ### Changed

View File

@ -75,3 +75,7 @@ export interface Decoder<T extends Message> {
decodeProto: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>; decodeProto: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
decode: (proto: ProtoMessage) => Promise<T | undefined>; decode: (proto: ProtoMessage) => Promise<T | undefined>;
} }
export interface SendResult {
recipients: PeerId[];
}

View File

@ -47,7 +47,7 @@ describe("Waku Light Push [node only]", () => {
const pushResponse = await waku.lightPush.push(TestEncoder, { const pushResponse = await waku.lightPush.push(TestEncoder, {
payload: utf8ToBytes(messageText), payload: utf8ToBytes(messageText),
}); });
expect(pushResponse?.isSuccess).to.be.true; expect(pushResponse.recipients.length).to.eq(1);
let msgs: MessageRpcResponse[] = []; let msgs: MessageRpcResponse[] = [];
@ -90,7 +90,7 @@ describe("Waku Light Push [node only]", () => {
} }
); );
log("Ack received", pushResponse); log("Ack received", pushResponse);
expect(pushResponse?.isSuccess).to.be.true; expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
let msgs: MessageRpcResponse[] = []; let msgs: MessageRpcResponse[] = [];

View File

@ -9,7 +9,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import { PushResponse } from "../../proto/light_push"; import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants"; import { DefaultPubSubTopic } from "../constants";
import { Encoder, Message } from "../interfaces"; import { Encoder, Message, SendResult } from "../interfaces";
import { selectConnection } from "../select_connection"; import { selectConnection } from "../select_connection";
import { import {
getPeersForProtocol, getPeersForProtocol,
@ -55,7 +55,7 @@ export class WakuLightPush {
encoder: Encoder, encoder: Encoder,
message: Message, message: Message,
opts?: PushOptions opts?: PushOptions
): Promise<PushResponse | undefined> { ): Promise<SendResult> {
const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic;
const res = await selectPeerForProtocol( const res = await selectPeerForProtocol(
@ -75,11 +75,14 @@ export class WakuLightPush {
if (!connection) throw "Failed to get a connection to the peer"; if (!connection) throw "Failed to get a connection to the peer";
const stream = await connection.newStream(LightPushCodec); const stream = await connection.newStream(LightPushCodec);
const recipients: PeerId[] = [];
try { try {
const protoMessage = await encoder.encodeProto(message); const protoMessage = await encoder.encodeProto(message);
if (!protoMessage) { if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push"); log("Failed to encode to protoMessage, aborting push");
return; return { recipients };
} }
const query = PushRPC.createRequest(protoMessage, pubSubTopic); const query = PushRPC.createRequest(protoMessage, pubSubTopic);
const res = await pipe( const res = await pipe(
@ -99,17 +102,19 @@ export class WakuLightPush {
if (!response) { if (!response) {
log("No response in PushRPC"); log("No response in PushRPC");
return; return { recipients };
} }
return response; if (response.isSuccess) {
recipients.push(peer.id);
}
} catch (err) { } catch (err) {
log("Failed to decode push reply", err); log("Failed to decode push reply", err);
} }
} catch (err) { } catch (err) {
log("Failed to send waku light push request", err); log("Failed to send waku light push request", err);
} }
return; return { recipients };
} }
/** /**

View File

@ -8,11 +8,10 @@ import {
TopicStr, TopicStr,
} from "@chainsafe/libp2p-gossipsub/dist/src/types"; } from "@chainsafe/libp2p-gossipsub/dist/src/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import { PublishResult } from "@libp2p/interface-pubsub";
import debug from "debug"; import debug from "debug";
import { DefaultPubSubTopic } from "../constants"; import { DefaultPubSubTopic } from "../constants";
import { Decoder, Encoder, Message } from "../interfaces"; import { Decoder, Encoder, Message, SendResult } from "../interfaces";
import { pushOrInitMapSet } from "../push_or_init_map"; import { pushOrInitMapSet } from "../push_or_init_map";
import { TopicOnlyDecoder } from "../waku_message/topic_only_message"; import { TopicOnlyDecoder } from "../waku_message/topic_only_message";
@ -93,10 +92,7 @@ export class WakuRelay extends GossipSub {
/** /**
* Send Waku message. * Send Waku message.
*/ */
public async send( public async send(encoder: Encoder, message: Message): Promise<SendResult> {
encoder: Encoder,
message: Message
): Promise<PublishResult> {
const msg = await encoder.encode(message); const msg = await encoder.encode(message);
if (!msg) { if (!msg) {
log("Failed to encode message, aborting publish"); log("Failed to encode message, aborting publish");