mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
nph formatting
This commit is contained in:
parent
4a8e9fe553
commit
2939ad941e
@ -56,12 +56,14 @@ suite "Persistency blob codec":
|
||||
check fromBlob(toBlob(u), UnacknowledgedMessage) == u
|
||||
|
||||
test "IncomingMessage round-trips (HashSet)":
|
||||
let inc = IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"]))
|
||||
let inc =
|
||||
IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"]))
|
||||
check fromBlob(toBlob(inc), IncomingMessage) == inc
|
||||
|
||||
test "repair tuples round-trip":
|
||||
let outPair =
|
||||
("msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0)))
|
||||
let outPair = (
|
||||
"msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0))
|
||||
)
|
||||
check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair
|
||||
|
||||
let inPair = (
|
||||
|
||||
@ -136,7 +136,9 @@ proc writePart*(buf: var seq[byte], b: seq[byte]) =
|
||||
for x in b:
|
||||
buf.add(x)
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[seq[byte]]): seq[byte] {.raises: [ValueError].} =
|
||||
proc readPart*(
|
||||
r: var ReadCtx, _: typedesc[seq[byte]]
|
||||
): seq[byte] {.raises: [ValueError].} =
|
||||
let n = int(readPart(r, uint32))
|
||||
r.need(n)
|
||||
result = newSeq[byte](n)
|
||||
@ -206,9 +208,7 @@ proc writePart*[T: tuple](buf: var seq[byte], v: T) =
|
||||
for f in fields(v):
|
||||
writePart(buf, f)
|
||||
|
||||
proc readPart*[T: tuple](
|
||||
r: var ReadCtx, _: typedesc[T]
|
||||
): T {.raises: [ValueError].} =
|
||||
proc readPart*[T: tuple](r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
for f in fields(result):
|
||||
f = readPart(r, typeof(f))
|
||||
@ -264,7 +264,9 @@ macro BlobCodec*(T: typedesc): untyped =
|
||||
name = ident "writePart",
|
||||
params = [
|
||||
newEmptyNode(),
|
||||
newIdentDefs(bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))),
|
||||
newIdentDefs(
|
||||
bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))
|
||||
),
|
||||
newIdentDefs(vId, tSym),
|
||||
],
|
||||
body = writeBody,
|
||||
@ -287,9 +289,9 @@ macro BlobCodec*(T: typedesc): untyped =
|
||||
],
|
||||
body = readBody,
|
||||
)
|
||||
readProc.addPragma(nnkExprColonExpr.newTree(
|
||||
ident "raises", nnkBracket.newTree(ident "ValueError")
|
||||
))
|
||||
readProc.addPragma(
|
||||
nnkExprColonExpr.newTree(ident "raises", nnkBracket.newTree(ident "ValueError"))
|
||||
)
|
||||
|
||||
result = newStmtList(writeProc, readProc)
|
||||
|
||||
|
||||
@ -76,9 +76,7 @@ BlobCodec(IncomingRepairEntry)
|
||||
|
||||
# ── Async backing procs ─────────────────────────────────────────────────
|
||||
|
||||
proc doLoadAll(
|
||||
job: Job, channelId: SdsChannelID
|
||||
): Future[ChannelSnapshot] {.async.} =
|
||||
proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} =
|
||||
var snap = ChannelSnapshot()
|
||||
let chanKey = toKey(channelId)
|
||||
|
||||
@ -90,11 +88,9 @@ proc doLoadAll(
|
||||
try:
|
||||
snap.lamportTimestamp = fromBlob(opt.get, int64)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid lamport bytes",
|
||||
channelId, err = e.msg
|
||||
warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: get lamport failed",
|
||||
channelId, err = $r.error
|
||||
warn "sds-persistency: get lamport failed", channelId, err = $r.error
|
||||
|
||||
block log:
|
||||
let r = await job.scanPrefix(CatLog, chanKey)
|
||||
@ -111,8 +107,7 @@ proc doLoadAll(
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
snap.messageHistory = msgs
|
||||
else:
|
||||
warn "sds-persistency: scan log failed",
|
||||
channelId, err = $r.error
|
||||
warn "sds-persistency: scan log failed", channelId, err = $r.error
|
||||
|
||||
block outgoing:
|
||||
let r = await job.scanPrefix(CatOutgoing, chanKey)
|
||||
@ -121,11 +116,9 @@ proc doLoadAll(
|
||||
try:
|
||||
snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid outgoing row",
|
||||
channelId, err = e.msg
|
||||
warn "sds-persistency: invalid outgoing row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan outgoing failed",
|
||||
channelId, err = $r.error
|
||||
warn "sds-persistency: scan outgoing failed", channelId, err = $r.error
|
||||
|
||||
block incoming:
|
||||
let r = await job.scanPrefix(CatIncoming, chanKey)
|
||||
@ -134,11 +127,9 @@ proc doLoadAll(
|
||||
try:
|
||||
snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid incoming row",
|
||||
channelId, err = e.msg
|
||||
warn "sds-persistency: invalid incoming row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan incoming failed",
|
||||
channelId, err = $r.error
|
||||
warn "sds-persistency: scan incoming failed", channelId, err = $r.error
|
||||
|
||||
block outRepair:
|
||||
let r = await job.scanPrefix(CatOutRepair, chanKey)
|
||||
@ -149,11 +140,9 @@ proc doLoadAll(
|
||||
fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid out-repair row",
|
||||
channelId, err = e.msg
|
||||
warn "sds-persistency: invalid out-repair row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan out-repair failed",
|
||||
channelId, err = $r.error
|
||||
warn "sds-persistency: scan out-repair failed", channelId, err = $r.error
|
||||
|
||||
block inRepair:
|
||||
let r = await job.scanPrefix(CatInRepair, chanKey)
|
||||
@ -164,11 +153,9 @@ proc doLoadAll(
|
||||
fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid in-repair row",
|
||||
channelId, err = e.msg
|
||||
warn "sds-persistency: invalid in-repair row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan in-repair failed",
|
||||
channelId, err = $r.error
|
||||
warn "sds-persistency: scan in-repair failed", channelId, err = $r.error
|
||||
|
||||
return snap
|
||||
|
||||
@ -180,8 +167,7 @@ proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} =
|
||||
var ops: seq[TxOp] = @[]
|
||||
var hintIds: seq[SdsMessageID] = @[]
|
||||
|
||||
let cats =
|
||||
[CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair]
|
||||
let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair]
|
||||
for cat in cats:
|
||||
let r = await job.scanPrefix(cat, chanKey)
|
||||
if r.isOk:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user