diff --git a/codexcrawler/components/chaincrawler.nim b/codexcrawler/components/chaincrawler.nim index b29e9f8..f957630 100644 --- a/codexcrawler/components/chaincrawler.nim +++ b/codexcrawler/components/chaincrawler.nim @@ -17,7 +17,7 @@ type ChainCrawler* = ref object of Component marketplace: MarketplaceService proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} = - return await c.store.update(rid) + return await c.store.add(rid) method start*(c: ChainCrawler): Future[?!void] {.async.} = info "starting..." diff --git a/codexcrawler/components/chainmetrics.nim b/codexcrawler/components/chainmetrics.nim index 4956ba0..85395ca 100644 --- a/codexcrawler/components/chainmetrics.nim +++ b/codexcrawler/components/chainmetrics.nim @@ -6,7 +6,6 @@ import pkg/questionable/results import ../state import ../services/metrics import ../services/marketplace -import ../services/clock import ../components/requeststore import ../component import ../types @@ -20,29 +19,29 @@ type metrics: Metrics store: RequestStore marketplace: MarketplaceService - clock: Clock Update = ref object numRequests: int + numPending: int numSlots: int totalSize: int64 -proc isOld(c: ChainMetrics, entry: RequestEntry): bool = - let oneDay = 60 * 60 * 24 - return entry.lastSeen < (c.clock.now - oneDay.uint64) - proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} = - var update = Update(numRequests: 0, numSlots: 0, totalSize: 0) + var update = Update(numRequests: 0, numPending: 0, numSlots: 0, totalSize: 0) proc onRequest(entry: RequestEntry): Future[?!void] {.async: (raises: []).} = let response = await c.marketplace.getRequestInfo(entry.id) if info =? response: - inc update.numRequests - update.numSlots += info.slots.int - update.totalSize += (info.slots * info.slotSize).int64 + if info.pending: + trace "request is pending", id = $entry.id + inc update.numPending + else: + trace "request is running", id = $entry.id + inc update.numRequests + update.numSlots += info.slots.int + update.totalSize += (info.slots * info.slotSize).int64 else: - if c.isOld(entry): - ?await c.store.remove(entry.id) + ?await c.store.remove(entry.id) return success() ?await c.store.iterateAll(onRequest) @@ -50,6 +49,7 @@ proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} = proc updateMetrics(c: ChainMetrics, update: Update) = c.metrics.setRequests(update.numRequests) + c.metrics.setPendingRequests(update.numPending) c.metrics.setRequestSlots(update.numSlots) c.metrics.setTotalSize(update.totalSize) @@ -77,8 +77,7 @@ proc new*( metrics: Metrics, store: RequestStore, marketplace: MarketplaceService, - clock: Clock, ): ChainMetrics = ChainMetrics( - state: state, metrics: metrics, store: store, marketplace: marketplace, clock: clock + state: state, metrics: metrics, store: store, marketplace: marketplace ) diff --git a/codexcrawler/components/requeststore.nim b/codexcrawler/components/requeststore.nim index 130a6e5..8e0d38d 100644 --- a/codexcrawler/components/requeststore.nim +++ b/codexcrawler/components/requeststore.nim @@ -10,7 +10,6 @@ import ../types import ../component import ../state import ../utils/datastoreutils -import ../services/clock const requeststoreName = "requeststore" @@ -20,7 +19,7 @@ logScope: type RequestEntry* = object id*: Rid - lastSeen*: uint64 + isValid: bool OnRequestEntry* = proc(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} @@ -28,7 +27,6 @@ type RequestStore* = ref object of Component state: State store: TypedDatastore - clock: Clock proc `$`*(entry: RequestEntry): string = $entry.id @@ -36,7 +34,6 @@ proc `$`*(entry: RequestEntry): string = proc toBytes*(entry: RequestEntry): seq[byte] = var buffer = initProtoBuffer() buffer.write(1, $entry.id) - buffer.write(2, entry.lastSeen) buffer.finish() return buffer.buffer @@ -44,36 +41,32 @@ proc fromBytes*(_: type RequestEntry, data: openArray[byte]): ?!RequestEntry = var buffer = initProtoBuffer(data) idStr: string - lastSeen: uint64 if buffer.getField(1, idStr).isErr: return failure("Unable to decode `idStr`") - if buffer.getField(2, lastSeen).isErr: - return failure("Unable to decode `lastSeen`") - return success(RequestEntry(id: Rid.fromStr(idStr), lastSeen: lastSeen)) + return success(RequestEntry(id: Rid.fromStr(idStr), isValid: true)) proc encode*(e: RequestEntry): seq[byte] = e.toBytes() proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T = if bytes.len < 1: - return success(RequestEntry(lastSeen: 0)) + return success(RequestEntry(isValid: false)) return RequestEntry.fromBytes(bytes) -method update*( +method add*( s: RequestStore, rid: Rid ): Future[?!void] {.async: (raises: []), base.} = without key =? Key.init(requeststoreName / $rid), err: error "failed to format key", err = err.msg return failure(err) - let entry = RequestEntry(id: rid, lastSeen: s.clock.now) try: - ?await s.store.put(key, entry) + ?await s.store.put(key, RequestEntry(id: rid)) except CatchableError as exc: return failure(exc.msg) - trace "Request entry updated", id = $rid + trace "Request entry added", id = $rid return success() method remove*( @@ -109,7 +102,7 @@ method iterateAll*( error "failed to get value from iterator", err = err.msg return failure(err) - if value.lastSeen > 0: + if value.isValid: ?await onNode(value) await sleepAsync(1.millis) @@ -118,13 +111,13 @@ method iterateAll*( return success() proc new*( - T: type RequestStore, state: State, store: TypedDatastore, clock: Clock + T: type RequestStore, state: State, store: TypedDatastore ): RequestStore = - RequestStore(state: state, store: store, clock: clock) + RequestStore(state: state, store: store) -proc createRequestStore*(state: State, clock: Clock): ?!RequestStore = +proc createRequestStore*(state: State): ?!RequestStore = without ds =? createTypedDatastore(state.config.dataDir / "requeststore"), err: error "Failed to create typed datastore for request store", err = err.msg return failure(err) - return success(RequestStore.new(state, ds, clock)) + return success(RequestStore.new(state, ds)) diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index abb3ecd..f4a39cc 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -21,7 +21,7 @@ Options: --discoveryPort=

Port used for DHT [default: 8090] --bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] - --dhtEnable= Set to "1" to enable DHT crawler [default: 0] + --dhtEnable= Set to "1" to enable DHT crawler [default: 1] --stepDelay= Delay in milliseconds per node visit [default: 1000] --revisitDelay= Delay in minutes after which a node can be revisited [default: 60] --checkDelay= Delay with which the 'revisitDelay' is checked for all known nodes [default: 10] diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index bdff348..80fa924 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -28,14 +28,14 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = without nodeStore =? createNodeStore(state, clock), err: return failure(err) - without requestStore =? createRequestStore(state, clock), err: + without requestStore =? createRequestStore(state), err: return failure(err) let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) todoList = createTodoList(state, metrics) marketplace = createMarketplace(state, clock) - chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace, clock) + chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace) without dhtMetrics =? createDhtMetrics(state, metrics), err: return failure(err) diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim index 494ff4c..d349158 100644 --- a/codexcrawler/services/marketplace.nim +++ b/codexcrawler/services/marketplace.nim @@ -20,6 +20,7 @@ type OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.} RequestInfo* = ref object + pending*: bool slots*: uint64 slotSize*: uint64 @@ -32,7 +33,7 @@ proc fetchRequestInfo( try: let request = await market.getRequest(rid) if r =? request: - return some(RequestInfo(slots: r.ask.slots, slotSize: r.ask.slotSize)) + return some(RequestInfo(pending: false, slots: r.ask.slots, slotSize: r.ask.slotSize)) except CatchableError as exc: trace "Failed to get request info", err = exc.msg return none(RequestInfo) @@ -88,6 +89,8 @@ method getRequestInfo*( try: let state = await market.requestState(rid) if s =? state: + if s == RequestState.New: + return some(RequestInfo(pending: true)) if s == RequestState.Started: return await market.fetchRequestInfo(rid) except CatchableError as exc: diff --git a/codexcrawler/services/metrics.nim b/codexcrawler/services/metrics.nim index adc8852..020c5aa 100644 --- a/codexcrawler/services/metrics.nim +++ b/codexcrawler/services/metrics.nim @@ -7,6 +7,7 @@ declareGauge(okNodesGauge, "DHT nodes successfully contacted") declareGauge(nokNodesGauge, "DHT nodes failed to contact") declareGauge(requestsGauge, "Marketplace active storage requests") +declareGauge(pendingGauge, "Marketplace pending storage requests") declareGauge(requestSlotsGauge, "Marketplace active storage request slots") declareGauge( totalStorageSizeGauge, "Marketplace total bytes stored in active storage requests" @@ -21,6 +22,7 @@ type nokNodes: OnUpdateMetric onRequests: OnUpdateMetric + onPending: OnUpdateMetric onRequestSlots: OnUpdateMetric onTotalSize: OnUpdateMetric @@ -47,6 +49,9 @@ method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = method setRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = m.onRequests(value.int64) +method setPendingRequests*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.onPending(value.int64) + method setRequestSlots*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = m.onRequestSlots(value.int64) @@ -70,6 +75,9 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = proc onRequests(value: int64) = requestsGauge.set(value) + proc onPending(value: int64) = + pendingGauge.set(value) + proc onRequestSlots(value: int64) = requestSlotsGauge.set(value) @@ -81,6 +89,7 @@ proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = okNodes: onOk, nokNodes: onNok, onRequests: onRequests, + onPending: onPending, onRequestSlots: onRequestSlots, onTotalSize: onTotalSize, ) diff --git a/tests/codexcrawler/components/testchaincrawler.nim b/tests/codexcrawler/components/testchaincrawler.nim index 172b2b3..04f98b6 100644 --- a/tests/codexcrawler/components/testchaincrawler.nim +++ b/tests/codexcrawler/components/testchaincrawler.nim @@ -41,7 +41,7 @@ suite "ChainCrawler": (await (marketplace.subNewRequestsCallback.get())(rid)).tryGet() check: - store.updateRid == rid + store.addRid == rid test "start should iterate past requests and add then to store": check: @@ -51,4 +51,4 @@ suite "ChainCrawler": (await marketplace.iterRequestsCallback.get()(rid)).tryGet() check: - store.updateRid == rid + store.addRid == rid diff --git a/tests/codexcrawler/components/testchainmetrics.nim b/tests/codexcrawler/components/testchainmetrics.nim index 4d279dd..be222d2 100644 --- a/tests/codexcrawler/components/testchainmetrics.nim +++ b/tests/codexcrawler/components/testchainmetrics.nim @@ -20,7 +20,6 @@ suite "ChainMetrics": metrics: MockMetrics store: MockRequestStore marketplace: MockMarketplaceService - clock: MockClock chain: ChainMetrics setup: @@ -28,9 +27,8 @@ suite "ChainMetrics": metrics = createMockMetrics() store = createMockRequestStore() marketplace = createMockMarketplaceService() - clock = createMockClock() - chain = ChainMetrics.new(state, metrics, store, marketplace, clock) + chain = ChainMetrics.new(state, metrics, store, marketplace) (await chain.start()).tryGet() teardown: @@ -44,12 +42,10 @@ suite "ChainMetrics": state.delays.len == 1 state.delays[0] == state.config.requestCheckDelay.minutes - test "onStep should remove old non-running requests from request store": + test "onStep removes requests from request store when info can't be fetched": let rid = genRid() - let oneDay = (60 * 60 * 24).uint64 - store.iterateEntries.add(RequestEntry(id: rid, lastSeen: 100.uint64)) + store.iterateEntries.add(RequestEntry(id: rid)) - clock.setNow = 100 + oneDay + 1 marketplace.requestInfoReturns = none(RequestInfo) await onStep() @@ -58,20 +54,6 @@ suite "ChainMetrics": marketplace.requestInfoRid == rid store.removeRid == rid - test "onStep should not remove recent non-running requests from request store": - let rid = genRid() - let now = 123456789.uint64 - store.iterateEntries.add(RequestEntry(id: rid, lastSeen: now - 1)) - - clock.setNow = now - marketplace.requestInfoReturns = none(RequestInfo) - - await onStep() - - check: - marketplace.requestInfoRid == rid - not (store.removeRid == rid) - test "onStep should count the number of active requests": let rid1 = genRid() let rid2 = genRid() @@ -85,6 +67,19 @@ suite "ChainMetrics": check: metrics.requests == 2 + test "onStep should count the number of pending requests": + let rid1 = genRid() + let rid2 = genRid() + store.iterateEntries.add(RequestEntry(id: rid1)) + store.iterateEntries.add(RequestEntry(id: rid2)) + + marketplace.requestInfoReturns = some(RequestInfo(pending: true)) + + await onStep() + + check: + metrics.pending == 2 + test "onStep should count the number of active slots": let rid = genRid() store.iterateEntries.add(RequestEntry(id: rid)) diff --git a/tests/codexcrawler/components/testrequeststore.nim b/tests/codexcrawler/components/testrequeststore.nim index c3202f6..32c6c70 100644 --- a/tests/codexcrawler/components/testrequeststore.nim +++ b/tests/codexcrawler/components/testrequeststore.nim @@ -9,7 +9,6 @@ import ../../../codexcrawler/utils/datastoreutils import ../../../codexcrawler/types import ../../../codexcrawler/state import ../mocks/mockstate -import ../mocks/mockclock import ../helpers suite "Requeststore": @@ -20,15 +19,13 @@ suite "Requeststore": var ds: TypedDatastore state: MockState - clock: MockClock store: RequestStore setup: ds = createTypedDatastore(dsPath).tryGet() state = createMockState() - clock = createMockClock() - store = RequestStore.new(state, ds, clock) + store = RequestStore.new(state, ds) teardown: (await ds.close()).tryGet() @@ -36,7 +33,7 @@ suite "Requeststore": removeDir(dsPath) test "requestEntry encoding": - let entry = RequestEntry(id: genRid(), lastSeen: 123.uint64) + let entry = RequestEntry(id: genRid()) let bytes = entry.encode() @@ -44,11 +41,10 @@ suite "Requeststore": check: entry.id == decoded.id - entry.lastSeen == decoded.lastSeen - test "update stores a new requestId with current time": + test "add stores a new requestId": let rid = genRid() - (await store.update(rid)).tryGet() + (await store.add(rid)).tryGet() let key = Key.init(requeststoreName / $rid).tryGet() @@ -56,26 +52,10 @@ suite "Requeststore": check: stored.id == rid - stored.lastSeen == clock.setNow - - test "update updates the current time of an existing requestId with current time": - let rid = genRid() - (await store.update(rid)).tryGet() - - clock.setNow = 1234 - (await store.update(rid)).tryGet() - - let - key = Key.init(requeststoreName / $rid).tryGet() - stored = (await get[RequestEntry](ds, key)).tryGet() - - check: - stored.id == rid - stored.lastSeen == clock.setNow test "remove will remove an entry": let rid = genRid() - (await store.update(rid)).tryGet() + (await store.add(rid)).tryGet() (await store.remove(rid)).tryGet() let @@ -91,9 +71,9 @@ suite "Requeststore": rid2 = genRid() rid3 = genRid() - (await store.update(rid1)).tryGet() - (await store.update(rid2)).tryGet() - (await store.update(rid3)).tryGet() + (await store.add(rid1)).tryGet() + (await store.add(rid2)).tryGet() + (await store.add(rid3)).tryGet() var entries = newSeq[RequestEntry]() proc onEntry(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} = @@ -117,7 +97,3 @@ suite "Requeststore": check: id in ids - check: - entries[0].lastSeen == clock.setNow - entries[1].lastSeen == clock.setNow - entries[2].lastSeen == clock.setNow diff --git a/tests/codexcrawler/mocks/mockmetrics.nim b/tests/codexcrawler/mocks/mockmetrics.nim index 01787cc..29c61e9 100644 --- a/tests/codexcrawler/mocks/mockmetrics.nim +++ b/tests/codexcrawler/mocks/mockmetrics.nim @@ -5,6 +5,7 @@ type MockMetrics* = ref object of Metrics ok*: int nok*: int requests*: int + pending*: int slots*: int totalSize*: int64 @@ -20,6 +21,9 @@ method setNokNodes*(m: MockMetrics, value: int) = method setRequests*(m: MockMetrics, value: int) = m.requests = value +method setPendingRequests*(m: MockMetrics, value: int) = + m.pending = value + method setRequestSlots*(m: MockMetrics, value: int) = m.slots = value diff --git a/tests/codexcrawler/mocks/mockrequeststore.nim b/tests/codexcrawler/mocks/mockrequeststore.nim index 075fc42..e17606e 100644 --- a/tests/codexcrawler/mocks/mockrequeststore.nim +++ b/tests/codexcrawler/mocks/mockrequeststore.nim @@ -1,4 +1,3 @@ -import std/sequtils import pkg/questionable/results import pkg/chronos @@ -6,12 +5,12 @@ import ../../../codexcrawler/components/requeststore import ../../../codexcrawler/types type MockRequestStore* = ref object of RequestStore - updateRid*: Rid + addRid*: Rid removeRid*: Rid iterateEntries*: seq[RequestEntry] -method update*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} = - s.updateRid = rid +method add*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} = + s.addRid = rid return success() method remove*(s: MockRequestStore, rid: Rid): Future[?!void] {.async: (raises: []).} =