mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-02 13:33:08 +00:00
sets up main loop
This commit is contained in:
parent
58b3d9679c
commit
4fff78903d
@ -8,60 +8,19 @@ import pkg/metrics
|
||||
|
||||
import ./config
|
||||
import ./utils/logging
|
||||
import ./metrics
|
||||
import ./list
|
||||
import ./utils/datastoreutils
|
||||
import ./utils/asyncdataevent
|
||||
import ./installer
|
||||
import ./state
|
||||
import ./component
|
||||
import ./types
|
||||
|
||||
type
|
||||
ApplicationStatus* {.pure.} = enum
|
||||
Stopped
|
||||
Stopping
|
||||
Running
|
||||
type Application* = ref object
|
||||
state: State
|
||||
|
||||
Application* = ref object
|
||||
status: ApplicationStatus
|
||||
config*: Config
|
||||
todoNodes*: List
|
||||
okNodes*: List
|
||||
nokNodes*: List
|
||||
|
||||
# proc initializeLists(app: Application): Future[?!void] {.async.} =
|
||||
# without store =? createTypedDatastore(app.config.dataDir / "lists"), err:
|
||||
# return failure(err)
|
||||
|
||||
# # We can't extract this into a function because gauges cannot be passed as argument.
|
||||
# # The use of global state in nim-metrics is not pleasant.
|
||||
# proc onTodoMetric(value: int64) =
|
||||
# todoNodesGauge.set(value)
|
||||
|
||||
# proc onOkMetric(value: int64) =
|
||||
# okNodesGauge.set(value)
|
||||
|
||||
# proc onNokMetric(value: int64) =
|
||||
# nokNodesGauge.set(value)
|
||||
|
||||
# app.todoNodes = List.new("todo", store, onTodoMetric)
|
||||
# app.okNodes = List.new("ok", store, onOkMetric)
|
||||
# app.nokNodes = List.new("nok", store, onNokMetric)
|
||||
|
||||
# if err =? (await app.todoNodes.load()).errorOption:
|
||||
# return failure(err)
|
||||
# if err =? (await app.okNodes.load()).errorOption:
|
||||
# return failure(err)
|
||||
# if err =? (await app.nokNodes.load()).errorOption:
|
||||
# return failure(err)
|
||||
|
||||
# return success()
|
||||
|
||||
proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||
# todo move this
|
||||
proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
|
||||
let state = State(
|
||||
config: app.config,
|
||||
status: ApplicationStatus.Running,
|
||||
config: config,
|
||||
events: Events(
|
||||
nodesFound: newAsyncDataEvent[seq[Nid]](),
|
||||
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
|
||||
@ -81,30 +40,29 @@ proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||
return success()
|
||||
|
||||
proc stop*(app: Application) =
|
||||
app.status = ApplicationStatus.Stopping
|
||||
# waitFor app.dht.stop()
|
||||
app.state.status = ApplicationStatus.Stopping
|
||||
|
||||
proc run*(app: Application) =
|
||||
app.config = parseConfig()
|
||||
info "Loaded configuration", config = app.config
|
||||
let config = parseConfig()
|
||||
info "Loaded configuration", config = $config
|
||||
|
||||
# Configure loglevel
|
||||
updateLogLevel(app.config.logLevel)
|
||||
updateLogLevel(config.logLevel)
|
||||
|
||||
# Ensure datadir path exists:
|
||||
if not existsDir(app.config.dataDir):
|
||||
createDir(app.config.dataDir)
|
||||
if not existsDir(config.dataDir):
|
||||
createDir(config.dataDir)
|
||||
|
||||
info "Metrics endpoint initialized"
|
||||
|
||||
info "Starting application"
|
||||
app.status = ApplicationStatus.Running
|
||||
if err =? (waitFor app.initializeApp()).errorOption:
|
||||
app.status = ApplicationStatus.Stopping
|
||||
app.state.status = ApplicationStatus.Running
|
||||
if err =? (waitFor app.initializeApp(config)).errorOption:
|
||||
app.state.status = ApplicationStatus.Stopping
|
||||
error "Failed to start application", err = err.msg
|
||||
return
|
||||
|
||||
while app.status == ApplicationStatus.Running:
|
||||
while app.state.status == ApplicationStatus.Running:
|
||||
try:
|
||||
chronos.poll()
|
||||
except Exception as exc:
|
||||
|
||||
@ -1,10 +1,14 @@
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./config
|
||||
import ./utils/asyncdataevent
|
||||
import ./types
|
||||
|
||||
logScope:
|
||||
topics = "state"
|
||||
|
||||
type
|
||||
OnStep = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
|
||||
|
||||
@ -18,10 +22,22 @@ type
|
||||
dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData]
|
||||
nodesExpired*: AsyncDataEvent[seq[Nid]]
|
||||
|
||||
ApplicationStatus* {.pure.} = enum
|
||||
Stopped
|
||||
Stopping
|
||||
Running
|
||||
|
||||
State* = ref object of RootObj
|
||||
status*: ApplicationStatus
|
||||
config*: Config
|
||||
events*: Events
|
||||
|
||||
proc whileRunning*(this: State, step: OnStep, delay: Duration) =
|
||||
discard
|
||||
#todo: while status == running, step(), asyncsleep duration
|
||||
proc whileRunning*(s: State, step: OnStep, delay: Duration) {.async.} =
|
||||
proc worker(): Future[void] {.async.} =
|
||||
while s.status == ApplicationStatus.Running:
|
||||
if err =? (await step()).errorOption:
|
||||
error "Failure-result caught in main loop. Stopping...", err = err.msg
|
||||
s.status = ApplicationStatus.Stopping
|
||||
await sleepAsync(delay)
|
||||
|
||||
asyncSpawn worker()
|
||||
|
||||
@ -8,6 +8,7 @@ type MockState* = ref object of State
|
||||
|
||||
proc createMockState*(): MockState =
|
||||
MockState(
|
||||
status: ApplicationStatus.Running,
|
||||
config: Config(),
|
||||
events: Events(
|
||||
nodesFound: newAsyncDataEvent[seq[Nid]](),
|
||||
|
||||
32
tests/codexcrawler/teststate.nim
Normal file
32
tests/codexcrawler/teststate.nim
Normal file
@ -0,0 +1,32 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
|
||||
import ../../codexcrawler/state
|
||||
import ./mockstate
|
||||
|
||||
suite "State":
|
||||
var state: State
|
||||
|
||||
setup:
|
||||
# The behavior we're testing is the same for the mock
|
||||
state = createMockState()
|
||||
|
||||
test "whileRunning":
|
||||
var counter = 0
|
||||
|
||||
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
inc counter
|
||||
return success()
|
||||
|
||||
await state.whileRunning(onStep, 1.milliseconds)
|
||||
|
||||
while counter < 5:
|
||||
await sleepAsync(1.milliseconds)
|
||||
|
||||
state.status = ApplicationStatus.Stopped
|
||||
|
||||
await sleepAsync(10.milliseconds)
|
||||
|
||||
check:
|
||||
counter == 5
|
||||
@ -1,5 +1,6 @@
|
||||
import ./codexcrawler/testutils
|
||||
import ./codexcrawler/testcomponents
|
||||
import ./codexcrawler/testtypes
|
||||
import ./codexcrawler/teststate
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user