diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index eb5027c..0f7f73c 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -27,6 +27,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} = newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), nodesToRevisit: newAsyncDataEvent[seq[Nid]](), + nodesDeleted: newAsyncDataEvent[seq[Nid]](), ), ) diff --git a/codexcrawler/components/dhtmetrics.nim b/codexcrawler/components/dhtmetrics.nim index d2b1f7c..af9c834 100644 --- a/codexcrawler/components/dhtmetrics.nim +++ b/codexcrawler/components/dhtmetrics.nim @@ -5,6 +5,7 @@ import pkg/questionable/results import ../list import ../state +import ../types import ../services/metrics import ../component import ../utils/asyncdataevent @@ -16,9 +17,14 @@ type DhtMetrics* = ref object of Component state: State ok: List nok: List - sub: AsyncDataEventSubscription + subCheck: AsyncDataEventSubscription + subDel: AsyncDataEventSubscription metrics: Metrics +proc updateMetrics(d: DhtMetrics) = + d.metrics.setOkNodes(d.ok.len) + d.metrics.setNokNodes(d.nok.len) + proc handleCheckEvent( d: DhtMetrics, event: DhtNodeCheckEventData ): Future[?!void] {.async.} = @@ -29,8 +35,14 @@ proc handleCheckEvent( ?await d.ok.remove(event.id) ?await d.nok.add(event.id) - d.metrics.setOkNodes(d.ok.len) - d.metrics.setNokNodes(d.nok.len) + d.updateMetrics() + return success() + +proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.} = + for nid in nids: + ?await d.ok.remove(nid) + ?await d.nok.remove(nid) + d.updateMetrics() return success() method start*(d: DhtMetrics): Future[?!void] {.async.} = @@ -41,12 +53,17 @@ method start*(d: DhtMetrics): Future[?!void] {.async.} = proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = await d.handleCheckEvent(event) - d.sub = d.state.events.dhtNodeCheck.subscribe(onCheck) + proc onDelete(nids: seq[Nid]): Future[?!void] {.async.} = + await d.handleDeleteEvent(nids) + + d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck) + d.subDel = d.state.events.nodesDeleted.subscribe(onDelete) return success() method stop*(d: DhtMetrics): Future[?!void] {.async.} = - await d.state.events.dhtNodeCheck.unsubscribe(d.sub) + await d.state.events.dhtNodeCheck.unsubscribe(d.subCheck) + await d.state.events.nodesDeleted.unsubscribe(d.subDel) return success() proc new*( diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index e60fcad..2bd42bc 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -94,6 +94,11 @@ proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = await s.state.events.newNodesDiscovered.fire(nids) +proc fireNodesDeleted( + s: NodeStore, nids: seq[Nid] +): Future[?!void] {.async: (raises: []).} = + await s.state.events.nodesDeleted.fire(nids) + proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = var newNodes = newSeq[Nid]() for nid in nids: @@ -114,6 +119,16 @@ proc processNodeCheck( error "failed to format key", err = err.msg return failure(err) + without exists =? (await s.store.has(key)), err: + error "failed to check store for key", err = err.msg + return failure(err) + + if not exists: + warn "Expected node entry to exist in store, but was not found.", key = $key + # We treat the node as deleted, so it can be rediscovered and readded if it still exists in the network. + ?await s.fireNodesDeleted(@[event.id]) + return success() + without var entry =? (await get[NodeEntry](s.store, key)), err: error "failed to get entry for key", err = err.msg, key = $key return failure(err) @@ -127,7 +142,7 @@ proc processNodeCheck( ?await s.store.put(key, entry) return success() -proc deleteEntry(s: NodeStore, nid: Nid): Future[?!void] {.async.} = +proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = without key =? Key.init(nodestoreName / $nid), err: error "failed to format key", err = err.msg return failure(err) @@ -137,7 +152,8 @@ proc deleteEntry(s: NodeStore, nid: Nid): Future[?!void] {.async.} = if exists: ?await s.store.delete(key) - return success() + + return success(exists) method iterateAll*( s: NodeStore, onNode: OnNodeEntry @@ -173,11 +189,17 @@ method iterateAll*( method deleteEntries*( s: NodeStore, nids: seq[Nid] ): Future[?!void] {.async: (raises: []), base.} = + var deleted = newSeq[Nid]() for nid in nids: try: - ?await s.deleteEntry(nid) + without wasDeleted =? (await s.deleteEntry(nid)), err: + return failure(err) + if wasDeleted: + deleted.add(nid) except CatchableError as exc: return failure(exc.msg) + + ?await s.fireNodesDeleted(deleted) return success() method start*(s: NodeStore): Future[?!void] {.async.} = diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index 66b7cd7..76e293c 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -21,6 +21,7 @@ type newNodesDiscovered*: AsyncDataEvent[seq[Nid]] dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData] nodesToRevisit*: AsyncDataEvent[seq[Nid]] + nodesDeleted*: AsyncDataEvent[seq[Nid]] ApplicationStatus* {.pure.} = enum Stopped diff --git a/tests/codexcrawler/components/testdhtmetrics.nim b/tests/codexcrawler/components/testdhtmetrics.nim index 1620c87..6475d05 100644 --- a/tests/codexcrawler/components/testdhtmetrics.nim +++ b/tests/codexcrawler/components/testdhtmetrics.nim @@ -40,6 +40,9 @@ suite "DhtMetrics": (await state.events.dhtNodeCheck.fire(event)).tryGet() + proc fireNodesDeletedEvent(nids: seq[Nid]) {.async.} = + (await state.events.nodesDeleted.fire(nids)).tryGet() + test "dhtmetrics start should load both lists": check: okList.loadCalled @@ -88,3 +91,20 @@ suite "DhtMetrics": check: metrics.nok == length + + test "nodesDeleted event should remove node from both lists": + await fireNodesDeletedEvent(@[nid]) + + check: + nid in okList.removed + nid in nokList.removed + + test "nodesDeleted event should set list lengths as metrics for both lists": + okList.length = 123 + nokList.length = 345 + + await fireNodesDeletedEvent(@[nid]) + + check: + metrics.ok == okList.length + metrics.nok == nokList.length diff --git a/tests/codexcrawler/components/testnodestore.nim b/tests/codexcrawler/components/testnodestore.nim index 282eab9..f2a3a17 100644 --- a/tests/codexcrawler/components/testnodestore.nim +++ b/tests/codexcrawler/components/testnodestore.nim @@ -174,6 +174,28 @@ suite "Nodestore": nid2 notin iterNodes nid3 in iterNodes + test "deleteEntries fires nodesDeleted event": + var deletedNodes = newSeq[Nid]() + proc onDeleted(nids: seq[Nid]): Future[?!void] {.async.} = + deletedNodes = nids + return success() + + let + sub = state.events.nodesDeleted.subscribe(onDeleted) + nid1 = genNid() + nid2 = genNid() + nid3 = genNid() + + await fireNodeFoundEvent(@[nid1, nid2, nid3]) + (await store.deleteEntries(@[nid1, nid2])).tryGet() + + check: + nid1 in deletedNodes + nid2 in deletedNodes + nid3 notin deletedNodes + + await state.events.nodesDeleted.unsubscribe(sub) + test "dhtNodeCheck event should update lastVisit": let nid = genNid() @@ -221,3 +243,21 @@ suite "Nodestore": let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet() check: updatedEntry.firstInactive == 0 + + test "dhtNodeCheck event for non-existing node should fire nodesDeleted": + var deletedNodes = newSeq[Nid]() + proc onDeleted(nids: seq[Nid]): Future[?!void] {.async.} = + deletedNodes = nids + return success() + + let + sub = state.events.nodesDeleted.subscribe(onDeleted) + nid = genNid() + + # We don't fire nodeFound first. So the store doesn't know it exists. + await fireCheckEvent(nid, true) + + check: + nid in deletedNodes + + await state.events.nodesDeleted.unsubscribe(sub) diff --git a/tests/codexcrawler/mocks/mockstate.nim b/tests/codexcrawler/mocks/mockstate.nim index bb15f11..00e1198 100644 --- a/tests/codexcrawler/mocks/mockstate.nim +++ b/tests/codexcrawler/mocks/mockstate.nim @@ -28,6 +28,7 @@ proc createMockState*(): MockState = newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), nodesToRevisit: newAsyncDataEvent[seq[Nid]](), + nodesDeleted: newAsyncDataEvent[seq[Nid]](), ), steppers: newSeq[OnStep](), delays: newSeq[Duration](),