mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-05 23:13:11 +00:00
fixes app initialization
This commit is contained in:
parent
605b561e30
commit
6574d53d5f
@ -16,9 +16,10 @@ import ./types
|
||||
|
||||
type Application* = ref object
|
||||
state: State
|
||||
components: seq[Component]
|
||||
|
||||
proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
|
||||
let state = State(
|
||||
app.state = State(
|
||||
status: ApplicationStatus.Running,
|
||||
config: config,
|
||||
events: Events(
|
||||
@ -29,9 +30,10 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
|
||||
),
|
||||
)
|
||||
|
||||
without components =? (await createComponents(state)), err:
|
||||
without components =? (await createComponents(app.state)), err:
|
||||
error "Failed to create componenents", err = err.msg
|
||||
return failure(err)
|
||||
app.components = components
|
||||
|
||||
for c in components:
|
||||
if err =? (await c.start()).errorOption:
|
||||
@ -39,6 +41,11 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
|
||||
|
||||
return success()
|
||||
|
||||
proc stopComponents(app: Application) {.async.} =
|
||||
for c in app.components:
|
||||
if err =? (await c.stop()).errorOption:
|
||||
error "Failed to stop component", err = err.msg
|
||||
|
||||
proc stop*(app: Application) =
|
||||
app.state.status = ApplicationStatus.Stopping
|
||||
|
||||
@ -56,7 +63,6 @@ proc run*(app: Application) =
|
||||
info "Metrics endpoint initialized"
|
||||
|
||||
info "Starting application"
|
||||
app.state.status = ApplicationStatus.Running
|
||||
if err =? (waitFor app.initializeApp(config)).errorOption:
|
||||
app.state.status = ApplicationStatus.Stopping
|
||||
error "Failed to start application", err = err.msg
|
||||
@ -68,4 +74,7 @@ proc run*(app: Application) =
|
||||
except Exception as exc:
|
||||
error "Unhandled exception", msg = exc.msg
|
||||
quit QuitFailure
|
||||
notice "Application closed"
|
||||
|
||||
notice "Application stopping..."
|
||||
waitFor app.stopComponents()
|
||||
notice "Application stopped"
|
||||
|
||||
@ -32,10 +32,11 @@ proc handleCheckEvent(
|
||||
d.metrics.setOkNodes(d.ok.len)
|
||||
d.metrics.setNokNodes(d.nok.len)
|
||||
|
||||
trace "metrics updated", ok = d.ok.len, nok = d.nok.len
|
||||
return success()
|
||||
|
||||
method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
info "Starting DhtMetrics..."
|
||||
info "Starting..."
|
||||
?await d.ok.load()
|
||||
?await d.nok.load()
|
||||
|
||||
|
||||
@ -14,6 +14,9 @@ import ../utils/asyncdataevent
|
||||
|
||||
const nodestoreName = "nodestore"
|
||||
|
||||
logScope:
|
||||
topics = "nodestore"
|
||||
|
||||
type
|
||||
NodeEntry* = object
|
||||
id*: Nid
|
||||
@ -75,7 +78,6 @@ proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.asyn
|
||||
|
||||
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
var newNodes = newSeq[Nid]()
|
||||
|
||||
for nid in nids:
|
||||
without isNew =? (await s.storeNodeIsNew(nid)), err:
|
||||
return failure(err)
|
||||
@ -83,6 +85,7 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
if isNew:
|
||||
newNodes.add(nid)
|
||||
|
||||
trace "Processed found nodes", total = nids.len, numNew = newNodes.len
|
||||
if newNodes.len > 0:
|
||||
?await s.fireNewNodesDiscovered(newNodes)
|
||||
return success()
|
||||
@ -109,7 +112,7 @@ method iterateAll*(
|
||||
return success()
|
||||
|
||||
method start*(s: NodeStore): Future[?!void] {.async.} =
|
||||
info "Starting nodestore..."
|
||||
info "Starting..."
|
||||
|
||||
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
return await s.processFoundNodes(nids)
|
||||
|
||||
@ -16,6 +16,7 @@ type TimeTracker* = ref object of Component
|
||||
nodestore: NodeStore
|
||||
|
||||
proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
trace "Checking for expired nodes..."
|
||||
let expiry =
|
||||
(Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64
|
||||
|
||||
@ -30,7 +31,7 @@ proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
|
||||
return success()
|
||||
|
||||
method start*(t: TimeTracker): Future[?!void] {.async.} =
|
||||
info "Starting timetracker..."
|
||||
info "Starting..."
|
||||
|
||||
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
await t.step()
|
||||
|
||||
@ -26,6 +26,7 @@ proc addNodes(t: TodoList, nids: seq[Nid]) =
|
||||
for nid in nids:
|
||||
t.nids.add(nid)
|
||||
|
||||
trace "Nodes added", nodes = nids.len
|
||||
if s =? t.emptySignal:
|
||||
s.complete()
|
||||
t.emptySignal = Future[void].none
|
||||
@ -36,7 +37,7 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
|
||||
let signal = newFuture[void]("list.emptySignal")
|
||||
t.emptySignal = some(signal)
|
||||
try:
|
||||
await signal.wait(1.hours)
|
||||
await signal.wait(InfiniteDuration)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
if t.nids.len < 1:
|
||||
|
||||
@ -70,8 +70,8 @@ method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
|
||||
return success()
|
||||
|
||||
method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} =
|
||||
if this.items.len < 1:
|
||||
return failure(this.name & "List is empty.")
|
||||
if not this.contains(nid):
|
||||
return success()
|
||||
|
||||
this.items.excl(nid)
|
||||
without itemKey =? Key.init(this.name / $nid), err:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user