diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index 8909816..bbc0734 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -1,173 +1,80 @@ import std/os -import std/sequtils import pkg/chronicles import pkg/chronos import pkg/questionable import pkg/questionable/results -import pkg/datastore -import pkg/datastore/typedds import pkg/metrics import ./config -import ./logging -import ./metrics -import ./list -import ./dht -import ./keyutils -import ./crawler -import ./timetracker +import ./utils/logging +import ./utils/asyncdataevent +import ./installer +import ./state +import ./component +import ./types -declareGauge(todoNodesGauge, "DHT nodes to be visited") -declareGauge(okNodesGauge, "DHT nodes successfully contacted") -declareGauge(nokNodesGauge, "DHT nodes failed to contact") +type Application* = ref object + state: State + components: seq[Component] -type - ApplicationStatus* {.pure.} = enum - Stopped - Stopping - Running - - Application* = ref object - status: ApplicationStatus - config*: CrawlerConfig - todoNodes*: List - okNodes*: List - nokNodes*: List - dht*: Dht - crawler*: Crawler - timeTracker*: TimeTracker - -proc createDatastore(app: Application, path: string): ?!Datastore = - without store =? LevelDbDatastore.new(path), err: - error "Failed to create datastore" - return failure(err) - return success(Datastore(store)) - -proc createTypedDatastore(app: Application, path: string): ?!TypedDatastore = - without store =? app.createDatastore(path), err: - return failure(err) - return success(TypedDatastore.init(store)) - -proc initializeLists(app: Application): Future[?!void] {.async.} = - without store =? app.createTypedDatastore(app.config.dataDir / "lists"), err: - return failure(err) - - # We can't extract this into a function because gauges cannot be passed as argument. - # The use of global state in nim-metrics is not pleasant. - proc onTodoMetric(value: int64) = - todoNodesGauge.set(value) - - proc onOkMetric(value: int64) = - okNodesGauge.set(value) - - proc onNokMetric(value: int64) = - nokNodesGauge.set(value) - - app.todoNodes = List.new("todo", store, onTodoMetric) - app.okNodes = List.new("ok", store, onOkMetric) - app.nokNodes = List.new("nok", store, onNokMetric) - - if err =? (await app.todoNodes.load()).errorOption: - return failure(err) - if err =? (await app.okNodes.load()).errorOption: - return failure(err) - if err =? (await app.nokNodes.load()).errorOption: - return failure(err) - - return success() - -proc initializeDht(app: Application): Future[?!void] {.async.} = - without dhtStore =? app.createDatastore(app.config.dataDir / "dht"), err: - return failure(err) - let keyPath = app.config.dataDir / "privatekey" - without privateKey =? setupKey(keyPath), err: - return failure(err) - - var listenAddresses = newSeq[MultiAddress]() - # TODO: when p2p connections are supported: - # let aaa = MultiAddress.init("/ip4/" & app.config.publicIp & "/tcp/53678").expect("Should init multiaddress") - # listenAddresses.add(aaa) - - var discAddresses = newSeq[MultiAddress]() - let bbb = MultiAddress - .init("/ip4/" & app.config.publicIp & "/udp/" & $app.config.discPort) - .expect("Should init multiaddress") - discAddresses.add(bbb) - - app.dht = Dht.new( - privateKey, - bindPort = app.config.discPort, - announceAddrs = listenAddresses, - bootstrapNodes = app.config.bootNodes, - store = dhtStore, +proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} = + app.state = State( + status: ApplicationStatus.Running, + config: config, + events: Events( + nodesFound: newAsyncDataEvent[seq[Nid]](), + newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), + dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), + nodesExpired: newAsyncDataEvent[seq[Nid]](), + ), ) - app.dht.updateAnnounceRecord(listenAddresses) - app.dht.updateDhtRecord(discAddresses) + without components =? (await createComponents(app.state)), err: + error "Failed to create componenents", err = err.msg + return failure(err) + app.components = components - await app.dht.start() + for c in components: + if err =? (await c.start()).errorOption: + error "Failed to start component", err = err.msg return success() -proc initializeCrawler(app: Application): Future[?!void] {.async.} = - app.crawler = - Crawler.new(app.dht, app.todoNodes, app.okNodes, app.nokNodes, app.config) - return await app.crawler.start() - -proc initializeTimeTracker(app: Application): Future[?!void] {.async.} = - app.timeTracker = - TimeTracker.new(app.todoNodes, app.okNodes, app.nokNodes, app.config) - return await app.timeTracker.start() - -proc initializeApp(app: Application): Future[?!void] {.async.} = - if err =? (await app.initializeLists()).errorOption: - error "Failed to initialize lists", err = err.msg - return failure(err) - - if err =? (await app.initializeDht()).errorOption: - error "Failed to initialize DHT", err = err.msg - return failure(err) - - if err =? (await app.initializeCrawler()).errorOption: - error "Failed to initialize crawler", err = err.msg - return failure(err) - - if err =? (await app.initializeTimeTracker()).errorOption: - error "Failed to initialize timetracker", err = err.msg - return failure(err) - - return success() +proc stopComponents(app: Application) {.async.} = + for c in app.components: + if err =? (await c.stop()).errorOption: + error "Failed to stop component", err = err.msg proc stop*(app: Application) = - app.status = ApplicationStatus.Stopping - waitFor app.dht.stop() + app.state.status = ApplicationStatus.Stopping proc run*(app: Application) = - app.config = parseConfig() - info "Loaded configuration", config = app.config + let config = parseConfig() + info "Loaded configuration", config = $config # Configure loglevel - updateLogLevel(app.config.logLevel) + updateLogLevel(config.logLevel) # Ensure datadir path exists: - if not existsDir(app.config.dataDir): - createDir(app.config.dataDir) + if not existsDir(config.dataDir): + createDir(config.dataDir) - setupMetrics(app.config.metricsAddress, app.config.metricsPort) info "Metrics endpoint initialized" info "Starting application" - app.status = ApplicationStatus.Running - if err =? (waitFor app.initializeApp()).errorOption: - app.status = ApplicationStatus.Stopping + if err =? (waitFor app.initializeApp(config)).errorOption: + app.state.status = ApplicationStatus.Stopping error "Failed to start application", err = err.msg return - while app.status == ApplicationStatus.Running: + while app.state.status == ApplicationStatus.Running: try: chronos.poll() except Exception as exc: error "Unhandled exception", msg = exc.msg quit QuitFailure - notice "Application closed" + + notice "Application stopping..." + waitFor app.stopComponents() + notice "Application stopped" diff --git a/codexcrawler/component.nim b/codexcrawler/component.nim new file mode 100644 index 0000000..f36b0a2 --- /dev/null +++ b/codexcrawler/component.nim @@ -0,0 +1,12 @@ +import pkg/chronos +import pkg/questionable/results + +import ./state + +type Component* = ref object of RootObj + +method start*(c: Component): Future[?!void] {.async, base.} = + raiseAssert("call to abstract method: component.start") + +method stop*(c: Component): Future[?!void] {.async, base.} = + raiseAssert("call to abstract method: component.stop") diff --git a/codexcrawler/components/crawler.nim b/codexcrawler/components/crawler.nim new file mode 100644 index 0000000..b4db6f0 --- /dev/null +++ b/codexcrawler/components/crawler.nim @@ -0,0 +1,59 @@ +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ../services/dht +import ./todolist +import ../config +import ../types +import ../component +import ../state +import ../utils/asyncdataevent + +logScope: + topics = "crawler" + +type Crawler* = ref object of Component + state: State + dht: Dht + todo: TodoList + +proc raiseCheckEvent( + c: Crawler, nid: Nid, success: bool +): Future[?!void] {.async: (raises: []).} = + let event = DhtNodeCheckEventData(id: nid, isOk: success) + if err =? (await c.state.events.dhtNodeCheck.fire(event)).errorOption: + return failure(err) + return success() + +proc step(c: Crawler): Future[?!void] {.async: (raises: []).} = + without nid =? (await c.todo.pop()), err: + return failure(err) + + without response =? await c.dht.getNeighbors(nid), err: + return failure(err) + + if err =? (await c.raiseCheckEvent(nid, response.isResponsive)).errorOption: + return failure(err) + + if err =? (await c.state.events.nodesFound.fire(response.nodeIds)).errorOption: + return failure(err) + + return success() + +method start*(c: Crawler): Future[?!void] {.async.} = + info "Starting crawler..." + + proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = + await c.step() + + await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) + + return success() + +method stop*(c: Crawler): Future[?!void] {.async.} = + return success() + +proc new*(T: type Crawler, state: State, dht: Dht, todo: TodoList): Crawler = + Crawler(state: state, dht: dht, todo: todo) diff --git a/codexcrawler/components/dhtmetrics.nim b/codexcrawler/components/dhtmetrics.nim new file mode 100644 index 0000000..cbf2a9c --- /dev/null +++ b/codexcrawler/components/dhtmetrics.nim @@ -0,0 +1,69 @@ +import pkg/chronicles +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import ../list +import ../state +import ../services/metrics +import ../component +import ../utils/asyncdataevent + +logScope: + topics = "dhtmetrics" + +type DhtMetrics* = ref object of Component + state: State + ok: List + nok: List + sub: AsyncDataEventSubscription + metrics: Metrics + +proc handleCheckEvent( + d: DhtMetrics, event: DhtNodeCheckEventData +): Future[?!void] {.async.} = + if event.isOk: + ?await d.ok.add(event.id) + ?await d.nok.remove(event.id) + else: + ?await d.ok.remove(event.id) + ?await d.nok.add(event.id) + + d.metrics.setOkNodes(d.ok.len) + d.metrics.setNokNodes(d.nok.len) + return success() + +method start*(d: DhtMetrics): Future[?!void] {.async.} = + info "Starting..." + ?await d.ok.load() + ?await d.nok.load() + + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + await d.handleCheckEvent(event) + + d.sub = d.state.events.dhtNodeCheck.subscribe(onCheck) + + proc logDhtMetrics(): Future[?!void] {.async: (raises: []), gcsafe.} = + trace "Metrics", ok = d.ok.len, nok = d.nok.len + return success() + + await d.state.whileRunning(logDhtMetrics, 1.minutes) + + return success() + +method stop*(d: DhtMetrics): Future[?!void] {.async.} = + await d.state.events.dhtNodeCheck.unsubscribe(d.sub) + return success() + +proc new*( + T: type DhtMetrics, state: State, okList: List, nokList: List, metrics: Metrics +): DhtMetrics = + DhtMetrics(state: state, ok: okList, nok: nokList, metrics: metrics) + +proc createDhtMetrics*(state: State, metrics: Metrics): ?!DhtMetrics = + without okList =? createList(state.config.dataDir, "dhtok"), err: + return failure(err) + without nokList =? createList(state.config.dataDir, "dhtnok"), err: + return failure(err) + + success(DhtMetrics.new(state, okList, nokList, metrics)) diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim new file mode 100644 index 0000000..1a7509e --- /dev/null +++ b/codexcrawler/components/nodestore.nim @@ -0,0 +1,134 @@ +import std/os +import pkg/datastore +import pkg/datastore/typedds +import pkg/questionable/results +import pkg/chronicles +import pkg/chronos +import pkg/libp2p + +import ../types +import ../component +import ../state +import ../utils/datastoreutils +import ../utils/asyncdataevent + +const nodestoreName = "nodestore" + +logScope: + topics = "nodestore" + +type + NodeEntry* = object + id*: Nid + lastVisit*: uint64 + + OnNodeEntry* = proc(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} + + NodeStore* = ref object of Component + state: State + store: TypedDatastore + sub: AsyncDataEventSubscription + +proc `$`*(entry: NodeEntry): string = + $entry.id & ":" & $entry.lastVisit + +proc toBytes*(entry: NodeEntry): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $entry.id) + buffer.write(2, entry.lastVisit) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = + var + buffer = initProtoBuffer(data) + idStr: string + lastVisit: uint64 + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + + if buffer.getField(2, lastVisit).isErr: + return failure("Unable to decode `lastVisit`") + + return success(NodeEntry(id: Nid.fromStr(idStr), lastVisit: lastVisit)) + +proc encode*(e: NodeEntry): seq[byte] = + e.toBytes() + +proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = + if bytes.len < 1: + return success(NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64)) + return NodeEntry.fromBytes(bytes) + +proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = + without key =? Key.init(nodestoreName / $nid), err: + return failure(err) + without exists =? (await s.store.has(key)), err: + return failure(err) + + if not exists: + let entry = NodeEntry(id: nid, lastVisit: 0) + ?await s.store.put(key, entry) + + return success(not exists) + +proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = + await s.state.events.newNodesDiscovered.fire(nids) + +proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = + var newNodes = newSeq[Nid]() + for nid in nids: + without isNew =? (await s.storeNodeIsNew(nid)), err: + return failure(err) + if isNew: + newNodes.add(nid) + + if newNodes.len > 0: + trace "Discovered new nodes", newNodes = newNodes.len + ?await s.fireNewNodesDiscovered(newNodes) + return success() + +method iterateAll*( + s: NodeStore, onNode: OnNodeEntry +): Future[?!void] {.async: (raises: []), base.} = + without queryKey =? Key.init(nodestoreName), err: + return failure(err) + try: + without iter =? (await query[NodeEntry](s.store, Query.init(queryKey))), err: + return failure(err) + + while not iter.finished: + without item =? (await iter.next()), err: + return failure(err) + without value =? item.value, err: + return failure(err) + + ?await onNode(value) + except CatchableError as exc: + return failure(exc.msg) + + return success() + +method start*(s: NodeStore): Future[?!void] {.async.} = + info "Starting..." + + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + return await s.processFoundNodes(nids) + + s.sub = s.state.events.nodesFound.subscribe(onNodesFound) + return success() + +method stop*(s: NodeStore): Future[?!void] {.async.} = + await s.state.events.nodesFound.unsubscribe(s.sub) + return success() + +proc new*(T: type NodeStore, state: State, store: TypedDatastore): NodeStore = + NodeStore(state: state, store: store) + +proc createNodeStore*(state: State): ?!NodeStore = + without ds =? createTypedDatastore(state.config.dataDir / "nodestore"), err: + error "Failed to create typed datastore for node store", err = err.msg + return failure(err) + + return success(NodeStore.new(state, ds)) diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim new file mode 100644 index 0000000..67eb48c --- /dev/null +++ b/codexcrawler/components/timetracker.nim @@ -0,0 +1,66 @@ +import pkg/chronicles +import pkg/chronos +import pkg/questionable/results + +import ./nodestore +import ../services/dht +import ../component +import ../state +import ../types +import ../utils/asyncdataevent + +logScope: + topics = "timetracker" + +type TimeTracker* = ref object of Component + state: State + nodestore: NodeStore + dht: Dht + +proc checkForExpiredNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = + trace "Checking for expired nodes..." + let expiry = + (Moment.now().epochSeconds - (t.state.config.revisitDelayMins * 60)).uint64 + + var expired = newSeq[Nid]() + proc checkNode(item: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = + if item.lastVisit < expiry: + expired.add(item.id) + return success() + + ?await t.nodestore.iterateAll(checkNode) + ?await t.state.events.nodesExpired.fire(expired) + return success() + +proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []).} = + trace "Raising routing table nodes..." + let nids = t.dht.getRoutingTableNodeIds() + if err =? (await t.state.events.nodesFound.fire(nids)).errorOption: + return failure(err) + return success() + +proc step(t: TimeTracker): Future[?!void] {.async: (raises: []).} = + ?await t.checkForExpiredNodes() + ?await t.raiseRoutingTableNodes() + return success() + +method start*(t: TimeTracker): Future[?!void] {.async.} = + info "Starting..." + + proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = + await t.step() + + var delay = t.state.config.revisitDelayMins div 100 + if delay < 1: + delay = 1 + + await t.state.whileRunning(onStep, delay.minutes) + return success() + +method stop*(t: TimeTracker): Future[?!void] {.async.} = + return success() + +proc new*( + T: type TimeTracker, state: State, nodestore: NodeStore, dht: Dht +): TimeTracker = + TimeTracker(state: state, nodestore: nodestore, dht: dht) diff --git a/codexcrawler/components/todolist.nim b/codexcrawler/components/todolist.nim new file mode 100644 index 0000000..f84e2c1 --- /dev/null +++ b/codexcrawler/components/todolist.nim @@ -0,0 +1,71 @@ +import pkg/chronos +import pkg/chronicles +import pkg/datastore +import pkg/datastore/typedds +import pkg/questionable +import pkg/questionable/results + +import std/sets + +import ../state +import ../types +import ../component +import ../utils/asyncdataevent + +logScope: + topics = "todolist" + +type TodoList* = ref object of Component + nids: seq[Nid] + state: State + subNew: AsyncDataEventSubscription + subExp: AsyncDataEventSubscription + emptySignal: ?Future[void] + +proc addNodes(t: TodoList, nids: seq[Nid]) = + for nid in nids: + t.nids.add(nid) + + if s =? t.emptySignal: + trace "Nodes added, resuming...", nodes = nids.len + s.complete() + t.emptySignal = Future[void].none + +method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} = + if t.nids.len < 1: + trace "List is empty. Waiting for new items..." + let signal = newFuture[void]("list.emptySignal") + t.emptySignal = some(signal) + try: + await signal.wait(InfiniteDuration) + except CatchableError as exc: + return failure(exc.msg) + if t.nids.len < 1: + return failure("TodoList is empty.") + + let item = t.nids[0] + t.nids.del(0) + + return success(item) + +method start*(t: TodoList): Future[?!void] {.async.} = + info "Starting TodoList..." + + proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} = + t.addNodes(nids) + return success() + + t.subNew = t.state.events.newNodesDiscovered.subscribe(onNewNodes) + t.subExp = t.state.events.nodesExpired.subscribe(onNewNodes) + return success() + +method stop*(t: TodoList): Future[?!void] {.async.} = + await t.state.events.newNodesDiscovered.unsubscribe(t.subNew) + await t.state.events.nodesExpired.unsubscribe(t.subExp) + return success() + +proc new*(_: type TodoList, state: State): TodoList = + TodoList(nids: newSeq[Nid](), state: state, emptySignal: Future[void].none) + +proc createTodoList*(state: State): TodoList = + TodoList.new(state) diff --git a/codexcrawler/config.nim b/codexcrawler/config.nim index 4692670..71e183b 100644 --- a/codexcrawler/config.nim +++ b/codexcrawler/config.nim @@ -3,7 +3,7 @@ import std/sequtils import pkg/chronicles import pkg/libp2p import pkg/codexdht -import ./version +import ./utils/version let doc = """ @@ -13,21 +13,21 @@ Usage: codexcrawler [--logLevel=] [--publicIp=] [--metricsAddress=] [--metricsPort=

] [--dataDir=

] [--discoveryPort=

] [--bootNodes=] [--stepDelay=] [--revisitDelay=] Options: - --publicIp= Public IP address where this instance is reachable. --logLevel= Sets log level [default: INFO] + --publicIp= Public IP address where this instance is reachable. --metricsAddress= Listen address of the metrics server [default: 0.0.0.0] --metricsPort=

Listen HTTP port of the metrics server [default: 8008] --dataDir=

Directory for storing data [default: crawler_data] --discoveryPort=

Port used for DHT [default: 8090] --bootNodes= Semi-colon-separated list of Codex bootstrap SPRs [default: testnet_sprs] --stepDelay= Delay in milliseconds per crawl step [default: 1000] - --revisitDelay= Delay in minutes after which a node can be revisited [default: 1440] (24h) + --revisitDelay= Delay in minutes after which a node can be revisited [default: 10] (24h) """ import strutils import docopt -type CrawlerConfig* = ref object +type Config* = ref object logLevel*: string publicIp*: string metricsAddress*: IpAddress @@ -38,8 +38,8 @@ type CrawlerConfig* = ref object stepDelayMs*: int revisitDelayMins*: int -proc `$`*(config: CrawlerConfig): string = - "CrawlerConfig:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & +proc `$`*(config: Config): string = + "Crawler:" & " logLevel=" & config.logLevel & " publicIp=" & config.publicIp & " metricsAddress=" & $config.metricsAddress & " metricsPort=" & $config.metricsPort & " dataDir=" & config.dataDir & " discPort=" & $config.discPort & " bootNodes=" & config.bootNodes.mapIt($it).join(";") & " stepDelay=" & $config.stepDelayMs & @@ -54,6 +54,9 @@ proc getDefaultTestnetBootNodes(): seq[string] = "spr:CiUIAhIhAzZn3JmJab46BNjadVnLNQKbhnN3eYxwqpteKYY32SbOEgIDARo8CicAJQgCEiEDNmfcmYlpvjoE2Np1Wcs1ApuGc3d5jHCqm14phjfZJs4QrvWesAYaCwoJBKpA-TaRAnViKkcwRQIhANuMmZDD2c25xzTbKSirEpkZYoxbq-FU_lpI0K0e4mIVAiBfQX4yR47h1LCnHznXgDs6xx5DLO5q3lUcicqUeaqGeg", "spr:CiUIAhIhAgybmRwboqDdUJjeZrzh43sn5mp8jt6ENIb08tLn4x01EgIDARo8CicAJQgCEiECDJuZHBuioN1QmN5mvOHjeyfmanyO3oQ0hvTy0ufjHTUQh4ifsAYaCwoJBI_0zSiRAnVsKkcwRQIhAJCb_z0E3RsnQrEePdJzMSQrmn_ooHv6mbw1DOh5IbVNAiBbBJrWR8eBV6ftzMd6ofa5khNA2h88OBhMqHCIzSjCeA", "spr:CiUIAhIhAntGLadpfuBCD9XXfiN_43-V3L5VWgFCXxg4a8uhDdnYEgIDARo8CicAJQgCEiECe0Ytp2l-4EIP1dd-I3_jf5XcvlVaAUJfGDhry6EN2dgQsIufsAYaCwoJBNEmoCiRAnV2KkYwRAIgXO3bzd5VF8jLZG8r7dcLJ_FnQBYp1BcxrOvovEa40acCIDhQ14eJRoPwJ6GKgqOkXdaFAsoszl-HIRzYcXKeb7D9", + "spr:CiUIAhIhA2AEPzVj1Z_pshWAwvTp0xvRZTigIkYphXGZdiYGmYRwEgIDARo8CicAJQgCEiEDYAQ_NWPVn-myFYDC9OnTG9FlOKAiRimFcZl2JgaZhHAQvKCXugYaCwoJBES3CuORAnd-KkYwRAIgNwrc7n8A107pYUoWfJxL8X0f-flfUKeA6bFrjVKzEo0CID_0q-KO5ZAGf65VsK-d9rV3S0PbFg7Hj3Cv4aVX2Lnn", + "spr:CiUIAhIhAuhggJhkjeRoR7MHjZ_L_naZKnjF541X0GXTI7LEwXi_EgIDARo8CicAJQgCEiEC6GCAmGSN5GhHsweNn8v-dpkqeMXnjVfQZdMjssTBeL8Qop2quwYaCwoJBJK-4V-RAncuKkYwRAIgaXWoxvKkzrjUZ5K_ayQHKNlYhUEzBXhGviujxfJiGXkCICbsYFivi6Ny1FT6tbofVBRj7lnaR3K9_3j5pUT4862k", + "spr:CiUIAhIhA-pnA5sLGDVbqEXsRxDUjQEpiSAximHNbyqr2DwLmTq8EgIDARo8CicAJQgCEiED6mcDmwsYNVuoRexHENSNASmJIDGKYc1vKqvYPAuZOrwQyrekvAYaCwoJBIDHOw-RAnc4KkcwRQIhAJtKNeTykcE5bkKwe-vhSmqyBwc2AnexqFX1tAQGLQJ4AiBJOPseqvI3PyEM8l3hY3zvelZU9lT03O7MA_8cUfF4Uw", ] proc getBootNodeStrings(input: string): seq[string] = @@ -78,13 +81,13 @@ proc stringToSpr(uri: string): SignedPeerRecord = proc getBootNodes(input: string): seq[SignedPeerRecord] = getBootNodeStrings(input).mapIt(stringToSpr(it)) -proc parseConfig*(): CrawlerConfig = +proc parseConfig*(): Config = let args = docopt(doc, version = crawlerFullVersion) proc get(name: string): string = $args[name] - return CrawlerConfig( + return Config( logLevel: get("--logLevel"), publicIp: get("--publicIp"), metricsAddress: parseIpAddress(get("--metricsAddress")), diff --git a/codexcrawler/crawler.nim b/codexcrawler/crawler.nim deleted file mode 100644 index 5673e94..0000000 --- a/codexcrawler/crawler.nim +++ /dev/null @@ -1,103 +0,0 @@ -import pkg/chronicles -import pkg/chronos -import pkg/questionable -import pkg/questionable/results - -import ./dht -import ./list -import ./nodeentry -import ./config - -import std/sequtils - -logScope: - topics = "crawler" - -type Crawler* = ref object - dht: Dht - config: CrawlerConfig - todoNodes: List - okNodes: List - nokNodes: List - -# This is not going to stay this way. -proc isNew(c: Crawler, node: Node): bool = - not c.todoNodes.contains(node.id) and not c.okNodes.contains(node.id) and - not c.nokNodes.contains(node.id) - -proc handleNodeNotOk(c: Crawler, target: NodeEntry) {.async.} = - if err =? (await c.nokNodes.add(target)).errorOption: - error "Failed to add not-OK-node to list", err = err.msg - -proc handleNodeOk(c: Crawler, target: NodeEntry) {.async.} = - if err =? (await c.okNodes.add(target)).errorOption: - error "Failed to add OK-node to list", err = err.msg - -proc addNewTodoNode(c: Crawler, nodeId: NodeId): Future[?!void] {.async.} = - let entry = NodeEntry(id: nodeId, lastVisit: 0) - return await c.todoNodes.add(entry) - -proc addNewTodoNodes(c: Crawler, newNodes: seq[Node]) {.async.} = - for node in newNodes: - if err =? (await c.addNewTodoNode(node.id)).errorOption: - error "Failed to add todo-node to list", err = err.msg - -proc step(c: Crawler) {.async.} = - logScope: - todo = $c.todoNodes.len - ok = $c.okNodes.len - nok = $c.nokNodes.len - - without var target =? (await c.todoNodes.pop()), err: - error "Failed to get todo node", err = err.msg - - target.lastVisit = Moment.now().epochSeconds.uint64 - - without receivedNodes =? (await c.dht.getNeighbors(target.id)), err: - await c.handleNodeNotOk(target) - return - - let newNodes = receivedNodes.filterIt(isNew(c, it)) - if newNodes.len > 0: - trace "Discovered new nodes", newNodes = newNodes.len - - await c.handleNodeOk(target) - await c.addNewTodoNodes(newNodes) - - # Don't log the status every loop: - if (c.todoNodes.len mod 10) == 0: - trace "Status" - -proc worker(c: Crawler) {.async.} = - try: - while true: - await c.step() - await sleepAsync(c.config.stepDelayMs.millis) - except Exception as exc: - error "Exception in crawler worker", msg = exc.msg - quit QuitFailure - -proc start*(c: Crawler): Future[?!void] {.async.} = - if c.todoNodes.len < 1: - let nodeIds = c.dht.getRoutingTableNodeIds() - info "Loading routing-table nodes to todo-list...", nodes = nodeIds.len - for id in nodeIds: - if err =? (await c.addNewTodoNode(id)).errorOption: - error "Failed to add routing-table node to todo-list", err = err.msg - return failure(err) - - info "Starting crawler...", stepDelayMs = $c.config.stepDelayMs - asyncSpawn c.worker() - return success() - -proc new*( - T: type Crawler, - dht: Dht, - todoNodes: List, - okNodes: List, - nokNodes: List, - config: CrawlerConfig, -): Crawler = - Crawler( - dht: dht, todoNodes: todoNodes, okNodes: okNodes, nokNodes: nokNodes, config: config - ) diff --git a/codexcrawler/dht.nim b/codexcrawler/dht.nim deleted file mode 100644 index a649ffe..0000000 --- a/codexcrawler/dht.nim +++ /dev/null @@ -1,137 +0,0 @@ -import std/net -import pkg/chronicles -import pkg/chronos -import pkg/libp2p -import pkg/questionable -import pkg/questionable/results -import pkg/codexdht/discv5/[routing_table, protocol as discv5] -from pkg/nimcrypto import keccak256 - -import ./rng - -export discv5 - -logScope: - topics = "dht" - -type Dht* = ref object - protocol*: discv5.Protocol - key: PrivateKey - peerId: PeerId - announceAddrs*: seq[MultiAddress] - providerRecord*: ?SignedPeerRecord - dhtRecord*: ?SignedPeerRecord - -# proc toNodeId*(cid: Cid): NodeId = -# ## Cid to discovery id -# ## - -# readUintBE[256](keccak256.digest(cid.data.buffer).data) - -# proc toNodeId*(host: ca.Address): NodeId = -# ## Eth address to discovery id -# ## - -# readUintBE[256](keccak256.digest(host.toArray).data) - -proc getNode*(d: Dht, nodeId: NodeId): ?!Node = - let node = d.protocol.getNode(nodeId) - if node.isSome(): - return success(node.get()) - return failure("Node not found for id: " & $nodeId) - -proc getRoutingTableNodeIds*(d: Dht): seq[NodeId] = - var ids = newSeq[NodeId]() - for bucket in d.protocol.routingTable.buckets: - for node in bucket.nodes: - ids.add(node.id) - return ids - -proc getNeighbors*(d: Dht, target: NodeId): Future[?!seq[Node]] {.async.} = - without node =? d.getNode(target), err: - return failure(err) - - let distances = @[256.uint16] - let response = await d.protocol.findNode(node, distances) - - if response.isOk(): - let nodes = response.get() - if nodes.len > 0: - return success(nodes) - - # Both returning 0 nodes and a failure result are treated as failure of getNeighbors - return failure("No nodes returned") - -proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = - trace "protocol.resolve..." - let node = await d.protocol.resolve(toNodeId(peerId)) - - return - if node.isSome(): - node.get().record.data.some - else: - PeerRecord.none - -method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} = - trace "Removing provider", peerId - d.protocol.removeProvidersLocal(peerId) - -proc updateAnnounceRecord*(d: Dht, addrs: openArray[MultiAddress]) = - d.announceAddrs = @addrs - - trace "Updating announce record", addrs = d.announceAddrs - d.providerRecord = SignedPeerRecord - .init(d.key, PeerRecord.init(d.peerId, d.announceAddrs)) - .expect("Should construct signed record").some - - if not d.protocol.isNil: - d.protocol.updateRecord(d.providerRecord).expect("Should update SPR") - -proc updateDhtRecord*(d: Dht, addrs: openArray[MultiAddress]) = - trace "Updating Dht record", addrs = addrs - d.dhtRecord = SignedPeerRecord - .init(d.key, PeerRecord.init(d.peerId, @addrs)) - .expect("Should construct signed record").some - - if not d.protocol.isNil: - d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") - -proc start*(d: Dht) {.async.} = - d.protocol.open() - await d.protocol.start() - -proc stop*(d: Dht) {.async.} = - await d.protocol.closeWait() - -proc new*( - T: type Dht, - key: PrivateKey, - bindIp = IPv4_any(), - bindPort = 0.Port, - announceAddrs: openArray[MultiAddress], - bootstrapNodes: openArray[SignedPeerRecord] = [], - store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"), -): Dht = - var self = Dht(key: key, peerId: PeerId.init(key).expect("Should construct PeerId")) - - self.updateAnnounceRecord(announceAddrs) - - # This disables IP limits: - let discoveryConfig = DiscoveryConfig( - tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)), - bitsPerHop: DefaultBitsPerHop, - ) - - trace "Creating DHT protocol", ip = $bindIp, port = $bindPort - self.protocol = newProtocol( - key, - bindIp = bindIp, - bindPort = bindPort, - record = self.providerRecord.get, - bootstrapRecords = bootstrapNodes, - rng = Rng.instance(), - providers = ProvidersManager.new(store), - config = discoveryConfig, - ) - - self diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim new file mode 100644 index 0000000..0dbd1fb --- /dev/null +++ b/codexcrawler/installer.nim @@ -0,0 +1,37 @@ +import pkg/chronos +import pkg/questionable/results + +import ./state +import ./services/metrics +import ./services/dht +import ./component +import ./components/crawler +import ./components/timetracker +import ./components/nodestore +import ./components/dhtmetrics +import ./components/todolist + +proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = + var components: seq[Component] = newSeq[Component]() + + without dht =? (await createDht(state)), err: + return failure(err) + + without nodeStore =? createNodeStore(state), err: + return failure(err) + + let + metrics = createMetrics(state.config.metricsAddress, state.config.metricsPort) + todoList = createTodoList(state) + + without dhtMetrics =? createDhtMetrics(state, metrics), err: + return failure(err) + + components.add(dht) + components.add(todoList) + components.add(nodeStore) + components.add(Crawler.new(state, dht, todoList)) + components.add(TimeTracker.new(state, nodeStore, dht)) + components.add(dhtMetrics) + + return success(components) diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 8227304..d1985a4 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -1,6 +1,6 @@ +import std/os import pkg/chronos import pkg/chronicles -import pkg/metrics import pkg/datastore import pkg/datastore/typedds import pkg/stew/byteutils @@ -8,47 +8,40 @@ import pkg/stew/endians2 import pkg/questionable import pkg/questionable/results import pkg/stint -import pkg/codexdht import std/sets -import std/strutils import std/sequtils import std/os -import ./nodeentry +import ./types +import ./utils/datastoreutils logScope: topics = "list" -type - OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} - OnItem = proc(item: NodeEntry): void {.gcsafe, raises: [].} +type List* = ref object of RootObj + name: string + store: TypedDatastore + items: HashSet[Nid] - List* = ref object - name: string - store: TypedDatastore - items: seq[NodeEntry] - onMetric: OnUpdateMetric - emptySignal: ?Future[void] - -proc encode(s: NodeEntry): seq[byte] = +proc encode(s: Nid): seq[byte] = s.toBytes() -proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T = +proc decode(T: type Nid, bytes: seq[byte]): ?!T = if bytes.len < 1: - return success(NodeEntry(id: UInt256.fromHex("0"), lastVisit: 0.uint64)) - return NodeEntry.fromBytes(bytes) + return success(Nid.fromStr("0")) + return Nid.fromBytes(bytes) -proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} = - without itemKey =? Key.init(this.name / $item.id), err: +proc saveItem(this: List, item: Nid): Future[?!void] {.async.} = + without itemKey =? Key.init(this.name / $item), err: return failure(err) ?await this.store.put(itemKey, item) return success() -proc load*(this: List): Future[?!void] {.async.} = +method load*(this: List): Future[?!void] {.async, base.} = without queryKey =? Key.init(this.name), err: return failure(err) - without iter =? (await query[NodeEntry](this.store, Query.init(queryKey))), err: + without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: return failure(err) while not iter.finished: @@ -56,69 +49,43 @@ proc load*(this: List): Future[?!void] {.async.} = return failure(err) without value =? item.value, err: return failure(err) - if value.id > 0 or value.lastVisit > 0: - this.items.add(value) + if value > 0: + this.items.incl(value) - this.onMetric(this.items.len.int64) info "Loaded list", name = this.name, items = this.items.len return success() -proc new*( - _: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric -): List = - List(name: name, store: store, onMetric: onMetric) +proc contains*(this: List, nid: Nid): bool = + this.items.anyIt(it == nid) -proc contains*(this: List, nodeId: NodeId): bool = - this.items.anyIt(it.id == nodeId) - -proc contains*(this: List, item: NodeEntry): bool = - this.contains(item.id) - -proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} = - if this.contains(item): +method add*(this: List, nid: Nid): Future[?!void] {.async, base.} = + if this.contains(nid): return success() - this.items.add(item) - this.onMetric(this.items.len.int64) + this.items.incl(nid) - if isSome(this.emptySignal): - trace "List no longer empty.", name = this.name - this.emptySignal.get().complete() - this.emptySignal = Future[void].none - - if err =? (await this.saveItem(item)).errorOption: + if err =? (await this.saveItem(nid)).errorOption: return failure(err) + return success() -proc remove*(this: List, item: NodeEntry): Future[?!void] {.async.} = - if this.items.len < 1: - return failure(this.name & "List is empty.") +method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} = + if not this.contains(nid): + return success() - this.items.keepItIf(item.id != it.id) - without itemKey =? Key.init(this.name / $item.id), err: + this.items.excl(nid) + without itemKey =? Key.init(this.name / $nid), err: return failure(err) ?await this.store.delete(itemKey) - this.onMetric(this.items.len.int64) return success() -proc pop*(this: List): Future[?!NodeEntry] {.async.} = - if this.items.len < 1: - trace "List is empty. Waiting for new items...", name = this.name - this.emptySignal = some(newFuture[void]("list.emptySignal")) - await this.emptySignal.get().wait(1.hours) - if this.items.len < 1: - return failure(this.name & "List is empty.") - - let item = this.items[0] - - if err =? (await this.remove(item)).errorOption: - return failure(err) - return success(item) - -proc len*(this: List): int = +method len*(this: List): int {.base, gcsafe, raises: [].} = this.items.len -proc iterateAll*(this: List, onItem: OnItem) {.async.} = - for item in this.items: - onItem(item) - await sleepAsync(1.millis) +proc new*(_: type List, name: string, store: TypedDatastore): List = + List(name: name, store: store) + +proc createList*(dataDir: string, name: string): ?!List = + without store =? createTypedDatastore(dataDir / name), err: + return failure(err) + success(List.new(name, store)) diff --git a/codexcrawler/metrics.nim b/codexcrawler/metrics.nim deleted file mode 100644 index 7f6b54e..0000000 --- a/codexcrawler/metrics.nim +++ /dev/null @@ -1,14 +0,0 @@ -import pkg/chronicles -import pkg/metrics -import pkg/metrics/chronos_httpserver - -proc setupMetrics*(metricsAddress: IpAddress, metricsPort: Port) = - let metricsAddress = metricsAddress - notice "Starting metrics HTTP server", - url = "http://" & $metricsAddress & ":" & $metricsPort & "/metrics" - try: - startMetricsHttpServer($metricsAddress, metricsPort) - except CatchableError as exc: - raiseAssert exc.msg - except Exception as exc: - raiseAssert exc.msg # TODO fix metrics diff --git a/codexcrawler/nodeentry.nim b/codexcrawler/nodeentry.nim deleted file mode 100644 index e5b6721..0000000 --- a/codexcrawler/nodeentry.nim +++ /dev/null @@ -1,35 +0,0 @@ -import pkg/stew/byteutils -import pkg/stew/endians2 -import pkg/questionable -import pkg/questionable/results -import pkg/codexdht -import pkg/chronos -import pkg/libp2p - -type NodeEntry* = object - id*: NodeId - lastVisit*: uint64 - -proc `$`*(entry: NodeEntry): string = - $entry.id & ":" & $entry.lastVisit - -proc toBytes*(entry: NodeEntry): seq[byte] = - var buffer = initProtoBuffer() - buffer.write(1, $entry.id) - buffer.write(2, entry.lastVisit) - buffer.finish() - return buffer.buffer - -proc fromBytes*(_: type NodeEntry, data: openArray[byte]): ?!NodeEntry = - var - buffer = initProtoBuffer(data) - idStr: string - lastVisit: uint64 - - if buffer.getField(1, idStr).isErr: - return failure("Unable to decode `idStr`") - - if buffer.getField(2, lastVisit).isErr: - return failure("Unable to decode `lastVisit`") - - return success(NodeEntry(id: UInt256.fromHex(idStr), lastVisit: lastVisit)) diff --git a/codexcrawler/services/dht.nim b/codexcrawler/services/dht.nim new file mode 100644 index 0000000..e4726fc --- /dev/null +++ b/codexcrawler/services/dht.nim @@ -0,0 +1,187 @@ +import std/os +import std/net +import std/sequtils +import pkg/chronicles +import pkg/chronos +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results +import pkg/codexdht/discv5/[routing_table, protocol as discv5] +from pkg/nimcrypto import keccak256 + +import ../utils/keyutils +import ../utils/datastoreutils +import ../utils/rng +import ../component +import ../state +import ../types + +export discv5 + +logScope: + topics = "dht" + +type + GetNeighborsResponse* = ref object + isResponsive*: bool + nodeIds*: seq[Nid] + + Dht* = ref object of Component + state: State + protocol*: discv5.Protocol + key: PrivateKey + peerId: PeerId + announceAddrs*: seq[MultiAddress] + providerRecord*: ?SignedPeerRecord + dhtRecord*: ?SignedPeerRecord + +proc responsive(nodeIds: seq[Nid]): GetNeighborsResponse = + GetNeighborsResponse(isResponsive: true, nodeIds: nodeIds) + +proc unresponsive(): GetNeighborsResponse = + GetNeighborsResponse(isResponsive: false, nodeIds: newSeq[Nid]()) + +proc getNode*(d: Dht, nodeId: NodeId): ?!Node = + let node = d.protocol.getNode(nodeId) + if node.isSome(): + return success(node.get()) + return failure("Node not found for id: " & nodeId.toHex()) + +method getRoutingTableNodeIds*(d: Dht): seq[Nid] {.base, gcsafe, raises: [].} = + var ids = newSeq[Nid]() + for bucket in d.protocol.routingTable.buckets: + for node in bucket.nodes: + ids.add(node.id) + return ids + +method getNeighbors*( + d: Dht, target: Nid +): Future[?!GetNeighborsResponse] {.async: (raises: []), base.} = + without node =? d.getNode(target), err: + return success(unresponsive()) + + let distances = @[256.uint16] + try: + let response = await d.protocol.findNode(node, distances) + + if response.isOk(): + let nodes = response.get() + return success(responsive(nodes.mapIt(it.id))) + else: + let errmsg = $(response.error()) + if errmsg == "Nodes message not received in time": + return success(unresponsive()) + return failure(errmsg) + except CatchableError as exc: + return failure(exc.msg) + +proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = + trace "protocol.resolve..." + let node = await d.protocol.resolve(toNodeId(peerId)) + + return + if node.isSome(): + node.get().record.data.some + else: + PeerRecord.none + +method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} = + trace "Removing provider", peerId + d.protocol.removeProvidersLocal(peerId) + +proc updateAnnounceRecord(d: Dht, addrs: openArray[MultiAddress]) = + d.announceAddrs = @addrs + + trace "Updating announce record", addrs = d.announceAddrs + d.providerRecord = SignedPeerRecord + .init(d.key, PeerRecord.init(d.peerId, d.announceAddrs)) + .expect("Should construct signed record").some + + if not d.protocol.isNil: + d.protocol.updateRecord(d.providerRecord).expect("Should update SPR") + +proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = + trace "Updating Dht record", addrs = addrs + d.dhtRecord = SignedPeerRecord + .init(d.key, PeerRecord.init(d.peerId, @addrs)) + .expect("Should construct signed record").some + + if not d.protocol.isNil: + d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") + +method start*(d: Dht): Future[?!void] {.async.} = + d.protocol.open() + await d.protocol.start() + return success() + +method stop*(d: Dht): Future[?!void] {.async.} = + await d.protocol.closeWait() + return success() + +proc new( + T: type Dht, + state: State, + key: PrivateKey, + bindIp = IPv4_any(), + bindPort = 0.Port, + announceAddrs: openArray[MultiAddress], + bootstrapNodes: openArray[SignedPeerRecord] = [], + store: Datastore = SQLiteDatastore.new(Memory).expect("Should not fail!"), +): Dht = + var self = Dht( + state: state, key: key, peerId: PeerId.init(key).expect("Should construct PeerId") + ) + + self.updateAnnounceRecord(announceAddrs) + + # This disables IP limits: + let discoveryConfig = DiscoveryConfig( + tableIpLimits: TableIpLimits(tableIpLimit: high(uint), bucketIpLimit: high(uint)), + bitsPerHop: DefaultBitsPerHop, + ) + + trace "Creating DHT protocol", ip = $bindIp, port = $bindPort + self.protocol = newProtocol( + key, + bindIp = bindIp, + bindPort = bindPort, + record = self.providerRecord.get, + bootstrapRecords = bootstrapNodes, + rng = Rng.instance(), + providers = ProvidersManager.new(store), + config = discoveryConfig, + ) + + self + +proc createDht*(state: State): Future[?!Dht] {.async.} = + without dhtStore =? createDatastore(state.config.dataDir / "dht"), err: + return failure(err) + let keyPath = state.config.dataDir / "privatekey" + without privateKey =? setupKey(keyPath), err: + return failure(err) + + var listenAddresses = newSeq[MultiAddress]() + # TODO: when p2p connections are supported: + # let aaa = MultiAddress.init("/ip4/" & state.config.publicIp & "/tcp/53678").expect("Should init multiaddress") + # listenAddresses.add(aaa) + + var discAddresses = newSeq[MultiAddress]() + let bbb = MultiAddress + .init("/ip4/" & state.config.publicIp & "/udp/" & $state.config.discPort) + .expect("Should init multiaddress") + discAddresses.add(bbb) + + let dht = Dht.new( + state, + privateKey, + bindPort = state.config.discPort, + announceAddrs = listenAddresses, + bootstrapNodes = state.config.bootNodes, + store = dhtStore, + ) + + dht.updateAnnounceRecord(listenAddresses) + dht.updateDhtRecord(discAddresses) + + return success(dht) diff --git a/codexcrawler/services/metrics.nim b/codexcrawler/services/metrics.nim new file mode 100644 index 0000000..39d0b63 --- /dev/null +++ b/codexcrawler/services/metrics.nim @@ -0,0 +1,51 @@ +import pkg/chronicles +import pkg/metrics +import pkg/metrics/chronos_httpserver + +declareGauge(todoNodesGauge, "DHT nodes to be visited") +declareGauge(okNodesGauge, "DHT nodes successfully contacted") +declareGauge(nokNodesGauge, "DHT nodes failed to contact") + +type + OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].} + + Metrics* = ref object of RootObj + todoNodes: OnUpdateMetric + okNodes: OnUpdateMetric + nokNodes: OnUpdateMetric + +proc startServer(metricsAddress: IpAddress, metricsPort: Port) = + let metricsAddress = metricsAddress + notice "Starting metrics HTTP server", + url = "http://" & $metricsAddress & ":" & $metricsPort & "/metrics" + try: + startMetricsHttpServer($metricsAddress, metricsPort) + except CatchableError as exc: + raiseAssert exc.msg + except Exception as exc: + raiseAssert exc.msg # TODO fix metrics + +method setTodoNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.todoNodes(value.int64) + +method setOkNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.okNodes(value.int64) + +method setNokNodes*(m: Metrics, value: int) {.base, gcsafe, raises: [].} = + m.nokNodes(value.int64) + +proc createMetrics*(metricsAddress: IpAddress, metricsPort: Port): Metrics = + startServer(metricsAddress, metricsPort) + + # We can't extract this into a function because gauges cannot be passed as argument. + # The use of global state in nim-metrics is not pleasant. + proc onTodo(value: int64) = + todoNodesGauge.set(value) + + proc onOk(value: int64) = + okNodesGauge.set(value) + + proc onNok(value: int64) = + nokNodesGauge.set(value) + + return Metrics(todoNodes: onTodo, okNodes: onOk, nokNodes: onNok) diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim new file mode 100644 index 0000000..d70dfb5 --- /dev/null +++ b/codexcrawler/state.nim @@ -0,0 +1,51 @@ +import pkg/chronos +import pkg/chronicles +import pkg/questionable/results + +import ./config +import ./utils/asyncdataevent +import ./types + +logScope: + topics = "state" + +type + OnStep* = proc(): Future[?!void] {.async: (raises: []), gcsafe.} + + DhtNodeCheckEventData* = object + id*: Nid + isOk*: bool + + Events* = ref object + nodesFound*: AsyncDataEvent[seq[Nid]] + newNodesDiscovered*: AsyncDataEvent[seq[Nid]] + dhtNodeCheck*: AsyncDataEvent[DhtNodeCheckEventData] + nodesExpired*: AsyncDataEvent[seq[Nid]] + + ApplicationStatus* {.pure.} = enum + Stopped + Stopping + Running + + State* = ref object of RootObj + status*: ApplicationStatus + config*: Config + events*: Events + +proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} = + await sleepAsync(1.seconds) + + proc worker(): Future[void] {.async.} = + while s.status == ApplicationStatus.Running: + if err =? (await step()).errorOption: + error "Failure-result caught in main loop. Stopping...", err = err.msg + s.status = ApplicationStatus.Stopping + await sleepAsync(delay) + + asyncSpawn worker() + +method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = + # We use a small delay before starting the workers because 'whileRunning' is likely called from + # component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling). + # Worker steps might start raising events that other components haven't had time to subscribe to yet. + asyncSpawn s.delayedWorkerStart(step, delay) diff --git a/codexcrawler/timetracker.nim b/codexcrawler/timetracker.nim deleted file mode 100644 index 4c91893..0000000 --- a/codexcrawler/timetracker.nim +++ /dev/null @@ -1,75 +0,0 @@ -import pkg/chronicles -import pkg/chronos -import pkg/questionable -import pkg/questionable/results - -import ./dht -import ./list -import ./nodeentry -import ./config - -logScope: - topics = "timetracker" - -type TimeTracker* = ref object - config: CrawlerConfig - todoNodes: List - okNodes: List - nokNodes: List - workerDelay: int - -proc processList(t: TimeTracker, list: List, expiry: uint64) {.async.} = - var toMove = newSeq[NodeEntry]() - proc onItem(item: NodeEntry) = - if item.lastVisit < expiry: - toMove.add(item) - - await list.iterateAll(onItem) - - if toMove.len > 0: - trace "expired node, moving to todo", nodes = $toMove.len - - for item in toMove: - if err =? (await t.todoNodes.add(item)).errorOption: - error "Failed to add expired node to todo list", err = err.msg - return - if err =? (await list.remove(item)).errorOption: - error "Failed to remove expired node to source list", err = err.msg - -proc step(t: TimeTracker) {.async.} = - let expiry = (Moment.now().epochSeconds - (t.config.revisitDelayMins * 60)).uint64 - await t.processList(t.okNodes, expiry) - await t.processList(t.nokNodes, expiry) - -proc worker(t: TimeTracker) {.async.} = - try: - while true: - await t.step() - await sleepAsync(t.workerDelay.minutes) - except Exception as exc: - error "Exception in timetracker worker", msg = exc.msg - quit QuitFailure - -proc start*(t: TimeTracker): Future[?!void] {.async.} = - info "Starting timetracker...", revisitDelayMins = $t.workerDelay - asyncSpawn t.worker() - return success() - -proc new*( - T: type TimeTracker, - todoNodes: List, - okNodes: List, - nokNodes: List, - config: CrawlerConfig, -): TimeTracker = - var delay = config.revisitDelayMins div 10 - if delay < 1: - delay = 1 - - TimeTracker( - todoNodes: todoNodes, - okNodes: okNodes, - nokNodes: nokNodes, - config: config, - workerDelay: delay, - ) diff --git a/codexcrawler/types.nim b/codexcrawler/types.nim new file mode 100644 index 0000000..9895ed1 --- /dev/null +++ b/codexcrawler/types.nim @@ -0,0 +1,29 @@ +import pkg/stew/byteutils +import pkg/stint/io +import pkg/questionable/results +import pkg/codexdht +import pkg/libp2p + +type Nid* = NodeId + +proc `$`*(nid: Nid): string = + nid.toHex() + +proc fromStr*(T: type Nid, s: string): Nid = + Nid(UInt256.fromHex(s)) + +proc toBytes*(nid: Nid): seq[byte] = + var buffer = initProtoBuffer() + buffer.write(1, $nid) + buffer.finish() + return buffer.buffer + +proc fromBytes*(_: type Nid, data: openArray[byte]): ?!Nid = + var + buffer = initProtoBuffer(data) + idStr: string + + if buffer.getField(1, idStr).isErr: + return failure("Unable to decode `idStr`") + + return success(Nid.fromStr(idStr)) diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim new file mode 100644 index 0000000..81946d7 --- /dev/null +++ b/codexcrawler/utils/asyncdataevent.nim @@ -0,0 +1,95 @@ +import pkg/questionable +import pkg/questionable/results +import pkg/chronos + +type + AsyncDataEventSubscription* = ref object + key: EventQueueKey + listenFuture: Future[void] + fireEvent: AsyncEvent + lastResult: ?!void + inHandler: bool + delayedUnsubscribe: bool + + AsyncDataEvent*[T] = ref object + queue: AsyncEventQueue[?T] + subscriptions: seq[AsyncDataEventSubscription] + + AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] + +proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = + AsyncDataEvent[T]( + queue: newAsyncEventQueue[?T](), subscriptions: newSeq[AsyncDataEventSubscription]() + ) + +proc performUnsubscribe[T]( + event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription +) {.async.} = + if subscription in event.subscriptions: + await subscription.listenFuture.cancelAndWait() + event.subscriptions.delete(event.subscriptions.find(subscription)) + +proc subscribe*[T]( + event: AsyncDataEvent[T], handler: AsyncDataEventHandler[T] +): AsyncDataEventSubscription = + var subscription = AsyncDataEventSubscription( + key: event.queue.register(), + listenFuture: newFuture[void](), + fireEvent: newAsyncEvent(), + inHandler: false, + delayedUnsubscribe: false, + ) + + proc listener() {.async.} = + while true: + let items = await event.queue.waitEvents(subscription.key) + for item in items: + if data =? item: + subscription.inHandler = true + subscription.lastResult = (await handler(data)) + subscription.inHandler = false + subscription.fireEvent.fire() + + subscription.listenFuture = listener() + + event.subscriptions.add(subscription) + subscription + +proc fire*[T]( + event: AsyncDataEvent[T], data: T +): Future[?!void] {.async: (raises: []).} = + event.queue.emit(data.some) + var toUnsubscribe = newSeq[AsyncDataEventSubscription]() + for sub in event.subscriptions: + try: + await sub.fireEvent.wait() + except CancelledError: + discard + if err =? sub.lastResult.errorOption: + return failure(err) + if sub.delayedUnsubscribe: + toUnsubscribe.add(sub) + + for sub in toUnsubscribe: + try: + await event.unsubscribe(sub) + except CatchableError as exc: + return failure(exc.msg) + + success() + +proc unsubscribe*[T]( + event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription +) {.async.} = + if subscription.inHandler: + subscription.delayedUnsubscribe = true + else: + await event.performUnsubscribe(subscription) + +proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} = + let all = event.subscriptions + for subscription in all: + await event.unsubscribe(subscription) + +proc listeners*[T](event: AsyncDataEvent[T]): int = + event.subscriptions.len diff --git a/codexcrawler/utils/datastoreutils.nim b/codexcrawler/utils/datastoreutils.nim new file mode 100644 index 0000000..8754d63 --- /dev/null +++ b/codexcrawler/utils/datastoreutils.nim @@ -0,0 +1,15 @@ +import pkg/chronicles +import pkg/questionable/results +import pkg/datastore +import pkg/datastore/typedds + +proc createDatastore*(path: string): ?!Datastore = + without store =? LevelDbDatastore.new(path), err: + error "Failed to create datastore" + return failure(err) + return success(Datastore(store)) + +proc createTypedDatastore*(path: string): ?!TypedDatastore = + without store =? createDatastore(path), err: + return failure(err) + return success(TypedDatastore.init(store)) diff --git a/codexcrawler/keyutils.nim b/codexcrawler/utils/keyutils.nim similarity index 100% rename from codexcrawler/keyutils.nim rename to codexcrawler/utils/keyutils.nim diff --git a/codexcrawler/logging.nim b/codexcrawler/utils/logging.nim similarity index 100% rename from codexcrawler/logging.nim rename to codexcrawler/utils/logging.nim diff --git a/codexcrawler/rng.nim b/codexcrawler/utils/rng.nim similarity index 100% rename from codexcrawler/rng.nim rename to codexcrawler/utils/rng.nim diff --git a/codexcrawler/utils.nim b/codexcrawler/utils/timeutils.nim similarity index 100% rename from codexcrawler/utils.nim rename to codexcrawler/utils/timeutils.nim diff --git a/codexcrawler/version.nim b/codexcrawler/utils/version.nim similarity index 100% rename from codexcrawler/version.nim rename to codexcrawler/utils/version.nim diff --git a/tests/codexcrawler/components/testcrawler.nim b/tests/codexcrawler/components/testcrawler.nim new file mode 100644 index 0000000..25dfd53 --- /dev/null +++ b/tests/codexcrawler/components/testcrawler.nim @@ -0,0 +1,112 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/crawler +import ../../../codexcrawler/services/dht +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../mocks/mockdht +import ../mocks/mocktodolist +import ../helpers + +suite "Crawler": + var + nid1: Nid + nid2: Nid + state: MockState + todo: MockTodoList + dht: MockDht + crawler: Crawler + + setup: + nid1 = genNid() + nid2 = genNid() + state = createMockState() + todo = createMockTodoList() + dht = createMockDht() + + crawler = Crawler.new(state, dht, todo) + + (await crawler.start()).tryGet() + + teardown: + (await crawler.stop()).tryGet() + state.checkAllUnsubscribed() + + proc onStep() {.async.} = + (await state.stepper()).tryGet() + + proc responsive(nid: Nid): GetNeighborsResponse = + GetNeighborsResponse(isResponsive: true, nodeIds: @[nid]) + + proc unresponsive(nid: Nid): GetNeighborsResponse = + GetNeighborsResponse(isResponsive: false, nodeIds: @[nid]) + + test "onStep should pop a node from the todoList and getNeighbors for it": + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid1)) + + await onStep() + + check: + !(dht.getNeighborsArg) == nid1 + + test "nodes returned by getNeighbors are raised as nodesFound": + var nodesFound = newSeq[Nid]() + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + nodesFound = nids + return success() + + let sub = state.events.nodesFound.subscribe(onNodesFound) + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid2)) + + await onStep() + + check: + nid2 in nodesFound + + await state.events.nodesFound.unsubscribe(sub) + + test "responsive result from getNeighbors raises the node as successful dhtNodeCheck": + var checkEvent = DhtNodeCheckEventData() + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + checkEvent = event + return success() + + let sub = state.events.dhtNodeCheck.subscribe(onCheck) + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(responsive(nid2)) + + await onStep() + + check: + checkEvent.id == nid1 + checkEvent.isOk == true + + await state.events.dhtNodeCheck.unsubscribe(sub) + + test "unresponsive result from getNeighbors raises the node as unsuccessful dhtNodeCheck": + var checkEvent = DhtNodeCheckEventData() + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + checkEvent = event + return success() + + let sub = state.events.dhtNodeCheck.subscribe(onCheck) + + todo.popReturn = success(nid1) + dht.getNeighborsReturn = success(unresponsive(nid2)) + + await onStep() + + check: + checkEvent.id == nid1 + checkEvent.isOk == false + + await state.events.dhtNodeCheck.unsubscribe(sub) diff --git a/tests/codexcrawler/components/testdhtmetrics.nim b/tests/codexcrawler/components/testdhtmetrics.nim new file mode 100644 index 0000000..1620c87 --- /dev/null +++ b/tests/codexcrawler/components/testdhtmetrics.nim @@ -0,0 +1,90 @@ +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/dhtmetrics +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../mocks/mocklist +import ../mocks/mockmetrics +import ../helpers + +suite "DhtMetrics": + var + nid: Nid + state: MockState + okList: MockList + nokList: MockList + metrics: MockMetrics + dhtmetrics: DhtMetrics + + setup: + nid = genNid() + state = createMockState() + okList = createMockList() + nokList = createMockList() + metrics = createMockMetrics() + + dhtmetrics = DhtMetrics.new(state, okList, nokList, metrics) + + (await dhtmetrics.start()).tryGet() + + teardown: + (await dhtmetrics.stop()).tryGet() + state.checkAllUnsubscribed() + + proc fireDhtNodeCheckEvent(isOk: bool) {.async.} = + let event = DhtNodeCheckEventData(id: nid, isOk: isOk) + + (await state.events.dhtNodeCheck.fire(event)).tryGet() + + test "dhtmetrics start should load both lists": + check: + okList.loadCalled + nokList.loadCalled + + test "dhtNodeCheck event should add node to okList if check is successful": + await fireDhtNodeCheckEvent(true) + + check: + nid in okList.added + + test "dhtNodeCheck event should add node to nokList if check has failed": + await fireDhtNodeCheckEvent(false) + + check: + nid in nokList.added + + test "dhtNodeCheck event should remove node from nokList if check is successful": + await fireDhtNodeCheckEvent(true) + + check: + nid in nokList.removed + + test "dhtNodeCheck event should remove node from okList if check has failed": + await fireDhtNodeCheckEvent(false) + + check: + nid in okList.removed + + test "dhtNodeCheck event should set okList length as dht-ok metric": + let length = 123 + + okList.length = length + + await fireDhtNodeCheckEvent(true) + + check: + metrics.ok == length + + test "dhtNodeCheck event should set nokList length as dht-nok metric": + let length = 234 + + nokList.length = length + + await fireDhtNodeCheckEvent(true) + + check: + metrics.nok == length diff --git a/tests/codexcrawler/components/testnodestore.nim b/tests/codexcrawler/components/testnodestore.nim new file mode 100644 index 0000000..79f9a38 --- /dev/null +++ b/tests/codexcrawler/components/testnodestore.nim @@ -0,0 +1,123 @@ +import std/os +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest +import pkg/datastore/typedds + +import ../../../codexcrawler/components/nodestore +import ../../../codexcrawler/utils/datastoreutils +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../mocks/mockstate +import ../helpers + +suite "Nodestore": + let + dsPath = getTempDir() / "testds" + nodestoreName = "nodestore" + + var + ds: TypedDatastore + state: MockState + store: NodeStore + + setup: + ds = createTypedDatastore(dsPath).tryGet() + state = createMockState() + + store = NodeStore.new(state, ds) + + (await store.start()).tryGet() + + teardown: + (await store.stop()).tryGet() + (await ds.close()).tryGet() + state.checkAllUnsubscribed() + removeDir(dsPath) + + test "nodeEntry encoding": + let entry = NodeEntry(id: genNid(), lastVisit: 123.uint64) + + let + bytes = entry.encode() + decoded = NodeEntry.decode(bytes).tryGet() + + check: + entry.id == decoded.id + entry.lastVisit == decoded.lastVisit + + test "nodesFound event should store nodes": + let + nid = genNid() + expectedKey = Key.init(nodestoreName / $nid).tryGet() + + (await state.events.nodesFound.fire(@[nid])).tryGet() + + check: + (await ds.has(expectedKey)).tryGet() + + let entry = (await get[NodeEntry](ds, expectedKey)).tryGet() + check: + entry.id == nid + + test "nodesFound event should fire newNodesDiscovered": + var newNodes = newSeq[Nid]() + proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} = + newNodes = nids + return success() + + let + sub = state.events.newNodesDiscovered.subscribe(onNewNodes) + nid = genNid() + + (await state.events.nodesFound.fire(@[nid])).tryGet() + + check: + newNodes == @[nid] + + await state.events.newNodesDiscovered.unsubscribe(sub) + + test "nodesFound event should not fire newNodesDiscovered for previously seen nodes": + let nid = genNid() + + # Make nid known first. Then subscribe. + (await state.events.nodesFound.fire(@[nid])).tryGet() + + var + newNodes = newSeq[Nid]() + count = 0 + proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} = + newNodes = nids + inc count + return success() + + let sub = state.events.newNodesDiscovered.subscribe(onNewNodes) + + # Firing the event again should not trigger newNodesDiscovered for nid + (await state.events.nodesFound.fire(@[nid])).tryGet() + + check: + newNodes.len == 0 + count == 0 + + await state.events.newNodesDiscovered.unsubscribe(sub) + + test "iterateAll yields all known nids": + let + nid1 = genNid() + nid2 = genNid() + nid3 = genNid() + + (await state.events.nodesFound.fire(@[nid1, nid2, nid3])).tryGet() + + var iterNodes = newSeq[Nid]() + proc onNode(entry: NodeEntry): Future[?!void] {.async: (raises: []), gcsafe.} = + iterNodes.add(entry.id) + return success() + + (await store.iterateAll(onNode)).tryGet() + + check: + nid1 in iterNodes + nid2 in iterNodes + nid3 in iterNodes diff --git a/tests/codexcrawler/components/testtimetracker.nim b/tests/codexcrawler/components/testtimetracker.nim new file mode 100644 index 0000000..34873c5 --- /dev/null +++ b/tests/codexcrawler/components/testtimetracker.nim @@ -0,0 +1,95 @@ +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/timetracker +import ../../../codexcrawler/components/nodestore +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../mocks/mocknodestore +import ../mocks/mockdht +import ../helpers + +suite "TimeTracker": + var + nid: Nid + state: MockState + store: MockNodeStore + dht: MockDht + time: TimeTracker + expiredNodesReceived: seq[Nid] + sub: AsyncDataEventSubscription + + setup: + nid = genNid() + state = createMockState() + store = createMockNodeStore() + dht = createMockDht() + + # Subscribe to nodesExpired event + expiredNodesReceived = newSeq[Nid]() + proc onExpired(nids: seq[Nid]): Future[?!void] {.async.} = + expiredNodesReceived = nids + return success() + + sub = state.events.nodesExpired.subscribe(onExpired) + + state.config.revisitDelayMins = 22 + + time = TimeTracker.new(state, store, dht) + + (await time.start()).tryGet() + + teardown: + (await time.stop()).tryGet() + await state.events.nodesExpired.unsubscribe(sub) + state.checkAllUnsubscribed() + + proc onStep() {.async.} = + (await state.stepper()).tryGet() + + proc createNodeInStore(lastVisit: uint64): Nid = + let entry = NodeEntry(id: genNid(), lastVisit: lastVisit) + store.nodesToIterate.add(entry) + return entry.id + + test "onStep fires nodesExpired event for expired nodes": + let + expiredTimestamp = + (Moment.now().epochSeconds - ((1 + state.config.revisitDelayMins) * 60)).uint64 + expiredNodeId = createNodeInStore(expiredTimestamp) + + await onStep() + + check: + expiredNodeId in expiredNodesReceived + + test "onStep does not fire nodesExpired event for nodes that are recent": + let + recentTimestamp = + (Moment.now().epochSeconds - ((state.config.revisitDelayMins - 1) * 60)).uint64 + recentNodeId = createNodeInStore(recentTimestamp) + + await onStep() + + check: + recentNodeId notin expiredNodesReceived + + test "onStep raises routingTable nodes as nodesFound": + var nodesFound = newSeq[Nid]() + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + nodesFound = nids + return success() + + let sub = state.events.nodesFound.subscribe(onNodesFound) + + dht.routingTable.add(nid) + + await onStep() + + check: + nid in nodesFound + + await state.events.nodesFound.unsubscribe(sub) diff --git a/tests/codexcrawler/components/testtodolist.nim b/tests/codexcrawler/components/testtodolist.nim new file mode 100644 index 0000000..7aa30df --- /dev/null +++ b/tests/codexcrawler/components/testtodolist.nim @@ -0,0 +1,59 @@ +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/components/todolist +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/state +import ../mocks/mockstate +import ../helpers + +suite "TodoList": + var + nid: Nid + state: MockState + todo: TodoList + + setup: + nid = genNid() + state = createMockState() + + todo = TodoList.new(state) + + (await todo.start()).tryGet() + + teardown: + (await todo.stop()).tryGet() + state.checkAllUnsubscribed() + + proc fireNewNodesDiscoveredEvent(nids: seq[Nid]) {.async.} = + (await state.events.newNodesDiscovered.fire(nids)).tryGet() + + proc fireNodesExpiredEvent(nids: seq[Nid]) {.async.} = + (await state.events.nodesExpired.fire(nids)).tryGet() + + test "discovered nodes are added to todo list": + await fireNewNodesDiscoveredEvent(@[nid]) + let item = (await todo.pop).tryGet() + + check: + item == nid + + test "expired nodes are added to todo list": + await fireNodesExpiredEvent(@[nid]) + let item = (await todo.pop).tryGet() + + check: + item == nid + + test "pop on empty todo list waits until item is added": + let popFuture = todo.pop() + check: + not popFuture.finished + + await fireNewNodesDiscoveredEvent(@[nid]) + + check: + popFuture.finished + popFuture.value.tryGet() == nid diff --git a/tests/codexcrawler/exampletest.nim b/tests/codexcrawler/exampletest.nim deleted file mode 100644 index ca0d9b4..0000000 --- a/tests/codexcrawler/exampletest.nim +++ /dev/null @@ -1,7 +0,0 @@ -import pkg/asynctest/chronos/unittest - -suite "Example tests": - test "Example": - echo "Woo!" - check: - 1 == 1 diff --git a/tests/codexcrawler/helpers.nim b/tests/codexcrawler/helpers.nim new file mode 100644 index 0000000..5feb345 --- /dev/null +++ b/tests/codexcrawler/helpers.nim @@ -0,0 +1,6 @@ +import std/random +import pkg/stint +import ../../codexcrawler/types + +proc genNid*(): Nid = + Nid(rand(uint64).u256) diff --git a/tests/codexcrawler/mocks/mockdht.nim b/tests/codexcrawler/mocks/mockdht.nim new file mode 100644 index 0000000..4c12f33 --- /dev/null +++ b/tests/codexcrawler/mocks/mockdht.nim @@ -0,0 +1,28 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import ../../../codexcrawler/services/dht +import ../../../codexcrawler/types + +type MockDht* = ref object of Dht + routingTable*: seq[Nid] + getNeighborsArg*: ?Nid + getNeighborsReturn*: ?!GetNeighborsResponse + +method getRoutingTableNodeIds*(d: MockDht): seq[Nid] = + return d.routingTable + +method getNeighbors*( + d: MockDht, target: Nid +): Future[?!GetNeighborsResponse] {.async: (raises: []).} = + d.getNeighborsArg = some(target) + return d.getNeighborsReturn + +method start*(d: MockDht): Future[?!void] {.async.} = + return success() + +method stop*(d: MockDht): Future[?!void] {.async.} = + return success() + +proc createMockDht*(): MockDht = + MockDht() diff --git a/tests/codexcrawler/mocks/mocklist.nim b/tests/codexcrawler/mocks/mocklist.nim new file mode 100644 index 0000000..dfbd414 --- /dev/null +++ b/tests/codexcrawler/mocks/mocklist.nim @@ -0,0 +1,42 @@ +import pkg/chronos +import pkg/questionable/results + +import ../../../codexcrawler/types +import ../../../codexcrawler/list + +type MockList* = ref object of List + loadCalled*: bool + added*: seq[Nid] + addSuccess*: bool + removed*: seq[Nid] + removeSuccess*: bool + length*: int + +method load*(this: MockList): Future[?!void] {.async.} = + this.loadCalled = true + return success() + +method add*(this: MockList, nid: Nid): Future[?!void] {.async.} = + this.added.add(nid) + if this.addSuccess: + return success() + return failure("test failure") + +method remove*(this: MockList, nid: Nid): Future[?!void] {.async.} = + this.removed.add(nid) + if this.removeSuccess: + return success() + return failure("test failure") + +method len*(this: MockList): int = + return this.length + +proc createMockList*(): MockList = + MockList( + loadCalled: false, + added: newSeq[Nid](), + addSuccess: true, + removed: newSeq[Nid](), + removeSuccess: true, + length: 0, + ) diff --git a/tests/codexcrawler/mocks/mockmetrics.nim b/tests/codexcrawler/mocks/mockmetrics.nim new file mode 100644 index 0000000..8152e76 --- /dev/null +++ b/tests/codexcrawler/mocks/mockmetrics.nim @@ -0,0 +1,18 @@ +import ../../../codexcrawler/services/metrics + +type MockMetrics* = ref object of Metrics + todo*: int + ok*: int + nok*: int + +method setTodoNodes*(m: MockMetrics, value: int) = + m.todo = value + +method setOkNodes*(m: MockMetrics, value: int) = + m.ok = value + +method setNokNodes*(m: MockMetrics, value: int) = + m.nok = value + +proc createMockMetrics*(): MockMetrics = + MockMetrics() diff --git a/tests/codexcrawler/mocks/mocknodestore.nim b/tests/codexcrawler/mocks/mocknodestore.nim new file mode 100644 index 0000000..d640f38 --- /dev/null +++ b/tests/codexcrawler/mocks/mocknodestore.nim @@ -0,0 +1,24 @@ +import std/sequtils +import pkg/questionable/results +import pkg/chronos + +import ../../../codexcrawler/components/nodestore + +type MockNodeStore* = ref object of NodeStore + nodesToIterate*: seq[NodeEntry] + +method iterateAll*( + s: MockNodeStore, onNode: OnNodeEntry +): Future[?!void] {.async: (raises: []).} = + for node in s.nodesToIterate: + ?await onNode(node) + return success() + +method start*(s: MockNodeStore): Future[?!void] {.async.} = + return success() + +method stop*(s: MockNodeStore): Future[?!void] {.async.} = + return success() + +proc createMockNodeStore*(): MockNodeStore = + MockNodeStore(nodesToIterate: newSeq[NodeEntry]()) diff --git a/tests/codexcrawler/mocks/mockstate.nim b/tests/codexcrawler/mocks/mockstate.nim new file mode 100644 index 0000000..f7d212b --- /dev/null +++ b/tests/codexcrawler/mocks/mockstate.nim @@ -0,0 +1,30 @@ +import pkg/asynctest/chronos/unittest +import ../../../codexcrawler/state +import ../../../codexcrawler/utils/asyncdataevent +import ../../../codexcrawler/types +import ../../../codexcrawler/config + +type MockState* = ref object of State + stepper*: OnStep + +proc checkAllUnsubscribed*(s: MockState) = + check: + s.events.nodesFound.listeners == 0 + s.events.newNodesDiscovered.listeners == 0 + s.events.dhtNodeCheck.listeners == 0 + s.events.nodesExpired.listeners == 0 + +method whileRunning*(s: MockState, step: OnStep, delay: Duration) {.async.} = + s.stepper = step + +proc createMockState*(): MockState = + MockState( + status: ApplicationStatus.Running, + config: Config(), + events: Events( + nodesFound: newAsyncDataEvent[seq[Nid]](), + newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), + dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), + nodesExpired: newAsyncDataEvent[seq[Nid]](), + ), + ) diff --git a/tests/codexcrawler/mocks/mocktodolist.nim b/tests/codexcrawler/mocks/mocktodolist.nim new file mode 100644 index 0000000..1417ae4 --- /dev/null +++ b/tests/codexcrawler/mocks/mocktodolist.nim @@ -0,0 +1,20 @@ +import pkg/chronos +import pkg/questionable/results + +import ../../../codexcrawler/components/todolist +import ../../../codexcrawler/types + +type MockTodoList* = ref object of TodoList + popReturn*: ?!Nid + +method pop*(t: MockTodoList): Future[?!Nid] {.async: (raises: []).} = + return t.popReturn + +method start*(t: MockTodoList): Future[?!void] {.async.} = + return success() + +method stop*(t: MockTodoList): Future[?!void] {.async.} = + return success() + +proc createMockTodoList*(): MockTodoList = + MockTodoList() diff --git a/tests/codexcrawler/testcomponents.nim b/tests/codexcrawler/testcomponents.nim new file mode 100644 index 0000000..0773ec0 --- /dev/null +++ b/tests/codexcrawler/testcomponents.nim @@ -0,0 +1,7 @@ +import ./components/testnodestore +import ./components/testdhtmetrics +import ./components/testtodolist +import ./components/testtimetracker +import ./components/testcrawler + +{.warning[UnusedImport]: off.} diff --git a/tests/codexcrawler/teststate.nim b/tests/codexcrawler/teststate.nim new file mode 100644 index 0000000..4ea6bb5 --- /dev/null +++ b/tests/codexcrawler/teststate.nim @@ -0,0 +1,42 @@ +import pkg/chronos +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../codexcrawler/state +import ../../codexcrawler/config +import ../../codexcrawler/types +import ../../codexcrawler/utils/asyncdataevent + +suite "State": + var state: State + + setup: + state = State( + status: ApplicationStatus.Running, + config: Config(), + events: Events( + nodesFound: newAsyncDataEvent[seq[Nid]](), + newNodesDiscovered: newAsyncDataEvent[seq[Nid]](), + dhtNodeCheck: newAsyncDataEvent[DhtNodeCheckEventData](), + nodesExpired: newAsyncDataEvent[seq[Nid]](), + ), + ) + + test "whileRunning": + var counter = 0 + + proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = + inc counter + return success() + + await state.whileRunning(onStep, 1.milliseconds) + + while counter < 5: + await sleepAsync(1.milliseconds) + + state.status = ApplicationStatus.Stopped + + await sleepAsync(10.milliseconds) + + check: + counter == 5 diff --git a/tests/codexcrawler/testtypes.nim b/tests/codexcrawler/testtypes.nim new file mode 100644 index 0000000..5c75ec9 --- /dev/null +++ b/tests/codexcrawler/testtypes.nim @@ -0,0 +1,23 @@ +import pkg/chronos +import pkg/asynctest/chronos/unittest +import pkg/questionable/results + +import ../../codexcrawler/types +import ./helpers + +suite "Types": + test "nid string encoding": + let + nid = genNid() + str = $nid + + check: + nid == Nid.fromStr(str) + + test "nid byte encoding": + let + nid = genNid() + bytes = nid.toBytes() + + check: + nid == Nid.fromBytes(bytes).tryGet() diff --git a/tests/codexcrawler/testutils.nim b/tests/codexcrawler/testutils.nim new file mode 100644 index 0000000..eaec989 --- /dev/null +++ b/tests/codexcrawler/testutils.nim @@ -0,0 +1,3 @@ +import ./utils/testasyncdataevent + +{.warning[UnusedImport]: off.} diff --git a/tests/codexcrawler/utils/testasyncdataevent.nim b/tests/codexcrawler/utils/testasyncdataevent.nim new file mode 100644 index 0000000..c5a47c5 --- /dev/null +++ b/tests/codexcrawler/utils/testasyncdataevent.nim @@ -0,0 +1,107 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results +import pkg/asynctest/chronos/unittest + +import ../../../codexcrawler/utils/asyncdataevent + +type ExampleData = object + s: string + +suite "AsyncDataEvent": + var event: AsyncDataEvent[ExampleData] + let msg = "Yeah!" + + setup: + event = newAsyncDataEvent[ExampleData]() + + teardown: + await event.unsubscribeAll() + + test "Successful event": + var data = "" + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + data = e.s + success() + + let s = event.subscribe(eventHandler) + + check: + isOK(await event.fire(ExampleData(s: msg))) + data == msg + + await event.unsubscribe(s) + + test "Failed event preserves error message": + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + failure(msg) + + let s = event.subscribe(eventHandler) + let fireResult = await event.fire(ExampleData(s: "a")) + + check: + fireResult.isErr + fireResult.error.msg == msg + + await event.unsubscribe(s) + + test "Emits data to multiple subscribers": + var + data1 = "" + data2 = "" + data3 = "" + + proc handler1(e: ExampleData): Future[?!void] {.async.} = + data1 = e.s + success() + + proc handler2(e: ExampleData): Future[?!void] {.async.} = + data2 = e.s + success() + + proc handler3(e: ExampleData): Future[?!void] {.async.} = + data3 = e.s + success() + + let + s1 = event.subscribe(handler1) + s2 = event.subscribe(handler2) + s3 = event.subscribe(handler3) + + let fireResult = await event.fire(ExampleData(s: msg)) + + check: + fireResult.isOK + data1 == msg + data2 == msg + data3 == msg + + await event.unsubscribe(s1) + await event.unsubscribe(s2) + await event.unsubscribe(s3) + + test "Can fire and event without subscribers": + check: + isOK(await event.fire(ExampleData(s: msg))) + + test "Can unsubscribe in handler": + proc doNothing() {.async, closure.} = + await sleepAsync(1.millis) + + var callback = doNothing + + proc eventHandler(e: ExampleData): Future[?!void] {.async.} = + await callback() + success() + + let s = event.subscribe(eventHandler) + + proc doUnsubscribe() {.async.} = + await event.unsubscribe(s) + + callback = doUnsubscribe + + check: + isOK(await event.fire(ExampleData(s: msg))) + + await event.unsubscribe(s) diff --git a/tests/config.nims b/tests/config.nims new file mode 100644 index 0000000..da6438a --- /dev/null +++ b/tests/config.nims @@ -0,0 +1 @@ +switch("define", "chronicles_log_level=ERROR") diff --git a/tests/test.nim b/tests/test.nim index d7c2006..1df02d1 100644 --- a/tests/test.nim +++ b/tests/test.nim @@ -1,3 +1,6 @@ -import ./codexcrawler/exampletest +import ./codexcrawler/testutils +import ./codexcrawler/testcomponents +import ./codexcrawler/testtypes +import ./codexcrawler/teststate {.warning[UnusedImport]: off.}