Remove protobuf serialization (#289)

* add format for cid

* cid formatIt change

* track nim-libp2p-unstable

* rework probuf serialization for por

* add missing include

* removing nim protobuf serialization

* rollback to dht to main

* remove protobuf serialization import
This commit is contained in:
Dmitriy Ryajov 2022-10-27 07:41:34 -06:00 committed by GitHub
parent 92eecb0702
commit e50ea88411
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 444 additions and 103 deletions

5
.gitmodules vendored
View File

@ -33,11 +33,6 @@
url = https://github.com/status-im/nim-stew.git
ignore = untracked
branch = master
[submodule "vendor/nim-protobuf-serialization"]
path = vendor/nim-protobuf-serialization
url = https://github.com/status-im/nim-protobuf-serialization.git
ignore = untracked
branch = master
[submodule "vendor/nim-nitro"]
path = vendor/nim-nitro
url = https://github.com/status-im/nim-nitro.git

View File

@ -103,19 +103,19 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} =
cid = await b.advertiseQueue.get()
if cid in b.inFlightAdvReqs:
trace "Advertise request already in progress", cid = $cid
trace "Advertise request already in progress", cid
continue
try:
let request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request
codex_inflight_discovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertising block", cid = $cid, inflight = b.inFlightAdvReqs.len
trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len
await request
finally:
b.inFlightAdvReqs.del(cid)
codex_inflight_discovery.set(b.inFlightAdvReqs.len.int64)
trace "Advertised block", cid = $cid, inflight = b.inFlightAdvReqs.len
trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len
except CatchableError as exc:
trace "Exception in advertise task runner", exc = exc.msg
@ -131,15 +131,15 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
cid = await b.discoveryQueue.get()
if cid in b.inFlightDiscReqs:
trace "Discovery request already in progress", cid = $cid
trace "Discovery request already in progress", cid
continue
let
haves = b.peers.peersHave(cid)
trace "Current number of peers for block", cid = $cid, count = haves.len
trace "Current number of peers for block", cid, count = haves.len
if haves.len < b.minPeersPerBlock:
trace "Discovering block", cid = $cid
trace "Discovering block", cid
try:
let
request = b.discovery
@ -173,7 +173,7 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
try:
for cid in cids:
if cid notin b.discoveryQueue:
trace "Queueing find block request", cid = $cid
trace "Queueing find block request", cid
await b.discoveryQueue.put(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
@ -185,7 +185,7 @@ proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
try:
for cid in cids:
if cid notin b.advertiseQueue:
trace "Queueing provide block request", cid = $cid
trace "Queueing provide block request", cid
await b.advertiseQueue.put(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg

View File

@ -122,7 +122,7 @@ proc requestBlock*(
## Request a block from remotes
##
trace "Requesting block", cid = $cid
trace "Requesting block", cid
if cid in b.pendingBlocks:
return await b.pendingBlocks.getWantHandle(cid, timeout)
@ -136,7 +136,7 @@ proc requestBlock*(
if peers.len <= 0:
peers = toSeq(b.peers) # Get any peer
if peers.len <= 0:
trace "No peers to request blocks from", cid = $cid
trace "No peers to request blocks from", cid
b.discovery.queueFindBlocksReq(@[cid])
return await blk
@ -150,7 +150,7 @@ proc requestBlock*(
wantType = WantType.wantBlock) # we want this remote to send us a block
if (peers.len - 1) == 0:
trace "Not enough peers to send want list to", cid = $cid
trace "Not enough peers to send want list to", cid
b.discovery.queueFindBlocksReq(@[cid])
return await blk # no peers to send wants to

View File

@ -1,4 +1,3 @@
import pkg/protobuf_serialization
import pkg/stew/byteutils
import pkg/stint
import pkg/nitro

View File

@ -18,9 +18,13 @@ import pkg/libp2p
import pkg/stew/byteutils
import pkg/questionable
import pkg/questionable/results
import pkg/chronicles
import ./formats
import ./errors
export errors, formats
const
# Size of blocks for storage / network exchange,
# should be divisible by 31 for PoR and by 64 for Leopard ECC

View File

@ -22,6 +22,7 @@ import pkg/libp2pdht/discv5/protocol as discv5
import ./rng
import ./errors
import ./formats
export discv5
@ -82,10 +83,10 @@ method find*(
## Find block providers
##
trace "Finding providers for block", cid = $cid
trace "Finding providers for block", cid
without providers =?
(await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
trace "Error finding providers for block", cid = $cid, error = error.msg
trace "Error finding providers for block", cid, error = error.msg
return providers
@ -93,7 +94,7 @@ method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid
##
trace "Providing block", cid = $cid
trace "Providing block", cid
let
nodes = await d.protocol.addProvider(
cid.toNodeId(),

28
codex/formats.nim Normal file
View File

@ -0,0 +1,28 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/strutils
import pkg/chronicles
import pkg/libp2p
func shortLog*(cid: Cid): string =
## Returns compact string representation of ``pid``.
var scid = $cid
if len(scid) > 10:
scid[3] = '*'
when (NimMajor, NimMinor) > (1, 4):
scid.delete(4 .. scid.high - 6)
else:
scid.delete(4, scid.high - 6)
scid
chronicles.formatIt(Cid): shortLog(it)

View File

@ -10,6 +10,7 @@
import std/options
import std/tables
import std/sequtils
import std/strformat
import pkg/questionable
import pkg/questionable/results
@ -188,7 +189,7 @@ proc store*(
blockManifest.add(blk.cid)
if isErr (await node.blockStore.putBlock(blk)):
# trace "Unable to store block", cid = blk.cid
return failure("Unable to store block " & $blk.cid)
return failure(&"Unable to store block {blk.cid}")
except CancelledError as exc:
raise exc

View File

@ -247,7 +247,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
trace "Error uploading file", exc = error.msg
return RestApiResponse.error(Http500, error.msg)
trace "Uploaded file", cid = $cid
trace "Uploaded file", cid
return RestApiResponse.response($cid)
except CancelledError as exc:
return RestApiResponse.error(Http500)

View File

@ -1,4 +1,4 @@
## Nim-POS
## Nim-Codex
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))

View File

@ -0,0 +1,185 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/questionable/results
import pkg/libp2p/protobuf/minprotobuf
type
TauZeroMessage* = object
name*: seq[byte]
n*: int64
u*: seq[seq[byte]]
TauMessage* = object
t*: TauZeroMessage
signature*: seq[byte]
PubKeyMessage* = object
signkey*: seq[byte]
key*: seq[byte]
PorMessage* = object
tau*: TauMessage
spk*: PubKeyMessage
authenticators*: seq[seq[byte]]
ProofMessage* = object
mu*: seq[seq[byte]]
sigma*: seq[byte]
PoREnvelope* = object
por*: PorMessage
proof*: ProofMessage
func write*(pb: var ProtoBuffer, field: int, value: TauZeroMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.name)
ipb.write(2, value.n.uint64)
for u in value.u:
ipb.write(3, u)
ipb.finish()
pb.write(field, ipb)
func write*(pb: var ProtoBuffer, field: int, value: TauMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.t)
ipb.write(2, value.signature)
ipb.finish()
pb.write(field, ipb)
func write*(pb: var ProtoBuffer, field: int, value: PubKeyMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.signkey)
ipb.write(2, value.key)
ipb.finish()
pb.write(field, ipb)
func write*(pb: var ProtoBuffer, field: int, value: PorMessage) =
var ipb = initProtoBuffer()
ipb.write(1, value.tau)
ipb.write(2, value.spk)
for a in value.authenticators:
ipb.write(3, a)
ipb.finish()
pb.write(field, ipb)
func encode*(msg: PorMessage): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, msg.tau)
ipb.write(2, msg.spk)
for a in msg.authenticators:
ipb.write(3, a)
ipb.finish
ipb.buffer
func write*(pb: var ProtoBuffer, field: int, value: ProofMessage) =
var ipb = initProtoBuffer()
for mu in value.mu:
ipb.write(1, mu)
ipb.write(2, value.sigma)
ipb.finish()
pb.write(field, ipb)
func encode*(message: PoREnvelope): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, message.por)
ipb.write(2, message.proof)
ipb.finish
ipb.buffer
proc decode*(_: type TauZeroMessage, pb: ProtoBuffer): ProtoResult[TauZeroMessage] =
var
value = TauZeroMessage()
discard ? pb.getField(1, value.name)
var val: uint64
discard ? pb.getField(2, val)
value.n = val.int64
var bytes: seq[seq[byte]]
discard ? pb.getRepeatedField(3, bytes)
for b in bytes:
value.u.add(b)
ok(value)
proc decode*(_: type TauMessage, pb: ProtoBuffer): ProtoResult[TauMessage] =
var
value = TauMessage()
ipb: ProtoBuffer
discard ? pb.getField(1, ipb)
value.t = ? TauZeroMessage.decode(ipb)
discard ? pb.getField(2, value.signature)
ok(value)
proc decode*(_: type PubKeyMessage, pb: ProtoBuffer): ProtoResult[PubKeyMessage] =
var
value = PubKeyMessage()
discard ? pb.getField(1, value.signkey)
discard ? pb.getField(2, value.key)
ok(value)
proc decode*(_: type PorMessage, pb: ProtoBuffer): ProtoResult[PorMessage] =
var
value = PorMessage()
ipb: ProtoBuffer
discard ? pb.getField(1, ipb)
value.tau = ? TauMessage.decode(ipb)
discard ? pb.getField(2, ipb)
value.spk = ? PubKeyMessage.decode(ipb)
var
bytes: seq[seq[byte]]
discard ? pb.getRepeatedField(3, bytes)
for b in bytes:
value.authenticators.add(b)
ok(value)
proc decode*(_: type PorMessage, msg: seq[byte]): ProtoResult[PorMessage] =
PorMessage.decode(initProtoBuffer(msg))
proc decode*(_: type ProofMessage, pb: ProtoBuffer): ProtoResult[ProofMessage] =
var
value = ProofMessage()
discard ? pb.getField(1, value.mu)
discard ? pb.getField(2, value.sigma)
ok(value)
func decode*(_: type PoREnvelope, msg: openArray[byte]): ?!PoREnvelope =
var
value = PoREnvelope()
pb = initProtoBuffer(msg)
discard ? pb.getField(1, ? value.por.decode)
discard ? pb.getField(2, ? value.proof.decode)
ok(value)

View File

@ -1,33 +0,0 @@
syntax = "proto3";
message PoREnvelope {
message TauZeroMessage {
bytes name = 1;
int64 n = 2;
repeated bytes u = 3;
}
message TauMessage {
TauZeroMessage t = 1;
bytes signature = 2;
}
message PubKeyMessage {
bytes signkey = 1;
bytes key = 2;
}
message PorMessage {
TauMessage tau = 1;
PubKeyMessage spk = 2;
repeated bytes authenticators = 3;
}
message ProofMessage {
repeated bytes mu = 1;
bytes sigma = 2;
}
PorMessage por = 1;
ProofMessage proof = 2;
}

View File

@ -1,4 +1,4 @@
## Nim-POS
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
@ -9,19 +9,14 @@
import std/sequtils
import pkg/protobuf_serialization
import pkg/stew/results
import pkg/stew/objects
import pkg/blscurve
import pkg/blscurve/blst/blst_abi
import_proto3 "por.proto"
import ./messages
export TauZeroMessage
export TauMessage
export ProofMessage
export PorMessage
export PoREnvelope
export messages
import ../por
@ -33,6 +28,7 @@ func toMessage*(self: Proof): ProofMessage =
for mu in self.mu:
var
serialized: array[32, byte]
blst_bendian_from_scalar(serialized, mu)
message.mu.add(toSeq(serialized))

View File

@ -1,15 +0,0 @@
syntax = "proto3";
message StorageProofsMessage {
message Tag {
int64 idx = 1;
bytes tag = 2;
}
message TagsMessage {
bytes cid = 1;
repeated Tag tags = 2;
}
TagsMessage tagsMsg = 1;
}

View File

@ -15,10 +15,10 @@ import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import pkg/contractabi/address as ca
import pkg/protobuf_serialization
import ./stpproto
import ../discovery
import ../formats
const
Codec* = "/dagger/storageproofs/1.0.0"
@ -59,8 +59,7 @@ proc uploadTags*(
conn = await connFut
try:
await conn.writeLp(
Protobuf.encode(StorageProofsMessage(tagsMsg: msg)))
await conn.writeLp(msg.encode)
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -79,10 +78,11 @@ method init*(self: StpNetwork) =
try:
let
msg = await conn.readLp(MaxMessageSize)
message = Protobuf.decode(msg, StorageProofsMessage)
res = TagsMessage.decode(msg)
if message.tagsMsg.tags.len > 0 and not self.tagsHandle.isNil:
await self.tagsHandle(message.tagsMsg)
if not self.tagsHandle.isNil:
if res.isOk and res.get.tags.len > 0:
await self.tagsHandle(res.get)
except CatchableError as exc:
trace "Exception handling Storage Proofs message", exc = exc.msg
finally:

View File

@ -1,7 +1,3 @@
import pkg/protobuf_serialization
import ./stpproto/messages
import_proto3 "stp.proto"
export StorageProofsMessage
export TagsMessage
export Tag
export messages

View File

@ -0,0 +1,68 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/questionable/results
import pkg/libp2p/protobuf/minprotobuf
import ../../errors
type
Tag* = object
idx*: int64
tag*: seq[byte]
TagsMessage* = object
cid*: seq[byte]
tags*: seq[Tag]
func write*(pb: var ProtoBuffer, field: int, value: Tag) =
var ipb = initProtoBuffer()
ipb.write(1, value.idx.uint64)
ipb.write(2, value.tag)
ipb.finish()
pb.write(field, ipb)
func encode*(msg: TagsMessage): seq[byte] =
var ipb = initProtoBuffer()
ipb.write(1, msg.cid)
for tag in msg.tags:
ipb.write(2, tag)
ipb.finish()
ipb.buffer
func decode*(_: type Tag, pb: ProtoBuffer): ProtoResult[Tag] =
var
value = Tag()
idx: uint64
discard ? pb.getField(1, idx)
value.idx = idx.int64
discard ? pb.getField(2, value.tag)
ok(value)
func decode*(_: type TagsMessage, msg: openArray[byte]): ProtoResult[TagsMessage] =
var
value = TagsMessage()
pb = initProtoBuffer(msg)
discard ? pb.getField(1, value.cid)
var
bytes: seq[seq[byte]]
discard ? pb.getRepeatedField(2, bytes)
for b in bytes:
value.tags.add(? Tag.decode(initProtoBuffer(b)))
ok(value)

View File

@ -16,7 +16,9 @@ import pkg/chronicles
import pkg/stew/io2
import pkg/questionable
import pkg/questionable/results
import pkg/protobuf_serialization
import ../errors
import ../formats
import ./stpproto
import ./por
@ -44,7 +46,7 @@ proc retrieve*(
trace "Cannot retrieve storage proof data from fs", path , error
return failure("Cannot retrieve storage proof data from fs")
return Protobuf.decode(data, PorMessage).success
return PorMessage.decode(data).mapFailure
proc store*(
self: StpStore,
@ -62,12 +64,12 @@ proc store*(
let path = dir / "por"
if (
let res = io2.writeFile(path, Protobuf.encode(por));
let res = io2.writeFile(path, por.encode());
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Unable to store storage proofs", path, cid = cid, error
trace "Unable to store storage proofs", path, cid, error
return failure(
&"Unable to store storage proofs - path = ${path} cid = ${$cid} error = ${error}")
&"Unable to store storage proofs - path = ${path} cid = ${cid} error = ${error}")
return success()
@ -106,9 +108,9 @@ proc store*(
let res = io2.writeFile(path, t.tag);
res.isErr):
let error = io2.ioErrorMsg(res.error)
trace "Unable to store tags", path, cid = cid, error
trace "Unable to store tags", path, cid, error
return failure(
&"Unable to store tags - path = ${path} cid = ${$cid} error = ${error}")
&"Unable to store tags - path = ${path} cid = ${cid} error = ${error}")
return success()

View File

@ -13,7 +13,6 @@ push: {.upraises: [].}
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ../blocktype

118
codex/stores/localstore.nim Normal file
View File

@ -0,0 +1,118 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/os
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import ./blockstore
import ../blocktype
import ../namespaces
import ../manifest
export blocktype, libp2p
const
CacheBytesKey* = CodexMetaNamespace / "bytes" / "cache"
CachePersistentKey* = CodexMetaNamespace / "bytes" / "persistent"
type
LocalStore* = ref object of BlockStore
ds*: Datastore
blocksRepo*: BlockStore # TODO: Should be a Datastore
manifestRepo*: BlockStore # TODO: Should be a Datastore
cacheBytes*: uint
persistBytes*: uint
method getBlock*(self: LocalStore, cid: Cid): Future[?!Block] =
## Get a block from the blockstore
##
if cid.isManifest:
self.manifestRepo.getBlock(cid)
else:
self.blocksRepo.getBlock(cid)
method putBlock*(self: LocalStore, blk: Block): Future[?!void] =
## Put a block to the blockstore
##
if blk.cid.isManifest:
self.manifestRepo.putBlock(blk)
else:
self.blocksRepo.putBlock(blk)
method delBlock*(self: LocalStore, cid: Cid): Future[?!void] =
## Delete a block from the blockstore
##
if cid.isManifest:
self.manifestRepo.delBlock(cid)
else:
self.blocksRepo.delBlock(cid)
method hasBlock*(self: LocalStore, cid: Cid): Future[?!bool] =
## Check if the block exists in the blockstore
##
if cid.isManifest:
self.manifestRepo.hasBlock(cid)
else:
self.blocksRepo.hasBlock(cid)
method listBlocks*(
self: LocalStore,
blkType: MultiCodec,
batch = 100,
onBlock: OnBlock): Future[?!void] =
## Get the list of blocks in the LocalStore.
## This is an intensive operation
##
if $blkType in ManifestContainers:
self.manifestRepo.listBlocks(blkType, batch, onBlock)
else:
self.blocksRepo.listBlocks(onBlock)
method close*(self: LocalStore) {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
await self.manifestRepo.close()
await self.blocksRepo.close()
proc contains*(self: LocalStore, blk: Cid): Future[bool] {.async.} =
## Check if the block exists in the blockstore.
## Return false if error encountered
##
return (await self.hasBlock(blk)) |? false
func new*(
T: type LocalStore,
datastore: Datastore,
blocksRepo: BlockStore,
manifestRepo: BlockStore,
cacheBytes: uint,
persistBytes: uint): T =
T(
datastore: datastore,
blocksRepo: blocksRepo,
manifestRepo: manifestRepo,
cacheBytes: cacheBytes,
persistBytes: persistBytes)

View File

@ -9,12 +9,11 @@
## Partially taken from nim beacon chain
import std/strutils
import pkg/upraises
push: {.upraises: [].}
import std/strutils
import pkg/chronicles
import stew/io2

View File

@ -5,7 +5,6 @@ import pkg/asynctest
import pkg/chronos
import pkg/libp2p
import pkg/libp2p/errors
import pkg/protobuf_serialization
import pkg/contractabi as ca
import pkg/codex/rng

@ -1 +0,0 @@
Subproject commit f7d671f877e01213494aac7903421ccdbe70616f