Merge branch 'redesign'

This commit is contained in:
Ben 2025-02-12 14:49:53 +01:00
commit 4b46bb34ba
No known key found for this signature in database
GPG Key ID: 0F16E812E736C24B
46 changed files with 1801 additions and 586 deletions

View File

@ -1,173 +1,80 @@
import std/os
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/datastore/typedds
import pkg/metrics
import ./config
import ./logging
import ./metrics
import ./list
import ./dht
import ./keyutils
import ./crawler
import ./timetracker
import ./utils/logging
import ./utils/asyncdataevent
import ./installer
import ./state
import ./component
import ./types
declareGauge(todoNodesGauge, "DHT nodes to be visited")
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
type Application* = ref object
state: State
components: seq[Component]
type
ApplicationStatus* {.pure.} = enum
Stopped
Stopping
Running
Application* = ref object
status: ApplicationStatus
config*: CrawlerConfig
todoNodes*: List
okNodes*: List
nokNodes*: List
dht*: Dht
crawler*: Crawler
timeTracker*: TimeTracker
proc createDatastore(app: Application, path: string): ?!Datastore =
without store =? LevelDbDatastore.new(path), err:
error "Failed to create datastore"
return failure(err)
return success(Datastore(store))
proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore =
without store =? app.createDatastore(path), err:
return failure(err)
return success(TypedDatastore.init(store))
proc initializeLists(app: Application): Future[?!void] {.async.} =
without store =? app.createTypedDatastore(app.config.dataDir / "lists"), err:
return failure(err)
# We can't extract this into a function because gauges cannot be passed as argument.
# The use of global state in nim-metrics is not pleasant.
proc onTodoMetric(value: int64) =
todoNodesGauge.set(value)
proc onOkMetric(value: int64) =
okNodesGauge.set(value)
proc onNokMetric(value: int64) =
nokNodesGauge.set(value)
app.todoNodes = List.new("todo", store, onTodoMetric)
app.okNodes = List.new("ok", store, onOkMetric)
app.nokNodes = List.new("nok", store, onNokMetric)
if err =? (await app.todoNodes.load()).errorOption:
return failure(err)
if err =? (await app.okNodes.load()).errorOption:
return failure(err)
if err =? (await app.nokNodes.load()).errorOption:
return failure(err)
return success()
proc initializeDht(app: Application): Future[?!void] {.async.} =
without dhtStore =? app.createDatastore(app.config.dataDir / "dht"), err:
return failure(err)
let keyPath = app.config.dataDir / "privatekey"
without privateKey =? setupKey(keyPath), err:
return failure(err)
var listenAddresses = newSeq[MultiAddress]()
# TODO: when p2p connections are supported:
# let aaa = MultiAddress.init("/ip4/" & app.config.publicIp & "/tcp/53678").expect("Should init multiaddress")
# listenAddresses.add(aaa)
var discAddresses = newSeq[MultiAddress]()
let bbb = MultiAddress
.init("/ip4/" & app.config.publicIp & "/udp/" & $app.config.discPort)
.expect("Should init multiaddress")
discAddresses.add(bbb)
app.dht = Dht.new(
privateKey,
bindPort = app.config.discPort,
announceAddrs = listenAddresses,
bootstrapNodes = app.config.bootNodes,
store = dhtStore,
proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
app.state = State(
status: ApplicationStatus.Running,
config: config,
events: Events(
nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](),
),
)
app.dht.updateAnnounceRecord(listenAddresses)
app.dht.updateDhtRecord(discAddresses)
without components =? (await createComponents(app.state)), err:
error "Failed to create componenents", err = err.msg
return failure(err)
app.components = components
await app.dht.start()
for c in components:
if err =? (await c.start()).errorOption:
error "Failed to start component", err = err.msg
return success()
proc initializeCrawler(app: Application): Future[?!void] {.async.} =
app.crawler =
Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config)
return await app.crawler.start()
proc initializeTimeTracker(app: Application): Future[?!void] {.async.} =
app.timeTracker =
TimeTracker.new(app.todoNodes, app.okNodes, app.nokNodes, app.config)
return await app.timeTracker.start()
proc initializeApp(app: Application): Future[?!void] {.async.} =
if err =? (await app.initializeLists()).errorOption:
error "Failed to initialize lists", err = err.msg
return failure(err)
if err =? (await app.initializeDht()).errorOption:
error "Failed to initialize DHT", err = err.msg
return failure(err)
if err =? (await app.initializeCrawler()).errorOption:
error "Failed to initialize crawler", err = err.msg
return failure(err)
if err =? (await app.initializeTimeTracker()).errorOption:
error "Failed to initialize timetracker", err = err.msg
return failure(err)
return success()
proc stopComponents(app: Application) {.async.} =
for c in app.components:
if err =? (await c.stop()).errorOption:
error "Failed to stop component", err = err.msg
proc stop*(app: Application) =
app.status = ApplicationStatus.Stopping
waitFor app.dht.stop()
app.state.status = ApplicationStatus.Stopping
proc run*(app: Application) =
app.config = parseConfig()
info "Loaded configuration", config = app.config
let config = parseConfig()
info "Loaded configuration", config = $config
# Configure loglevel
updateLogLevel(app.config.logLevel)
updateLogLevel(config.logLevel)
# Ensure datadir path exists:
if not existsDir(app.config.dataDir):
createDir(app.config.dataDir)
if not existsDir(config.dataDir):
createDir(config.dataDir)
setupMetrics(app.config.metricsAddress, app.config.metricsPort)
info "Metrics endpoint initialized"
info "Starting application"
app.status = ApplicationStatus.Running
if err =? (waitFor app.initializeApp()).errorOption:
app.status = ApplicationStatus.Stopping
if err =? (waitFor app.initializeApp(config)).errorOption:
app.state.status = ApplicationStatus.Stopping
error "Failed to start application", err = err.msg
return
while app.status == ApplicationStatus.Running:
while app.state.status == ApplicationStatus.Running:
try:
chronos.poll()
except Exception as exc:
error "Unhandled exception", msg = exc.msg
quit QuitFailure
notice "Application closed"
notice "Application stopping..."
waitFor app.stopComponents()
notice "Application stopped"

View File

@ -0,0 +1,12 @@
import pkg/chronos
import pkg/questionable/results
import ./state
type Component* = ref object of RootObj
method start*(c: Component): Future[?!void] {.async, base.} =
raiseAssert("call to abstract method: component.start")
method stop*(c: Component): Future[?!void] {.async, base.} =
raiseAssert("call to abstract method: component.stop")

View File

@ -0,0 +1,59 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ../services/dht
import ./todolist
import ../config
import ../types
import ../component
import ../state
import ../utils/asyncdataevent
logScope:
topics = "crawler"
type Crawler* = ref object of Component
state: State
dht: Dht
todo: TodoList
proc raiseCheckEvent(
c: Crawler, nid: Nid, success: bool
): Future[?!void] {.async: (raises: []).} =
let event = DhtNodeCheckEventData(id: nid, isOk: success)
if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption:
return failure(err)
return success()
proc step(c: Crawler): Future[?!void] {.async: (raises: []).} =
without nid =? (await c.todo.pop()), err:
return failure(err)
without response =? await c.dht.getNeighbors(nid), err:
return failure(err)
if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption:
return failure(err)
if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption:
return failure(err)
return success()
method start*(c: Crawler): Future[?!void] {.async.} =
info "Starting crawler..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await c.step()
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
return success()
method stop*(c: Crawler): Future[?!void] {.async.} =
return success()
proc new*(T: type Crawler, state: State, dht: Dht, todo: TodoList): Crawler =
Crawler(state: state, dht: dht, todo: todo)

View File

@ -0,0 +1,69 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ../list
import ../state
import ../services/metrics
import ../component
import ../utils/asyncdataevent
logScope:
topics = "dhtmetrics"
type DhtMetrics* = ref object of Component
state: State
ok: List
nok: List
sub: AsyncDataEventSubscription
metrics: Metrics
proc handleCheckEvent(
d: DhtMetrics, event: DhtNodeCheckEventData
): Future[?!void] {.async.} =
if event.isOk:
?await d.ok.add(event.id)
?await d.nok.remove(event.id)
else:
?await d.ok.remove(event.id)
?await d.nok.add(event.id)
d.metrics.setOkNodes(d.ok.len)
d.metrics.setNokNodes(d.nok.len)
return success()
method start*(d: DhtMetrics): Future[?!void] {.async.} =
info "Starting..."
?await d.ok.load()
?await d.nok.load()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
await d.handleCheckEvent(event)
d.sub = d.state.events.dhtNodeCheck.subscribe(onCheck)
proc logDhtMetrics(): Future[?!void] {.async: (raises: []), gcsafe.} =
trace "Metrics", ok = d.ok.len, nok = d.nok.len
return success()
await d.state.whileRunning(logDhtMetrics, 1.minutes)
return success()
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
await d.state.events.dhtNodeCheck.unsubscribe(d.sub)
return success()
proc new*(
T: type DhtMetrics, state: State, okList: List, nokList: List, metrics: Metrics
): DhtMetrics =
DhtMetrics(state: state, ok: okList, nok: nokList, metrics: metrics)
proc createDhtMetrics*(state: State, metrics: Metrics): ?!DhtMetrics =
without okList =? createList(state.config.dataDir, "dhtok"), err:
return failure(err)
without nokList =? createList(state.config.dataDir, "dhtnok"), err:
return failure(err)
success(DhtMetrics.new(state, okList, nokList, metrics))

View File

@ -0,0 +1,134 @@
import std/os
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable/results
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import ../types
import ../component
import ../state
import ../utils/datastoreutils
import ../utils/asyncdataevent
const nodestoreName = "nodestore"
logScope:
topics = "nodestore"
type
NodeEntry* = object
id*: Nid
lastVisit*: uint64
OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.}
NodeStore* = ref object of Component
state: State
store: TypedDatastore
sub: AsyncDataEventSubscription
proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & $entry.lastVisit
proc toBytes*(entry: NodeEntry): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $entry.id)
buffer.write(2, entry.lastVisit)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
var
buffer = initProtoBuffer(data)
idStr: string
lastVisit: uint64
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
if buffer.getField(2, lastVisit).isErr:
return failure("Unable to decode `lastVisit`")
return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit))
proc encode*(e: NodeEntry): seq[byte] =
e.toBytes()
proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64))
return NodeEntry.fromBytes(bytes)
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
without key =? Key.init(nodestoreName / $nid), err:
return failure(err)
without exists =? (await s.store.has(key)), err:
return failure(err)
if not exists:
let entry = NodeEntry(id: nid, lastVisit: 0)
?await s.store.put(key, entry)
return success(not exists)
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
await s.state.events.newNodesDiscovered.fire(nids)
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
var newNodes = newSeq[Nid]()
for nid in nids:
without isNew =? (await s.storeNodeIsNew(nid)), err:
return failure(err)
if isNew:
newNodes.add(nid)
if newNodes.len > 0:
trace "Discovered new nodes", newNodes = newNodes.len
?await s.fireNewNodesDiscovered(newNodes)
return success()
method iterateAll*(
s: NodeStore, onNode: OnNodeEntry
): Future[?!void] {.async: (raises: []), base.} =
without queryKey =? Key.init(nodestoreName), err:
return failure(err)
try:
without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err:
return failure(err)
while not iter.finished:
without item =? (await iter.next()), err:
return failure(err)
without value =? item.value, err:
return failure(err)
?await onNode(value)
except CatchableError as exc:
return failure(exc.msg)
return success()
method start*(s: NodeStore): Future[?!void] {.async.} =
info "Starting..."
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
return await s.processFoundNodes(nids)
s.sub = s.state.events.nodesFound.subscribe(onNodesFound)
return success()
method stop*(s: NodeStore): Future[?!void] {.async.} =
await s.state.events.nodesFound.unsubscribe(s.sub)
return success()
proc new*(T: type NodeStore, state: State, store: TypedDatastore): NodeStore =
NodeStore(state: state, store: store)
proc createNodeStore*(state: State): ?!NodeStore =
without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err:
error "Failed to create typed datastore for node store", err = err.msg
return failure(err)
return success(NodeStore.new(state, ds))

View File

@ -0,0 +1,66 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable/results
import ./nodestore
import ../services/dht
import ../component
import ../state
import ../types
import ../utils/asyncdataevent
logScope:
topics = "timetracker"
type TimeTracker* = ref object of Component
state: State
nodestore: NodeStore
dht: Dht
proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
trace "Checking for expired nodes..."
let expiry =
(Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64
var expired = newSeq[Nid]()
proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
if item.lastVisit < expiry:
expired.add(item.id)
return success()
?await t.nodestore.iterateAll(checkNode)
?await t.state.events.nodesExpired.fire(expired)
return success()
proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
trace "Raising routing table nodes..."
let nids = t.dht.getRoutingTableNodeIds()
if err =? (await t.state.events.nodesFound.fire(nids)).errorOption:
return failure(err)
return success()
proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} =
?await t.checkForExpiredNodes()
?await t.raiseRoutingTableNodes()
return success()
method start*(t: TimeTracker): Future[?!void] {.async.} =
info "Starting..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
await t.step()
var delay = t.state.config.revisitDelayMins div 100
if delay < 1:
delay = 1
await t.state.whileRunning(onStep, delay.minutes)
return success()
method stop*(t: TimeTracker): Future[?!void] {.async.} =
return success()
proc new*(
T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht
): TimeTracker =
TimeTracker(state: state, nodestore: nodestore, dht: dht)

View File

@ -0,0 +1,71 @@
import pkg/chronos
import pkg/chronicles
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable
import pkg/questionable/results
import std/sets
import ../state
import ../types
import ../component
import ../utils/asyncdataevent
logScope:
topics = "todolist"
type TodoList* = ref object of Component
nids: seq[Nid]
state: State
subNew: AsyncDataEventSubscription
subExp: AsyncDataEventSubscription
emptySignal: ?Future[void]
proc addNodes(t: TodoList, nids: seq[Nid]) =
for nid in nids:
t.nids.add(nid)
if s =? t.emptySignal:
trace "Nodes added, resuming...", nodes = nids.len
s.complete()
t.emptySignal = Future[void].none
method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
if t.nids.len < 1:
trace "List is empty. Waiting for new items..."
let signal = newFuture[void]("list.emptySignal")
t.emptySignal = some(signal)
try:
await signal.wait(InfiniteDuration)
except CatchableError as exc:
return failure(exc.msg)
if t.nids.len < 1:
return failure("TodoList is empty.")
let item = t.nids[0]
t.nids.del(0)
return success(item)
method start*(t: TodoList): Future[?!void] {.async.} =
info "Starting TodoList..."
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
t.addNodes(nids)
return success()
t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes)
t.subExp = t.state.events.nodesExpired.subscribe(onNewNodes)
return success()
method stop*(t: TodoList): Future[?!void] {.async.} =
await t.state.events.newNodesDiscovered.unsubscribe(t.subNew)
await t.state.events.nodesExpired.unsubscribe(t.subExp)
return success()
proc new*(_: type TodoList, state: State): TodoList =
TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none)
proc createTodoList*(state: State): TodoList =
TodoList.new(state)

View File

@ -3,7 +3,7 @@ import std/sequtils
import pkg/chronicles
import pkg/libp2p
import pkg/codexdht
import ./version
import ./utils/version
let doc =
"""
@ -13,21 +13,21 @@ Usage:
codexcrawler [--logLevel=<l>] [--publicIp=<a>] [--metricsAddress=<ip>] [--metricsPort=<p>] [--dataDir=<dir>] [--discoveryPort=<p>] [--bootNodes=<n>] [--stepDelay=<ms>] [--revisitDelay=<m>]
Options:
--publicIp=<a> Public IP address where this instance is reachable.
--logLevel=<l> Sets log level [default: INFO]
--publicIp=<a> Public IP address where this instance is reachable.
--metricsAddress=<ip> Listen address of the metrics server [default: 0.0.0.0]
--metricsPort=<p> Listen HTTP port of the metrics server [default: 8008]
--dataDir=<dir> Directory for storing data [default: crawler_data]
--discoveryPort=<p> Port used for DHT [default: 8090]
--bootNodes=<n> Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs]
--stepDelay=<ms> Delay in milliseconds per crawl step [default: 1000]
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 1440] (24h)
--revisitDelay=<m> Delay in minutes after which a node can be revisited [default: 10] (24h)
"""
import strutils
import docopt
type CrawlerConfig* = ref object
type Config* = ref object
logLevel*: string
publicIp*: string
metricsAddress*: IpAddress
@ -38,8 +38,8 @@ type CrawlerConfig* = ref object
stepDelayMs*: int
revisitDelayMins*: int
proc `$`*(config: CrawlerConfig): string =
"CrawlerConfig:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
proc `$`*(config: Config): string =
"Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp &
" metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort &
" dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" &
config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs &
@ -54,6 +54,9 @@ proc getDefaultTestnetBootNodes(): seq[string] =
"spr:CiUIAhIhAzZn3JmJab46BNjadVnLNQKbhnN3eYxwqpteKYY32SbOEgIDARo8CicAJQgCEiEDNmfcmYlpvjoE2Np1Wcs1ApuGc3d5jHCqm14phjfZJs4QrvWesAYaCwoJBKpA-TaRAnViKkcwRQIhANuMmZDD2c25xzTbKSirEpkZYoxbq-FU_lpI0K0e4mIVAiBfQX4yR47h1LCnHznXgDs6xx5DLO5q3lUcicqUeaqGeg",
"spr:CiUIAhIhAgybmRwboqDdUJjeZrzh43sn5mp8jt6ENIb08tLn4x01EgIDARo8CicAJQgCEiECDJuZHBuioN1QmN5mvOHjeyfmanyO3oQ0hvTy0ufjHTUQh4ifsAYaCwoJBI_0zSiRAnVsKkcwRQIhAJCb_z0E3RsnQrEePdJzMSQrmn_ooHv6mbw1DOh5IbVNAiBbBJrWR8eBV6ftzMd6ofa5khNA2h88OBhMqHCIzSjCeA",
"spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9",
"spr:CiUIAhIhA2AEPzVj1Z_pshWAwvTp0xvRZTigIkYphXGZdiYGmYRwEgIDARo8CicAJQgCEiEDYAQ_NWPVn-myFYDC9OnTG9FlOKAiRimFcZl2JgaZhHAQvKCXugYaCwoJBES3CuORAnd-KkYwRAIgNwrc7n8A107pYUoWfJxL8X0f-flfUKeA6bFrjVKzEo0CID_0q-KO5ZAGf65VsK-d9rV3S0PbFg7Hj3Cv4aVX2Lnn",
"spr:CiUIAhIhAuhggJhkjeRoR7MHjZ_L_naZKnjF541X0GXTI7LEwXi_EgIDARo8CicAJQgCEiEC6GCAmGSN5GhHsweNn8v-dpkqeMXnjVfQZdMjssTBeL8Qop2quwYaCwoJBJK-4V-RAncuKkYwRAIgaXWoxvKkzrjUZ5K_ayQHKNlYhUEzBXhGviujxfJiGXkCICbsYFivi6Ny1FT6tbofVBRj7lnaR3K9_3j5pUT4862k",
"spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw",
]
proc getBootNodeStrings(input: string): seq[string] =
@ -78,13 +81,13 @@ proc stringToSpr(uri: string): SignedPeerRecord =
proc getBootNodes(input: string): seq[SignedPeerRecord] =
getBootNodeStrings(input).mapIt(stringToSpr(it))
proc parseConfig*(): CrawlerConfig =
proc parseConfig*(): Config =
let args = docopt(doc, version = crawlerFullVersion)
proc get(name: string): string =
$args[name]
return CrawlerConfig(
return Config(
logLevel: get("--logLevel"),
publicIp: get("--publicIp"),
metricsAddress: parseIpAddress(get("--metricsAddress")),

View File

@ -1,103 +0,0 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./dht
import ./list
import ./nodeentry
import ./config
import std/sequtils
logScope:
topics = "crawler"
type Crawler* = ref object
dht: Dht
config: CrawlerConfig
todoNodes: List
okNodes: List
nokNodes: List
# This is not going to stay this way.
proc isNew(c: Crawler, node: Node): bool =
not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and
not c.nokNodes.contains(node.id)
proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} =
if err =? (await c.nokNodes.add(target)).errorOption:
error "Failed to add not-OK-node to list", err = err.msg
proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} =
if err =? (await c.okNodes.add(target)).errorOption:
error "Failed to add OK-node to list", err = err.msg
proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} =
let entry = NodeEntry(id: nodeId, lastVisit: 0)
return await c.todoNodes.add(entry)
proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} =
for node in newNodes:
if err =? (await c.addNewTodoNode(node.id)).errorOption:
error "Failed to add todo-node to list", err = err.msg
proc step(c: Crawler) {.async.} =
logScope:
todo = $c.todoNodes.len
ok = $c.okNodes.len
nok = $c.nokNodes.len
without var target =? (await c.todoNodes.pop()), err:
error "Failed to get todo node", err = err.msg
target.lastVisit = Moment.now().epochSeconds.uint64
without receivedNodes =? (await c.dht.getNeighbors(target.id)), err:
await c.handleNodeNotOk(target)
return
let newNodes = receivedNodes.filterIt(isNew(c, it))
if newNodes.len > 0:
trace "Discovered new nodes", newNodes = newNodes.len
await c.handleNodeOk(target)
await c.addNewTodoNodes(newNodes)
# Don't log the status every loop:
if (c.todoNodes.len mod 10) == 0:
trace "Status"
proc worker(c: Crawler) {.async.} =
try:
while true:
await c.step()
await sleepAsync(c.config.stepDelayMs.millis)
except Exception as exc:
error "Exception in crawler worker", msg = exc.msg
quit QuitFailure
proc start*(c: Crawler): Future[?!void] {.async.} =
if c.todoNodes.len < 1:
let nodeIds = c.dht.getRoutingTableNodeIds()
info "Loading routing-table nodes to todo-list...", nodes = nodeIds.len
for id in nodeIds:
if err =? (await c.addNewTodoNode(id)).errorOption:
error "Failed to add routing-table node to todo-list", err = err.msg
return failure(err)
info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs
asyncSpawn c.worker()
return success()
proc new*(
T: type Crawler,
dht: Dht,
todoNodes: List,
okNodes: List,
nokNodes: List,
config: CrawlerConfig,
): Crawler =
Crawler(
dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config
)

View File

@ -1,137 +0,0 @@
import std/net
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/codexdht/discv5/[routing_table, protocol as discv5]
from pkg/nimcrypto import keccak256
import ./rng
export discv5
logScope:
topics = "dht"
type Dht* = ref object
protocol*: discv5.Protocol
key: PrivateKey
peerId: PeerId
announceAddrs*: seq[MultiAddress]
providerRecord*: ?SignedPeerRecord
dhtRecord*: ?SignedPeerRecord
# proc toNodeId*(cid: Cid): NodeId =
# ## Cid to discovery id
# ##
# readUintBE[256](keccak256.digest(cid.data.buffer).data)
# proc toNodeId*(host: ca.Address): NodeId =
# ## Eth address to discovery id
# ##
# readUintBE[256](keccak256.digest(host.toArray).data)
proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
let node = d.protocol.getNode(nodeId)
if node.isSome():
return success(node.get())
return failure("Node not found for id: " & $nodeId)
proc getRoutingTableNodeIds*(d: Dht): seq[NodeId] =
var ids = newSeq[NodeId]()
for bucket in d.protocol.routingTable.buckets:
for node in bucket.nodes:
ids.add(node.id)
return ids
proc getNeighbors*(d: Dht, target: NodeId): Future[?!seq[Node]] {.async.} =
without node =? d.getNode(target), err:
return failure(err)
let distances = @[256.uint16]
let response = await d.protocol.findNode(node, distances)
if response.isOk():
let nodes = response.get()
if nodes.len > 0:
return success(nodes)
# Both returning 0 nodes and a failure result are treated as failure of getNeighbors
return failure("No nodes returned")
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
trace "protocol.resolve..."
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} =
trace "Removing provider", peerId
d.protocol.removeProvidersLocal(peerId)
proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) =
d.announceAddrs = @addrs
trace "Updating announce record", addrs = d.announceAddrs
d.providerRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, d.announceAddrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.providerRecord).expect("Should update SPR")
proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) =
trace "Updating Dht record", addrs = addrs
d.dhtRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, @addrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
proc start*(d: Dht) {.async.} =
d.protocol.open()
await d.protocol.start()
proc stop*(d: Dht) {.async.} =
await d.protocol.closeWait()
proc new*(
T: type Dht,
key: PrivateKey,
bindIp = IPv4_any(),
bindPort = 0.Port,
announceAddrs: openArray[MultiAddress],
bootstrapNodes: openArray[SignedPeerRecord] = [],
store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"),
): Dht =
var self = Dht(key: key, peerId: PeerId.init(key).expect("Should construct PeerId"))
self.updateAnnounceRecord(announceAddrs)
# This disables IP limits:
let discoveryConfig = DiscoveryConfig(
tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)),
bitsPerHop: DefaultBitsPerHop,
)
trace "Creating DHT protocol", ip = $bindIp, port = $bindPort
self.protocol = newProtocol(
key,
bindIp = bindIp,
bindPort = bindPort,
record = self.providerRecord.get,
bootstrapRecords = bootstrapNodes,
rng = Rng.instance(),
providers = ProvidersManager.new(store),
config = discoveryConfig,
)
self

View File

@ -0,0 +1,37 @@
import pkg/chronos
import pkg/questionable/results
import ./state
import ./services/metrics
import ./services/dht
import ./component
import ./components/crawler
import ./components/timetracker
import ./components/nodestore
import ./components/dhtmetrics
import ./components/todolist
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
var components: seq[Component] = newSeq[Component]()
without dht =? (await createDht(state)), err:
return failure(err)
without nodeStore =? createNodeStore(state), err:
return failure(err)
let
metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort)
todoList = createTodoList(state)
without dhtMetrics =? createDhtMetrics(state, metrics), err:
return failure(err)
components.add(dht)
components.add(todoList)
components.add(nodeStore)
components.add(Crawler.new(state, dht, todoList))
components.add(TimeTracker.new(state, nodeStore, dht))
components.add(dhtMetrics)
return success(components)

View File

@ -1,6 +1,6 @@
import std/os
import pkg/chronos
import pkg/chronicles
import pkg/metrics
import pkg/datastore
import pkg/datastore/typedds
import pkg/stew/byteutils
@ -8,47 +8,40 @@ import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/codexdht
import std/sets
import std/strutils
import std/sequtils
import std/os
import ./nodeentry
import ./types
import ./utils/datastoreutils
logScope:
topics = "list"
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
OnItem = proc(item: NodeEntry): void {.gcsafe, raises: [].}
type List* = ref object of RootObj
name: string
store: TypedDatastore
items: HashSet[Nid]
List* = ref object
name: string
store: TypedDatastore
items: seq[NodeEntry]
onMetric: OnUpdateMetric
emptySignal: ?Future[void]
proc encode(s: NodeEntry): seq[byte] =
proc encode(s: Nid): seq[byte] =
s.toBytes()
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
proc decode(T: type Nid, bytes: seq[byte]): ?!T =
if bytes.len < 1:
return success(NodeEntry(id: UInt256.fromHex("0"), lastVisit: 0.uint64))
return NodeEntry.fromBytes(bytes)
return success(Nid.fromStr("0"))
return Nid.fromBytes(bytes)
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
without itemKey =? Key.init(this.name / $item.id), err:
proc saveItem(this: List, item: Nid): Future[?!void] {.async.} =
without itemKey =? Key.init(this.name / $item), err:
return failure(err)
?await this.store.put(itemKey, item)
return success()
proc load*(this: List): Future[?!void] {.async.} =
method load*(this: List): Future[?!void] {.async, base.} =
without queryKey =? Key.init(this.name), err:
return failure(err)
without iter =? (await query[NodeEntry](this.store, Query.init(queryKey))), err:
without iter =? (await query[Nid](this.store, Query.init(queryKey))), err:
return failure(err)
while not iter.finished:
@ -56,69 +49,43 @@ proc load*(this: List): Future[?!void] {.async.} =
return failure(err)
without value =? item.value, err:
return failure(err)
if value.id > 0 or value.lastVisit > 0:
this.items.add(value)
if value > 0:
this.items.incl(value)
this.onMetric(this.items.len.int64)
info "Loaded list", name = this.name, items = this.items.len
return success()
proc new*(
_: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric
): List =
List(name: name, store: store, onMetric: onMetric)
proc contains*(this: List, nid: Nid): bool =
this.items.anyIt(it == nid)
proc contains*(this: List, nodeId: NodeId): bool =
this.items.anyIt(it.id == nodeId)
proc contains*(this: List, item: NodeEntry): bool =
this.contains(item.id)
proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
if this.contains(item):
method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
if this.contains(nid):
return success()
this.items.add(item)
this.onMetric(this.items.len.int64)
this.items.incl(nid)
if isSome(this.emptySignal):
trace "List no longer empty.", name = this.name
this.emptySignal.get().complete()
this.emptySignal = Future[void].none
if err =? (await this.saveItem(item)).errorOption:
if err =? (await this.saveItem(nid)).errorOption:
return failure(err)
return success()
proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} =
if this.items.len < 1:
return failure(this.name & "List is empty.")
method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} =
if not this.contains(nid):
return success()
this.items.keepItIf(item.id != it.id)
without itemKey =? Key.init(this.name / $item.id), err:
this.items.excl(nid)
without itemKey =? Key.init(this.name / $nid), err:
return failure(err)
?await this.store.delete(itemKey)
this.onMetric(this.items.len.int64)
return success()
proc pop*(this: List): Future[?!NodeEntry] {.async.} =
if this.items.len < 1:
trace "List is empty. Waiting for new items...", name = this.name
this.emptySignal = some(newFuture[void]("list.emptySignal"))
await this.emptySignal.get().wait(1.hours)
if this.items.len < 1:
return failure(this.name & "List is empty.")
let item = this.items[0]
if err =? (await this.remove(item)).errorOption:
return failure(err)
return success(item)
proc len*(this: List): int =
method len*(this: List): int {.base, gcsafe, raises: [].} =
this.items.len
proc iterateAll*(this: List, onItem: OnItem) {.async.} =
for item in this.items:
onItem(item)
await sleepAsync(1.millis)
proc new*(_: type List, name: string, store: TypedDatastore): List =
List(name: name, store: store)
proc createList*(dataDir: string, name: string): ?!List =
without store =? createTypedDatastore(dataDir / name), err:
return failure(err)
success(List.new(name, store))

View File

@ -1,14 +0,0 @@
import pkg/chronicles
import pkg/metrics
import pkg/metrics/chronos_httpserver
proc setupMetrics*(metricsAddress: IpAddress, metricsPort: Port) =
let metricsAddress = metricsAddress
notice "Starting metrics HTTP server",
url = "http://" & $metricsAddress & ":" & $metricsPort & "/metrics"
try:
startMetricsHttpServer($metricsAddress, metricsPort)
except CatchableError as exc:
raiseAssert exc.msg
except Exception as exc:
raiseAssert exc.msg # TODO fix metrics

View File

@ -1,35 +0,0 @@
import pkg/stew/byteutils
import pkg/stew/endians2
import pkg/questionable
import pkg/questionable/results
import pkg/codexdht
import pkg/chronos
import pkg/libp2p
type NodeEntry* = object
id*: NodeId
lastVisit*: uint64
proc `$`*(entry: NodeEntry): string =
$entry.id & ":" & $entry.lastVisit
proc toBytes*(entry: NodeEntry): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $entry.id)
buffer.write(2, entry.lastVisit)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry =
var
buffer = initProtoBuffer(data)
idStr: string
lastVisit: uint64
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
if buffer.getField(2, lastVisit).isErr:
return failure("Unable to decode `lastVisit`")
return success(NodeEntry(id: UInt256.fromHex(idStr), lastVisit: lastVisit))

View File

@ -0,0 +1,187 @@
import std/os
import std/net
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/codexdht/discv5/[routing_table, protocol as discv5]
from pkg/nimcrypto import keccak256
import ../utils/keyutils
import ../utils/datastoreutils
import ../utils/rng
import ../component
import ../state
import ../types
export discv5
logScope:
topics = "dht"
type
GetNeighborsResponse* = ref object
isResponsive*: bool
nodeIds*: seq[Nid]
Dht* = ref object of Component
state: State
protocol*: discv5.Protocol
key: PrivateKey
peerId: PeerId
announceAddrs*: seq[MultiAddress]
providerRecord*: ?SignedPeerRecord
dhtRecord*: ?SignedPeerRecord
proc responsive(nodeIds: seq[Nid]): GetNeighborsResponse =
GetNeighborsResponse(isResponsive: true, nodeIds: nodeIds)
proc unresponsive(): GetNeighborsResponse =
GetNeighborsResponse(isResponsive: false, nodeIds: newSeq[Nid]())
proc getNode*(d: Dht, nodeId: NodeId): ?!Node =
let node = d.protocol.getNode(nodeId)
if node.isSome():
return success(node.get())
return failure("Node not found for id: " & nodeId.toHex())
method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base, gcsafe, raises: [].} =
var ids = newSeq[Nid]()
for bucket in d.protocol.routingTable.buckets:
for node in bucket.nodes:
ids.add(node.id)
return ids
method getNeighbors*(
d: Dht, target: Nid
): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} =
without node =? d.getNode(target), err:
return success(unresponsive())
let distances = @[256.uint16]
try:
let response = await d.protocol.findNode(node, distances)
if response.isOk():
let nodes = response.get()
return success(responsive(nodes.mapIt(it.id)))
else:
let errmsg = $(response.error())
if errmsg == "Nodes message not received in time":
return success(unresponsive())
return failure(errmsg)
except CatchableError as exc:
return failure(exc.msg)
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
trace "protocol.resolve..."
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} =
trace "Removing provider", peerId
d.protocol.removeProvidersLocal(peerId)
proc updateAnnounceRecord(d: Dht, addrs: openArray[MultiAddress]) =
d.announceAddrs = @addrs
trace "Updating announce record", addrs = d.announceAddrs
d.providerRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, d.announceAddrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.providerRecord).expect("Should update SPR")
proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
trace "Updating Dht record", addrs = addrs
d.dhtRecord = SignedPeerRecord
.init(d.key, PeerRecord.init(d.peerId, @addrs))
.expect("Should construct signed record").some
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
method start*(d: Dht): Future[?!void] {.async.} =
d.protocol.open()
await d.protocol.start()
return success()
method stop*(d: Dht): Future[?!void] {.async.} =
await d.protocol.closeWait()
return success()
proc new(
T: type Dht,
state: State,
key: PrivateKey,
bindIp = IPv4_any(),
bindPort = 0.Port,
announceAddrs: openArray[MultiAddress],
bootstrapNodes: openArray[SignedPeerRecord] = [],
store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"),
): Dht =
var self = Dht(
state: state, key: key, peerId: PeerId.init(key).expect("Should construct PeerId")
)
self.updateAnnounceRecord(announceAddrs)
# This disables IP limits:
let discoveryConfig = DiscoveryConfig(
tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)),
bitsPerHop: DefaultBitsPerHop,
)
trace "Creating DHT protocol", ip = $bindIp, port = $bindPort
self.protocol = newProtocol(
key,
bindIp = bindIp,
bindPort = bindPort,
record = self.providerRecord.get,
bootstrapRecords = bootstrapNodes,
rng = Rng.instance(),
providers = ProvidersManager.new(store),
config = discoveryConfig,
)
self
proc createDht*(state: State): Future[?!Dht] {.async.} =
without dhtStore =? createDatastore(state.config.dataDir / "dht"), err:
return failure(err)
let keyPath = state.config.dataDir / "privatekey"
without privateKey =? setupKey(keyPath), err:
return failure(err)
var listenAddresses = newSeq[MultiAddress]()
# TODO: when p2p connections are supported:
# let aaa = MultiAddress.init("/ip4/" & state.config.publicIp & "/tcp/53678").expect("Should init multiaddress")
# listenAddresses.add(aaa)
var discAddresses = newSeq[MultiAddress]()
let bbb = MultiAddress
.init("/ip4/" & state.config.publicIp & "/udp/" & $state.config.discPort)
.expect("Should init multiaddress")
discAddresses.add(bbb)
let dht = Dht.new(
state,
privateKey,
bindPort = state.config.discPort,
announceAddrs = listenAddresses,
bootstrapNodes = state.config.bootNodes,
store = dhtStore,
)
dht.updateAnnounceRecord(listenAddresses)
dht.updateDhtRecord(discAddresses)
return success(dht)

View File

@ -0,0 +1,51 @@
import pkg/chronicles
import pkg/metrics
import pkg/metrics/chronos_httpserver
declareGauge(todoNodesGauge, "DHT nodes to be visited")
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
type
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
Metrics* = ref object of RootObj
todoNodes: OnUpdateMetric
okNodes: OnUpdateMetric
nokNodes: OnUpdateMetric
proc startServer(metricsAddress: IpAddress, metricsPort: Port) =
let metricsAddress = metricsAddress
notice "Starting metrics HTTP server",
url = "http://" & $metricsAddress & ":" & $metricsPort & "/metrics"
try:
startMetricsHttpServer($metricsAddress, metricsPort)
except CatchableError as exc:
raiseAssert exc.msg
except Exception as exc:
raiseAssert exc.msg # TODO fix metrics
method setTodoNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.todoNodes(value.int64)
method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.okNodes(value.int64)
method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} =
m.nokNodes(value.int64)
proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics =
startServer(metricsAddress, metricsPort)
# We can't extract this into a function because gauges cannot be passed as argument.
# The use of global state in nim-metrics is not pleasant.
proc onTodo(value: int64) =
todoNodesGauge.set(value)
proc onOk(value: int64) =
okNodesGauge.set(value)
proc onNok(value: int64) =
nokNodesGauge.set(value)
return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok)

51
codexcrawler/state.nim Normal file
View File

@ -0,0 +1,51 @@
import pkg/chronos
import pkg/chronicles
import pkg/questionable/results
import ./config
import ./utils/asyncdataevent
import ./types
logScope:
topics = "state"
type
OnStep* = proc(): Future[?!void] {.async: (raises: []), gcsafe.}
DhtNodeCheckEventData* = object
id*: Nid
isOk*: bool
Events* = ref object
nodesFound*: AsyncDataEvent[seq[Nid]]
newNodesDiscovered*: AsyncDataEvent[seq[Nid]]
dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData]
nodesExpired*: AsyncDataEvent[seq[Nid]]
ApplicationStatus* {.pure.} = enum
Stopped
Stopping
Running
State* = ref object of RootObj
status*: ApplicationStatus
config*: Config
events*: Events
proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} =
await sleepAsync(1.seconds)
proc worker(): Future[void] {.async.} =
while s.status == ApplicationStatus.Running:
if err =? (await step()).errorOption:
error "Failure-result caught in main loop. Stopping...", err = err.msg
s.status = ApplicationStatus.Stopping
await sleepAsync(delay)
asyncSpawn worker()
method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} =
# We use a small delay before starting the workers because 'whileRunning' is likely called from
# component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling).
# Worker steps might start raising events that other components haven't had time to subscribe to yet.
asyncSpawn s.delayedWorkerStart(step, delay)

View File

@ -1,75 +0,0 @@
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ./dht
import ./list
import ./nodeentry
import ./config
logScope:
topics = "timetracker"
type TimeTracker* = ref object
config: CrawlerConfig
todoNodes: List
okNodes: List
nokNodes: List
workerDelay: int
proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} =
var toMove = newSeq[NodeEntry]()
proc onItem(item: NodeEntry) =
if item.lastVisit < expiry:
toMove.add(item)
await list.iterateAll(onItem)
if toMove.len > 0:
trace "expired node, moving to todo", nodes = $toMove.len
for item in toMove:
if err =? (await t.todoNodes.add(item)).errorOption:
error "Failed to add expired node to todo list", err = err.msg
return
if err =? (await list.remove(item)).errorOption:
error "Failed to remove expired node to source list", err = err.msg
proc step(t: TimeTracker) {.async.} =
let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64
await t.processList(t.okNodes, expiry)
await t.processList(t.nokNodes, expiry)
proc worker(t: TimeTracker) {.async.} =
try:
while true:
await t.step()
await sleepAsync(t.workerDelay.minutes)
except Exception as exc:
error "Exception in timetracker worker", msg = exc.msg
quit QuitFailure
proc start*(t: TimeTracker): Future[?!void] {.async.} =
info "Starting timetracker...", revisitDelayMins = $t.workerDelay
asyncSpawn t.worker()
return success()
proc new*(
T: type TimeTracker,
todoNodes: List,
okNodes: List,
nokNodes: List,
config: CrawlerConfig,
): TimeTracker =
var delay = config.revisitDelayMins div 10
if delay < 1:
delay = 1
TimeTracker(
todoNodes: todoNodes,
okNodes: okNodes,
nokNodes: nokNodes,
config: config,
workerDelay: delay,
)

29
codexcrawler/types.nim Normal file
View File

@ -0,0 +1,29 @@
import pkg/stew/byteutils
import pkg/stint/io
import pkg/questionable/results
import pkg/codexdht
import pkg/libp2p
type Nid* = NodeId
proc `$`*(nid: Nid): string =
nid.toHex()
proc fromStr*(T: type Nid, s: string): Nid =
Nid(UInt256.fromHex(s))
proc toBytes*(nid: Nid): seq[byte] =
var buffer = initProtoBuffer()
buffer.write(1, $nid)
buffer.finish()
return buffer.buffer
proc fromBytes*(_: type Nid, data: openArray[byte]): ?!Nid =
var
buffer = initProtoBuffer(data)
idStr: string
if buffer.getField(1, idStr).isErr:
return failure("Unable to decode `idStr`")
return success(Nid.fromStr(idStr))

View File

@ -0,0 +1,95 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
type
AsyncDataEventSubscription* = ref object
key: EventQueueKey
listenFuture: Future[void]
fireEvent: AsyncEvent
lastResult: ?!void
inHandler: bool
delayedUnsubscribe: bool
AsyncDataEvent*[T] = ref object
queue: AsyncEventQueue[?T]
subscriptions: seq[AsyncDataEventSubscription]
AsyncDataEventHandler*[T] = proc(data: T): Future[?!void]
proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
AsyncDataEvent[T](
queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]()
)
proc performUnsubscribe[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} =
if subscription in event.subscriptions:
await subscription.listenFuture.cancelAndWait()
event.subscriptions.delete(event.subscriptions.find(subscription))
proc subscribe*[T](
event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T]
): AsyncDataEventSubscription =
var subscription = AsyncDataEventSubscription(
key: event.queue.register(),
listenFuture: newFuture[void](),
fireEvent: newAsyncEvent(),
inHandler: false,
delayedUnsubscribe: false,
)
proc listener() {.async.} =
while true:
let items = await event.queue.waitEvents(subscription.key)
for item in items:
if data =? item:
subscription.inHandler = true
subscription.lastResult = (await handler(data))
subscription.inHandler = false
subscription.fireEvent.fire()
subscription.listenFuture = listener()
event.subscriptions.add(subscription)
subscription
proc fire*[T](
event: AsyncDataEvent[T], data: T
): Future[?!void] {.async: (raises: []).} =
event.queue.emit(data.some)
var toUnsubscribe = newSeq[AsyncDataEventSubscription]()
for sub in event.subscriptions:
try:
await sub.fireEvent.wait()
except CancelledError:
discard
if err =? sub.lastResult.errorOption:
return failure(err)
if sub.delayedUnsubscribe:
toUnsubscribe.add(sub)
for sub in toUnsubscribe:
try:
await event.unsubscribe(sub)
except CatchableError as exc:
return failure(exc.msg)
success()
proc unsubscribe*[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} =
if subscription.inHandler:
subscription.delayedUnsubscribe = true
else:
await event.performUnsubscribe(subscription)
proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} =
let all = event.subscriptions
for subscription in all:
await event.unsubscribe(subscription)
proc listeners*[T](event: AsyncDataEvent[T]): int =
event.subscriptions.len

View File

@ -0,0 +1,15 @@
import pkg/chronicles
import pkg/questionable/results
import pkg/datastore
import pkg/datastore/typedds
proc createDatastore*(path: string): ?!Datastore =
without store =? LevelDbDatastore.new(path), err:
error "Failed to create datastore"
return failure(err)
return success(Datastore(store))
proc createTypedDatastore*(path: string): ?!TypedDatastore =
without store =? createDatastore(path), err:
return failure(err)
return success(TypedDatastore.init(store))

View File

@ -0,0 +1,112 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/components/crawler
import ../../../codexcrawler/services/dht
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mockdht
import ../mocks/mocktodolist
import ../helpers
suite "Crawler":
var
nid1: Nid
nid2: Nid
state: MockState
todo: MockTodoList
dht: MockDht
crawler: Crawler
setup:
nid1 = genNid()
nid2 = genNid()
state = createMockState()
todo = createMockTodoList()
dht = createMockDht()
crawler = Crawler.new(state, dht, todo)
(await crawler.start()).tryGet()
teardown:
(await crawler.stop()).tryGet()
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.stepper()).tryGet()
proc responsive(nid: Nid): GetNeighborsResponse =
GetNeighborsResponse(isResponsive: true, nodeIds: @[nid])
proc unresponsive(nid: Nid): GetNeighborsResponse =
GetNeighborsResponse(isResponsive: false, nodeIds: @[nid])
test "onStep should pop a node from the todoList and getNeighbors for it":
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(responsive(nid1))
await onStep()
check:
!(dht.getNeighborsArg) == nid1
test "nodes returned by getNeighbors are raised as nodesFound":
var nodesFound = newSeq[Nid]()
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
nodesFound = nids
return success()
let sub = state.events.nodesFound.subscribe(onNodesFound)
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(responsive(nid2))
await onStep()
check:
nid2 in nodesFound
await state.events.nodesFound.unsubscribe(sub)
test "responsive result from getNeighbors raises the node as successful dhtNodeCheck":
var checkEvent = DhtNodeCheckEventData()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
checkEvent = event
return success()
let sub = state.events.dhtNodeCheck.subscribe(onCheck)
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(responsive(nid2))
await onStep()
check:
checkEvent.id == nid1
checkEvent.isOk == true
await state.events.dhtNodeCheck.unsubscribe(sub)
test "unresponsive result from getNeighbors raises the node as unsuccessful dhtNodeCheck":
var checkEvent = DhtNodeCheckEventData()
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
checkEvent = event
return success()
let sub = state.events.dhtNodeCheck.subscribe(onCheck)
todo.popReturn = success(nid1)
dht.getNeighborsReturn = success(unresponsive(nid2))
await onStep()
check:
checkEvent.id == nid1
checkEvent.isOk == false
await state.events.dhtNodeCheck.unsubscribe(sub)

View File

@ -0,0 +1,90 @@
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/components/dhtmetrics
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mocklist
import ../mocks/mockmetrics
import ../helpers
suite "DhtMetrics":
var
nid: Nid
state: MockState
okList: MockList
nokList: MockList
metrics: MockMetrics
dhtmetrics: DhtMetrics
setup:
nid = genNid()
state = createMockState()
okList = createMockList()
nokList = createMockList()
metrics = createMockMetrics()
dhtmetrics = DhtMetrics.new(state, okList, nokList, metrics)
(await dhtmetrics.start()).tryGet()
teardown:
(await dhtmetrics.stop()).tryGet()
state.checkAllUnsubscribed()
proc fireDhtNodeCheckEvent(isOk: bool) {.async.} =
let event = DhtNodeCheckEventData(id: nid, isOk: isOk)
(await state.events.dhtNodeCheck.fire(event)).tryGet()
test "dhtmetrics start should load both lists":
check:
okList.loadCalled
nokList.loadCalled
test "dhtNodeCheck event should add node to okList if check is successful":
await fireDhtNodeCheckEvent(true)
check:
nid in okList.added
test "dhtNodeCheck event should add node to nokList if check has failed":
await fireDhtNodeCheckEvent(false)
check:
nid in nokList.added
test "dhtNodeCheck event should remove node from nokList if check is successful":
await fireDhtNodeCheckEvent(true)
check:
nid in nokList.removed
test "dhtNodeCheck event should remove node from okList if check has failed":
await fireDhtNodeCheckEvent(false)
check:
nid in okList.removed
test "dhtNodeCheck event should set okList length as dht-ok metric":
let length = 123
okList.length = length
await fireDhtNodeCheckEvent(true)
check:
metrics.ok == length
test "dhtNodeCheck event should set nokList length as dht-nok metric":
let length = 234
nokList.length = length
await fireDhtNodeCheckEvent(true)
check:
metrics.nok == length

View File

@ -0,0 +1,123 @@
import std/os
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import pkg/datastore/typedds
import ../../../codexcrawler/components/nodestore
import ../../../codexcrawler/utils/datastoreutils
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../mocks/mockstate
import ../helpers
suite "Nodestore":
let
dsPath = getTempDir() / "testds"
nodestoreName = "nodestore"
var
ds: TypedDatastore
state: MockState
store: NodeStore
setup:
ds = createTypedDatastore(dsPath).tryGet()
state = createMockState()
store = NodeStore.new(state, ds)
(await store.start()).tryGet()
teardown:
(await store.stop()).tryGet()
(await ds.close()).tryGet()
state.checkAllUnsubscribed()
removeDir(dsPath)
test "nodeEntry encoding":
let entry = NodeEntry(id: genNid(), lastVisit: 123.uint64)
let
bytes = entry.encode()
decoded = NodeEntry.decode(bytes).tryGet()
check:
entry.id == decoded.id
entry.lastVisit == decoded.lastVisit
test "nodesFound event should store nodes":
let
nid = genNid()
expectedKey = Key.init(nodestoreName / $nid).tryGet()
(await state.events.nodesFound.fire(@[nid])).tryGet()
check:
(await ds.has(expectedKey)).tryGet()
let entry = (await get[NodeEntry](ds, expectedKey)).tryGet()
check:
entry.id == nid
test "nodesFound event should fire newNodesDiscovered":
var newNodes = newSeq[Nid]()
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
newNodes = nids
return success()
let
sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
nid = genNid()
(await state.events.nodesFound.fire(@[nid])).tryGet()
check:
newNodes == @[nid]
await state.events.newNodesDiscovered.unsubscribe(sub)
test "nodesFound event should not fire newNodesDiscovered for previously seen nodes":
let nid = genNid()
# Make nid known first. Then subscribe.
(await state.events.nodesFound.fire(@[nid])).tryGet()
var
newNodes = newSeq[Nid]()
count = 0
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
newNodes = nids
inc count
return success()
let sub = state.events.newNodesDiscovered.subscribe(onNewNodes)
# Firing the event again should not trigger newNodesDiscovered for nid
(await state.events.nodesFound.fire(@[nid])).tryGet()
check:
newNodes.len == 0
count == 0
await state.events.newNodesDiscovered.unsubscribe(sub)
test "iterateAll yields all known nids":
let
nid1 = genNid()
nid2 = genNid()
nid3 = genNid()
(await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet()
var iterNodes = newSeq[Nid]()
proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} =
iterNodes.add(entry.id)
return success()
(await store.iterateAll(onNode)).tryGet()
check:
nid1 in iterNodes
nid2 in iterNodes
nid3 in iterNodes

View File

@ -0,0 +1,95 @@
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/components/timetracker
import ../../../codexcrawler/components/nodestore
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../mocks/mocknodestore
import ../mocks/mockdht
import ../helpers
suite "TimeTracker":
var
nid: Nid
state: MockState
store: MockNodeStore
dht: MockDht
time: TimeTracker
expiredNodesReceived: seq[Nid]
sub: AsyncDataEventSubscription
setup:
nid = genNid()
state = createMockState()
store = createMockNodeStore()
dht = createMockDht()
# Subscribe to nodesExpired event
expiredNodesReceived = newSeq[Nid]()
proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} =
expiredNodesReceived = nids
return success()
sub = state.events.nodesExpired.subscribe(onExpired)
state.config.revisitDelayMins = 22
time = TimeTracker.new(state, store, dht)
(await time.start()).tryGet()
teardown:
(await time.stop()).tryGet()
await state.events.nodesExpired.unsubscribe(sub)
state.checkAllUnsubscribed()
proc onStep() {.async.} =
(await state.stepper()).tryGet()
proc createNodeInStore(lastVisit: uint64): Nid =
let entry = NodeEntry(id: genNid(), lastVisit: lastVisit)
store.nodesToIterate.add(entry)
return entry.id
test "onStep fires nodesExpired event for expired nodes":
let
expiredTimestamp =
(Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64
expiredNodeId = createNodeInStore(expiredTimestamp)
await onStep()
check:
expiredNodeId in expiredNodesReceived
test "onStep does not fire nodesExpired event for nodes that are recent":
let
recentTimestamp =
(Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64
recentNodeId = createNodeInStore(recentTimestamp)
await onStep()
check:
recentNodeId notin expiredNodesReceived
test "onStep raises routingTable nodes as nodesFound":
var nodesFound = newSeq[Nid]()
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
nodesFound = nids
return success()
let sub = state.events.nodesFound.subscribe(onNodesFound)
dht.routingTable.add(nid)
await onStep()
check:
nid in nodesFound
await state.events.nodesFound.unsubscribe(sub)

View File

@ -0,0 +1,59 @@
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/components/todolist
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/state
import ../mocks/mockstate
import ../helpers
suite "TodoList":
var
nid: Nid
state: MockState
todo: TodoList
setup:
nid = genNid()
state = createMockState()
todo = TodoList.new(state)
(await todo.start()).tryGet()
teardown:
(await todo.stop()).tryGet()
state.checkAllUnsubscribed()
proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} =
(await state.events.newNodesDiscovered.fire(nids)).tryGet()
proc fireNodesExpiredEvent(nids: seq[Nid]) {.async.} =
(await state.events.nodesExpired.fire(nids)).tryGet()
test "discovered nodes are added to todo list":
await fireNewNodesDiscoveredEvent(@[nid])
let item = (await todo.pop).tryGet()
check:
item == nid
test "expired nodes are added to todo list":
await fireNodesExpiredEvent(@[nid])
let item = (await todo.pop).tryGet()
check:
item == nid
test "pop on empty todo list waits until item is added":
let popFuture = todo.pop()
check:
not popFuture.finished
await fireNewNodesDiscoveredEvent(@[nid])
check:
popFuture.finished
popFuture.value.tryGet() == nid

View File

@ -1,7 +0,0 @@
import pkg/asynctest/chronos/unittest
suite "Example tests":
test "Example":
echo "Woo!"
check:
1 == 1

View File

@ -0,0 +1,6 @@
import std/random
import pkg/stint
import ../../codexcrawler/types
proc genNid*(): Nid =
Nid(rand(uint64).u256)

View File

@ -0,0 +1,28 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import ../../../codexcrawler/services/dht
import ../../../codexcrawler/types
type MockDht* = ref object of Dht
routingTable*: seq[Nid]
getNeighborsArg*: ?Nid
getNeighborsReturn*: ?!GetNeighborsResponse
method getRoutingTableNodeIds*(d: MockDht): seq[Nid] =
return d.routingTable
method getNeighbors*(
d: MockDht, target: Nid
): Future[?!GetNeighborsResponse] {.async: (raises: []).} =
d.getNeighborsArg = some(target)
return d.getNeighborsReturn
method start*(d: MockDht): Future[?!void] {.async.} =
return success()
method stop*(d: MockDht): Future[?!void] {.async.} =
return success()
proc createMockDht*(): MockDht =
MockDht()

View File

@ -0,0 +1,42 @@
import pkg/chronos
import pkg/questionable/results
import ../../../codexcrawler/types
import ../../../codexcrawler/list
type MockList* = ref object of List
loadCalled*: bool
added*: seq[Nid]
addSuccess*: bool
removed*: seq[Nid]
removeSuccess*: bool
length*: int
method load*(this: MockList): Future[?!void] {.async.} =
this.loadCalled = true
return success()
method add*(this: MockList, nid: Nid): Future[?!void] {.async.} =
this.added.add(nid)
if this.addSuccess:
return success()
return failure("test failure")
method remove*(this: MockList, nid: Nid): Future[?!void] {.async.} =
this.removed.add(nid)
if this.removeSuccess:
return success()
return failure("test failure")
method len*(this: MockList): int =
return this.length
proc createMockList*(): MockList =
MockList(
loadCalled: false,
added: newSeq[Nid](),
addSuccess: true,
removed: newSeq[Nid](),
removeSuccess: true,
length: 0,
)

View File

@ -0,0 +1,18 @@
import ../../../codexcrawler/services/metrics
type MockMetrics* = ref object of Metrics
todo*: int
ok*: int
nok*: int
method setTodoNodes*(m: MockMetrics, value: int) =
m.todo = value
method setOkNodes*(m: MockMetrics, value: int) =
m.ok = value
method setNokNodes*(m: MockMetrics, value: int) =
m.nok = value
proc createMockMetrics*(): MockMetrics =
MockMetrics()

View File

@ -0,0 +1,24 @@
import std/sequtils
import pkg/questionable/results
import pkg/chronos
import ../../../codexcrawler/components/nodestore
type MockNodeStore* = ref object of NodeStore
nodesToIterate*: seq[NodeEntry]
method iterateAll*(
s: MockNodeStore, onNode: OnNodeEntry
): Future[?!void] {.async: (raises: []).} =
for node in s.nodesToIterate:
?await onNode(node)
return success()
method start*(s: MockNodeStore): Future[?!void] {.async.} =
return success()
method stop*(s: MockNodeStore): Future[?!void] {.async.} =
return success()
proc createMockNodeStore*(): MockNodeStore =
MockNodeStore(nodesToIterate: newSeq[NodeEntry]())

View File

@ -0,0 +1,30 @@
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/state
import ../../../codexcrawler/utils/asyncdataevent
import ../../../codexcrawler/types
import ../../../codexcrawler/config
type MockState* = ref object of State
stepper*: OnStep
proc checkAllUnsubscribed*(s: MockState) =
check:
s.events.nodesFound.listeners == 0
s.events.newNodesDiscovered.listeners == 0
s.events.dhtNodeCheck.listeners == 0
s.events.nodesExpired.listeners == 0
method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} =
s.stepper = step
proc createMockState*(): MockState =
MockState(
status: ApplicationStatus.Running,
config: Config(),
events: Events(
nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](),
),
)

View File

@ -0,0 +1,20 @@
import pkg/chronos
import pkg/questionable/results
import ../../../codexcrawler/components/todolist
import ../../../codexcrawler/types
type MockTodoList* = ref object of TodoList
popReturn*: ?!Nid
method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} =
return t.popReturn
method start*(t: MockTodoList): Future[?!void] {.async.} =
return success()
method stop*(t: MockTodoList): Future[?!void] {.async.} =
return success()
proc createMockTodoList*(): MockTodoList =
MockTodoList()

View File

@ -0,0 +1,7 @@
import ./components/testnodestore
import ./components/testdhtmetrics
import ./components/testtodolist
import ./components/testtimetracker
import ./components/testcrawler
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,42 @@
import pkg/chronos
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../codexcrawler/state
import ../../codexcrawler/config
import ../../codexcrawler/types
import ../../codexcrawler/utils/asyncdataevent
suite "State":
var state: State
setup:
state = State(
status: ApplicationStatus.Running,
config: Config(),
events: Events(
nodesFound: newAsyncDataEvent[seq[Nid]](),
newNodesDiscovered: newAsyncDataEvent[seq[Nid]](),
dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](),
nodesExpired: newAsyncDataEvent[seq[Nid]](),
),
)
test "whileRunning":
var counter = 0
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =
inc counter
return success()
await state.whileRunning(onStep, 1.milliseconds)
while counter < 5:
await sleepAsync(1.milliseconds)
state.status = ApplicationStatus.Stopped
await sleepAsync(10.milliseconds)
check:
counter == 5

View File

@ -0,0 +1,23 @@
import pkg/chronos
import pkg/asynctest/chronos/unittest
import pkg/questionable/results
import ../../codexcrawler/types
import ./helpers
suite "Types":
test "nid string encoding":
let
nid = genNid()
str = $nid
check:
nid == Nid.fromStr(str)
test "nid byte encoding":
let
nid = genNid()
bytes = nid.toBytes()
check:
nid == Nid.fromBytes(bytes).tryGet()

View File

@ -0,0 +1,3 @@
import ./utils/testasyncdataevent
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,107 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/asynctest/chronos/unittest
import ../../../codexcrawler/utils/asyncdataevent
type ExampleData = object
s: string
suite "AsyncDataEvent":
var event: AsyncDataEvent[ExampleData]
let msg = "Yeah!"
setup:
event = newAsyncDataEvent[ExampleData]()
teardown:
await event.unsubscribeAll()
test "Successful event":
var data = ""
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
data = e.s
success()
let s = event.subscribe(eventHandler)
check:
isOK(await event.fire(ExampleData(s: msg)))
data == msg
await event.unsubscribe(s)
test "Failed event preserves error message":
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
failure(msg)
let s = event.subscribe(eventHandler)
let fireResult = await event.fire(ExampleData(s: "a"))
check:
fireResult.isErr
fireResult.error.msg == msg
await event.unsubscribe(s)
test "Emits data to multiple subscribers":
var
data1 = ""
data2 = ""
data3 = ""
proc handler1(e: ExampleData): Future[?!void] {.async.} =
data1 = e.s
success()
proc handler2(e: ExampleData): Future[?!void] {.async.} =
data2 = e.s
success()
proc handler3(e: ExampleData): Future[?!void] {.async.} =
data3 = e.s
success()
let
s1 = event.subscribe(handler1)
s2 = event.subscribe(handler2)
s3 = event.subscribe(handler3)
let fireResult = await event.fire(ExampleData(s: msg))
check:
fireResult.isOK
data1 == msg
data2 == msg
data3 == msg
await event.unsubscribe(s1)
await event.unsubscribe(s2)
await event.unsubscribe(s3)
test "Can fire and event without subscribers":
check:
isOK(await event.fire(ExampleData(s: msg)))
test "Can unsubscribe in handler":
proc doNothing() {.async, closure.} =
await sleepAsync(1.millis)
var callback = doNothing
proc eventHandler(e: ExampleData): Future[?!void] {.async.} =
await callback()
success()
let s = event.subscribe(eventHandler)
proc doUnsubscribe() {.async.} =
await event.unsubscribe(s)
callback = doUnsubscribe
check:
isOK(await event.fire(ExampleData(s: msg)))
await event.unsubscribe(s)

1
tests/config.nims Normal file
View File

@ -0,0 +1 @@
switch("define", "chronicles_log_level=ERROR")

View File

@ -1,3 +1,6 @@
import ./codexcrawler/exampletest
import ./codexcrawler/testutils
import ./codexcrawler/testcomponents
import ./codexcrawler/testtypes
import ./codexcrawler/teststate
{.warning[UnusedImport]: off.}