handle segmented waku messages (#556)
* handle segmented waku messages * Create red-zoos-watch.md
This commit is contained in:
parent
4de3e43dc8
commit
a7008b5108
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
"@status-im/js": patch
|
||||
---
|
||||
|
||||
handle segmented waku messages
|
|
@ -0,0 +1,12 @@
|
|||
syntax = "proto3";
|
||||
|
||||
message SegmentMessage {
|
||||
// hash of the entire original message
|
||||
bytes entire_message_hash = 1;
|
||||
// Index of this segment within the entire original message
|
||||
uint32 index = 2;
|
||||
// Total number of segments the entire original message is divided into
|
||||
uint32 segments_count = 3;
|
||||
// The payload data for this particular segment
|
||||
bytes payload = 4;
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
// @generated by protoc-gen-es v1.4.2 with parameter "target=ts"
|
||||
// @generated from file segment-message.proto (syntax proto3)
|
||||
/* eslint-disable */
|
||||
// @ts-nocheck
|
||||
|
||||
import type {
|
||||
BinaryReadOptions,
|
||||
FieldList,
|
||||
JsonReadOptions,
|
||||
JsonValue,
|
||||
PartialMessage,
|
||||
PlainMessage,
|
||||
} from '@bufbuild/protobuf'
|
||||
import { Message, proto3 } from '@bufbuild/protobuf'
|
||||
|
||||
/**
|
||||
* @generated from message SegmentMessage
|
||||
*/
|
||||
export class SegmentMessage extends Message<SegmentMessage> {
|
||||
/**
|
||||
* hash of the entire original message
|
||||
*
|
||||
* @generated from field: bytes entire_message_hash = 1;
|
||||
*/
|
||||
entireMessageHash = new Uint8Array(0)
|
||||
|
||||
/**
|
||||
* Index of this segment within the entire original message
|
||||
*
|
||||
* @generated from field: uint32 index = 2;
|
||||
*/
|
||||
index = 0
|
||||
|
||||
/**
|
||||
* Total number of segments the entire original message is divided into
|
||||
*
|
||||
* @generated from field: uint32 segments_count = 3;
|
||||
*/
|
||||
segmentsCount = 0
|
||||
|
||||
/**
|
||||
* The payload data for this particular segment
|
||||
*
|
||||
* @generated from field: bytes payload = 4;
|
||||
*/
|
||||
payload = new Uint8Array(0)
|
||||
|
||||
constructor(data?: PartialMessage<SegmentMessage>) {
|
||||
super()
|
||||
proto3.util.initPartial(data, this)
|
||||
}
|
||||
|
||||
static readonly runtime: typeof proto3 = proto3
|
||||
static readonly typeName = 'SegmentMessage'
|
||||
static readonly fields: FieldList = proto3.util.newFieldList(() => [
|
||||
{
|
||||
no: 1,
|
||||
name: 'entire_message_hash',
|
||||
kind: 'scalar',
|
||||
T: 12 /* ScalarType.BYTES */,
|
||||
},
|
||||
{ no: 2, name: 'index', kind: 'scalar', T: 13 /* ScalarType.UINT32 */ },
|
||||
{
|
||||
no: 3,
|
||||
name: 'segments_count',
|
||||
kind: 'scalar',
|
||||
T: 13 /* ScalarType.UINT32 */,
|
||||
},
|
||||
{ no: 4, name: 'payload', kind: 'scalar', T: 12 /* ScalarType.BYTES */ },
|
||||
])
|
||||
|
||||
static fromBinary(
|
||||
bytes: Uint8Array,
|
||||
options?: Partial<BinaryReadOptions>
|
||||
): SegmentMessage {
|
||||
return new SegmentMessage().fromBinary(bytes, options)
|
||||
}
|
||||
|
||||
static fromJson(
|
||||
jsonValue: JsonValue,
|
||||
options?: Partial<JsonReadOptions>
|
||||
): SegmentMessage {
|
||||
return new SegmentMessage().fromJson(jsonValue, options)
|
||||
}
|
||||
|
||||
static fromJsonString(
|
||||
jsonString: string,
|
||||
options?: Partial<JsonReadOptions>
|
||||
): SegmentMessage {
|
||||
return new SegmentMessage().fromJsonString(jsonString, options)
|
||||
}
|
||||
|
||||
static equals(
|
||||
a: SegmentMessage | PlainMessage<SegmentMessage> | undefined,
|
||||
b: SegmentMessage | PlainMessage<SegmentMessage> | undefined
|
||||
): boolean {
|
||||
return proto3.util.equals(SegmentMessage, a, b)
|
||||
}
|
||||
}
|
|
@ -2,7 +2,7 @@ import { bootstrap } from '@libp2p/bootstrap'
|
|||
import { Protocols } from '@waku/interfaces'
|
||||
import { createDecoder } from '@waku/message-encryption/symmetric'
|
||||
import { createLightNode, waitForRemotePeer } from '@waku/sdk'
|
||||
import { bytesToHex } from 'ethereum-cryptography/utils'
|
||||
import { bytesToHex, concatBytes } from 'ethereum-cryptography/utils'
|
||||
|
||||
import { isEncrypted } from '../client/community/is-encrypted'
|
||||
import { contracts } from '../consts/contracts'
|
||||
|
@ -19,6 +19,7 @@ import {
|
|||
} from '../protos/communities_pb'
|
||||
import { ProtocolMessage } from '../protos/protocol-message_pb'
|
||||
import { ContactCodeAdvertisement } from '../protos/push-notifications_pb'
|
||||
import { SegmentMessage } from '../protos/segment-message_pb'
|
||||
import { compressPublicKey } from '../utils/compress-public-key'
|
||||
import { generateKeyFromPassword } from '../utils/generate-key-from-password'
|
||||
import { idToContentTopic } from '../utils/id-to-content-topic'
|
||||
|
@ -44,6 +45,7 @@ class RequestClient {
|
|||
public waku: LightNode
|
||||
/** Cache. */
|
||||
public readonly wakuMessages: Set<string>
|
||||
#segmentedWakuMessages: Map<string, Map<number, SegmentMessage>>
|
||||
|
||||
#started: boolean
|
||||
|
||||
|
@ -67,6 +69,7 @@ class RequestClient {
|
|||
|
||||
this.waku = waku
|
||||
this.wakuMessages = new Set()
|
||||
this.#segmentedWakuMessages = new Map()
|
||||
this.#started = options.started ?? false
|
||||
this.#ethProviderURLs =
|
||||
options.ethProviderURLs ?? providers[environment].infura
|
||||
|
@ -392,7 +395,57 @@ class RequestClient {
|
|||
}
|
||||
|
||||
// decode (layers)
|
||||
let messageToDecode = wakuMessage.payload
|
||||
let messageToDecode = wakuMessage.payload // default
|
||||
|
||||
try {
|
||||
const decodedSegment = SegmentMessage.fromBinary(messageToDecode)
|
||||
|
||||
if (decodedSegment) {
|
||||
const unsegmentedMessageHash = bytesToHex(
|
||||
decodedSegment.entireMessageHash
|
||||
)
|
||||
|
||||
const segmentedWakuMessages = this.#segmentedWakuMessages.get(
|
||||
unsegmentedMessageHash
|
||||
)
|
||||
|
||||
if (!segmentedWakuMessages) {
|
||||
this.#segmentedWakuMessages.set(
|
||||
unsegmentedMessageHash,
|
||||
new Map([[decodedSegment.index, decodedSegment]])
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if (segmentedWakuMessages.has(decodedSegment.index)) {
|
||||
return
|
||||
}
|
||||
|
||||
segmentedWakuMessages.set(decodedSegment.index, decodedSegment)
|
||||
|
||||
if (segmentedWakuMessages.size !== decodedSegment.segmentsCount) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const segmentedPayloads: Uint8Array[] = []
|
||||
segmentedWakuMessages.forEach(segment => {
|
||||
segmentedPayloads[segment.index] = segment.payload
|
||||
})
|
||||
const unsegmentedPayload = concatBytes(...segmentedPayloads)
|
||||
|
||||
messageToDecode = unsegmentedPayload
|
||||
|
||||
this.#segmentedWakuMessages.delete(unsegmentedMessageHash)
|
||||
} catch (error) {
|
||||
return
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// eslint-disable-next-line no-empty
|
||||
}
|
||||
|
||||
let decodedProtocol
|
||||
try {
|
||||
decodedProtocol = ProtocolMessage.fromBinary(messageToDecode)
|
||||
|
|
Loading…
Reference in New Issue