mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-02 13:33:08 +00:00
finishes implement of requeststore
This commit is contained in:
parent
de22684ff3
commit
9f8ba85d35
@ -62,80 +62,61 @@ proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T =
|
||||
return success(RequestEntry(lastSeen: 0))
|
||||
return RequestEntry.fromBytes(bytes)
|
||||
|
||||
proc update*(s: RequestStore, rid: Rid): Future[?!void] {.async.} =
|
||||
method update*(
|
||||
s: RequestStore, rid: Rid
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
without key =? Key.init(requeststoreName / $rid), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
let entry = RequestEntry(id: rid, lastSeen: s.clock.now)
|
||||
?await s.store.put(key, entry)
|
||||
try:
|
||||
?await s.store.put(key, entry)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
trace "Request entry updated", id = $rid
|
||||
return success()
|
||||
|
||||
proc remove*(s: RequestStore, rid: Rid): Future[?!void] {.async.} =
|
||||
method remove*(
|
||||
s: RequestStore, rid: Rid
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
without key =? Key.init(requeststoreName / $rid), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
?await s.store.delete(key)
|
||||
try:
|
||||
?await s.store.delete(key)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
trace "Request entry removed", id = $rid
|
||||
return success()
|
||||
|
||||
# proc storeNodeIsNew(s: RequestStore, nid: Nid): Future[?!bool] {.async.} =
|
||||
# without key =? Key.init(requeststoreName / $nid), err:
|
||||
# error "failed to format key", err = err.msg
|
||||
# return failure(err)
|
||||
# without exists =? (await s.store.has(key)), err:
|
||||
# error "failed to check store for key", err = err.msg
|
||||
# return failure(err)
|
||||
method iterateAll*(
|
||||
s: RequestStore, onNode: OnRequestEntry
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
without queryKey =? Key.init(requeststoreName), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
try:
|
||||
without iter =? (await query[RequestEntry](s.store, Query.init(queryKey))), err:
|
||||
error "failed to create query", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
# if not exists:
|
||||
# let entry = RequestEntry(id: nid, lastVisit: 0, firstInactive: 0)
|
||||
# ?await s.store.put(key, entry)
|
||||
# info "New node", nodeId = $nid
|
||||
while not iter.finished:
|
||||
without item =? (await iter.next()), err:
|
||||
error "failure during query iteration", err = err.msg
|
||||
return failure(err)
|
||||
without value =? item.value, err:
|
||||
error "failed to get value from iterator", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
# return success(not exists)
|
||||
if value.lastSeen > 0:
|
||||
?await onNode(value)
|
||||
|
||||
# proc deleteEntry(s: RequestStore, nid: Nid): Future[?!bool] {.async.} =
|
||||
# without key =? Key.init(requeststoreName / $nid), err:
|
||||
# error "failed to format key", err = err.msg
|
||||
# return failure(err)
|
||||
# without exists =? (await s.store.has(key)), err:
|
||||
# error "failed to check store for key", err = err.msg, key = $key
|
||||
# return failure(err)
|
||||
|
||||
# if exists:
|
||||
# ?await s.store.delete(key)
|
||||
|
||||
# return success(exists)
|
||||
|
||||
# method iterateAll*(
|
||||
# s: RequestStore, onNode: OnRequestEntry
|
||||
# ): Future[?!void] {.async: (raises: []), base.} =
|
||||
# without queryKey =? Key.init(requeststoreName), err:
|
||||
# error "failed to format key", err = err.msg
|
||||
# return failure(err)
|
||||
# try:
|
||||
# without iter =? (await query[RequestEntry](s.store, Query.init(queryKey))), err:
|
||||
# error "failed to create query", err = err.msg
|
||||
# return failure(err)
|
||||
|
||||
# while not iter.finished:
|
||||
# without item =? (await iter.next()), err:
|
||||
# error "failure during query iteration", err = err.msg
|
||||
# return failure(err)
|
||||
# without value =? item.value, err:
|
||||
# error "failed to get value from iterator", err = err.msg
|
||||
# return failure(err)
|
||||
|
||||
# if value.lastSeen > 0:
|
||||
# ?await onNode(value)
|
||||
|
||||
# await sleepAsync(1.millis)
|
||||
# except CatchableError as exc:
|
||||
# return failure(exc.msg)
|
||||
|
||||
# return success()
|
||||
await sleepAsync(1.millis)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
return success()
|
||||
|
||||
method start*(s: RequestStore): Future[?!void] {.async.} =
|
||||
info "Starting..."
|
||||
|
||||
@ -87,3 +87,29 @@ suite "Requeststore":
|
||||
|
||||
check:
|
||||
isStored == false
|
||||
|
||||
test "iterateAll yields all entries":
|
||||
let
|
||||
rid1 = genRid()
|
||||
rid2 = genRid()
|
||||
rid3 = genRid()
|
||||
|
||||
(await store.update(rid1)).tryGet()
|
||||
(await store.update(rid2)).tryGet()
|
||||
(await store.update(rid3)).tryGet()
|
||||
|
||||
var entries = newSeq[RequestEntry]()
|
||||
proc onEntry(entry: RequestEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
|
||||
entries.add(entry)
|
||||
return success()
|
||||
|
||||
(await store.iterateAll(onEntry)).tryGet()
|
||||
|
||||
check:
|
||||
entries.len == 3
|
||||
entries[0].id == rid1
|
||||
entries[0].lastSeen == clock.setNow
|
||||
entries[1].id == rid2
|
||||
entries[1].lastSeen == clock.setNow
|
||||
entries[2].id == rid3
|
||||
entries[2].lastSeen == clock.setNow
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user