From acd95287c2a361f7d3dfb0af016b15e41e15d0dc Mon Sep 17 00:00:00 2001 From: Ben Date: Wed, 5 Feb 2025 15:21:11 +0100 Subject: [PATCH] Applies typedDS in list --- .gitignore | 1 + codexcrawler.nim | 15 +++++- codexcrawler/list.nim | 101 +++++++++++++++++++++++++++++++++++---- codexcrawler/logging.nim | 10 +++- codexcrawler/main.nim | 26 ++++++++-- config.nims | 2 + 6 files changed, 138 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index 8ab1e78..ca02be0 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ NimBinaries *.dSYM .vscode/* *.exe +crawler_data diff --git a/codexcrawler.nim b/codexcrawler.nim index 6432a9c..dbaef24 100644 --- a/codexcrawler.nim +++ b/codexcrawler.nim @@ -1,9 +1,13 @@ +import std/os import pkg/chronicles import pkg/chronos +import pkg/questionable +import pkg/questionable/results import ./codexcrawler/main import ./codexcrawler/config import ./codexcrawler/metrics +import ./codexcrawler/logging when defined(posix): import system/ansi_c @@ -21,12 +25,21 @@ proc run(app: Application) = let config = parseConfig() info "Loaded configuration", config + # Configure loglevel + updateLogLevel(config.logLevel) + + # Ensure datadir path exists: + if not existsDir(config.dataDir): + createDir(config.dataDir) + setupMetrics(config.metricsAddress, config.metricsPort) info "Metrics endpoint initialized" info "Starting application" app.status = ApplicationStatus.Running - waitFor startApplication() + if err =? (waitFor startApplication(config)).errorOption: + app.status = ApplicationStatus.Stopping + error "Failed to start application", err = err.msg while app.status == ApplicationStatus.Running: try: diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 591ba21..a997346 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -1,20 +1,101 @@ +import pkg/chronos +import pkg/chronicles import pkg/metrics +import pkg/datastore +import pkg/datastore/typedds +import pkg/stew/byteutils +import pkg/stew/endians2 +import pkg/questionable +import pkg/questionable/results + +import std/os +import std/times +import std/options +import std/tables +import std/strutils + +logScope: + topics = "list" type OnUpdateMetric = proc(value: int64): void {.gcsafe, raises:[].} - List*[T] = ref object - items: seq[T] - onMetric: OnUpdateMetric + Entry* = object + value*: string -proc new*[T]( - _: type List[T], + List* = ref object + name: string + store: TypedDatastore + items: seq[Entry] + onMetric: OnUpdateMetric + lastSaveUtc: DateTime + +proc encode(i: int): seq[byte] = + @(cast[uint64](i).toBytesBE) + +proc decode(T: type int, bytes: seq[byte]): ?!T = + if bytes.len >= sizeof(uint64): + success(cast[int](uint64.fromBytesBE(bytes))) + else: + failure("not enough bytes to decode int") + +proc encode(s: Entry): seq[byte] = + s.value.toBytes() + +proc decode(T: type Entry, bytes: seq[byte]): ?!T = + success(Entry(value: string.fromBytes(bytes))) + +proc save(this: List): Future[?!void] {.async.}= + let countKey = Key.init(this.name / "count").tryGet + trace "countkey", key = $countKey, count = this.items.len + ? await this.store.put(countKey, this.items.len) + + for i in 0 ..< this.items.len: + let itemKey = Key.init(this.name / $i).tryGet + trace "itemKey", key = $itemKey, iter = i + ? await this.store.put(itemKey, this.items[i]) + + info "List saved", name = this.name + return success() + +proc load*(this: List): Future[?!void] {.async.}= + let countKey = Key.init(this.name / "count").tryGet + without hasKey =? (await this.store.has(countKey)), err: + return failure (err) + if hasKey: + without count =? (await get[int](this.store, countKey)), err: + return failure(err) + + for i in 0 ..< count: + let itemKey = Key.init(this.name / $i).tryGet + without entry =? (await get[Entry](this.store, itemKey)), err: + return failure(err) + this.items.add(entry) + + info "Loaded list", name = this.name, items = this.items.len + return success() + +proc new*( + _: type List, + name: string, + store: TypedDatastore, onMetric: OnUpdateMetric -): List[T] = - List[T]( - items: newSeq[T](), - onMetric: onMetric +): List = + List( + name: name, + store: store, + items: newSeq[Entry](), + onMetric: onMetric, + lastSaveUtc: now().utc ) -proc add*[T](this: List[T], item: T) = +proc add*(this: List, item: Entry): Future[?!void] {.async.} = this.items.add(item) this.onMetric(this.items.len.int64) + + if this.lastSaveUtc < now().utc - initDuration(seconds = 10): + this.lastSaveUtc = now().utc + trace "Saving changes...", name = this.name + if err =? (await this.save()).errorOption: + error "Failed to save list", name = this.name + return failure("Failed to save list") + return success() \ No newline at end of file diff --git a/codexcrawler/logging.nim b/codexcrawler/logging.nim index 213c2a3..66434f2 100644 --- a/codexcrawler/logging.nim +++ b/codexcrawler/logging.nim @@ -1,12 +1,18 @@ +import std/strutils +import std/typetraits + import pkg/chronicles import pkg/chronicles/helpers import pkg/chronicles/topics_registry -proc updateLogLevel*(logLevel: string) {.upraises: [ValueError].} = +proc updateLogLevel*(logLevel: string) = + notice "Updating logLevel", logLevel let directives = logLevel.split(";") try: - setLogLevel(parseEnum[LogLevel](directives[0].toUpperAscii)) + setLogLevel(LogLevel.TRACE) + #parseEnum[LogLevel](directives[0].toUpperAscii)) except ValueError: + notice "valueerror logLevel", logLevel raise (ref ValueError)( msg: "Please specify one of: trace, debug, " & "info, notice, warn, error or fatal" diff --git a/codexcrawler/main.nim b/codexcrawler/main.nim index 5fb488a..6a261cb 100644 --- a/codexcrawler/main.nim +++ b/codexcrawler/main.nim @@ -1,8 +1,14 @@ +import std/os import pkg/chronicles import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/datastore +import pkg/datastore/typedds import pkg/metrics +import ./config import ./list logScope: @@ -10,16 +16,27 @@ logScope: declareGauge(example, "testing") -proc startApplication*() {.async.} = +proc startApplication*(config: CrawlerConfig): Future[?!void] {.async.} = + without exampleStore =? LevelDbDatastore.new(config.dataDir / "example"): + error "Failed to create datastore" + return failure("Failed to create datastore") + + let typedDs = TypedDatastore.init(exampleStore) + proc onExampleMetric(value: int64) = example.set(value) - var exampleList = List[string].new(onExampleMetric) + + var exampleList = List.new("example", typedDs, onExampleMetric) + if err =? (await exampleList.load()).errorOption: + return failure(err) proc aaa() {.async.} = while true: - notice "a" + trace "a" await sleepAsync(1000) - exampleList.add("str!") + discard await exampleList.add(Entry( + value: "str!" + )) asyncSpawn aaa() @@ -27,3 +44,4 @@ proc startApplication*() {.async.} = await sleepAsync(1000) notice "b" + return success() diff --git a/config.nims b/config.nims index d8bdeee..d3a9671 100644 --- a/config.nims +++ b/config.nims @@ -1 +1,3 @@ --define:metrics +# switch("define", "chronicles_runtime_filtering=true") +switch("define", "chronicles_log_level=TRACE")