Implementing requeststore

This commit is contained in:
thatben 2025-03-19 15:25:17 +01:00
parent eaeddb5a28
commit de22684ff3
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
8 changed files with 295 additions and 3 deletions

View File

@ -0,0 +1,157 @@
import std/os
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import ../types
import ../component
import ../state
import ../utils/datastoreutils
import ../utils/asyncdataevent
import ../services/clock
const requeststoreName = "requeststore"
logScope:
topics = "requeststore"
type
RequestEntry* = object
id*: Rid
lastSeen*: uint64
OnRequestEntry* =
proc(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.}
RequestStore* = ref object of Component
state: State
store: TypedDatastore
clock: Clock
proc `$`*(entry: RequestEntry): string =
$entry.id
proc toBytes*(entry: RequestEntry): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $entry.id)
buffer.write(2, entry.lastSeen)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type RequestEntry, data: openArray[byte]): ?!RequestEntry =
var
buffer = initProtoBuffer(data)
idStr: string
lastSeen: uint64
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
if buffer.getField(2, lastSeen).isErr:
return failure("Unable to decode `lastSeen`")
return success(RequestEntry(id: Rid.fromStr(idStr), lastSeen: lastSeen))
proc encode*(e: RequestEntry): seq[byte] =
e.toBytes()
proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(RequestEntry(lastSeen: 0))
return RequestEntry.fromBytes(bytes)
proc update*(s: RequestStore, rid: Rid): Future[?!void] {.async.} =
without key =? Key.init(requeststoreName / $rid), err:
error "failed to format key", err = err.msg
return failure(err)
let entry = RequestEntry(id: rid, lastSeen: s.clock.now)
?await s.store.put(key, entry)
trace "Request entry updated", id = $rid
return success()
proc remove*(s: RequestStore, rid: Rid): Future[?!void] {.async.} =
without key =? Key.init(requeststoreName / $rid), err:
error "failed to format key", err = err.msg
return failure(err)
?await s.store.delete(key)
trace "Request entry removed", id = $rid
return success()
# proc storeNodeIsNew(s: RequestStore, nid: Nid): Future[?!bool] {.async.} =
# without key =? Key.init(requeststoreName / $nid), err:
# error "failed to format key", err = err.msg
# return failure(err)
# without exists =? (await s.store.has(key)), err:
# error "failed to check store for key", err = err.msg
# return failure(err)
# if not exists:
# let entry = RequestEntry(id: nid, lastVisit: 0, firstInactive: 0)
# ?await s.store.put(key, entry)
# info "New node", nodeId = $nid
# return success(not exists)
# proc deleteEntry(s: RequestStore, nid: Nid): Future[?!bool] {.async.} =
# without key =? Key.init(requeststoreName / $nid), err:
# error "failed to format key", err = err.msg
# return failure(err)
# without exists =? (await s.store.has(key)), err:
# error "failed to check store for key", err = err.msg, key = $key
# return failure(err)
# if exists:
# ?await s.store.delete(key)
# return success(exists)
# method iterateAll*(
# s: RequestStore, onNode: OnRequestEntry
# ): Future[?!void] {.async: (raises: []), base.} =
# without queryKey =? Key.init(requeststoreName), err:
# error "failed to format key", err = err.msg
# return failure(err)
# try:
# without iter =? (await query[RequestEntry](s.store, Query.init(queryKey))), err:
# error "failed to create query", err = err.msg
# return failure(err)
# while not iter.finished:
# without item =? (await iter.next()), err:
# error "failure during query iteration", err = err.msg
# return failure(err)
# without value =? item.value, err:
# error "failed to get value from iterator", err = err.msg
# return failure(err)
# if value.lastSeen > 0:
# ?await onNode(value)
# await sleepAsync(1.millis)
# except CatchableError as exc:
# return failure(exc.msg)
# return success()
method start*(s: RequestStore): Future[?!void] {.async.} =
info "Starting..."
return success()
method stop*(s: RequestStore): Future[?!void] {.async.} =
return success()
proc new*(
T: type RequestStore, state: State, store: TypedDatastore, clock: Clock
): RequestStore =
RequestStore(state: state, store: store, clock: clock)
proc createRequestStore*(state: State, clock: Clock): ?!RequestStore =
without ds =? createTypedDatastore(state.config.dataDir / "requeststore"), err:
error "Failed to create typed datastore for request store", err = err.msg
return failure(err)
return success(RequestStore.new(state, ds, clock))

View File

@ -28,6 +28,16 @@ method getRecentSlotFillEvents*(
return failure("MarketplaceService is not started")
method start*(m: MarketplaceService): Future[?!void] {.async.} =
# Todo:
# - subscribe to requestSubmitted -> add id to list
# - queryPastStorageRequestedEvents from 3 months ago (max duration) -> add ids to list
# for list:
# - get status of request
# if running:
# - sum total bytes
# else:
# - remove from list
let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided")

View File

@ -558,9 +558,15 @@ proc queryPastStorageRequestedEvents*(
): Future[seq[StorageRequested]] {.async.} =
convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
return await market.queryPastStorageRequestedEvents(fromBlock)
proc queryPastStorageRequestedEvents*(
market: OnChainMarket, fromTime: int64
): Future[seq[StorageRequested]] {.async.} =
convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
return await market.queryPastStorageRequestedEvents(BlockTag.init(fromBlock))
proc slotCollateral*(
market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState
): ?!UInt256 {.raises: [].} =

View File

@ -4,7 +4,13 @@ import pkg/questionable/results
import pkg/codexdht
import pkg/libp2p
type Nid* = NodeId
import ./services/marketplace/requests
export requests
type
Nid* = NodeId
Rid* = requests.RequestId
proc `$`*(nid: Nid): string =
nid.toHex()
@ -12,6 +18,9 @@ proc `$`*(nid: Nid): string =
proc fromStr*(T: type Nid, s: string): Nid =
Nid(UInt256.fromHex(s))
proc fromStr*(T: type Rid, s: string): Rid =
Rid(requests.RequestId.fromHex(s))
proc toBytes*(nid: Nid): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $nid)

View File

@ -0,0 +1,89 @@
import std/os
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import pkg/datastore/typedds
import ../../../codexcrawler/components/requeststore
import ../../../codexcrawler/utils/datastoreutils
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mockclock
import ../helpers
suite "Requeststore":
let
dsPath = getTempDir() / "testds"
requeststoreName = "requeststore"
var
ds: TypedDatastore
state: MockState
clock: MockClock
store: RequestStore
setup:
ds = createTypedDatastore(dsPath).tryGet()
state = createMockState()
clock = createMockClock()
store = RequestStore.new(state, ds, clock)
(await store.start()).tryGet()
teardown:
(await store.stop()).tryGet()
(await ds.close()).tryGet()
state.checkAllUnsubscribed()
removeDir(dsPath)
test "requestEntry encoding":
let entry = RequestEntry(id: genRid(), lastSeen: 123.uint64)
let
bytes = entry.encode()
decoded = RequestEntry.decode(bytes).tryGet()
check:
entry.id == decoded.id
entry.lastSeen == decoded.lastSeen
test "update stores a new requestId with current time":
let rid = genRid()
(await store.update(rid)).tryGet()
let
key = Key.init(requeststoreName / $rid).tryGet()
stored = (await get[RequestEntry](ds, key)).tryGet()
check:
stored.id == rid
stored.lastSeen == clock.setNow
test "update updates the current time of an existing requestId with current time":
let rid = genRid()
(await store.update(rid)).tryGet()
clock.setNow = 1234
(await store.update(rid)).tryGet()
let
key = Key.init(requeststoreName / $rid).tryGet()
stored = (await get[RequestEntry](ds, key)).tryGet()
check:
stored.id == rid
stored.lastSeen == clock.setNow
test "remove will remove an entry":
let rid = genRid()
(await store.update(rid)).tryGet()
(await store.remove(rid)).tryGet()
let
key = Key.init(requeststoreName / $rid).tryGet()
isStored = (await ds.has(key)).tryGet()
check:
isStored == false

View File

@ -1,6 +1,26 @@
import std/random
import std/sequtils
import std/typetraits
import pkg/stint
import pkg/stew/byteutils
import ../../codexcrawler/types
proc example*[T: SomeInteger](_: type T): T =
rand(T)
proc example*[T, N](_: type array[N, T]): array[N, T] =
for item in result.mitems:
item = T.example
proc example*(_: type UInt256): UInt256 =
UInt256.fromBytes(array[32, byte].example)
proc example*[T: distinct](_: type T): T =
type baseType = T.distinctBase
T(baseType.example)
proc genNid*(): Nid =
Nid(rand(uint64).u256)
proc genRid*(): Rid =
Rid(array[32, byte].example)

View File

@ -7,4 +7,4 @@ method now*(clock: MockClock): uint64 {.raises: [].} =
clock.setNow
proc createMockClock*(): MockClock =
MockClock()
MockClock(setNow: 12)

View File

@ -2,6 +2,7 @@ import ./components/testchainmetrics
import ./components/testcrawler
import ./components/testdhtmetrics
import ./components/testnodestore
import ./components/testrequeststore
import ./components/testtimetracker
import ./components/testtodolist