mirror of
https://github.com/logos-storage/logos-storage-network-crawler.git
synced 2026-01-04 06:23:08 +00:00
cleanup
This commit is contained in:
parent
3f7392691e
commit
75a6f38dbe
@ -4,58 +4,18 @@ import pkg/chronos
|
|||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
import ./codexcrawler/main
|
import ./codexcrawler/application
|
||||||
import ./codexcrawler/config
|
|
||||||
import ./codexcrawler/metrics
|
|
||||||
import ./codexcrawler/logging
|
|
||||||
|
|
||||||
when defined(posix):
|
when defined(posix):
|
||||||
import system/ansi_c
|
import system/ansi_c
|
||||||
|
|
||||||
type
|
|
||||||
ApplicationStatus {.pure.} = enum
|
|
||||||
Stopped
|
|
||||||
Stopping
|
|
||||||
Running
|
|
||||||
|
|
||||||
Application = ref object
|
|
||||||
status: ApplicationStatus
|
|
||||||
|
|
||||||
proc run(app: Application) =
|
|
||||||
let config = parseConfig()
|
|
||||||
info "Loaded configuration", config
|
|
||||||
|
|
||||||
# Configure loglevel
|
|
||||||
updateLogLevel(config.logLevel)
|
|
||||||
|
|
||||||
# Ensure datadir path exists:
|
|
||||||
if not existsDir(config.dataDir):
|
|
||||||
createDir(config.dataDir)
|
|
||||||
|
|
||||||
setupMetrics(config.metricsAddress, config.metricsPort)
|
|
||||||
info "Metrics endpoint initialized"
|
|
||||||
|
|
||||||
info "Starting application"
|
|
||||||
app.status = ApplicationStatus.Running
|
|
||||||
if err =? (waitFor startApplication(config)).errorOption:
|
|
||||||
app.status = ApplicationStatus.Stopping
|
|
||||||
error "Failed to start application", err = err.msg
|
|
||||||
|
|
||||||
while app.status == ApplicationStatus.Running:
|
|
||||||
try:
|
|
||||||
chronos.poll()
|
|
||||||
except Exception as exc:
|
|
||||||
error "Unhandled exception", msg = exc.msg
|
|
||||||
quit QuitFailure
|
|
||||||
notice "Application closed"
|
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
let app = Application()
|
let app = Application()
|
||||||
|
|
||||||
# Stopping code must be in scope of app declaration.
|
# Stopping code must be in scope of app declaration.
|
||||||
# Else capture of the instance is not allowed due to {.noconv.}.
|
# Else capture of the instance is not allowed due to {.noconv.}.
|
||||||
proc onStopSignal() =
|
proc onStopSignal() =
|
||||||
app.status = ApplicationStatus.Stopping
|
app.stop()
|
||||||
notice "Stopping Crawler..."
|
notice "Stopping Crawler..."
|
||||||
|
|
||||||
proc controlCHandler() {.noconv.} =
|
proc controlCHandler() {.noconv.} =
|
||||||
|
|||||||
51
codexcrawler/application.nim
Normal file
51
codexcrawler/application.nim
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
import std/os
|
||||||
|
import pkg/chronicles
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import ./config
|
||||||
|
import ./logging
|
||||||
|
import ./metrics
|
||||||
|
|
||||||
|
import ./main
|
||||||
|
|
||||||
|
type
|
||||||
|
ApplicationStatus* {.pure.} = enum
|
||||||
|
Stopped
|
||||||
|
Stopping
|
||||||
|
Running
|
||||||
|
|
||||||
|
Application* = ref object
|
||||||
|
status: ApplicationStatus
|
||||||
|
|
||||||
|
proc run*(app: Application) =
|
||||||
|
let config = parseConfig()
|
||||||
|
info "Loaded configuration", config
|
||||||
|
|
||||||
|
# Configure loglevel
|
||||||
|
updateLogLevel(config.logLevel)
|
||||||
|
|
||||||
|
# Ensure datadir path exists:
|
||||||
|
if not existsDir(config.dataDir):
|
||||||
|
createDir(config.dataDir)
|
||||||
|
|
||||||
|
setupMetrics(config.metricsAddress, config.metricsPort)
|
||||||
|
info "Metrics endpoint initialized"
|
||||||
|
|
||||||
|
info "Starting application"
|
||||||
|
app.status = ApplicationStatus.Running
|
||||||
|
if err =? (waitFor startApplication(config)).errorOption:
|
||||||
|
app.status = ApplicationStatus.Stopping
|
||||||
|
error "Failed to start application", err = err.msg
|
||||||
|
|
||||||
|
while app.status == ApplicationStatus.Running:
|
||||||
|
try:
|
||||||
|
chronos.poll()
|
||||||
|
except Exception as exc:
|
||||||
|
error "Unhandled exception", msg = exc.msg
|
||||||
|
quit QuitFailure
|
||||||
|
notice "Application closed"
|
||||||
|
|
||||||
|
proc stop*(app: Application) =
|
||||||
|
app.status = ApplicationStatus.Stopping
|
||||||
@ -3,62 +3,35 @@ import pkg/chronicles
|
|||||||
import pkg/metrics
|
import pkg/metrics
|
||||||
import pkg/datastore
|
import pkg/datastore
|
||||||
import pkg/datastore/typedds
|
import pkg/datastore/typedds
|
||||||
import pkg/stew/byteutils
|
|
||||||
import pkg/stew/endians2
|
|
||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
import std/os
|
import std/os
|
||||||
import std/times
|
|
||||||
import std/options
|
import ./nodeentry
|
||||||
import std/tables
|
|
||||||
import std/strutils
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "list"
|
topics = "list"
|
||||||
|
|
||||||
type
|
type
|
||||||
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises:[].}
|
OnUpdateMetric = proc(value: int64): void {.gcsafe, raises: [].}
|
||||||
Entry* = object
|
|
||||||
id*: string # will be node ID
|
|
||||||
value*: string
|
|
||||||
|
|
||||||
List* = ref object
|
List* = ref object
|
||||||
name: string
|
name: string
|
||||||
store: TypedDatastore
|
store: TypedDatastore
|
||||||
items: seq[Entry]
|
items: seq[NodeEntry]
|
||||||
onMetric: OnUpdateMetric
|
onMetric: OnUpdateMetric
|
||||||
|
|
||||||
proc `$`*(entry: Entry): string =
|
proc saveItem(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||||
entry.id & ":" & entry.value
|
|
||||||
|
|
||||||
proc encode(s: Entry): seq[byte] =
|
|
||||||
(s.id & ";" & s.value).toBytes()
|
|
||||||
|
|
||||||
proc decode(T: type Entry, bytes: seq[byte]): ?!T =
|
|
||||||
let s = string.fromBytes(bytes)
|
|
||||||
if s.len == 0:
|
|
||||||
return success(Entry(id: "", value: ""))
|
|
||||||
|
|
||||||
let tokens = s.split(";")
|
|
||||||
if tokens.len != 2:
|
|
||||||
return failure("expected 2 tokens")
|
|
||||||
|
|
||||||
success(Entry(
|
|
||||||
id: tokens[0],
|
|
||||||
value: tokens[1]
|
|
||||||
))
|
|
||||||
|
|
||||||
proc saveItem(this: List, item: Entry): Future[?!void] {.async.} =
|
|
||||||
without itemKey =? Key.init(this.name / item.id), err:
|
without itemKey =? Key.init(this.name / item.id), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
? await this.store.put(itemKey, item)
|
?await this.store.put(itemKey, item)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc load*(this: List): Future[?!void] {.async.}=
|
proc load*(this: List): Future[?!void] {.async.} =
|
||||||
without queryKey =? Key.init(this.name), err:
|
without queryKey =? Key.init(this.name), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
without iter =? (await query[Entry](this.store, Query.init(queryKey))), err:
|
without iter =? (await query[NodeEntry](this.store, Query.init(queryKey))), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
while not iter.finished:
|
while not iter.finished:
|
||||||
@ -73,19 +46,11 @@ proc load*(this: List): Future[?!void] {.async.}=
|
|||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
_: type List,
|
_: type List, name: string, store: TypedDatastore, onMetric: OnUpdateMetric
|
||||||
name: string,
|
|
||||||
store: TypedDatastore,
|
|
||||||
onMetric: OnUpdateMetric
|
|
||||||
): List =
|
): List =
|
||||||
List(
|
List(name: name, store: store, items: newSeq[NodeEntry](), onMetric: onMetric)
|
||||||
name: name,
|
|
||||||
store: store,
|
|
||||||
items: newSeq[Entry](),
|
|
||||||
onMetric: onMetric
|
|
||||||
)
|
|
||||||
|
|
||||||
proc add*(this: List, item: Entry): Future[?!void] {.async.} =
|
proc add*(this: List, item: NodeEntry): Future[?!void] {.async.} =
|
||||||
this.items.add(item)
|
this.items.add(item)
|
||||||
this.onMetric(this.items.len.int64)
|
this.onMetric(this.items.len.int64)
|
||||||
|
|
||||||
|
|||||||
@ -9,8 +9,7 @@ proc updateLogLevel*(logLevel: string) =
|
|||||||
notice "Updating logLevel", logLevel
|
notice "Updating logLevel", logLevel
|
||||||
let directives = logLevel.split(";")
|
let directives = logLevel.split(";")
|
||||||
try:
|
try:
|
||||||
setLogLevel(LogLevel.TRACE)
|
setLogLevel(LogLevel.TRACE) #parseEnum[LogLevel](directives[0].toUpperAscii))
|
||||||
#parseEnum[LogLevel](directives[0].toUpperAscii))
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
notice "valueerror logLevel", logLevel
|
notice "valueerror logLevel", logLevel
|
||||||
raise (ref ValueError)(
|
raise (ref ValueError)(
|
||||||
|
|||||||
@ -35,10 +35,7 @@ proc startApplication*(config: CrawlerConfig): Future[?!void] {.async.} =
|
|||||||
while true:
|
while true:
|
||||||
trace "a"
|
trace "a"
|
||||||
await sleepAsync(1000)
|
await sleepAsync(1000)
|
||||||
discard await exampleList.add(Entry(
|
discard await exampleList.add(Entry(id: $i, value: "str!"))
|
||||||
id: $i,
|
|
||||||
value: "str!"
|
|
||||||
))
|
|
||||||
inc i
|
inc i
|
||||||
|
|
||||||
asyncSpawn aaa()
|
asyncSpawn aaa()
|
||||||
|
|||||||
25
codexcrawler/nodeentry.nim
Normal file
25
codexcrawler/nodeentry.nim
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import pkg/stew/byteutils
|
||||||
|
import pkg/stew/endians2
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
type NodeEntry* = object
|
||||||
|
id*: string # will be node ID
|
||||||
|
value*: string
|
||||||
|
|
||||||
|
proc `$`*(entry: NodeEntry): string =
|
||||||
|
entry.id & ":" & entry.value
|
||||||
|
|
||||||
|
proc encode(s: NodeEntry): seq[byte] =
|
||||||
|
(s.id & ";" & s.value).toBytes()
|
||||||
|
|
||||||
|
proc decode(T: type NodeEntry, bytes: seq[byte]): ?!T =
|
||||||
|
let s = string.fromBytes(bytes)
|
||||||
|
if s.len == 0:
|
||||||
|
return success(NodeEntry(id: "", value: ""))
|
||||||
|
|
||||||
|
let tokens = s.split(";")
|
||||||
|
if tokens.len != 2:
|
||||||
|
return failure("expected 2 tokens")
|
||||||
|
|
||||||
|
success(NodeEntry(id: tokens[0], value: tokens[1]))
|
||||||
Loading…
x
Reference in New Issue
Block a user