From 5fcd7a7a653a5b8a8c0a337ebde20b9ac145d3c6 Mon Sep 17 00:00:00 2001 From: Ben Date: Mon, 10 Feb 2025 16:24:54 +0100 Subject: [PATCH] sets up empty nodestore --- codexcrawler/application.nim | 63 +++++++++++-------------- codexcrawler/component.nim | 2 +- codexcrawler/components/crawler.nim | 2 +- codexcrawler/components/dht.nim | 30 +++++++----- codexcrawler/components/nodestore.nim | 43 ++++++++++++++--- codexcrawler/components/timetracker.nim | 2 +- codexcrawler/installer.nim | 15 ++++-- codexcrawler/nodeentry.nim | 0 codexcrawler/state.nim | 2 +- 9 files changed, 95 insertions(+), 64 deletions(-) delete mode 100644 codexcrawler/nodeentry.nim diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index c6dab72..d894aa6 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -34,43 +34,35 @@ type okNodes*: List nokNodes*: List -proc initializeLists(app: Application): Future[?!void] {.async.} = - without store =? createTypedDatastore(app.config.dataDir / "lists"), err: - return failure(err) +# proc initializeLists(app: Application): Future[?!void] {.async.} = +# without store =? createTypedDatastore(app.config.dataDir / "lists"), err: +# return failure(err) - # We can't extract this into a function because gauges cannot be passed as argument. - # The use of global state in nim-metrics is not pleasant. - proc onTodoMetric(value: int64) = - todoNodesGauge.set(value) +# # We can't extract this into a function because gauges cannot be passed as argument. +# # The use of global state in nim-metrics is not pleasant. +# proc onTodoMetric(value: int64) = +# todoNodesGauge.set(value) - proc onOkMetric(value: int64) = - okNodesGauge.set(value) +# proc onOkMetric(value: int64) = +# okNodesGauge.set(value) - proc onNokMetric(value: int64) = - nokNodesGauge.set(value) +# proc onNokMetric(value: int64) = +# nokNodesGauge.set(value) - app.todoNodes = List.new("todo", store, onTodoMetric) - app.okNodes = List.new("ok", store, onOkMetric) - app.nokNodes = List.new("nok", store, onNokMetric) +# app.todoNodes = List.new("todo", store, onTodoMetric) +# app.okNodes = List.new("ok", store, onOkMetric) +# app.nokNodes = List.new("nok", store, onNokMetric) - if err =? (await app.todoNodes.load()).errorOption: - return failure(err) - if err =? (await app.okNodes.load()).errorOption: - return failure(err) - if err =? (await app.nokNodes.load()).errorOption: - return failure(err) +# if err =? (await app.todoNodes.load()).errorOption: +# return failure(err) +# if err =? (await app.okNodes.load()).errorOption: +# return failure(err) +# if err =? (await app.nokNodes.load()).errorOption: +# return failure(err) - return success() +# return success() proc initializeApp(app: Application): Future[?!void] {.async.} = - if err =? (await app.initializeLists()).errorOption: - error "Failed to initialize lists", err = err.msg - return failure(err) - - without components =? (await createComponents(app.config)), err: - error "Failed to create componenents", err = err.msg - return failure(err) - # todo move this let state = State( config: app.config, @@ -82,15 +74,14 @@ proc initializeApp(app: Application): Future[?!void] {.async.} = ), ) - for c in components: - if err =? (await c.start(state)).errorOption: - error "Failed to start component", err = err.msg - - # test raise newnodes - let nodes: seq[Nid] = newSeq[Nid]() - if err =? (await state.events.nodesFound.fire(nodes)).errorOption: + without components =? (await createComponents(state)), err: + error "Failed to create componenents", err = err.msg return failure(err) + for c in components: + if err =? (await c.start()).errorOption: + error "Failed to start component", err = err.msg + return success() proc stop*(app: Application) = diff --git a/codexcrawler/component.nim b/codexcrawler/component.nim index 479cdfc..f36b0a2 100644 --- a/codexcrawler/component.nim +++ b/codexcrawler/component.nim @@ -5,7 +5,7 @@ import ./state type Component* = ref object of RootObj -method start*(c: Component, state: State): Future[?!void] {.async, base.} = +method start*(c: Component): Future[?!void] {.async, base.} = raiseAssert("call to abstract method: component.start") method stop*(c: Component): Future[?!void] {.async, base.} = diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/crawler.nim index 5f4c296..e86de85 100644 --- a/codexcrawler/components/crawler.nim +++ b/codexcrawler/components/crawler.nim @@ -80,7 +80,7 @@ proc worker(c: Crawler) {.async.} = error "Exception in crawler worker", msg = exc.msg quit QuitFailure -method start*(c: Crawler, state: State): Future[?!void] {.async.} = +method start*(c: Crawler): Future[?!void] {.async.} = info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs asyncSpawn c.worker() return success() diff --git a/codexcrawler/components/dht.nim b/codexcrawler/components/dht.nim index 2cef9ec..565af1e 100644 --- a/codexcrawler/components/dht.nim +++ b/codexcrawler/components/dht.nim @@ -13,7 +13,6 @@ import ../utils/datastoreutils import ../utils/rng import ../utils/asyncdataevent import ../component -import ../config import ../state export discv5 @@ -22,6 +21,7 @@ logScope: topics = "dht" type Dht* = ref object of Component + state: State protocol*: discv5.Protocol key: PrivateKey peerId: PeerId @@ -103,19 +103,19 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") -proc findRoutingTableNodes(d: Dht, state: State) {.async.} = +proc findRoutingTableNodes(d: Dht) {.async.} = await sleepAsync(5.seconds) let nodes = d.getRoutingTableNodeIds() - if err =? (await state.events.nodesFound.fire(nodes)).errorOption: + if err =? (await d.state.events.nodesFound.fire(nodes)).errorOption: error "Failed to raise routing-table nodes as found nodes", err = err.msg else: trace "Routing table nodes raise as found nodes", num = nodes.len -method start*(d: Dht, state: State): Future[?!void] {.async.} = +method start*(d: Dht): Future[?!void] {.async.} = d.protocol.open() await d.protocol.start() - asyncSpawn d.findRoutingTableNodes(state) + asyncSpawn d.findRoutingTableNodes() return success() method stop*(d: Dht): Future[?!void] {.async.} = @@ -124,6 +124,7 @@ method stop*(d: Dht): Future[?!void] {.async.} = proc new( T: type Dht, + state: State, key: PrivateKey, bindIp = IPv4_any(), bindPort = 0.Port, @@ -131,7 +132,9 @@ proc new( bootstrapNodes: openArray[SignedPeerRecord] = [], store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"), ): Dht = - var self = Dht(key: key, peerId: PeerId.init(key).expect("Should construct PeerId")) + var self = Dht( + state: state, key: key, peerId: PeerId.init(key).expect("Should construct PeerId") + ) self.updateAnnounceRecord(announceAddrs) @@ -155,29 +158,30 @@ proc new( self -proc createDht*(config: Config): Future[?!Dht] {.async.} = - without dhtStore =? createDatastore(config.dataDir / "dht"), err: +proc createDht*(state: State): Future[?!Dht] {.async.} = + without dhtStore =? createDatastore(state.config.dataDir / "dht"), err: return failure(err) - let keyPath = config.dataDir / "privatekey" + let keyPath = state.config.dataDir / "privatekey" without privateKey =? setupKey(keyPath), err: return failure(err) var listenAddresses = newSeq[MultiAddress]() # TODO: when p2p connections are supported: - # let aaa = MultiAddress.init("/ip4/" & config.publicIp & "/tcp/53678").expect("Should init multiaddress") + # let aaa = MultiAddress.init("/ip4/" & state.config.publicIp & "/tcp/53678").expect("Should init multiaddress") # listenAddresses.add(aaa) var discAddresses = newSeq[MultiAddress]() let bbb = MultiAddress - .init("/ip4/" & config.publicIp & "/udp/" & $config.discPort) + .init("/ip4/" & state.config.publicIp & "/udp/" & $state.config.discPort) .expect("Should init multiaddress") discAddresses.add(bbb) let dht = Dht.new( + state, privateKey, - bindPort = config.discPort, + bindPort = state.config.discPort, announceAddrs = listenAddresses, - bootstrapNodes = config.bootNodes, + bootstrapNodes = state.config.bootNodes, store = dhtStore, ) diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index 5f96cf0..8d0b042 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -1,19 +1,25 @@ -import pkg/datastore +import std/os import pkg/datastore/typedds import pkg/questionable/results +import pkg/chronicles import pkg/chronos import pkg/libp2p import ../types -import +import ../component +import ../state +import ../utils/datastoreutils +import ../utils/asyncdataevent type - NodeEntry* = object - id*: Nid - lastVisit*: uint64 + NodeEntry = object + id: Nid + lastVisit: uint64 - NodeStore* = ref object + NodeStore* = ref object of Component + state: State store: TypedDatastore + sub: AsyncDataEventSubscription proc `$`*(entry: NodeEntry): string = $entry.id & ":" & $entry.lastVisit @@ -38,3 +44,28 @@ proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = return failure("Unable to decode `lastVisit`") return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit)) + +proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = + # put the nodes in the store. + # track all new ones, if any, raise newNodes event. + return success() + +method start*(s: NodeStore): Future[?!void] {.async.} = + info "Starting nodestore..." + + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + return await s.processFoundNodes(nids) + + s.sub = s.state.events.nodesFound.subscribe(onNodesFound) + return success() + +method stop*(s: NodeStore): Future[?!void] {.async.} = + await s.state.events.nodesFound.unsubscribe(s.sub) + return success() + +proc createNodeStore*(state: State): ?!NodeStore = + without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err: + error "Failed to create typed datastore for node store", err = err.msg + return failure(err) + + return success(NodeStore(state: state, store: ds)) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index 092143a..e811481 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -51,7 +51,7 @@ proc worker(t: TimeTracker) {.async.} = error "Exception in timetracker worker", msg = exc.msg quit QuitFailure -method start*(t: TimeTracker, state: State): Future[?!void] {.async.} = +method start*(t: TimeTracker): Future[?!void] {.async.} = info "Starting timetracker...", revisitDelayMins = $t.workerDelay asyncSpawn t.worker() return success() diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index 247f7ee..3642b8f 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -1,19 +1,24 @@ import pkg/chronos import pkg/questionable/results -import ./config +import ./state import ./component import ./components/dht import ./components/crawler import ./components/timetracker +import ./components/nodestore -proc createComponents*(config: Config): Future[?!seq[Component]] {.async.} = +proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = var components: seq[Component] = newSeq[Component]() - without dht =? (await createDht(config)), err: + without dht =? (await createDht(state)), err: return failure(err) + without nodeStore =? createNodeStore(state), err: + return failure(err) + + components.add(nodeStore) components.add(dht) - components.add(Crawler.new(dht, config)) - components.add(TimeTracker.new(config)) + components.add(Crawler.new(dht, state.config)) + components.add(TimeTracker.new(state.config)) return success(components) diff --git a/codexcrawler/nodeentry.nim b/codexcrawler/nodeentry.nim deleted file mode 100644 index e69de29..0000000 diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index f7cd928..ead6430 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -20,7 +20,7 @@ type State* = ref object config*: Config - events*: Events # appstate + events*: Events proc whileRunning*(this: State, step: OnStep, delay: Duration) = discard