mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-10 09:23:09 +00:00
Implements todo list
This commit is contained in:
parent
3f697acefc
commit
58b3d9679c
67
codexcrawler/components/todolist.nim
Normal file
67
codexcrawler/components/todolist.nim
Normal file
@ -0,0 +1,67 @@
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/datastore
|
||||
import pkg/datastore/typedds
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import std/sets
|
||||
|
||||
import ../state
|
||||
import ../types
|
||||
import ../component
|
||||
import ../utils/asyncdataevent
|
||||
|
||||
logScope:
|
||||
topics = "todolist"
|
||||
|
||||
type TodoList* = ref object of Component
|
||||
nids: seq[Nid]
|
||||
state: State
|
||||
subNew: AsyncDataEventSubscription
|
||||
subExp: AsyncDataEventSubscription
|
||||
emptySignal: ?Future[void]
|
||||
|
||||
proc addNodes(t: TodoList, nids: seq[Nid]) =
|
||||
for nid in nids:
|
||||
t.nids.add(nid)
|
||||
|
||||
if s =? t.emptySignal:
|
||||
s.complete()
|
||||
t.emptySignal = Future[void].none
|
||||
|
||||
proc pop*(t: TodoList): Future[?!Nid] {.async.} =
|
||||
if t.nids.len < 1:
|
||||
trace "List is empty. Waiting for new items..."
|
||||
let signal = newFuture[void]("list.emptySignal")
|
||||
t.emptySignal = some(signal)
|
||||
await signal.wait(1.hours)
|
||||
if t.nids.len < 1:
|
||||
return failure("TodoList is empty.")
|
||||
|
||||
let item = t.nids[0]
|
||||
t.nids.del(0)
|
||||
|
||||
return success(item)
|
||||
|
||||
method start*(t: TodoList): Future[?!void] {.async.} =
|
||||
info "Starting TodoList..."
|
||||
|
||||
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
t.addNodes(nids)
|
||||
return success()
|
||||
|
||||
t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes)
|
||||
t.subExp = t.state.events.nodesExpired.subscribe(onNewNodes)
|
||||
return success()
|
||||
|
||||
method stop*(t: TodoList): Future[?!void] {.async.} =
|
||||
await t.state.events.newNodesDiscovered.unsubscribe(t.subNew)
|
||||
await t.state.events.nodesExpired.unsubscribe(t.subExp)
|
||||
return success()
|
||||
|
||||
proc new*(_: type TodoList, state: State): TodoList =
|
||||
TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none)
|
||||
|
||||
proc createTodoList*(state: State): ?!TodoList =
|
||||
success(TodoList.new(state))
|
||||
@ -23,7 +23,6 @@ type List* = ref object of RootObj
|
||||
name: string
|
||||
store: TypedDatastore
|
||||
items: HashSet[Nid]
|
||||
emptySignal: ?Future[void]
|
||||
|
||||
proc encode(s: Nid): seq[byte] =
|
||||
s.toBytes()
|
||||
@ -68,11 +67,6 @@ method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
|
||||
if err =? (await this.saveItem(nid)).errorOption:
|
||||
return failure(err)
|
||||
|
||||
if s =? this.emptySignal:
|
||||
trace "List no longer empty.", name = this.name
|
||||
s.complete()
|
||||
this.emptySignal = Future[void].none
|
||||
|
||||
return success()
|
||||
|
||||
method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} =
|
||||
|
||||
59
tests/codexcrawler/components/testtodolist.nim
Normal file
59
tests/codexcrawler/components/testtodolist.nim
Normal file
@ -0,0 +1,59 @@
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
import pkg/asynctest/chronos/unittest
|
||||
|
||||
import ../../../codexcrawler/components/todolist
|
||||
import ../../../codexcrawler/utils/asyncdataevent
|
||||
import ../../../codexcrawler/types
|
||||
import ../../../codexcrawler/state
|
||||
import ../mockstate
|
||||
import ../helpers
|
||||
|
||||
suite "TodoList":
|
||||
var
|
||||
nid: Nid
|
||||
state: MockState
|
||||
todo: TodoList
|
||||
|
||||
setup:
|
||||
nid = genNid()
|
||||
state = createMockState()
|
||||
|
||||
todo = TodoList.new(state)
|
||||
|
||||
(await todo.start()).tryGet()
|
||||
|
||||
teardown:
|
||||
(await todo.stop()).tryGet()
|
||||
state.checkAllUnsubscribed()
|
||||
|
||||
proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} =
|
||||
(await state.events.newNodesDiscovered.fire(nids)).tryGet()
|
||||
|
||||
proc fireNodesExpiredEvent(nids: seq[Nid]) {.async.} =
|
||||
(await state.events.nodesExpired.fire(nids)).tryGet()
|
||||
|
||||
test "discovered nodes are added to todo list":
|
||||
await fireNewNodesDiscoveredEvent(@[nid])
|
||||
let item = (await todo.pop).tryGet()
|
||||
|
||||
check:
|
||||
item == nid
|
||||
|
||||
test "expired nodes are added to todo list":
|
||||
await fireNodesExpiredEvent(@[nid])
|
||||
let item = (await todo.pop).tryGet()
|
||||
|
||||
check:
|
||||
item == nid
|
||||
|
||||
test "pop on empty todo list waits until item is added":
|
||||
let popFuture = todo.pop()
|
||||
check:
|
||||
not popFuture.finished
|
||||
|
||||
await fireNewNodesDiscoveredEvent(@[nid])
|
||||
|
||||
check:
|
||||
popFuture.finished
|
||||
popFuture.value.tryGet() == nid
|
||||
@ -1,4 +1,5 @@
|
||||
import ./components/testnodestore
|
||||
import ./components/testdhtmetrics
|
||||
import ./components/testtodolist
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
||||
@ -80,6 +80,10 @@ suite "AsyncDataEvent":
|
||||
await event.unsubscribe(s2)
|
||||
await event.unsubscribe(s3)
|
||||
|
||||
test "Can fire and event without subscribers":
|
||||
check:
|
||||
isOK(await event.fire(ExampleData(s: msg)))
|
||||
|
||||
test "Can unsubscribe in handler":
|
||||
proc doNothing() {.async, closure.} =
|
||||
await sleepAsync(1.millis)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user