2025-10-08 20:06:46 +05:30
{. push raises : [ ] . }
import
std / [ options ] ,
chronos ,
chronicles ,
metrics ,
results ,
eth / keys ,
eth / p2p / discoveryv5 / enr ,
libp2p / crypto / crypto ,
libp2p / protocols / ping ,
libp2p / protocols / pubsub / gossipsub ,
libp2p / protocols / pubsub / rpc / messages ,
libp2p / builders ,
libp2p / transports / tcptransport ,
libp2p / transports / wstransport ,
libp2p / utility
import
.. / waku_node ,
.. / .. / waku_core ,
.. / .. / waku_store_legacy / protocol as legacy_store ,
.. / .. / waku_store_legacy / client as legacy_store_client ,
.. / .. / waku_store_legacy / common as legacy_store_common ,
.. / .. / waku_store / protocol as store ,
.. / .. / waku_store / client as store_client ,
.. / .. / waku_store / common as store_common ,
.. / .. / waku_store / resume ,
.. / peer_manager ,
.. / .. / common / rate_limit / setting ,
.. / .. / waku_archive ,
.. / .. / waku_archive_legacy
# Module-wide logging scope: sets the `topics` property for the log statements
# in this file (chronicles `logScope`).
logScope :
topics = " waku node store api "
## Waku archive
proc mountArchive*(
    node: WakuNode,
    driver: waku_archive.ArchiveDriver,
    retentionPolicy = none(waku_archive.RetentionPolicy),
): Result[void, string] =
  ## Creates a `WakuArchive` from `driver` and `retentionPolicy`, stores it in
  ## `node.wakuArchive` and starts it.
  ##
  ## Returns an error string when the archive cannot be constructed; in that
  ## case `node.wakuArchive` is left untouched and nothing is started.
  let archive = waku_archive.WakuArchive.new(
    driver = driver, retentionPolicy = retentionPolicy
  ).valueOr:
    return err(" error in mountArchive: " & error)

  node.wakuArchive = archive
  node.wakuArchive.start()

  ok()
proc mountLegacyArchive*(
    node: WakuNode, driver: waku_archive_legacy.ArchiveDriver
): Result[void, string] =
  ## Creates a legacy `WakuArchive` on top of `driver` and stores it in
  ## `node.wakuLegacyArchive`.
  ##
  ## Returns an error string when the archive cannot be constructed; in that
  ## case `node.wakuLegacyArchive` is left untouched.
  let legacyArchive = waku_archive_legacy.WakuArchive.new(driver = driver).valueOr:
    return err(" error in mountLegacyArchive: " & error)

  node.wakuLegacyArchive = legacyArchive

  ok()
## Legacy Waku Store
# TODO: Review this mapping logic. Maybe move it to the application code
proc toArchiveQuery(
    request: legacy_store_common.HistoryQuery
): waku_archive_legacy.ArchiveQuery =
  ## Translates a legacy store `HistoryQuery` into the legacy archive's
  ## `ArchiveQuery`. Fields are copied one to one; the optional cursor is
  ## rebuilt as an `ArchiveCursor` and the page size is converted to `uint`.
  let archiveCursor =
    if request.cursor.isSome():
      let histCursor = request.cursor.get()
      some(
        waku_archive_legacy.ArchiveCursor(
          pubsubTopic: histCursor.pubsubTopic,
          senderTime: histCursor.senderTime,
          storeTime: histCursor.storeTime,
          digest: histCursor.digest,
        )
      )
    else:
      none(waku_archive_legacy.ArchiveCursor)

  return waku_archive_legacy.ArchiveQuery(
    pubsubTopic: request.pubsubTopic,
    contentTopics: request.contentTopics,
    cursor: archiveCursor,
    startTime: request.startTime,
    endTime: request.endTime,
    pageSize: uint(request.pageSize),
    direction: request.direction,
    requestId: request.requestId,
  )
# TODO: Review this mapping logic. Maybe move it to the application code
proc toHistoryResult*(
    res: waku_archive_legacy.ArchiveResult
): legacy_store_common.HistoryResult =
  ## Translates a legacy archive result into a legacy store `HistoryResult`.
  ##
  ## Archive errors of kind `DRIVER_ERROR` or `INVALID_QUERY` become a
  ## `BAD_REQUEST` history error carrying the archive's `cause`; every other
  ## archive error kind becomes `UNKNOWN`. On success the messages are passed
  ## through and the optional cursor is rebuilt as a `HistoryCursor`.
  let archived = res.valueOr:
    let failure =
      case error.kind
      of waku_archive_legacy.ArchiveErrorKind.DRIVER_ERROR,
          waku_archive_legacy.ArchiveErrorKind.INVALID_QUERY:
        HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: error.cause)
      else:
        HistoryError(kind: HistoryErrorKind.UNKNOWN)
    return err(failure)

  let nextCursor =
    if archived.cursor.isSome():
      let archiveCursor = archived.cursor.get()
      some(
        HistoryCursor(
          pubsubTopic: archiveCursor.pubsubTopic,
          senderTime: archiveCursor.senderTime,
          storeTime: archiveCursor.storeTime,
          digest: archiveCursor.digest,
        )
      )
    else:
      none(HistoryCursor)

  return ok(HistoryResponse(messages: archived.messages, cursor: nextCursor))
2025-10-08 20:06:46 +05:30
proc mountLegacyStore*(
    node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
  ## Mounts the legacy store protocol on `node`, answering queries from
  ## `node.wakuLegacyArchive`.
  ##
  ## Logs an error and returns without mounting anything when no legacy
  ## archive is set. When the node is already started, the store is started
  ## before it is mounted on the switch.
  info " mounting waku legacy store protocol "

  if node.wakuLegacyArchive.isNil():
    error " failed to mount waku legacy store protocol ", error = " waku archive not set "
    return

  # TODO: Review this handler logic. Maybe move it to the application code
  let queryHandler: HistoryQueryHandler = proc(
      request: HistoryQuery
  ): Future[legacy_store_common.HistoryResult] {.async.} =
    # A cursor supplied by the requester is checked first; `?` returns its
    # error to the caller when the check fails.
    if request.cursor.isSome():
      ?checkHistCursor(request.cursor.get())

    let archiveQuery = toArchiveQuery(request)
    let archiveResult = await node.wakuLegacyArchive.findMessagesV2(archiveQuery)

    return toHistoryResult(archiveResult)

  node.wakuLegacyStore = legacy_store.WakuStore.new(
    node.peerManager, node.rng, queryHandler, some(rateLimit)
  )

  if node.started:
    # The node is already running, so bring the store up right away.
    await node.wakuLegacyStore.start()

  node.switch.mount(
    node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec)
  )
proc mountLegacyStoreClient*(node: WakuNode) =
  ## Creates a legacy store client from the node's peer manager and RNG and
  ## stores it in `node.wakuLegacyStoreClient`.
  info " mounting legacy store client "

  let client = legacy_store_client.WakuStoreClient.new(node.peerManager, node.rng)
  node.wakuLegacyStoreClient = client
proc query*(
    node: WakuNode, query: legacy_store_common.HistoryQuery, peer: RemotePeerInfo
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.
    async, gcsafe
.} =
  ## Sends `query` to `peer` through the legacy store client.
  ##
  ## Returns an error string when the legacy store client is not mounted or
  ## when the client reports a failure; otherwise returns the peer's response.
  if node.wakuLegacyStoreClient.isNil():
    return err(" waku legacy store client is nil ")

  let queryRes = await node.wakuLegacyStoreClient.query(query, peer)
  if queryRes.isErr():
    return err(" legacy store client query error: " & $queryRes.error)

  return ok(queryRes.get())
# TODO: Move to application module (e.g., wakunode2.nim)
proc query*(
    node: WakuNode, query: legacy_store_common.HistoryQuery
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.
    async, gcsafe, deprecated: " Use ' node.query() ' with peer destination instead "
.} =
  ## Sends `query` to a peer chosen by the peer manager for the legacy store
  ## codec, delegating to the peer-taking `query` overload.
  ##
  ## Returns an error string when the legacy store client is not mounted or
  ## when the peer manager has no peer to offer.
  if node.wakuLegacyStoreClient.isNil():
    return err(" waku legacy store client is nil ")

  let candidate = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec)
  if candidate.isSome():
    return await node.query(query, candidate.get())

  error " no suitable remote peers "
  return err(" peer_not_found_failure ")
when defined(waku_exp_store_resume):
  # TODO: Move to application module (e.g., wakunode2.nim)
  proc resume*(
      node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo])
  ) {.async, gcsafe.} =
    ## Retrieves the history of waku messages published on the default waku
    ## pubsub topic since the last time the node was online.
    ##
    ## For resume to work properly the node must have the store protocol
    ## mounted in full mode (i.e., persisting messages). Messages are stored in
    ## the wakuStore's messages field and in the message db.
    ##
    ## The offline time window is the difference between the current time and
    ## the timestamp of the most recent persisted waku message; an offset of 20
    ## seconds is added to the window to account for clock asynchrony between
    ## nodes.
    ##
    ## `peerList` is the list of peers to query. The history is fetched from
    ## the first available peer in the list; such candidates should be found
    ## through a discovery method (to be developed). When no `peerList` is
    ## passed, one of the peers known to the store protocol's peer manager is
    ## picked randomly. The fetch succeeds if the dialed peer was online during
    ## the queried time window.
    ##
    ## Does nothing when the legacy store client is not mounted. Failures are
    ## logged, not returned.
    if node.wakuLegacyStoreClient.isNil():
      return

    # `valueOr` already unwraps the result, so `retrievedMessages` is the
    # value itself and must not be unwrapped a second time with `.value`.
    let retrievedMessages = (await node.wakuLegacyStoreClient.resume(peerList)).valueOr:
      error " failed to resume store ", error = error
      return

    info " the number of retrieved messages since the last online time: ",
      number = retrievedMessages
## Waku Store
proc toArchiveQuery(request: StoreQueryRequest): waku_archive.ArchiveQuery =
  ## Translates a store `StoreQueryRequest` into the archive's `ArchiveQuery`.
  ## The page size is only set when the request carries a pagination limit;
  ## otherwise it keeps the value a freshly constructed query has.
  var archiveQuery = waku_archive.ArchiveQuery(
    includeData: request.includeData,
    pubsubTopic: request.pubsubTopic,
    contentTopics: request.contentTopics,
    startTime: request.startTime,
    endTime: request.endTime,
    hashes: request.messageHashes,
    cursor: request.paginationCursor,
    direction: request.paginationForward,
    requestId: request.requestId,
  )

  if request.paginationLimit.isSome():
    archiveQuery.pageSize = uint(request.paginationLimit.get())

  return archiveQuery
proc toStoreResult(res: waku_archive.ArchiveResult): StoreQueryResult =
  ## Translates an archive result into a store `StoreQueryResult`.
  ##
  ## An archive error becomes a `StoreError` with code 300. On success one
  ## key-value entry is created per returned hash, and the entries at the
  ## positions of the returned messages are filled with the message and its
  ## pubsub topic.
  let archived = res.valueOr:
    return err(StoreError.new(300, " archive error: " & $error))

  var reply = StoreQueryResponse(statusCode: 200, statusDesc: " OK ")

  # One entry per hash, in the order the archive returned them.
  for msgHash in archived.hashes:
    reply.messages.add(store_common.WakuMessageKeyValue(messageHash: msgHash))

  # Messages and topics are matched to the hash entries by position.
  for idx, msg in archived.messages:
    reply.messages[idx].message = some(msg)
    reply.messages[idx].pubsubTopic = some(archived.topics[idx])

  reply.paginationCursor = archived.cursor

  return ok(reply)
proc mountStore*(
    node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
  ## Mounts the store protocol on `node`, answering queries from
  ## `node.wakuArchive`.
  ##
  ## Logs an error and returns without mounting anything when no archive is
  ## set. When the node is already started, the store is started before it is
  ## mounted on the switch.
  if node.wakuArchive.isNil():
    error " failed to mount waku store protocol ", error = " waku archive not set "
    return

  info " mounting waku store protocol "

  let requestHandler: StoreQueryRequestHandler = proc(
      request: StoreQueryRequest
  ): Future[StoreQueryResult] {.async.} =
    let archiveQuery = toArchiveQuery(request)
    let archiveResult = await node.wakuArchive.findMessages(archiveQuery)

    return toStoreResult(archiveResult)

  node.wakuStore = store.WakuStore.new(
    node.peerManager, node.rng, requestHandler, some(rateLimit)
  )

  if node.started:
    # The node is already running, so bring the store up right away.
    await node.wakuStore.start()

  node.switch.mount(node.wakuStore, protocolMatcher(store_common.WakuStoreCodec))
proc mountStoreClient*(node: WakuNode) =
  ## Creates a store client from the node's peer manager and RNG and stores it
  ## in `node.wakuStoreClient`.
  info " mounting store client "

  let client = store_client.WakuStoreClient.new(node.peerManager, node.rng)
  node.wakuStoreClient = client
proc query*(
    node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo
): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {.
    async, gcsafe
.} =
  ## Sends `request` to `peer` through the store client.
  ##
  ## Returns an error string only when the store client is not mounted. A
  ## failure reported by the client is returned as an `ok` response whose
  ## status code and description are taken from the error.
  if node.wakuStoreClient.isNil():
    return err(" waku store v3 client is nil ")

  let peerResponse = (await node.wakuStoreClient.query(request, peer)).valueOr:
    # Client errors are reported to the caller inside a regular response.
    return ok(StoreQueryResponse(statusCode: uint32(error.kind), statusDesc: $error))

  return ok(peerResponse)
proc setupStoreResume*(node: WakuNode) =
  ## Creates a `StoreResume` from the node's peer manager, archive and store
  ## client and stores it in `node.wakuStoreResume`.
  ##
  ## On failure the error is logged and `node.wakuStoreResume` is left
  ## untouched.
  let storeResume = StoreResume.new(
    node.peerManager, node.wakuArchive, node.wakuStoreClient
  ).valueOr:
    error " Failed to setup Store Resume ", error = $error
    return

  node.wakuStoreResume = storeResume