setup component abstraction

This commit is contained in:
Ben 2025-02-10 14:49:30 +01:00
parent ed26070d24
commit 50962d9a91
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
9 changed files with 179 additions and 114 deletions

View File

@ -4,18 +4,16 @@ import pkg/chronos
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/datastore
import pkg/datastore/typedds
import pkg/metrics import pkg/metrics
import ./config import ./config
import ./utils/logging import ./utils/logging
import ./metrics import ./metrics
import ./list import ./list
import ./dht import ./utils/datastoreutils
import ./utils/keyutils import ./installer
import ./crawler import ./state
import ./timetracker import ./component
declareGauge(todoNodesGauge, "DHT nodes to be visited") declareGauge(todoNodesGauge, "DHT nodes to be visited")
declareGauge(okNodesGauge, "DHT nodes successfully contacted") declareGauge(okNodesGauge, "DHT nodes successfully contacted")
@ -29,27 +27,13 @@ type
Application* = ref object Application* = ref object
status: ApplicationStatus status: ApplicationStatus
config*: CrawlerConfig config*: Config
todoNodes*: List todoNodes*: List
okNodes*: List okNodes*: List
nokNodes*: List nokNodes*: List
dht*: Dht
crawler*: Crawler
timeTracker*: TimeTracker
proc createDatastore(app: Application, path: string): ?!Datastore =
without store =? LevelDbDatastore.new(path), err:
error "Failed to create datastore"
return failure(err)
return success(Datastore(store))
proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore =
without store =? app.createDatastore(path), err:
return failure(err)
return success(TypedDatastore.init(store))
proc initializeLists(app: Application): Future[?!void] {.async.} = proc initializeLists(app: Application): Future[?!void] {.async.} =
without store =? app.createTypedDatastore(app.config.dataDir / "lists"), err: without store =? createTypedDatastore(app.config.dataDir / "lists"), err:
return failure(err) return failure(err)
# We can't extract this into a function because gauges cannot be passed as argument. # We can't extract this into a function because gauges cannot be passed as argument.
@ -76,71 +60,41 @@ proc initializeLists(app: Application): Future[?!void] {.async.} =
return success() return success()
proc initializeDht(app: Application): Future[?!void] {.async.} =
without dhtStore =? app.createDatastore(app.config.dataDir / "dht"), err:
return failure(err)
let keyPath = app.config.dataDir / "privatekey"
without privateKey =? setupKey(keyPath), err:
return failure(err)
var listenAddresses = newSeq[MultiAddress]()
# TODO: when p2p connections are supported:
# let aaa = MultiAddress.init("/ip4/" & app.config.publicIp & "/tcp/53678").expect("Should init multiaddress")
# listenAddresses.add(aaa)
var discAddresses = newSeq[MultiAddress]()
let bbb = MultiAddress
.init("/ip4/" & app.config.publicIp & "/udp/" & $app.config.discPort)
.expect("Should init multiaddress")
discAddresses.add(bbb)
app.dht = Dht.new(
privateKey,
bindPort = app.config.discPort,
announceAddrs = listenAddresses,
bootstrapNodes = app.config.bootNodes,
store = dhtStore,
)
app.dht.updateAnnounceRecord(listenAddresses)
app.dht.updateDhtRecord(discAddresses)
await app.dht.start()
return success()
proc initializeCrawler(app: Application): Future[?!void] {.async.} =
app.crawler =
Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config)
return await app.crawler.start()
proc initializeTimeTracker(app: Application): Future[?!void] {.async.} =
app.timeTracker =
TimeTracker.new(app.todoNodes, app.okNodes, app.nokNodes, app.config)
return await app.timeTracker.start()
proc initializeApp(app: Application): Future[?!void] {.async.} = proc initializeApp(app: Application): Future[?!void] {.async.} =
if err =? (await app.initializeLists()).errorOption: if err =? (await app.initializeLists()).errorOption:
error "Failed to initialize lists", err = err.msg error "Failed to initialize lists", err = err.msg
return failure(err) return failure(err)
if err =? (await app.initializeDht()).errorOption: # if err =? (await app.initializeDht()).errorOption:
error "Failed to initialize DHT", err = err.msg # error "Failed to initialize DHT", err = err.msg
# return failure(err)
# if err =? (await app.initializeCrawler()).errorOption:
# error "Failed to initialize crawler", err = err.msg
# return failure(err)
# if err =? (await app.initializeTimeTracker()).errorOption:
# error "Failed to initialize timetracker", err = err.msg
# return failure(err)
without components =? (await createComponents(app.config)), err:
error "Failed to create componenents", err = err.msg
return failure(err) return failure(err)
if err =? (await app.initializeCrawler()).errorOption: # todo move this
error "Failed to initialize crawler", err = err.msg let state = State(
return failure(err) config: app.config
)
if err =? (await app.initializeTimeTracker()).errorOption: for c in components:
error "Failed to initialize timetracker", err = err.msg if err =? (await c.start(state)).errorOption:
return failure(err) error "Failed to start component", err = err.msg
return success() return success()
proc stop*(app: Application) = proc stop*(app: Application) =
app.status = ApplicationStatus.Stopping app.status = ApplicationStatus.Stopping
waitFor app.dht.stop() # waitFor app.dht.stop()
proc run*(app: Application) = proc run*(app: Application) =
app.config = parseConfig() app.config = parseConfig()

View File

@ -0,0 +1,14 @@
import pkg/chronos
import pkg/questionable/results
import ./state
type
Component* = ref object of RootObj
method start*(c: Component, state: State): Future[?!void] {.async, base.} =
raiseAssert("call to abstract method")
method stop*(c: Component): Future[?!void] {.async, base.} =
raiseAssert("call to abstract method")

View File

@ -4,18 +4,19 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import ./dht import ./dht
import ./list import ../list
import ./nodeentry import ../nodeentry
import ./config import ../config
import ../component
import std/sequtils import std/sequtils
logScope: logScope:
topics = "crawler" topics = "crawler"
type Crawler* = ref object type Crawler* = ref object of Component
dht: Dht dht: Dht
config: CrawlerConfig config: Config
todoNodes: List todoNodes: List
okNodes: List okNodes: List
nokNodes: List nokNodes: List
@ -90,14 +91,18 @@ proc start*(c: Crawler): Future[?!void] {.async.} =
asyncSpawn c.worker() asyncSpawn c.worker()
return success() return success()
proc stop*(c: Crawler): Future[?!void] {.async.} =
return success()
proc new*( proc new*(
T: type Crawler, T: type Crawler,
dht: Dht, dht: Dht,
todoNodes: List, # todoNodes: List,
okNodes: List, # okNodes: List,
nokNodes: List, # nokNodes: List,
config: CrawlerConfig, config: Config,
): Crawler = ): Crawler =
Crawler( raiseAssert("todo")
dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config # Crawler(
) # dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config
# )

View File

@ -7,14 +7,15 @@ import pkg/questionable/results
import pkg/codexdht/discv5/[routing_table, protocol as discv5] import pkg/codexdht/discv5/[routing_table, protocol as discv5]
from pkg/nimcrypto import keccak256 from pkg/nimcrypto import keccak256
import ./utils/rng import ../utils/rng
import ../component
export discv5 export discv5
logScope: logScope:
topics = "dht" topics = "dht"
type Dht* = ref object type Dht* = ref object of Component
protocol*: discv5.Protocol protocol*: discv5.Protocol
key: PrivateKey key: PrivateKey
peerId: PeerId peerId: PeerId
@ -96,12 +97,14 @@ proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil: if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
proc start*(d: Dht) {.async.} = proc start*(d: Dht): Future[?!void] {.async.} =
d.protocol.open() d.protocol.open()
await d.protocol.start() await d.protocol.start()
return success()
proc stop*(d: Dht) {.async.} = proc stop*(d: Dht): Future[?!void] {.async.} =
await d.protocol.closeWait() await d.protocol.closeWait()
return success()
proc new*( proc new*(
T: type Dht, T: type Dht,

View File

@ -4,15 +4,16 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import ./dht import ./dht
import ./list import ../list
import ./nodeentry import ../nodeentry
import ./config import ../config
import ../component
logScope: logScope:
topics = "timetracker" topics = "timetracker"
type TimeTracker* = ref object type TimeTracker* = ref object of Component
config: CrawlerConfig config: Config
todoNodes: List todoNodes: List
okNodes: List okNodes: List
nokNodes: List nokNodes: List
@ -55,21 +56,25 @@ proc start*(t: TimeTracker): Future[?!void] {.async.} =
asyncSpawn t.worker() asyncSpawn t.worker()
return success() return success()
proc stop*(t: TimeTracker): Future[?!void] {.async.} =
return success()
proc new*( proc new*(
T: type TimeTracker, T: type TimeTracker,
todoNodes: List, # todoNodes: List,
okNodes: List, # okNodes: List,
nokNodes: List, # nokNodes: List,
config: CrawlerConfig, config: Config,
): TimeTracker = ): TimeTracker =
var delay = config.revisitDelayMins div 10 raiseAssert("todo")
if delay < 1: # var delay = config.revisitDelayMins div 10
delay = 1 # if delay < 1:
# delay = 1
TimeTracker( # TimeTracker(
todoNodes: todoNodes, # todoNodes: todoNodes,
okNodes: okNodes, # okNodes: okNodes,
nokNodes: nokNodes, # nokNodes: nokNodes,
config: config, # config: config,
workerDelay: delay, # workerDelay: delay,
) # )

View File

@ -27,7 +27,7 @@ Options:
import strutils import strutils
import docopt import docopt
type CrawlerConfig* = ref object type Config* = ref object
logLevel*: string logLevel*: string
publicIp*: string publicIp*: string
metricsAddress*: IpAddress metricsAddress*: IpAddress
@ -38,8 +38,8 @@ type CrawlerConfig* = ref object
stepDelayMs*: int stepDelayMs*: int
revisitDelayMins*: int revisitDelayMins*: int
proc `$`*(config: CrawlerConfig): string = proc `$`*(config: Config): string =
"CrawlerConfig:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & "Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
" metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort &
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs &
@ -81,13 +81,13 @@ proc stringToSpr(uri: string): SignedPeerRecord =
proc getBootNodes(input: string): seq[SignedPeerRecord] = proc getBootNodes(input: string): seq[SignedPeerRecord] =
getBootNodeStrings(input).mapIt(stringToSpr(it)) getBootNodeStrings(input).mapIt(stringToSpr(it))
proc parseConfig*(): CrawlerConfig = proc parseConfig*(): Config =
let args = docopt(doc, version = crawlerFullVersion) let args = docopt(doc, version = crawlerFullVersion)
proc get(name: string): string = proc get(name: string): string =
$args[name] $args[name]
return CrawlerConfig( return Config(
logLevel: get("--logLevel"), logLevel: get("--logLevel"),
publicIp: get("--publicIp"), publicIp: get("--publicIp"),
metricsAddress: parseIpAddress(get("--metricsAddress")), metricsAddress: parseIpAddress(get("--metricsAddress")),

View File

@ -0,0 +1,54 @@
import std/os
import pkg/chronos
import pkg/chronicles
import pkg/questionable/results
import ./config
import ./component
import ./components/dht
import ./components/crawler
import ./components/timetracker
import ./utils/keyutils
import ./utils/datastoreutils
proc initializeDht(config: Config): Future[?!Dht] {.async.} =
without dhtStore =? createDatastore(config.dataDir / "dht"), err:
return failure(err)
let keyPath = config.dataDir / "privatekey"
without privateKey =? setupKey(keyPath), err:
return failure(err)
var listenAddresses = newSeq[MultiAddress]()
# TODO: when p2p connections are supported:
# let aaa = MultiAddress.init("/ip4/" & config.publicIp & "/tcp/53678").expect("Should init multiaddress")
# listenAddresses.add(aaa)
var discAddresses = newSeq[MultiAddress]()
let bbb = MultiAddress
.init("/ip4/" & config.publicIp & "/udp/" & $config.discPort)
.expect("Should init multiaddress")
discAddresses.add(bbb)
let dht = Dht.new(
privateKey,
bindPort = config.discPort,
announceAddrs = listenAddresses,
bootstrapNodes = config.bootNodes,
store = dhtStore,
)
dht.updateAnnounceRecord(listenAddresses)
dht.updateDhtRecord(discAddresses)
return success(dht)
proc createComponents*(config: Config): Future[?!seq[Component]] {.async.} =
var components: seq[Component] = newSeq[Component]()
without dht =? (await initializeDht(config)), err:
return failure(err)
components.add(dht)
components.add(Crawler.new(dht, config))
components.add(TimeTracker.new(config))
return success(components)

15
codexcrawler/state.nim Normal file
View File

@ -0,0 +1,15 @@
import pkg/chronos
import pkg/questionable/results
import ./config
type
OnStep = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
State* = ref object
config*: Config
# events
# appstate
proc whileRunning*(this: State, step: OnStep, delay: Duration) =
discard
#todo: while status == running, step(), asyncsleep duration

View File

@ -0,0 +1,15 @@
import pkg/chronicles
import pkg/questionable/results
import pkg/datastore
import pkg/datastore/typedds
proc createDatastore*(path: string): ?!Datastore =
without store =? LevelDbDatastore.new(path), err:
error "Failed to create datastore"
return failure(err)
return success(Datastore(store))
proc createTypedDatastore*(path: string): ?!TypedDatastore =
without store =? createDatastore(path), err:
return failure(err)
return success(TypedDatastore.init(store))