From de22684ff3144551e62e6fd315d1816c4feb5435 Mon Sep 17 00:00:00 2001 From: thatben Date: Wed, 19 Mar 2025 15:25:17 +0100 Subject: [PATCH] Implementing requeststore --- codexcrawler/components/requeststore.nim | 157 ++++++++++++++++++ codexcrawler/services/marketplace.nim | 10 ++ codexcrawler/services/marketplace/market.nim | 8 +- codexcrawler/types.nim | 11 +- .../components/testrequeststore.nim | 89 ++++++++++ tests/codexcrawler/helpers.nim | 20 +++ tests/codexcrawler/mocks/mockclock.nim | 2 +- tests/codexcrawler/testcomponents.nim | 1 + 8 files changed, 295 insertions(+), 3 deletions(-) create mode 100644 codexcrawler/components/requeststore.nim create mode 100644 tests/codexcrawler/components/testrequeststore.nim diff --git a/codexcrawler/components/requeststore.nim b/codexcrawler/components/requeststore.nim new file mode 100644 index 0000000..b101fce --- /dev/null +++ b/codexcrawler/components/requeststore.nim @@ -0,0 +1,157 @@ +import std/os +import pkg/datastore +import pkg/datastore/typedds +import pkg/questionable/results +import pkg/chronicles +import pkg/chronos +import pkg/libp2p + +import ../types +import ../component +import ../state +import ../utils/datastoreutils +import ../utils/asyncdataevent +import ../services/clock + +const requeststoreName = "requeststore" + +logScope: + topics = "requeststore" + +type + RequestEntry* = object + id*: Rid + lastSeen*: uint64 + + OnRequestEntry* = + proc(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} + + RequestStore* = ref object of Component + state: State + store: TypedDatastore + clock: Clock + +proc `$`*(entry: RequestEntry): string = + $entry.id + +proc toBytes*(entry: RequestEntry): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $entry.id) + buffer.write(2, entry.lastSeen) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type RequestEntry, data: openArray[byte]): ?!RequestEntry = + var + buffer = initProtoBuffer(data) + idStr: string + lastSeen: uint64 + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + if buffer.getField(2, lastSeen).isErr: + return failure("Unable to decode `lastSeen`") + + return success(RequestEntry(id: Rid.fromStr(idStr), lastSeen: lastSeen)) + +proc encode*(e: RequestEntry): seq[byte] = + e.toBytes() + +proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T = + if bytes.len < 1: + return success(RequestEntry(lastSeen: 0)) + return RequestEntry.fromBytes(bytes) + +proc update*(s: RequestStore, rid: Rid): Future[?!void] {.async.} = + without key =? Key.init(requeststoreName / $rid), err: + error "failed to format key", err = err.msg + return failure(err) + + let entry = RequestEntry(id: rid, lastSeen: s.clock.now) + ?await s.store.put(key, entry) + trace "Request entry updated", id = $rid + return success() + +proc remove*(s: RequestStore, rid: Rid): Future[?!void] {.async.} = + without key =? Key.init(requeststoreName / $rid), err: + error "failed to format key", err = err.msg + return failure(err) + + ?await s.store.delete(key) + trace "Request entry removed", id = $rid + return success() + +# proc storeNodeIsNew(s: RequestStore, nid: Nid): Future[?!bool] {.async.} = +# without key =? Key.init(requeststoreName / $nid), err: +# error "failed to format key", err = err.msg +# return failure(err) +# without exists =? (await s.store.has(key)), err: +# error "failed to check store for key", err = err.msg +# return failure(err) + +# if not exists: +# let entry = RequestEntry(id: nid, lastVisit: 0, firstInactive: 0) +# ?await s.store.put(key, entry) +# info "New node", nodeId = $nid + +# return success(not exists) + +# proc deleteEntry(s: RequestStore, nid: Nid): Future[?!bool] {.async.} = +# without key =? Key.init(requeststoreName / $nid), err: +# error "failed to format key", err = err.msg +# return failure(err) +# without exists =? (await s.store.has(key)), err: +# error "failed to check store for key", err = err.msg, key = $key +# return failure(err) + +# if exists: +# ?await s.store.delete(key) + +# return success(exists) + +# method iterateAll*( +# s: RequestStore, onNode: OnRequestEntry +# ): Future[?!void] {.async: (raises: []), base.} = +# without queryKey =? Key.init(requeststoreName), err: +# error "failed to format key", err = err.msg +# return failure(err) +# try: +# without iter =? (await query[RequestEntry](s.store, Query.init(queryKey))), err: +# error "failed to create query", err = err.msg +# return failure(err) + +# while not iter.finished: +# without item =? (await iter.next()), err: +# error "failure during query iteration", err = err.msg +# return failure(err) +# without value =? item.value, err: +# error "failed to get value from iterator", err = err.msg +# return failure(err) + +# if value.lastSeen > 0: +# ?await onNode(value) + +# await sleepAsync(1.millis) +# except CatchableError as exc: +# return failure(exc.msg) + +# return success() + +method start*(s: RequestStore): Future[?!void] {.async.} = + info "Starting..." + return success() + +method stop*(s: RequestStore): Future[?!void] {.async.} = + return success() + +proc new*( + T: type RequestStore, state: State, store: TypedDatastore, clock: Clock +): RequestStore = + RequestStore(state: state, store: store, clock: clock) + +proc createRequestStore*(state: State, clock: Clock): ?!RequestStore = + without ds =? createTypedDatastore(state.config.dataDir / "requeststore"), err: + error "Failed to create typed datastore for request store", err = err.msg + return failure(err) + + return success(RequestStore.new(state, ds, clock)) diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim index c959a22..f652ad6 100644 --- a/codexcrawler/services/marketplace.nim +++ b/codexcrawler/services/marketplace.nim @@ -28,6 +28,16 @@ method getRecentSlotFillEvents*( return failure("MarketplaceService is not started") method start*(m: MarketplaceService): Future[?!void] {.async.} = + # Todo: + # - subscribe to requestSubmitted -> add id to list + # - queryPastStorageRequestedEvents from 3 months ago (max duration) -> add ids to list + # for list: + # - get status of request + # if running: + # - sum total bytes + # else: + # - remove from list + let provider = JsonRpcProvider.new(m.state.config.ethProvider) without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): return failure("Invalid MarketplaceAddress provided") diff --git a/codexcrawler/services/marketplace/market.nim b/codexcrawler/services/marketplace/market.nim index 294bccc..c0b88fa 100644 --- a/codexcrawler/services/marketplace/market.nim +++ b/codexcrawler/services/marketplace/market.nim @@ -558,9 +558,15 @@ proc queryPastStorageRequestedEvents*( ): Future[seq[StorageRequested]] {.async.} = convertEthersError: let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) - return await market.queryPastStorageRequestedEvents(fromBlock) +proc queryPastStorageRequestedEvents*( + market: OnChainMarket, fromTime: int64 +): Future[seq[StorageRequested]] {.async.} = + convertEthersError: + let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) + return await market.queryPastStorageRequestedEvents(BlockTag.init(fromBlock)) + proc slotCollateral*( market: OnChainMarket, collateralPerSlot: UInt256, slotState: SlotState ): ?!UInt256 {.raises: [].} = diff --git a/codexcrawler/types.nim b/codexcrawler/types.nim index 9895ed1..5948eb2 100644 --- a/codexcrawler/types.nim +++ b/codexcrawler/types.nim @@ -4,7 +4,13 @@ import pkg/questionable/results import pkg/codexdht import pkg/libp2p -type Nid* = NodeId +import ./services/marketplace/requests + +export requests + +type + Nid* = NodeId + Rid* = requests.RequestId proc `$`*(nid: Nid): string = nid.toHex() @@ -12,6 +18,9 @@ proc `$`*(nid: Nid): string = proc fromStr*(T: type Nid, s: string): Nid = Nid(UInt256.fromHex(s)) +proc fromStr*(T: type Rid, s: string): Rid = + Rid(requests.RequestId.fromHex(s)) + proc toBytes*(nid: Nid): seq[byte] = var buffer = initProtoBuffer() buffer.write(1, $nid) diff --git a/tests/codexcrawler/components/testrequeststore.nim b/tests/codexcrawler/components/testrequeststore.nim new file mode 100644 index 0000000..6072fab --- /dev/null +++ b/tests/codexcrawler/components/testrequeststore.nim @@ -0,0 +1,89 @@ +import std/os +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest +import pkg/datastore/typedds + +import ../../../codexcrawler/components/requeststore +import ../../../codexcrawler/utils/datastoreutils +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../mocks/mockclock +import ../helpers + +suite "Requeststore": + let + dsPath = getTempDir() / "testds" + requeststoreName = "requeststore" + + var + ds: TypedDatastore + state: MockState + clock: MockClock + store: RequestStore + + setup: + ds = createTypedDatastore(dsPath).tryGet() + state = createMockState() + clock = createMockClock() + + store = RequestStore.new(state, ds, clock) + + (await store.start()).tryGet() + + teardown: + (await store.stop()).tryGet() + (await ds.close()).tryGet() + state.checkAllUnsubscribed() + removeDir(dsPath) + + test "requestEntry encoding": + let entry = RequestEntry(id: genRid(), lastSeen: 123.uint64) + + let + bytes = entry.encode() + decoded = RequestEntry.decode(bytes).tryGet() + + check: + entry.id == decoded.id + entry.lastSeen == decoded.lastSeen + + test "update stores a new requestId with current time": + let rid = genRid() + (await store.update(rid)).tryGet() + + let + key = Key.init(requeststoreName / $rid).tryGet() + stored = (await get[RequestEntry](ds, key)).tryGet() + + check: + stored.id == rid + stored.lastSeen == clock.setNow + + test "update updates the current time of an existing requestId with current time": + let rid = genRid() + (await store.update(rid)).tryGet() + + clock.setNow = 1234 + (await store.update(rid)).tryGet() + + let + key = Key.init(requeststoreName / $rid).tryGet() + stored = (await get[RequestEntry](ds, key)).tryGet() + + check: + stored.id == rid + stored.lastSeen == clock.setNow + + test "remove will remove an entry": + let rid = genRid() + (await store.update(rid)).tryGet() + (await store.remove(rid)).tryGet() + + let + key = Key.init(requeststoreName / $rid).tryGet() + isStored = (await ds.has(key)).tryGet() + + check: + isStored == false diff --git a/tests/codexcrawler/helpers.nim b/tests/codexcrawler/helpers.nim index 5feb345..b7a5b9e 100644 --- a/tests/codexcrawler/helpers.nim +++ b/tests/codexcrawler/helpers.nim @@ -1,6 +1,26 @@ import std/random +import std/sequtils +import std/typetraits import pkg/stint +import pkg/stew/byteutils import ../../codexcrawler/types +proc example*[T: SomeInteger](_: type T): T = + rand(T) + +proc example*[T, N](_: type array[N, T]): array[N, T] = + for item in result.mitems: + item = T.example + +proc example*(_: type UInt256): UInt256 = + UInt256.fromBytes(array[32, byte].example) + +proc example*[T: distinct](_: type T): T = + type baseType = T.distinctBase + T(baseType.example) + proc genNid*(): Nid = Nid(rand(uint64).u256) + +proc genRid*(): Rid = + Rid(array[32, byte].example) diff --git a/tests/codexcrawler/mocks/mockclock.nim b/tests/codexcrawler/mocks/mockclock.nim index 5c5cf44..d4621bc 100644 --- a/tests/codexcrawler/mocks/mockclock.nim +++ b/tests/codexcrawler/mocks/mockclock.nim @@ -7,4 +7,4 @@ method now*(clock: MockClock): uint64 {.raises: [].} = clock.setNow proc createMockClock*(): MockClock = - MockClock() + MockClock(setNow: 12) diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim index a81a08f..99b5be9 100644 --- a/tests/codexcrawler/testcomponents.nim +++ b/tests/codexcrawler/testcomponents.nim @@ -2,6 +2,7 @@ import ./components/testchainmetrics import ./components/testcrawler import ./components/testdhtmetrics import ./components/testnodestore +import ./components/testrequeststore import ./components/testtimetracker import ./components/testtodolist