diff --git a/LeopardCommon.cpp b/LeopardCommon.cpp
index 96dfd8a..956b48b 100644
--- a/LeopardCommon.cpp
+++ b/LeopardCommon.cpp
@@ -28,6 +28,8 @@
 
 #include "LeopardCommon.h"
 
+#include
+
 namespace leopard {
 
 
@@ -429,4 +431,220 @@ void VectorXOR(
 }
 
+#ifdef LEO_ENABLE_MULTITHREADING_OPT
+
+//------------------------------------------------------------------------------
+// WorkerThread
+
+// Launch (or relaunch) the worker: any previous thread is stopped first, a
+// fresh auto-reset wake event is created on Windows, then Loop() is started.
+void WorkerThread::Start()
+{
+    Stop();
+
+#ifdef _WIN32
+    hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr);
+#endif
+
+    Terminated = false;
+    Thread = std::make_unique<std::thread>(&WorkerThread::Loop, this);
+}
+
+// Ask the thread to exit, wait for it, then release the wake event.
+// Safe to call when the thread was never started.
+void WorkerThread::Stop()
+{
+    if (Thread)
+    {
+        Terminated = true;
+#ifdef _WIN32
+        ::SetEvent(hEvent);
+#endif
+
+        try
+        {
+            Thread->join();
+        }
+        catch (std::system_error& /*err*/)
+        {
+            // Best effort: nothing useful can be done if the join fails
+        }
+        Thread = nullptr;
+    }
+
+#ifdef _WIN32
+    if (hEvent != nullptr)
+    {
+        ::CloseHandle(hEvent);
+        hEvent = nullptr;
+    }
+#endif
+}
+
+// Signal the worker that the pool's queue may contain work.
+void WorkerThread::Wake()
+{
+#ifdef _WIN32
+    ::SetEvent(hEvent);
+#else
+    // FIXME: Port to other platforms
+#endif
+}
+
+// Thread body: sleep on the wake event, drain the shared queue, repeat until
+// Terminated is set or the wait fails.
+void WorkerThread::Loop()
+{
+    for (;;)
+    {
+        if (Terminated)
+            break;
+
+#ifdef _WIN32
+        const DWORD result = ::WaitForSingleObject(hEvent, INFINITE);
+
+        if (result != WAIT_OBJECT_0 || Terminated)
+            break;
+#else
+        // FIXME: Port to other platforms
+#endif
+
+        PoolInstance->DrainWorkQueue();
+    }
+}
+
+
+//------------------------------------------------------------------------------
+// WorkBundle
+
+WorkBundle::WorkBundle()
+{
+#ifdef _WIN32
+    hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr);
+#else
+    // FIXME: Port to other platforms
+#endif
+}
+
+WorkBundle::~WorkBundle()
+{
+#ifdef _WIN32
+    if (hEvent != nullptr)
+    {
+        ::CloseHandle(hEvent);
+        hEvent = nullptr;
+    }
+#else
+    // FIXME: Port to other platforms
+#endif
+}
+
+// Called when the outstanding-operation count drops to zero: releases Join().
+void WorkBundle::OnAllOperationsCompleted()
+{
+#ifdef _WIN32
+    ::SetEvent(hEvent);
+#else
+    // FIXME: Port to other platforms
+#endif
+}
+
+// Block until OnAllOperationsCompleted() has signalled the event.
+void WorkBundle::Join()
+{
+#ifdef _WIN32
+    ::WaitForSingleObject(hEvent, INFINITE);
+#else
+    // FIXME: Port to other platforms
+#endif
+}
+
+
+//------------------------------------------------------------------------------
+// WorkerPool
+
+WorkerPool* PoolInstance = nullptr;
+
+// atexit hook: stop the worker threads before the process tears down
+extern "C" void AtExitWrapper()
+{
+    if (PoolInstance)
+        PoolInstance->Stop();
+}
+
+// Start one worker per hardware thread beyond the calling thread.
+WorkerPool::WorkerPool()
+{
+    WorkerCount = 0;
+    Remaining = 0; // must be set before any worker can call DrainWorkQueue()
+
+    unsigned num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus > 1)
+    {
+        WorkerCount = num_cpus - 1;
+
+        Workers = new WorkerThread[WorkerCount];
+        for (unsigned i = 0; i < WorkerCount; ++i)
+            Workers[i].Start();
+
+        std::atexit(AtExitWrapper);
+    }
+}
+
+// Stop and destroy all workers.
+void WorkerPool::Stop()
+{
+    // Detach the array under the lock but destroy it after releasing the
+    // lock: ~WorkerThread() joins its thread, and a worker that is inside
+    // DrainWorkQueue() needs QueueLock before it can notice Terminated.
+    WorkerThread* workers = nullptr;
+    {
+        Locker locker(QueueLock);
+        workers = Workers;
+        Workers = nullptr;
+        WorkerCount = 0;
+    }
+
+    delete[] workers;
+}
+
+// Queue a call; it runs once Run() is invoked (or a worker is already draining).
+void WorkerPool::Dispatch(WorkerCallT call)
+{
+    Locker locker(QueueLock);
+    WorkQueue.resize(++Remaining);
+    WorkQueue[Remaining - 1] = call;
+}
+
+// Wake every worker, then help drain the queue on the calling thread.
+void WorkerPool::Run()
+{
+    // Wake all the workers
+    for (unsigned i = 0, count = WorkerCount; i < count; ++i)
+        Workers[i].Wake();
+
+    DrainWorkQueue();
+}
+
+// Pop and execute queued calls (most recent first) until the queue is empty.
+// The lock is held only while popping, never while a call runs.
+void WorkerPool::DrainWorkQueue()
+{
+    for (;;)
+    {
+        WorkerCallT call;
+        {
+            Locker locker(QueueLock);
+            if (Remaining == 0)
+                return;
+            call = WorkQueue[--Remaining];
+        }
+        call();
+    }
+}
+
+
+#endif // LEO_ENABLE_MULTITHREADING_OPT
+
+
 
 } // namespace leopard
diff --git a/LeopardCommon.h b/LeopardCommon.h
index f6c16c5..7b11684 100644
--- a/LeopardCommon.h
+++ b/LeopardCommon.h
@@ -31,9 +31,6 @@
 /*
     TODO:
 
-    Short-term:
-    + Multithreading
-
     Mid-term:
     + Add compile-time selectable XOR-only rowops instead of MULADD
     + Look into 12-bit fields as a performance optimization
@@ -157,6 +154,11 @@
 
 #include
 #include
+#include
+#include
+#include
+#include
+#include
 
 
 //------------------------------------------------------------------------------
@@ -182,6 +184,11 @@
 // Unroll inner loops 4 times
 #define LEO_USE_VECTOR4_OPT
 
+// Enable multithreading optimization when building with MSVC
+#ifdef _MSC_VER
+    #define 
LEO_ENABLE_MULTITHREADING_OPT
+#endif
+
 
 //------------------------------------------------------------------------------
 // Debug
@@ -203,6 +210,32 @@
 #endif
 
 
+//------------------------------------------------------------------------------
+// Windows Header
+
+#ifdef _WIN32
+    #define WIN32_LEAN_AND_MEAN
+
+    #ifndef _WINSOCKAPI_
+        #define DID_DEFINE_WINSOCKAPI
+        #define _WINSOCKAPI_
+    #endif
+    #ifndef NOMINMAX
+        #define NOMINMAX
+    #endif
+    #ifndef _WIN32_WINNT
+        #define _WIN32_WINNT 0x0601 /* Windows 7+ */
+    #endif
+
+    #include <windows.h>
+#endif
+
+#ifdef DID_DEFINE_WINSOCKAPI
+    #undef _WINSOCKAPI_
+    #undef DID_DEFINE_WINSOCKAPI
+#endif
+
+
 //------------------------------------------------------------------------------
 // Platform/Architecture
 
@@ -456,4 +489,179 @@ static LEO_FORCE_INLINE void SIMDSafeFree(void* ptr)
 }
 
+//------------------------------------------------------------------------------
+// Mutex
+
+#ifdef _WIN32
+
+// Win32 critical section initialized with a spin count of 1000
+struct Lock
+{
+    CRITICAL_SECTION cs;
+
+    Lock() { ::InitializeCriticalSectionAndSpinCount(&cs, 1000); }
+    ~Lock() { ::DeleteCriticalSection(&cs); }
+
+    // A critical section object must not be copied
+    Lock(const Lock&) = delete;
+    Lock& operator=(const Lock&) = delete;
+
+    bool TryEnter() { return ::TryEnterCriticalSection(&cs) != FALSE; }
+    void Enter() { ::EnterCriticalSection(&cs); }
+    void Leave() { ::LeaveCriticalSection(&cs); }
+};
+
+#else
+
+// Fallback with the same interface, backed by std::recursive_mutex
+struct Lock
+{
+    std::recursive_mutex cs;
+
+    bool TryEnter() { return cs.try_lock(); }
+    void Enter() { cs.lock(); }
+    void Leave() { cs.unlock(); }
+};
+
+#endif
+
+// Scoped holder for a Lock: the constructor enters, the destructor leaves.
+// Set()/TrySet() switch to another lock; Clear() releases early.
+class Locker
+{
+public:
+    Locker(Lock& lock) : TheLock(&lock) { TheLock->Enter(); }
+    ~Locker() { Clear(); }
+
+    // Copying would leave the same lock twice
+    Locker(const Locker&) = delete;
+    Locker& operator=(const Locker&) = delete;
+
+    bool TrySet(Lock& lock) {
+        Clear();
+        if (!lock.TryEnter())
+            return false;
+        TheLock = &lock;
+        return true;
+    }
+    void Set(Lock& lock) {
+        Clear();
+        lock.Enter();
+        TheLock = &lock;
+    }
+    void Clear() {
+        if (TheLock)
+            TheLock->Leave();
+        TheLock = nullptr;
+    }
+private:
+    Lock* TheLock = nullptr;
+};
+
+
+#ifdef LEO_ENABLE_MULTITHREADING_OPT
+
+//------------------------------------------------------------------------------
+// WorkerThread
+
+// One pool thread: sleeps until Wake(), then drains the pool's work queue
+class WorkerThread
+{
+public:
+    WorkerThread() {}
+    ~WorkerThread()
+    {
+        Stop();
+    }
+
+    void Start();
+    void Stop();
+    void Wake();
+
+protected:
+    std::atomic_bool Terminated{false};
+    std::unique_ptr<std::thread> Thread;
+
+#ifdef _WIN32
+    HANDLE hEvent = nullptr;
+#else
+    // FIXME: Port to other platforms
+    mutable std::mutex QueueLock;
+    std::condition_variable QueueCondition;
+#endif
+
+    void Loop();
+};
+
+
+//------------------------------------------------------------------------------
+// WorkBundle
+
+// Completion counter for a batch of dispatched calls: Increment() once per
+// call before it can run, OperationComplete() when it finishes, and Join()
+// to wait until the count has returned to zero.
+class WorkBundle
+{
+public:
+    WorkBundle();
+    ~WorkBundle();
+
+    LEO_FORCE_INLINE void Increment()
+    {
+        ++WorkCount;
+    }
+    LEO_FORCE_INLINE void OperationComplete()
+    {
+        if (--WorkCount == 0)
+            OnAllOperationsCompleted();
+    }
+    void Join();
+
+protected:
+    // Explicitly zeroed: a default-constructed std::atomic is uninitialized before C++20
+    std::atomic<unsigned> WorkCount{0};
+
+#ifdef _WIN32
+    HANDLE hEvent = nullptr;
+#else
+    // FIXME: Port to other platforms
+#endif
+
+    void OnAllOperationsCompleted();
+};
+
+
+//------------------------------------------------------------------------------
+// WorkerPool
+
+// Unit of work: a callable taking no arguments
+typedef std::function<void()> WorkerCallT;
+
+// Shared queue of calls executed by the worker threads and by the caller of Run()
+class WorkerPool
+{
+    friend class WorkerThread;
+
+public:
+    WorkerPool();
+    void Stop();
+
+    void Dispatch(WorkerCallT call);
+    void Run();
+
+protected:
+    void DrainWorkQueue();
+
+    mutable Lock QueueLock;
+    WorkerThread* Workers = nullptr;
+    unsigned WorkerCount = 0;
+    std::vector<WorkerCallT> WorkQueue;
+    unsigned Remaining = 0; // pending calls are WorkQueue[0..Remaining)
+};
+
+extern WorkerPool* PoolInstance;
+
+#endif // LEO_ENABLE_MULTITHREADING_OPT
+
+
 
 } // namespace leopard
diff --git a/LeopardFF16.cpp b/LeopardFF16.cpp
index 1888b4b..f5e21c4 100644
--- a/LeopardFF16.cpp
+++ b/LeopardFF16.cpp
@@ -897,6 +897,90 @@ static void IFFT_DIT_Decoder(
     }
 }
 
+// Multithreaded decoder version.
+// Each layer: the dispatcher takes one "hold" on workBundle, dispatches the
+// butterflies (counting each one BEFORE it is queued), runs the pool, then
+// drops the hold and joins.  The hold keeps the count from touching zero
+// while work is still being queued, and lets Join() return for empty layers.
+static void IFFT_DIT_Decoder_MT(
+    const uint64_t bytes,
+    const unsigned m_truncated,
+    void** work,
+    const unsigned m,
+    const ffe_t* skewLUT)
+{
+    WorkBundle workBundle;
+
+    // Decimation in time: Unroll 2 layers at a time
+    unsigned dist = 1, dist4 = 4;
+    for (; dist4 <= m; dist = dist4, dist4 <<= 2)
+    {
+        workBundle.Increment(); // dispatcher hold
+
+        // For each set of dist*4 elements:
+        for (unsigned r = 0; r < m_truncated; r += dist4)
+        {
+            const unsigned i_end = r + dist;
+            const ffe_t log_m01 = skewLUT[i_end];
+            const ffe_t log_m02 = skewLUT[i_end + dist];
+            const ffe_t log_m23 = skewLUT[i_end + dist * 2];
+
+            // For each set of dist elements:
+            for (unsigned i = r; i < i_end; ++i)
+            {
+                void** work_i = work + i;
+                workBundle.Increment();
+                PoolInstance->Dispatch([log_m01, log_m02, log_m23, bytes, work_i, dist, &workBundle]() {
+                    IFFT_DIT4(bytes, work_i, dist, log_m01, log_m23, log_m02);
+                    workBundle.OperationComplete();
+                });
+            }
+        }
+
+        PoolInstance->Run();
+        workBundle.OperationComplete(); // release the hold
+        workBundle.Join();
+    }
+
+    // If there is one layer left:
+    if (dist < m)
+    {
+        // Assuming that dist = m / 2
+        LEO_DEBUG_ASSERT(dist * 2 == m);
+
+        const ffe_t log_m = skewLUT[dist];
+
+        workBundle.Increment(); // dispatcher hold
+
+        if (log_m == kModulus)
+        {
+            for (unsigned i = 0; i < dist; ++i)
+            {
+                workBundle.Increment();
+                PoolInstance->Dispatch([work, i, dist, bytes, &workBundle]() {
+                    xor_mem(work[i + dist], work[i], bytes);
+                    workBundle.OperationComplete();
+                });
+            }
+        }
+        else
+        {
+            for (unsigned i = 0; i < dist; ++i)
+            {
+                workBundle.Increment();
+                PoolInstance->Dispatch([work, i, dist, log_m, bytes, &workBundle]() {
+                    IFFT_DIT2(work[i], work[i + dist], log_m, bytes);
+                    workBundle.OperationComplete();
+                });
+            }
+        }
+
+        PoolInstance->Run();
+        workBundle.OperationComplete(); // release the hold
+        workBundle.Join();
+    }
+}
+
 /*
     Decimation in time FFT:
 
@@ -1208,6 +1292,9 @@ static void FFT_DIT(
     unsigned dist4 = m, dist = m >> 2;
     for (; dist != 0; dist4 = dist, dist >>= 2)
     {
+        const unsigned thread_u = m_truncated / dist4;
+        const unsigned thread_v = dist;
+
         // For each set of dist*4 elements:
         for (unsigned r = 0; r < m_truncated; r += dist4)
         {
@@ -1261,7 +1348,8 @@ void ReedSolomonEncode(
     unsigned recovery_count,
     unsigned m,
     const void* const * data,
-    void** work)
+    void** work,
+    bool multithreaded)
 {
     // work <- IFFT(data, m, m)
 
@@ -1499,6 +1587,82 @@ static void 
FFT_DIT_ErrorBits(
     }
 }
 
+// Multithreaded version of FFT_DIT_ErrorBits().  For each layer the dispatcher
+// holds one count on workBundle until everything is queued: a layer whose
+// groups are all skipped by IsNeeded() then still releases Join(), and the
+// count cannot reach zero while tasks are still being dispatched.
+static void FFT_DIT_ErrorBits_MT(
+    const uint64_t bytes,
+    void** work,
+    const unsigned n_truncated,
+    const unsigned n,
+    const ffe_t* skewLUT,
+    const ErrorBitfield& error_bits)
+{
+    WorkBundle workBundle;
+    unsigned mip_level = LastNonzeroBit32(n);
+
+    // Decimation in time: Unroll 2 layers at a time
+    unsigned dist4 = n, dist = n >> 2;
+    for (; dist != 0; dist4 = dist, dist >>= 2, mip_level -=2)
+    {
+        workBundle.Increment(); // dispatcher hold
+
+        // For each set of dist*4 elements:
+        for (unsigned r = 0; r < n_truncated; r += dist4)
+        {
+            if (!error_bits.IsNeeded(mip_level, r))
+                continue;
+
+            const ffe_t log_m01 = skewLUT[r + dist];
+            const ffe_t log_m23 = skewLUT[r + dist * 3];
+            const ffe_t log_m02 = skewLUT[r + dist * 2];
+
+            // For each set of dist elements:
+            for (unsigned i = r; i < r + dist; ++i)
+            {
+                void** work_i = work + i;
+
+                // Count the task before a worker can complete it
+                workBundle.Increment();
+                PoolInstance->Dispatch([bytes, &workBundle, work_i, dist, log_m01, log_m02, log_m23]() {
+                    FFT_DIT4(bytes, work_i, dist, log_m01, log_m23, log_m02);
+                    workBundle.OperationComplete();
+                });
+            }
+        }
+
+        PoolInstance->Run();
+        workBundle.OperationComplete(); // release the hold
+        workBundle.Join();
+    }
+
+    // If there is one layer left:
+    if (dist4 == 2)
+    {
+        workBundle.Increment(); // dispatcher hold
+
+        for (unsigned r = 0; r < n_truncated; r += 2)
+        {
+            workBundle.Increment();
+            PoolInstance->Dispatch([bytes, &workBundle, skewLUT, work, r]() {
+                const ffe_t log_m = skewLUT[r + 1];
+
+                if (log_m == kModulus)
+                    xor_mem(work[r + 1], work[r], bytes);
+                else
+                    FFT_DIT2(work[r], work[r + 1], log_m, bytes);
+
+                workBundle.OperationComplete();
+            });
+        }
+
+        PoolInstance->Run();
+        workBundle.OperationComplete(); // release the hold
+        workBundle.Join();
+    }
+}
+
 #endif // LEO_ERROR_BITFIELD_OPT
 
 
@@ -1513,8 +1677,11 @@ void ReedSolomonDecode(
     unsigned n, // NextPow2(m + original_count) = work_count
     const void* const * const original, // original_count entries
     const void* const * const recovery, // recovery_count entries
-    void** work) // n entries
+    void** work, // n entries
+    bool multithreaded)
 {
+    if (multithreaded && n * buffer_bytes < 512000)
+ 
multithreaded = false; // Fill in error locations #ifdef LEO_ERROR_BITFIELD_OPT @@ -1577,24 +1744,68 @@ void ReedSolomonDecode( // work <- IFFT(work, n, 0) - IFFT_DIT_Decoder( - buffer_bytes, - m + original_count, - work, - n, - FFTSkew - 1); + if (multithreaded) + { + IFFT_DIT_Decoder_MT( + buffer_bytes, + m + original_count, + work, + n, + FFTSkew - 1); + } + else + { + IFFT_DIT_Decoder( + buffer_bytes, + m + original_count, + work, + n, + FFTSkew - 1); + } // work <- FormalDerivative(work, n) - for (unsigned i = 1; i < n; ++i) + if (multithreaded) { - const unsigned width = ((i ^ (i - 1)) + 1) >> 1; + for (unsigned i = 1; i < n; ++i) + { + const unsigned width = ((i ^ (i - 1)) + 1) >> 1; - VectorXOR( - buffer_bytes, - width, - work + i - width, - work + i); + if (width <= 4) + { + for (unsigned j = i - width; j < i; ++j) + xor_mem(work[j], work[j + width], buffer_bytes); + } + else + { + WorkBundle workBundle; + + for (unsigned j = i - width; j < i; ++j) + { + PoolInstance->Dispatch([work, j, width, &workBundle, buffer_bytes]() { + xor_mem(work[j], work[j + width], buffer_bytes); + workBundle.OperationComplete(); + }); + workBundle.Increment(); + } + + PoolInstance->Run(); + workBundle.Join(); + } + } + } + else + { + for (unsigned i = 1; i < n; ++i) + { + const unsigned width = ((i ^ (i - 1)) + 1) >> 1; + + VectorXOR( + buffer_bytes, + width, + work + i - width, + work + i); + } } // work <- FFT(work, n, 0) truncated to m + original_count @@ -1602,7 +1813,14 @@ void ReedSolomonDecode( const unsigned output_count = m + original_count; #ifdef LEO_ERROR_BITFIELD_OPT - FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits); + if (multithreaded) + { + FFT_DIT_ErrorBits_MT(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits); + } + else + { + FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits); + } #else FFT_DIT(buffer_bytes, work, output_count, n, FFTSkew - 1); #endif diff --git a/LeopardFF16.h 
b/LeopardFF16.h index 689fdb3..ab30977 100644 --- a/LeopardFF16.h +++ b/LeopardFF16.h @@ -75,7 +75,8 @@ void ReedSolomonEncode( unsigned recovery_count, unsigned m, // = NextPow2(recovery_count) * 2 = work_count const void* const * const data, - void** work); // Size of GetEncodeWorkCount() + void** work, // Size of GetEncodeWorkCount() + bool multithreaded); void ReedSolomonDecode( uint64_t buffer_bytes, @@ -85,7 +86,8 @@ void ReedSolomonDecode( unsigned n, // = NextPow2(m + original_count) = work_count const void* const * const original, // original_count entries const void* const * const recovery, // recovery_count entries - void** work); // n entries + void** work, // n entries + bool multithreaded); }} // namespace leopard::ff16 diff --git a/LeopardFF8.cpp b/LeopardFF8.cpp index 66fc567..8e53043 100644 --- a/LeopardFF8.cpp +++ b/LeopardFF8.cpp @@ -1641,7 +1641,8 @@ void ReedSolomonEncode( unsigned recovery_count, unsigned m, const void* const* data, - void** work) + void** work, + bool multithreaded) { // work <- IFFT(data, m, m) @@ -1849,7 +1850,8 @@ void ReedSolomonDecode( unsigned n, // NextPow2(m + original_count) = work_count const void* const * const original, // original_count entries const void* const * const recovery, // recovery_count entries - void** work) // n entries + void** work, // n entries + bool multithreaded) { // Fill in error locations diff --git a/LeopardFF8.h b/LeopardFF8.h index 4014d4e..1b5c774 100644 --- a/LeopardFF8.h +++ b/LeopardFF8.h @@ -75,7 +75,8 @@ void ReedSolomonEncode( unsigned recovery_count, unsigned m, // = NextPow2(recovery_count) const void* const * const data, - void** work); // m * 2 elements + void** work, // m * 2 elements + bool multithreaded); void ReedSolomonDecode( uint64_t buffer_bytes, @@ -85,7 +86,8 @@ void ReedSolomonDecode( unsigned n, // = NextPow2(m + original_count) const void* const * const original, // original_count entries const void* const * const recovery, // recovery_count entries - void** work); 
// n elements + void** work, // n elements + bool multithreaded); }} // namespace leopard::ff8 diff --git a/leopard.cpp b/leopard.cpp index 37bd8e4..cf399b9 100644 --- a/leopard.cpp +++ b/leopard.cpp @@ -63,6 +63,12 @@ LEO_EXPORT int leo_init_(int version) return Leopard_Platform; #endif // LEO_HAS_FF16 +#ifdef LEO_ENABLE_MULTITHREADING_OPT + // Start worker threads spinning + leopard::PoolInstance = new leopard::WorkerPool; +#endif // LEO_ENABLE_MULTITHREADING_OPT + + m_Initialized = true; return Leopard_Success; } @@ -176,7 +182,8 @@ LEO_EXPORT LeopardResult leo_encode( recovery_count, m, original_data, - work_data); + work_data, + mt); } else #endif // LEO_HAS_FF8 @@ -189,7 +196,8 @@ LEO_EXPORT LeopardResult leo_encode( recovery_count, m, original_data, - work_data); + work_data, + mt); } else #endif // LEO_HAS_FF16 @@ -316,7 +324,8 @@ LEO_EXPORT LeopardResult leo_decode( n, original_data, recovery_data, - work_data); + work_data, + mt); } else #endif // LEO_HAS_FF8 @@ -331,7 +340,8 @@ LEO_EXPORT LeopardResult leo_decode( n, original_data, recovery_data, - work_data); + work_data, + mt); } else #endif // LEO_HAS_FF16 diff --git a/tests/benchmark.cpp b/tests/benchmark.cpp index b85d62f..03dc175 100644 --- a/tests/benchmark.cpp +++ b/tests/benchmark.cpp @@ -51,11 +51,11 @@ struct TestParameters unsigned buffer_bytes = 64000; // multiple of 64 bytes unsigned loss_count = 32768; // some fraction of original_count unsigned seed = 2; - bool multithreaded = true; + bool multithreaded = false; }; static const unsigned kLargeTrialCount = 1; -static const unsigned kSmallTrialCount = 1; +static const unsigned kSmallTrialCount = 4; //------------------------------------------------------------------------------