mirror of
https://github.com/logos-blockchain/logos-blockchain-block-explorer-template.git
synced 2026-01-02 13:13:10 +00:00
24 lines
917 B
Python
24 lines
917 B
Python
from datetime import datetime
|
|
from typing import List
|
|
|
|
from starlette.responses import Response
|
|
|
|
from api.utils import streamer
|
|
from core.api import NBERequest, NDJsonStreamingResponse
|
|
from node.models.transactions import Transaction
|
|
from utils.datetime import increment_datetime
|
|
|
|
|
|
async def stream(request: NBERequest) -> Response:
    """Stream transactions to the client as an NDJSON response.

    A small batch of transactions is fetched first and handed to ``streamer``
    as bootstrap data; the repository's update stream is then opened starting
    from the incremented highest timestamp found in that batch.

    Args:
        request: Incoming request; its ``app.state.transaction_repository``
            provides ``get_latest`` and ``updates_stream``.

    Returns:
        An ``NDJsonStreamingResponse`` wrapping the combined stream.
    """
    repository = request.app.state.transaction_repository

    # Bootstrap batch sent ahead of any live updates.
    initial_batch: List[Transaction] = await repository.get_latest(
        limit=5,
        descending=False,
    )

    # ``default`` covers an empty bootstrap batch, where there is no
    # timestamp to take the maximum of.
    bootstrap_timestamps = (item.timestamp for item in initial_batch)
    newest_timestamp: datetime = max(bootstrap_timestamps, default=datetime.min)

    # NOTE(review): presumably the increment keeps the live stream from
    # re-sending the newest bootstrap transaction; confirm against
    # ``increment_datetime`` and ``updates_stream``.
    live_updates = repository.updates_stream(
        timestamp_from=increment_datetime(newest_timestamp),
    )

    combined = streamer(stream=live_updates, bootstrap_data=initial_batch)
    return NDJsonStreamingResponse(combined)
|