mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-05 23:33:08 +00:00
feat!: do not send sync messages with empty history (#2658)
* feat!: do not send sync messages with empty history A sync message without any history as no value. If there are no messages in the channel, then a sync messages does not help. If there are messages in the channel, but this participant is not aware of them, then it can confuse other participants to assume that the channel is empty. * fix test by adding a message to channel history * make `pushOutgoingSyncMessage` return true even if no callback passed
This commit is contained in:
parent
c0ecb6abba
commit
e92f6a2409
@ -56,6 +56,19 @@ describe("Reliable Channel: Sync", () => {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Send a message to have a history
|
||||||
|
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannel.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === sentMsgId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
let syncMessageSent = false;
|
let syncMessageSent = false;
|
||||||
reliableChannel.messageChannel.addEventListener(
|
reliableChannel.messageChannel.addEventListener(
|
||||||
MessageChannelEvent.OutSyncSent,
|
MessageChannelEvent.OutSyncSent,
|
||||||
@ -131,6 +144,19 @@ describe("Reliable Channel: Sync", () => {
|
|||||||
return 1;
|
return 1;
|
||||||
}; // will wait a full second
|
}; // will wait a full second
|
||||||
|
|
||||||
|
// Send a message to have a history
|
||||||
|
const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message"));
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannelAlice.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === sentMsgId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
let syncMessageSent = false;
|
let syncMessageSent = false;
|
||||||
reliableChannelBob.messageChannel.addEventListener(
|
reliableChannelBob.messageChannel.addEventListener(
|
||||||
MessageChannelEvent.OutSyncSent,
|
MessageChannelEvent.OutSyncSent,
|
||||||
@ -191,6 +217,19 @@ describe("Reliable Channel: Sync", () => {
|
|||||||
return 1;
|
return 1;
|
||||||
}; // will wait a full second
|
}; // will wait a full second
|
||||||
|
|
||||||
|
// Send a message to have a history
|
||||||
|
const sentMsgId = reliableChannelAlice.send(utf8ToBytes("some message"));
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannelAlice.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === sentMsgId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
let syncMessageSent = false;
|
let syncMessageSent = false;
|
||||||
reliableChannelBob.messageChannel.addEventListener(
|
reliableChannelBob.messageChannel.addEventListener(
|
||||||
MessageChannelEvent.OutSyncSent,
|
MessageChannelEvent.OutSyncSent,
|
||||||
@ -232,6 +271,19 @@ describe("Reliable Channel: Sync", () => {
|
|||||||
return 1;
|
return 1;
|
||||||
}; // will wait a full second
|
}; // will wait a full second
|
||||||
|
|
||||||
|
// Send a message to have a history
|
||||||
|
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannel.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === sentMsgId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
let syncMessageSent = false;
|
let syncMessageSent = false;
|
||||||
reliableChannel.messageChannel.addEventListener(
|
reliableChannel.messageChannel.addEventListener(
|
||||||
MessageChannelEvent.OutSyncSent,
|
MessageChannelEvent.OutSyncSent,
|
||||||
@ -273,6 +325,19 @@ describe("Reliable Channel: Sync", () => {
|
|||||||
return 1;
|
return 1;
|
||||||
}; // will wait a full second
|
}; // will wait a full second
|
||||||
|
|
||||||
|
// Send a message to have a history
|
||||||
|
const sentMsgId = reliableChannel.send(utf8ToBytes("some message"));
|
||||||
|
let messageSent = false;
|
||||||
|
reliableChannel.addEventListener("message-sent", (event) => {
|
||||||
|
if (event.detail === sentMsgId) {
|
||||||
|
messageSent = true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while (!messageSent) {
|
||||||
|
await delay(50);
|
||||||
|
}
|
||||||
|
|
||||||
let syncMessageSent = false;
|
let syncMessageSent = false;
|
||||||
reliableChannel.messageChannel.addEventListener(
|
reliableChannel.messageChannel.addEventListener(
|
||||||
MessageChannelEvent.OutSyncSent,
|
MessageChannelEvent.OutSyncSent,
|
||||||
|
|||||||
@ -647,11 +647,12 @@ describe("MessageChannel", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
// And be sends a sync message
|
// And be sends a sync message
|
||||||
await channelB.pushOutgoingSyncMessage(async (message) => {
|
const res = await channelB.pushOutgoingSyncMessage(async (message) => {
|
||||||
await receiveMessage(channelA, message);
|
await receiveMessage(channelA, message);
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
expect(res).to.be.true;
|
||||||
expect(messageAcked).to.be.true;
|
expect(messageAcked).to.be.true;
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -1089,17 +1090,41 @@ describe("MessageChannel", function () {
|
|||||||
causalHistorySize: 2
|
causalHistorySize: 2
|
||||||
});
|
});
|
||||||
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
|
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
|
||||||
|
const message = utf8ToBytes("first message in channel");
|
||||||
|
channelA["localHistory"].push(
|
||||||
|
new ContentMessage(
|
||||||
|
MessageChannel.getMessageId(message),
|
||||||
|
"MyChannel",
|
||||||
|
"alice",
|
||||||
|
[],
|
||||||
|
1n,
|
||||||
|
undefined,
|
||||||
|
message
|
||||||
|
)
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should be sent with empty content", async () => {
|
it("should be sent with empty content", async () => {
|
||||||
await channelA.pushOutgoingSyncMessage(async (message) => {
|
const res = await channelA.pushOutgoingSyncMessage(async (message) => {
|
||||||
expect(message.content).to.be.undefined;
|
expect(message.content).to.be.undefined;
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
expect(res).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should not be sent when there is no history", async () => {
|
||||||
|
const channelC = new MessageChannel(channelId, "carol", {
|
||||||
|
causalHistorySize: 2
|
||||||
|
});
|
||||||
|
const res = await channelC.pushOutgoingSyncMessage(async (_msg) => {
|
||||||
|
throw "callback was called when it's not expected";
|
||||||
|
});
|
||||||
|
expect(res).to.be.false;
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
|
it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
|
||||||
await channelA.pushOutgoingSyncMessage();
|
const res = await channelA.pushOutgoingSyncMessage();
|
||||||
|
expect(res).to.be.true;
|
||||||
|
|
||||||
const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
|
const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
|
||||||
expect(outgoingBuffer.length).to.equal(0);
|
expect(outgoingBuffer.length).to.equal(0);
|
||||||
@ -1110,15 +1135,16 @@ describe("MessageChannel", function () {
|
|||||||
).to.equal(false);
|
).to.equal(false);
|
||||||
|
|
||||||
const localLog = channelA["localHistory"];
|
const localLog = channelA["localHistory"];
|
||||||
expect(localLog.length).to.equal(0);
|
expect(localLog.length).to.equal(1); // beforeEach adds one message
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should not be delivered", async () => {
|
it("should not be delivered", async () => {
|
||||||
const timestampBefore = channelB["lamportTimestamp"];
|
const timestampBefore = channelB["lamportTimestamp"];
|
||||||
await channelA.pushOutgoingSyncMessage(async (message) => {
|
const res = await channelA.pushOutgoingSyncMessage(async (message) => {
|
||||||
await receiveMessage(channelB, message);
|
await receiveMessage(channelB, message);
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
expect(res).to.be.true;
|
||||||
const timestampAfter = channelB["lamportTimestamp"];
|
const timestampAfter = channelB["lamportTimestamp"];
|
||||||
expect(timestampAfter).to.equal(timestampBefore);
|
expect(timestampAfter).to.equal(timestampBefore);
|
||||||
|
|
||||||
@ -1132,20 +1158,23 @@ describe("MessageChannel", function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("should update ack status of messages in outgoing buffer", async () => {
|
it("should update ack status of messages in outgoing buffer", async () => {
|
||||||
|
const channelC = new MessageChannel(channelId, "carol", {
|
||||||
|
causalHistorySize: 2
|
||||||
|
});
|
||||||
for (const m of messagesA) {
|
for (const m of messagesA) {
|
||||||
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
|
await sendMessage(channelC, utf8ToBytes(m), async (message) => {
|
||||||
await receiveMessage(channelB, message);
|
await receiveMessage(channelB, message);
|
||||||
return { success: true };
|
return { success: true };
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendSyncMessage(channelB, async (message) => {
|
await sendSyncMessage(channelB, async (message) => {
|
||||||
await receiveMessage(channelA, message);
|
await receiveMessage(channelC, message);
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
|
|
||||||
const causalHistorySize = channelA["causalHistorySize"];
|
const causalHistorySize = channelC["causalHistorySize"];
|
||||||
const outgoingBuffer = channelA["outgoingBuffer"] as Message[];
|
const outgoingBuffer = channelC["outgoingBuffer"] as Message[];
|
||||||
expect(outgoingBuffer.length).to.equal(
|
expect(outgoingBuffer.length).to.equal(
|
||||||
messagesA.length - causalHistorySize
|
messagesA.length - causalHistorySize
|
||||||
);
|
);
|
||||||
|
|||||||
@ -384,6 +384,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
|||||||
undefined
|
undefined
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if (!message.causalHistory || message.causalHistory.length === 0) {
|
||||||
|
log.info(
|
||||||
|
this.senderId,
|
||||||
|
"no causal history in sync message, aborting sending"
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (callback) {
|
if (callback) {
|
||||||
try {
|
try {
|
||||||
await callback(message);
|
await callback(message);
|
||||||
@ -400,7 +408,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
|||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
// No problem encountered so returning true
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private _pushIncomingMessage(message: Message): void {
|
private _pushIncomingMessage(message: Message): void {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user