From 14e74d6380ef2781e4d6f9c01146adc8e7e4c3dd Mon Sep 17 00:00:00 2001 From: Ben Date: Mon, 10 Feb 2025 16:02:47 +0100 Subject: [PATCH] Setting up for the nodestore --- codexcrawler/application.nim | 14 +---- codexcrawler/components/crawler.nim | 80 ++++++++++--------------- codexcrawler/components/dht.nim | 19 ++++-- codexcrawler/components/nodestore.nim | 40 +++++++++++++ codexcrawler/components/timetracker.nim | 37 ++++++------ codexcrawler/list.nim | 53 ++++++++-------- codexcrawler/nodeentry.nim | 33 ---------- codexcrawler/types.nim | 19 +++++- 8 files changed, 149 insertions(+), 146 deletions(-) create mode 100644 codexcrawler/components/nodestore.nim diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index 20e8e72..c6dab72 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -66,19 +66,7 @@ proc initializeApp(app: Application): Future[?!void] {.async.} = if err =? (await app.initializeLists()).errorOption: error "Failed to initialize lists", err = err.msg return failure(err) - - # if err =? (await app.initializeDht()).errorOption: - # error "Failed to initialize DHT", err = err.msg - # return failure(err) - - # if err =? (await app.initializeCrawler()).errorOption: - # error "Failed to initialize crawler", err = err.msg - # return failure(err) - - # if err =? (await app.initializeTimeTracker()).errorOption: - # error "Failed to initialize timetracker", err = err.msg - # return failure(err) - + without components =? (await createComponents(app.config)), err: error "Failed to create componenents", err = err.msg return failure(err) diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/crawler.nim index 73bdbac..5f4c296 100644 --- a/codexcrawler/components/crawler.nim +++ b/codexcrawler/components/crawler.nim @@ -5,7 +5,6 @@ import pkg/questionable/results import ./dht import ../list -import ../nodeentry import ../config import ../component import ../types @@ -29,48 +28,48 @@ proc isNew(c: Crawler, node: Node): bool = not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and not c.nokNodes.contains(node.id) -proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} = - if err =? (await c.nokNodes.add(target)).errorOption: - error "Failed to add not-OK-node to list", err = err.msg +# proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} = +# if err =? (await c.nokNodes.add(target)).errorOption: +# error "Failed to add not-OK-node to list", err = err.msg -proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} = - if err =? (await c.okNodes.add(target)).errorOption: - error "Failed to add OK-node to list", err = err.msg +# proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} = +# if err =? (await c.okNodes.add(target)).errorOption: +# error "Failed to add OK-node to list", err = err.msg -proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} = - let entry = NodeEntry(id: nodeId, lastVisit: 0) - return await c.todoNodes.add(entry) +# proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} = +# let entry = NodeEntry(id: nodeId, lastVisit: 0) +# return await c.todoNodes.add(entry) -proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} = - for node in newNodes: - if err =? (await c.addNewTodoNode(node.id)).errorOption: - error "Failed to add todo-node to list", err = err.msg +# proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} = +# for node in newNodes: +# if err =? (await c.addNewTodoNode(node.id)).errorOption: +# error "Failed to add todo-node to list", err = err.msg -proc step(c: Crawler) {.async.} = - logScope: - todo = $c.todoNodes.len - ok = $c.okNodes.len - nok = $c.nokNodes.len +# proc step(c: Crawler) {.async.} = +# logScope: +# todo = $c.todoNodes.len +# ok = $c.okNodes.len +# nok = $c.nokNodes.len - without var target =? (await c.todoNodes.pop()), err: - error "Failed to get todo node", err = err.msg +# without var target =? (await c.todoNodes.pop()), err: +# error "Failed to get todo node", err = err.msg - target.lastVisit = Moment.now().epochSeconds.uint64 +# target.lastVisit = Moment.now().epochSeconds.uint64 - without receivedNodes =? (await c.dht.getNeighbors(target.id)), err: - await c.handleNodeNotOk(target) - return +# without receivedNodes =? (await c.dht.getNeighbors(target.id)), err: +# await c.handleNodeNotOk(target) +# return - let newNodes = receivedNodes.filterIt(isNew(c, it)) - if newNodes.len > 0: - trace "Discovered new nodes", newNodes = newNodes.len +# let newNodes = receivedNodes.filterIt(isNew(c, it)) +# if newNodes.len > 0: +# trace "Discovered new nodes", newNodes = newNodes.len - await c.handleNodeOk(target) - await c.addNewTodoNodes(newNodes) +# await c.handleNodeOk(target) +# await c.addNewTodoNodes(newNodes) - # Don't log the status every loop: - if (c.todoNodes.len mod 10) == 0: - trace "Status" +# # Don't log the status every loop: +# if (c.todoNodes.len mod 10) == 0: +# trace "Status" proc worker(c: Crawler) {.async.} = try: @@ -82,23 +81,8 @@ proc worker(c: Crawler) {.async.} = quit QuitFailure method start*(c: Crawler, state: State): Future[?!void] {.async.} = - # if c.todoNodes.len < 1: - # let nodeIds = c.dht.getRoutingTableNodeIds() - # info "Loading routing-table nodes to todo-list...", nodes = nodeIds.len - # for id in nodeIds: - # if err =? (await c.addNewTodoNode(id)).errorOption: - # error "Failed to add routing-table node to todo-list", err = err.msg - # return failure(err) - info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs asyncSpawn c.worker() - - proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = - info "Crawler sees nodes found!", num = nids.len - return success() - - let handle = state.events.nodesFound.subscribe(onNodesFound) - return success() method stop*(c: Crawler): Future[?!void] {.async.} = diff --git a/codexcrawler/components/dht.nim b/codexcrawler/components/dht.nim index ab1203b..2cef9ec 100644 --- a/codexcrawler/components/dht.nim +++ b/codexcrawler/components/dht.nim @@ -11,6 +11,7 @@ from pkg/nimcrypto import keccak256 import ../utils/keyutils import ../utils/datastoreutils import ../utils/rng +import ../utils/asyncdataevent import ../component import ../config import ../state @@ -44,9 +45,9 @@ proc getNode*(d: Dht, nodeId: NodeId): ?!Node = let node = d.protocol.getNode(nodeId) if node.isSome(): return success(node.get()) - return failure("Node not found for id: " & $nodeId) + return failure("Node not found for id: " & $(NodeId(nodeId))) -proc getRoutingTableNodeIds*(d: Dht): seq[NodeId] = +proc getRoutingTableNodeIds(d: Dht): seq[NodeId] = var ids = newSeq[NodeId]() for bucket in d.protocol.routingTable.buckets: for node in bucket.nodes: @@ -82,7 +83,7 @@ method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} = trace "Removing provider", peerId d.protocol.removeProvidersLocal(peerId) -proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) = +proc updateAnnounceRecord(d: Dht, addrs: openArray[MultiAddress]) = d.announceAddrs = @addrs trace "Updating announce record", addrs = d.announceAddrs @@ -93,7 +94,7 @@ proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.providerRecord).expect("Should update SPR") -proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) = +proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = trace "Updating Dht record", addrs = addrs d.dhtRecord = SignedPeerRecord .init(d.key, PeerRecord.init(d.peerId, @addrs)) @@ -102,9 +103,19 @@ proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") +proc findRoutingTableNodes(d: Dht, state: State) {.async.} = + await sleepAsync(5.seconds) + let nodes = d.getRoutingTableNodeIds() + + if err =? (await state.events.nodesFound.fire(nodes)).errorOption: + error "Failed to raise routing-table nodes as found nodes", err = err.msg + else: + trace "Routing table nodes raise as found nodes", num = nodes.len + method start*(d: Dht, state: State): Future[?!void] {.async.} = d.protocol.open() await d.protocol.start() + asyncSpawn d.findRoutingTableNodes(state) return success() method stop*(d: Dht): Future[?!void] {.async.} = diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim new file mode 100644 index 0000000..5f96cf0 --- /dev/null +++ b/codexcrawler/components/nodestore.nim @@ -0,0 +1,40 @@ +import pkg/datastore +import pkg/datastore/typedds +import pkg/questionable/results +import pkg/chronos +import pkg/libp2p + +import ../types +import + +type + NodeEntry* = object + id*: Nid + lastVisit*: uint64 + + NodeStore* = ref object + store: TypedDatastore + +proc `$`*(entry: NodeEntry): string = + $entry.id & ":" & $entry.lastVisit + +proc toBytes*(entry: NodeEntry): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $entry.id) + buffer.write(2, entry.lastVisit) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = + var + buffer = initProtoBuffer(data) + idStr: string + lastVisit: uint64 + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + + if buffer.getField(2, lastVisit).isErr: + return failure("Unable to decode `lastVisit`") + + return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit)) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index 755cc5d..092143a 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -5,7 +5,6 @@ import pkg/questionable/results import ./dht import ../list -import ../nodeentry import ../config import ../component import ../state @@ -20,28 +19,28 @@ type TimeTracker* = ref object of Component nokNodes: List workerDelay: int -proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} = - var toMove = newSeq[NodeEntry]() - proc onItem(item: NodeEntry) = - if item.lastVisit < expiry: - toMove.add(item) +# # proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} = +# # var toMove = newSeq[NodeEntry]() +# # proc onItem(item: NodeEntry) = +# # if item.lastVisit < expiry: +# # toMove.add(item) - await list.iterateAll(onItem) +# # await list.iterateAll(onItem) - if toMove.len > 0: - trace "expired node, moving to todo", nodes = $toMove.len +# # if toMove.len > 0: +# # trace "expired node, moving to todo", nodes = $toMove.len - for item in toMove: - if err =? (await t.todoNodes.add(item)).errorOption: - error "Failed to add expired node to todo list", err = err.msg - return - if err =? (await list.remove(item)).errorOption: - error "Failed to remove expired node to source list", err = err.msg +# # for item in toMove: +# # if err =? (await t.todoNodes.add(item)).errorOption: +# # error "Failed to add expired node to todo list", err = err.msg +# # return +# # if err =? (await list.remove(item)).errorOption: +# # error "Failed to remove expired node to source list", err = err.msg -proc step(t: TimeTracker) {.async.} = - let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64 - await t.processList(t.okNodes, expiry) - await t.processList(t.nokNodes, expiry) +# proc step(t: TimeTracker) {.async.} = +# let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64 +# await t.processList(t.okNodes, expiry) +# await t.processList(t.nokNodes, expiry) proc worker(t: TimeTracker) {.async.} = try: diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 935d4ae..aebebe2 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -13,7 +13,6 @@ import std/sets import std/sequtils import std/os -import ./nodeentry import ./types logScope: @@ -21,25 +20,25 @@ logScope: type OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} - OnItem = proc(item: NodeEntry): void {.gcsafe, raises: [].} + OnItem = proc(item: Nid): void {.gcsafe, raises: [].} List* = ref object name: string store: TypedDatastore - items: seq[NodeEntry] + items: HashSet[Nid] onMetric: OnUpdateMetric emptySignal: ?Future[void] -proc encode(s: NodeEntry): seq[byte] = +proc encode(s: Nid): seq[byte] = s.toBytes() -proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T = +proc decode(T: type Nid, bytes: seq[byte]): ?!T = if bytes.len < 1: - return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64)) - return NodeEntry.fromBytes(bytes) + return success(Nid.fromStr("0")) + return Nid.fromBytes(bytes) -proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} = - without itemKey =? Key.init(this.name / $item.id), err: +proc saveItem(this: List, item: Nid): Future[?!void] {.async.} = + without itemKey =? Key.init(this.name / $item), err: return failure(err) ?await this.store.put(itemKey, item) return success() @@ -47,11 +46,11 @@ proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} = proc load*(this: List): Future[?!void] {.async.} = let id = Nid.fromStr("0") let bytes = newSeq[byte]() - let ne = NodeEntry.fromBytes(bytes) + let ne = Nid.fromBytes(bytes) without queryKey =? Key.init(this.name), err: return failure(err) - without iter =? (await query[NodeEntry](this.store, Query.init(queryKey))), err: + without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: return failure(err) while not iter.finished: @@ -59,8 +58,8 @@ proc load*(this: List): Future[?!void] {.async.} = return failure(err) without value =? item.value, err: return failure(err) - if value.id > 0 or value.lastVisit > 0: - this.items.add(value) + if value > 0: + this.items.incl(value) this.onMetric(this.items.len.int64) info "Loaded list", name = this.name, items = this.items.len @@ -71,40 +70,38 @@ proc new*( ): List = List(name: name, store: store, onMetric: onMetric) -proc contains*(this: List, nodeId: Nid): bool = - this.items.anyIt(it.id == nodeId) +proc contains*(this: List, nid: Nid): bool = + this.items.anyIt(it == nid) -proc contains*(this: List, item: NodeEntry): bool = - this.contains(item.id) - -proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} = - if this.contains(item): +proc add*(this: List, nid: Nid): Future[?!void] {.async.} = + if this.contains(nid): return success() - this.items.add(item) + this.items.incl(nid) this.onMetric(this.items.len.int64) + if err =? (await this.saveItem(nid)).errorOption: + return failure(err) + if s =? this.emptySignal: trace "List no longer empty.", name = this.name s.complete() this.emptySignal = Future[void].none - if err =? (await this.saveItem(item)).errorOption: - return failure(err) return success() -proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} = +proc remove*(this: List, nid: Nid): Future[?!void] {.async.} = if this.items.len < 1: return failure(this.name & "List is empty.") - this.items.keepItIf(item.id != it.id) - without itemKey =? Key.init(this.name / $item.id), err: + this.items.excl(nid) + without itemKey =? Key.init(this.name / $nid), err: return failure(err) ?await this.store.delete(itemKey) this.onMetric(this.items.len.int64) return success() -proc pop*(this: List): Future[?!NodeEntry] {.async.} = +proc pop*(this: List): Future[?!Nid] {.async.} = if this.items.len < 1: trace "List is empty. Waiting for new items...", name = this.name let signal = newFuture[void]("list.emptySignal") @@ -113,7 +110,7 @@ proc pop*(this: List): Future[?!NodeEntry] {.async.} = if this.items.len < 1: return failure(this.name & "List is empty.") - let item = this.items[0] + let item = this.items.pop() if err =? (await this.remove(item)).errorOption: return failure(err) diff --git a/codexcrawler/nodeentry.nim b/codexcrawler/nodeentry.nim index 6f80411..e69de29 100644 --- a/codexcrawler/nodeentry.nim +++ b/codexcrawler/nodeentry.nim @@ -1,33 +0,0 @@ -import pkg/questionable/results -import pkg/chronos -import pkg/libp2p - -import ./types - -type NodeEntry* = object - id*: Nid - lastVisit*: uint64 - -proc `$`*(entry: NodeEntry): string = - $entry.id & ":" & $entry.lastVisit - -proc toBytes*(entry: NodeEntry): seq[byte] = - var buffer = initProtoBuffer() - buffer.write(1, $entry.id) - buffer.write(2, entry.lastVisit) - buffer.finish() - return buffer.buffer - -proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = - var - buffer = initProtoBuffer(data) - idStr: string - lastVisit: uint64 - - if buffer.getField(1, idStr).isErr: - return failure("Unable to decode `idStr`") - - if buffer.getField(2, lastVisit).isErr: - return failure("Unable to decode `lastVisit`") - - return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit)) diff --git a/codexcrawler/types.nim b/codexcrawler/types.nim index b4fe39c..1f1d2d9 100644 --- a/codexcrawler/types.nim +++ b/codexcrawler/types.nim @@ -1,7 +1,8 @@ import pkg/stew/byteutils import pkg/stew/endians2 -import pkg/questionable +import pkg/questionable/results import pkg/codexdht +import pkg/libp2p type Nid* = NodeId @@ -10,3 +11,19 @@ proc `$`*(nid: Nid): string = proc fromStr*(T: type Nid, s: string): Nid = Nid(UInt256.fromHex(s)) + +proc toBytes*(nid: Nid): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $nid) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type Nid, data: openArray[byte]): ?!Nid = + var + buffer = initProtoBuffer(data) + idStr: string + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + + return success(Nid.fromStr(idStr))