Add stream prefetch limit as querystring.

This commit is contained in:
Alejandro Cabeza Romero 2025-10-17 15:33:00 +02:00
parent 79f50919f0
commit 1325799edb
No known key found for this signature in database
GPG Key ID: DA3D14AE478030FD
7 changed files with 177 additions and 214 deletions

View File

@ -18,7 +18,6 @@
- Get transaction by id
- Get block by id
- Block viewer
- htm
- Transaction viewer
- When requesting stream, querystring for number of prefetched blocks
- Show transaction
- htm
- Show transactions in table

View File

@ -1,5 +1,6 @@
from typing import List
from fastapi import Query
from starlette.responses import Response
from api.streams import into_ndjson_stream
@ -7,8 +8,16 @@ from core.api import NBERequest, NDJsonStreamingResponse
from node.models.blocks import Block
async def stream(request: NBERequest) -> Response:
bootstrap_blocks: List[Block] = await request.app.state.block_repository.get_latest(limit=5, ascending=True)
async def _prefetch_blocks(request: NBERequest, prefetch_limit: int) -> List[Block]:
return (
[]
if prefetch_limit == 0 else
await request.app.state.block_repository.get_latest(limit=prefetch_limit, ascending=True)
)
async def stream(request: NBERequest, prefetch_limit: int = Query(0, alias="prefetch-limit", ge=0)) -> Response:
bootstrap_blocks: List[Block] = await _prefetch_blocks(request, prefetch_limit)
highest_slot: int = max((block.slot for block in bootstrap_blocks), default=0)
updates_stream = request.app.state.block_repository.updates_stream(slot_from=highest_slot + 1)
block_stream = into_ndjson_stream(stream=updates_stream, bootstrap_data=bootstrap_blocks)

View File

@ -1,6 +1,7 @@
from datetime import datetime
from typing import List
from fastapi import Query
from starlette.responses import Response
from api.streams import into_ndjson_stream
@ -9,10 +10,16 @@ from node.models.transactions import Transaction
from utils.datetime import increment_datetime
async def stream(request: NBERequest) -> Response:
bootstrap_transactions: List[Transaction] = await request.app.state.transaction_repository.get_latest(
limit=5, descending=False
async def _prefetch_transactions(request: NBERequest, prefetch_limit: int) -> List[Transaction]:
return (
[]
if prefetch_limit == 0 else
await request.app.state.transaction_repository.get_latest(limit=prefetch_limit, descending=False)
)
async def stream(request: NBERequest, prefetch_limit: int = Query(0, alias="prefetch-limit", ge=0)) -> Response:
bootstrap_transactions: List[Transaction] = await _prefetch_transactions(request, prefetch_limit)
highest_timestamp: datetime = max(
(transaction.timestamp for transaction in bootstrap_transactions), default=datetime.min
)

View File

@ -1,106 +1,86 @@
import { h } from 'preact';
import { useEffect, useRef } from 'preact/hooks';
import { BLOCKS_ENDPOINT, TABLE_SIZE } from '../lib/api.js';
import { streamNdjson, ensureFixedRowCount, shortenHex, formatTimestamp } from '../lib/utils.js';
const COLUMN_COUNT = 5;
import { BLOCKS_ENDPOINT, TABLE_SIZE } from '../lib/api.js?dev=1';
import {streamNdjson, ensureFixedRowCount, shortenHex, formatTimestamp, withBenignFilter} from '../lib/utils.js?dev=1';
export default function BlocksTable() {
const tbodyRef = useRef(null);
const counterRef = useRef(null);
const controllerRef = useRef(null);
const countRef = useRef(null);
const abortRef = useRef(null);
const seenKeysRef = useRef(new Set());
useEffect(() => {
const tbody = tbodyRef.current;
const counter = counterRef.current;
const counter = countRef.current;
ensureFixedRowCount(tbody, 5, TABLE_SIZE);
ensureFixedRowCount(tbody, COLUMN_COUNT, TABLE_SIZE);
abortRef.current?.abort();
abortRef.current = new AbortController();
controllerRef.current?.abort();
controllerRef.current = new AbortController();
function updateCounter() {
let realRows = 0;
for (const row of tbody.rows) {
if (!row.classList.contains('ph')) realRows++;
}
counter.textContent = String(realRows);
}
function removePlaceholders() {
function pruneAndPad() {
// remove placeholders
for (let i = tbody.rows.length - 1; i >= 0; i--) {
if (tbody.rows[i].classList.contains('ph')) tbody.deleteRow(i);
}
}
function trimToTableSize() {
// count real rows
let realRows = 0;
for (const row of tbody.rows) {
if (!row.classList.contains('ph')) realRows++;
}
// drop rows beyond limit, and forget their keys
while (realRows > TABLE_SIZE) {
// trim overflow
while ([...tbody.rows].filter((r) => !r.classList.contains('ph')).length > TABLE_SIZE) {
const last = tbody.rows[tbody.rows.length - 1];
const key = last?.dataset?.key;
if (key) seenKeysRef.current.delete(key);
tbody.deleteRow(-1);
realRows--;
}
// pad placeholders
const real = [...tbody.rows].filter((r) => !r.classList.contains('ph')).length;
ensureFixedRowCount(tbody, 5, TABLE_SIZE);
counter.textContent = String(real);
}
function makeLink(href, text, title) {
const anchor = document.createElement('a');
anchor.className = 'linkish mono';
anchor.href = href;
if (title) anchor.title = title;
anchor.textContent = text;
return anchor;
}
const makeLink = (href, text, title) => {
const a = document.createElement('a');
a.className = 'linkish mono';
a.href = href;
if (title) a.title = title;
a.textContent = text;
return a;
};
function appendRow(block, key) {
const appendRow = (block, key) => {
const row = document.createElement('tr');
row.dataset.key = key;
const slotCell = document.createElement('td');
const slotSpan = document.createElement('span');
slotSpan.className = 'mono';
slotSpan.textContent = String(block.slot);
slotCell.appendChild(slotSpan);
const cellSlot = document.createElement('td');
const spanSlot = document.createElement('span');
spanSlot.className = 'mono';
spanSlot.textContent = String(block.slot);
cellSlot.appendChild(spanSlot);
const rootCell = document.createElement('td');
rootCell.appendChild(makeLink(`/block/${block.root}`, shortenHex(block.root), block.root));
const cellRoot = document.createElement('td');
cellRoot.appendChild(makeLink(`/block/${block.root}`, shortenHex(block.root), block.root));
const parentCell = document.createElement('td');
parentCell.appendChild(makeLink(`/block/${block.parent}`, shortenHex(block.parent), block.parent));
const cellParent = document.createElement('td');
cellParent.appendChild(makeLink(`/block/${block.parent}`, shortenHex(block.parent), block.parent));
const countCell = document.createElement('td');
const countSpan = document.createElement('span');
countSpan.className = 'mono';
countSpan.textContent = String(block.transactionCount);
countCell.appendChild(countSpan);
const cellTxCount = document.createElement('td');
const spanTx = document.createElement('span');
spanTx.className = 'mono';
spanTx.textContent = String(block.transactionCount);
cellTxCount.appendChild(spanTx);
const timeCell = document.createElement('td');
const timeSpan = document.createElement('span');
timeSpan.className = 'mono';
timeSpan.title = block.time ?? '';
timeSpan.textContent = formatTimestamp(block.time);
timeCell.appendChild(timeSpan);
const cellTime = document.createElement('td');
const spanTime = document.createElement('span');
spanTime.className = 'mono';
spanTime.title = block.time ?? '';
spanTime.textContent = formatTimestamp(block.time);
cellTime.appendChild(spanTime);
row.append(slotCell, rootCell, parentCell, countCell, timeCell);
row.append(cellSlot, cellRoot, cellParent, cellTxCount, cellTime);
tbody.insertBefore(row, tbody.firstChild);
pruneAndPad();
};
// housekeeping
removePlaceholders();
trimToTableSize();
ensureFixedRowCount(tbody, COLUMN_COUNT, TABLE_SIZE);
updateCounter();
}
function normalizeBlock(raw) {
const normalize = (raw) => {
const header = raw.header ?? raw;
const createdAt = raw.created_at ?? raw.header?.created_at ?? null;
const created = raw.created_at ?? raw.header?.created_at ?? null;
return {
id: Number(raw.id ?? 0),
slot: Number(header?.slot ?? raw.slot ?? 0),
@ -109,39 +89,37 @@ export default function BlocksTable() {
transactionCount: Array.isArray(raw.transactions)
? raw.transactions.length
: typeof raw.transaction_count === 'number'
? raw.transaction_count
: 0,
time: createdAt,
? raw.transaction_count
: 0,
time: created,
};
}
};
const url = `${BLOCKS_ENDPOINT}?prefetch-limit=${encodeURIComponent(TABLE_SIZE)}`;
streamNdjson(
BLOCKS_ENDPOINT,
url,
(raw) => {
const block = normalizeBlock(raw);
const block = normalize(raw);
const key = `${block.slot}:${block.id}`;
if (seenKeysRef.current.has(key)) {
// still keep placeholders consistent and counter fresh
removePlaceholders();
trimToTableSize();
ensureFixedRowCount(tbody, COLUMN_COUNT, TABLE_SIZE);
updateCounter();
pruneAndPad();
return;
}
seenKeysRef.current.add(key);
appendRow(block, key);
},
{
signal: controllerRef.current.signal,
onError: (err) => {
if (!controllerRef.current.signal.aborted) {
console.error('Blocks stream error:', err);
}
},
signal: abortRef.current.signal,
onError: withBenignFilter(
(e) => console.error('Blocks stream error:', e),
abortRef.current.signal
)
},
);
).catch((err) => {
if (!abortRef.current.signal.aborted) console.error('Blocks stream error:', err);
});
return () => controllerRef.current?.abort();
return () => abortRef.current?.abort();
}, []);
return h(
@ -150,8 +128,8 @@ export default function BlocksTable() {
h(
'div',
{ class: 'card-header' },
h('div', null, h('strong', null, 'Blocks '), h('span', { class: 'pill', ref: counterRef }, '0')),
h('div', { style: 'color:var(--muted); font-size:12px;' }, BLOCKS_ENDPOINT),
h('div', null, h('strong', null, 'Blocks '), h('span', { class: 'pill', ref: countRef }, '0')),
h('div', { style: 'color:var(--muted); font-size:12px;' }, '/api/v1/blocks/stream'),
),
h(
'div',
@ -171,15 +149,7 @@ export default function BlocksTable() {
h(
'thead',
null,
h(
'tr',
null,
h('th', null, 'Slot'),
h('th', null, 'Block Root'),
h('th', null, 'Parent'),
h('th', null, 'Transactions'),
h('th', null, 'Time'),
),
h('tr', null, h('th', null, 'Slot'), h('th', null, 'Block Root'), h('th', null, 'Parent'), h('th', null, 'Transactions'), h('th', null, 'Time')),
),
h('tbody', { ref: tbodyRef }),
),

View File

@ -1,7 +1,7 @@
import { h } from 'preact';
import { useEffect, useRef, useState } from 'preact/hooks';
import { HEALTH_ENDPOINT } from '../lib/api.js';
import { streamNdjson } from '../lib/utils.js';
import {streamNdjson, withBenignFilter} from '../lib/utils.js';
const STATUS = {
CONNECTING: 'connecting',
@ -12,7 +12,7 @@ const STATUS = {
export default function HealthPill() {
const [status, setStatus] = useState(STATUS.CONNECTING);
const pillRef = useRef(null);
const controllerRef = useRef(null);
const abortRef = useRef(null);
// Flash animation whenever status changes
useEffect(() => {
@ -24,8 +24,8 @@ export default function HealthPill() {
}, [status]);
useEffect(() => {
controllerRef.current?.abort();
controllerRef.current = new AbortController();
abortRef.current?.abort();
abortRef.current = new AbortController();
streamNdjson(
HEALTH_ENDPOINT,
@ -35,18 +35,20 @@ export default function HealthPill() {
}
},
{
signal: controllerRef.current.signal,
onStart: () => setStatus(STATUS.CONNECTING),
onError: (err) => {
if (!controllerRef.current.signal.aborted) {
console.error('Health stream error:', err);
setStatus(STATUS.OFFLINE);
}
},
signal: abortRef.current.signal,
onError: withBenignFilter(
(err) => {
if (!abortRef.current.signal.aborted) {
console.error('Health stream error:', err);
setStatus(STATUS.OFFLINE);
}
},
abortRef.current.signal
),
},
);
return () => controllerRef.current?.abort();
return () => abortRef.current?.abort();
}, []);
const className = 'pill ' + (status === STATUS.ONLINE ? 'online' : status === STATUS.OFFLINE ? 'offline' : '');

View File

@ -1,99 +1,77 @@
import { h } from 'preact';
import { useEffect, useRef } from 'preact/hooks';
import { TRANSACTIONS_ENDPOINT, TABLE_SIZE } from '../lib/api.js?dev=1';
import { streamNdjson, ensureFixedRowCount, shortenHex, formatTimestamp } from '../lib/utils.js?dev=1';
import {streamNdjson, ensureFixedRowCount, shortenHex, formatTimestamp, withBenignFilter} from '../lib/utils.js?dev=1';
export default function TransactionsTable() {
const tableBodyRef = useRef(null);
const totalCountPillRef = useRef(null);
const abortControllerRef = useRef(null);
const totalStreamedCountRef = useRef(0);
const tbodyRef = useRef(null);
const countRef = useRef(null);
const abortRef = useRef(null);
const totalCountRef = useRef(0);
useEffect(() => {
const tableBody = tableBodyRef.current;
const totalCountPill = totalCountPillRef.current;
const PLACEHOLDER_CLASS = 'ph';
const tbody = tbodyRef.current;
const counter = countRef.current;
ensureFixedRowCount(tbody, 4, TABLE_SIZE);
ensureFixedRowCount(tableBody, 4, TABLE_SIZE);
abortRef.current?.abort();
abortRef.current = new AbortController();
abortControllerRef.current?.abort();
abortControllerRef.current = new AbortController();
const createSpan = (className, textContent, title) => {
const element = document.createElement('span');
if (className) element.className = className;
if (title) element.title = title;
element.textContent = textContent;
return element;
const makeSpan = (className, text, title) => {
const s = document.createElement('span');
if (className) s.className = className;
if (title) s.title = title;
s.textContent = text;
return s;
};
const makeLink = (href, text, title) => {
const a = document.createElement('a');
a.className = 'linkish mono';
a.href = href;
if (title) a.title = title;
a.textContent = text;
return a;
};
const createLink = (href, textContent, title) => {
const element = document.createElement('a');
element.className = 'linkish mono';
element.href = href;
if (title) element.title = title;
element.textContent = textContent;
return element;
};
const url = `${TRANSACTIONS_ENDPOINT}?prefetch-limit=${encodeURIComponent(TABLE_SIZE)}`;
streamNdjson(
url,
(t) => {
const row = document.createElement('tr');
const countNonPlaceholderRows = () =>
[...tableBody.rows].filter((row) => !row.classList.contains(PLACEHOLDER_CLASS)).length;
const cellHash = document.createElement('td');
cellHash.appendChild(makeLink(`/transaction/${t.hash ?? ''}`, shortenHex(t.hash ?? ''), t.hash ?? ''));
const appendTransactionRow = (transaction) => {
// Trim one placeholder from the end to keep height stable
const lastRow = tableBody.rows[tableBody.rows.length - 1];
if (lastRow?.classList.contains(PLACEHOLDER_CLASS)) tableBody.deleteRow(-1);
const cellFromTo = document.createElement('td');
cellFromTo.appendChild(makeSpan('mono', shortenHex(t.sender ?? ''), t.sender ?? ''));
cellFromTo.appendChild(document.createTextNode(' \u2192 '));
cellFromTo.appendChild(makeSpan('mono', shortenHex(t.recipient ?? ''), t.recipient ?? ''));
const row = document.createElement('tr');
const cellAmount = document.createElement('td');
cellAmount.className = 'amount';
cellAmount.textContent = Number(t.amount ?? 0).toLocaleString(undefined, { maximumFractionDigits: 8 });
const cellHash = document.createElement('td');
cellHash.appendChild(
createLink(
`/transaction/${transaction.hash ?? ''}`,
shortenHex(transaction.hash ?? ''),
transaction.hash ?? '',
),
);
const cellTime = document.createElement('td');
const spanTime = makeSpan('mono', formatTimestamp(t.timestamp), t.timestamp ?? '');
cellTime.appendChild(spanTime);
const cellSenderRecipient = document.createElement('td');
cellSenderRecipient.appendChild(
createSpan('mono', shortenHex(transaction.sender ?? ''), transaction.sender ?? ''),
);
cellSenderRecipient.appendChild(document.createTextNode(' \u2192 '));
cellSenderRecipient.appendChild(
createSpan('mono', shortenHex(transaction.recipient ?? ''), transaction.recipient ?? ''),
);
const cellAmount = document.createElement('td');
cellAmount.className = 'amount';
const amount = Number(transaction.amount ?? 0);
cellAmount.textContent = Number.isFinite(amount)
? amount.toLocaleString(undefined, { maximumFractionDigits: 8 })
: '—';
const cellTime = document.createElement('td');
cellTime.appendChild(
createSpan('mono', formatTimestamp(transaction.timestamp), transaction.timestamp ?? ''),
);
row.append(cellHash, cellSenderRecipient, cellAmount, cellTime);
tableBody.insertBefore(row, tableBody.firstChild);
// Trim to TABLE_SIZE (counting only non-placeholder rows)
while (countNonPlaceholderRows() > TABLE_SIZE) tableBody.deleteRow(-1);
totalCountPill.textContent = String(++totalStreamedCountRef.current);
};
streamNdjson(TRANSACTIONS_ENDPOINT, (transaction) => appendTransactionRow(transaction), {
signal: abortControllerRef.current.signal,
}).catch((error) => {
if (!abortControllerRef.current.signal.aborted) {
console.error('Transactions stream error:', error);
}
row.append(cellHash, cellFromTo, cellAmount, cellTime);
tbody.insertBefore(row, tbody.firstChild);
while (tbody.rows.length > TABLE_SIZE) tbody.deleteRow(-1);
counter.textContent = String(++totalCountRef.current);
},
{
signal: abortRef.current.signal,
onError: withBenignFilter(
(e) => console.error('Transaction stream error:', e),
abortRef.current.signal
)
},
).catch((err) => {
if (!abortRef.current.signal.aborted) console.error('Transactions stream error:', err);
});
return () => abortControllerRef.current?.abort();
return () => abortRef.current?.abort();
}, []);
return h(
@ -102,12 +80,7 @@ export default function TransactionsTable() {
h(
'div',
{ class: 'card-header' },
h(
'div',
null,
h('strong', null, 'Transactions '),
h('span', { class: 'pill', ref: totalCountPillRef }, '0'),
),
h('div', null, h('strong', null, 'Transactions '), h('span', { class: 'pill', ref: countRef }, '0')),
h('div', { style: 'color:var(--muted); font-size:12px;' }, '/api/v1/transactions/stream'),
),
h(
@ -127,16 +100,9 @@ export default function TransactionsTable() {
h(
'thead',
null,
h(
'tr',
null,
h('th', null, 'Hash'),
h('th', null, 'From → To'),
h('th', null, 'Amount'),
h('th', null, 'Time'),
),
h('tr', null, h('th', null, 'Hash'), h('th', null, 'From → To'), h('th', null, 'Amount'), h('th', null, 'Time')),
),
h('tbody', { ref: tableBodyRef }),
h('tbody', { ref: tbodyRef }),
),
),
);

View File

@ -1,3 +1,13 @@
export const isBenignStreamError = (error, signal) => {
return false;
};
export const withBenignFilter =
(onError, signal) =>
(error) => {
if (!isBenignStreamError(error, signal)) onError?.(error);
};
export async function streamNdjson(url, handleItem, { signal, onError = () => {} } = {}) {
const response = await fetch(url, {
headers: { accept: 'application/x-ndjson' },