mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-04 06:23:08 +00:00
Cleanup and sets up lists
This commit is contained in:
parent
75a6f38dbe
commit
bdb6f94ce2
@ -1,8 +1,4 @@
|
|||||||
import std/os
|
|
||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
import pkg/chronos
|
|
||||||
import pkg/questionable
|
|
||||||
import pkg/questionable/results
|
|
||||||
|
|
||||||
import ./codexcrawler/application
|
import ./codexcrawler/application
|
||||||
|
|
||||||
|
|||||||
@ -4,11 +4,18 @@ import pkg/chronos
|
|||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import pkg/datastore
|
||||||
|
import pkg/datastore/typedds
|
||||||
|
import pkg/metrics
|
||||||
|
|
||||||
import ./config
|
import ./config
|
||||||
import ./logging
|
import ./logging
|
||||||
import ./metrics
|
import ./metrics
|
||||||
|
import ./list
|
||||||
|
|
||||||
import ./main
|
declareGauge(todoNodesGauge, "DHT nodes to be visited")
|
||||||
|
declareGauge(okNodesGauge, "DHT nodes successfully contacted")
|
||||||
|
declareGauge(nokNodesGauge, "DHT nodes failed to contact")
|
||||||
|
|
||||||
type
|
type
|
||||||
ApplicationStatus* {.pure.} = enum
|
ApplicationStatus* {.pure.} = enum
|
||||||
@ -18,26 +25,72 @@ type
|
|||||||
|
|
||||||
Application* = ref object
|
Application* = ref object
|
||||||
status: ApplicationStatus
|
status: ApplicationStatus
|
||||||
|
config*: CrawlerConfig
|
||||||
|
todoList*: List
|
||||||
|
okNodes*: List
|
||||||
|
nokNodes*: List
|
||||||
|
|
||||||
|
proc createDatastore(app: Application): ?!TypedDatastore =
|
||||||
|
without store =? LevelDbDatastore.new(app.config.dataDir), err:
|
||||||
|
error "Failed to create datastore"
|
||||||
|
return failure(err)
|
||||||
|
return success(TypedDatastore.init(store))
|
||||||
|
|
||||||
|
proc initializeLists(app: Application): Future[?!void] {.async.} =
|
||||||
|
without store =? app.createDatastore(), err:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
# We can't extract this into a function because gauges cannot be passed as argument.
|
||||||
|
# The use of global state in nim-metrics is not pleasant.
|
||||||
|
proc onTodoMetric(value: int64) =
|
||||||
|
todoNodesGauge.set(value)
|
||||||
|
proc onOkMetric(value: int64) =
|
||||||
|
okNodesGauge.set(value)
|
||||||
|
proc onNokMetric(value: int64) =
|
||||||
|
nokNodesGauge.set(value)
|
||||||
|
|
||||||
|
app.todoList = List.new("todo", store, onTodoMetric)
|
||||||
|
app.okNodes = List.new("ok", store, onOkMetric)
|
||||||
|
app.nokNodes = List.new("nok", store, onNokMetric)
|
||||||
|
|
||||||
|
if err =? (await app.todoList.load()).errorOption:
|
||||||
|
return failure(err)
|
||||||
|
if err =? (await app.okNodes.load()).errorOption:
|
||||||
|
return failure(err)
|
||||||
|
if err =? (await app.nokNodes.load()).errorOption:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
return success()
|
||||||
|
|
||||||
|
proc initializeApp(app: Application): Future[?!void] {.async.} =
|
||||||
|
if err =? (await app.initializeLists()).errorOption:
|
||||||
|
error "Failed to initialize lists", err = err.msg
|
||||||
|
return failure(err)
|
||||||
|
return success()
|
||||||
|
|
||||||
|
proc stop*(app: Application) =
|
||||||
|
app.status = ApplicationStatus.Stopping
|
||||||
|
|
||||||
proc run*(app: Application) =
|
proc run*(app: Application) =
|
||||||
let config = parseConfig()
|
app.config = parseConfig()
|
||||||
info "Loaded configuration", config
|
info "Loaded configuration", config = app.config
|
||||||
|
|
||||||
# Configure loglevel
|
# Configure loglevel
|
||||||
updateLogLevel(config.logLevel)
|
updateLogLevel(app.config.logLevel)
|
||||||
|
|
||||||
# Ensure datadir path exists:
|
# Ensure datadir path exists:
|
||||||
if not existsDir(config.dataDir):
|
if not existsDir(app.config.dataDir):
|
||||||
createDir(config.dataDir)
|
createDir(app.config.dataDir)
|
||||||
|
|
||||||
setupMetrics(config.metricsAddress, config.metricsPort)
|
setupMetrics(app.config.metricsAddress, app.config.metricsPort)
|
||||||
info "Metrics endpoint initialized"
|
info "Metrics endpoint initialized"
|
||||||
|
|
||||||
info "Starting application"
|
info "Starting application"
|
||||||
app.status = ApplicationStatus.Running
|
app.status = ApplicationStatus.Running
|
||||||
if err =? (waitFor startApplication(config)).errorOption:
|
if err =? (waitFor app.initializeApp()).errorOption:
|
||||||
app.status = ApplicationStatus.Stopping
|
app.status = ApplicationStatus.Stopping
|
||||||
error "Failed to start application", err = err.msg
|
error "Failed to start application", err = err.msg
|
||||||
|
return
|
||||||
|
|
||||||
while app.status == ApplicationStatus.Running:
|
while app.status == ApplicationStatus.Running:
|
||||||
try:
|
try:
|
||||||
@ -46,6 +99,3 @@ proc run*(app: Application) =
|
|||||||
error "Unhandled exception", msg = exc.msg
|
error "Unhandled exception", msg = exc.msg
|
||||||
quit QuitFailure
|
quit QuitFailure
|
||||||
notice "Application closed"
|
notice "Application closed"
|
||||||
|
|
||||||
proc stop*(app: Application) =
|
|
||||||
app.status = ApplicationStatus.Stopping
|
|
||||||
|
|||||||
@ -3,9 +3,12 @@ import pkg/chronicles
|
|||||||
import pkg/metrics
|
import pkg/metrics
|
||||||
import pkg/datastore
|
import pkg/datastore
|
||||||
import pkg/datastore/typedds
|
import pkg/datastore/typedds
|
||||||
|
import pkg/stew/byteutils
|
||||||
|
import pkg/stew/endians2
|
||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import std/strutils
|
||||||
import std/os
|
import std/os
|
||||||
|
|
||||||
import ./nodeentry
|
import ./nodeentry
|
||||||
@ -22,6 +25,20 @@ type
|
|||||||
items: seq[NodeEntry]
|
items: seq[NodeEntry]
|
||||||
onMetric: OnUpdateMetric
|
onMetric: OnUpdateMetric
|
||||||
|
|
||||||
|
proc encode(s: NodeEntry): seq[byte] =
|
||||||
|
(s.id & ";" & s.value).toBytes()
|
||||||
|
|
||||||
|
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
|
||||||
|
let s = string.fromBytes(bytes)
|
||||||
|
if s.len == 0:
|
||||||
|
return success(NodeEntry(id: "", value: ""))
|
||||||
|
|
||||||
|
let tokens = s.split(";")
|
||||||
|
if tokens.len != 2:
|
||||||
|
return failure("expected 2 tokens")
|
||||||
|
|
||||||
|
success(NodeEntry(id: tokens[0], value: tokens[1]))
|
||||||
|
|
||||||
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||||
without itemKey =? Key.init(this.name / item.id), err:
|
without itemKey =? Key.init(this.name / item.id), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|||||||
@ -1,46 +0,0 @@
|
|||||||
import std/os
|
|
||||||
import pkg/chronicles
|
|
||||||
import pkg/chronos
|
|
||||||
import pkg/questionable
|
|
||||||
import pkg/questionable/results
|
|
||||||
|
|
||||||
import pkg/datastore
|
|
||||||
import pkg/datastore/typedds
|
|
||||||
import pkg/metrics
|
|
||||||
|
|
||||||
import ./config
|
|
||||||
import ./list
|
|
||||||
|
|
||||||
logScope:
|
|
||||||
topics = "main"
|
|
||||||
|
|
||||||
declareGauge(example, "testing")
|
|
||||||
|
|
||||||
proc startApplication*(config: CrawlerConfig): Future[?!void] {.async.} =
|
|
||||||
without exampleStore =? LevelDbDatastore.new(config.dataDir / "example"):
|
|
||||||
error "Failed to create datastore"
|
|
||||||
return failure("Failed to create datastore")
|
|
||||||
|
|
||||||
let typedDs = TypedDatastore.init(exampleStore)
|
|
||||||
|
|
||||||
proc onExampleMetric(value: int64) =
|
|
||||||
example.set(value)
|
|
||||||
|
|
||||||
var exampleList = List.new("example", typedDs, onExampleMetric)
|
|
||||||
if err =? (await exampleList.load()).errorOption:
|
|
||||||
return failure(err)
|
|
||||||
|
|
||||||
proc aaa() {.async.} =
|
|
||||||
var i = 0
|
|
||||||
while true:
|
|
||||||
trace "a"
|
|
||||||
await sleepAsync(1000)
|
|
||||||
discard await exampleList.add(Entry(id: $i, value: "str!"))
|
|
||||||
inc i
|
|
||||||
|
|
||||||
asyncSpawn aaa()
|
|
||||||
|
|
||||||
await sleepAsync(1000)
|
|
||||||
|
|
||||||
notice "b"
|
|
||||||
return success()
|
|
||||||
@ -9,17 +9,3 @@ type NodeEntry* = object
|
|||||||
|
|
||||||
proc `$`*(entry: NodeEntry): string =
|
proc `$`*(entry: NodeEntry): string =
|
||||||
entry.id & ":" & entry.value
|
entry.id & ":" & entry.value
|
||||||
|
|
||||||
proc encode(s: NodeEntry): seq[byte] =
|
|
||||||
(s.id & ";" & s.value).toBytes()
|
|
||||||
|
|
||||||
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
|
|
||||||
let s = string.fromBytes(bytes)
|
|
||||||
if s.len == 0:
|
|
||||||
return success(NodeEntry(id: "", value: ""))
|
|
||||||
|
|
||||||
let tokens = s.split(";")
|
|
||||||
if tokens.len != 2:
|
|
||||||
return failure("expected 2 tokens")
|
|
||||||
|
|
||||||
success(NodeEntry(id: tokens[0], value: tokens[1]))
|
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
--define:metrics
|
--define:
|
||||||
|
metrics
|
||||||
# switch("define", "chronicles_runtime_filtering=true")
|
# switch("define", "chronicles_runtime_filtering=true")
|
||||||
switch("define", "chronicles_log_level=TRACE")
|
switch("define", "chronicles_log_level=TRACE")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user