From 393dcac6efe39253dc39a9673d953187361d3399 Mon Sep 17 00:00:00 2001 From: Christopher Taylor Date: Tue, 6 Jun 2017 03:13:41 -0700 Subject: [PATCH] Drop my thread pool for OpenMP --- LeopardCommon.cpp | 276 ++++---------------- LeopardCommon.h | 247 +----------------- LeopardFF16.cpp | 581 +++++++------------------------------------ LeopardFF16.h | 14 +- LeopardFF8.cpp | 9 +- LeopardFF8.h | 10 +- leopard.cpp | 27 +- leopard.h | 18 +- proj/Leopard.vcxproj | 1 + tests/benchmark.cpp | 136 +--------- 10 files changed, 173 insertions(+), 1146 deletions(-) diff --git a/LeopardCommon.cpp b/LeopardCommon.cpp index 580d302..76d87a3 100644 --- a/LeopardCommon.cpp +++ b/LeopardCommon.cpp @@ -406,6 +406,36 @@ void xor_mem4( #endif // LEO_USE_VECTOR4_OPT +void VectorXOR_Threads( + const uint64_t bytes, + unsigned count, + void** x, + void** y) +{ +#ifdef LEO_USE_VECTOR4_OPT + if (count >= 4) + { + int i_end = count - 4; +#pragma omp parallel for + for (int i = 0; i <= i_end; i += 4) + { + xor_mem4( + x[i + 0], y[i + 0], + x[i + 1], y[i + 1], + x[i + 2], y[i + 2], + x[i + 3], y[i + 3], + bytes); + } + count %= 4; + i_end -= count; + x += i_end; + y += i_end; + } +#endif // LEO_USE_VECTOR4_OPT + + for (unsigned i = 0; i < count; ++i) + xor_mem(x[i], y[i], bytes); +} void VectorXOR( const uint64_t bytes, unsigned count, @@ -413,16 +443,22 @@ void VectorXOR( void** y) { #ifdef LEO_USE_VECTOR4_OPT - while (count >= 4) + if (count >= 4) { - xor_mem4( - x[0], y[0], - x[1], y[1], - x[2], y[2], - x[3], y[3], - bytes); - x += 4, y += 4; - count -= 4; + int i_end = count - 4; + for (int i = 0; i <= i_end; i += 4) + { + xor_mem4( + x[i + 0], y[i + 0], + x[i + 1], y[i + 1], + x[i + 2], y[i + 2], + x[i + 3], y[i + 3], + bytes); + } + count %= 4; + i_end -= count; + x += i_end; + y += i_end; } #endif // LEO_USE_VECTOR4_OPT @@ -431,226 +467,4 @@ void VectorXOR( } -#ifdef LEO_ENABLE_MULTITHREADING_OPT - -//------------------------------------------------------------------------------ -// WorkerThread - -void WorkerThread::Start(unsigned cpu_affinity) -{ - Stop(); - - CPUAffinity = cpu_affinity; - -#ifdef _WIN32 - hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr); -#endif - - Terminated = false; - Thread = std::make_unique(&WorkerThread::Loop, this); -} - -void WorkerThread::Stop() -{ - if (Thread) - { - Terminated = true; -#ifdef _WIN32 - ::SetEvent(hEvent); -#endif - - try - { - Thread->join(); - } - catch (std::system_error& /*err*/) - { - } - Thread = nullptr; - } - -#ifdef _WIN32 - if (hEvent != nullptr) - { - ::CloseHandle(hEvent); - hEvent = nullptr; - } -#endif -} - -void WorkerThread::Wake() -{ -#ifdef _WIN32 - ::SetEvent(hEvent); -#else - // FIXME: Port to other platforms -#endif -} - -static bool SetCurrentThreadAffinity(unsigned processorIndex) -{ -#ifdef _WIN32 - return 0 != ::SetThreadAffinityMask( - ::GetCurrentThread(), (DWORD_PTR)1 << (processorIndex & 63)); -#elif !defined(ANDROID) - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(processorIndex, &cpuset); - return 0 == pthread_setaffinity_np(pthread_self(), - sizeof(cpu_set_t), &cpuset); -#else - return true; // FIXME: Unused on Android anyway -#endif -} - -void WorkerThread::Loop() -{ - SetCurrentThreadAffinity(CPUAffinity); - - for (;;) - { - if (Terminated) - break; - -#ifdef _WIN32 - const DWORD result = ::WaitForSingleObject(hEvent, INFINITE); - - if (result != WAIT_OBJECT_0 || Terminated) - break; -#else - // FIXME: Port to other platforms -#endif - - PoolInstance->DrainWorkQueue(); - } -} - - -//------------------------------------------------------------------------------ -// WorkBundle - -WorkBundle::WorkBundle() -{ -#ifdef _WIN32 - hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr); -#else - // FIXME: Port to other platforms -#endif -} - -WorkBundle::~WorkBundle() -{ -#ifdef _WIN32 - if (hEvent != nullptr) - { - ::CloseHandle(hEvent); - hEvent = nullptr; - } -#else - // FIXME: Port to other platforms -#endif -} - -void WorkBundle::OnAllOperationsCompleted() -{ -#ifdef _WIN32 - ::SetEvent(hEvent); -#else - // FIXME: Port to other platforms -#endif -} - -void WorkBundle::Join() -{ -#ifdef _WIN32 - ::WaitForSingleObject(hEvent, INFINITE); -#else - // FIXME: Port to other platforms -#endif -} - - -//------------------------------------------------------------------------------ -// WorkerPool - -WorkerPool* PoolInstance = nullptr; - -extern "C" void AtExitWrapper() -{ - if (PoolInstance) - PoolInstance->Stop(); -} - -WorkerPool::WorkerPool() -{ - WorkerCount = 0; - - unsigned num_cpus = std::thread::hardware_concurrency(); - if (num_cpus > 1) - { - WorkerCount = num_cpus - 1; - - Workers = new WorkerThread[WorkerCount]; - for (unsigned i = 0; i < WorkerCount; ++i) - Workers[i].Start(i); - - std::atexit(AtExitWrapper); - } -} - -void WorkerPool::Stop() -{ - Locker locker(QueueLock); - - delete[] Workers; - Workers = nullptr; - WorkerCount = 0; -} - -void WorkerPool::Dispatch(WorkBundle* bundle, const WorkerCallT& call) -{ - Locker locker(QueueLock); - WorkQueue.resize(++Remaining); - WorkQueue[Remaining - 1].Call = call; - WorkQueue[Remaining - 1].Bundle = bundle; -} - -void WorkerPool::Run() -{ - unsigned remaining; - { - Locker locker(QueueLock); - remaining = Remaining; - } - unsigned count = WorkerCount; - if (count > remaining) - count = remaining; - - // Wake all the workers - for (unsigned i = 0; i < count; ++i) - Workers[i].Wake(); - - DrainWorkQueue(); -} - -void WorkerPool::DrainWorkQueue() -{ - for (;;) - { - QueueItem item; - { - Locker locker(QueueLock); - if (Remaining <= 0) - return; - item = WorkQueue[--Remaining]; - } - item.Call(); - item.Bundle->OperationComplete(); - } -} - - -#endif // LEO_ENABLE_MULTITHREADING_OPT - - } // namespace leopard diff --git a/LeopardCommon.h b/LeopardCommon.h index 16a3da7..a399933 100644 --- a/LeopardCommon.h +++ b/LeopardCommon.h @@ -184,11 +184,6 @@ // Unroll inner loops 4 times #define LEO_USE_VECTOR4_OPT -// Enable multithreading optimization when requested -#ifdef _MSC_VER - #define LEO_ENABLE_MULTITHREADING_OPT -#endif - //------------------------------------------------------------------------------ // Debug @@ -407,6 +402,13 @@ void VectorXOR( void** x, void** y); +// x[] ^= y[] (Multithreaded) +void VectorXOR_Threads( + const uint64_t bytes, + unsigned count, + void** x, + void** y); + //------------------------------------------------------------------------------ // XORSummer @@ -457,11 +459,6 @@ protected: static const unsigned kAlignmentBytes = LEO_ALIGN_BYTES; -LEO_FORCE_INLINE unsigned NextAlignedOffset(unsigned offset) -{ - return (offset + kAlignmentBytes - 1) & ~(kAlignmentBytes - 1); -} - static LEO_FORCE_INLINE uint8_t* SIMDSafeAllocate(size_t size) { uint8_t* data = (uint8_t*)calloc(1, kAlignmentBytes + size); @@ -489,234 +486,4 @@ static LEO_FORCE_INLINE void SIMDSafeFree(void* ptr) } -//------------------------------------------------------------------------------ -// Mutex - -#ifdef _WIN32 - -struct Lock -{ - CRITICAL_SECTION cs; - - Lock() { ::InitializeCriticalSectionAndSpinCount(&cs, 1000); } - ~Lock() { ::DeleteCriticalSection(&cs); } - bool TryEnter() { return ::TryEnterCriticalSection(&cs) != FALSE; } - void Enter() { ::EnterCriticalSection(&cs); } - void Leave() { ::LeaveCriticalSection(&cs); } -}; - -#else - -struct Lock -{ - std::recursive_mutex cs; - - bool TryEnter() { return cs.try_lock(); } - void Enter() { cs.lock(); } - void Leave() { cs.unlock(); } -}; - -#endif - -class Locker -{ -public: - Locker(Lock& lock) { - TheLock = &lock; - if (TheLock) - TheLock->Enter(); - } - ~Locker() { Clear(); } - bool TrySet(Lock& lock) { - Clear(); - if (!lock.TryEnter()) - return false; - TheLock = &lock; - return true; - } - void Set(Lock& lock) { - Clear(); - lock.Enter(); - TheLock = &lock; - } - void Clear() { - if (TheLock) - TheLock->Leave(); - TheLock = nullptr; - } -private: - Lock* TheLock; -}; - - -#ifdef LEO_ENABLE_MULTITHREADING_OPT - -//------------------------------------------------------------------------------ -// WorkerThread - -class WorkerThread -{ -public: - WorkerThread() - { - } - ~WorkerThread() - { - Stop(); - } - - void Start(unsigned cpu_affinity); - void Stop(); - void Wake(); - -protected: - unsigned CPUAffinity = 0; - std::atomic_bool Terminated = false; - std::unique_ptr Thread; - -#ifdef _WIN32 - HANDLE hEvent = nullptr; -#else - // FIXME: Port to other platforms - mutable std::mutex QueueLock; - std::condition_variable QueueCondition; -#endif - - void Loop(); -}; - - -//------------------------------------------------------------------------------ -// WorkBundle - -typedef std::function WorkerCallT; -class WorkerPool; -extern WorkerPool* PoolInstance; - -class WorkBundle -{ - friend class WorkerPool; - -public: - WorkBundle(); - ~WorkBundle(); - - void Dispatch(const WorkerCallT& call); - void Complete(); - -protected: - std::atomic WorkCount; - -#ifdef _WIN32 - HANDLE hEvent = nullptr; -#else - // FIXME: Port to other platforms -#endif - - - LEO_FORCE_INLINE void Increment() - { - ++WorkCount; - } - LEO_FORCE_INLINE void OperationComplete() - { - if (--WorkCount == 0) - OnAllOperationsCompleted(); - } - void Join(); - - void OnAllOperationsCompleted(); -}; - - -//------------------------------------------------------------------------------ -// WorkerPool - -class WorkerPool -{ - friend class WorkerThread; - friend class WorkBundle; - -public: - WorkerPool(); - void Stop(); - - unsigned GetParallelism() const - { - return WorkerCount + 1; - } - - WorkBundle* GetBundle() - { - WorkBundle* back; - { - Locker locker(BundleLock); - - if (FreeBundles.empty()) - { - locker.Clear(); - back = new WorkBundle; - } - else - { - back = FreeBundles.back(); - FreeBundles.pop_back(); - } - } - back->Increment(); - return back; - } - - void FreeBundle(WorkBundle* bundle) - { - Locker locker(BundleLock); - FreeBundles.push_back(bundle); - } - -protected: - void Dispatch(WorkBundle* bundle, const WorkerCallT& call); - void Run(); - - void DrainWorkQueue(); - - mutable Lock QueueLock; - - WorkerThread* Workers = nullptr; - unsigned WorkerCount = 0; - - struct QueueItem - { - WorkerCallT Call; - WorkBundle* Bundle; - }; - - std::vector WorkQueue; - unsigned Remaining; - - mutable Lock BundleLock; - // TBD: Free this memory on shutdown? - std::vector FreeBundles; -}; - - -inline void WorkBundle::Dispatch(const WorkerCallT& call) -{ - Increment(); - PoolInstance->Dispatch(this, call); -} - -inline void WorkBundle::Complete() -{ - if (WorkCount > 0) - { - PoolInstance->Run(); - OperationComplete(); - Join(); - } - PoolInstance->FreeBundle(this); -} - -#endif // LEO_ENABLE_MULTITHREADING_OPT - - } // namespace leopard diff --git a/LeopardFF16.cpp b/LeopardFF16.cpp index b6845ee..108f85c 100644 --- a/LeopardFF16.cpp +++ b/LeopardFF16.cpp @@ -118,17 +118,20 @@ static void FWHT(ffe_t* data, const unsigned m, const unsigned m_truncated) for (; dist4 <= m; dist = dist4, dist4 <<= 2) { // For each set of dist*4 elements: - for (unsigned r = 0; r < m_truncated; r += dist4) +#pragma omp parallel for + for (int r = 0; r < m_truncated; r += dist4) { // For each set of dist elements: - for (unsigned i = r; i < r + dist; ++i) +#pragma omp parallel for + for (int i = r; i < r + dist; ++i) FWHT_4(data + i, dist); } } // If there is one layer left: if (dist < m) - for (unsigned i = 0; i < dist; ++i) +#pragma omp parallel for + for (int i = 0; i < dist; ++i) FWHT_2(data[i], data[i + dist]); } @@ -305,7 +308,8 @@ static void InitializeMultiplyTables() Multiply128LUT = reinterpret_cast(SIMDSafeAllocate(sizeof(Multiply128LUT_t) * kOrder)); // For each value we could multiply by: - for (unsigned log_m = 0; log_m < kOrder; ++log_m) +#pragma omp parallel for + for (int log_m = 0; log_m < kOrder; ++log_m) { // For each 4 bits of the finite field width in bits: for (unsigned i = 0, shift = 0; i < 4; ++i, shift += 4) @@ -770,9 +774,11 @@ static void IFFT_DIT_Encoder( // I tried rolling the memcpy/memset into the first layer of the FFT and // found that it only yields a 4% performance improvement, which is not // worth the extra complexity. - for (unsigned i = 0; i < m_truncated; ++i) +#pragma omp parallel for + for (int i = 0; i < m_truncated; ++i) memcpy(work[i], data[i], bytes); - for (unsigned i = m_truncated; i < m; ++i) +#pragma omp parallel for + for (int i = m_truncated; i < m; ++i) memset(work[i], 0, bytes); // I tried splitting up the first few layers into L3-cache sized blocks but @@ -784,7 +790,8 @@ static void IFFT_DIT_Encoder( for (; dist4 <= m; dist = dist4, dist4 <<= 2) { // For each set of dist*4 elements: - for (unsigned r = 0; r < m_truncated; r += dist4) +#pragma omp parallel for + for (int r = 0; r < m_truncated; r += dist4) { const unsigned i_end = r + dist; const ffe_t log_m01 = skewLUT[i_end]; @@ -792,7 +799,7 @@ static void IFFT_DIT_Encoder( const ffe_t log_m23 = skewLUT[i_end + dist * 2]; // For each set of dist elements: - for (unsigned i = r; i < i_end; ++i) + for (int i = r; i < i_end; ++i) { IFFT_DIT4( bytes, @@ -818,10 +825,11 @@ static void IFFT_DIT_Encoder( const ffe_t log_m = skewLUT[dist]; if (log_m == kModulus) - VectorXOR(bytes, dist, work + dist, work); + VectorXOR_Threads(bytes, dist, work + dist, work); else { - for (unsigned i = 0; i < dist; ++i) +#pragma omp parallel for + for (int i = 0; i < dist; ++i) { IFFT_DIT2( work[i], @@ -835,7 +843,7 @@ static void IFFT_DIT_Encoder( // I tried unrolling this but it does not provide more than 5% performance // improvement for 16-bit finite fields, so it's not worth the complexity. if (xor_result) - VectorXOR(bytes, m, xor_result, work); + VectorXOR_Threads(bytes, m, xor_result, work); } @@ -852,7 +860,8 @@ static void IFFT_DIT_Decoder( for (; dist4 <= m; dist = dist4, dist4 <<= 2) { // For each set of dist*4 elements: - for (unsigned r = 0; r < m_truncated; r += dist4) +#pragma omp parallel for + for (int r = 0; r < m_truncated; r += dist4) { const unsigned i_end = r + dist; const ffe_t log_m01 = skewLUT[i_end]; @@ -860,7 +869,7 @@ static void IFFT_DIT_Decoder( const ffe_t log_m23 = skewLUT[i_end + dist * 2]; // For each set of dist elements: - for (unsigned i = r; i < i_end; ++i) + for (int i = r; i < i_end; ++i) { IFFT_DIT4( bytes, @@ -882,10 +891,11 @@ static void IFFT_DIT_Decoder( const ffe_t log_m = skewLUT[dist]; if (log_m == kModulus) - VectorXOR(bytes, dist, work + dist, work); + VectorXOR_Threads(bytes, dist, work + dist, work); else { - for (unsigned i = 0; i < dist; ++i) +#pragma omp parallel for + for (int i = 0; i < dist; ++i) { IFFT_DIT2( work[i], @@ -897,155 +907,6 @@ static void IFFT_DIT_Decoder( } } -#ifdef LEO_ENABLE_MULTITHREADING_OPT - -// Multithreaded decoder version -static void IFFT_DIT_Decoder_MT( - const uint64_t bytes, - const unsigned m_truncated, - void** work, - const unsigned m, - const ffe_t* skewLUT) -{ - unsigned bundle_min = (unsigned)(8000 / bytes); - if (bundle_min < 1) - bundle_min = 1; - const unsigned parallel_max = PoolInstance->GetParallelism(); - - // Decimation in time: Unroll 2 layers at a time - unsigned dist = 1, dist4 = 4; - for (; dist4 <= m; dist = dist4, dist4 <<= 2) - { - WorkBundle* workBundle = PoolInstance->GetBundle(); - - if (m_truncated <= dist4) - { - unsigned bundle_count = dist / parallel_max; - if (bundle_count < bundle_min) - bundle_count = bundle_min; - - // For each set of dist*4 elements: - const unsigned i_end = dist; - const ffe_t log_m01 = skewLUT[i_end]; - const ffe_t log_m02 = skewLUT[i_end + dist]; - const ffe_t log_m23 = skewLUT[i_end + dist * 2]; - - // For each set of dist elements: - for (unsigned i = 0; i < i_end; i += bundle_count) - { - workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work, bundle_count, i, i_end, dist]() { - unsigned j_end = i + bundle_count; - if (j_end > i_end) - j_end = i_end; - for (unsigned j = i; j < j_end; ++j) - { - IFFT_DIT4( - bytes, - work + j, - dist, - log_m01, - log_m23, - log_m02); - } - }); - } - } - else - { - unsigned bundle_count = (m_truncated / dist) / parallel_max; - if (bundle_count < bundle_min) - bundle_count = bundle_min; - bundle_count /= 4; - if (bundle_count < 1) - bundle_count = 1; - const unsigned dist4_bundle_count = dist4 * bundle_count; - - for (unsigned r = 0; r < m_truncated; r += dist4_bundle_count) - { - workBundle->Dispatch([bytes, work, skewLUT, dist4, m_truncated, r, dist4_bundle_count, dist]() { - unsigned s_end = r + dist4_bundle_count; - if (s_end > m_truncated) - s_end = m_truncated; - // For each set of dist*4 elements: - for (unsigned s = r; s < s_end; s += dist4) - { - const unsigned i_end = s + dist; - const ffe_t log_m01 = skewLUT[i_end]; - const ffe_t log_m02 = skewLUT[i_end + dist]; - const ffe_t log_m23 = skewLUT[i_end + dist * 2]; - - // For each set of dist elements: - for (unsigned i = s; i < i_end; ++i) - { - IFFT_DIT4( - bytes, - work + i, - dist, - log_m01, - log_m23, - log_m02); - } - } - }); - } - } - - workBundle->Complete(); - } - - // If there is one layer left: - if (dist < m) - { - WorkBundle* workBundle = PoolInstance->GetBundle(); - - // Assuming that dist = m / 2 - LEO_DEBUG_ASSERT(dist * 2 == m); - - const ffe_t log_m = skewLUT[dist]; - - unsigned bundle_count = dist / parallel_max; - if (bundle_count < bundle_min) - bundle_count = bundle_min; - - if (log_m == kModulus) - { - for (unsigned i = 0; i < dist; i += bundle_count) - { - workBundle->Dispatch([work, i, dist, bundle_count, bytes]() { - unsigned j_end = i + bundle_count; - if (j_end > dist) - j_end = dist; - for (unsigned j = i; j < j_end; ++j) - xor_mem(work[j + dist], work[j], bytes); - }); - } - } - else - { - for (unsigned i = 0; i < dist; i += bundle_count) - { - workBundle->Dispatch([work, i, dist, log_m, bytes, bundle_count]() { - unsigned j_end = i + bundle_count; - if (j_end > dist) - j_end = dist; - for (unsigned j = i; j < j_end; ++j) - { - IFFT_DIT2( - work[j], - work[j + dist], - log_m, - bytes); - } - }); - } - } - - workBundle->Complete(); - } -} - -#endif // LEO_ENABLE_MULTITHREADING_OPT - /* Decimation in time FFT: @@ -1361,7 +1222,8 @@ static void FFT_DIT( const unsigned thread_v = dist; // For each set of dist*4 elements: - for (unsigned r = 0; r < m_truncated; r += dist4) +#pragma omp parallel for + for (int r = 0; r < m_truncated; r += dist4) { const unsigned i_end = r + dist; const ffe_t log_m01 = skewLUT[i_end]; @@ -1369,7 +1231,7 @@ static void FFT_DIT( const ffe_t log_m23 = skewLUT[i_end + dist * 2]; // For each set of dist elements: - for (unsigned i = r; i < i_end; ++i) + for (int i = r; i < i_end; ++i) { FFT_DIT4( bytes, @@ -1385,7 +1247,8 @@ static void FFT_DIT( // If there is one layer left: if (dist4 == 2) { - for (unsigned r = 0; r < m_truncated; r += 2) +#pragma omp parallel for + for (int r = 0; r < m_truncated; r += 2) { const ffe_t log_m = skewLUT[r + 1]; @@ -1413,8 +1276,7 @@ void ReedSolomonEncode( unsigned recovery_count, unsigned m, const void* const * data, - void** work, - bool multithreaded) + void** work) { // work <- IFFT(data, m, m) @@ -1608,17 +1470,20 @@ static void FFT_DIT_ErrorBits( for (; dist != 0; dist4 = dist, dist >>= 2, mip_level -=2) { // For each set of dist*4 elements: - for (unsigned r = 0; r < n_truncated; r += dist4) +#pragma omp parallel for + for (int r = 0; r < n_truncated; r += dist4) { if (!error_bits.IsNeeded(mip_level, r)) continue; - const ffe_t log_m01 = skewLUT[r + dist]; - const ffe_t log_m23 = skewLUT[r + dist * 3]; - const ffe_t log_m02 = skewLUT[r + dist * 2]; + const unsigned i_end = r + dist; + const ffe_t log_m01 = skewLUT[i_end]; + const ffe_t log_m02 = skewLUT[i_end + dist]; + const ffe_t log_m23 = skewLUT[i_end + dist * 2]; // For each set of dist elements: - for (unsigned i = r; i < r + dist; ++i) +#pragma omp parallel for + for (int i = r; i < i_end; ++i) { FFT_DIT4( bytes, @@ -1634,8 +1499,12 @@ static void FFT_DIT_ErrorBits( // If there is one layer left: if (dist4 == 2) { - for (unsigned r = 0; r < n_truncated; r += 2) +#pragma omp parallel for + for (int r = 0; r < n_truncated; r += 2) { + if (!error_bits.IsNeeded(mip_level, r)) + continue; + const ffe_t log_m = skewLUT[r + 1]; if (log_m == kModulus) @@ -1652,148 +1521,6 @@ static void FFT_DIT_ErrorBits( } } -#ifdef LEO_ENABLE_MULTITHREADING_OPT - -static void FFT_DIT_ErrorBits_MT( - const uint64_t bytes, - void** work, - const unsigned n_truncated, - const unsigned n, - const ffe_t* skewLUT, - const ErrorBitfield& error_bits) -{ - unsigned bundle_min = (unsigned)(8000 / bytes); - if (bundle_min < 1) - bundle_min = 1; - const unsigned parallel_max = PoolInstance->GetParallelism(); - - unsigned mip_level = LastNonzeroBit32(n); - - // Decimation in time: Unroll 2 layers at a time - unsigned dist4 = n, dist = n >> 2; - for (; dist != 0; dist4 = dist, dist >>= 2, mip_level -=2) - { - WorkBundle* workBundle = PoolInstance->GetBundle(); - - if (n_truncated <= dist4) - { - if (error_bits.IsNeeded(mip_level, 0)) - { - unsigned bundle_size = dist / parallel_max; - if (bundle_size < bundle_min) - bundle_size = bundle_min; - - const unsigned i_end = dist; - const ffe_t log_m01 = skewLUT[i_end]; - const ffe_t log_m02 = skewLUT[i_end + dist]; - const ffe_t log_m23 = skewLUT[i_end + dist * 2]; - - // For each set of dist elements: - for (unsigned i = 0; i < i_end; i += bundle_size) - { - workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work, bundle_size, i, i_end, dist]() { - unsigned j_end = i + bundle_size; - if (j_end > i_end) - j_end = i_end; - for (unsigned j = i; j < j_end; ++j) - { - FFT_DIT4( - bytes, - work + j, - dist, - log_m01, - log_m23, - log_m02); - } - }); - } - } - } - else - { - unsigned bundle_count = (n_truncated / dist) / parallel_max; - if (bundle_count < bundle_min) - bundle_count = bundle_min; - bundle_count /= 4; - if (bundle_count < 1) - bundle_count = 1; - const unsigned dist4_bundle_count = dist4 * bundle_count; - - // For each set of dist*4 elements: - for (unsigned r = 0; r < n_truncated; r += dist4_bundle_count) - { - workBundle->Dispatch([bytes, work, skewLUT, &error_bits, mip_level, dist4, n_truncated, r, dist4_bundle_count, dist]() { - unsigned s_end = r + dist4_bundle_count; - if (s_end > n_truncated) - s_end = n_truncated; - for (unsigned s = r; s < s_end; s += dist4) - { - if (!error_bits.IsNeeded(mip_level, s)) - continue; - - const unsigned i_end = s + dist; - const ffe_t log_m01 = skewLUT[i_end]; - const ffe_t log_m02 = skewLUT[i_end + dist]; - const ffe_t log_m23 = skewLUT[i_end + dist * 2]; - - // For each set of dist elements: - for (unsigned i = s; i < i_end; ++i) - { - FFT_DIT4( - bytes, - work + i, - dist, - log_m01, - log_m23, - log_m02); - } - } - }); - } - } - - workBundle->Complete(); - } - - // If there is one layer left: - if (dist4 == 2) - { - WorkBundle* workBundle = PoolInstance->GetBundle(); - - unsigned bundle_count = (n_truncated / 2) / parallel_max; - if (bundle_count < bundle_min) - bundle_count = bundle_min; - - for (unsigned s = 0; s < n_truncated; s += 2 * bundle_count) - { - workBundle->Dispatch([bytes, skewLUT, work, s, n_truncated, bundle_count]() { - unsigned r_end = s + 2 * bundle_count; - if (r_end > n_truncated) - r_end = n_truncated; - for (unsigned r = s; r < r_end; r += 2) - { - const ffe_t log_m = skewLUT[r + 1]; - - if (log_m == kModulus) - xor_mem(work[r + 1], work[r], bytes); - else - { - FFT_DIT2( - work[r], - work[r + 1], - log_m, - bytes); - } - } - }); - } - - workBundle->Complete(); - } -} - -#endif // LEO_ENABLE_MULTITHREADING_OPT - #endif // LEO_ERROR_BITFIELD_OPT @@ -1808,11 +1535,8 @@ void ReedSolomonDecode( unsigned n, // NextPow2(m + original_count) = work_count const void* const * const original, // original_count entries const void* const * const recovery, // recovery_count entries - void** work, // n entries - bool multithreaded) + void** work) // n entries { - if (multithreaded && n * buffer_bytes < 512000) - multithreaded = false; // Fill in error locations #ifdef LEO_ERROR_BITFIELD_OPT @@ -1820,10 +1544,10 @@ void ReedSolomonDecode( #endif // LEO_ERROR_BITFIELD_OPT ffe_t error_locations[kOrder] = {}; - for (unsigned i = 0; i < recovery_count; ++i) + for (int i = 0; i < recovery_count; ++i) if (!recovery[i]) error_locations[i] = 1; - for (unsigned i = recovery_count; i < m; ++i) + for (int i = recovery_count; i < m; ++i) error_locations[i] = 1; for (unsigned i = 0; i < original_count; ++i) { @@ -1844,180 +1568,60 @@ void ReedSolomonDecode( FWHT(error_locations, kOrder, m + original_count); - for (unsigned i = 0; i < kOrder; ++i) +#pragma omp parallel for + for (int i = 0; i < kOrder; ++i) error_locations[i] = ((unsigned)error_locations[i] * (unsigned)LogWalsh[i]) % kModulus; FWHT(error_locations, kOrder, kOrder); -#ifdef LEO_ENABLE_MULTITHREADING_OPT - if (multithreaded) + // work <- recovery data + +#pragma omp parallel for + for (int i = 0; i < recovery_count; ++i) { - unsigned bundle_min = (unsigned)(64000 / buffer_bytes); - if (bundle_min < 1) - bundle_min = 1; - const unsigned parallel_max = PoolInstance->GetParallelism(); - unsigned bundle_size = recovery_count / parallel_max; - if (bundle_size < bundle_min) - bundle_size = bundle_min; - - WorkBundle* workBundle = PoolInstance->GetBundle(); - - // work <- recovery data - - for (unsigned i = 0; i < recovery_count; i += bundle_size) - { - workBundle->Dispatch([i, recovery_count, recovery, &error_locations, bundle_size, work, buffer_bytes]() { - unsigned j_end = i + bundle_size; - if (j_end > recovery_count) - j_end = recovery_count; - for (unsigned j = i; j < j_end; ++j) - { - if (recovery[j]) - mul_mem(work[j], recovery[j], error_locations[j], buffer_bytes); - else - memset(work[j], 0, buffer_bytes); - } - }); - } - workBundle->Dispatch([recovery_count, m, buffer_bytes, work]() { - for (unsigned i = recovery_count; i < m; ++i) - memset(work[i], 0, buffer_bytes); - }); - - // work <- original data - - bundle_size = original_count / parallel_max; - if (bundle_size < bundle_min) - bundle_size = bundle_min; - - for (unsigned i = 0; i < original_count; i += bundle_size) - { - workBundle->Dispatch([i, original_count, original, &error_locations, m, bundle_size, work, buffer_bytes]() { - unsigned j_end = i + bundle_size; - if (j_end > original_count) - j_end = original_count; - for (unsigned j = i; j < j_end; ++j) - { - if (original[j]) - mul_mem(work[m + j], original[j], error_locations[m + j], buffer_bytes); - else - memset(work[m + j], 0, buffer_bytes); - } - }); - } - workBundle->Dispatch([original_count, m, n, buffer_bytes, work]() { - for (unsigned i = m + original_count; i < n; ++i) - memset(work[i], 0, buffer_bytes); - }); - - workBundle->Complete(); - } - else -#endif // LEO_ENABLE_MULTITHREADING_OPT - { - // work <- recovery data - - for (unsigned i = 0; i < recovery_count; ++i) - { - if (recovery[i]) - mul_mem(work[i], recovery[i], error_locations[i], buffer_bytes); - else - memset(work[i], 0, buffer_bytes); - } - for (unsigned i = recovery_count; i < m; ++i) - memset(work[i], 0, buffer_bytes); - - // work <- original data - - for (unsigned i = 0; i < original_count; ++i) - { - if (original[i]) - mul_mem(work[m + i], original[i], error_locations[m + i], buffer_bytes); - else - memset(work[m + i], 0, buffer_bytes); - } - for (unsigned i = m + original_count; i < n; ++i) + if (recovery[i]) + mul_mem(work[i], recovery[i], error_locations[i], buffer_bytes); + else memset(work[i], 0, buffer_bytes); } +#pragma omp parallel for + for (int i = recovery_count; i < m; ++i) + memset(work[i], 0, buffer_bytes); + + // work <- original data + +#pragma omp parallel for + for (int i = 0; i < original_count; ++i) + { + if (original[i]) + mul_mem(work[m + i], original[i], error_locations[m + i], buffer_bytes); + else + memset(work[m + i], 0, buffer_bytes); + } +#pragma omp parallel for + for (int i = m + original_count; i < n; ++i) + memset(work[i], 0, buffer_bytes); // work <- IFFT(work, n, 0) -#ifdef LEO_ENABLE_MULTITHREADING_OPT - if (multithreaded) - { - IFFT_DIT_Decoder_MT( - buffer_bytes, - m + original_count, - work, - n, - FFTSkew - 1); - } - else -#endif // LEO_ENABLE_MULTITHREADING_OPT - { - IFFT_DIT_Decoder( - buffer_bytes, - m + original_count, - work, - n, - FFTSkew - 1); - } + IFFT_DIT_Decoder( + buffer_bytes, + m + original_count, + work, + n, + FFTSkew - 1); // work <- FormalDerivative(work, n) -#ifdef LEO_ENABLE_MULTITHREADING_OPT - if (multithreaded) + for (unsigned i = 1; i < n; ++i) { - unsigned bundle_min = (unsigned)(64000 / buffer_bytes); - if (bundle_min < 1) - bundle_min = 1; - const unsigned parallel_max = PoolInstance->GetParallelism(); + const unsigned width = ((i ^ (i - 1)) + 1) >> 1; - for (unsigned i = 1; i < n; ++i) - { - const unsigned width = ((i ^ (i - 1)) + 1) >> 1; - - if (width <= 2 + bundle_min) - { - for (unsigned j = i - width; j < i; ++j) - xor_mem(work[j], work[j + width], buffer_bytes); - } - else - { - unsigned bundle_size = width / parallel_max; - if (bundle_size < bundle_min) - bundle_size = bundle_min; - - WorkBundle* workBundle = PoolInstance->GetBundle(); - - for (unsigned j = i - width; j < i; j += bundle_size) - { - workBundle->Dispatch([work, i, j, width, bundle_size, buffer_bytes]() { - unsigned k_end = j + bundle_size; - if (k_end > i) - k_end = i; - for (unsigned k = j; k < k_end; ++k) - xor_mem(work[k], work[k + width], buffer_bytes); - }); - } - - workBundle->Complete(); - } - } - } - else -#endif // LEO_ENABLE_MULTITHREADING_OPT - { - for (unsigned i = 1; i < n; ++i) - { - const unsigned width = ((i ^ (i - 1)) + 1) >> 1; - - VectorXOR( - buffer_bytes, - width, - work + i - width, - work + i); - } + VectorXOR( + buffer_bytes, + width, + work + i - width, + work + i); } // work <- FFT(work, n, 0) truncated to m + original_count @@ -2025,23 +1629,14 @@ void ReedSolomonDecode( const unsigned output_count = m + original_count; #ifdef LEO_ERROR_BITFIELD_OPT -#ifdef LEO_ENABLE_MULTITHREADING_OPT - if (multithreaded) - { - FFT_DIT_ErrorBits_MT(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits); - } - else -#endif // LEO_ENABLE_MULTITHREADING_OPT - { - FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits); - } + FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits); #else FFT_DIT(buffer_bytes, work, output_count, n, FFTSkew - 1); #endif // Reveal erasures - for (unsigned i = 0; i < original_count; ++i) + for (int i = 0; i < original_count; ++i) if (!original[i]) mul_mem(work[i], work[i + m], kModulus - error_locations[i + m], buffer_bytes); } diff --git a/LeopardFF16.h b/LeopardFF16.h index ab30977..6f8ba6c 100644 --- a/LeopardFF16.h +++ b/LeopardFF16.h @@ -73,21 +73,19 @@ void ReedSolomonEncode( uint64_t buffer_bytes, unsigned original_count, unsigned recovery_count, - unsigned m, // = NextPow2(recovery_count) * 2 = work_count + unsigned m, // = NextPow2(recovery_count) const void* const * const data, - void** work, // Size of GetEncodeWorkCount() - bool multithreaded); + void** work); // m * 2 elements void ReedSolomonDecode( uint64_t buffer_bytes, unsigned original_count, unsigned recovery_count, unsigned m, // = NextPow2(recovery_count) - unsigned n, // = NextPow2(m + original_count) = work_count - const void* const * const original, // original_count entries - const void* const * const recovery, // recovery_count entries - void** work, // n entries - bool multithreaded); + unsigned n, // = NextPow2(m + original_count) + const void* const * const original, // original_count elements + const void* const * const recovery, // recovery_count elements + void** work); // n elements }} // namespace leopard::ff16 diff --git a/LeopardFF8.cpp b/LeopardFF8.cpp index 8e53043..07061ab 100644 --- a/LeopardFF8.cpp +++ b/LeopardFF8.cpp @@ -1641,8 +1641,7 @@ void ReedSolomonEncode( unsigned recovery_count, unsigned m, const void* const* data, - void** work, - bool multithreaded) + void** work) { // work <- IFFT(data, m, m) @@ -1820,6 +1819,9 @@ static void FFT_DIT_ErrorBits( { for (unsigned r = 0; r < n_truncated; r += 2) { + if (!error_bits.IsNeeded(mip_level, r)) + continue; + const ffe_t log_m = skewLUT[r + 1]; if (log_m == kModulus) @@ -1850,8 +1852,7 @@ void ReedSolomonDecode( unsigned n, // NextPow2(m + original_count) = work_count const void* const * const original, // original_count entries const void* const * const recovery, // recovery_count entries - void** work, // n entries - bool multithreaded) + void** work) // n entries { // Fill in error locations diff --git a/LeopardFF8.h b/LeopardFF8.h index 1b5c774..fe19ed5 100644 --- a/LeopardFF8.h +++ b/LeopardFF8.h @@ -75,8 +75,7 @@ void ReedSolomonEncode( unsigned recovery_count, unsigned m, // = NextPow2(recovery_count) const void* const * const data, - void** work, // m * 2 elements - bool multithreaded); + void** work); // m * 2 elements void ReedSolomonDecode( uint64_t buffer_bytes, @@ -84,10 +83,9 @@ void ReedSolomonDecode( unsigned recovery_count, unsigned m, // = NextPow2(recovery_count) unsigned n, // = NextPow2(m + original_count) - const void* const * const original, // original_count entries - const void* const * const recovery, // recovery_count entries - void** work, // n elements - bool multithreaded); + const void* const * const original, // original_count elements + const void* const * const recovery, // recovery_count elements + void** work); // n elements }} // namespace leopard::ff8 diff --git a/leopard.cpp b/leopard.cpp index cf399b9..9f78d19 100644 --- a/leopard.cpp +++ b/leopard.cpp @@ -63,11 +63,6 @@ LEO_EXPORT int leo_init_(int version) return Leopard_Platform; #endif // LEO_HAS_FF16 -#ifdef LEO_ENABLE_MULTITHREADING_OPT - // Start worker threads spinning - leopard::PoolInstance = new leopard::WorkerPool; -#endif // LEO_ENABLE_MULTITHREADING_OPT - m_Initialized = true; return Leopard_Success; @@ -131,8 +126,7 @@ LEO_EXPORT LeopardResult leo_encode( unsigned recovery_count, // Number of recovery_data[] buffer pointers unsigned work_count, // Number of work_data[] buffer pointers, from leo_encode_work_count() const void* const * const original_data, // Array of pointers to original data buffers - void** work_data, // Array of work buffers - unsigned flags) // Operation flags + void** work_data) // Array of work buffers { if (buffer_bytes <= 0 || buffer_bytes % 64 != 0) return Leopard_InvalidSize; @@ -171,8 +165,6 @@ LEO_EXPORT LeopardResult leo_encode( if (work_count != m * 2) return Leopard_InvalidCounts; - const bool mt = (flags & LeopardFlags_Multithreaded) != 0; - #ifdef LEO_HAS_FF8 if (n <= leopard::ff8::kOrder) { @@ -182,8 +174,7 @@ LEO_EXPORT LeopardResult leo_encode( recovery_count, m, original_data, - work_data, - mt); + work_data); } else #endif // LEO_HAS_FF8 @@ -196,8 +187,7 @@ LEO_EXPORT LeopardResult leo_encode( recovery_count, m, original_data, - work_data, - mt); + work_data); } else #endif // LEO_HAS_FF16 @@ -247,8 +237,7 @@ LEO_EXPORT LeopardResult leo_decode( unsigned work_count, // Number of buffer pointers in work_data[] const void* const * const original_data, // Array of original data buffers const void* const * const recovery_data, // Array of recovery data buffers - void** work_data, // Array of work data buffers - unsigned flags) // Operation flags + void** work_data) // Array of work data buffers { if (buffer_bytes <= 0 || buffer_bytes % 64 != 0) return Leopard_InvalidSize; @@ -311,8 +300,6 @@ LEO_EXPORT LeopardResult leo_decode( if (work_count != n) return Leopard_InvalidCounts; - const bool mt = (flags & LeopardFlags_Multithreaded) != 0; - #ifdef LEO_HAS_FF8 if (n <= leopard::ff8::kOrder) { @@ -324,8 +311,7 @@ LEO_EXPORT LeopardResult leo_decode( n, original_data, recovery_data, - work_data, - mt); + work_data); } else #endif // LEO_HAS_FF8 @@ -340,8 +326,7 @@ LEO_EXPORT LeopardResult leo_decode( n, original_data, recovery_data, - work_data, - mt); + work_data); } else #endif // LEO_HAS_FF16 diff --git a/leopard.h b/leopard.h index e6ee529..2a8fa79 100644 --- a/leopard.h +++ b/leopard.h @@ -63,7 +63,7 @@ */ // Library version -#define LEO_VERSION 1 +#define LEO_VERSION 2 // Tweak if the functions are exported or statically linked //#define LEO_DLL /* Defined when building/linking as DLL */ @@ -126,14 +126,6 @@ typedef enum LeopardResultT // Convert Leopard result to string LEO_EXPORT const char* leo_result_string(LeopardResult result); -// Flags -typedef enum LeopardFlagsT -{ - LeopardFlags_Defaults = 0, // Default settings - - LeopardFlags_Multithreaded = 1, // Enable multiple threads -} LeopardFlags; - //------------------------------------------------------------------------------ // Encoder API @@ -163,7 +155,6 @@ LEO_EXPORT unsigned leo_encode_work_count( original_data: Array of pointers to original data buffers. work_count: Number of work_data[] buffers, from leo_encode_work_count(). work_data: Array of pointers to work data buffers. - flags: Flags for encoding e.g. LeopardFlag_Multithreaded The sum of original_count + recovery_count must not exceed 65536. The recovery_count <= original_count. @@ -192,8 +183,7 @@ LEO_EXPORT LeopardResult leo_encode( unsigned recovery_count, // Number of recovery_data[] buffer pointers unsigned work_count, // Number of work_data[] buffer pointers, from leo_encode_work_count() const void* const * const original_data, // Array of pointers to original data buffers - void** work_data, // Array of work buffers - unsigned flags); // Operation flags + void** work_data); // Array of work buffers //------------------------------------------------------------------------------ @@ -225,7 +215,6 @@ LEO_EXPORT unsigned leo_decode_work_count( recovery_data: Array of pointers to recovery data buffers. work_count: Number of work_data[] buffers, from leo_decode_work_count(). work_data: Array of pointers to recovery data buffers. - flags: Flags for encoding e.g. LeopardFlag_Multithreaded Lost original/recovery data should be set to NULL. @@ -242,8 +231,7 @@ LEO_EXPORT LeopardResult leo_decode( unsigned work_count, // Number of buffer pointers in work_data[] const void* const * const original_data, // Array of original data buffers const void* const * const recovery_data, // Array of recovery data buffers - void** work_data, // Array of work data buffers - unsigned flags); // Operation flags + void** work_data); // Array of work data buffers #ifdef __cplusplus diff --git a/proj/Leopard.vcxproj b/proj/Leopard.vcxproj index e065916..8cfb7f2 100644 --- a/proj/Leopard.vcxproj +++ b/proj/Leopard.vcxproj @@ -169,6 +169,7 @@ MultiThreaded true _MBCS;%(PreprocessorDefinitions) + true true diff --git a/tests/benchmark.cpp b/tests/benchmark.cpp index 1a57d0c..5ad1110 100644 --- a/tests/benchmark.cpp +++ b/tests/benchmark.cpp @@ -43,19 +43,18 @@ struct TestParameters { #ifdef LEO_HAS_FF16 unsigned original_count = 1000; // under 65536 - unsigned recovery_count = 200; // under 65536 - original_count + unsigned recovery_count = 100; // under 65536 - original_count #else unsigned original_count = 100; // under 65536 - unsigned recovery_count = 20; // under 65536 - original_count + unsigned recovery_count = 10; // under 65536 - original_count #endif - unsigned buffer_bytes = 6400; // multiple of 64 bytes + unsigned buffer_bytes = 1344; // multiple of 64 bytes unsigned loss_count = 32768; // some fraction of original_count unsigned seed = 2; - bool multithreaded = true; }; static const unsigned kLargeTrialCount = 1; -static const unsigned kSmallTrialCount = 10; +static const unsigned kSmallTrialCount = 300; //------------------------------------------------------------------------------ @@ -420,8 +419,7 @@ static bool Benchmark(const TestParameters& params) params.recovery_count, encode_work_count, (void**)&original_data[0], - (void**)&encode_work_data[0], // recovery data written here - params.multithreaded ? LeopardFlags_Multithreaded : LeopardFlags_Defaults + (void**)&encode_work_data[0] // recovery data written here ); t_leo_encode.EndCall(); @@ -473,8 +471,7 @@ static bool Benchmark(const TestParameters& params) decode_work_count, (void**)&original_data[0], (void**)&encode_work_data[0], - (void**)&decode_work_data[0], - params.multithreaded ? LeopardFlags_Multithreaded : LeopardFlags_Defaults); + (void**)&decode_work_data[0]); t_leo_decode.EndCall(); if (decodeResult != Leopard_Success) @@ -528,108 +525,6 @@ static bool Benchmark(const TestParameters& params) } -//------------------------------------------------------------------------------ -// Multithreading Tests - -#ifdef LEO_ENABLE_MULTITHREADING_OPT - -void MultithreadingTests() -{ - static const unsigned kBytes = 100; - static const unsigned N = 10000; - std::vector Buffers(N * 2); - - for (unsigned i = 0; i < N * 2; ++i) - Buffers[i] = leopard::SIMDSafeAllocate(kBytes); - - { - for (unsigned j = 1; j <= 16; ++j) - { - FunctionTimer trialTimer("trial"); - - for (unsigned t = 0; t < 40; ++t) - { - trialTimer.BeginCall(); - - leopard::WorkBundle* bundle = leopard::PoolInstance->GetBundle(); - - const unsigned split_j = N / j; - - for (unsigned k = 0; k < N; k += split_j) - { - bundle->Dispatch([split_j, k, &Buffers]() { - for (unsigned m = k; m < N && m < (k + split_j); ++m) - { - leopard::xor_mem(Buffers[m + N], Buffers[m], kBytes); - } - }); - } - - bundle->Complete(); - - trialTimer.EndCall(); - } - - float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec); - cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) across " << j << " work-thread items: " << mbps << " MB/s" << endl; - } - for (unsigned j = 32; j <= N / 2; j *= 2) - { - FunctionTimer trialTimer("trial"); - - for (unsigned t = 0; t < 40; ++t) - { - trialTimer.BeginCall(); - - leopard::WorkBundle* bundle = leopard::PoolInstance->GetBundle(); - - const unsigned split_j = N / j; - - for (unsigned k = 0; k < N; k += split_j) - { - bundle->Dispatch([split_j, k, &Buffers]() { - for (unsigned m = k; m < N && m < (k + split_j); ++m) - { - leopard::xor_mem(Buffers[m + N], Buffers[m], kBytes); - } - }); - } - - bundle->Complete(); - - trialTimer.EndCall(); - } - - float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec); - cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) across " << j << " work-thread items: " << mbps << " MB/s" << endl; - } - { - FunctionTimer trialTimer("trial"); - - for (unsigned t = 0; t < 40; ++t) - { - trialTimer.BeginCall(); - - for (unsigned k = 0; k < N; ++k) - { - leopard::xor_mem(Buffers[k + N], Buffers[k], kBytes); - } - - trialTimer.EndCall(); - } - - float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec); - cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) single-threaded: " << mbps << " MB/s" << endl; - } - } - - for (unsigned i = 0; i < N * 2; ++i) - leopard::SIMDSafeFree(Buffers[i]); -} - -#endif // LEO_ENABLE_MULTITHREADING_OPT - - //------------------------------------------------------------------------------ // Entrypoint @@ -659,37 +554,22 @@ int main(int argc, char **argv) params.buffer_bytes = atoi(argv[3]); if (argc >= 5) params.loss_count = atoi(argv[4]); - if (argc >= 6) - params.multithreaded = (atoi(argv[5]) != 0); if (params.loss_count > params.recovery_count) params.loss_count = params.recovery_count; - params.multithreaded = false; - - cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "] (multi-threading OFF)" << endl; + cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "]" << endl; if (!Benchmark(params)) goto Failed; - params.multithreaded = true; - - cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "] (multi-threading ON)" << endl; - - if (!Benchmark(params)) - goto Failed; - -#ifdef LEO_ENABLE_MULTITHREADING_OPT - MultithreadingTests(); -#endif - #if 1 static const unsigned kMaxRandomData = 32768; prng.Seed(params.seed, 8); for (;; ++params.seed) { - params.original_count = prng.Next() % kMaxRandomData; + params.original_count = prng.Next() % kMaxRandomData + 1; params.recovery_count = prng.Next() % params.original_count + 1; params.loss_count = prng.Next() % params.recovery_count + 1;