mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-03-21 19:43:07 +00:00
fixes crash, updates metrics on node delete
This commit is contained in:
parent
d5437a0d2e
commit
5808132503
@ -27,6 +27,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
|
||||
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
|
||||
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
|
||||
nodesToRevisit: newAsyncDataEvent[seq[Nid]](),
|
||||
nodesDeleted: newAsyncDataEvent[seq[Nid]](),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@ -5,6 +5,7 @@ import pkg/questionable/results
|
||||
|
||||
import ../list
|
||||
import ../state
|
||||
import ../types
|
||||
import ../services/metrics
|
||||
import ../component
|
||||
import ../utils/asyncdataevent
|
||||
@ -16,9 +17,14 @@ type DhtMetrics* = ref object of Component
|
||||
state: State
|
||||
ok: List
|
||||
nok: List
|
||||
sub: AsyncDataEventSubscription
|
||||
subCheck: AsyncDataEventSubscription
|
||||
subDel: AsyncDataEventSubscription
|
||||
metrics: Metrics
|
||||
|
||||
proc updateMetrics(d: DhtMetrics) =
|
||||
d.metrics.setOkNodes(d.ok.len)
|
||||
d.metrics.setNokNodes(d.nok.len)
|
||||
|
||||
proc handleCheckEvent(
|
||||
d: DhtMetrics, event: DhtNodeCheckEventData
|
||||
): Future[?!void] {.async.} =
|
||||
@ -29,8 +35,14 @@ proc handleCheckEvent(
|
||||
?await d.ok.remove(event.id)
|
||||
?await d.nok.add(event.id)
|
||||
|
||||
d.metrics.setOkNodes(d.ok.len)
|
||||
d.metrics.setNokNodes(d.nok.len)
|
||||
d.updateMetrics()
|
||||
return success()
|
||||
|
||||
proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
for nid in nids:
|
||||
?await d.ok.remove(nid)
|
||||
?await d.nok.remove(nid)
|
||||
d.updateMetrics()
|
||||
return success()
|
||||
|
||||
method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
@ -41,12 +53,17 @@ method start*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
|
||||
await d.handleCheckEvent(event)
|
||||
|
||||
d.sub = d.state.events.dhtNodeCheck.subscribe(onCheck)
|
||||
proc onDelete(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
await d.handleDeleteEvent(nids)
|
||||
|
||||
d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck)
|
||||
d.subDel = d.state.events.nodesDeleted.subscribe(onDelete)
|
||||
|
||||
return success()
|
||||
|
||||
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
|
||||
await d.state.events.dhtNodeCheck.unsubscribe(d.sub)
|
||||
await d.state.events.dhtNodeCheck.unsubscribe(d.subCheck)
|
||||
await d.state.events.nodesDeleted.unsubscribe(d.subDel)
|
||||
return success()
|
||||
|
||||
proc new*(
|
||||
|
||||
@ -94,6 +94,11 @@ proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
|
||||
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
await s.state.events.newNodesDiscovered.fire(nids)
|
||||
|
||||
proc fireNodesDeleted(
|
||||
s: NodeStore, nids: seq[Nid]
|
||||
): Future[?!void] {.async: (raises: []).} =
|
||||
await s.state.events.nodesDeleted.fire(nids)
|
||||
|
||||
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
var newNodes = newSeq[Nid]()
|
||||
for nid in nids:
|
||||
@ -114,6 +119,16 @@ proc processNodeCheck(
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
without exists =? (await s.store.has(key)), err:
|
||||
error "failed to check store for key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if not exists:
|
||||
warn "Expected node entry to exist in store, but was not found.", key = $key
|
||||
# We treat the node as deleted, so it can be rediscovered and readded if it still exists in the network.
|
||||
?await s.fireNodesDeleted(@[event.id])
|
||||
return success()
|
||||
|
||||
without var entry =? (await get[NodeEntry](s.store, key)), err:
|
||||
error "failed to get entry for key", err = err.msg, key = $key
|
||||
return failure(err)
|
||||
@ -127,7 +142,7 @@ proc processNodeCheck(
|
||||
?await s.store.put(key, entry)
|
||||
return success()
|
||||
|
||||
proc deleteEntry(s: NodeStore, nid: Nid): Future[?!void] {.async.} =
|
||||
proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
|
||||
without key =? Key.init(nodestoreName / $nid), err:
|
||||
error "failed to format key", err = err.msg
|
||||
return failure(err)
|
||||
@ -137,7 +152,8 @@ proc deleteEntry(s: NodeStore, nid: Nid): Future[?!void] {.async.} =
|
||||
|
||||
if exists:
|
||||
?await s.store.delete(key)
|
||||
return success()
|
||||
|
||||
return success(exists)
|
||||
|
||||
method iterateAll*(
|
||||
s: NodeStore, onNode: OnNodeEntry
|
||||
@ -173,11 +189,17 @@ method iterateAll*(
|
||||
method deleteEntries*(
|
||||
s: NodeStore, nids: seq[Nid]
|
||||
): Future[?!void] {.async: (raises: []), base.} =
|
||||
var deleted = newSeq[Nid]()
|
||||
for nid in nids:
|
||||
try:
|
||||
?await s.deleteEntry(nid)
|
||||
without wasDeleted =? (await s.deleteEntry(nid)), err:
|
||||
return failure(err)
|
||||
if wasDeleted:
|
||||
deleted.add(nid)
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
|
||||
?await s.fireNodesDeleted(deleted)
|
||||
return success()
|
||||
|
||||
method start*(s: NodeStore): Future[?!void] {.async.} =
|
||||
|
||||
@ -21,6 +21,7 @@ type
|
||||
newNodesDiscovered*: AsyncDataEvent[seq[Nid]]
|
||||
dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData]
|
||||
nodesToRevisit*: AsyncDataEvent[seq[Nid]]
|
||||
nodesDeleted*: AsyncDataEvent[seq[Nid]]
|
||||
|
||||
ApplicationStatus* {.pure.} = enum
|
||||
Stopped
|
||||
|
||||
@ -40,6 +40,9 @@ suite "DhtMetrics":
|
||||
|
||||
(await state.events.dhtNodeCheck.fire(event)).tryGet()
|
||||
|
||||
proc fireNodesDeletedEvent(nids: seq[Nid]) {.async.} =
|
||||
(await state.events.nodesDeleted.fire(nids)).tryGet()
|
||||
|
||||
test "dhtmetrics start should load both lists":
|
||||
check:
|
||||
okList.loadCalled
|
||||
@ -88,3 +91,20 @@ suite "DhtMetrics":
|
||||
|
||||
check:
|
||||
metrics.nok == length
|
||||
|
||||
test "nodesDeleted event should remove node from both lists":
|
||||
await fireNodesDeletedEvent(@[nid])
|
||||
|
||||
check:
|
||||
nid in okList.removed
|
||||
nid in nokList.removed
|
||||
|
||||
test "nodesDeleted event should set list lengths as metrics for both lists":
|
||||
okList.length = 123
|
||||
nokList.length = 345
|
||||
|
||||
await fireNodesDeletedEvent(@[nid])
|
||||
|
||||
check:
|
||||
metrics.ok == okList.length
|
||||
metrics.nok == nokList.length
|
||||
|
||||
@ -174,6 +174,28 @@ suite "Nodestore":
|
||||
nid2 notin iterNodes
|
||||
nid3 in iterNodes
|
||||
|
||||
test "deleteEntries fires nodesDeleted event":
|
||||
var deletedNodes = newSeq[Nid]()
|
||||
proc onDeleted(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
deletedNodes = nids
|
||||
return success()
|
||||
|
||||
let
|
||||
sub = state.events.nodesDeleted.subscribe(onDeleted)
|
||||
nid1 = genNid()
|
||||
nid2 = genNid()
|
||||
nid3 = genNid()
|
||||
|
||||
await fireNodeFoundEvent(@[nid1, nid2, nid3])
|
||||
(await store.deleteEntries(@[nid1, nid2])).tryGet()
|
||||
|
||||
check:
|
||||
nid1 in deletedNodes
|
||||
nid2 in deletedNodes
|
||||
nid3 notin deletedNodes
|
||||
|
||||
await state.events.nodesDeleted.unsubscribe(sub)
|
||||
|
||||
test "dhtNodeCheck event should update lastVisit":
|
||||
let
|
||||
nid = genNid()
|
||||
@ -221,3 +243,21 @@ suite "Nodestore":
|
||||
let updatedEntry = (await get[NodeEntry](ds, expectedKey)).tryGet()
|
||||
check:
|
||||
updatedEntry.firstInactive == 0
|
||||
|
||||
test "dhtNodeCheck event for non-existing node should fire nodesDeleted":
|
||||
var deletedNodes = newSeq[Nid]()
|
||||
proc onDeleted(nids: seq[Nid]): Future[?!void] {.async.} =
|
||||
deletedNodes = nids
|
||||
return success()
|
||||
|
||||
let
|
||||
sub = state.events.nodesDeleted.subscribe(onDeleted)
|
||||
nid = genNid()
|
||||
|
||||
# We don't fire nodeFound first. So the store doesn't know it exists.
|
||||
await fireCheckEvent(nid, true)
|
||||
|
||||
check:
|
||||
nid in deletedNodes
|
||||
|
||||
await state.events.nodesDeleted.unsubscribe(sub)
|
||||
|
||||
@ -28,6 +28,7 @@ proc createMockState*(): MockState =
|
||||
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
|
||||
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
|
||||
nodesToRevisit: newAsyncDataEvent[seq[Nid]](),
|
||||
nodesDeleted: newAsyncDataEvent[seq[Nid]](),
|
||||
),
|
||||
steppers: newSeq[OnStep](),
|
||||
delays: newSeq[Duration](),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user