splits awake and start component methods

This commit is contained in:
ThatBen 2025-03-21 13:04:10 +01:00
parent a5618d0cd2
commit c1b0c30cc0
No known key found for this signature in database
GPG Key ID: E020A7DDCD52E1AB
21 changed files with 40 additions and 79 deletions

View File

@ -36,9 +36,15 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
return failure(err)
app.components = components
for c in components:
if err =? (await c.awake()).errorOption:
error "Failed during component awake", err = err.msg
return failure(err)
for c in components:
if err =? (await c.start()).errorOption:
error "Failed to start component", err = err.msg
error "Failed during component start", err = err.msg
return failure(err)
return success()

View File

@ -1,12 +1,19 @@
import pkg/chronos
import pkg/questionable/results
import ./state
type Component* = ref object of RootObj
method awake*(c: Component): Future[?!void] {.async, base.} =
# Awake is called on all components in an unspecified order.
# Use this method to subscribe/connect to other components.
return success()
method start*(c: Component): Future[?!void] {.async, base.} =
raiseAssert("call to abstract method: component.start")
# Start is called on all components in an unspecified order.
# Is is guaranteed that all components have already successfulled handled 'awake'.
# Use this method to begin the work of this component.
return success()
method stop*(c: Component): Future[?!void] {.async, base.} =
raiseAssert("call to abstract method: component.stop")
# Use this method to stop, unsubscribe, and clean up any resources.
return success()

View File

@ -25,13 +25,13 @@ method start*(c: ChainCrawler): Future[?!void] {.async.} =
proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} =
return await c.onNewRequest(rid)
# Normally subscriptions must be done in awake.
# Marketplace is a little different: It uses awake to set up its connections.
# And so it can't handle subscribes until we're in 'start'.
?await c.marketplace.subscribeToNewRequests(onRequest)
?await c.marketplace.iteratePastNewRequestEvents(onRequest)
return success()
method stop*(c: ChainCrawler): Future[?!void] {.async.} =
return success()
proc new*(
T: type ChainCrawler,
state: State,

View File

@ -71,15 +71,14 @@ method start*(c: ChainMetrics): Future[?!void] {.async.} =
return success()
method stop*(c: ChainMetrics): Future[?!void] {.async.} =
return success()
proc new*(
T: type ChainMetrics,
state: State,
metrics: Metrics,
store: RequestStore,
marketplace: MarketplaceService,
clock: Clock
clock: Clock,
): ChainMetrics =
ChainMetrics(state: state, metrics: metrics, store: store, marketplace: marketplace, clock: clock)
ChainMetrics(
state: state, metrics: metrics, store: store, marketplace: marketplace, clock: clock
)

View File

@ -57,8 +57,5 @@ method start*(c: DhtCrawler): Future[?!void] {.async.} =
return success()
method stop*(c: DhtCrawler): Future[?!void] {.async.} =
return success()
proc new*(T: type DhtCrawler, state: State, dht: Dht, todo: TodoList): DhtCrawler =
DhtCrawler(state: state, dht: dht, todo: todo)

View File

@ -45,11 +45,7 @@ proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.}
d.updateMetrics()
return success()
method start*(d: DhtMetrics): Future[?!void] {.async.} =
info "starting..."
?await d.ok.load()
?await d.nok.load()
method awake*(d: DhtMetrics): Future[?!void] {.async.} =
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
await d.handleCheckEvent(event)
@ -58,7 +54,12 @@ method start*(d: DhtMetrics): Future[?!void] {.async.} =
d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck)
d.subDel = d.state.events.nodesDeleted.subscribe(onDelete)
return success()
method start*(d: DhtMetrics): Future[?!void] {.async.} =
info "starting..."
?await d.ok.load()
?await d.nok.load()
return success()
method stop*(d: DhtMetrics): Future[?!void] {.async.} =

View File

@ -117,13 +117,6 @@ method iterateAll*(
return failure(exc.msg)
return success()
method start*(s: RequestStore): Future[?!void] {.async.} =
info "starting..."
return success()
method stop*(s: RequestStore): Future[?!void] {.async.} =
return success()
proc new*(
T: type RequestStore, state: State, store: TypedDatastore, clock: Clock
): RequestStore =

View File

@ -71,9 +71,6 @@ method start*(t: TimeTracker): Future[?!void] {.async.} =
await t.state.whileRunning(onRoutingTable, 30.minutes)
return success()
method stop*(t: TimeTracker): Future[?!void] {.async.} =
return success()
proc new*(
T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht, clock: Clock
): TimeTracker =

View File

@ -32,13 +32,13 @@ proc addNodes(t: TodoList, nids: seq[Nid]) =
t.metrics.setTodoNodes(t.nids.len)
if s =? t.emptySignal:
trace "Nodes added, resuming...", nodes = nids.len
trace "nodes added, resuming...", nodes = nids.len
s.complete()
t.emptySignal = Future[void].none
method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
if t.nids.len < 1:
trace "List is empty. Waiting for new items..."
trace "list is empty. Waiting for new items..."
let signal = newFuture[void]("list.emptySignal")
t.emptySignal = some(signal)
try:
@ -54,8 +54,8 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
return success(item)
method start*(t: TodoList): Future[?!void] {.async.} =
info "Starting TodoList..."
method awake*(t: TodoList): Future[?!void] {.async.} =
info "initializing..."
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
t.addNodes(nids)

View File

@ -53,11 +53,11 @@ method subscribeToNewRequests*(
if market =? m.market:
try:
discard await market.subscribeRequests(onRequest)
return success()
except CatchableError as exc:
return failure(exc.msg)
else:
notStarted()
return success()
method iteratePastNewRequestEvents*(
m: MarketplaceService, onNewRequest: OnNewRequest
@ -73,6 +73,7 @@ method iteratePastNewRequestEvents*(
for request in requests:
if error =? (await onNewRequest(Rid(request.requestId))).errorOption:
return failure(error.msg)
return success()
except CatchableError as exc:
return failure(exc.msg)
else:
@ -95,17 +96,13 @@ method getRequestInfo*(
else:
notStarted()
method start*(m: MarketplaceService): Future[?!void] {.async.} =
method awake*(m: MarketplaceService): Future[?!void] {.async.} =
let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided")
let marketplace = Marketplace.new(marketplaceAddress, provider)
m.market = some(OnChainMarket.new(marketplace))
return success()
method stop*(m: MarketplaceService): Future[?!void] {.async.} =
return success()
proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService =

View File

@ -27,17 +27,11 @@ suite "ChainCrawler":
marketplace = createMockMarketplaceService()
crawler = ChainCrawler.new(state, store, marketplace)
(await crawler.start()).tryGet()
teardown:
(await crawler.stop()).tryGet()
state.checkAllUnsubscribed()
# subscribe to newrequests
# iterate past requests on start-up
# push them into the request store
test "start should subscribe to new requests":
check:
marketplace.subNewRequestsCallback.isSome()

View File

@ -31,11 +31,9 @@ suite "ChainMetrics":
clock = createMockClock()
chain = ChainMetrics.new(state, metrics, store, marketplace, clock)
(await chain.start()).tryGet()
teardown:
(await chain.stop()).tryGet()
state.checkAllUnsubscribed()
proc onStep() {.async.} =

View File

@ -30,11 +30,9 @@ suite "DhtCrawler":
dht = createMockDht()
crawler = DhtCrawler.new(state, dht, todo)
(await crawler.start()).tryGet()
teardown:
(await crawler.stop()).tryGet()
state.checkAllUnsubscribed()
proc onStep() {.async.} =
@ -57,7 +55,6 @@ suite "DhtCrawler":
test "onStep is not activated when config.dhtEnable is false":
# Recreate crawler, reset mockstate:
(await crawler.stop()).tryGet()
state.steppers = @[]
# disable DHT:
state.config.dhtEnable = false

View File

@ -28,7 +28,7 @@ suite "DhtMetrics":
metrics = createMockMetrics()
dhtmetrics = DhtMetrics.new(state, okList, nokList, metrics)
(await dhtmetrics.awake()).tryGet()
(await dhtmetrics.start()).tryGet()
teardown:

View File

@ -30,7 +30,6 @@ suite "Nodestore":
clock = createMockClock()
store = NodeStore.new(state, ds, clock)
(await store.start()).tryGet()
teardown:

View File

@ -30,10 +30,7 @@ suite "Requeststore":
store = RequestStore.new(state, ds, clock)
(await store.start()).tryGet()
teardown:
(await store.stop()).tryGet()
(await ds.close()).tryGet()
state.checkAllUnsubscribed()
removeDir(dsPath)

View File

@ -47,11 +47,9 @@ suite "TimeTracker":
state.config.expiryDelayMins = 22
time = TimeTracker.new(state, store, dht, clock)
(await time.start()).tryGet()
teardown:
(await time.stop()).tryGet()
await state.events.nodesToRevisit.unsubscribe(sub)
state.checkAllUnsubscribed()

View File

@ -23,8 +23,7 @@ suite "TodoList":
metrics = createMockMetrics()
todo = TodoList.new(state, metrics)
(await todo.start()).tryGet()
(await todo.awake()).tryGet()
teardown:
(await todo.stop()).tryGet()

View File

@ -18,11 +18,5 @@ method getNeighbors*(
d.getNeighborsArg = some(target)
return d.getNeighborsReturn
method start*(d: MockDht): Future[?!void] {.async.} =
return success()
method stop*(d: MockDht): Future[?!void] {.async.} =
return success()
proc createMockDht*(): MockDht =
MockDht()

View File

@ -22,11 +22,5 @@ method deleteEntries*(
s.nodesToDelete = nids
return success()
method start*(s: MockNodeStore): Future[?!void] {.async.} =
return success()
method stop*(s: MockNodeStore): Future[?!void] {.async.} =
return success()
proc createMockNodeStore*(): MockNodeStore =
MockNodeStore(nodesToIterate: newSeq[NodeEntry](), nodesToDelete: newSeq[Nid]())

View File

@ -10,11 +10,5 @@ type MockTodoList* = ref object of TodoList
method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} =
return t.popReturn
method start*(t: MockTodoList): Future[?!void] {.async.} =
return success()
method stop*(t: MockTodoList): Future[?!void] {.async.} =
return success()
proc createMockTodoList*(): MockTodoList =
MockTodoList()