diff --git a/codexcrawler/components/chaincrawler.nim b/codexcrawler/components/chaincrawler.nim index 1eefa26..8295618 100644 --- a/codexcrawler/components/chaincrawler.nim +++ b/codexcrawler/components/chaincrawler.nim @@ -1,3 +1,42 @@ -# subscribe to newrequests -# iterate past requests on start-up -# push them into the request store + +import pkg/chronicles +import pkg/chronos +import pkg/questionable/results + +import ../state +import ../components/requeststore +import ../services/marketplace +import ../component +import ../types + +logScope: + topics = "chaincrawler" + +type ChainCrawler* = ref object of Component + state: State + store: RequestStore + marketplace: MarketplaceService + +proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} = + return await c.store.update(rid) + +method start*(c: ChainCrawler): Future[?!void] {.async.} = + info "starting..." + + proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} = + return await c.onNewRequest(rid) + + ?await c.marketplace.subscribeToNewRequests(onRequest) + ?await c.marketplace.iteratePastNewRequestEvents(onRequest) + return success() + +method stop*(c: ChainCrawler): Future[?!void] {.async.} = + return success() + +proc new*( + T: type ChainCrawler, + state: State, + store: RequestStore, + marketplace: MarketplaceService, +): ChainCrawler = + ChainCrawler(state: state, store: store, marketplace: marketplace) diff --git a/codexcrawler/components/chainmetrics.nim b/codexcrawler/components/chainmetrics.nim index 44f8604..e08334b 100644 --- a/codexcrawler/components/chainmetrics.nim +++ b/codexcrawler/components/chainmetrics.nim @@ -31,12 +31,12 @@ proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} = # iter finished: update metrics! - without slotFills =? (await c.marketplace.getRecentSlotFillEvents()), err: - trace "Unable to get recent slotFill events from chain", err = err.msg - return success() # We don't propagate this error. - # The call is allowed to fail and the app should continue as normal. + # without slotFills =? (await c.marketplace.getRecentSlotFillEvents()), err: + # trace "Unable to get recent slotFill events from chain", err = err.msg + # return success() # We don't propagate this error. + # # The call is allowed to fail and the app should continue as normal. - c.metrics.setSlotFill(slotFills.len) + # c.metrics.setSlotFill(slotFills.len) return success() method start*(c: ChainMetrics): Future[?!void] {.async.} = diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index bc56e40..10f434b 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -25,9 +25,9 @@ Options: --revisitDelay= Delay in minutes after which a node can be revisited [default: 60] --checkDelay= Delay with which the 'revisitDelay' is checked for all known nodes [default: 10] --expiryDelay= Delay in minutes after which unresponsive nodes are discarded [default: 1440] (24h) + --marketplaceEnable= Set to "1" to enable marketplace metrics [default: 1] --ethProvider= Address including http(s) or ws of the eth provider --marketplaceAddress= Eth address of Codex contracts deployment - --marketplaceEnable= Set to "1" to enable marketplace metrics [default: 1] """ import strutils diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index 500bc46..fbaf172 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -29,7 +29,7 @@ proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = let metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) todoList = createTodoList(state, metrics) - marketplace = createMarketplace(state) + marketplace = createMarketplace(state, clock) chainMetrics = ChainMetrics.new(state, metrics, marketplace) without dhtMetrics =? createDhtMetrics(state, metrics), err: diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim index f652ad6..88a1d9f 100644 --- a/codexcrawler/services/marketplace.nim +++ b/codexcrawler/services/marketplace.nim @@ -1,43 +1,63 @@ import pkg/ethers import pkg/questionable +import pkg/upraises import ./marketplace/market import ./marketplace/marketplace import ../config import ../component import ../state +import ../types +import ./clock logScope: topics = "marketplace" -type MarketplaceService* = ref object of Component - state: State - market: ?OnChainMarket +type + MarketplaceService* = ref object of Component + state: State + market: ?OnChainMarket + clock: Clock + OnNewRequest* = proc(id: Rid): Future[?!void] {.async: (raises: []), gcsafe.} -method getRecentSlotFillEvents*( - m: MarketplaceService -): Future[?!seq[SlotFilled]] {.async: (raises: []), base.} = - # There is (aprox.) 1 block every 10 seconds. - # 10 seconds * 6 * 60 = 3600 = 1 hour. - let blocksAgo = 6 * 60 +proc notStarted() = + raiseAssert("MarketplaceService was called before it was started.") + +method subscribeToNewRequests*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} = + proc resultWrapper(rid: Rid): Future[void] {.async.} = + let response = await onNewRequest(rid) + if error =? response.errorOption: + raiseAssert("Error result in handling of onNewRequest callback: " & error.msg) + + proc onRequest(id: RequestId, ask: StorageAsk, expiry: uint64) {.gcsafe, upraises: [].} = + asyncSpawn resultWrapper(Rid(id)) if market =? m.market: try: - return success(await market.queryPastSlotFilledEvents(blocksAgo)) - except CatchableError as err: - return failure(err.msg) - return failure("MarketplaceService is not started") + discard await market.subscribeRequests(onRequest) + except CatchableError as exc: + return failure(exc.msg) + else: + notStarted() + return success() + +method iteratePastNewRequestEvents*(m: MarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []), base.} = + let + oneDay = 60 * 60 * 24 + timespan = oneDay * 30 + startTime = m.clock.now() - timespan.uint64 + + if market =? m.market: + try: + let requests = await market.queryPastStorageRequestedEvents(startTime.int64) + for request in requests: + if error =? (await onNewRequest(Rid(request.requestId))).errorOption: + return failure(error.msg) + except CatchableError as exc: + return failure(exc.msg) + else: + notStarted() method start*(m: MarketplaceService): Future[?!void] {.async.} = - # Todo: - # - subscribe to requestSubmitted -> add id to list - # - queryPastStorageRequestedEvents from 3 months ago (max duration) -> add ids to list - # for list: - # - get status of request - # if running: - # - sum total bytes - # else: - # - remove from list - let provider = JsonRpcProvider.new(m.state.config.ethProvider) without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): return failure("Invalid MarketplaceAddress provided") @@ -50,8 +70,8 @@ method start*(m: MarketplaceService): Future[?!void] {.async.} = method stop*(m: MarketplaceService): Future[?!void] {.async.} = return success() -proc new(T: type MarketplaceService, state: State): MarketplaceService = - return MarketplaceService(state: state, market: none(OnChainMarket)) +proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService = + return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock) -proc createMarketplace*(state: State): MarketplaceService = - return MarketplaceService.new(state) +proc createMarketplace*(state: State, clock: Clock): MarketplaceService = + return MarketplaceService.new(state, clock) diff --git a/tests/codexcrawler/components/testchaincrawler.nim b/tests/codexcrawler/components/testchaincrawler.nim new file mode 100644 index 0000000..8a50a78 --- /dev/null +++ b/tests/codexcrawler/components/testchaincrawler.nim @@ -0,0 +1,60 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest +import std/sequtils +import std/options + +import ../../../codexcrawler/components/chaincrawler +import ../../../codexcrawler/services/marketplace/market +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../mocks/mockrequeststore +import ../mocks/mockmarketplace +import ../helpers + +suite "ChainCrawler": + var + state: MockState + store: MockRequestStore + marketplace: MockMarketplaceService + crawler: ChainCrawler + + setup: + state = createMockState() + store = createMockRequestStore() + marketplace = createMockMarketplaceService() + + crawler = ChainCrawler.new(state, store, marketplace) + + (await crawler.start()).tryGet() + + teardown: + (await crawler.stop()).tryGet() + state.checkAllUnsubscribed() + + # subscribe to newrequests + # iterate past requests on start-up + # push them into the request store + + test "start should subscribe to new requests": + check: + marketplace.subNewRequestsCallback.isSome() + + test "new-request subscription should add requestId to store": + let rid = genRid() + (await (marketplace.subNewRequestsCallback.get())(rid)).tryGet() + + check: + store.updateRid == rid + + test "start should iterate past requests and add then to store": + check: + marketplace.iterRequestsCallback.isSome() + + let rid = genRid() + (await marketplace.iterRequestsCallback.get()(rid)).tryGet() + + check: + store.updateRid == rid diff --git a/tests/codexcrawler/components/testchainmetrics.nim b/tests/codexcrawler/components/testchainmetrics.nim index 99c2a89..2f0092f 100644 --- a/tests/codexcrawler/components/testchainmetrics.nim +++ b/tests/codexcrawler/components/testchainmetrics.nim @@ -1,85 +1,85 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/asynctest/chronos/unittest -import std/sequtils +# import pkg/chronos +# import pkg/questionable +# import pkg/questionable/results +# import pkg/asynctest/chronos/unittest +# import std/sequtils -import ../../../codexcrawler/components/chainmetrics -import ../../../codexcrawler/services/marketplace/market -import ../../../codexcrawler/types -import ../../../codexcrawler/state -import ../mocks/mockstate -import ../mocks/mockmetrics -import ../mocks/mockmarketplace -import ../helpers +# import ../../../codexcrawler/components/chainmetrics +# import ../../../codexcrawler/services/marketplace/market +# import ../../../codexcrawler/types +# import ../../../codexcrawler/state +# import ../mocks/mockstate +# import ../mocks/mockmetrics +# import ../mocks/mockmarketplace +# import ../helpers -suite "ChainMetrics": - var - state: MockState - metrics: MockMetrics - marketplace: MockMarketplaceService - chain: ChainMetrics +# suite "ChainMetrics": +# var +# state: MockState +# metrics: MockMetrics +# marketplace: MockMarketplaceService +# chain: ChainMetrics - setup: - state = createMockState() - metrics = createMockMetrics() - marketplace = createMockMarketplaceService() +# setup: +# state = createMockState() +# metrics = createMockMetrics() +# marketplace = createMockMarketplaceService() - metrics.slotFill = -1 - chain = ChainMetrics.new(state, metrics, marketplace) +# metrics.slotFill = -1 +# chain = ChainMetrics.new(state, metrics, marketplace) - (await chain.start()).tryGet() +# (await chain.start()).tryGet() - teardown: - (await chain.stop()).tryGet() - state.checkAllUnsubscribed() +# teardown: +# (await chain.stop()).tryGet() +# state.checkAllUnsubscribed() - proc onStep() {.async.} = - (await state.steppers[0]()).tryGet() +# proc onStep() {.async.} = +# (await state.steppers[0]()).tryGet() - test "start should start stepper for 10 minutes": - check: - state.delays.len == 1 - state.delays[0] == 10.minutes +# test "start should start stepper for 10 minutes": +# check: +# state.delays.len == 1 +# state.delays[0] == 10.minutes - test "onStep is not activated when config.marketplaceEnable is false": - # Recreate chainMetrics, reset mockstate: - (await chain.stop()).tryGet() - state.steppers = @[] - # disable marketplace: - state.config.marketplaceEnable = false - (await chain.start()).tryGet() +# test "onStep is not activated when config.marketplaceEnable is false": +# # Recreate chainMetrics, reset mockstate: +# (await chain.stop()).tryGet() +# state.steppers = @[] +# # disable marketplace: +# state.config.marketplaceEnable = false +# (await chain.start()).tryGet() - check: - state.steppers.len == 0 +# check: +# state.steppers.len == 0 - test "step should not call setSlotFill when getRecentSlotFillEvents fails": - let testValue = -123 - metrics.slotFill = testValue +# test "step should not call setSlotFill when getRecentSlotFillEvents fails": +# let testValue = -123 +# metrics.slotFill = testValue - marketplace.recentSlotFillEventsReturn = seq[SlotFilled].failure("testfailure") +# marketplace.recentSlotFillEventsReturn = seq[SlotFilled].failure("testfailure") - await onStep() +# await onStep() - check: - metrics.slotFill == testValue +# check: +# metrics.slotFill == testValue - test "step should setSlotFill to zero when getRecentSlotFillEvents returns empty seq": - metrics.slotFill = -123 +# test "step should setSlotFill to zero when getRecentSlotFillEvents returns empty seq": +# metrics.slotFill = -123 - marketplace.recentSlotFillEventsReturn = success(newSeq[SlotFilled]()) +# marketplace.recentSlotFillEventsReturn = success(newSeq[SlotFilled]()) - await onStep() +# await onStep() - check: - metrics.slotFill == 0 +# check: +# metrics.slotFill == 0 - test "step should setSlotFill to the length of seq returned from getRecentSlotFillEvents": - let fills = @[SlotFilled(), SlotFilled(), SlotFilled(), SlotFilled()] +# test "step should setSlotFill to the length of seq returned from getRecentSlotFillEvents": +# let fills = @[SlotFilled(), SlotFilled(), SlotFilled(), SlotFilled()] - marketplace.recentSlotFillEventsReturn = success(fills) +# marketplace.recentSlotFillEventsReturn = success(fills) - await onStep() +# await onStep() - check: - metrics.slotFill == fills.len +# check: +# metrics.slotFill == fills.len diff --git a/tests/codexcrawler/mocks/mockmarketplace.nim b/tests/codexcrawler/mocks/mockmarketplace.nim index 2956e8d..e93931c 100644 --- a/tests/codexcrawler/mocks/mockmarketplace.nim +++ b/tests/codexcrawler/mocks/mockmarketplace.nim @@ -8,12 +8,19 @@ logScope: topics = "marketplace" type MockMarketplaceService* = ref object of MarketplaceService - recentSlotFillEventsReturn*: ?!seq[SlotFilled] + subNewRequestsCallback*: ?OnNewRequest + iterRequestsCallback*: ?OnNewRequest -method getRecentSlotFillEvents*( - m: MockMarketplaceService -): Future[?!seq[SlotFilled]] {.async: (raises: []).} = - return m.recentSlotFillEventsReturn +method subscribeToNewRequests*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} = + m.subNewRequestsCallback = some(onNewRequest) + return success() + +method iteratePastNewRequestEvents*(m: MockMarketplaceService, onNewRequest: OnNewRequest): Future[?!void] {.async: (raises: []).} = + m.iterRequestsCallback = some(onNewRequest) + return success() proc createMockMarketplaceService*(): MockMarketplaceService = - MockMarketplaceService() + MockMarketplaceService( + subNewRequestsCallback: none(OnNewRequest), + iterRequestsCallback: none(OnNewRequest) + ) diff --git a/tests/codexcrawler/mocks/mockrequeststore.nim b/tests/codexcrawler/mocks/mockrequeststore.nim new file mode 100644 index 0000000..7005594 --- /dev/null +++ b/tests/codexcrawler/mocks/mockrequeststore.nim @@ -0,0 +1,33 @@ +import std/sequtils +import pkg/questionable/results +import pkg/chronos + +import ../../../codexcrawler/components/requeststore +import ../../../codexcrawler/types + +type MockRequestStore* = ref object of RequestStore + updateRid*: Rid + removeRid*: Rid + iterateEntries*: seq[RequestEntry] + +method update*( + s: MockRequestStore, rid: Rid +): Future[?!void] {.async: (raises: []).} = + s.updateRid = rid + return success() + +method remove*( + s: MockRequestStore, rid: Rid +): Future[?!void] {.async: (raises: []).} = + s.removeRid = rid + return success() + +method iterateAll*( + s: MockRequestStore, onNode: OnRequestEntry +): Future[?!void] {.async: (raises: []).} = + for entry in s.iterateEntries: + ?await onNode(entry) + return success() + +proc createMockRequestStore*(): MockRequestStore = + MockRequestStore(iterateEntries: newSeq[RequestEntry]()) diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim index fa7de7d..bf3b66b 100644 --- a/tests/codexcrawler/testcomponents.nim +++ b/tests/codexcrawler/testcomponents.nim @@ -1,3 +1,4 @@ +import ./components/testchaincrawler import ./components/testchainmetrics import ./components/testdhtcrawler import ./components/testdhtmetrics