diff --git a/codexcrawler.nim b/codexcrawler.nim index dbaef24..07832ad 100644 --- a/codexcrawler.nim +++ b/codexcrawler.nim @@ -4,58 +4,18 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results -import ./codexcrawler/main -import ./codexcrawler/config -import ./codexcrawler/metrics -import ./codexcrawler/logging +import ./codexcrawler/application when defined(posix): import system/ansi_c -type - ApplicationStatus {.pure.} = enum - Stopped - Stopping - Running - - Application = ref object - status: ApplicationStatus - -proc run(app: Application) = - let config = parseConfig() - info "Loaded configuration", config - - # Configure loglevel - updateLogLevel(config.logLevel) - - # Ensure datadir path exists: - if not existsDir(config.dataDir): - createDir(config.dataDir) - - setupMetrics(config.metricsAddress, config.metricsPort) - info "Metrics endpoint initialized" - - info "Starting application" - app.status = ApplicationStatus.Running - if err =? (waitFor startApplication(config)).errorOption: - app.status = ApplicationStatus.Stopping - error "Failed to start application", err = err.msg - - while app.status == ApplicationStatus.Running: - try: - chronos.poll() - except Exception as exc: - error "Unhandled exception", msg = exc.msg - quit QuitFailure - notice "Application closed" - when isMainModule: let app = Application() # Stopping code must be in scope of app declaration. # Else capture of the instance is not allowed due to {.noconv.}. proc onStopSignal() = - app.status = ApplicationStatus.Stopping + app.stop() notice "Stopping Crawler..." proc controlCHandler() {.noconv.} = diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim new file mode 100644 index 0000000..6e0f89d --- /dev/null +++ b/codexcrawler/application.nim @@ -0,0 +1,51 @@ +import std/os +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ./config +import ./logging +import ./metrics + +import ./main + +type + ApplicationStatus* {.pure.} = enum + Stopped + Stopping + Running + + Application* = ref object + status: ApplicationStatus + +proc run*(app: Application) = + let config = parseConfig() + info "Loaded configuration", config + + # Configure loglevel + updateLogLevel(config.logLevel) + + # Ensure datadir path exists: + if not existsDir(config.dataDir): + createDir(config.dataDir) + + setupMetrics(config.metricsAddress, config.metricsPort) + info "Metrics endpoint initialized" + + info "Starting application" + app.status = ApplicationStatus.Running + if err =? (waitFor startApplication(config)).errorOption: + app.status = ApplicationStatus.Stopping + error "Failed to start application", err = err.msg + + while app.status == ApplicationStatus.Running: + try: + chronos.poll() + except Exception as exc: + error "Unhandled exception", msg = exc.msg + quit QuitFailure + notice "Application closed" + +proc stop*(app: Application) = + app.status = ApplicationStatus.Stopping diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 870b68d..19ae13f 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -3,62 +3,35 @@ import pkg/chronicles import pkg/metrics import pkg/datastore import pkg/datastore/typedds -import pkg/stew/byteutils -import pkg/stew/endians2 import pkg/questionable import pkg/questionable/results import std/os -import std/times -import std/options -import std/tables -import std/strutils + +import ./nodeentry logScope: topics = "list" type - OnUpdateMetric = proc(value: int64): void {.gcsafe, raises:[].} - Entry* = object - id*: string # will be node ID - value*: string + OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} List* = ref object name: string store: TypedDatastore - items: seq[Entry] + items: seq[NodeEntry] onMetric: OnUpdateMetric -proc `$`*(entry: Entry): string = - entry.id & ":" & entry.value - -proc encode(s: Entry): seq[byte] = - (s.id & ";" & s.value).toBytes() - -proc decode(T: type Entry, bytes: seq[byte]): ?!T = - let s = string.fromBytes(bytes) - if s.len == 0: - return success(Entry(id: "", value: "")) - - let tokens = s.split(";") - if tokens.len != 2: - return failure("expected 2 tokens") - - success(Entry( - id: tokens[0], - value: tokens[1] - )) - -proc saveItem(this: List, item: Entry): Future[?!void] {.async.} = +proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} = without itemKey =? Key.init(this.name / item.id), err: return failure(err) - ? await this.store.put(itemKey, item) + ?await this.store.put(itemKey, item) return success() -proc load*(this: List): Future[?!void] {.async.}= +proc load*(this: List): Future[?!void] {.async.} = without queryKey =? Key.init(this.name), err: return failure(err) - without iter =? (await query[Entry](this.store, Query.init(queryKey))), err: + without iter =? (await query[NodeEntry](this.store, Query.init(queryKey))), err: return failure(err) while not iter.finished: @@ -73,19 +46,11 @@ proc load*(this: List): Future[?!void] {.async.}= return success() proc new*( - _: type List, - name: string, - store: TypedDatastore, - onMetric: OnUpdateMetric + _: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric ): List = - List( - name: name, - store: store, - items: newSeq[Entry](), - onMetric: onMetric - ) + List(name: name, store: store, items: newSeq[NodeEntry](), onMetric: onMetric) -proc add*(this: List, item: Entry): Future[?!void] {.async.} = +proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} = this.items.add(item) this.onMetric(this.items.len.int64) diff --git a/codexcrawler/logging.nim b/codexcrawler/logging.nim index 66434f2..3c83c1f 100644 --- a/codexcrawler/logging.nim +++ b/codexcrawler/logging.nim @@ -9,8 +9,7 @@ proc updateLogLevel*(logLevel: string) = notice "Updating logLevel", logLevel let directives = logLevel.split(";") try: - setLogLevel(LogLevel.TRACE) - #parseEnum[LogLevel](directives[0].toUpperAscii)) + setLogLevel(LogLevel.TRACE) #parseEnum[LogLevel](directives[0].toUpperAscii)) except ValueError: notice "valueerror logLevel", logLevel raise (ref ValueError)( diff --git a/codexcrawler/main.nim b/codexcrawler/main.nim index 61feaba..5acf0a4 100644 --- a/codexcrawler/main.nim +++ b/codexcrawler/main.nim @@ -35,10 +35,7 @@ proc startApplication*(config: CrawlerConfig): Future[?!void] {.async.} = while true: trace "a" await sleepAsync(1000) - discard await exampleList.add(Entry( - id: $i, - value: "str!" - )) + discard await exampleList.add(Entry(id: $i, value: "str!")) inc i asyncSpawn aaa() diff --git a/codexcrawler/nodeentry.nim b/codexcrawler/nodeentry.nim new file mode 100644 index 0000000..470d03d --- /dev/null +++ b/codexcrawler/nodeentry.nim @@ -0,0 +1,25 @@ +import pkg/stew/byteutils +import pkg/stew/endians2 +import pkg/questionable +import pkg/questionable/results + +type NodeEntry* = object + id*: string # will be node ID + value*: string + +proc `$`*(entry: NodeEntry): string = + entry.id & ":" & entry.value + +proc encode(s: NodeEntry): seq[byte] = + (s.id & ";" & s.value).toBytes() + +proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T = + let s = string.fromBytes(bytes) + if s.len == 0: + return success(NodeEntry(id: "", value: "")) + + let tokens = s.split(";") + if tokens.len != 2: + return failure("expected 2 tokens") + + success(NodeEntry(id: tokens[0], value: tokens[1]))