Setting up for the nodestore

This commit is contained in:
Ben 2025-02-10 16:02:47 +01:00
parent 218443ebe4
commit 14e74d6380
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
8 changed files with 149 additions and 146 deletions

View File

@ -67,18 +67,6 @@ proc initializeApp(app: Application): Future[?!void] {.async.} =
error "Failed to initialize lists", err = err.msg
return failure(err)
# if err =? (await app.initializeDht()).errorOption:
# error "Failed to initialize DHT", err = err.msg
# return failure(err)
# if err =? (await app.initializeCrawler()).errorOption:
# error "Failed to initialize crawler", err = err.msg
# return failure(err)
# if err =? (await app.initializeTimeTracker()).errorOption:
# error "Failed to initialize timetracker", err = err.msg
# return failure(err)
without components =? (await createComponents(app.config)), err:
error "Failed to create componenents", err = err.msg
return failure(err)

View File

@ -5,7 +5,6 @@ import pkg/questionable/results
import ./dht
import ../list
import ../nodeentry
import ../config
import ../component
import ../types
@ -29,48 +28,48 @@ proc isNew(c: Crawler, node: Node): bool =
not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and
not c.nokNodes.contains(node.id)
proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} =
if err =? (await c.nokNodes.add(target)).errorOption:
error "Failed to add not-OK-node to list", err = err.msg
# proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} =
# if err =? (await c.nokNodes.add(target)).errorOption:
# error "Failed to add not-OK-node to list", err = err.msg
proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} =
if err =? (await c.okNodes.add(target)).errorOption:
error "Failed to add OK-node to list", err = err.msg
# proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} =
# if err =? (await c.okNodes.add(target)).errorOption:
# error "Failed to add OK-node to list", err = err.msg
proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} =
let entry = NodeEntry(id: nodeId, lastVisit: 0)
return await c.todoNodes.add(entry)
# proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} =
# let entry = NodeEntry(id: nodeId, lastVisit: 0)
# return await c.todoNodes.add(entry)
proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} =
for node in newNodes:
if err =? (await c.addNewTodoNode(node.id)).errorOption:
error "Failed to add todo-node to list", err = err.msg
# proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} =
# for node in newNodes:
# if err =? (await c.addNewTodoNode(node.id)).errorOption:
# error "Failed to add todo-node to list", err = err.msg
proc step(c: Crawler) {.async.} =
logScope:
todo = $c.todoNodes.len
ok = $c.okNodes.len
nok = $c.nokNodes.len
# proc step(c: Crawler) {.async.} =
# logScope:
# todo = $c.todoNodes.len
# ok = $c.okNodes.len
# nok = $c.nokNodes.len
without var target =? (await c.todoNodes.pop()), err:
error "Failed to get todo node", err = err.msg
# without var target =? (await c.todoNodes.pop()), err:
# error "Failed to get todo node", err = err.msg
target.lastVisit = Moment.now().epochSeconds.uint64
# target.lastVisit = Moment.now().epochSeconds.uint64
without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
await c.handleNodeNotOk(target)
return
# without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
# await c.handleNodeNotOk(target)
# return
let newNodes = receivedNodes.filterIt(isNew(c, it))
if newNodes.len > 0:
trace "Discovered new nodes", newNodes = newNodes.len
# let newNodes = receivedNodes.filterIt(isNew(c, it))
# if newNodes.len > 0:
# trace "Discovered new nodes", newNodes = newNodes.len
await c.handleNodeOk(target)
await c.addNewTodoNodes(newNodes)
# await c.handleNodeOk(target)
# await c.addNewTodoNodes(newNodes)
# Don't log the status every loop:
if (c.todoNodes.len mod 10) == 0:
trace "Status"
# # Don't log the status every loop:
# if (c.todoNodes.len mod 10) == 0:
# trace "Status"
proc worker(c: Crawler) {.async.} =
try:
@ -82,23 +81,8 @@ proc worker(c: Crawler) {.async.} =
quit QuitFailure
method start*(c: Crawler, state: State): Future[?!void] {.async.} =
# if c.todoNodes.len < 1:
# let nodeIds = c.dht.getRoutingTableNodeIds()
# info "Loading routing-table nodes to todo-list...", nodes = nodeIds.len
# for id in nodeIds:
# if err =? (await c.addNewTodoNode(id)).errorOption:
# error "Failed to add routing-table node to todo-list", err = err.msg
# return failure(err)
info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs
asyncSpawn c.worker()
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
info "Crawler sees nodes found!", num = nids.len
return success()
let handle = state.events.nodesFound.subscribe(onNodesFound)
return success()
method stop*(c: Crawler): Future[?!void] {.async.} =

View File

@ -11,6 +11,7 @@ from pkg/nimcrypto import keccak256
import ../utils/keyutils
import ../utils/datastoreutils
import ../utils/rng
import ../utils/asyncdataevent
import ../component
import ../config
import ../state
@ -44,9 +45,9 @@ proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
let node = d.protocol.getNode(nodeId)
if node.isSome():
return success(node.get())
return failure("Node not found for id: " & $nodeId)
return failure("Node not found for id: " & $(NodeId(nodeId)))
proc getRoutingTableNodeIds*(d: Dht): seq[NodeId] =
proc getRoutingTableNodeIds(d: Dht): seq[NodeId] =
var ids = newSeq[NodeId]()
for bucket in d.protocol.routingTable.buckets:
for node in bucket.nodes:
@ -82,7 +83,7 @@ method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} =
trace "Removing provider", peerId
d.protocol.removeProvidersLocal(peerId)
proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) =
proc updateAnnounceRecord(d: Dht, addrs: openArray[MultiAddress]) =
d.announceAddrs = @addrs
trace "Updating announce record", addrs = d.announceAddrs
@ -93,7 +94,7 @@ proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil:
d.protocol.updateRecord(d.providerRecord).expect("Should update SPR")
proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) =
proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
trace "Updating Dht record", addrs = addrs
d.dhtRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, @addrs))
@ -102,9 +103,19 @@ proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
proc findRoutingTableNodes(d: Dht, state: State) {.async.} =
await sleepAsync(5.seconds)
let nodes = d.getRoutingTableNodeIds()
if err =? (await state.events.nodesFound.fire(nodes)).errorOption:
error "Failed to raise routing-table nodes as found nodes", err = err.msg
else:
trace "Routing table nodes raise as found nodes", num = nodes.len
method start*(d: Dht, state: State): Future[?!void] {.async.} =
d.protocol.open()
await d.protocol.start()
asyncSpawn d.findRoutingTableNodes(state)
return success()
method stop*(d: Dht): Future[?!void] {.async.} =

View File

@ -0,0 +1,40 @@
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable/results
import pkg/chronos
import pkg/libp2p
import ../types
import
type
NodeEntry* = object
id*: Nid
lastVisit*: uint64
NodeStore* = ref object
store: TypedDatastore
proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & $entry.lastVisit
proc toBytes*(entry: NodeEntry): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $entry.id)
buffer.write(2, entry.lastVisit)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
var
buffer = initProtoBuffer(data)
idStr: string
lastVisit: uint64
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
if buffer.getField(2, lastVisit).isErr:
return failure("Unable to decode `lastVisit`")
return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit))

View File

@ -5,7 +5,6 @@ import pkg/questionable/results
import ./dht
import ../list
import ../nodeentry
import ../config
import ../component
import ../state
@ -20,28 +19,28 @@ type TimeTracker* = ref object of Component
nokNodes: List
workerDelay: int
proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
var toMove = newSeq[NodeEntry]()
proc onItem(item: NodeEntry) =
if item.lastVisit < expiry:
toMove.add(item)
# # proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
# # var toMove = newSeq[NodeEntry]()
# # proc onItem(item: NodeEntry) =
# # if item.lastVisit < expiry:
# # toMove.add(item)
await list.iterateAll(onItem)
# # await list.iterateAll(onItem)
if toMove.len > 0:
trace "expired node, moving to todo", nodes = $toMove.len
# # if toMove.len > 0:
# # trace "expired node, moving to todo", nodes = $toMove.len
for item in toMove:
if err =? (await t.todoNodes.add(item)).errorOption:
error "Failed to add expired node to todo list", err = err.msg
return
if err =? (await list.remove(item)).errorOption:
error "Failed to remove expired node to source list", err = err.msg
# # for item in toMove:
# # if err =? (await t.todoNodes.add(item)).errorOption:
# # error "Failed to add expired node to todo list", err = err.msg
# # return
# # if err =? (await list.remove(item)).errorOption:
# # error "Failed to remove expired node to source list", err = err.msg
proc step(t: TimeTracker) {.async.} =
let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64
await t.processList(t.okNodes, expiry)
await t.processList(t.nokNodes, expiry)
# proc step(t: TimeTracker) {.async.} =
# let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64
# await t.processList(t.okNodes, expiry)
# await t.processList(t.nokNodes, expiry)
proc worker(t: TimeTracker) {.async.} =
try:

View File

@ -13,7 +13,6 @@ import std/sets
import std/sequtils
import std/os
import ./nodeentry
import ./types
logScope:
@ -21,25 +20,25 @@ logScope:
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
OnItem = proc(item: NodeEntry): void {.gcsafe, raises: [].}
OnItem = proc(item: Nid): void {.gcsafe, raises: [].}
List* = ref object
name: string
store: TypedDatastore
items: seq[NodeEntry]
items: HashSet[Nid]
onMetric: OnUpdateMetric
emptySignal: ?Future[void]
proc encode(s: NodeEntry): seq[byte] =
proc encode(s: Nid): seq[byte] =
s.toBytes()
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
proc decode(T: type Nid, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64))
return NodeEntry.fromBytes(bytes)
return success(Nid.fromStr("0"))
return Nid.fromBytes(bytes)
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
without itemKey =? Key.init(this.name / $item.id), err:
proc saveItem(this: List, item: Nid): Future[?!void] {.async.} =
without itemKey =? Key.init(this.name / $item), err:
return failure(err)
?await this.store.put(itemKey, item)
return success()
@ -47,11 +46,11 @@ proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
proc load*(this: List): Future[?!void] {.async.} =
let id = Nid.fromStr("0")
let bytes = newSeq[byte]()
let ne = NodeEntry.fromBytes(bytes)
let ne = Nid.fromBytes(bytes)
without queryKey =? Key.init(this.name), err:
return failure(err)
without iter =? (await query[NodeEntry](this.store, Query.init(queryKey))), err:
without iter =? (await query[Nid](this.store, Query.init(queryKey))), err:
return failure(err)
while not iter.finished:
@ -59,8 +58,8 @@ proc load*(this: List): Future[?!void] {.async.} =
return failure(err)
without value =? item.value, err:
return failure(err)
if value.id > 0 or value.lastVisit > 0:
this.items.add(value)
if value > 0:
this.items.incl(value)
this.onMetric(this.items.len.int64)
info "Loaded list", name = this.name, items = this.items.len
@ -71,40 +70,38 @@ proc new*(
): List =
List(name: name, store: store, onMetric: onMetric)
proc contains*(this: List, nodeId: Nid): bool =
this.items.anyIt(it.id == nodeId)
proc contains*(this: List, nid: Nid): bool =
this.items.anyIt(it == nid)
proc contains*(this: List, item: NodeEntry): bool =
this.contains(item.id)
proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
if this.contains(item):
proc add*(this: List, nid: Nid): Future[?!void] {.async.} =
if this.contains(nid):
return success()
this.items.add(item)
this.items.incl(nid)
this.onMetric(this.items.len.int64)
if err =? (await this.saveItem(nid)).errorOption:
return failure(err)
if s =? this.emptySignal:
trace "List no longer empty.", name = this.name
s.complete()
this.emptySignal = Future[void].none
if err =? (await this.saveItem(item)).errorOption:
return failure(err)
return success()
proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} =
proc remove*(this: List, nid: Nid): Future[?!void] {.async.} =
if this.items.len < 1:
return failure(this.name & "List is empty.")
this.items.keepItIf(item.id != it.id)
without itemKey =? Key.init(this.name / $item.id), err:
this.items.excl(nid)
without itemKey =? Key.init(this.name / $nid), err:
return failure(err)
?await this.store.delete(itemKey)
this.onMetric(this.items.len.int64)
return success()
proc pop*(this: List): Future[?!NodeEntry] {.async.} =
proc pop*(this: List): Future[?!Nid] {.async.} =
if this.items.len < 1:
trace "List is empty. Waiting for new items...", name = this.name
let signal = newFuture[void]("list.emptySignal")
@ -113,7 +110,7 @@ proc pop*(this: List): Future[?!NodeEntry] {.async.} =
if this.items.len < 1:
return failure(this.name & "List is empty.")
let item = this.items[0]
let item = this.items.pop()
if err =? (await this.remove(item)).errorOption:
return failure(err)

View File

@ -1,33 +0,0 @@
import pkg/questionable/results
import pkg/chronos
import pkg/libp2p
import ./types
type NodeEntry* = object
id*: Nid
lastVisit*: uint64
proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & $entry.lastVisit
proc toBytes*(entry: NodeEntry): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $entry.id)
buffer.write(2, entry.lastVisit)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
var
buffer = initProtoBuffer(data)
idStr: string
lastVisit: uint64
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
if buffer.getField(2, lastVisit).isErr:
return failure("Unable to decode `lastVisit`")
return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit))

View File

@ -1,7 +1,8 @@
import pkg/stew/byteutils
import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import pkg/codexdht
import pkg/libp2p
type Nid* = NodeId
@ -10,3 +11,19 @@ proc `$`*(nid: Nid): string =
proc fromStr*(T: type Nid, s: string): Nid =
Nid(UInt256.fromHex(s))
proc toBytes*(nid: Nid): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $nid)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type Nid, data: openArray[byte]): ?!Nid =
var
buffer = initProtoBuffer(data)
idStr: string
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
return success(Nid.fromStr(idStr))