mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
use valueOr
This commit is contained in:
parent
ef36c2f513
commit
5b80f45922
@ -82,81 +82,74 @@ proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.asy
|
||||
let chanKey = toKey(channelId)
|
||||
|
||||
block lamport:
|
||||
let r = await job.get(CatLamport, chanKey)
|
||||
if r.isOk:
|
||||
let opt = r.get
|
||||
if opt.isSome:
|
||||
try:
|
||||
snap.lamportTimestamp = fromBlob(opt.get, int64)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: get lamport failed", channelId, err = $r.error
|
||||
let opt = (await job.get(CatLamport, chanKey)).valueOr:
|
||||
warn "sds-persistency: get lamport failed", channelId, err = $error
|
||||
break lamport
|
||||
if opt.isSome:
|
||||
try:
|
||||
snap.lamportTimestamp = fromBlob(opt.get, int64)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg
|
||||
|
||||
block log:
|
||||
let r = await job.scanPrefix(CatLog, chanKey)
|
||||
if r.isOk:
|
||||
var msgs = newSeq[SdsMessage]()
|
||||
for row in r.get:
|
||||
try:
|
||||
msgs.add(fromBlob(row.payload, SdsMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid log row", channelId, err = e.msg
|
||||
msgs.sort do(a, b: SdsMessage) -> int:
|
||||
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
|
||||
if result == 0:
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
snap.messageHistory = msgs
|
||||
else:
|
||||
warn "sds-persistency: scan log failed", channelId, err = $r.error
|
||||
let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan log failed", channelId, err = $error
|
||||
break log
|
||||
var msgs = newSeq[SdsMessage]()
|
||||
for row in rows:
|
||||
try:
|
||||
msgs.add(fromBlob(row.payload, SdsMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid log row", channelId, err = e.msg
|
||||
msgs.sort do(a, b: SdsMessage) -> int:
|
||||
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
|
||||
if result == 0:
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
snap.messageHistory = msgs
|
||||
|
||||
block outgoing:
|
||||
let r = await job.scanPrefix(CatOutgoing, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid outgoing row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan outgoing failed", channelId, err = $r.error
|
||||
let rows = (await job.scanPrefix(CatOutgoing, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan outgoing failed", channelId, err = $error
|
||||
break outgoing
|
||||
for row in rows:
|
||||
try:
|
||||
snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid outgoing row", channelId, err = e.msg
|
||||
|
||||
block incoming:
|
||||
let r = await job.scanPrefix(CatIncoming, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid incoming row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan incoming failed", channelId, err = $r.error
|
||||
let rows = (await job.scanPrefix(CatIncoming, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan incoming failed", channelId, err = $error
|
||||
break incoming
|
||||
for row in rows:
|
||||
try:
|
||||
snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid incoming row", channelId, err = e.msg
|
||||
|
||||
block outRepair:
|
||||
let r = await job.scanPrefix(CatOutRepair, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.outgoingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid out-repair row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan out-repair failed", channelId, err = $r.error
|
||||
let rows = (await job.scanPrefix(CatOutRepair, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan out-repair failed", channelId, err = $error
|
||||
break outRepair
|
||||
for row in rows:
|
||||
try:
|
||||
snap.outgoingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid out-repair row", channelId, err = e.msg
|
||||
|
||||
block inRepair:
|
||||
let r = await job.scanPrefix(CatInRepair, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.incomingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid in-repair row", channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan in-repair failed", channelId, err = $r.error
|
||||
let rows = (await job.scanPrefix(CatInRepair, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan in-repair failed", channelId, err = $error
|
||||
break inRepair
|
||||
for row in rows:
|
||||
try:
|
||||
snap.incomingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid in-repair row", channelId, err = e.msg
|
||||
|
||||
return snap
|
||||
|
||||
@ -170,18 +163,17 @@ proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} =
|
||||
|
||||
let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair]
|
||||
for cat in cats:
|
||||
let r = await job.scanPrefix(cat, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
ops.add(TxOp(category: cat, key: row.key, kind: txDelete))
|
||||
if cat == CatLog:
|
||||
try:
|
||||
hintIds.add(fromBlob(row.payload, SdsMessage).messageId)
|
||||
except ValueError:
|
||||
discard
|
||||
else:
|
||||
let rows = (await job.scanPrefix(cat, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan during drop failed",
|
||||
channelId, category = cat, err = $r.error
|
||||
channelId, category = cat, err = $error
|
||||
continue
|
||||
for row in rows:
|
||||
ops.add(TxOp(category: cat, key: row.key, kind: txDelete))
|
||||
if cat == CatLog:
|
||||
try:
|
||||
hintIds.add(fromBlob(row.payload, SdsMessage).messageId)
|
||||
except ValueError:
|
||||
discard
|
||||
|
||||
ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete))
|
||||
for id in hintIds:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user