From 50962d9a915385562d348895a526e9e88236366c Mon Sep 17 00:00:00 2001 From: Ben Date: Mon, 10 Feb 2025 14:49:30 +0100 Subject: [PATCH] setup component abstraction --- codexcrawler/application.nim | 102 +++++------------- codexcrawler/component.nim | 14 +++ codexcrawler/{ => components}/crawler.nim | 29 ++--- codexcrawler/{ => components}/dht.nim | 11 +- codexcrawler/{ => components}/timetracker.nim | 43 ++++---- codexcrawler/config.nim | 10 +- codexcrawler/installer.nim | 54 ++++++++++ codexcrawler/state.nim | 15 +++ codexcrawler/utils/datastoreutils.nim | 15 +++ 9 files changed, 179 insertions(+), 114 deletions(-) create mode 100644 codexcrawler/component.nim rename codexcrawler/{ => components}/crawler.nim (86%) rename codexcrawler/{ => components}/dht.nim (94%) rename codexcrawler/{ => components}/timetracker.nim (73%) create mode 100644 codexcrawler/installer.nim create mode 100644 codexcrawler/state.nim create mode 100644 codexcrawler/utils/datastoreutils.nim diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index 975ab65..5eef739 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -4,18 +4,16 @@ import pkg/chronos import pkg/questionable import pkg/questionable/results -import pkg/datastore -import pkg/datastore/typedds import pkg/metrics import ./config import ./utils/logging import ./metrics import ./list -import ./dht -import ./utils/keyutils -import ./crawler -import ./timetracker +import ./utils/datastoreutils +import ./installer +import ./state +import ./component declareGauge(todoNodesGauge, "DHT nodes to be visited") declareGauge(okNodesGauge, "DHT nodes successfully contacted") @@ -29,27 +27,13 @@ type Application* = ref object status: ApplicationStatus - config*: CrawlerConfig + config*: Config todoNodes*: List okNodes*: List nokNodes*: List - dht*: Dht - crawler*: Crawler - timeTracker*: TimeTracker - -proc createDatastore(app: Application, path: string): ?!Datastore = - without store =? LevelDbDatastore.new(path), err: - error "Failed to create datastore" - return failure(err) - return success(Datastore(store)) - -proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore = - without store =? app.createDatastore(path), err: - return failure(err) - return success(TypedDatastore.init(store)) proc initializeLists(app: Application): Future[?!void] {.async.} = - without store =? app.createTypedDatastore(app.config.dataDir / "lists"), err: + without store =? createTypedDatastore(app.config.dataDir / "lists"), err: return failure(err) # We can't extract this into a function because gauges cannot be passed as argument. @@ -76,71 +60,41 @@ proc initializeLists(app: Application): Future[?!void] {.async.} = return success() -proc initializeDht(app: Application): Future[?!void] {.async.} = - without dhtStore =? app.createDatastore(app.config.dataDir / "dht"), err: - return failure(err) - let keyPath = app.config.dataDir / "privatekey" - without privateKey =? setupKey(keyPath), err: - return failure(err) - - var listenAddresses = newSeq[MultiAddress]() - # TODO: when p2p connections are supported: - # let aaa = MultiAddress.init("/ip4/" & app.config.publicIp & "/tcp/53678").expect("Should init multiaddress") - # listenAddresses.add(aaa) - - var discAddresses = newSeq[MultiAddress]() - let bbb = MultiAddress - .init("/ip4/" & app.config.publicIp & "/udp/" & $app.config.discPort) - .expect("Should init multiaddress") - discAddresses.add(bbb) - - app.dht = Dht.new( - privateKey, - bindPort = app.config.discPort, - announceAddrs = listenAddresses, - bootstrapNodes = app.config.bootNodes, - store = dhtStore, - ) - - app.dht.updateAnnounceRecord(listenAddresses) - app.dht.updateDhtRecord(discAddresses) - - await app.dht.start() - - return success() - -proc initializeCrawler(app: Application): Future[?!void] {.async.} = - app.crawler = - Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config) - return await app.crawler.start() - -proc initializeTimeTracker(app: Application): Future[?!void] {.async.} = - app.timeTracker = - TimeTracker.new(app.todoNodes, app.okNodes, app.nokNodes, app.config) - return await app.timeTracker.start() - proc initializeApp(app: Application): Future[?!void] {.async.} = if err =? (await app.initializeLists()).errorOption: error "Failed to initialize lists", err = err.msg return failure(err) - if err =? (await app.initializeDht()).errorOption: - error "Failed to initialize DHT", err = err.msg + # if err =? (await app.initializeDht()).errorOption: + # error "Failed to initialize DHT", err = err.msg + # return failure(err) + + # if err =? (await app.initializeCrawler()).errorOption: + # error "Failed to initialize crawler", err = err.msg + # return failure(err) + + # if err =? (await app.initializeTimeTracker()).errorOption: + # error "Failed to initialize timetracker", err = err.msg + # return failure(err) + + without components =? (await createComponents(app.config)), err: + error "Failed to create componenents", err = err.msg return failure(err) - if err =? (await app.initializeCrawler()).errorOption: - error "Failed to initialize crawler", err = err.msg - return failure(err) + # todo move this + let state = State( + config: app.config + ) - if err =? (await app.initializeTimeTracker()).errorOption: - error "Failed to initialize timetracker", err = err.msg - return failure(err) + for c in components: + if err =? (await c.start(state)).errorOption: + error "Failed to start component", err = err.msg return success() proc stop*(app: Application) = app.status = ApplicationStatus.Stopping - waitFor app.dht.stop() + # waitFor app.dht.stop() proc run*(app: Application) = app.config = parseConfig() diff --git a/codexcrawler/component.nim b/codexcrawler/component.nim new file mode 100644 index 0000000..29acc8c --- /dev/null +++ b/codexcrawler/component.nim @@ -0,0 +1,14 @@ +import pkg/chronos +import pkg/questionable/results + +import ./state + +type + Component* = ref object of RootObj + +method start*(c: Component, state: State): Future[?!void] {.async, base.} = + raiseAssert("call to abstract method") + +method stop*(c: Component): Future[?!void] {.async, base.} = + raiseAssert("call to abstract method") + diff --git a/codexcrawler/crawler.nim b/codexcrawler/components/crawler.nim similarity index 86% rename from codexcrawler/crawler.nim rename to codexcrawler/components/crawler.nim index 5673e94..1a03b56 100644 --- a/codexcrawler/crawler.nim +++ b/codexcrawler/components/crawler.nim @@ -4,18 +4,19 @@ import pkg/questionable import pkg/questionable/results import ./dht -import ./list -import ./nodeentry -import ./config +import ../list +import ../nodeentry +import ../config +import ../component import std/sequtils logScope: topics = "crawler" -type Crawler* = ref object +type Crawler* = ref object of Component dht: Dht - config: CrawlerConfig + config: Config todoNodes: List okNodes: List nokNodes: List @@ -90,14 +91,18 @@ proc start*(c: Crawler): Future[?!void] {.async.} = asyncSpawn c.worker() return success() +proc stop*(c: Crawler): Future[?!void] {.async.} = + return success() + proc new*( T: type Crawler, dht: Dht, - todoNodes: List, - okNodes: List, - nokNodes: List, - config: CrawlerConfig, + # todoNodes: List, + # okNodes: List, + # nokNodes: List, + config: Config, ): Crawler = - Crawler( - dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config - ) + raiseAssert("todo") + # Crawler( + # dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config + # ) diff --git a/codexcrawler/dht.nim b/codexcrawler/components/dht.nim similarity index 94% rename from codexcrawler/dht.nim rename to codexcrawler/components/dht.nim index 3e8c2fb..d46f13e 100644 --- a/codexcrawler/dht.nim +++ b/codexcrawler/components/dht.nim @@ -7,14 +7,15 @@ import pkg/questionable/results import pkg/codexdht/discv5/[routing_table, protocol as discv5] from pkg/nimcrypto import keccak256 -import ./utils/rng +import ../utils/rng +import ../component export discv5 logScope: topics = "dht" -type Dht* = ref object +type Dht* = ref object of Component protocol*: discv5.Protocol key: PrivateKey peerId: PeerId @@ -96,12 +97,14 @@ proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") -proc start*(d: Dht) {.async.} = +proc start*(d: Dht): Future[?!void] {.async.} = d.protocol.open() await d.protocol.start() + return success() -proc stop*(d: Dht) {.async.} = +proc stop*(d: Dht): Future[?!void] {.async.} = await d.protocol.closeWait() + return success() proc new*( T: type Dht, diff --git a/codexcrawler/timetracker.nim b/codexcrawler/components/timetracker.nim similarity index 73% rename from codexcrawler/timetracker.nim rename to codexcrawler/components/timetracker.nim index 4c91893..da348ff 100644 --- a/codexcrawler/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -4,15 +4,16 @@ import pkg/questionable import pkg/questionable/results import ./dht -import ./list -import ./nodeentry -import ./config +import ../list +import ../nodeentry +import ../config +import ../component logScope: topics = "timetracker" -type TimeTracker* = ref object - config: CrawlerConfig +type TimeTracker* = ref object of Component + config: Config todoNodes: List okNodes: List nokNodes: List @@ -55,21 +56,25 @@ proc start*(t: TimeTracker): Future[?!void] {.async.} = asyncSpawn t.worker() return success() +proc stop*(t: TimeTracker): Future[?!void] {.async.} = + return success() + proc new*( T: type TimeTracker, - todoNodes: List, - okNodes: List, - nokNodes: List, - config: CrawlerConfig, + # todoNodes: List, + # okNodes: List, + # nokNodes: List, + config: Config, ): TimeTracker = - var delay = config.revisitDelayMins div 10 - if delay < 1: - delay = 1 + raiseAssert("todo") + # var delay = config.revisitDelayMins div 10 + # if delay < 1: + # delay = 1 - TimeTracker( - todoNodes: todoNodes, - okNodes: okNodes, - nokNodes: nokNodes, - config: config, - workerDelay: delay, - ) + # TimeTracker( + # todoNodes: todoNodes, + # okNodes: okNodes, + # nokNodes: nokNodes, + # config: config, + # workerDelay: delay, + # ) diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 2f0e137..01d9c5e 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -27,7 +27,7 @@ Options: import strutils import docopt -type CrawlerConfig* = ref object +type Config* = ref object logLevel*: string publicIp*: string metricsAddress*: IpAddress @@ -38,8 +38,8 @@ type CrawlerConfig* = ref object stepDelayMs*: int revisitDelayMins*: int -proc `$`*(config: CrawlerConfig): string = - "CrawlerConfig:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & +proc `$`*(config: Config): string = + "Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & @@ -81,13 +81,13 @@ proc stringToSpr(uri: string): SignedPeerRecord = proc getBootNodes(input: string): seq[SignedPeerRecord] = getBootNodeStrings(input).mapIt(stringToSpr(it)) -proc parseConfig*(): CrawlerConfig = +proc parseConfig*(): Config = let args = docopt(doc, version = crawlerFullVersion) proc get(name: string): string = $args[name] - return CrawlerConfig( + return Config( logLevel: get("--logLevel"), publicIp: get("--publicIp"), metricsAddress: parseIpAddress(get("--metricsAddress")), diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim new file mode 100644 index 0000000..02f6a53 --- /dev/null +++ b/codexcrawler/installer.nim @@ -0,0 +1,54 @@ +import std/os +import pkg/chronos +import pkg/chronicles +import pkg/questionable/results + +import ./config +import ./component +import ./components/dht +import ./components/crawler +import ./components/timetracker +import ./utils/keyutils +import ./utils/datastoreutils + +proc initializeDht(config: Config): Future[?!Dht] {.async.} = + without dhtStore =? createDatastore(config.dataDir / "dht"), err: + return failure(err) + let keyPath = config.dataDir / "privatekey" + without privateKey =? setupKey(keyPath), err: + return failure(err) + + var listenAddresses = newSeq[MultiAddress]() + # TODO: when p2p connections are supported: + # let aaa = MultiAddress.init("/ip4/" & config.publicIp & "/tcp/53678").expect("Should init multiaddress") + # listenAddresses.add(aaa) + + var discAddresses = newSeq[MultiAddress]() + let bbb = MultiAddress + .init("/ip4/" & config.publicIp & "/udp/" & $config.discPort) + .expect("Should init multiaddress") + discAddresses.add(bbb) + + let dht = Dht.new( + privateKey, + bindPort = config.discPort, + announceAddrs = listenAddresses, + bootstrapNodes = config.bootNodes, + store = dhtStore, + ) + + dht.updateAnnounceRecord(listenAddresses) + dht.updateDhtRecord(discAddresses) + + return success(dht) + +proc createComponents*(config: Config): Future[?!seq[Component]] {.async.} = + var components: seq[Component] = newSeq[Component]() + + without dht =? (await initializeDht(config)), err: + return failure(err) + + components.add(dht) + components.add(Crawler.new(dht, config)) + components.add(TimeTracker.new(config)) + return success(components) diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim new file mode 100644 index 0000000..5a37cf0 --- /dev/null +++ b/codexcrawler/state.nim @@ -0,0 +1,15 @@ +import pkg/chronos +import pkg/questionable/results + +import ./config + +type + OnStep = proc(): Future[?!void] {.async: (raises: []), gcsafe.} + State* = ref object + config*: Config + # events + # appstate + +proc whileRunning*(this: State, step: OnStep, delay: Duration) = + discard + #todo: while status == running, step(), asyncsleep duration diff --git a/codexcrawler/utils/datastoreutils.nim b/codexcrawler/utils/datastoreutils.nim new file mode 100644 index 0000000..8754d63 --- /dev/null +++ b/codexcrawler/utils/datastoreutils.nim @@ -0,0 +1,15 @@ +import pkg/chronicles +import pkg/questionable/results +import pkg/datastore +import pkg/datastore/typedds + +proc createDatastore*(path: string): ?!Datastore = + without store =? LevelDbDatastore.new(path), err: + error "Failed to create datastore" + return failure(err) + return success(Datastore(store)) + +proc createTypedDatastore*(path: string): ?!TypedDatastore = + without store =? createDatastore(path), err: + return failure(err) + return success(TypedDatastore.init(store))