mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-09 09:13:10 +00:00
refactor: abstract messageIndex behind ILocalHistory as findMissingDependencies
This commit is contained in:
parent
055d7fecca
commit
42bfd98861
@ -1,7 +1,12 @@
|
||||
import { Logger } from "@waku/utils";
|
||||
import _ from "lodash";
|
||||
|
||||
import { type ChannelId, ContentMessage, isContentMessage } from "./message.js";
|
||||
import {
|
||||
type ChannelId,
|
||||
ContentMessage,
|
||||
type HistoryEntry,
|
||||
isContentMessage
|
||||
} from "./message.js";
|
||||
import { PersistentStorage } from "./persistent_storage.js";
|
||||
|
||||
export const DEFAULT_MAX_LENGTH = 10_000;
|
||||
@ -26,6 +31,7 @@ export interface ILocalHistory {
|
||||
getMessage(messageId: string): ContentMessage | undefined;
|
||||
getRecentMessages(count: number): ContentMessage[];
|
||||
getAllMessages(): ContentMessage[];
|
||||
findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[];
|
||||
}
|
||||
|
||||
export type MemLocalHistoryOptions = {
|
||||
@ -114,6 +120,10 @@ export class MemLocalHistory implements ILocalHistory {
|
||||
return this.messageIndex.get(messageId);
|
||||
}
|
||||
|
||||
public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] {
|
||||
return entries.filter((entry) => !this.messageIndex.has(entry.messageId));
|
||||
}
|
||||
|
||||
private rebuildIndex(): void {
|
||||
this.messageIndex.clear();
|
||||
for (const message of this.items) {
|
||||
|
||||
@ -297,9 +297,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
message.messageId,
|
||||
message.causalHistory.map((ch) => ch.messageId)
|
||||
);
|
||||
const missingDependencies = message.causalHistory.filter(
|
||||
(messageHistoryEntry) =>
|
||||
!this.isMessageAvailable(messageHistoryEntry.messageId)
|
||||
const missingDependencies = this.findMissingDependencies(
|
||||
message.causalHistory
|
||||
);
|
||||
if (missingDependencies.length === 0) {
|
||||
if (isContentMessage(message) && this.deliverMessage(message)) {
|
||||
@ -583,9 +582,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
this.filter.insert(message.messageId);
|
||||
}
|
||||
|
||||
const missingDependencies = message.causalHistory.filter(
|
||||
(messageHistoryEntry) =>
|
||||
!this.isMessageAvailable(messageHistoryEntry.messageId)
|
||||
const missingDependencies = this.findMissingDependencies(
|
||||
message.causalHistory
|
||||
);
|
||||
|
||||
if (missingDependencies.length > 0) {
|
||||
@ -739,6 +737,17 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
}
|
||||
}
|
||||
|
||||
private findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] {
|
||||
const missingFromHistory =
|
||||
this.localHistory.findMissingDependencies(entries);
|
||||
|
||||
const incomingIds = new Set(this.incomingBuffer.map((m) => m.messageId));
|
||||
|
||||
return missingFromHistory.filter(
|
||||
(entry) => !incomingIds.has(entry.messageId)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a message is available (either in localHistory or incomingBuffer)
|
||||
* This prevents treating messages as "missing" when they've already been received
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user