mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-05 23:13:11 +00:00
formatting
This commit is contained in:
parent
5fa90c5c2f
commit
c1f25f10cc
@ -12,8 +12,7 @@ import ../state
|
|||||||
import ../utils/datastoreutils
|
import ../utils/datastoreutils
|
||||||
import ../utils/asyncdataevent
|
import ../utils/asyncdataevent
|
||||||
|
|
||||||
const
|
const nodestoreName = "nodestore"
|
||||||
nodestoreName = "nodestore"
|
|
||||||
|
|
||||||
type
|
type
|
||||||
NodeEntry* = object
|
NodeEntry* = object
|
||||||
@ -66,12 +65,9 @@ proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
|
|||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
if not exists:
|
if not exists:
|
||||||
let entry = NodeEntry(
|
let entry = NodeEntry(id: nid, lastVisit: 0)
|
||||||
id: nid,
|
|
||||||
lastVisit: 0
|
|
||||||
)
|
|
||||||
?await s.store.put(key, entry)
|
?await s.store.put(key, entry)
|
||||||
|
|
||||||
return success(not exists)
|
return success(not exists)
|
||||||
|
|
||||||
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
||||||
@ -83,12 +79,12 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
|
|||||||
for nid in nids:
|
for nid in nids:
|
||||||
without isNew =? (await s.storeNodeIsNew(nid)), err:
|
without isNew =? (await s.storeNodeIsNew(nid)), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
if isNew:
|
if isNew:
|
||||||
newNodes.add(nid)
|
newNodes.add(nid)
|
||||||
|
|
||||||
if newNodes.len > 0:
|
if newNodes.len > 0:
|
||||||
? await s.fireNewNodesDiscovered(newNodes)
|
?await s.fireNewNodesDiscovered(newNodes)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} =
|
proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} =
|
||||||
@ -102,10 +98,10 @@ proc iterateAll*(s: NodeStore, onNode: OnNodeEntry): Future[?!void] {.async.} =
|
|||||||
return failure(err)
|
return failure(err)
|
||||||
without value =? item.value, err:
|
without value =? item.value, err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
?await onNode(value)
|
?await onNode(value)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
method start*(s: NodeStore): Future[?!void] {.async.} =
|
method start*(s: NodeStore): Future[?!void] {.async.} =
|
||||||
info "Starting nodestore..."
|
info "Starting nodestore..."
|
||||||
|
|
||||||
@ -119,15 +115,8 @@ method stop*(s: NodeStore): Future[?!void] {.async.} =
|
|||||||
await s.state.events.nodesFound.unsubscribe(s.sub)
|
await s.state.events.nodesFound.unsubscribe(s.sub)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc new*(
|
proc new*(T: type NodeStore, state: State, store: TypedDatastore): NodeStore =
|
||||||
T: type NodeStore,
|
NodeStore(state: state, store: store)
|
||||||
state: State,
|
|
||||||
store: TypedDatastore
|
|
||||||
): NodeStore =
|
|
||||||
NodeStore(
|
|
||||||
state: state,
|
|
||||||
store: store
|
|
||||||
)
|
|
||||||
|
|
||||||
proc createNodeStore*(state: State): ?!NodeStore =
|
proc createNodeStore*(state: State): ?!NodeStore =
|
||||||
without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err:
|
without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err:
|
||||||
|
|||||||
@ -113,4 +113,3 @@ proc pop*(this: List): Future[?!Nid] {.async.} =
|
|||||||
|
|
||||||
proc len*(this: List): int =
|
proc len*(this: List): int =
|
||||||
this.items.len
|
this.items.len
|
||||||
|
|
||||||
|
|||||||
@ -22,7 +22,9 @@ proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
|
|||||||
queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]()
|
queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]()
|
||||||
)
|
)
|
||||||
|
|
||||||
proc performUnsubscribe[T](event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription) {.async.} =
|
proc performUnsubscribe[T](
|
||||||
|
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
|
||||||
|
) {.async.} =
|
||||||
if subscription in event.subscriptions:
|
if subscription in event.subscriptions:
|
||||||
await subscription.listenFuture.cancelAndWait()
|
await subscription.listenFuture.cancelAndWait()
|
||||||
event.subscriptions.delete(event.subscriptions.find(subscription))
|
event.subscriptions.delete(event.subscriptions.find(subscription))
|
||||||
@ -35,7 +37,7 @@ proc subscribe*[T](
|
|||||||
listenFuture: newFuture[void](),
|
listenFuture: newFuture[void](),
|
||||||
fireEvent: newAsyncEvent(),
|
fireEvent: newAsyncEvent(),
|
||||||
inHandler: false,
|
inHandler: false,
|
||||||
delayedUnsubscribe: false
|
delayedUnsubscribe: false,
|
||||||
)
|
)
|
||||||
|
|
||||||
proc listener() {.async.} =
|
proc listener() {.async.} =
|
||||||
@ -62,7 +64,7 @@ proc fire*[T](event: AsyncDataEvent[T], data: T): Future[?!void] {.async.} =
|
|||||||
return failure(err)
|
return failure(err)
|
||||||
if sub.delayedUnsubscribe:
|
if sub.delayedUnsubscribe:
|
||||||
toUnsubscribe.add(sub)
|
toUnsubscribe.add(sub)
|
||||||
|
|
||||||
for sub in toUnsubscribe:
|
for sub in toUnsubscribe:
|
||||||
await event.unsubscribe(sub)
|
await event.unsubscribe(sub)
|
||||||
|
|
||||||
|
|||||||
@ -16,7 +16,7 @@ suite "Nodestore":
|
|||||||
dsPath = getTempDir() / "testds"
|
dsPath = getTempDir() / "testds"
|
||||||
nodestoreName = "nodestore"
|
nodestoreName = "nodestore"
|
||||||
|
|
||||||
var
|
var
|
||||||
ds: TypedDatastore
|
ds: TypedDatastore
|
||||||
state: MockState
|
state: MockState
|
||||||
store: NodeStore
|
store: NodeStore
|
||||||
@ -25,9 +25,7 @@ suite "Nodestore":
|
|||||||
ds = createTypedDatastore(dsPath).tryGet()
|
ds = createTypedDatastore(dsPath).tryGet()
|
||||||
state = createMockState()
|
state = createMockState()
|
||||||
|
|
||||||
store = NodeStore.new(
|
store = NodeStore.new(state, ds)
|
||||||
state, ds
|
|
||||||
)
|
|
||||||
|
|
||||||
(await store.start()).tryGet()
|
(await store.start()).tryGet()
|
||||||
|
|
||||||
@ -38,10 +36,7 @@ suite "Nodestore":
|
|||||||
removeDir(dsPath)
|
removeDir(dsPath)
|
||||||
|
|
||||||
test "nodeEntry encoding":
|
test "nodeEntry encoding":
|
||||||
let entry = NodeEntry(
|
let entry = NodeEntry(id: genNid(), lastVisit: 123.uint64)
|
||||||
id: genNid(),
|
|
||||||
lastVisit: 123.uint64
|
|
||||||
)
|
|
||||||
|
|
||||||
let
|
let
|
||||||
bytes = entry.encode()
|
bytes = entry.encode()
|
||||||
@ -52,7 +47,7 @@ suite "Nodestore":
|
|||||||
entry.lastVisit == decoded.lastVisit
|
entry.lastVisit == decoded.lastVisit
|
||||||
|
|
||||||
test "nodesFound event should store nodes":
|
test "nodesFound event should store nodes":
|
||||||
let
|
let
|
||||||
nid = genNid()
|
nid = genNid()
|
||||||
expectedKey = Key.init(nodestoreName / $nid).tryGet()
|
expectedKey = Key.init(nodestoreName / $nid).tryGet()
|
||||||
|
|
||||||
@ -60,7 +55,7 @@ suite "Nodestore":
|
|||||||
|
|
||||||
check:
|
check:
|
||||||
(await ds.has(expectedKey)).tryGet()
|
(await ds.has(expectedKey)).tryGet()
|
||||||
|
|
||||||
let entry = (await get[NodeEntry](ds, expectedKey)).tryGet()
|
let entry = (await get[NodeEntry](ds, expectedKey)).tryGet()
|
||||||
check:
|
check:
|
||||||
entry.id == nid
|
entry.id == nid
|
||||||
@ -71,7 +66,7 @@ suite "Nodestore":
|
|||||||
newNodes = nids
|
newNodes = nids
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
let
|
let
|
||||||
sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
|
sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
|
||||||
nid = genNid()
|
nid = genNid()
|
||||||
|
|
||||||
@ -81,10 +76,9 @@ suite "Nodestore":
|
|||||||
newNodes == @[nid]
|
newNodes == @[nid]
|
||||||
|
|
||||||
await state.events.newNodesDiscovered.unsubscribe(sub)
|
await state.events.newNodesDiscovered.unsubscribe(sub)
|
||||||
|
|
||||||
test "nodesFound event should not fire newNodesDiscovered for previously seen nodes":
|
test "nodesFound event should not fire newNodesDiscovered for previously seen nodes":
|
||||||
let
|
let nid = genNid()
|
||||||
nid = genNid()
|
|
||||||
|
|
||||||
# Make nid known first. Then subscribe.
|
# Make nid known first. Then subscribe.
|
||||||
(await state.events.nodesFound.fire(@[nid])).tryGet()
|
(await state.events.nodesFound.fire(@[nid])).tryGet()
|
||||||
@ -97,9 +91,8 @@ suite "Nodestore":
|
|||||||
inc count
|
inc count
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
let
|
let sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
|
||||||
sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
|
|
||||||
|
|
||||||
# Firing the event again should not trigger newNodesDiscovered for nid
|
# Firing the event again should not trigger newNodesDiscovered for nid
|
||||||
(await state.events.nodesFound.fire(@[nid])).tryGet()
|
(await state.events.nodesFound.fire(@[nid])).tryGet()
|
||||||
|
|
||||||
@ -110,11 +103,11 @@ suite "Nodestore":
|
|||||||
await state.events.newNodesDiscovered.unsubscribe(sub)
|
await state.events.newNodesDiscovered.unsubscribe(sub)
|
||||||
|
|
||||||
test "iterateAll yields all known nids":
|
test "iterateAll yields all known nids":
|
||||||
let
|
let
|
||||||
nid1 = genNid()
|
nid1 = genNid()
|
||||||
nid2 = genNid()
|
nid2 = genNid()
|
||||||
nid3 = genNid()
|
nid3 = genNid()
|
||||||
|
|
||||||
(await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet()
|
(await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet()
|
||||||
|
|
||||||
var iterNodes = newSeq[Nid]()
|
var iterNodes = newSeq[Nid]()
|
||||||
|
|||||||
@ -4,11 +4,7 @@ import ../../codexcrawler/utils/asyncdataevent
|
|||||||
import ../../codexcrawler/types
|
import ../../codexcrawler/types
|
||||||
import ../../codexcrawler/config
|
import ../../codexcrawler/config
|
||||||
|
|
||||||
type
|
type MockState* = ref object of State
|
||||||
MockState* = ref object of State
|
|
||||||
# config*: Config
|
|
||||||
# events*: Events
|
|
||||||
|
|
||||||
|
|
||||||
proc createMockState*(): MockState =
|
proc createMockState*(): MockState =
|
||||||
MockState(
|
MockState(
|
||||||
|
|||||||
@ -21,5 +21,3 @@ suite "Types":
|
|||||||
|
|
||||||
check:
|
check:
|
||||||
nid == Nid.fromBytes(bytes).tryGet()
|
nid == Nid.fromBytes(bytes).tryGet()
|
||||||
|
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user