diff --git a/codexcrawler/components/todolist.nim b/codexcrawler/components/todolist.nim new file mode 100644 index 0000000..7eac856 --- /dev/null +++ b/codexcrawler/components/todolist.nim @@ -0,0 +1,67 @@ +import pkg/chronos +import pkg/chronicles +import pkg/datastore +import pkg/datastore/typedds +import pkg/questionable +import pkg/questionable/results + +import std/sets + +import ../state +import ../types +import ../component +import ../utils/asyncdataevent + +logScope: + topics = "todolist" + +type TodoList* = ref object of Component + nids: seq[Nid] + state: State + subNew: AsyncDataEventSubscription + subExp: AsyncDataEventSubscription + emptySignal: ?Future[void] + +proc addNodes(t: TodoList, nids: seq[Nid]) = + for nid in nids: + t.nids.add(nid) + + if s =? t.emptySignal: + s.complete() + t.emptySignal = Future[void].none + +proc pop*(t: TodoList): Future[?!Nid] {.async.} = + if t.nids.len < 1: + trace "List is empty. Waiting for new items..." + let signal = newFuture[void]("list.emptySignal") + t.emptySignal = some(signal) + await signal.wait(1.hours) + if t.nids.len < 1: + return failure("TodoList is empty.") + + let item = t.nids[0] + t.nids.del(0) + + return success(item) + +method start*(t: TodoList): Future[?!void] {.async.} = + info "Starting TodoList..." + + proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} = + t.addNodes(nids) + return success() + + t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes) + t.subExp = t.state.events.nodesExpired.subscribe(onNewNodes) + return success() + +method stop*(t: TodoList): Future[?!void] {.async.} = + await t.state.events.newNodesDiscovered.unsubscribe(t.subNew) + await t.state.events.nodesExpired.unsubscribe(t.subExp) + return success() + +proc new*(_: type TodoList, state: State): TodoList = + TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none) + +proc createTodoList*(state: State): ?!TodoList = + success(TodoList.new(state)) diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 5eeb280..dd68314 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -23,7 +23,6 @@ type List* = ref object of RootObj name: string store: TypedDatastore items: HashSet[Nid] - emptySignal: ?Future[void] proc encode(s: Nid): seq[byte] = s.toBytes() @@ -68,11 +67,6 @@ method add*(this: List, nid: Nid): Future[?!void] {.async, base.} = if err =? (await this.saveItem(nid)).errorOption: return failure(err) - if s =? this.emptySignal: - trace "List no longer empty.", name = this.name - s.complete() - this.emptySignal = Future[void].none - return success() method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} = diff --git a/tests/codexcrawler/components/testtodolist.nim b/tests/codexcrawler/components/testtodolist.nim new file mode 100644 index 0000000..8960859 --- /dev/null +++ b/tests/codexcrawler/components/testtodolist.nim @@ -0,0 +1,59 @@ +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/todolist +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mockstate +import ../helpers + +suite "TodoList": + var + nid: Nid + state: MockState + todo: TodoList + + setup: + nid = genNid() + state = createMockState() + + todo = TodoList.new(state) + + (await todo.start()).tryGet() + + teardown: + (await todo.stop()).tryGet() + state.checkAllUnsubscribed() + + proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} = + (await state.events.newNodesDiscovered.fire(nids)).tryGet() + + proc fireNodesExpiredEvent(nids: seq[Nid]) {.async.} = + (await state.events.nodesExpired.fire(nids)).tryGet() + + test "discovered nodes are added to todo list": + await fireNewNodesDiscoveredEvent(@[nid]) + let item = (await todo.pop).tryGet() + + check: + item == nid + + test "expired nodes are added to todo list": + await fireNodesExpiredEvent(@[nid]) + let item = (await todo.pop).tryGet() + + check: + item == nid + + test "pop on empty todo list waits until item is added": + let popFuture = todo.pop() + check: + not popFuture.finished + + await fireNewNodesDiscoveredEvent(@[nid]) + + check: + popFuture.finished + popFuture.value.tryGet() == nid diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim index f013640..4226e95 100644 --- a/tests/codexcrawler/testcomponents.nim +++ b/tests/codexcrawler/testcomponents.nim @@ -1,4 +1,5 @@ import ./components/testnodestore import ./components/testdhtmetrics +import ./components/testtodolist {.warning[UnusedImport]: off.} diff --git a/tests/codexcrawler/utils/testasyncdataevent.nim b/tests/codexcrawler/utils/testasyncdataevent.nim index 8c1efd8..c5a47c5 100644 --- a/tests/codexcrawler/utils/testasyncdataevent.nim +++ b/tests/codexcrawler/utils/testasyncdataevent.nim @@ -80,6 +80,10 @@ suite "AsyncDataEvent": await event.unsubscribe(s2) await event.unsubscribe(s3) + test "Can fire and event without subscribers": + check: + isOK(await event.fire(ExampleData(s: msg))) + test "Can unsubscribe in handler": proc doNothing() {.async, closure.} = await sleepAsync(1.millis)