chore: refactor LightPush `send` (#1487)

* refactor lightpush send

* add trycatch
This commit is contained in:
Danish Arora 2023-08-22 15:11:34 +05:30 committed by GitHub
parent 9d4fa3f159
commit 812310a816
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 57 additions and 22 deletions

View File

@ -38,37 +38,70 @@ class LightPush extends BaseProtocol implements ILightPush {
this.options = options || {};
}
private async preparePushMessage(
encoder: IEncoder,
message: IMessage,
pubSubTopic: string
): Promise<{
query: PushRpc | null;
error?: SendError;
}> {
if (!isSizeValid(message.payload)) {
log("Failed to send waku light push: message is bigger than 1MB");
return { query: null, error: SendError.SIZE_TOO_BIG };
}
const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
return {
query: null,
error: SendError.ENCODE_FAILED
};
}
const query = PushRpc.createRequest(protoMessage, pubSubTopic);
return { query };
}
async send(
encoder: IEncoder,
message: IMessage,
opts?: ProtocolOptions
): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const recipients: PeerId[] = [];
let error: undefined | SendError = undefined;
let query: PushRpc | null = null;
try {
const { query: preparedQuery, error: preparationError } =
await this.preparePushMessage(encoder, message, pubSubTopic);
if (preparationError) {
return {
recipients,
error: preparationError
};
}
query = preparedQuery;
} catch (error) {
log("Failed to prepare push message", error);
}
if (!query) {
return {
recipients,
error: SendError.GENERIC_FAIL
};
}
const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
const recipients: PeerId[] = [];
let error: undefined | SendError = undefined;
try {
if (!isSizeValid(message.payload)) {
log("Failed to send waku light push: message is bigger that 1MB");
return {
recipients,
error: SendError.SIZE_TOO_BIG
};
}
const protoMessage = await encoder.toProtoObj(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
return {
recipients,
error: SendError.ENCODE_FAILED
};
}
const query = PushRpc.createRequest(protoMessage, pubSubTopic);
const res = await pipe(
[query.encode()],
lp.encode,
@ -76,6 +109,7 @@ class LightPush extends BaseProtocol implements ILightPush {
lp.decode,
async (source) => await all(source)
);
try {
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
@ -98,9 +132,10 @@ class LightPush extends BaseProtocol implements ILightPush {
log("Failed to send waku light push request", err);
error = SendError.GENERIC_FAIL;
}
return {
error,
recipients
recipients,
error
};
}
}