replaces timestamp check with pending-status check

This commit is contained in:
ThatBen 2025-03-24 14:11:32 +01:00
parent 46264b90b5
commit a695b0ee31
No known key found for this signature in database
GPG Key ID: E020A7DDCD52E1AB
12 changed files with 74 additions and 96 deletions

View File

@ -17,7 +17,7 @@ type ChainCrawler* = ref object of Component
marketplace: MarketplaceService
proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} =
return await c.store.update(rid)
return await c.store.add(rid)
method start*(c: ChainCrawler): Future[?!void] {.async.} =
info "starting..."

View File

@ -6,7 +6,6 @@ import pkg/questionable/results
import ../state
import ../services/metrics
import ../services/marketplace
import ../services/clock
import ../components/requeststore
import ../component
import ../types
@ -20,29 +19,29 @@ type
metrics: Metrics
store: RequestStore
marketplace: MarketplaceService
clock: Clock
Update = ref object
numRequests: int
numPending: int
numSlots: int
totalSize: int64
proc isOld(c: ChainMetrics, entry: RequestEntry): bool =
let oneDay = 60 * 60 * 24
return entry.lastSeen < (c.clock.now - oneDay.uint64)
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
var update = Update(numRequests: 0, numSlots: 0, totalSize: 0)
var update = Update(numRequests: 0, numPending: 0, numSlots: 0, totalSize: 0)
proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} =
let response = await c.marketplace.getRequestInfo(entry.id)
if info =? response:
inc update.numRequests
update.numSlots += info.slots.int
update.totalSize += (info.slots * info.slotSize).int64
if info.pending:
trace "request is pending", id = $entry.id
inc update.numPending
else:
trace "request is running", id = $entry.id
inc update.numRequests
update.numSlots += info.slots.int
update.totalSize += (info.slots * info.slotSize).int64
else:
if c.isOld(entry):
?await c.store.remove(entry.id)
?await c.store.remove(entry.id)
return success()
?await c.store.iterateAll(onRequest)
@ -50,6 +49,7 @@ proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
proc updateMetrics(c: ChainMetrics, update: Update) =
c.metrics.setRequests(update.numRequests)
c.metrics.setPendingRequests(update.numPending)
c.metrics.setRequestSlots(update.numSlots)
c.metrics.setTotalSize(update.totalSize)
@ -77,8 +77,7 @@ proc new*(
metrics: Metrics,
store: RequestStore,
marketplace: MarketplaceService,
clock: Clock,
): ChainMetrics =
ChainMetrics(
state: state, metrics: metrics, store: store, marketplace: marketplace, clock: clock
state: state, metrics: metrics, store: store, marketplace: marketplace
)

View File

@ -10,7 +10,6 @@ import ../types
import ../component
import ../state
import ../utils/datastoreutils
import ../services/clock
const requeststoreName = "requeststore"
@ -20,7 +19,7 @@ logScope:
type
RequestEntry* = object
id*: Rid
lastSeen*: uint64
isValid: bool
OnRequestEntry* =
proc(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.}
@ -28,7 +27,6 @@ type
RequestStore* = ref object of Component
state: State
store: TypedDatastore
clock: Clock
proc `$`*(entry: RequestEntry): string =
$entry.id
@ -36,7 +34,6 @@ proc `$`*(entry: RequestEntry): string =
proc toBytes*(entry: RequestEntry): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $entry.id)
buffer.write(2, entry.lastSeen)
buffer.finish()
return buffer.buffer
@ -44,36 +41,32 @@ proc fromBytes*(_: type RequestEntry, data: openArray[byte]): ?!RequestEntry =
var
buffer = initProtoBuffer(data)
idStr: string
lastSeen: uint64
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
if buffer.getField(2, lastSeen).isErr:
return failure("Unable to decode `lastSeen`")
return success(RequestEntry(id: Rid.fromStr(idStr), lastSeen: lastSeen))
return success(RequestEntry(id: Rid.fromStr(idStr), isValid: true))
proc encode*(e: RequestEntry): seq[byte] =
e.toBytes()
proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(RequestEntry(lastSeen: 0))
return success(RequestEntry(isValid: false))
return RequestEntry.fromBytes(bytes)
method update*(
method add*(
s: RequestStore, rid: Rid
): Future[?!void] {.async: (raises: []), base.} =
without key =? Key.init(requeststoreName / $rid), err:
error "failed to format key", err = err.msg
return failure(err)
let entry = RequestEntry(id: rid, lastSeen: s.clock.now)
try:
?await s.store.put(key, entry)
?await s.store.put(key, RequestEntry(id: rid))
except CatchableError as exc:
return failure(exc.msg)
trace "Request entry updated", id = $rid
trace "Request entry added", id = $rid
return success()
method remove*(
@ -109,7 +102,7 @@ method iterateAll*(
error "failed to get value from iterator", err = err.msg
return failure(err)
if value.lastSeen > 0:
if value.isValid:
?await onNode(value)
await sleepAsync(1.millis)
@ -118,13 +111,13 @@ method iterateAll*(
return success()
proc new*(
T: type RequestStore, state: State, store: TypedDatastore, clock: Clock
T: type RequestStore, state: State, store: TypedDatastore
): RequestStore =
RequestStore(state: state, store: store, clock: clock)
RequestStore(state: state, store: store)
proc createRequestStore*(state: State, clock: Clock): ?!RequestStore =
proc createRequestStore*(state: State): ?!RequestStore =
without ds =? createTypedDatastore(state.config.dataDir / "requeststore"), err:
error "Failed to create typed datastore for request store", err = err.msg
return failure(err)
return success(RequestStore.new(state, ds, clock))
return success(RequestStore.new(state, ds))

View File

@ -21,7 +21,7 @@ Options:
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--dhtEnable=<e> Set to "1" to enable DHT crawler [default: 0]
--dhtEnable=<e> Set to "1" to enable DHT crawler [default: 1]
--stepDelay=<ms> Delay in milliseconds per node visit [default: 1000]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
--checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]

View File

@ -28,14 +28,14 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
without nodeStore =? createNodeStore(state, clock), err:
return failure(err)
without requestStore =? createRequestStore(state, clock), err:
without requestStore =? createRequestStore(state), err:
return failure(err)
let
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
todoList = createTodoList(state, metrics)
marketplace = createMarketplace(state, clock)
chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace, clock)
chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace)
without dhtMetrics =? createDhtMetrics(state, metrics), err:
return failure(err)

View File

@ -20,6 +20,7 @@ type
OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.}
RequestInfo* = ref object
pending*: bool
slots*: uint64
slotSize*: uint64
@ -32,7 +33,7 @@ proc fetchRequestInfo(
try:
let request = await market.getRequest(rid)
if r =? request:
return some(RequestInfo(slots: r.ask.slots, slotSize: r.ask.slotSize))
return some(RequestInfo(pending: false, slots: r.ask.slots, slotSize: r.ask.slotSize))
except CatchableError as exc:
trace "Failed to get request info", err = exc.msg
return none(RequestInfo)
@ -88,6 +89,8 @@ method getRequestInfo*(
try:
let state = await market.requestState(rid)
if s =? state:
if s == RequestState.New:
return some(RequestInfo(pending: true))
if s == RequestState.Started:
return await market.fetchRequestInfo(rid)
except CatchableError as exc:

View File

@ -7,6 +7,7 @@ declareGauge(okNodesGauge, "DHT nodes successfully contacted")
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
declareGauge(requestsGauge, "Marketplace active storage requests")
declareGauge(pendingGauge, "Marketplace pending storage requests")
declareGauge(requestSlotsGauge, "Marketplace active storage request slots")
declareGauge(
totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests"
@ -21,6 +22,7 @@ type
nokNodes: OnUpdateMetric
onRequests: OnUpdateMetric
onPending: OnUpdateMetric
onRequestSlots: OnUpdateMetric
onTotalSize: OnUpdateMetric
@ -47,6 +49,9 @@ method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onRequests(value.int64)
method setPendingRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onPending(value.int64)
method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.onRequestSlots(value.int64)
@ -70,6 +75,9 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
proc onRequests(value: int64) =
requestsGauge.set(value)
proc onPending(value: int64) =
pendingGauge.set(value)
proc onRequestSlots(value: int64) =
requestSlotsGauge.set(value)
@ -81,6 +89,7 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
okNodes: onOk,
nokNodes: onNok,
onRequests: onRequests,
onPending: onPending,
onRequestSlots: onRequestSlots,
onTotalSize: onTotalSize,
)

View File

@ -41,7 +41,7 @@ suite "ChainCrawler":
(await (marketplace.subNewRequestsCallback.get())(rid)).tryGet()
check:
store.updateRid == rid
store.addRid == rid
test "start should iterate past requests and add then to store":
check:
@ -51,4 +51,4 @@ suite "ChainCrawler":
(await marketplace.iterRequestsCallback.get()(rid)).tryGet()
check:
store.updateRid == rid
store.addRid == rid

View File

@ -20,7 +20,6 @@ suite "ChainMetrics":
metrics: MockMetrics
store: MockRequestStore
marketplace: MockMarketplaceService
clock: MockClock
chain: ChainMetrics
setup:
@ -28,9 +27,8 @@ suite "ChainMetrics":
metrics = createMockMetrics()
store = createMockRequestStore()
marketplace = createMockMarketplaceService()
clock = createMockClock()
chain = ChainMetrics.new(state, metrics, store, marketplace, clock)
chain = ChainMetrics.new(state, metrics, store, marketplace)
(await chain.start()).tryGet()
teardown:
@ -44,12 +42,10 @@ suite "ChainMetrics":
state.delays.len == 1
state.delays[0] == state.config.requestCheckDelay.minutes
test "onStep should remove old non-running requests from request store":
test "onStep removes requests from request store when info can't be fetched":
let rid = genRid()
let oneDay = (60 * 60 * 24).uint64
store.iterateEntries.add(RequestEntry(id: rid, lastSeen: 100.uint64))
store.iterateEntries.add(RequestEntry(id: rid))
clock.setNow = 100 + oneDay + 1
marketplace.requestInfoReturns = none(RequestInfo)
await onStep()
@ -58,20 +54,6 @@ suite "ChainMetrics":
marketplace.requestInfoRid == rid
store.removeRid == rid
test "onStep should not remove recent non-running requests from request store":
let rid = genRid()
let now = 123456789.uint64
store.iterateEntries.add(RequestEntry(id: rid, lastSeen: now - 1))
clock.setNow = now
marketplace.requestInfoReturns = none(RequestInfo)
await onStep()
check:
marketplace.requestInfoRid == rid
not (store.removeRid == rid)
test "onStep should count the number of active requests":
let rid1 = genRid()
let rid2 = genRid()
@ -85,6 +67,19 @@ suite "ChainMetrics":
check:
metrics.requests == 2
test "onStep should count the number of pending requests":
let rid1 = genRid()
let rid2 = genRid()
store.iterateEntries.add(RequestEntry(id: rid1))
store.iterateEntries.add(RequestEntry(id: rid2))
marketplace.requestInfoReturns = some(RequestInfo(pending: true))
await onStep()
check:
metrics.pending == 2
test "onStep should count the number of active slots":
let rid = genRid()
store.iterateEntries.add(RequestEntry(id: rid))

View File

@ -9,7 +9,6 @@ import ../../../codexcrawler/utils/datastoreutils
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mockclock
import ../helpers
suite "Requeststore":
@ -20,15 +19,13 @@ suite "Requeststore":
var
ds: TypedDatastore
state: MockState
clock: MockClock
store: RequestStore
setup:
ds = createTypedDatastore(dsPath).tryGet()
state = createMockState()
clock = createMockClock()
store = RequestStore.new(state, ds, clock)
store = RequestStore.new(state, ds)
teardown:
(await ds.close()).tryGet()
@ -36,7 +33,7 @@ suite "Requeststore":
removeDir(dsPath)
test "requestEntry encoding":
let entry = RequestEntry(id: genRid(), lastSeen: 123.uint64)
let entry = RequestEntry(id: genRid())
let
bytes = entry.encode()
@ -44,11 +41,10 @@ suite "Requeststore":
check:
entry.id == decoded.id
entry.lastSeen == decoded.lastSeen
test "update stores a new requestId with current time":
test "add stores a new requestId":
let rid = genRid()
(await store.update(rid)).tryGet()
(await store.add(rid)).tryGet()
let
key = Key.init(requeststoreName / $rid).tryGet()
@ -56,26 +52,10 @@ suite "Requeststore":
check:
stored.id == rid
stored.lastSeen == clock.setNow
test "update updates the current time of an existing requestId with current time":
let rid = genRid()
(await store.update(rid)).tryGet()
clock.setNow = 1234
(await store.update(rid)).tryGet()
let
key = Key.init(requeststoreName / $rid).tryGet()
stored = (await get[RequestEntry](ds, key)).tryGet()
check:
stored.id == rid
stored.lastSeen == clock.setNow
test "remove will remove an entry":
let rid = genRid()
(await store.update(rid)).tryGet()
(await store.add(rid)).tryGet()
(await store.remove(rid)).tryGet()
let
@ -91,9 +71,9 @@ suite "Requeststore":
rid2 = genRid()
rid3 = genRid()
(await store.update(rid1)).tryGet()
(await store.update(rid2)).tryGet()
(await store.update(rid3)).tryGet()
(await store.add(rid1)).tryGet()
(await store.add(rid2)).tryGet()
(await store.add(rid3)).tryGet()
var entries = newSeq[RequestEntry]()
proc onEntry(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
@ -117,7 +97,3 @@ suite "Requeststore":
check:
id in ids
check:
entries[0].lastSeen == clock.setNow
entries[1].lastSeen == clock.setNow
entries[2].lastSeen == clock.setNow

View File

@ -5,6 +5,7 @@ type MockMetrics* = ref object of Metrics
ok*: int
nok*: int
requests*: int
pending*: int
slots*: int
totalSize*: int64
@ -20,6 +21,9 @@ method setNokNodes*(m: MockMetrics, value: int) =
method setRequests*(m: MockMetrics, value: int) =
m.requests = value
method setPendingRequests*(m: MockMetrics, value: int) =
m.pending = value
method setRequestSlots*(m: MockMetrics, value: int) =
m.slots = value

View File

@ -1,4 +1,3 @@
import std/sequtils
import pkg/questionable/results
import pkg/chronos
@ -6,12 +5,12 @@ import ../../../codexcrawler/components/requeststore
import ../../../codexcrawler/types
type MockRequestStore* = ref object of RequestStore
updateRid*: Rid
addRid*: Rid
removeRid*: Rid
iterateEntries*: seq[RequestEntry]
method update*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =
s.updateRid = rid
method add*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =
s.addRid = rid
return success()
method remove*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =