mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-04 06:23:08 +00:00
adds old-check to chain metrics entry handling
This commit is contained in:
parent
ec3c7c0be3
commit
a5618d0cd2
@ -6,6 +6,7 @@ import pkg/questionable/results
|
|||||||
import ../state
|
import ../state
|
||||||
import ../services/metrics
|
import ../services/metrics
|
||||||
import ../services/marketplace
|
import ../services/marketplace
|
||||||
|
import ../services/clock
|
||||||
import ../components/requeststore
|
import ../components/requeststore
|
||||||
import ../component
|
import ../component
|
||||||
import ../types
|
import ../types
|
||||||
@ -19,12 +20,17 @@ type
|
|||||||
metrics: Metrics
|
metrics: Metrics
|
||||||
store: RequestStore
|
store: RequestStore
|
||||||
marketplace: MarketplaceService
|
marketplace: MarketplaceService
|
||||||
|
clock: Clock
|
||||||
|
|
||||||
Update = ref object
|
Update = ref object
|
||||||
numRequests: int
|
numRequests: int
|
||||||
numSlots: int
|
numSlots: int
|
||||||
totalSize: int64
|
totalSize: int64
|
||||||
|
|
||||||
|
proc isOld(c: ChainMetrics, entry: RequestEntry): bool =
|
||||||
|
let oneDay = 60 * 60 * 24
|
||||||
|
return entry.lastSeen < (c.clock.now - oneDay.uint64)
|
||||||
|
|
||||||
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
||||||
var update = Update(numRequests: 0, numSlots: 0, totalSize: 0)
|
var update = Update(numRequests: 0, numSlots: 0, totalSize: 0)
|
||||||
|
|
||||||
@ -35,7 +41,8 @@ proc collectUpdate(c: ChainMetrics): Future[?!Update] {.async: (raises: []).} =
|
|||||||
update.numSlots += info.slots.int
|
update.numSlots += info.slots.int
|
||||||
update.totalSize += (info.slots * info.slotSize).int64
|
update.totalSize += (info.slots * info.slotSize).int64
|
||||||
else:
|
else:
|
||||||
?await c.store.remove(entry.id)
|
if c.isOld(entry):
|
||||||
|
?await c.store.remove(entry.id)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
?await c.store.iterateAll(onRequest)
|
?await c.store.iterateAll(onRequest)
|
||||||
@ -73,5 +80,6 @@ proc new*(
|
|||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
store: RequestStore,
|
store: RequestStore,
|
||||||
marketplace: MarketplaceService,
|
marketplace: MarketplaceService,
|
||||||
|
clock: Clock
|
||||||
): ChainMetrics =
|
): ChainMetrics =
|
||||||
ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace)
|
ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace, clock: clock)
|
||||||
|
|||||||
@ -15,6 +15,8 @@ import ./components/nodestore
|
|||||||
import ./components/dhtmetrics
|
import ./components/dhtmetrics
|
||||||
import ./components/todolist
|
import ./components/todolist
|
||||||
import ./components/chainmetrics
|
import ./components/chainmetrics
|
||||||
|
import ./components/chaincrawler
|
||||||
|
import ./components/requeststore
|
||||||
|
|
||||||
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
||||||
var components: seq[Component] = newSeq[Component]()
|
var components: seq[Component] = newSeq[Component]()
|
||||||
@ -26,11 +28,14 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
|||||||
without nodeStore =? createNodeStore(state, clock), err:
|
without nodeStore =? createNodeStore(state, clock), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
|
without requestStore =? createRequestStore(state, clock), err:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
let
|
let
|
||||||
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
|
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
|
||||||
todoList = createTodoList(state, metrics)
|
todoList = createTodoList(state, metrics)
|
||||||
marketplace = createMarketplace(state, clock)
|
marketplace = createMarketplace(state, clock)
|
||||||
chainMetrics = ChainMetrics.new(state, metrics, marketplace)
|
chainMetrics = ChainMetrics.new(state, metrics, requestStore, marketplace, clock)
|
||||||
|
|
||||||
without dhtMetrics =? createDhtMetrics(state, metrics), err:
|
without dhtMetrics =? createDhtMetrics(state, metrics), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
@ -43,5 +48,6 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
|
|||||||
components.add(dhtMetrics)
|
components.add(dhtMetrics)
|
||||||
components.add(marketplace)
|
components.add(marketplace)
|
||||||
components.add(chainMetrics)
|
components.add(chainMetrics)
|
||||||
|
components.add(ChainCrawler.new(state, requestStore, marketplace))
|
||||||
|
|
||||||
return success(components)
|
return success(components)
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import ../mocks/mockstate
|
|||||||
import ../mocks/mockmetrics
|
import ../mocks/mockmetrics
|
||||||
import ../mocks/mockrequeststore
|
import ../mocks/mockrequeststore
|
||||||
import ../mocks/mockmarketplace
|
import ../mocks/mockmarketplace
|
||||||
|
import ../mocks/mockclock
|
||||||
import ../helpers
|
import ../helpers
|
||||||
|
|
||||||
suite "ChainMetrics":
|
suite "ChainMetrics":
|
||||||
@ -19,6 +20,7 @@ suite "ChainMetrics":
|
|||||||
metrics: MockMetrics
|
metrics: MockMetrics
|
||||||
store: MockRequestStore
|
store: MockRequestStore
|
||||||
marketplace: MockMarketplaceService
|
marketplace: MockMarketplaceService
|
||||||
|
clock: MockClock
|
||||||
chain: ChainMetrics
|
chain: ChainMetrics
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
@ -26,8 +28,9 @@ suite "ChainMetrics":
|
|||||||
metrics = createMockMetrics()
|
metrics = createMockMetrics()
|
||||||
store = createMockRequestStore()
|
store = createMockRequestStore()
|
||||||
marketplace = createMockMarketplaceService()
|
marketplace = createMockMarketplaceService()
|
||||||
|
clock = createMockClock()
|
||||||
|
|
||||||
chain = ChainMetrics.new(state, metrics, store, marketplace)
|
chain = ChainMetrics.new(state, metrics, store, marketplace, clock)
|
||||||
|
|
||||||
(await chain.start()).tryGet()
|
(await chain.start()).tryGet()
|
||||||
|
|
||||||
@ -43,10 +46,12 @@ suite "ChainMetrics":
|
|||||||
state.delays.len == 1
|
state.delays.len == 1
|
||||||
state.delays[0] == 10.minutes
|
state.delays[0] == 10.minutes
|
||||||
|
|
||||||
test "onStep should remove non-running requests from request store":
|
test "onStep should remove old non-running requests from request store":
|
||||||
let rid = genRid()
|
let rid = genRid()
|
||||||
store.iterateEntries.add(RequestEntry(id: rid))
|
let oneDay = (60 * 60 * 24).uint64
|
||||||
|
store.iterateEntries.add(RequestEntry(id: rid, lastSeen: 100.uint64))
|
||||||
|
|
||||||
|
clock.setNow = 100 + oneDay + 1
|
||||||
marketplace.requestInfoReturns = none(RequestInfo)
|
marketplace.requestInfoReturns = none(RequestInfo)
|
||||||
|
|
||||||
await onStep()
|
await onStep()
|
||||||
@ -55,6 +60,20 @@ suite "ChainMetrics":
|
|||||||
marketplace.requestInfoRid == rid
|
marketplace.requestInfoRid == rid
|
||||||
store.removeRid == rid
|
store.removeRid == rid
|
||||||
|
|
||||||
|
test "onStep should not remove recent non-running requests from request store":
|
||||||
|
let rid = genRid()
|
||||||
|
let now = 123456789.uint64
|
||||||
|
store.iterateEntries.add(RequestEntry(id: rid, lastSeen: now - 1))
|
||||||
|
|
||||||
|
clock.setNow = now
|
||||||
|
marketplace.requestInfoReturns = none(RequestInfo)
|
||||||
|
|
||||||
|
await onStep()
|
||||||
|
|
||||||
|
check:
|
||||||
|
marketplace.requestInfoRid == rid
|
||||||
|
not (store.removeRid == rid)
|
||||||
|
|
||||||
test "onStep should count the number of active requests":
|
test "onStep should count the number of active requests":
|
||||||
let rid1 = genRid()
|
let rid1 = genRid()
|
||||||
let rid2 = genRid()
|
let rid2 = genRid()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user