Drop my thread pool for OpenMP

This commit is contained in:
Christopher Taylor 2017-06-06 03:13:41 -07:00
parent eef50925d9
commit 393dcac6ef
10 changed files with 173 additions and 1146 deletions

View File

@ -406,6 +406,36 @@ void xor_mem4(
#endif // LEO_USE_VECTOR4_OPT
void VectorXOR_Threads(
const uint64_t bytes,
unsigned count,
void** x,
void** y)
{
#ifdef LEO_USE_VECTOR4_OPT
if (count >= 4)
{
int i_end = count - 4;
#pragma omp parallel for
for (int i = 0; i <= i_end; i += 4)
{
xor_mem4(
x[i + 0], y[i + 0],
x[i + 1], y[i + 1],
x[i + 2], y[i + 2],
x[i + 3], y[i + 3],
bytes);
}
count %= 4;
i_end -= count;
x += i_end;
y += i_end;
}
#endif // LEO_USE_VECTOR4_OPT
for (unsigned i = 0; i < count; ++i)
xor_mem(x[i], y[i], bytes);
}
void VectorXOR(
const uint64_t bytes,
unsigned count,
@ -413,16 +443,22 @@ void VectorXOR(
void** y)
{
#ifdef LEO_USE_VECTOR4_OPT
while (count >= 4)
if (count >= 4)
{
xor_mem4(
x[0], y[0],
x[1], y[1],
x[2], y[2],
x[3], y[3],
bytes);
x += 4, y += 4;
count -= 4;
int i_end = count - 4;
for (int i = 0; i <= i_end; i += 4)
{
xor_mem4(
x[i + 0], y[i + 0],
x[i + 1], y[i + 1],
x[i + 2], y[i + 2],
x[i + 3], y[i + 3],
bytes);
}
count %= 4;
i_end -= count;
x += i_end;
y += i_end;
}
#endif // LEO_USE_VECTOR4_OPT
@ -431,226 +467,4 @@ void VectorXOR(
}
#ifdef LEO_ENABLE_MULTITHREADING_OPT
//------------------------------------------------------------------------------
// WorkerThread
void WorkerThread::Start(unsigned cpu_affinity)
{
Stop();
CPUAffinity = cpu_affinity;
#ifdef _WIN32
hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr);
#endif
Terminated = false;
Thread = std::make_unique<std::thread>(&WorkerThread::Loop, this);
}
void WorkerThread::Stop()
{
if (Thread)
{
Terminated = true;
#ifdef _WIN32
::SetEvent(hEvent);
#endif
try
{
Thread->join();
}
catch (std::system_error& /*err*/)
{
}
Thread = nullptr;
}
#ifdef _WIN32
if (hEvent != nullptr)
{
::CloseHandle(hEvent);
hEvent = nullptr;
}
#endif
}
void WorkerThread::Wake()
{
#ifdef _WIN32
::SetEvent(hEvent);
#else
// FIXME: Port to other platforms
#endif
}
static bool SetCurrentThreadAffinity(unsigned processorIndex)
{
#ifdef _WIN32
return 0 != ::SetThreadAffinityMask(
::GetCurrentThread(), (DWORD_PTR)1 << (processorIndex & 63));
#elif !defined(ANDROID)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(processorIndex, &cpuset);
return 0 == pthread_setaffinity_np(pthread_self(),
sizeof(cpu_set_t), &cpuset);
#else
return true; // FIXME: Unused on Android anyway
#endif
}
void WorkerThread::Loop()
{
SetCurrentThreadAffinity(CPUAffinity);
for (;;)
{
if (Terminated)
break;
#ifdef _WIN32
const DWORD result = ::WaitForSingleObject(hEvent, INFINITE);
if (result != WAIT_OBJECT_0 || Terminated)
break;
#else
// FIXME: Port to other platforms
#endif
PoolInstance->DrainWorkQueue();
}
}
//------------------------------------------------------------------------------
// WorkBundle
WorkBundle::WorkBundle()
{
#ifdef _WIN32
hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr);
#else
// FIXME: Port to other platforms
#endif
}
WorkBundle::~WorkBundle()
{
#ifdef _WIN32
if (hEvent != nullptr)
{
::CloseHandle(hEvent);
hEvent = nullptr;
}
#else
// FIXME: Port to other platforms
#endif
}
void WorkBundle::OnAllOperationsCompleted()
{
#ifdef _WIN32
::SetEvent(hEvent);
#else
// FIXME: Port to other platforms
#endif
}
void WorkBundle::Join()
{
#ifdef _WIN32
::WaitForSingleObject(hEvent, INFINITE);
#else
// FIXME: Port to other platforms
#endif
}
//------------------------------------------------------------------------------
// WorkerPool
WorkerPool* PoolInstance = nullptr;
extern "C" void AtExitWrapper()
{
if (PoolInstance)
PoolInstance->Stop();
}
WorkerPool::WorkerPool()
{
WorkerCount = 0;
unsigned num_cpus = std::thread::hardware_concurrency();
if (num_cpus > 1)
{
WorkerCount = num_cpus - 1;
Workers = new WorkerThread[WorkerCount];
for (unsigned i = 0; i < WorkerCount; ++i)
Workers[i].Start(i);
std::atexit(AtExitWrapper);
}
}
void WorkerPool::Stop()
{
Locker locker(QueueLock);
delete[] Workers;
Workers = nullptr;
WorkerCount = 0;
}
void WorkerPool::Dispatch(WorkBundle* bundle, const WorkerCallT& call)
{
Locker locker(QueueLock);
WorkQueue.resize(++Remaining);
WorkQueue[Remaining - 1].Call = call;
WorkQueue[Remaining - 1].Bundle = bundle;
}
void WorkerPool::Run()
{
unsigned remaining;
{
Locker locker(QueueLock);
remaining = Remaining;
}
unsigned count = WorkerCount;
if (count > remaining)
count = remaining;
// Wake all the workers
for (unsigned i = 0; i < count; ++i)
Workers[i].Wake();
DrainWorkQueue();
}
void WorkerPool::DrainWorkQueue()
{
for (;;)
{
QueueItem item;
{
Locker locker(QueueLock);
if (Remaining <= 0)
return;
item = WorkQueue[--Remaining];
}
item.Call();
item.Bundle->OperationComplete();
}
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
} // namespace leopard

View File

@ -184,11 +184,6 @@
// Unroll inner loops 4 times
#define LEO_USE_VECTOR4_OPT
// Enable multithreading optimization when requested
#ifdef _MSC_VER
#define LEO_ENABLE_MULTITHREADING_OPT
#endif
//------------------------------------------------------------------------------
// Debug
@ -407,6 +402,13 @@ void VectorXOR(
void** x,
void** y);
// x[] ^= y[] (Multithreaded)
void VectorXOR_Threads(
const uint64_t bytes,
unsigned count,
void** x,
void** y);
//------------------------------------------------------------------------------
// XORSummer
@ -457,11 +459,6 @@ protected:
static const unsigned kAlignmentBytes = LEO_ALIGN_BYTES;
LEO_FORCE_INLINE unsigned NextAlignedOffset(unsigned offset)
{
return (offset + kAlignmentBytes - 1) & ~(kAlignmentBytes - 1);
}
static LEO_FORCE_INLINE uint8_t* SIMDSafeAllocate(size_t size)
{
uint8_t* data = (uint8_t*)calloc(1, kAlignmentBytes + size);
@ -489,234 +486,4 @@ static LEO_FORCE_INLINE void SIMDSafeFree(void* ptr)
}
//------------------------------------------------------------------------------
// Mutex
#ifdef _WIN32
struct Lock
{
CRITICAL_SECTION cs;
Lock() { ::InitializeCriticalSectionAndSpinCount(&cs, 1000); }
~Lock() { ::DeleteCriticalSection(&cs); }
bool TryEnter() { return ::TryEnterCriticalSection(&cs) != FALSE; }
void Enter() { ::EnterCriticalSection(&cs); }
void Leave() { ::LeaveCriticalSection(&cs); }
};
#else
struct Lock
{
std::recursive_mutex cs;
bool TryEnter() { return cs.try_lock(); }
void Enter() { cs.lock(); }
void Leave() { cs.unlock(); }
};
#endif
class Locker
{
public:
Locker(Lock& lock) {
TheLock = &lock;
if (TheLock)
TheLock->Enter();
}
~Locker() { Clear(); }
bool TrySet(Lock& lock) {
Clear();
if (!lock.TryEnter())
return false;
TheLock = &lock;
return true;
}
void Set(Lock& lock) {
Clear();
lock.Enter();
TheLock = &lock;
}
void Clear() {
if (TheLock)
TheLock->Leave();
TheLock = nullptr;
}
private:
Lock* TheLock;
};
#ifdef LEO_ENABLE_MULTITHREADING_OPT
//------------------------------------------------------------------------------
// WorkerThread
class WorkerThread
{
public:
WorkerThread()
{
}
~WorkerThread()
{
Stop();
}
void Start(unsigned cpu_affinity);
void Stop();
void Wake();
protected:
unsigned CPUAffinity = 0;
std::atomic_bool Terminated = false;
std::unique_ptr<std::thread> Thread;
#ifdef _WIN32
HANDLE hEvent = nullptr;
#else
// FIXME: Port to other platforms
mutable std::mutex QueueLock;
std::condition_variable QueueCondition;
#endif
void Loop();
};
//------------------------------------------------------------------------------
// WorkBundle
typedef std::function<void()> WorkerCallT;
class WorkerPool;
extern WorkerPool* PoolInstance;
class WorkBundle
{
friend class WorkerPool;
public:
WorkBundle();
~WorkBundle();
void Dispatch(const WorkerCallT& call);
void Complete();
protected:
std::atomic<unsigned> WorkCount;
#ifdef _WIN32
HANDLE hEvent = nullptr;
#else
// FIXME: Port to other platforms
#endif
LEO_FORCE_INLINE void Increment()
{
++WorkCount;
}
LEO_FORCE_INLINE void OperationComplete()
{
if (--WorkCount == 0)
OnAllOperationsCompleted();
}
void Join();
void OnAllOperationsCompleted();
};
//------------------------------------------------------------------------------
// WorkerPool
class WorkerPool
{
friend class WorkerThread;
friend class WorkBundle;
public:
WorkerPool();
void Stop();
unsigned GetParallelism() const
{
return WorkerCount + 1;
}
WorkBundle* GetBundle()
{
WorkBundle* back;
{
Locker locker(BundleLock);
if (FreeBundles.empty())
{
locker.Clear();
back = new WorkBundle;
}
else
{
back = FreeBundles.back();
FreeBundles.pop_back();
}
}
back->Increment();
return back;
}
void FreeBundle(WorkBundle* bundle)
{
Locker locker(BundleLock);
FreeBundles.push_back(bundle);
}
protected:
void Dispatch(WorkBundle* bundle, const WorkerCallT& call);
void Run();
void DrainWorkQueue();
mutable Lock QueueLock;
WorkerThread* Workers = nullptr;
unsigned WorkerCount = 0;
struct QueueItem
{
WorkerCallT Call;
WorkBundle* Bundle;
};
std::vector<QueueItem> WorkQueue;
unsigned Remaining;
mutable Lock BundleLock;
// TBD: Free this memory on shutdown?
std::vector<WorkBundle*> FreeBundles;
};
inline void WorkBundle::Dispatch(const WorkerCallT& call)
{
Increment();
PoolInstance->Dispatch(this, call);
}
inline void WorkBundle::Complete()
{
if (WorkCount > 0)
{
PoolInstance->Run();
OperationComplete();
Join();
}
PoolInstance->FreeBundle(this);
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
} // namespace leopard

View File

@ -118,17 +118,20 @@ static void FWHT(ffe_t* data, const unsigned m, const unsigned m_truncated)
for (; dist4 <= m; dist = dist4, dist4 <<= 2)
{
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
#pragma omp parallel for
for (int r = 0; r < m_truncated; r += dist4)
{
// For each set of dist elements:
for (unsigned i = r; i < r + dist; ++i)
#pragma omp parallel for
for (int i = r; i < r + dist; ++i)
FWHT_4(data + i, dist);
}
}
// If there is one layer left:
if (dist < m)
for (unsigned i = 0; i < dist; ++i)
#pragma omp parallel for
for (int i = 0; i < dist; ++i)
FWHT_2(data[i], data[i + dist]);
}
@ -305,7 +308,8 @@ static void InitializeMultiplyTables()
Multiply128LUT = reinterpret_cast<const Multiply128LUT_t*>(SIMDSafeAllocate(sizeof(Multiply128LUT_t) * kOrder));
// For each value we could multiply by:
for (unsigned log_m = 0; log_m < kOrder; ++log_m)
#pragma omp parallel for
for (int log_m = 0; log_m < kOrder; ++log_m)
{
// For each 4 bits of the finite field width in bits:
for (unsigned i = 0, shift = 0; i < 4; ++i, shift += 4)
@ -770,9 +774,11 @@ static void IFFT_DIT_Encoder(
// I tried rolling the memcpy/memset into the first layer of the FFT and
// found that it only yields a 4% performance improvement, which is not
// worth the extra complexity.
for (unsigned i = 0; i < m_truncated; ++i)
#pragma omp parallel for
for (int i = 0; i < m_truncated; ++i)
memcpy(work[i], data[i], bytes);
for (unsigned i = m_truncated; i < m; ++i)
#pragma omp parallel for
for (int i = m_truncated; i < m; ++i)
memset(work[i], 0, bytes);
// I tried splitting up the first few layers into L3-cache sized blocks but
@ -784,7 +790,8 @@ static void IFFT_DIT_Encoder(
for (; dist4 <= m; dist = dist4, dist4 <<= 2)
{
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
#pragma omp parallel for
for (int r = 0; r < m_truncated; r += dist4)
{
const unsigned i_end = r + dist;
const ffe_t log_m01 = skewLUT[i_end];
@ -792,7 +799,7 @@ static void IFFT_DIT_Encoder(
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < i_end; ++i)
for (int i = r; i < i_end; ++i)
{
IFFT_DIT4(
bytes,
@ -818,10 +825,11 @@ static void IFFT_DIT_Encoder(
const ffe_t log_m = skewLUT[dist];
if (log_m == kModulus)
VectorXOR(bytes, dist, work + dist, work);
VectorXOR_Threads(bytes, dist, work + dist, work);
else
{
for (unsigned i = 0; i < dist; ++i)
#pragma omp parallel for
for (int i = 0; i < dist; ++i)
{
IFFT_DIT2(
work[i],
@ -835,7 +843,7 @@ static void IFFT_DIT_Encoder(
// I tried unrolling this but it does not provide more than 5% performance
// improvement for 16-bit finite fields, so it's not worth the complexity.
if (xor_result)
VectorXOR(bytes, m, xor_result, work);
VectorXOR_Threads(bytes, m, xor_result, work);
}
@ -852,7 +860,8 @@ static void IFFT_DIT_Decoder(
for (; dist4 <= m; dist = dist4, dist4 <<= 2)
{
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
#pragma omp parallel for
for (int r = 0; r < m_truncated; r += dist4)
{
const unsigned i_end = r + dist;
const ffe_t log_m01 = skewLUT[i_end];
@ -860,7 +869,7 @@ static void IFFT_DIT_Decoder(
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < i_end; ++i)
for (int i = r; i < i_end; ++i)
{
IFFT_DIT4(
bytes,
@ -882,10 +891,11 @@ static void IFFT_DIT_Decoder(
const ffe_t log_m = skewLUT[dist];
if (log_m == kModulus)
VectorXOR(bytes, dist, work + dist, work);
VectorXOR_Threads(bytes, dist, work + dist, work);
else
{
for (unsigned i = 0; i < dist; ++i)
#pragma omp parallel for
for (int i = 0; i < dist; ++i)
{
IFFT_DIT2(
work[i],
@ -897,155 +907,6 @@ static void IFFT_DIT_Decoder(
}
}
#ifdef LEO_ENABLE_MULTITHREADING_OPT
// Multithreaded decoder version
static void IFFT_DIT_Decoder_MT(
const uint64_t bytes,
const unsigned m_truncated,
void** work,
const unsigned m,
const ffe_t* skewLUT)
{
unsigned bundle_min = (unsigned)(8000 / bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
// Decimation in time: Unroll 2 layers at a time
unsigned dist = 1, dist4 = 4;
for (; dist4 <= m; dist = dist4, dist4 <<= 2)
{
WorkBundle* workBundle = PoolInstance->GetBundle();
if (m_truncated <= dist4)
{
unsigned bundle_count = dist / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
// For each set of dist*4 elements:
const unsigned i_end = dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = 0; i < i_end; i += bundle_count)
{
workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work, bundle_count, i, i_end, dist]() {
unsigned j_end = i + bundle_count;
if (j_end > i_end)
j_end = i_end;
for (unsigned j = i; j < j_end; ++j)
{
IFFT_DIT4(
bytes,
work + j,
dist,
log_m01,
log_m23,
log_m02);
}
});
}
}
else
{
unsigned bundle_count = (m_truncated / dist) / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
bundle_count /= 4;
if (bundle_count < 1)
bundle_count = 1;
const unsigned dist4_bundle_count = dist4 * bundle_count;
for (unsigned r = 0; r < m_truncated; r += dist4_bundle_count)
{
workBundle->Dispatch([bytes, work, skewLUT, dist4, m_truncated, r, dist4_bundle_count, dist]() {
unsigned s_end = r + dist4_bundle_count;
if (s_end > m_truncated)
s_end = m_truncated;
// For each set of dist*4 elements:
for (unsigned s = r; s < s_end; s += dist4)
{
const unsigned i_end = s + dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = s; i < i_end; ++i)
{
IFFT_DIT4(
bytes,
work + i,
dist,
log_m01,
log_m23,
log_m02);
}
}
});
}
}
workBundle->Complete();
}
// If there is one layer left:
if (dist < m)
{
WorkBundle* workBundle = PoolInstance->GetBundle();
// Assuming that dist = m / 2
LEO_DEBUG_ASSERT(dist * 2 == m);
const ffe_t log_m = skewLUT[dist];
unsigned bundle_count = dist / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
if (log_m == kModulus)
{
for (unsigned i = 0; i < dist; i += bundle_count)
{
workBundle->Dispatch([work, i, dist, bundle_count, bytes]() {
unsigned j_end = i + bundle_count;
if (j_end > dist)
j_end = dist;
for (unsigned j = i; j < j_end; ++j)
xor_mem(work[j + dist], work[j], bytes);
});
}
}
else
{
for (unsigned i = 0; i < dist; i += bundle_count)
{
workBundle->Dispatch([work, i, dist, log_m, bytes, bundle_count]() {
unsigned j_end = i + bundle_count;
if (j_end > dist)
j_end = dist;
for (unsigned j = i; j < j_end; ++j)
{
IFFT_DIT2(
work[j],
work[j + dist],
log_m,
bytes);
}
});
}
}
workBundle->Complete();
}
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
/*
Decimation in time FFT:
@ -1361,7 +1222,8 @@ static void FFT_DIT(
const unsigned thread_v = dist;
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
#pragma omp parallel for
for (int r = 0; r < m_truncated; r += dist4)
{
const unsigned i_end = r + dist;
const ffe_t log_m01 = skewLUT[i_end];
@ -1369,7 +1231,7 @@ static void FFT_DIT(
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < i_end; ++i)
for (int i = r; i < i_end; ++i)
{
FFT_DIT4(
bytes,
@ -1385,7 +1247,8 @@ static void FFT_DIT(
// If there is one layer left:
if (dist4 == 2)
{
for (unsigned r = 0; r < m_truncated; r += 2)
#pragma omp parallel for
for (int r = 0; r < m_truncated; r += 2)
{
const ffe_t log_m = skewLUT[r + 1];
@ -1413,8 +1276,7 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m,
const void* const * data,
void** work,
bool multithreaded)
void** work)
{
// work <- IFFT(data, m, m)
@ -1608,17 +1470,20 @@ static void FFT_DIT_ErrorBits(
for (; dist != 0; dist4 = dist, dist >>= 2, mip_level -=2)
{
// For each set of dist*4 elements:
for (unsigned r = 0; r < n_truncated; r += dist4)
#pragma omp parallel for
for (int r = 0; r < n_truncated; r += dist4)
{
if (!error_bits.IsNeeded(mip_level, r))
continue;
const ffe_t log_m01 = skewLUT[r + dist];
const ffe_t log_m23 = skewLUT[r + dist * 3];
const ffe_t log_m02 = skewLUT[r + dist * 2];
const unsigned i_end = r + dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < r + dist; ++i)
#pragma omp parallel for
for (int i = r; i < i_end; ++i)
{
FFT_DIT4(
bytes,
@ -1634,8 +1499,12 @@ static void FFT_DIT_ErrorBits(
// If there is one layer left:
if (dist4 == 2)
{
for (unsigned r = 0; r < n_truncated; r += 2)
#pragma omp parallel for
for (int r = 0; r < n_truncated; r += 2)
{
if (!error_bits.IsNeeded(mip_level, r))
continue;
const ffe_t log_m = skewLUT[r + 1];
if (log_m == kModulus)
@ -1652,148 +1521,6 @@ static void FFT_DIT_ErrorBits(
}
}
#ifdef LEO_ENABLE_MULTITHREADING_OPT
static void FFT_DIT_ErrorBits_MT(
const uint64_t bytes,
void** work,
const unsigned n_truncated,
const unsigned n,
const ffe_t* skewLUT,
const ErrorBitfield& error_bits)
{
unsigned bundle_min = (unsigned)(8000 / bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
unsigned mip_level = LastNonzeroBit32(n);
// Decimation in time: Unroll 2 layers at a time
unsigned dist4 = n, dist = n >> 2;
for (; dist != 0; dist4 = dist, dist >>= 2, mip_level -=2)
{
WorkBundle* workBundle = PoolInstance->GetBundle();
if (n_truncated <= dist4)
{
if (error_bits.IsNeeded(mip_level, 0))
{
unsigned bundle_size = dist / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
const unsigned i_end = dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = 0; i < i_end; i += bundle_size)
{
workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work, bundle_size, i, i_end, dist]() {
unsigned j_end = i + bundle_size;
if (j_end > i_end)
j_end = i_end;
for (unsigned j = i; j < j_end; ++j)
{
FFT_DIT4(
bytes,
work + j,
dist,
log_m01,
log_m23,
log_m02);
}
});
}
}
}
else
{
unsigned bundle_count = (n_truncated / dist) / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
bundle_count /= 4;
if (bundle_count < 1)
bundle_count = 1;
const unsigned dist4_bundle_count = dist4 * bundle_count;
// For each set of dist*4 elements:
for (unsigned r = 0; r < n_truncated; r += dist4_bundle_count)
{
workBundle->Dispatch([bytes, work, skewLUT, &error_bits, mip_level, dist4, n_truncated, r, dist4_bundle_count, dist]() {
unsigned s_end = r + dist4_bundle_count;
if (s_end > n_truncated)
s_end = n_truncated;
for (unsigned s = r; s < s_end; s += dist4)
{
if (!error_bits.IsNeeded(mip_level, s))
continue;
const unsigned i_end = s + dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = s; i < i_end; ++i)
{
FFT_DIT4(
bytes,
work + i,
dist,
log_m01,
log_m23,
log_m02);
}
}
});
}
}
workBundle->Complete();
}
// If there is one layer left:
if (dist4 == 2)
{
WorkBundle* workBundle = PoolInstance->GetBundle();
unsigned bundle_count = (n_truncated / 2) / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
for (unsigned s = 0; s < n_truncated; s += 2 * bundle_count)
{
workBundle->Dispatch([bytes, skewLUT, work, s, n_truncated, bundle_count]() {
unsigned r_end = s + 2 * bundle_count;
if (r_end > n_truncated)
r_end = n_truncated;
for (unsigned r = s; r < r_end; r += 2)
{
const ffe_t log_m = skewLUT[r + 1];
if (log_m == kModulus)
xor_mem(work[r + 1], work[r], bytes);
else
{
FFT_DIT2(
work[r],
work[r + 1],
log_m,
bytes);
}
}
});
}
workBundle->Complete();
}
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
#endif // LEO_ERROR_BITFIELD_OPT
@ -1808,11 +1535,8 @@ void ReedSolomonDecode(
unsigned n, // NextPow2(m + original_count) = work_count
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work, // n entries
bool multithreaded)
void** work) // n entries
{
if (multithreaded && n * buffer_bytes < 512000)
multithreaded = false;
// Fill in error locations
#ifdef LEO_ERROR_BITFIELD_OPT
@ -1820,10 +1544,10 @@ void ReedSolomonDecode(
#endif // LEO_ERROR_BITFIELD_OPT
ffe_t error_locations[kOrder] = {};
for (unsigned i = 0; i < recovery_count; ++i)
for (int i = 0; i < recovery_count; ++i)
if (!recovery[i])
error_locations[i] = 1;
for (unsigned i = recovery_count; i < m; ++i)
for (int i = recovery_count; i < m; ++i)
error_locations[i] = 1;
for (unsigned i = 0; i < original_count; ++i)
{
@ -1844,180 +1568,60 @@ void ReedSolomonDecode(
FWHT(error_locations, kOrder, m + original_count);
for (unsigned i = 0; i < kOrder; ++i)
#pragma omp parallel for
for (int i = 0; i < kOrder; ++i)
error_locations[i] = ((unsigned)error_locations[i] * (unsigned)LogWalsh[i]) % kModulus;
FWHT(error_locations, kOrder, kOrder);
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
// work <- recovery data
#pragma omp parallel for
for (int i = 0; i < recovery_count; ++i)
{
unsigned bundle_min = (unsigned)(64000 / buffer_bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
unsigned bundle_size = recovery_count / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
WorkBundle* workBundle = PoolInstance->GetBundle();
// work <- recovery data
for (unsigned i = 0; i < recovery_count; i += bundle_size)
{
workBundle->Dispatch([i, recovery_count, recovery, &error_locations, bundle_size, work, buffer_bytes]() {
unsigned j_end = i + bundle_size;
if (j_end > recovery_count)
j_end = recovery_count;
for (unsigned j = i; j < j_end; ++j)
{
if (recovery[j])
mul_mem(work[j], recovery[j], error_locations[j], buffer_bytes);
else
memset(work[j], 0, buffer_bytes);
}
});
}
workBundle->Dispatch([recovery_count, m, buffer_bytes, work]() {
for (unsigned i = recovery_count; i < m; ++i)
memset(work[i], 0, buffer_bytes);
});
// work <- original data
bundle_size = original_count / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
for (unsigned i = 0; i < original_count; i += bundle_size)
{
workBundle->Dispatch([i, original_count, original, &error_locations, m, bundle_size, work, buffer_bytes]() {
unsigned j_end = i + bundle_size;
if (j_end > original_count)
j_end = original_count;
for (unsigned j = i; j < j_end; ++j)
{
if (original[j])
mul_mem(work[m + j], original[j], error_locations[m + j], buffer_bytes);
else
memset(work[m + j], 0, buffer_bytes);
}
});
}
workBundle->Dispatch([original_count, m, n, buffer_bytes, work]() {
for (unsigned i = m + original_count; i < n; ++i)
memset(work[i], 0, buffer_bytes);
});
workBundle->Complete();
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
// work <- recovery data
for (unsigned i = 0; i < recovery_count; ++i)
{
if (recovery[i])
mul_mem(work[i], recovery[i], error_locations[i], buffer_bytes);
else
memset(work[i], 0, buffer_bytes);
}
for (unsigned i = recovery_count; i < m; ++i)
memset(work[i], 0, buffer_bytes);
// work <- original data
for (unsigned i = 0; i < original_count; ++i)
{
if (original[i])
mul_mem(work[m + i], original[i], error_locations[m + i], buffer_bytes);
else
memset(work[m + i], 0, buffer_bytes);
}
for (unsigned i = m + original_count; i < n; ++i)
if (recovery[i])
mul_mem(work[i], recovery[i], error_locations[i], buffer_bytes);
else
memset(work[i], 0, buffer_bytes);
}
#pragma omp parallel for
for (int i = recovery_count; i < m; ++i)
memset(work[i], 0, buffer_bytes);
// work <- original data
#pragma omp parallel for
for (int i = 0; i < original_count; ++i)
{
if (original[i])
mul_mem(work[m + i], original[i], error_locations[m + i], buffer_bytes);
else
memset(work[m + i], 0, buffer_bytes);
}
#pragma omp parallel for
for (int i = m + original_count; i < n; ++i)
memset(work[i], 0, buffer_bytes);
// work <- IFFT(work, n, 0)
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
{
IFFT_DIT_Decoder_MT(
buffer_bytes,
m + original_count,
work,
n,
FFTSkew - 1);
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
IFFT_DIT_Decoder(
buffer_bytes,
m + original_count,
work,
n,
FFTSkew - 1);
}
IFFT_DIT_Decoder(
buffer_bytes,
m + original_count,
work,
n,
FFTSkew - 1);
// work <- FormalDerivative(work, n)
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
for (unsigned i = 1; i < n; ++i)
{
unsigned bundle_min = (unsigned)(64000 / buffer_bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
for (unsigned i = 1; i < n; ++i)
{
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
if (width <= 2 + bundle_min)
{
for (unsigned j = i - width; j < i; ++j)
xor_mem(work[j], work[j + width], buffer_bytes);
}
else
{
unsigned bundle_size = width / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
WorkBundle* workBundle = PoolInstance->GetBundle();
for (unsigned j = i - width; j < i; j += bundle_size)
{
workBundle->Dispatch([work, i, j, width, bundle_size, buffer_bytes]() {
unsigned k_end = j + bundle_size;
if (k_end > i)
k_end = i;
for (unsigned k = j; k < k_end; ++k)
xor_mem(work[k], work[k + width], buffer_bytes);
});
}
workBundle->Complete();
}
}
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
for (unsigned i = 1; i < n; ++i)
{
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
VectorXOR(
buffer_bytes,
width,
work + i - width,
work + i);
}
VectorXOR(
buffer_bytes,
width,
work + i - width,
work + i);
}
// work <- FFT(work, n, 0) truncated to m + original_count
@ -2025,23 +1629,14 @@ void ReedSolomonDecode(
const unsigned output_count = m + original_count;
#ifdef LEO_ERROR_BITFIELD_OPT
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
{
FFT_DIT_ErrorBits_MT(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
}
FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
#else
FFT_DIT(buffer_bytes, work, output_count, n, FFTSkew - 1);
#endif
// Reveal erasures
for (unsigned i = 0; i < original_count; ++i)
for (int i = 0; i < original_count; ++i)
if (!original[i])
mul_mem(work[i], work[i + m], kModulus - error_locations[i + m], buffer_bytes);
}

View File

@ -73,21 +73,19 @@ void ReedSolomonEncode(
uint64_t buffer_bytes,
unsigned original_count,
unsigned recovery_count,
unsigned m, // = NextPow2(recovery_count) * 2 = work_count
unsigned m, // = NextPow2(recovery_count)
const void* const * const data,
void** work, // Size of GetEncodeWorkCount()
bool multithreaded);
void** work); // m * 2 elements
void ReedSolomonDecode(
uint64_t buffer_bytes,
unsigned original_count,
unsigned recovery_count,
unsigned m, // = NextPow2(recovery_count)
unsigned n, // = NextPow2(m + original_count) = work_count
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work, // n entries
bool multithreaded);
unsigned n, // = NextPow2(m + original_count)
const void* const * const original, // original_count elements
const void* const * const recovery, // recovery_count elements
void** work); // n elements
}} // namespace leopard::ff16

View File

@ -1641,8 +1641,7 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m,
const void* const* data,
void** work,
bool multithreaded)
void** work)
{
// work <- IFFT(data, m, m)
@ -1820,6 +1819,9 @@ static void FFT_DIT_ErrorBits(
{
for (unsigned r = 0; r < n_truncated; r += 2)
{
if (!error_bits.IsNeeded(mip_level, r))
continue;
const ffe_t log_m = skewLUT[r + 1];
if (log_m == kModulus)
@ -1850,8 +1852,7 @@ void ReedSolomonDecode(
unsigned n, // NextPow2(m + original_count) = work_count
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work, // n entries
bool multithreaded)
void** work) // n entries
{
// Fill in error locations

View File

@ -75,8 +75,7 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m, // = NextPow2(recovery_count)
const void* const * const data,
void** work, // m * 2 elements
bool multithreaded);
void** work); // m * 2 elements
void ReedSolomonDecode(
uint64_t buffer_bytes,
@ -84,10 +83,9 @@ void ReedSolomonDecode(
unsigned recovery_count,
unsigned m, // = NextPow2(recovery_count)
unsigned n, // = NextPow2(m + original_count)
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work, // n elements
bool multithreaded);
const void* const * const original, // original_count elements
const void* const * const recovery, // recovery_count elements
void** work); // n elements
}} // namespace leopard::ff8

View File

@ -63,11 +63,6 @@ LEO_EXPORT int leo_init_(int version)
return Leopard_Platform;
#endif // LEO_HAS_FF16
#ifdef LEO_ENABLE_MULTITHREADING_OPT
// Start worker threads spinning
leopard::PoolInstance = new leopard::WorkerPool;
#endif // LEO_ENABLE_MULTITHREADING_OPT
m_Initialized = true;
return Leopard_Success;
@ -131,8 +126,7 @@ LEO_EXPORT LeopardResult leo_encode(
unsigned recovery_count, // Number of recovery_data[] buffer pointers
unsigned work_count, // Number of work_data[] buffer pointers, from leo_encode_work_count()
const void* const * const original_data, // Array of pointers to original data buffers
void** work_data, // Array of work buffers
unsigned flags) // Operation flags
void** work_data) // Array of work buffers
{
if (buffer_bytes <= 0 || buffer_bytes % 64 != 0)
return Leopard_InvalidSize;
@ -171,8 +165,6 @@ LEO_EXPORT LeopardResult leo_encode(
if (work_count != m * 2)
return Leopard_InvalidCounts;
const bool mt = (flags & LeopardFlags_Multithreaded) != 0;
#ifdef LEO_HAS_FF8
if (n <= leopard::ff8::kOrder)
{
@ -182,8 +174,7 @@ LEO_EXPORT LeopardResult leo_encode(
recovery_count,
m,
original_data,
work_data,
mt);
work_data);
}
else
#endif // LEO_HAS_FF8
@ -196,8 +187,7 @@ LEO_EXPORT LeopardResult leo_encode(
recovery_count,
m,
original_data,
work_data,
mt);
work_data);
}
else
#endif // LEO_HAS_FF16
@ -247,8 +237,7 @@ LEO_EXPORT LeopardResult leo_decode(
unsigned work_count, // Number of buffer pointers in work_data[]
const void* const * const original_data, // Array of original data buffers
const void* const * const recovery_data, // Array of recovery data buffers
void** work_data, // Array of work data buffers
unsigned flags) // Operation flags
void** work_data) // Array of work data buffers
{
if (buffer_bytes <= 0 || buffer_bytes % 64 != 0)
return Leopard_InvalidSize;
@ -311,8 +300,6 @@ LEO_EXPORT LeopardResult leo_decode(
if (work_count != n)
return Leopard_InvalidCounts;
const bool mt = (flags & LeopardFlags_Multithreaded) != 0;
#ifdef LEO_HAS_FF8
if (n <= leopard::ff8::kOrder)
{
@ -324,8 +311,7 @@ LEO_EXPORT LeopardResult leo_decode(
n,
original_data,
recovery_data,
work_data,
mt);
work_data);
}
else
#endif // LEO_HAS_FF8
@ -340,8 +326,7 @@ LEO_EXPORT LeopardResult leo_decode(
n,
original_data,
recovery_data,
work_data,
mt);
work_data);
}
else
#endif // LEO_HAS_FF16

View File

@ -63,7 +63,7 @@
*/
// Library version
#define LEO_VERSION 1
#define LEO_VERSION 2
// Tweak if the functions are exported or statically linked
//#define LEO_DLL /* Defined when building/linking as DLL */
@ -126,14 +126,6 @@ typedef enum LeopardResultT
// Convert Leopard result to string
LEO_EXPORT const char* leo_result_string(LeopardResult result);
// Flags
typedef enum LeopardFlagsT
{
LeopardFlags_Defaults = 0, // Default settings
LeopardFlags_Multithreaded = 1, // Enable multiple threads
} LeopardFlags;
//------------------------------------------------------------------------------
// Encoder API
@ -163,7 +155,6 @@ LEO_EXPORT unsigned leo_encode_work_count(
original_data: Array of pointers to original data buffers.
work_count: Number of work_data[] buffers, from leo_encode_work_count().
work_data: Array of pointers to work data buffers.
flags: Flags for encoding e.g. LeopardFlag_Multithreaded
The sum of original_count + recovery_count must not exceed 65536.
The recovery_count <= original_count.
@ -192,8 +183,7 @@ LEO_EXPORT LeopardResult leo_encode(
unsigned recovery_count, // Number of recovery_data[] buffer pointers
unsigned work_count, // Number of work_data[] buffer pointers, from leo_encode_work_count()
const void* const * const original_data, // Array of pointers to original data buffers
void** work_data, // Array of work buffers
unsigned flags); // Operation flags
void** work_data); // Array of work buffers
//------------------------------------------------------------------------------
@ -225,7 +215,6 @@ LEO_EXPORT unsigned leo_decode_work_count(
recovery_data: Array of pointers to recovery data buffers.
work_count: Number of work_data[] buffers, from leo_decode_work_count().
work_data: Array of pointers to recovery data buffers.
flags: Flags for encoding e.g. LeopardFlag_Multithreaded
Lost original/recovery data should be set to NULL.
@ -242,8 +231,7 @@ LEO_EXPORT LeopardResult leo_decode(
unsigned work_count, // Number of buffer pointers in work_data[]
const void* const * const original_data, // Array of original data buffers
const void* const * const recovery_data, // Array of recovery data buffers
void** work_data, // Array of work data buffers
unsigned flags); // Operation flags
void** work_data); // Array of work data buffers
#ifdef __cplusplus

View File

@ -169,6 +169,7 @@
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
<BufferSecurityCheck>true</BufferSecurityCheck>
<PreprocessorDefinitions>_MBCS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<OpenMPSupport>true</OpenMPSupport>
</ClCompile>
<Link>
<GenerateDebugInformation>true</GenerateDebugInformation>

View File

@ -43,19 +43,18 @@ struct TestParameters
{
#ifdef LEO_HAS_FF16
unsigned original_count = 1000; // under 65536
unsigned recovery_count = 200; // under 65536 - original_count
unsigned recovery_count = 100; // under 65536 - original_count
#else
unsigned original_count = 100; // under 65536
unsigned recovery_count = 20; // under 65536 - original_count
unsigned recovery_count = 10; // under 65536 - original_count
#endif
unsigned buffer_bytes = 6400; // multiple of 64 bytes
unsigned buffer_bytes = 1344; // multiple of 64 bytes
unsigned loss_count = 32768; // some fraction of original_count
unsigned seed = 2;
bool multithreaded = true;
};
static const unsigned kLargeTrialCount = 1;
static const unsigned kSmallTrialCount = 10;
static const unsigned kSmallTrialCount = 300;
//------------------------------------------------------------------------------
@ -420,8 +419,7 @@ static bool Benchmark(const TestParameters& params)
params.recovery_count,
encode_work_count,
(void**)&original_data[0],
(void**)&encode_work_data[0], // recovery data written here
params.multithreaded ? LeopardFlags_Multithreaded : LeopardFlags_Defaults
(void**)&encode_work_data[0] // recovery data written here
);
t_leo_encode.EndCall();
@ -473,8 +471,7 @@ static bool Benchmark(const TestParameters& params)
decode_work_count,
(void**)&original_data[0],
(void**)&encode_work_data[0],
(void**)&decode_work_data[0],
params.multithreaded ? LeopardFlags_Multithreaded : LeopardFlags_Defaults);
(void**)&decode_work_data[0]);
t_leo_decode.EndCall();
if (decodeResult != Leopard_Success)
@ -528,108 +525,6 @@ static bool Benchmark(const TestParameters& params)
}
//------------------------------------------------------------------------------
// Multithreading Tests
#ifdef LEO_ENABLE_MULTITHREADING_OPT
void MultithreadingTests()
{
static const unsigned kBytes = 100;
static const unsigned N = 10000;
std::vector<uint8_t*> Buffers(N * 2);
for (unsigned i = 0; i < N * 2; ++i)
Buffers[i] = leopard::SIMDSafeAllocate(kBytes);
{
for (unsigned j = 1; j <= 16; ++j)
{
FunctionTimer trialTimer("trial");
for (unsigned t = 0; t < 40; ++t)
{
trialTimer.BeginCall();
leopard::WorkBundle* bundle = leopard::PoolInstance->GetBundle();
const unsigned split_j = N / j;
for (unsigned k = 0; k < N; k += split_j)
{
bundle->Dispatch([split_j, k, &Buffers]() {
for (unsigned m = k; m < N && m < (k + split_j); ++m)
{
leopard::xor_mem(Buffers[m + N], Buffers[m], kBytes);
}
});
}
bundle->Complete();
trialTimer.EndCall();
}
float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec);
cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) across " << j << " work-thread items: " << mbps << " MB/s" << endl;
}
for (unsigned j = 32; j <= N / 2; j *= 2)
{
FunctionTimer trialTimer("trial");
for (unsigned t = 0; t < 40; ++t)
{
trialTimer.BeginCall();
leopard::WorkBundle* bundle = leopard::PoolInstance->GetBundle();
const unsigned split_j = N / j;
for (unsigned k = 0; k < N; k += split_j)
{
bundle->Dispatch([split_j, k, &Buffers]() {
for (unsigned m = k; m < N && m < (k + split_j); ++m)
{
leopard::xor_mem(Buffers[m + N], Buffers[m], kBytes);
}
});
}
bundle->Complete();
trialTimer.EndCall();
}
float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec);
cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) across " << j << " work-thread items: " << mbps << " MB/s" << endl;
}
{
FunctionTimer trialTimer("trial");
for (unsigned t = 0; t < 40; ++t)
{
trialTimer.BeginCall();
for (unsigned k = 0; k < N; ++k)
{
leopard::xor_mem(Buffers[k + N], Buffers[k], kBytes);
}
trialTimer.EndCall();
}
float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec);
cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) single-threaded: " << mbps << " MB/s" << endl;
}
}
for (unsigned i = 0; i < N * 2; ++i)
leopard::SIMDSafeFree(Buffers[i]);
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
//------------------------------------------------------------------------------
// Entrypoint
@ -659,37 +554,22 @@ int main(int argc, char **argv)
params.buffer_bytes = atoi(argv[3]);
if (argc >= 5)
params.loss_count = atoi(argv[4]);
if (argc >= 6)
params.multithreaded = (atoi(argv[5]) != 0);
if (params.loss_count > params.recovery_count)
params.loss_count = params.recovery_count;
params.multithreaded = false;
cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "] (multi-threading OFF)" << endl;
cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "]" << endl;
if (!Benchmark(params))
goto Failed;
params.multithreaded = true;
cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "] (multi-threading ON)" << endl;
if (!Benchmark(params))
goto Failed;
#ifdef LEO_ENABLE_MULTITHREADING_OPT
MultithreadingTests();
#endif
#if 1
static const unsigned kMaxRandomData = 32768;
prng.Seed(params.seed, 8);
for (;; ++params.seed)
{
params.original_count = prng.Next() % kMaxRandomData;
params.original_count = prng.Next() % kMaxRandomData + 1;
params.recovery_count = prng.Next() % params.original_count + 1;
params.loss_count = prng.Next() % params.recovery_count + 1;