diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index 41fbf1cf0..a52b7b4e9 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -82,81 +82,74 @@ proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.asy let chanKey = toKey(channelId) block lamport: - let r = await job.get(CatLamport, chanKey) - if r.isOk: - let opt = r.get - if opt.isSome: - try: - snap.lamportTimestamp = fromBlob(opt.get, int64) - except ValueError as e: - warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg - else: - warn "sds-persistency: get lamport failed", channelId, err = $r.error + let opt = (await job.get(CatLamport, chanKey)).valueOr: + warn "sds-persistency: get lamport failed", channelId, err = $error + break lamport + if opt.isSome: + try: + snap.lamportTimestamp = fromBlob(opt.get, int64) + except ValueError as e: + warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg block log: - let r = await job.scanPrefix(CatLog, chanKey) - if r.isOk: - var msgs = newSeq[SdsMessage]() - for row in r.get: - try: - msgs.add(fromBlob(row.payload, SdsMessage)) - except ValueError as e: - warn "sds-persistency: invalid log row", channelId, err = e.msg - msgs.sort do(a, b: SdsMessage) -> int: - result = cmp(a.lamportTimestamp, b.lamportTimestamp) - if result == 0: - result = cmp(a.messageId, b.messageId) - snap.messageHistory = msgs - else: - warn "sds-persistency: scan log failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr: + warn "sds-persistency: scan log failed", channelId, err = $error + break log + var msgs = newSeq[SdsMessage]() + for row in rows: + try: + msgs.add(fromBlob(row.payload, SdsMessage)) + except ValueError as e: + warn "sds-persistency: invalid log row", channelId, err = e.msg + msgs.sort do(a, b: SdsMessage) -> int: + result = cmp(a.lamportTimestamp, b.lamportTimestamp) + if result == 0: + result = cmp(a.messageId, b.messageId) + snap.messageHistory = msgs block outgoing: - let r = await job.scanPrefix(CatOutgoing, chanKey) - if r.isOk: - for row in r.get: - try: - snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) - except ValueError as e: - warn "sds-persistency: invalid outgoing row", channelId, err = e.msg - else: - warn "sds-persistency: scan outgoing failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatOutgoing, chanKey)).valueOr: + warn "sds-persistency: scan outgoing failed", channelId, err = $error + break outgoing + for row in rows: + try: + snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) + except ValueError as e: + warn "sds-persistency: invalid outgoing row", channelId, err = e.msg block incoming: - let r = await job.scanPrefix(CatIncoming, chanKey) - if r.isOk: - for row in r.get: - try: - snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) - except ValueError as e: - warn "sds-persistency: invalid incoming row", channelId, err = e.msg - else: - warn "sds-persistency: scan incoming failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatIncoming, chanKey)).valueOr: + warn "sds-persistency: scan incoming failed", channelId, err = $error + break incoming + for row in rows: + try: + snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) + except ValueError as e: + warn "sds-persistency: invalid incoming row", channelId, err = e.msg block outRepair: - let r = await job.scanPrefix(CatOutRepair, chanKey) - if r.isOk: - for row in r.get: - try: - snap.outgoingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid out-repair row", channelId, err = e.msg - else: - warn "sds-persistency: scan out-repair failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatOutRepair, chanKey)).valueOr: + warn "sds-persistency: scan out-repair failed", channelId, err = $error + break outRepair + for row in rows: + try: + snap.outgoingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid out-repair row", channelId, err = e.msg block inRepair: - let r = await job.scanPrefix(CatInRepair, chanKey) - if r.isOk: - for row in r.get: - try: - snap.incomingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid in-repair row", channelId, err = e.msg - else: - warn "sds-persistency: scan in-repair failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatInRepair, chanKey)).valueOr: + warn "sds-persistency: scan in-repair failed", channelId, err = $error + break inRepair + for row in rows: + try: + snap.incomingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid in-repair row", channelId, err = e.msg return snap @@ -170,18 +163,17 @@ proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] for cat in cats: - let r = await job.scanPrefix(cat, chanKey) - if r.isOk: - for row in r.get: - ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) - if cat == CatLog: - try: - hintIds.add(fromBlob(row.payload, SdsMessage).messageId) - except ValueError: - discard - else: + let rows = (await job.scanPrefix(cat, chanKey)).valueOr: warn "sds-persistency: scan during drop failed", - channelId, category = cat, err = $r.error + channelId, category = cat, err = $error + continue + for row in rows: + ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) + if cat == CatLog: + try: + hintIds.add(fromBlob(row.payload, SdsMessage).messageId) + except ValueError: + discard ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete)) for id in hintIds: