Applies typedDS in list

This commit is contained in:
Ben 2025-02-05 15:21:11 +01:00
parent faa9d2f4fd
commit acd95287c2
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
6 changed files with 138 additions and 17 deletions

1
.gitignore vendored
View File

@ -14,3 +14,4 @@ NimBinaries
*.dSYM
.vscode/*
*.exe
crawler_data

View File

@ -1,9 +1,13 @@
import std/os
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./codexcrawler/main
import ./codexcrawler/config
import ./codexcrawler/metrics
import ./codexcrawler/logging
when defined(posix):
import system/ansi_c
@ -21,12 +25,21 @@ proc run(app: Application) =
let config = parseConfig()
info "Loaded configuration", config
# Configure loglevel
updateLogLevel(config.logLevel)
# Ensure datadir path exists:
if not existsDir(config.dataDir):
createDir(config.dataDir)
setupMetrics(config.metricsAddress, config.metricsPort)
info "Metrics endpoint initialized"
info "Starting application"
app.status = ApplicationStatus.Running
waitFor startApplication()
if err =? (waitFor startApplication(config)).errorOption:
app.status = ApplicationStatus.Stopping
error "Failed to start application", err = err.msg
while app.status == ApplicationStatus.Running:
try:

View File

@ -1,20 +1,101 @@
import pkg/chronos
import pkg/chronicles
import pkg/metrics
import pkg/datastore
import pkg/datastore/typedds
import pkg/stew/byteutils
import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import std/os
import std/times
import std/options
import std/tables
import std/strutils
logScope:
topics = "list"
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises:[].}
List*[T] = ref object
items: seq[T]
onMetric: OnUpdateMetric
Entry* = object
value*: string
proc new*[T](
_: type List[T],
List* = ref object
name: string
store: TypedDatastore
items: seq[Entry]
onMetric: OnUpdateMetric
lastSaveUtc: DateTime
proc encode(i: int): seq[byte] =
@(cast[uint64](i).toBytesBE)
proc decode(T: type int, bytes: seq[byte]): ?!T =
if bytes.len >= sizeof(uint64):
success(cast[int](uint64.fromBytesBE(bytes)))
else:
failure("not enough bytes to decode int")
proc encode(s: Entry): seq[byte] =
s.value.toBytes()
proc decode(T: type Entry, bytes: seq[byte]): ?!T =
success(Entry(value: string.fromBytes(bytes)))
proc save(this: List): Future[?!void] {.async.}=
let countKey = Key.init(this.name / "count").tryGet
trace "countkey", key = $countKey, count = this.items.len
? await this.store.put(countKey, this.items.len)
for i in 0 ..< this.items.len:
let itemKey = Key.init(this.name / $i).tryGet
trace "itemKey", key = $itemKey, iter = i
? await this.store.put(itemKey, this.items[i])
info "List saved", name = this.name
return success()
proc load*(this: List): Future[?!void] {.async.}=
let countKey = Key.init(this.name / "count").tryGet
without hasKey =? (await this.store.has(countKey)), err:
return failure (err)
if hasKey:
without count =? (await get[int](this.store, countKey)), err:
return failure(err)
for i in 0 ..< count:
let itemKey = Key.init(this.name / $i).tryGet
without entry =? (await get[Entry](this.store, itemKey)), err:
return failure(err)
this.items.add(entry)
info "Loaded list", name = this.name, items = this.items.len
return success()
proc new*(
_: type List,
name: string,
store: TypedDatastore,
onMetric: OnUpdateMetric
): List[T] =
List[T](
items: newSeq[T](),
onMetric: onMetric
): List =
List(
name: name,
store: store,
items: newSeq[Entry](),
onMetric: onMetric,
lastSaveUtc: now().utc
)
proc add*[T](this: List[T], item: T) =
proc add*(this: List, item: Entry): Future[?!void] {.async.} =
this.items.add(item)
this.onMetric(this.items.len.int64)
if this.lastSaveUtc < now().utc - initDuration(seconds = 10):
this.lastSaveUtc = now().utc
trace "Saving changes...", name = this.name
if err =? (await this.save()).errorOption:
error "Failed to save list", name = this.name
return failure("Failed to save list")
return success()

View File

@ -1,12 +1,18 @@
import std/strutils
import std/typetraits
import pkg/chronicles
import pkg/chronicles/helpers
import pkg/chronicles/topics_registry
proc updateLogLevel*(logLevel: string) {.upraises: [ValueError].} =
proc updateLogLevel*(logLevel: string) =
notice "Updating logLevel", logLevel
let directives = logLevel.split(";")
try:
setLogLevel(parseEnum[LogLevel](directives[0].toUpperAscii))
setLogLevel(LogLevel.TRACE)
#parseEnum[LogLevel](directives[0].toUpperAscii))
except ValueError:
notice "valueerror logLevel", logLevel
raise (ref ValueError)(
msg:
"Please specify one of: trace, debug, " & "info, notice, warn, error or fatal"

View File

@ -1,8 +1,14 @@
import std/os
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/datastore/typedds
import pkg/metrics
import ./config
import ./list
logScope:
@ -10,16 +16,27 @@ logScope:
declareGauge(example, "testing")
proc startApplication*() {.async.} =
proc startApplication*(config: CrawlerConfig): Future[?!void] {.async.} =
without exampleStore =? LevelDbDatastore.new(config.dataDir / "example"):
error "Failed to create datastore"
return failure("Failed to create datastore")
let typedDs = TypedDatastore.init(exampleStore)
proc onExampleMetric(value: int64) =
example.set(value)
var exampleList = List[string].new(onExampleMetric)
var exampleList = List.new("example", typedDs, onExampleMetric)
if err =? (await exampleList.load()).errorOption:
return failure(err)
proc aaa() {.async.} =
while true:
notice "a"
trace "a"
await sleepAsync(1000)
exampleList.add("str!")
discard await exampleList.add(Entry(
value: "str!"
))
asyncSpawn aaa()
@ -27,3 +44,4 @@ proc startApplication*() {.async.} =
await sleepAsync(1000)
notice "b"
return success()

View File

@ -1 +1,3 @@
--define:metrics
# switch("define", "chronicles_runtime_filtering=true")
switch("define", "chronicles_log_level=TRACE")