Implements chaincrawler module

This commit is contained in:
ThatBen 2025-03-20 15:27:17 +01:00
parent 1e8621b639
commit 86643e7647
No known key found for this signature in database
GPG Key ID: E020A7DDCD52E1AB
10 changed files with 266 additions and 106 deletions

View File

@ -1,3 +1,42 @@
# subscribe to newrequests
# iterate past requests on start-up import pkg/chronicles
# push them into the request store import pkg/chronos
import pkg/questionable/results
import ../state
import ../components/requeststore
import ../services/marketplace
import ../component
import ../types
logScope:
topics = "chaincrawler"
type ChainCrawler* = ref object of Component
state: State
store: RequestStore
marketplace: MarketplaceService
proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} =
return await c.store.update(rid)
method start*(c: ChainCrawler): Future[?!void] {.async.} =
info "starting..."
proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} =
return await c.onNewRequest(rid)
?await c.marketplace.subscribeToNewRequests(onRequest)
?await c.marketplace.iteratePastNewRequestEvents(onRequest)
return success()
method stop*(c: ChainCrawler): Future[?!void] {.async.} =
return success()
proc new*(
T: type ChainCrawler,
state: State,
store: RequestStore,
marketplace: MarketplaceService,
): ChainCrawler =
ChainCrawler(state: state, store: store, marketplace: marketplace)

View File

@ -31,12 +31,12 @@ proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
# iter finished: update metrics! # iter finished: update metrics!
without slotFills =? (await c.marketplace.getRecentSlotFillEvents()), err: # without slotFills =? (await c.marketplace.getRecentSlotFillEvents()), err:
trace "Unable to get recent slotFill events from chain", err = err.msg # trace "Unable to get recent slotFill events from chain", err = err.msg
return success() # We don't propagate this error. # return success() # We don't propagate this error.
# The call is allowed to fail and the app should continue as normal. # # The call is allowed to fail and the app should continue as normal.
c.metrics.setSlotFill(slotFills.len) # c.metrics.setSlotFill(slotFills.len)
return success() return success()
method start*(c: ChainMetrics): Future[?!void] {.async.} = method start*(c: ChainMetrics): Future[?!void] {.async.} =

View File

@ -25,9 +25,9 @@ Options:
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60] --revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 60]
--checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10] --checkDelay=<m> Delay with which the 'revisitDelay' is checked for all known nodes [default: 10]
--expiryDelay=<m> Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h) --expiryDelay=<m> Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h)
--marketplaceEnable=<e> Set to "1" to enable marketplace metrics [default: 1]
--ethProvider=<a> Address including http(s) or ws of the eth provider --ethProvider=<a> Address including http(s) or ws of the eth provider
--marketplaceAddress=<a> Eth address of Codex contracts deployment --marketplaceAddress=<a> Eth address of Codex contracts deployment
--marketplaceEnable=<e> Set to "1" to enable marketplace metrics [default: 1]
""" """
import strutils import strutils

View File

@ -29,7 +29,7 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
let let
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
todoList = createTodoList(state, metrics) todoList = createTodoList(state, metrics)
marketplace = createMarketplace(state) marketplace = createMarketplace(state, clock)
chainMetrics = ChainMetrics.new(state, metrics, marketplace) chainMetrics = ChainMetrics.new(state, metrics, marketplace)
without dhtMetrics =? createDhtMetrics(state, metrics), err: without dhtMetrics =? createDhtMetrics(state, metrics), err:

View File

@ -1,43 +1,63 @@
import pkg/ethers import pkg/ethers
import pkg/questionable import pkg/questionable
import pkg/upraises
import ./marketplace/market import ./marketplace/market
import ./marketplace/marketplace import ./marketplace/marketplace
import ../config import ../config
import ../component import ../component
import ../state import ../state
import ../types
import ./clock
logScope: logScope:
topics = "marketplace" topics = "marketplace"
type MarketplaceService* = ref object of Component type
state: State MarketplaceService* = ref object of Component
market: ?OnChainMarket state: State
market: ?OnChainMarket
clock: Clock
OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.}
method getRecentSlotFillEvents*( proc notStarted() =
m: MarketplaceService raiseAssert("MarketplaceService was called before it was started.")
): Future[?!seq[SlotFilled]] {.async: (raises: []), base.} =
# There is (aprox.) 1 block every 10 seconds. method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} =
# 10 seconds * 6 * 60 = 3600 = 1 hour. proc resultWrapper(rid: Rid): Future[void] {.async.} =
let blocksAgo = 6 * 60 let response = await onNewRequest(rid)
if error =? response.errorOption:
raiseAssert("Error result in handling of onNewRequest callback: " & error.msg)
proc onRequest(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].} =
asyncSpawn resultWrapper(Rid(id))
if market =? m.market: if market =? m.market:
try: try:
return success(await market.queryPastSlotFilledEvents(blocksAgo)) discard await market.subscribeRequests(onRequest)
except CatchableError as err: except CatchableError as exc:
return failure(err.msg) return failure(exc.msg)
return failure("MarketplaceService is not started") else:
notStarted()
return success()
method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} =
let
oneDay = 60 * 60 * 24
timespan = oneDay * 30
startTime = m.clock.now() - timespan.uint64
if market =? m.market:
try:
let requests = await market.queryPastStorageRequestedEvents(startTime.int64)
for request in requests:
if error =? (await onNewRequest(Rid(request.requestId))).errorOption:
return failure(error.msg)
except CatchableError as exc:
return failure(exc.msg)
else:
notStarted()
method start*(m: MarketplaceService): Future[?!void] {.async.} = method start*(m: MarketplaceService): Future[?!void] {.async.} =
# Todo:
# - subscribe to requestSubmitted -> add id to list
# - queryPastStorageRequestedEvents from 3 months ago (max duration) -> add ids to list
# for list:
# - get status of request
# if running:
# - sum total bytes
# else:
# - remove from list
let provider = JsonRpcProvider.new(m.state.config.ethProvider) let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided") return failure("Invalid MarketplaceAddress provided")
@ -50,8 +70,8 @@ method start*(m: MarketplaceService): Future[?!void] {.async.} =
method stop*(m: MarketplaceService): Future[?!void] {.async.} = method stop*(m: MarketplaceService): Future[?!void] {.async.} =
return success() return success()
proc new(T: type MarketplaceService, state: State): MarketplaceService = proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService =
return MarketplaceService(state: state, market: none(OnChainMarket)) return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock)
proc createMarketplace*(state: State): MarketplaceService = proc createMarketplace*(state: State, clock: Clock): MarketplaceService =
return MarketplaceService.new(state) return MarketplaceService.new(state, clock)

View File

@ -0,0 +1,60 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import std/sequtils
import std/options
import ../../../codexcrawler/components/chaincrawler
import ../../../codexcrawler/services/marketplace/market
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mockrequeststore
import ../mocks/mockmarketplace
import ../helpers
suite "ChainCrawler":
var
state: MockState
store: MockRequestStore
marketplace: MockMarketplaceService
crawler: ChainCrawler
setup:
state = createMockState()
store = createMockRequestStore()
marketplace = createMockMarketplaceService()
crawler = ChainCrawler.new(state, store, marketplace)
(await crawler.start()).tryGet()
teardown:
(await crawler.stop()).tryGet()
state.checkAllUnsubscribed()
# subscribe to newrequests
# iterate past requests on start-up
# push them into the request store
test "start should subscribe to new requests":
check:
marketplace.subNewRequestsCallback.isSome()
test "new-request subscription should add requestId to store":
let rid = genRid()
(await (marketplace.subNewRequestsCallback.get())(rid)).tryGet()
check:
store.updateRid == rid
test "start should iterate past requests and add then to store":
check:
marketplace.iterRequestsCallback.isSome()
let rid = genRid()
(await marketplace.iterRequestsCallback.get()(rid)).tryGet()
check:
store.updateRid == rid

View File

@ -1,85 +1,85 @@
import pkg/chronos # import pkg/chronos
import pkg/questionable # import pkg/questionable
import pkg/questionable/results # import pkg/questionable/results
import pkg/asynctest/chronos/unittest # import pkg/asynctest/chronos/unittest
import std/sequtils # import std/sequtils
import ../../../codexcrawler/components/chainmetrics # import ../../../codexcrawler/components/chainmetrics
import ../../../codexcrawler/services/marketplace/market # import ../../../codexcrawler/services/marketplace/market
import ../../../codexcrawler/types # import ../../../codexcrawler/types
import ../../../codexcrawler/state # import ../../../codexcrawler/state
import ../mocks/mockstate # import ../mocks/mockstate
import ../mocks/mockmetrics # import ../mocks/mockmetrics
import ../mocks/mockmarketplace # import ../mocks/mockmarketplace
import ../helpers # import ../helpers
suite "ChainMetrics": # suite "ChainMetrics":
var # var
state: MockState # state: MockState
metrics: MockMetrics # metrics: MockMetrics
marketplace: MockMarketplaceService # marketplace: MockMarketplaceService
chain: ChainMetrics # chain: ChainMetrics
setup: # setup:
state = createMockState() # state = createMockState()
metrics = createMockMetrics() # metrics = createMockMetrics()
marketplace = createMockMarketplaceService() # marketplace = createMockMarketplaceService()
metrics.slotFill = -1 # metrics.slotFill = -1
chain = ChainMetrics.new(state, metrics, marketplace) # chain = ChainMetrics.new(state, metrics, marketplace)
(await chain.start()).tryGet() # (await chain.start()).tryGet()
teardown: # teardown:
(await chain.stop()).tryGet() # (await chain.stop()).tryGet()
state.checkAllUnsubscribed() # state.checkAllUnsubscribed()
proc onStep() {.async.} = # proc onStep() {.async.} =
(await state.steppers[0]()).tryGet() # (await state.steppers[0]()).tryGet()
test "start should start stepper for 10 minutes": # test "start should start stepper for 10 minutes":
check: # check:
state.delays.len == 1 # state.delays.len == 1
state.delays[0] == 10.minutes # state.delays[0] == 10.minutes
test "onStep is not activated when config.marketplaceEnable is false": # test "onStep is not activated when config.marketplaceEnable is false":
# Recreate chainMetrics, reset mockstate: # # Recreate chainMetrics, reset mockstate:
(await chain.stop()).tryGet() # (await chain.stop()).tryGet()
state.steppers = @[] # state.steppers = @[]
# disable marketplace: # # disable marketplace:
state.config.marketplaceEnable = false # state.config.marketplaceEnable = false
(await chain.start()).tryGet() # (await chain.start()).tryGet()
check: # check:
state.steppers.len == 0 # state.steppers.len == 0
test "step should not call setSlotFill when getRecentSlotFillEvents fails": # test "step should not call setSlotFill when getRecentSlotFillEvents fails":
let testValue = -123 # let testValue = -123
metrics.slotFill = testValue # metrics.slotFill = testValue
marketplace.recentSlotFillEventsReturn = seq[SlotFilled].failure("testfailure") # marketplace.recentSlotFillEventsReturn = seq[SlotFilled].failure("testfailure")
await onStep() # await onStep()
check: # check:
metrics.slotFill == testValue # metrics.slotFill == testValue
test "step should setSlotFill to zero when getRecentSlotFillEvents returns empty seq": # test "step should setSlotFill to zero when getRecentSlotFillEvents returns empty seq":
metrics.slotFill = -123 # metrics.slotFill = -123
marketplace.recentSlotFillEventsReturn = success(newSeq[SlotFilled]()) # marketplace.recentSlotFillEventsReturn = success(newSeq[SlotFilled]())
await onStep() # await onStep()
check: # check:
metrics.slotFill == 0 # metrics.slotFill == 0
test "step should setSlotFill to the length of seq returned from getRecentSlotFillEvents": # test "step should setSlotFill to the length of seq returned from getRecentSlotFillEvents":
let fills = @[SlotFilled(), SlotFilled(), SlotFilled(), SlotFilled()] # let fills = @[SlotFilled(), SlotFilled(), SlotFilled(), SlotFilled()]
marketplace.recentSlotFillEventsReturn = success(fills) # marketplace.recentSlotFillEventsReturn = success(fills)
await onStep() # await onStep()
check: # check:
metrics.slotFill == fills.len # metrics.slotFill == fills.len

View File

@ -8,12 +8,19 @@ logScope:
topics = "marketplace" topics = "marketplace"
type MockMarketplaceService* = ref object of MarketplaceService type MockMarketplaceService* = ref object of MarketplaceService
recentSlotFillEventsReturn*: ?!seq[SlotFilled] subNewRequestsCallback*: ?OnNewRequest
iterRequestsCallback*: ?OnNewRequest
method getRecentSlotFillEvents*( method subscribeToNewRequests*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} =
m: MockMarketplaceService m.subNewRequestsCallback = some(onNewRequest)
): Future[?!seq[SlotFilled]] {.async: (raises: []).} = return success()
return m.recentSlotFillEventsReturn
method iteratePastNewRequestEvents*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} =
m.iterRequestsCallback = some(onNewRequest)
return success()
proc createMockMarketplaceService*(): MockMarketplaceService = proc createMockMarketplaceService*(): MockMarketplaceService =
MockMarketplaceService() MockMarketplaceService(
subNewRequestsCallback: none(OnNewRequest),
iterRequestsCallback: none(OnNewRequest)
)

View File

@ -0,0 +1,33 @@
import std/sequtils
import pkg/questionable/results
import pkg/chronos
import ../../../codexcrawler/components/requeststore
import ../../../codexcrawler/types
type MockRequestStore* = ref object of RequestStore
updateRid*: Rid
removeRid*: Rid
iterateEntries*: seq[RequestEntry]
method update*(
s: MockRequestStore, rid: Rid
): Future[?!void] {.async: (raises: []).} =
s.updateRid = rid
return success()
method remove*(
s: MockRequestStore, rid: Rid
): Future[?!void] {.async: (raises: []).} =
s.removeRid = rid
return success()
method iterateAll*(
s: MockRequestStore, onNode: OnRequestEntry
): Future[?!void] {.async: (raises: []).} =
for entry in s.iterateEntries:
?await onNode(entry)
return success()
proc createMockRequestStore*(): MockRequestStore =
MockRequestStore(iterateEntries: newSeq[RequestEntry]())

View File

@ -1,3 +1,4 @@
import ./components/testchaincrawler
import ./components/testchainmetrics import ./components/testchainmetrics
import ./components/testdhtcrawler import ./components/testdhtcrawler
import ./components/testdhtmetrics import ./components/testdhtmetrics