mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-06 23:43:10 +00:00
better list saving and loading
This commit is contained in:
parent
acd95287c2
commit
3f7392691e
@ -20,6 +20,7 @@ logScope:
|
||||
type
|
||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises:[].}
|
||||
Entry* = object
|
||||
id*: string # will be node ID
|
||||
value*: string
|
||||
|
||||
List* = ref object
|
||||
@ -27,49 +28,46 @@ type
|
||||
store: TypedDatastore
|
||||
items: seq[Entry]
|
||||
onMetric: OnUpdateMetric
|
||||
lastSaveUtc: DateTime
|
||||
|
||||
proc encode(i: int): seq[byte] =
|
||||
@(cast[uint64](i).toBytesBE)
|
||||
|
||||
proc decode(T: type int, bytes: seq[byte]): ?!T =
|
||||
if bytes.len >= sizeof(uint64):
|
||||
success(cast[int](uint64.fromBytesBE(bytes)))
|
||||
else:
|
||||
failure("not enough bytes to decode int")
|
||||
proc `$`*(entry: Entry): string =
|
||||
entry.id & ":" & entry.value
|
||||
|
||||
proc encode(s: Entry): seq[byte] =
|
||||
s.value.toBytes()
|
||||
(s.id & ";" & s.value).toBytes()
|
||||
|
||||
proc decode(T: type Entry, bytes: seq[byte]): ?!T =
|
||||
success(Entry(value: string.fromBytes(bytes)))
|
||||
let s = string.fromBytes(bytes)
|
||||
if s.len == 0:
|
||||
return success(Entry(id: "", value: ""))
|
||||
|
||||
proc save(this: List): Future[?!void] {.async.}=
|
||||
let countKey = Key.init(this.name / "count").tryGet
|
||||
trace "countkey", key = $countKey, count = this.items.len
|
||||
? await this.store.put(countKey, this.items.len)
|
||||
let tokens = s.split(";")
|
||||
if tokens.len != 2:
|
||||
return failure("expected 2 tokens")
|
||||
|
||||
for i in 0 ..< this.items.len:
|
||||
let itemKey = Key.init(this.name / $i).tryGet
|
||||
trace "itemKey", key = $itemKey, iter = i
|
||||
? await this.store.put(itemKey, this.items[i])
|
||||
success(Entry(
|
||||
id: tokens[0],
|
||||
value: tokens[1]
|
||||
))
|
||||
|
||||
info "List saved", name = this.name
|
||||
proc saveItem(this: List, item: Entry): Future[?!void] {.async.} =
|
||||
without itemKey =? Key.init(this.name / item.id), err:
|
||||
return failure(err)
|
||||
? await this.store.put(itemKey, item)
|
||||
return success()
|
||||
|
||||
proc load*(this: List): Future[?!void] {.async.}=
|
||||
let countKey = Key.init(this.name / "count").tryGet
|
||||
without hasKey =? (await this.store.has(countKey)), err:
|
||||
return failure (err)
|
||||
if hasKey:
|
||||
without count =? (await get[int](this.store, countKey)), err:
|
||||
return failure(err)
|
||||
without queryKey =? Key.init(this.name), err:
|
||||
return failure(err)
|
||||
without iter =? (await query[Entry](this.store, Query.init(queryKey))), err:
|
||||
return failure(err)
|
||||
|
||||
for i in 0 ..< count:
|
||||
let itemKey = Key.init(this.name / $i).tryGet
|
||||
without entry =? (await get[Entry](this.store, itemKey)), err:
|
||||
return failure(err)
|
||||
this.items.add(entry)
|
||||
while not iter.finished:
|
||||
without item =? (await iter.next()), err:
|
||||
return failure(err)
|
||||
without value =? item.value, err:
|
||||
return failure(err)
|
||||
if value.id.len > 0:
|
||||
this.items.add(value)
|
||||
|
||||
info "Loaded list", name = this.name, items = this.items.len
|
||||
return success()
|
||||
@ -84,18 +82,13 @@ proc new*(
|
||||
name: name,
|
||||
store: store,
|
||||
items: newSeq[Entry](),
|
||||
onMetric: onMetric,
|
||||
lastSaveUtc: now().utc
|
||||
onMetric: onMetric
|
||||
)
|
||||
|
||||
proc add*(this: List, item: Entry): Future[?!void] {.async.} =
|
||||
this.items.add(item)
|
||||
this.onMetric(this.items.len.int64)
|
||||
|
||||
if this.lastSaveUtc < now().utc - initDuration(seconds = 10):
|
||||
this.lastSaveUtc = now().utc
|
||||
trace "Saving changes...", name = this.name
|
||||
if err =? (await this.save()).errorOption:
|
||||
error "Failed to save list", name = this.name
|
||||
return failure("Failed to save list")
|
||||
return success()
|
||||
if err =? (await this.saveItem(item)).errorOption:
|
||||
return failure(err)
|
||||
return success()
|
||||
|
||||
@ -31,13 +31,15 @@ proc startApplication*(config: CrawlerConfig): Future[?!void] {.async.} =
|
||||
return failure(err)
|
||||
|
||||
proc aaa() {.async.} =
|
||||
var i = 0
|
||||
while true:
|
||||
trace "a"
|
||||
await sleepAsync(1000)
|
||||
discard await exampleList.add(Entry(
|
||||
id: $i,
|
||||
value: "str!"
|
||||
))
|
||||
|
||||
inc i
|
||||
|
||||
asyncSpawn aaa()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user