feat: enhance encode/decode procs for multithreading support (#20)

* update encode proc signature

* add new decode proc

* remove old encode/decode procs

* fix tests
This commit is contained in:
munna0908 2025-02-12 00:44:18 +05:30 committed by GitHub
parent aa5f8d7748
commit 03f71498fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 150 additions and 94 deletions

View File

@ -48,8 +48,8 @@ type
func encode*( func encode*(
self: var LeoEncoder, self: var LeoEncoder,
data, data,parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
parity: var openArray[seq[byte]]): Result[void, cstring] = dataLen,parityLen: int ): Result[void, cstring] =
## Encode a list of buffers in `data` into a number of `bufSize` sized ## Encode a list of buffers in `data` into a number of `bufSize` sized
## `parity` buffers ## `parity` buffers
## ##
@ -57,10 +57,10 @@ func encode*(
## `parity` - list of parity `buffers` of size `bufSize` ## `parity` - list of parity `buffers` of size `bufSize`
## ##
if data.len != self.buffers: if dataLen != self.buffers:
return err("Number of data buffers should match!") return err("Number of data buffers should match!")
if parity.len != self.parity: if parityLen != self.parity:
return err("Number of parity buffers should match!") return err("Number of parity buffers should match!")
# zero encode work buffer to avoid corrupting with previous run # zero encode work buffer to avoid corrupting with previous run
@ -68,7 +68,7 @@ func encode*(
zeroMem(self.workBufferPtr[i], self.bufSize) zeroMem(self.workBufferPtr[i], self.bufSize)
# copy data into aligned buffer # copy data into aligned buffer
for i in 0..<data.len: for i in 0..<dataLen:
copyMem(self.dataBufferPtr[i], addr data[i][0], self.bufSize) copyMem(self.dataBufferPtr[i], addr data[i][0], self.bufSize)
let let
@ -83,8 +83,8 @@ func encode*(
if ord(res) != ord(LeopardSuccess): if ord(res) != ord(LeopardSuccess):
return err(leoResultString(res.LeopardResult)) return err(leoResultString(res.LeopardResult))
for i in 0..<parity.len: for i in 0..<parityLen:
copyMem(addr parity[i][0], self.workBufferPtr[i], self.bufSize) copyMem(parity[i], self.workBufferPtr[i], self.bufSize)
return ok() return ok()
@ -92,7 +92,8 @@ func decode*(
self: var LeoDecoder, self: var LeoDecoder,
data, data,
parity, parity,
recovered: var openArray[seq[byte]]): Result[void, cstring] = recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen,parityLen,recoveredLen: int): Result[void, cstring] =
## Decode a list of buffers in `data` and `parity` into a list ## Decode a list of buffers in `data` and `parity` into a list
## of `recovered` buffers of `bufSize`. The list of `recovered` ## of `recovered` buffers of `bufSize`. The list of `recovered`
## buffers should be match the `Leo.buffers` ## buffers should be match the `Leo.buffers`
@ -102,13 +103,13 @@ func decode*(
## `recovered` - list of recovered `buffers` of size `bufSize` ## `recovered` - list of recovered `buffers` of size `bufSize`
## ##
if data.len != self.buffers: if dataLen != self.buffers:
return err("Number of data buffers should match!") return err("Number of data buffers should match!")
if parity.len != self.parity: if parityLen != self.parity:
return err("Number of parity buffers should match!") return err("Number of parity buffers should match!")
if recovered.len != self.buffers: if recoveredLen != self.buffers:
return err("Number of recovered buffers should match buffers!") return err("Number of recovered buffers should match buffers!")
# clean out work and data buffers # clean out work and data buffers
@ -118,25 +119,25 @@ func decode*(
for i in 0..<self.decodeBufferCount: for i in 0..<self.decodeBufferCount:
zeroMem(self.decodeBufferPtr[i], self.bufSize) zeroMem(self.decodeBufferPtr[i], self.bufSize)
for i in 0..<data.len: for i in 0..<dataLen:
zeroMem(self.dataBufferPtr[i], self.bufSize) zeroMem(self.dataBufferPtr[i], self.bufSize)
# this is needed because erasures are nil pointers # this is needed because erasures are nil pointers
var var
dataPtr = newSeq[LeoBufferPtr](data.len) dataPtr = newSeq[LeoBufferPtr](dataLen)
parityPtr = newSeq[LeoBufferPtr](self.workBufferCount) parityPtr = newSeq[LeoBufferPtr](self.workBufferCount)
# copy data into aligned buffer # copy data into aligned buffer
for i in 0..<data.len: for i in 0..<dataLen:
if data[i].len > 0: if not data[i].isNil:
copyMem(self.dataBufferPtr[i], addr data[i][0], self.bufSize) copyMem(self.dataBufferPtr[i],addr data[i][0], self.bufSize)
dataPtr[i] = self.dataBufferPtr[i] dataPtr[i] = self.dataBufferPtr[i]
else: else:
dataPtr[i] = nil dataPtr[i] = nil
# copy parity into aligned buffer # copy parity into aligned buffer
for i in 0..<self.workBufferCount: for i in 0..<self.workBufferCount:
if i < parity.len and parity[i].len > 0: if i < parityLen and not parity[i].isNil:
copyMem(self.workBufferPtr[i], addr parity[i][0], self.bufSize) copyMem(self.workBufferPtr[i], addr parity[i][0], self.bufSize)
parityPtr[i] = self.workBufferPtr[i] parityPtr[i] = self.workBufferPtr[i]
else: else:

View File

@ -24,22 +24,22 @@ proc randomCRCPacket*(data: var openArray[byte]) =
copyMem(addr data[4], unsafeAddr crc, sizeof(crc)) copyMem(addr data[4], unsafeAddr crc, sizeof(crc))
proc checkCRCPacket*(data: openArray[byte]): bool = proc checkCRCPacket*(data: ptr UncheckedArray[byte], len: int): bool =
if data.len < 16: if len < 16:
for d in data[1..data.high]: for i in 1..<len:
if d != data[0]: if data[i] != data[0]:
raise (ref Defect)(msg: "Packet don't match") raise (ref Defect)(msg: "Packet don't match")
else: else:
var var
crc = data.len.uint32 crc = len.uint32
packCrc: uint32 packCrc: uint32
packSize: uint32 packSize: uint32
copyMem(addr packSize, unsafeAddr data[0], sizeof(packSize)) copyMem(addr packSize, unsafeAddr data[0], sizeof(packSize))
if packSize != data.len.uint: if packSize != len.uint:
raise (ref Defect)(msg: "Packet size don't match!") raise (ref Defect)(msg: "Packet size don't match!")
for i in 4..<data.len: for i in 4..<len:
let v = data[i] let v = data[i]
crc = (crc shl 3) and (crc shr (32 - 3)) crc = (crc shl 3) and (crc shr (32 - 3))
crc += v crc += v
@ -49,19 +49,43 @@ proc checkCRCPacket*(data: openArray[byte]): bool =
if packCrc == crc: if packCrc == crc:
return true return true
proc dropRandomIdx*(bufs: var openArray[seq[byte]], dropCount: int) = proc dropRandomIdx*(bufs: ptr UncheckedArray[ptr UncheckedArray[byte]], bufsLen,dropCount: int) =
var var
count = 0 count = 0
dups: seq[int] dups: seq[int]
size = bufs.len size = bufsLen
while count < dropCount: while count < dropCount:
let i = rand(0..<size) let i = rand(0..<size)
if dups.find(i) == -1: if dups.find(i) == -1:
dups.add(i) dups.add(i)
bufs[i].setLen(0) bufs[i]=nil
count.inc count.inc
proc createDoubleArray*(
outerLen, innerLen: int
): ptr UncheckedArray[ptr UncheckedArray[byte]] =
# Allocate outer array
result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](alloc0(
sizeof(ptr UncheckedArray[byte]) * outerLen
))
# Allocate each inner array
for i in 0 ..< outerLen:
result[i] = cast[ptr UncheckedArray[byte]](alloc0(sizeof(byte) * innerLen))
proc freeDoubleArray*(
arr: ptr UncheckedArray[ptr UncheckedArray[byte]], outerLen: int
) =
# Free each inner array
for i in 0 ..< outerLen:
if not arr[i].isNil:
dealloc(arr[i])
# Free outer array
if not arr.isNil:
dealloc(arr)
proc testPackets*( proc testPackets*(
buffers, buffers,
parity, parity,
@ -72,35 +96,37 @@ proc testPackets*(
decoder: var LeoDecoder): Result[void, cstring] = decoder: var LeoDecoder): Result[void, cstring] =
var var
dataBuf = newSeqOfCap[seq[byte]](buffers) dataBuf = createDoubleArray(buffers, bufSize)
parityBuf = newSeqOfCap[seq[byte]](parity) parityBuf = createDoubleArray(parity, bufSize)
recoveredBuf = newSeqOfCap[seq[byte]](buffers) recoveredBuf = createDoubleArray(buffers, bufSize)
for _ in 0..<buffers: defer:
freeDoubleArray(dataBuf, buffers)
freeDoubleArray(parityBuf, parity)
freeDoubleArray(recoveredBuf, buffers)
for i in 0..<buffers:
var var
dataSeq = newSeq[byte](bufSize) dataSeq = newSeq[byte](bufSize)
randomCRCPacket(dataSeq) randomCRCPacket(dataSeq)
dataBuf.add(dataSeq) copyMem(dataBuf[i],addr dataSeq[0],bufSize)
recoveredBuf.add(newSeq[byte](bufSize)) encoder.encode(dataBuf, parityBuf,buffers,parity).tryGet()
for _ in 0..<parity:
parityBuf.add(newSeq[byte](bufSize))
encoder.encode(dataBuf, parityBuf).tryGet()
if dataLosses > 0: if dataLosses > 0:
dropRandomIdx(dataBuf, dataLosses) dropRandomIdx(dataBuf,buffers, dataLosses)
if parityLosses > 0: if parityLosses > 0:
dropRandomIdx(parityBuf, parityLosses) dropRandomIdx(parityBuf,parity,parityLosses)
decoder.decode(dataBuf, parityBuf, recoveredBuf).tryGet() decoder.decode(dataBuf, parityBuf, recoveredBuf,buffers,parity,buffers).tryGet()
for i, d in dataBuf: for i in 0..<buffers:
if d.len <= 0: if dataBuf[i].isNil:
if not checkCRCPacket(recoveredBuf[i]): if not checkCRCPacket(recoveredBuf[i],bufSize):
return err(("Check failed for packet " & $i).cstring) return err(("Check failed for packet " & $i).cstring)
ok() ok()

View File

@ -30,51 +30,74 @@ suite "Leopard Parametrization":
test "Should not allow encoding with invalid data buffer counts": test "Should not allow encoding with invalid data buffer counts":
var var
dataLen =3
parityLen = 2
leo = LeoEncoder.init(64, 4, 2).tryGet() leo = LeoEncoder.init(64, 4, 2).tryGet()
data = newSeq[seq[byte]](3) data = createDoubleArray(dataLen, 64)
parity = newSeq[seq[byte]](2) parity = createDoubleArray(parityLen, 64)
defer:
freeDoubleArray(data, dataLen)
freeDoubleArray(parity, parityLen)
check: check:
leo.encode(data, parity).error == "Number of data buffers should match!" leo.encode(data, parity,dataLen,parityLen).error == "Number of data buffers should match!"
test "Should not allow encoding with invalid parity buffer counts": test "Should not allow encoding with invalid parity buffer counts":
var var
dataLen =4
parityLen = 3
leo = LeoEncoder.init(64, 4, 2).tryGet() leo = LeoEncoder.init(64, 4, 2).tryGet()
data = newSeq[seq[byte]](4) data = createDoubleArray(dataLen, 64)
parity = newSeq[seq[byte]](3) parity = createDoubleArray(parityLen, 64)
defer:
freeDoubleArray(data, dataLen)
freeDoubleArray(parity, parityLen)
check: check:
leo.encode(data, parity).error == "Number of parity buffers should match!" leo.encode(data, parity,dataLen,parityLen).error == "Number of parity buffers should match!"
test "Should not allow decoding with invalid data buffer counts": test "Should not allow decoding with invalid data buffer counts":
var var
dataLen =3
parityLen = 2
leo = LeoDecoder.init(64, 4, 2).tryGet() leo = LeoDecoder.init(64, 4, 2).tryGet()
data = newSeq[seq[byte]](3) data = createDoubleArray(dataLen, 64)
parity = newSeq[seq[byte]](2) parity = createDoubleArray(parityLen, 64)
recovered = newSeq[seq[byte]](3) recovered = createDoubleArray(dataLen, 64)
defer:
freeDoubleArray(data, dataLen)
freeDoubleArray(parity, parityLen)
freeDoubleArray(recovered, dataLen)
check: check:
leo.decode(data, parity, recovered).error == "Number of data buffers should match!" leo.decode(data, parity, recovered,dataLen,parityLen,dataLen).error == "Number of data buffers should match!"
test "Should not allow decoding with invalid data buffer counts": test "Should not allow decoding with invalid data buffer counts":
var var
dataLen =4
parityLen = 1
recoveredLen = 3
leo = LeoDecoder.init(64, 4, 2).tryGet() leo = LeoDecoder.init(64, 4, 2).tryGet()
data = newSeq[seq[byte]](4) data = createDoubleArray(dataLen, 64)
parity = newSeq[seq[byte]](1) parity = createDoubleArray(parityLen, 64)
recovered = newSeq[seq[byte]](3) recovered = createDoubleArray(recoveredLen, 64)
check: check:
leo.decode(data, parity, recovered).error == "Number of parity buffers should match!" leo.decode(data, parity, recovered,dataLen,parityLen,recoveredLen).error == "Number of parity buffers should match!"
test "Should not allow decoding with invalid data buffer counts": test "Should not allow decoding with invalid data buffer counts":
var var
dataLen =4
parityLen = 2
recoveredLen = 3
leo = LeoDecoder.init(64, 4, 2).tryGet() leo = LeoDecoder.init(64, 4, 2).tryGet()
data = newSeq[seq[byte]](4) data = createDoubleArray(dataLen, 64)
parity = newSeq[seq[byte]](2) parity = createDoubleArray(parityLen, 64)
recovered = newSeq[seq[byte]](3) recovered = createDoubleArray(recoveredLen, 64)
check: check:
leo.decode(data, parity, recovered).error == "Number of recovered buffers should match buffers!" leo.decode(data, parity, recovered,dataLen,parityLen,recoveredLen).error == "Number of recovered buffers should match buffers!"
suite "Leopard simple Encode/Decode": suite "Leopard simple Encode/Decode":
const const
@ -86,70 +109,76 @@ suite "Leopard simple Encode/Decode":
var var
encoder: LeoEncoder encoder: LeoEncoder
decoder: LeoDecoder decoder: LeoDecoder
data: seq[seq[byte]] data: ptr UncheckedArray[ptr UncheckedArray[byte]]
parity: seq[seq[byte]] parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
recovered: seq[seq[byte]] recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]
setup: setup:
encoder = LeoEncoder.init(BufferSize, DataCount, ParityCount).tryGet() encoder = LeoEncoder.init(BufferSize, DataCount, ParityCount).tryGet()
decoder = LeoDecoder.init(BufferSize, DataCount, ParityCount).tryGet() decoder = LeoDecoder.init(BufferSize, DataCount, ParityCount).tryGet()
data = newSeq[seq[byte]](DataCount) data = createDoubleArray(DataCount, BufferSize)
parity = newSeq[seq[byte]](ParityCount) parity = createDoubleArray(ParityCount, BufferSize)
recovered = newSeq[seq[byte]](DataCount) recovered = createDoubleArray(DataCount, BufferSize)
teardown: teardown:
freeDoubleArray(data, DataCount)
freeDoubleArray(parity, ParityCount)
freeDoubleArray(recovered, DataCount)
encoder.free() encoder.free()
decoder.free() decoder.free()
test "Test 2 data loses out of 4 possible": test "Test 2 data loses out of 4 possible":
for i in 0..<DataCount: for i in 0..<DataCount:
data[i] = newSeq[byte](BufferSize)
recovered[i] = newSeq[byte](BufferSize)
var var
str = TestString & " " & $i str = TestString & " " & $i
copyMem(addr data[i][0], addr str[0], str.len) copyMem(data[i], addr str[0], str.len)
for i in 0..<ParityCount:
parity[i] = newSeq[byte](BufferSize)
encoder.encode(data, parity).tryGet() encoder.encode(data, parity,DataCount,ParityCount).tryGet()
var var
data1 = data[0] data1 =cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * BufferSize))
data2 = data[1] data2 = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * BufferSize))
data[0].setLen(0) defer:
data[1].setLen(0) deallocShared(data1)
deallocShared(data2)
decoder.decode(data, parity, recovered).tryGet() copyMem(data1,data[0], BufferSize)
copyMem(data2,data[1], BufferSize)
check recovered[0] == data1 data[0]=nil
check recovered[1] == data2 data[1]=nil
decoder.decode(data, parity, recovered,DataCount,ParityCount,DataCount).tryGet()
check equalMem(recovered[0], data1, BufferSize)
check equalMem(recovered[1], data2, BufferSize)
test "Test 1 data and 1 parity loss out of 4 possible": test "Test 1 data and 1 parity loss out of 4 possible":
for i in 0..<DataCount: for i in 0..<DataCount:
data[i] = newSeq[byte](BufferSize)
recovered[i] = newSeq[byte](BufferSize)
var var
str = TestString & " " & $i str = TestString & " " & $i
copyMem(addr data[i][0], addr str[0], str.len) copyMem(addr data[i][0], addr str[0], str.len)
for i in 0..<ParityCount: encoder.encode(data, parity,DataCount,ParityCount).tryGet()
parity[i] = newSeq[byte](BufferSize)
encoder.encode(data, parity).tryGet()
var var data1 = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * BufferSize))
data1 = data[0]
data[0].setLen(0) defer: deallocShared(data1)
parity[0].setLen(0)
copyMem(data1,data[0], BufferSize)
data[0]=nil
parity[0]=nil
decoder.decode(data, parity, recovered,DataCount,ParityCount,DataCount).tryGet()
check equalMem(recovered[0], data1, BufferSize)
decoder.decode(data, parity, recovered).tryGet()
check recovered[0] == data1
suite "Leopard Encode/Decode": suite "Leopard Encode/Decode":
test "bufSize = 4096, K = 800, M = 200 - drop data = 200 data": test "bufSize = 4096, K = 800, M = 200 - drop data = 200 data":