support more than 2 participants

This commit is contained in:
Arseniy Klempner 2025-04-03 21:29:50 -07:00
parent 597308c546
commit d15815219d
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
11 changed files with 622 additions and 255 deletions

View File

@ -5,6 +5,8 @@
import { health, unregisterHealthListener } from "../waku/waku.svelte";
import { page } from '$app/state';
import { listening } from "../waku/pingpong.svelte";
let healthStatus = $state(HealthStatus.Unhealthy);
let isHomePage = $state(false);
let shouldShowConnectButton = $state(false);
@ -39,15 +41,16 @@
stopHealthCheck();
});
function getHealthColor(status: HealthStatus) {
function getHealthColor() {
const status = listening.listening ? HealthStatus.SufficientlyHealthy : HealthStatus.Unhealthy;
if ($connectionState.status !== "connected") {
return "gray";
}
switch (status) {
case HealthStatus.SufficientlyHealthy:
return "green";
case HealthStatus.MinimallyHealthy:
return "goldenrod";
// case HealthStatus.MinimallyHealthy:
// return "goldenrod";
case HealthStatus.Unhealthy:
default:
return "red";
@ -123,7 +126,7 @@
<div class="status-wrapper">
<div
class="health-indicator"
style="background-color: {getHealthColor(healthStatus)}"
style="background-color: {getHealthColor()}"
>
<span class="tooltip">{getHealthText(healthStatus)}</span>
</div>

View File

@ -8,6 +8,7 @@
import HistoryItem from './HistoryItem.svelte';
import LegendModal from './LegendModal.svelte';
import { matchesIdFilter, currentIdFilter } from '$lib/utils/event.svelte';
import { hash } from '$lib/utils/hash';
// Store for history items
let history: Array<MessageChannelEventObject> = $state([]);
@ -106,6 +107,9 @@
if (event.type === MessageChannelEvent.MissedMessages) {
return;
}
if (event.type === MessageChannelEvent.SyncSent || event.type === MessageChannelEvent.SyncReceived) {
event.payload.messageId = hash(event.payload.messageId + event.payload.causalHistory[0].messageId)
}
history = [event, ...history];
}
@ -134,6 +138,8 @@
<option value="received">Received ({eventCounts.received})</option>
<option value="delivered">Delivered ({eventCounts.delivered})</option>
<option value="acknowledged">Acknowledged ({eventCounts.acknowledged})</option>
<option value="syncSent">Sync Sent ({eventCounts.syncSent})</option>
<option value="syncReceived">Sync Received ({eventCounts.syncReceived})</option>
</select>
</div>
@ -144,15 +150,18 @@
</div>
{/if}
{#each filteredHistory as event, index}
<HistoryItem
{event}
identicon={identicons[index]}
currentIdFilter={currentIdFilter.id}
onEventClick={handleEventClick}
onDependencyClick={handleDependencyClick}
/>
{/each}
<div class="history-items-container">
{#each filteredHistory as event, index}
<HistoryItem
{event}
identicon={identicons[index]}
currentIdFilter={currentIdFilter.id}
onEventClick={handleEventClick}
onDependencyClick={handleDependencyClick}
/>
{/each}
</div>
<div class="bottom-fade"></div>
<LegendModal bind:isOpen={showLegend} />
</div>
@ -162,10 +171,11 @@
display: flex;
flex-direction: column;
height: 100%;
overflow-y: scroll;
overflow-y: auto;
overflow-x: hidden;
min-width: 400px;
scrollbar-width: none;
scrollbar-width: none; /* Firefox */
-ms-overflow-style: none; /* IE and Edge */
background-color: #ffffff;
border-radius: 4px;
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1);
@ -173,6 +183,30 @@
border: 1px solid #e0ddd4;
padding: 12px;
}
/* Hide scrollbar for Chrome, Safari and Opera */
.history-container::-webkit-scrollbar {
display: none;
}
.history-items-container {
flex: 1;
position: relative;
padding-bottom: 30px; /* Add space for the fade effect */
width: 100%;
overflow-x: hidden;
}
.bottom-fade {
position: absolute;
bottom: 0;
left: 0;
right: 0;
height: 40px;
background: linear-gradient(to bottom, rgba(255, 255, 255, 0), rgba(255, 255, 255, 1));
pointer-events: none; /* Allows interaction with elements underneath */
z-index: 1;
}
.virtualizer-container {
flex: 1;
@ -274,4 +308,10 @@
.clear-filter-btn:hover {
background: rgba(0, 0, 0, 0.2);
}
/* Ensure all child elements don't cause horizontal overflow */
.id-filter-badge, .header, .item-filter {
max-width: 100%;
box-sizing: border-box;
}
</style>

View File

@ -15,8 +15,8 @@
export let overflow: boolean = true;
$: id = event ? getMessageId(event) : null;
$: color = event ? (eventColors[event.type] || '#888') : '#f0f0f0';
$: name = event ? (eventNames[event.type] || event.type) : '';
$: color = event ? eventColors[event.type] || '#888' : '#f0f0f0';
$: name = event ? eventNames[event.type] || event.type : '';
$: matchesFilter = currentIdFilter && id === currentIdFilter;
function handleEventClick() {
@ -30,13 +30,14 @@
}
</script>
<div class="history-item {!event ? 'empty' : ''}" style="width: 100%; height: {height}px;" on:click={event ? handleEventClick : undefined}>
<div
class="history-item {!event ? 'empty' : ''}"
style="width: 100%;"
on:click={event ? handleEventClick : undefined}
>
{#if event}
<div class="item-container">
<div
class="event-box {matchesFilter ? 'highlight' : ''}"
style="background-color: {color};"
>
<div class="event-box {matchesFilter ? 'highlight' : ''}" style="background-color: {color};">
<div class="identicon">
<img src="data:image/svg+xml;base64,{identicon}" alt="Identicon" />
</div>
@ -48,18 +49,13 @@
{id}
</div>
</div>
{#if event.type === MessageChannelEvent.MessageDelivered}
<div class="sent-or-received">
{event.payload.sentOrReceived}
</div>
{/if}
{#if event.type === MessageChannelEvent.MessageSent || event.type === MessageChannelEvent.MessageReceived}
{#if event.type === MessageChannelEvent.MessageSent || event.type === MessageChannelEvent.MessageReceived || event.type === MessageChannelEvent.SyncSent || event.type === MessageChannelEvent.SyncReceived}
<div class="lamport-timestamp">
{event.payload.lamportTimestamp}
</div>
{/if}
</div>
{#if event.type === MessageChannelEvent.MessageSent || event.type === MessageChannelEvent.MessageReceived}
{#if event.type === MessageChannelEvent.MessageSent || event.type === MessageChannelEvent.MessageReceived || event.type === MessageChannelEvent.SyncSent || event.type === MessageChannelEvent.SyncReceived}
{#each event.payload.causalHistory as dependency}
{@const dependencyMatchesFilter =
currentIdFilter && dependency.messageId === currentIdFilter}
@ -81,6 +77,8 @@
padding: 8px;
box-sizing: border-box;
transition: transform 0.2s ease;
width: 100%;
max-width: 100%;
}
.history-item:not(.empty):hover {
@ -104,6 +102,7 @@
gap: 6px;
width: 100%;
height: 100%;
overflow: hidden;
}
.event-box {
@ -120,6 +119,8 @@
position: relative;
transition: all 0.2s ease;
border: none;
box-sizing: border-box;
overflow: hidden;
}
.dependency-box {
@ -140,11 +141,12 @@
transition: all 0.2s ease;
overflow: hidden;
text-overflow: ellipsis;
box-sizing: border-box;
}
.highlight {
border-left: 2px solid #DB8D43;
border-right: 2px solid #DB8D43;
border-left: 2px solid #db8d43;
border-right: 2px solid #db8d43;
}
.highlight .event-type {
@ -199,6 +201,7 @@
max-width: 220px;
text-overflow: ellipsis;
white-space: nowrap;
overflow: hidden;
}
.lamport-timestamp {
@ -224,4 +227,4 @@
padding: 2px 6px;
border-radius: 4px;
}
</style>
</style>

View File

@ -1,43 +1,105 @@
<script lang="ts">
import { subscribeToMissingMessageStream } from '$lib/sds/stream.svelte';
import {
subscribeToAllEventsStream,
subscribeToChannelEvents,
subscribeToMissingMessageStream
} from '$lib/sds/stream.svelte';
import { MessageChannelEvent, type HistoryEntry } from '@waku/sds';
import { onMount, onDestroy } from 'svelte';
import { getIdenticon } from '$lib/identicon.svelte';
import { bytesToHex } from '@waku/utils/bytes';
// Map event types to colors using index signature
const eventColors: { [key in string]: string } = {
[MessageChannelEvent.MessageSent]: '#3B82F6', // blue
[MessageChannelEvent.MessageDelivered]: '#10B981', // green
[MessageChannelEvent.MessageReceived]: '#8B5CF6', // purple
[MessageChannelEvent.MessageAcknowledged]: '#059669', // dark green
[MessageChannelEvent.PartialAcknowledgement]: '#6D28D9', // dark purple
[MessageChannelEvent.MissedMessages]: '#EF4444' // red
};
import { getMessageId } from '$lib/sds/message';
import type { MessageChannelEventObject } from '$lib/sds/stream';
import HistoryItem from './HistoryItem.svelte';
import LegendModal from './LegendModal.svelte';
import { currentIdFilter, eventColors } from '$lib/utils/event.svelte';
import { hash } from '$lib/utils/hash';
import { encodeBase64 } from 'effect/Encoding';
// Store for history items
let missedMessages: HistoryEntry[] = $state([]);
let history: HistoryEntry[] = $state([]);
let identicon: any = $state(null);
let currentFilter: string = $state('all');
let showLegend: boolean = $state(false);
let { channelId = null }: { channelId: string | null } = $props();
// Filtered history based on selected filter and ID filter
let filteredHistory = $derived(
(() => {
// Then filter by ID if present
if (currentIdFilter.id) {
return history.filter((event) => event.messageId === currentIdFilter.id);
}
return history;
})()
);
let identicons = $derived(
identicon &&
missedMessages.map((entry) => {
const id = entry.messageId;
return new identicon(id, { size: 40, format: 'svg' }).toString();
filteredHistory.map((event: HistoryEntry) => {
const id = event.messageId;
// Handle the case where id could be null
return new identicon(id || '', { size: 40, format: 'svg' }).toString();
})
);
// Unsubscribe function
let unsubscribe: (() => void) | null = $state(null);
// Handle filter change
function handleFilterChange(event: Event) {
const select = event.target as HTMLSelectElement;
currentFilter = select.value;
}
// Handle event item click to filter by ID
function handleEventClick(id: string | null) {
if (id !== null) {
currentIdFilter.id = id;
}
}
// Handle dependency click to filter by dependency messageId
function handleDependencyClick(messageId: string, event: Event) {
// Stop event propagation so it doesn't trigger parent click handler
event.stopPropagation();
currentIdFilter.id = messageId;
}
// Clear ID filter
function clearIdFilter() {
currentIdFilter.id = null;
}
// Toggle legend display
function toggleLegend() {
showLegend = !showLegend;
}
const active: { [messageId: string]: boolean } = $state({});
const log: Set<string> = $state(new Set());
function eventStreamCallback(event: MessageChannelEventObject) {
if (event.type !== MessageChannelEvent.MissedMessages) {
return;
}
console.log('missed messages', event);
event.payload.forEach((message) => {
if (!log.has(message.messageId)) {
history.push(message);
log.add(message.messageId);
}
});
// history = event.payload;
}
onMount(async () => {
identicon = await getIdenticon();
// Subscribe to the event stream and collect events
if (unsubscribe) {
unsubscribe();
}
unsubscribe = subscribeToMissingMessageStream((event) => {
missedMessages = event.payload;
});
unsubscribe = channelId
? subscribeToChannelEvents(channelId, eventStreamCallback)
: subscribeToMissingMessageStream(eventStreamCallback);
});
onDestroy(() => {
@ -46,31 +108,48 @@
unsubscribe();
}
});
const color = eventColors[MessageChannelEvent.MissedMessages];
</script>
<div class="history-container">
{#each missedMessages as entry, index}
{@const color = eventColors[MessageChannelEvent.MissedMessages]}
<div class="history-item">
<div class="item-container">
<div class="event-box" style="background-color: {color};">
<div class="identicon">
<img src="data:image/svg+xml;base64,{identicons[index]}" alt="Identicon" />
</div>
<div class="event-info">
<div class="event-id">
{entry.messageId}
</div>
{#if entry.retrievalHint}
<div class="event-id">
{bytesToHex(entry.retrievalHint)}
</div>
{/if}
</div>
</div>
</div>
{#if currentIdFilter.id}
<div class="id-filter-badge">
<span class="id-label">ID: {currentIdFilter.id}</span>
<button class="clear-filter-btn" onclick={clearIdFilter}>×</button>
</div>
{/each}
{/if}
<div class="history-items-container">
{#each filteredHistory as event, index}
<div class="history-item {!event ? 'empty' : ''}" style="width: 100%; height: 100px;">
{#if event}
<div class="item-container">
<div class="event-box" style="background-color: {color};">
<div class="identicon">
<img src="data:image/svg+xml;base64,{identicons[index]}" alt="Identicon" />
</div>
<div class="event-info">
<div class="event-type"></div>
<div class="event-id">
{event.messageId}
</div>
</div>
</div>
<!-- {#if event.retrievalHint}
<div class="dependency-box" style="background-color: {color};">
{encodeBase64(event.retrievalHint)}
</div>
{/if} -->
</div>
{/if}
</div>
{/each}
</div>
<div class="bottom-fade"></div>
<LegendModal bind:isOpen={showLegend} />
</div>
<style>
@ -78,7 +157,41 @@
display: flex;
flex-direction: column;
height: 100%;
overflow: hidden;
overflow-y: auto;
overflow-x: hidden;
min-width: 400px;
scrollbar-width: none; /* Firefox */
-ms-overflow-style: none; /* IE and Edge */
background-color: #ffffff;
border-radius: 4px;
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1);
position: relative;
border: 1px solid #e0ddd4;
padding: 12px;
}
/* Hide scrollbar for Chrome, Safari and Opera */
.history-container::-webkit-scrollbar {
display: none;
}
.history-items-container {
flex: 1;
position: relative;
padding-bottom: 30px; /* Add space for the fade effect */
width: 100%;
overflow-x: hidden;
}
.bottom-fade {
position: absolute;
bottom: 0;
left: 0;
right: 0;
height: 40px;
background: linear-gradient(to bottom, rgba(255, 255, 255, 0), rgba(255, 255, 255, 1));
pointer-events: none; /* Allows interaction with elements underneath */
z-index: 1;
}
.virtualizer-container {
@ -87,9 +200,130 @@
padding: 10px;
}
.header {
display: flex;
align-items: center;
padding: 8px 8px 16px 8px;
border-bottom: 1px solid #e0ddd4;
margin-bottom: 12px;
position: relative;
}
.help-button {
width: 28px;
height: 28px;
border-radius: 50%;
background-color: #f5f2e8;
border: 1px solid #e0ddd4;
color: #333333;
font-weight: bold;
font-size: 16px;
display: flex;
align-items: center;
justify-content: center;
cursor: pointer;
margin-right: 12px;
transition: all 0.2s;
}
.help-button:hover {
background-color: #e8e5db;
}
.item-filter {
flex: 1;
padding: 8px 12px;
border-radius: 4px;
border: 1px solid #e0ddd4;
background-color: white;
font-size: 14px;
color: #333333;
appearance: none;
background-image: url("data:image/svg+xml;charset=UTF-8,%3csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 24 24' fill='none' stroke='%23333333' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3e%3cpolyline points='6 9 12 15 18 9'%3e%3c/polyline%3e%3c/svg%3e");
background-repeat: no-repeat;
background-position: right 12px center;
background-size: 16px;
transition: all 0.2s;
}
.item-filter:hover,
.item-filter:focus {
border-color: #ccc9c2;
outline: none;
}
.id-filter-badge {
display: flex;
align-items: center;
background-color: #f5f2e8;
border-radius: 4px;
padding: 6px 12px;
margin: 0 8px 12px 8px;
max-width: fit-content;
font-size: 12px;
color: #333333;
border: 1px solid #e0ddd4;
}
.id-label {
font-family: monospace;
margin-right: 8px;
max-width: 200px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
font-weight: bold;
}
.clear-filter-btn {
background: rgba(0, 0, 0, 0.1);
border: none;
color: #333333;
font-size: 16px;
font-weight: bold;
cursor: pointer;
padding: 0 6px;
border-radius: 50%;
width: 20px;
height: 20px;
display: flex;
align-items: center;
justify-content: center;
transition: all 0.2s;
}
.clear-filter-btn:hover {
background: rgba(0, 0, 0, 0.2);
}
/* Ensure all child elements don't cause horizontal overflow */
.id-filter-badge,
.header,
.item-filter {
max-width: 100%;
box-sizing: border-box;
}
.history-item {
padding: 8px;
box-sizing: border-box;
transition: transform 0.2s ease;
width: 100%;
max-width: 100%;
}
.history-item:not(.empty):hover {
transform: translateX(2px);
}
.history-item:not(.empty) {
cursor: pointer;
}
.empty {
border: 1px dashed rgba(0, 0, 0, 0.1);
border-radius: 4px;
background-color: #ffffff;
}
.item-container {
@ -98,6 +332,8 @@
align-items: stretch;
gap: 6px;
width: 100%;
height: 100%;
overflow: hidden;
}
.event-box {
@ -105,34 +341,67 @@
flex-direction: row;
align-items: center;
justify-content: flex-start;
border-radius: 8px;
border-radius: 4px;
width: 100%;
min-height: 70px;
min-height: 60px;
color: white;
padding: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
padding: 12px;
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.1);
position: relative;
transition: all 0.2s ease;
border: none;
box-sizing: border-box;
overflow: hidden;
}
.event-box.dependency {
margin-left: 0;
margin-right: 0;
.dependency-box {
display: flex;
align-items: center;
justify-content: center;
margin-left: auto;
width: auto;
max-width: 80%;
min-height: 40px;
min-height: 30px;
font-size: 11px;
font-family: monospace;
opacity: 0.85;
padding: 6px 12px;
opacity: 0.9;
padding: 6px 10px;
border-radius: 4px;
color: white;
box-shadow: 0 1px 2px rgba(0, 0, 0, 0.1);
transition: all 0.2s ease;
overflow: hidden;
text-overflow: ellipsis;
box-sizing: border-box;
}
.highlight {
border-left: 2px solid #db8d43;
border-right: 2px solid #db8d43;
}
.highlight .event-type {
font-size: 15px;
color: white;
font-weight: bold;
}
.highlight .event-id,
.dependency-box.highlight {
font-weight: bold;
}
.dependency-box.highlight {
font-size: 12px;
}
.identicon {
width: 40px;
height: 40px;
width: 36px;
height: 36px;
border-radius: 4px;
overflow: hidden;
margin-right: 12px;
position: relative;
}
.identicon img {
@ -141,14 +410,6 @@
object-fit: contain;
}
.identicon-placeholder {
width: 40px;
height: 40px;
border-radius: 4px;
background-color: rgba(255, 255, 255, 0.2);
margin-right: 12px;
}
.event-info {
display: flex;
flex-direction: column;
@ -160,26 +421,41 @@
font-weight: bold;
text-align: left;
white-space: nowrap;
/* overflow: hidden; */
/* text-overflow: ellipsis; */
overflow: hidden;
text-overflow: ellipsis;
}
.event-id {
font-family: monospace;
font-size: 10px;
color: rgba(255, 255, 255, 0.7);
font-size: 11px;
color: rgba(255, 255, 255, 0.8);
max-width: 220px;
/* overflow: hidden; */
/* text-overflow: ellipsis; */
text-overflow: ellipsis;
white-space: nowrap;
overflow: hidden;
}
.lamport-timestamp {
position: absolute;
top: 8px;
right: 12px;
top: 12px;
right: 14px;
font-size: 12px;
color: rgba(255, 255, 255, 0.9);
font-weight: 500;
background-color: rgba(0, 0, 0, 0.1);
padding: 2px 6px;
border-radius: 4px;
}
.sent-or-received {
position: absolute;
top: 12px;
right: 14px;
font-size: 12px;
color: rgba(255, 255, 255, 0.9);
font-weight: 500;
background-color: rgba(0, 0, 0, 0.1);
padding: 2px 6px;
border-radius: 4px;
}
</style>

View File

@ -1,75 +0,0 @@
<script lang="ts">
import type { MessageChannelEventObject } from '$lib/sds/stream';
import { grid } from '$lib/utils/stateGraph.svelte';
import HistoryItem from './HistoryItem.svelte';
import { onMount, onDestroy, createEventDispatcher } from 'svelte';
// Props
export let identicons: Array<string> = [];
export let currentIdFilter: string | null = null;
export let onEventClick: (id: string | null) => void;
export let onDependencyClick: (messageId: string, event: Event) => void;
export let columns: number = 10; // Default number of columns
export let rows: number = 10; // Default number of rows
// Create 2D grid of items initialized with null (empty items)
onMount(() => {
// Initialize the state graph stream
});
onDestroy(() => {
// Clean up if needed
});
</script>
<div class="state-graph" style="--columns: {columns};">
{#each grid as row, y}
{#each row as item, x}
<div class="grid-item {item !== null ? 'filled' : ''}">
{#if item !== null && y * columns + x < identicons.length}
<HistoryItem
event={item}
identicon={identicons[y * columns + x]}
{currentIdFilter}
{onEventClick}
{onDependencyClick}
/>
{:else}
<HistoryItem />
{/if}
</div>
{/each}
{/each}
</div>
<style>
.state-graph {
display: grid;
grid-template-columns: repeat(var(--columns), 1fr);
gap: 10px;
width: 100%;
padding: 10px;
}
.grid-item {
width: 340px;
height: 178px;
justify-self: center;
}
.filled {
animation: fadeIn 0.5s ease-in-out;
}
@keyframes fadeIn {
from { opacity: 0; }
to { opacity: 1; }
}
@media (max-width: 1200px) {
.state-graph {
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
}
}
</style>

View File

@ -5,6 +5,7 @@
import { MessageChannelEvent } from '@waku/sds';
import { eventColors, eventNames, currentIdFilter, matchesIdFilter } from '$lib/utils/event.svelte';
import type { MessageChannelEventObject } from '$lib/sds/stream';
import { hash } from '$lib/utils/hash';
let { channelId = null }: { channelId: string | null } = $props();
let unsubscribe: (() => void) | null = $state(null);
@ -14,8 +15,7 @@
return;
}
if (event.type === MessageChannelEvent.SyncSent || event.type === MessageChannelEvent.SyncReceived) {
event
return;
event.payload.messageId = hash(event.payload.messageId + event.payload.causalHistory[0].messageId)
}
console.log('updating virtual grid', event);
update_virtual_grid(event);

View File

@ -0,0 +1,38 @@
import { Schedule } from "effect";
import { Ref } from "effect";
import { Effect } from "effect";
export function frequency() {
return Effect.runPromise(
Effect.gen(function* () {
const listenCondition = yield* Ref.make(false);
// const queue = yield* initializeQueue;
yield* Effect.all(
[
// setup filter
// subscribe(listenCondition),
// // send messages
Effect.repeat(Effect.sync(() => {}), Schedule.spaced('2000 millis')),
// // Effect.repeat(takeAndSend, Schedule.spaced('2000 millis')),
// // periodic sync
// Effect.repeat(sendSync, Schedule.spaced('10000 millis')),
// // periodically process queue
// Effect.repeat(processQueue, Schedule.spaced('200 millis')),
// // periodically sweep buffers
// Effect.repeat(sweep, Schedule.spaced('2000 millis')),
// // periodically switch off filter to miss messages
Effect.repeat(
Ref.update(listenCondition, (listening) => !listening),
Schedule.spaced('2000 millis')
)
],
{
concurrency: 'unbounded'
}
);
})
);
}

View File

@ -1,4 +1,5 @@
import { Effect, Option, pipe, Stream } from 'effect';
import type { MatchParams } from './waku.svelte';
// Define the type for state transition events
export interface StateTransitionDetail {
@ -22,7 +23,8 @@ export enum LobbyMessageType {
Ping = 'ping',
Request = 'request',
Accept = 'accept',
Match = 'match'
Match = 'match',
Ongoing = 'ongoing'
}
enum LobbyEvent {
@ -32,7 +34,8 @@ enum LobbyEvent {
GotAccept = 'got_accept',
SentAccept = 'sent_accept',
GotMatch = 'got_match',
SentMatch = 'sent_match'
SentMatch = 'sent_match',
Ongoing = 'ongoing'
}
export type LobbyMessage = {
@ -41,6 +44,7 @@ export type LobbyMessage = {
expiry?: Date;
from: string;
to?: string;
match?: MatchParams;
};
export enum PeerState {
@ -51,25 +55,26 @@ export enum PeerState {
AcceptTo = 'accept_to',
AcceptFrom = 'accept_from',
Success = 'success',
Failure = 'failure'
Failure = 'failure',
Ongoing = 'ongoing'
}
// Create a class that extends EventTarget for native event handling
class LobbyState extends EventTarget {
// The peer state map
peerState = $state(new Map<string, { state: PeerState; messages: LobbyMessage[] }>());
// Method to update the state of a peer
updatePeerState(peerId: string, newState: PeerState, message: LobbyMessage): void {
const prevState = this.peerState.get(peerId)?.state || PeerState.None;
const messages = this.peerState.get(peerId)?.messages || [];
// Update the state
this.peerState.set(peerId, {
state: newState,
messages: [...messages, message]
this.peerState.set(peerId, {
state: newState,
messages: [...messages, message]
});
// Create and dispatch the event using native event handling
const event = new CustomEvent<StateTransitionDetail>('state-transition', {
detail: {
@ -79,7 +84,7 @@ class LobbyState extends EventTarget {
message
}
});
this.dispatchEvent(event);
}
}
@ -104,26 +109,29 @@ type TransitionTable = {
const stateMachine: TransitionTable = {
[LobbyEvent.GotPing]: {
[PeerState.None]: PeerState.Found,
[PeerState.Found]: PeerState.Found,
[PeerState.Found]: PeerState.Found
},
[LobbyEvent.SentRequest]: {
[PeerState.Found]: PeerState.RequestTo,
[PeerState.Found]: PeerState.RequestTo
},
[LobbyEvent.GotRequest]: {
[PeerState.None]: PeerState.RequestFrom,
[PeerState.Found]: PeerState.RequestFrom,
[PeerState.Found]: PeerState.RequestFrom
},
[LobbyEvent.SentAccept]: {
[PeerState.RequestFrom]: PeerState.AcceptTo,
[PeerState.RequestFrom]: PeerState.AcceptTo
},
[LobbyEvent.GotAccept]: {
[PeerState.RequestTo]: PeerState.AcceptFrom,
[PeerState.RequestTo]: PeerState.AcceptFrom
},
[LobbyEvent.SentMatch]: {
[PeerState.AcceptFrom]: PeerState.Success,
[PeerState.AcceptFrom]: PeerState.Success
},
[LobbyEvent.GotMatch]: {
[PeerState.AcceptTo]: PeerState.Success,
[PeerState.AcceptTo]: PeerState.Success
},
[LobbyEvent.Ongoing]: {
[PeerState.None]: PeerState.Ongoing
}
};
@ -142,19 +150,18 @@ function processMessage<MT extends LobbyMessageType>(
throw `Don't track sent pings`;
}
event = LobbyEvent.GotPing;
} else if (
message.messageType === LobbyMessageType.Request
) {
} else if (message.messageType === LobbyMessageType.Request) {
console.log(`Received request from ${message.from} to ${message.to || 'everyone'}`);
event = sent ? LobbyEvent.SentRequest : LobbyEvent.GotRequest;
} else if (
message.messageType === LobbyMessageType.Accept
) {
} else if (message.messageType === LobbyMessageType.Accept) {
console.log(`Received accept from ${message.from} to ${message.to || 'unknown'}`);
event = sent ? LobbyEvent.SentAccept : LobbyEvent.GotAccept;
} else if (message.messageType === LobbyMessageType.Match) {
console.log(`Received match between peers`);
event = sent ? LobbyEvent.SentMatch : LobbyEvent.GotMatch;
} else if (message.messageType === LobbyMessageType.Ongoing) {
console.log(`Received ongoing match`);
event = LobbyEvent.Ongoing;
}
// Get next state from transition table
@ -162,7 +169,10 @@ function processMessage<MT extends LobbyMessageType>(
console.warn(`Invalid message type: ${message.messageType}`);
return Option.none();
}
const nextStateValue = stateMachine[event][currentState];
const nextStateValue =
message.messageType === LobbyMessageType.Ongoing
? PeerState.Ongoing
: stateMachine[event][currentState];
if (nextStateValue === undefined) {
// Handle invalid transitions - throw error or return current state
console.warn(`Invalid transition: ${event} from ${currentState}`);
@ -171,7 +181,9 @@ function processMessage<MT extends LobbyMessageType>(
return Option.some(nextStateValue);
}
export async function processUpdates(updates: { peerId: string; message: LobbyMessage, sent: boolean }[]) {
export async function processUpdates(
updates: { peerId: string; message: LobbyMessage; sent: boolean }[]
) {
for (const update of updates) {
const { peerId, message, sent } = update;
const currentState = lobbyState.peerState.get(peerId)?.state || PeerState.None;
@ -189,28 +201,33 @@ export async function processUpdates(updates: { peerId: string; message: LobbyMe
}
// Create a typed stream from the events
export const stateTransitionStream = $state(Stream.map(
Stream.fromEventListener(lobbyState, 'state-transition', { passive: true }),
(event: Event) => event as CustomEvent<StateTransitionDetail>
));
export const stateTransitionStream = $state(
Stream.map(
Stream.fromEventListener(lobbyState, 'state-transition', { passive: true }),
(event: Event) => event as CustomEvent<StateTransitionDetail>
)
);
export function subscribeToStateTransitionStream<A>(stream: Stream.Stream<CustomEvent<A>>, onEvent: (event: A) => void): () => void {
export function subscribeToStateTransitionStream<A>(
stream: Stream.Stream<CustomEvent<A>>,
onEvent: (event: A) => void
): () => void {
const fiber = Effect.runFork(
pipe(
stream,
Stream.tap((event) =>
Effect.sync(() => {
onEvent(event.detail);
})
),
Stream.runDrain
)
pipe(
stream,
Stream.tap((event) =>
Effect.sync(() => {
onEvent(event.detail);
})
),
Stream.runDrain
)
);
return () => {
Effect.runFork(
Effect.sync(() => {
(fiber as unknown as { interrupt: () => void }).interrupt();
})
);
Effect.runFork(
Effect.sync(() => {
(fiber as unknown as { interrupt: () => void }).interrupt();
})
);
};
}
}

View File

@ -1,14 +1,17 @@
import { decoder, encoder, wakuNode } from '$lib/waku/waku.svelte';
import { decoder, encoder, lobbyEncoder, wakuNode } from '$lib/waku/waku.svelte';
import { type Message, encodeMessage, decodeMessage } from '@waku/sds';
import type { MatchParams } from './waku.svelte';
import { getOrCreateChannel } from '$lib/sds/channel.svelte';
import { Effect, Schedule, Stream, Option, Queue, Chunk, Ref } from 'effect';
import type { SubscribeResult } from '@waku/sdk';
import type { SDKProtocolResult, SubscribeResult } from '@waku/sdk';
import { messageHash } from '@waku/message-hash';
import { hash } from '$lib/utils/hash';
import { encodeBase64 } from 'effect/Encoding';
import type { MessageChannel } from '@waku/sds';
import { sweepIn, sweepOut } from '$lib/sds.svelte';
import { LobbyMessageType, type LobbyMessage } from './lobby.svelte';
export const listening = $state({ listening: false });
export function send(channel: MessageChannel, payload: Uint8Array) {
return channel.sendMessage(payload, async (message: Message) => {
@ -23,7 +26,6 @@ export function send(channel: MessageChannel, payload: Uint8Array) {
if (result.failures.length > 0) {
console.error('error sending message', result.failures);
}
console.log('sent message over waku', message);
return {
success: true,
retrievalHint: hash
@ -31,10 +33,30 @@ export function send(channel: MessageChannel, payload: Uint8Array) {
});
}
export function start(params: MatchParams) {
console.log('starting pingpong', params);
const broadcastMatchToLobby = (params: MatchParams) =>
Effect.async<SDKProtocolResult, Error>((resume) => {
const m: LobbyMessage = {
messageType: LobbyMessageType.Ongoing,
timestamp: new Date(),
from: params.myPeerId,
to: params.otherPeerId,
match: params
};
wakuNode.node?.lightPush
.send(lobbyEncoder, {
payload: new TextEncoder().encode(JSON.stringify(m)),
timestamp: new Date()
})
.then((result) => resume(Effect.succeed(result)))
.catch((error) => resume(Effect.fail(new Error(error as string))));
});
export function start(params: MatchParams, joined: boolean = false) {
const { matchId } = params;
const first = params.myPeerId.localeCompare(params.otherPeerId) < 0 ? true : false;
const first =
params.myPeerId.localeCompare(params.otherPeerId + (joined ? Math.random() : '')) < 0
? true
: false;
let lastMessage = new Date();
const sinkTimeout = 10_000;
@ -44,7 +66,9 @@ export function start(params: MatchParams) {
// we can predetermine the list of messages we expect to send
const payloadsStream = Stream.make(
...Array.from({ length: params.messages }, (_, i) => {
return new TextEncoder().encode(hash(matchId + params.myPeerId + i));
return new TextEncoder().encode(
hash(matchId + (joined ? Math.random() : params.myPeerId) + i)
);
})
);
const skips = Array.from({ length: params.messages * 2 }, (_, i) =>
@ -85,7 +109,6 @@ export function start(params: MatchParams) {
// Initialize the queue with the payloads based on strategy
const initializeQueue = Effect.gen(function* () {
const sendQueue = Queue.bounded<Option.Option<Uint8Array>>(100);
console.log('initializing queue');
const q = yield* sendQueue;
const result = yield* q.offerAll(yield* payloadsWithSkips);
console.log('queue initialized', result);
@ -94,14 +117,11 @@ export function start(params: MatchParams) {
const takeAndSend = (sendQueue: Queue.Queue<Option.Option<Uint8Array>>) =>
Effect.gen(function* () {
console.log('taking and sending');
const result = yield* sendQueue.take;
console.log('result', result);
return yield* sendEffect(result);
});
const validateMessage = (message: Message) => {
console.log('validating message', message);
if (message.channelId !== channel.channelId) {
console.error('Message is not for this match');
return false;
@ -112,15 +132,12 @@ export function start(params: MatchParams) {
const subscribe = (listenCondition: Ref.Ref<boolean>) =>
Effect.async<SubscribeResult, Error>((resume) => {
try {
console.log('subscribing to filter');
wakuNode.node?.filter
.subscribe([decoder], async (message) => {
const listening = await Effect.runPromise(listenCondition.get);
console.log('listening', listening);
if (!listening) {
return;
}
console.log('received filter message', message);
const hash = encodeBase64(messageHash(encoder.pubsubTopic, message));
const sdsMessage = decodeMessage(message.payload) as unknown as Message;
if (validateMessage(sdsMessage) && !sent.has(hash)) {
@ -140,9 +157,7 @@ export function start(params: MatchParams) {
const sent = new Map<string, boolean>();
const sendSync = Effect.async<boolean, Error>((resume) => {
console.log('sending sync', new Date().getTime() - lastMessage.getTime());
if (new Date().getTime() - lastMessage.getTime() < sinkTimeout + Math.random() * 2000) {
console.log('sink timeout', new Date().getTime() - lastMessage.getTime());
return resume(Effect.succeed(false));
} else {
channel
@ -151,7 +166,7 @@ export function start(params: MatchParams) {
const timestamp = new Date();
const protoMessage = await encoder.toProtoObj({
payload: encodedMessage,
timestamp,
timestamp
});
const hash = encodeBase64(messageHash(encoder.pubsubTopic, protoMessage));
sent.set(hash, true);
@ -159,7 +174,6 @@ export function start(params: MatchParams) {
if (result.failures.length > 0) {
console.error('error sending message', result.failures);
}
console.log('sent message over waku', message);
lastMessage = new Date();
return true;
})
@ -194,11 +208,18 @@ export function start(params: MatchParams) {
// periodically process queue
Effect.repeat(processQueue, Schedule.spaced('200 millis')),
// periodically sweep buffers
Effect.repeat(sweep, Schedule.spaced('500 millis')),
Effect.repeat(sweep, Schedule.spaced('2000 millis')),
// periodically switch off filter to miss messages
Effect.repeat(
Ref.update(listenCondition, (listening) => !listening),
Schedule.spaced(first ? '5000 millis' : `${5000 * 2} millis`)
Ref.update(listenCondition, (_listening) => {
listening.listening = !_listening;
return !_listening;
}),
Schedule.spaced(first ? '4500 millis' : `8000 millis`)
),
Effect.repeat(
broadcastMatchToLobby(params),
Schedule.spaced('2000 millis')
)
],
{

View File

@ -78,7 +78,7 @@ export class WakuNode {
}
return await node.lightPush.send(encoder, {
payload: payload,
timestamp: timestamp,
timestamp: timestamp
});
}
@ -133,10 +133,19 @@ export async function startWaku(): Promise<void> {
await wakuNode.setNode(node);
// Connect to peers
await node.dial(
// '/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ'
'/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm3TLea2NVs4dAqYM2gAgoV9CMKGeD1BkP3RAvmk7HBAbU'
);
const peers = [
// '/ip4/127.0.0.1/tcp/8000/ws/p2p/16Uiu2HAm3TLea2NVs4dAqYM2gAgoV9CMKGeD1BkP3RAvmk7HBAbU',
'/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ',
'/dns4/waku.fryorcraken.xyz/tcp/8000/wss/p2p/16Uiu2HAmMRvhDHrtiHft1FTUYnn6cVA8AWVrTyLUayJJ3MWpUZDB',
'/dns4/ivansete.xyz/tcp/8000/wss/p2p/16Uiu2HAmDAHuJ8w9zgxVnhtFe8otWNJdCewPAerJJPbXJcn8tu4r'
];
for (const peer of peers) {
try {
await node.dial(peer);
} catch (error) {
console.error(`Error dialing peer ${peer}:`, error);
}
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(window as any).waku = node;
connectionState.update((state) => ({
@ -227,7 +236,11 @@ export async function joinLobby(): Promise<MatchParams> {
if (messageData.from === id) {
return;
}
if (messageData.messageType === LobbyMessageType.Ping || messageData.to === id) {
if (
messageData.messageType === LobbyMessageType.Ongoing ||
messageData.messageType === LobbyMessageType.Ping ||
messageData.to === id
) {
processUpdates([{ peerId: messageData.from, message: messageData, sent: false }]);
}
console.log('Received lobby message:', messageData);
@ -344,6 +357,26 @@ export async function joinLobby(): Promise<MatchParams> {
subscription.unsubscribe([lobbyDecoder.contentTopic]);
}
resume(Effect.succeed(params));
} else if (event.newState === PeerState.Ongoing) {
console.log(`Match ongoing with ${event.peerId}`);
// if we sent a match, then start the simulation
// stop responding to lobby messages, as we should now be in a match
if (fiber) {
fiber();
}
if (pingFiber) {
Effect.runFork(Fiber.interrupt(pingFiber));
}
if (subscription) {
subscription.unsubscribe([lobbyDecoder.contentTopic]);
}
if (event.message.match) {
const params: MatchParams = event.message.match;
resume(Effect.succeed(params));
} else {
resume(Effect.fail(new Error('No match found')));
}
}
};

View File

@ -1,7 +1,7 @@
<script lang="ts">
import History from '$lib/components/History.svelte';
import StateGraphSummary from '$lib/components/StateGraphSummary.svelte';
import Missing from '$lib/components/Missing.svelte';
import { getMatch } from '$lib/utils/match.svelte';
import { goto } from '$app/navigation';
import type { MatchParams } from '$lib/waku/waku.svelte';
@ -21,7 +21,10 @@
{#if match}
<div class="main-container">
<!-- History Sidebar -->
<div class="history-panel">
<div class="state-graph-panel">
<div class="panel-header">
<h2>Event History</h2>
</div>
<History channelId={match?.matchId ?? null} />
</div>
@ -29,11 +32,18 @@
<!-- Summary State Graph -->
<div class="state-graph-panel">
<div class="panel-header">
<h2>State Synchronization Visualization</h2>
<h2>Events by Lamport Timestamp</h2>
</div>
<StateGraphSummary channelId={match?.matchId ?? null} />
</div>
</div>
<div class="state-graph-panel">
<div class="panel-header">
<h2>Missed Messages</h2>
</div>
<Missing channelId={match?.matchId ?? null} />
</div>
</div>
{/if}
@ -70,6 +80,7 @@
overflow: auto;
position: relative;
border: 1px solid #e0ddd4;
scrollbar-width: none;
}
.panel-header {