Multithreading support (buggy)

This commit is contained in:
Christopher Taylor 2017-06-05 01:05:51 -07:00
parent d7567e5dd0
commit 9498f45011
8 changed files with 649 additions and 31 deletions

View File

@ -28,6 +28,8 @@
#include "LeopardCommon.h"
#include <thread>
namespace leopard {
@ -429,4 +431,195 @@ void VectorXOR(
}
#ifdef LEO_ENABLE_MULTITHREADING_OPT
//------------------------------------------------------------------------------
// WorkerThread
void WorkerThread::Start()
{
Stop();
#ifdef _WIN32
hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr);
#endif
Terminated = false;
Thread = std::make_unique<std::thread>(&WorkerThread::Loop, this);
}
void WorkerThread::Stop()
{
if (Thread)
{
Terminated = true;
#ifdef _WIN32
::SetEvent(hEvent);
#endif
try
{
Thread->join();
}
catch (std::system_error& /*err*/)
{
}
Thread = nullptr;
}
#ifdef _WIN32
if (hEvent != nullptr)
{
::CloseHandle(hEvent);
hEvent = nullptr;
}
#endif
}
void WorkerThread::Wake()
{
#ifdef _WIN32
::SetEvent(hEvent);
#else
// FIXME: Port to other platforms
#endif
}
void WorkerThread::Loop()
{
for (;;)
{
if (Terminated)
break;
#ifdef _WIN32
const DWORD result = ::WaitForSingleObject(hEvent, INFINITE);
if (result != WAIT_OBJECT_0 || Terminated)
break;
#else
// FIXME: Port to other platforms
#endif
PoolInstance->DrainWorkQueue();
}
}
//------------------------------------------------------------------------------
// WorkBundle
WorkBundle::WorkBundle()
{
#ifdef _WIN32
hEvent = ::CreateEventW(nullptr, FALSE, FALSE, nullptr);
#else
// FIXME: Port to other platforms
#endif
}
WorkBundle::~WorkBundle()
{
#ifdef _WIN32
if (hEvent != nullptr)
{
::CloseHandle(hEvent);
hEvent = nullptr;
}
#else
// FIXME: Port to other platforms
#endif
}
void WorkBundle::OnAllOperationsCompleted()
{
#ifdef _WIN32
::SetEvent(hEvent);
#else
// FIXME: Port to other platforms
#endif
}
void WorkBundle::Join()
{
#ifdef _WIN32
::WaitForSingleObject(hEvent, INFINITE);
#else
// FIXME: Port to other platforms
#endif
}
//------------------------------------------------------------------------------
// WorkerPool
WorkerPool* PoolInstance = nullptr;
extern "C" void AtExitWrapper()
{
if (PoolInstance)
PoolInstance->Stop();
}
WorkerPool::WorkerPool()
{
WorkerCount = 0;
unsigned num_cpus = std::thread::hardware_concurrency();
if (num_cpus > 1)
{
WorkerCount = num_cpus - 1;
Workers = new WorkerThread[WorkerCount];
for (unsigned i = 0; i < WorkerCount; ++i)
Workers[i].Start();
std::atexit(AtExitWrapper);
}
}
void WorkerPool::Stop()
{
Locker locker(QueueLock);
delete[] Workers;
Workers = nullptr;
WorkerCount = 0;
}
void WorkerPool::Dispatch(WorkerCallT call)
{
Locker locker(QueueLock);
WorkQueue.resize(++Remaining);
WorkQueue[Remaining - 1] = call;
}
void WorkerPool::Run()
{
// Wake all the workers
for (unsigned i = 0, count = WorkerCount; i < count; ++i)
Workers[i].Wake();
DrainWorkQueue();
}
void WorkerPool::DrainWorkQueue()
{
for (;;)
{
WorkerCallT call;
{
Locker locker(QueueLock);
if (Remaining <= 0)
return;
call = WorkQueue[--Remaining];
}
call();
}
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
} // namespace leopard

View File

@ -31,9 +31,6 @@
/*
TODO:
Short-term:
+ Multithreading
Mid-term:
+ Add compile-time selectable XOR-only rowops instead of MULADD
+ Look into 12-bit fields as a performance optimization
@ -157,6 +154,11 @@
#include <stdint.h>
#include <malloc.h>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
#include <condition_variable>
//------------------------------------------------------------------------------
@ -182,6 +184,11 @@
// Unroll inner loops 4 times
#define LEO_USE_VECTOR4_OPT
// Enable multithreading optimization when requested
#ifdef _MSC_VER
#define LEO_ENABLE_MULTITHREADING_OPT
#endif
//------------------------------------------------------------------------------
// Debug
@ -203,6 +210,32 @@
#endif
//------------------------------------------------------------------------------
// Windows Header
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WINSOCKAPI_
#define DID_DEFINE_WINSOCKAPI
#define _WINSOCKAPI_
#endif
#ifndef NOMINMAX
#define NOMINMAX
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601 /* Windows 7+ */
#endif
#include <windows.h>
#endif
#ifdef DID_DEFINE_WINSOCKAPI
#undef _WINSOCKAPI_
#undef DID_DEFINE_WINSOCKAPI
#endif
//------------------------------------------------------------------------------
// Platform/Architecture
@ -456,4 +489,162 @@ static LEO_FORCE_INLINE void SIMDSafeFree(void* ptr)
}
//------------------------------------------------------------------------------
// Mutex
#ifdef _WIN32
struct Lock
{
CRITICAL_SECTION cs;
Lock() { ::InitializeCriticalSectionAndSpinCount(&cs, 1000); }
~Lock() { ::DeleteCriticalSection(&cs); }
bool TryEnter() { return ::TryEnterCriticalSection(&cs) != FALSE; }
void Enter() { ::EnterCriticalSection(&cs); }
void Leave() { ::LeaveCriticalSection(&cs); }
};
#else
struct Lock
{
std::recursive_mutex cs;
bool TryEnter() { return cs.try_lock(); }
void Enter() { cs.lock(); }
void Leave() { cs.unlock(); }
};
#endif
class Locker
{
public:
Locker(Lock& lock) {
TheLock = &lock;
if (TheLock)
TheLock->Enter();
}
~Locker() { Clear(); }
bool TrySet(Lock& lock) {
Clear();
if (!lock.TryEnter())
return false;
TheLock = &lock;
return true;
}
void Set(Lock& lock) {
Clear();
lock.Enter();
TheLock = &lock;
}
void Clear() {
if (TheLock)
TheLock->Leave();
TheLock = nullptr;
}
private:
Lock* TheLock;
};
#ifdef LEO_ENABLE_MULTITHREADING_OPT
//------------------------------------------------------------------------------
// WorkerThread
class WorkerThread
{
public:
WorkerThread() {}
~WorkerThread()
{
Stop();
}
void Start();
void Stop();
void Wake();
protected:
std::atomic_bool Terminated = false;
std::unique_ptr<std::thread> Thread;
#ifdef _WIN32
HANDLE hEvent = nullptr;
#else
// FIXME: Port to other platforms
mutable std::mutex QueueLock;
std::condition_variable QueueCondition;
#endif
void Loop();
};
//------------------------------------------------------------------------------
// WorkBundle
class WorkBundle
{
public:
WorkBundle();
~WorkBundle();
LEO_FORCE_INLINE void Increment()
{
++WorkCount;
}
LEO_FORCE_INLINE void OperationComplete()
{
if (--WorkCount == 0)
OnAllOperationsCompleted();
}
void Join();
protected:
std::atomic<unsigned> WorkCount;
#ifdef _WIN32
HANDLE hEvent = nullptr;
#else
// FIXME: Port to other platforms
#endif
void OnAllOperationsCompleted();
};
//------------------------------------------------------------------------------
// WorkerPool
typedef std::function<void()> WorkerCallT;
class WorkerPool
{
friend class WorkerThread;
public:
WorkerPool();
void Stop();
void Dispatch(WorkerCallT call);
void Run();
protected:
void DrainWorkQueue();
mutable Lock QueueLock;
WorkerThread* Workers = nullptr;
unsigned WorkerCount = 0;
std::vector<WorkerCallT> WorkQueue;
unsigned Remaining;
};
extern WorkerPool* PoolInstance;
#endif // LEO_ENABLE_MULTITHREADING_OPT
} // namespace leopard

View File

@ -897,6 +897,90 @@ static void IFFT_DIT_Decoder(
}
}
// Multithreaded decoder version
static void IFFT_DIT_Decoder_MT(
const uint64_t bytes,
const unsigned m_truncated,
void** work,
const unsigned m,
const ffe_t* skewLUT)
{
WorkBundle workBundle;
// Decimation in time: Unroll 2 layers at a time
unsigned dist = 1, dist4 = 4;
for (; dist4 <= m; dist = dist4, dist4 <<= 2)
{
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
{
const unsigned i_end = r + dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < i_end; ++i)
{
void** work_i = work + i;
PoolInstance->Dispatch([log_m01, log_m02, log_m23, bytes, work_i, dist, &workBundle]() {
IFFT_DIT4(
bytes,
work_i,
dist,
log_m01,
log_m23,
log_m02);
workBundle.OperationComplete();
});
workBundle.Increment();
}
}
PoolInstance->Run();
workBundle.Join();
}
// If there is one layer left:
if (dist < m)
{
// Assuming that dist = m / 2
LEO_DEBUG_ASSERT(dist * 2 == m);
const ffe_t log_m = skewLUT[dist];
if (log_m == kModulus)
{
for (unsigned i = 0; i < dist; ++i)
{
PoolInstance->Dispatch([work, i, dist, bytes, &workBundle]() {
xor_mem(work[i + dist], work[i], bytes);
workBundle.OperationComplete();
});
workBundle.Increment();
}
}
else
{
for (unsigned i = 0; i < dist; ++i)
{
PoolInstance->Dispatch([work, i, dist, log_m, bytes, &workBundle]() {
IFFT_DIT2(
work[i],
work[i + dist],
log_m,
bytes);
workBundle.OperationComplete();
});
workBundle.Increment();
}
}
PoolInstance->Run();
workBundle.Join();
}
}
/*
Decimation in time FFT:
@ -1208,6 +1292,9 @@ static void FFT_DIT(
unsigned dist4 = m, dist = m >> 2;
for (; dist != 0; dist4 = dist, dist >>= 2)
{
const unsigned thread_u = m_truncated / dist4;
const unsigned thread_v = dist;
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
{
@ -1261,7 +1348,8 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m,
const void* const * data,
void** work)
void** work,
bool multithreaded)
{
// work <- IFFT(data, m, m)
@ -1499,6 +1587,82 @@ static void FFT_DIT_ErrorBits(
}
}
static void FFT_DIT_ErrorBits_MT(
const uint64_t bytes,
void** work,
const unsigned n_truncated,
const unsigned n,
const ffe_t* skewLUT,
const ErrorBitfield& error_bits)
{
WorkBundle workBundle;
unsigned mip_level = LastNonzeroBit32(n);
// Decimation in time: Unroll 2 layers at a time
unsigned dist4 = n, dist = n >> 2;
for (; dist != 0; dist4 = dist, dist >>= 2, mip_level -=2)
{
// For each set of dist*4 elements:
for (unsigned r = 0; r < n_truncated; r += dist4)
{
if (!error_bits.IsNeeded(mip_level, r))
continue;
const ffe_t log_m01 = skewLUT[r + dist];
const ffe_t log_m23 = skewLUT[r + dist * 3];
const ffe_t log_m02 = skewLUT[r + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < r + dist; ++i)
{
void** work_i = work + i;
PoolInstance->Dispatch([bytes, &workBundle, work_i, dist, log_m01, log_m02, log_m23]() {
FFT_DIT4(
bytes,
work_i,
dist,
log_m01,
log_m23,
log_m02);
workBundle.OperationComplete();
});
workBundle.Increment();
}
}
PoolInstance->Run();
workBundle.Join();
}
// If there is one layer left:
if (dist4 == 2)
{
for (unsigned r = 0; r < n_truncated; r += 2)
{
PoolInstance->Dispatch([bytes, &workBundle, skewLUT, work, r]() {
const ffe_t log_m = skewLUT[r + 1];
if (log_m == kModulus)
xor_mem(work[r + 1], work[r], bytes);
else
{
FFT_DIT2(
work[r],
work[r + 1],
log_m,
bytes);
}
workBundle.OperationComplete();
});
workBundle.Increment();
}
PoolInstance->Run();
workBundle.Join();
}
}
#endif // LEO_ERROR_BITFIELD_OPT
@ -1513,8 +1677,11 @@ void ReedSolomonDecode(
unsigned n, // NextPow2(m + original_count) = work_count
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work) // n entries
void** work, // n entries
bool multithreaded)
{
if (multithreaded && n * buffer_bytes < 512000)
multithreaded = false;
// Fill in error locations
#ifdef LEO_ERROR_BITFIELD_OPT
@ -1577,24 +1744,68 @@ void ReedSolomonDecode(
// work <- IFFT(work, n, 0)
IFFT_DIT_Decoder(
buffer_bytes,
m + original_count,
work,
n,
FFTSkew - 1);
if (multithreaded)
{
IFFT_DIT_Decoder_MT(
buffer_bytes,
m + original_count,
work,
n,
FFTSkew - 1);
}
else
{
IFFT_DIT_Decoder(
buffer_bytes,
m + original_count,
work,
n,
FFTSkew - 1);
}
// work <- FormalDerivative(work, n)
for (unsigned i = 1; i < n; ++i)
if (multithreaded)
{
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
for (unsigned i = 1; i < n; ++i)
{
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
VectorXOR(
buffer_bytes,
width,
work + i - width,
work + i);
if (width <= 4)
{
for (unsigned j = i - width; j < i; ++j)
xor_mem(work[j], work[j + width], buffer_bytes);
}
else
{
WorkBundle workBundle;
for (unsigned j = i - width; j < i; ++j)
{
PoolInstance->Dispatch([work, j, width, &workBundle, buffer_bytes]() {
xor_mem(work[j], work[j + width], buffer_bytes);
workBundle.OperationComplete();
});
workBundle.Increment();
}
PoolInstance->Run();
workBundle.Join();
}
}
}
else
{
for (unsigned i = 1; i < n; ++i)
{
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
VectorXOR(
buffer_bytes,
width,
work + i - width,
work + i);
}
}
// work <- FFT(work, n, 0) truncated to m + original_count
@ -1602,7 +1813,14 @@ void ReedSolomonDecode(
const unsigned output_count = m + original_count;
#ifdef LEO_ERROR_BITFIELD_OPT
FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
if (multithreaded)
{
FFT_DIT_ErrorBits_MT(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
}
else
{
FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
}
#else
FFT_DIT(buffer_bytes, work, output_count, n, FFTSkew - 1);
#endif

View File

@ -75,7 +75,8 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m, // = NextPow2(recovery_count) * 2 = work_count
const void* const * const data,
void** work); // Size of GetEncodeWorkCount()
void** work, // Size of GetEncodeWorkCount()
bool multithreaded);
void ReedSolomonDecode(
uint64_t buffer_bytes,
@ -85,7 +86,8 @@ void ReedSolomonDecode(
unsigned n, // = NextPow2(m + original_count) = work_count
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work); // n entries
void** work, // n entries
bool multithreaded);
}} // namespace leopard::ff16

View File

@ -1641,7 +1641,8 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m,
const void* const* data,
void** work)
void** work,
bool multithreaded)
{
// work <- IFFT(data, m, m)
@ -1849,7 +1850,8 @@ void ReedSolomonDecode(
unsigned n, // NextPow2(m + original_count) = work_count
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work) // n entries
void** work, // n entries
bool multithreaded)
{
// Fill in error locations

View File

@ -75,7 +75,8 @@ void ReedSolomonEncode(
unsigned recovery_count,
unsigned m, // = NextPow2(recovery_count)
const void* const * const data,
void** work); // m * 2 elements
void** work, // m * 2 elements
bool multithreaded);
void ReedSolomonDecode(
uint64_t buffer_bytes,
@ -85,7 +86,8 @@ void ReedSolomonDecode(
unsigned n, // = NextPow2(m + original_count)
const void* const * const original, // original_count entries
const void* const * const recovery, // recovery_count entries
void** work); // n elements
void** work, // n elements
bool multithreaded);
}} // namespace leopard::ff8

View File

@ -63,6 +63,12 @@ LEO_EXPORT int leo_init_(int version)
return Leopard_Platform;
#endif // LEO_HAS_FF16
#ifdef LEO_ENABLE_MULTITHREADING_OPT
// Start worker threads spinning
leopard::PoolInstance = new leopard::WorkerPool;
#endif // LEO_ENABLE_MULTITHREADING_OPT
m_Initialized = true;
return Leopard_Success;
}
@ -176,7 +182,8 @@ LEO_EXPORT LeopardResult leo_encode(
recovery_count,
m,
original_data,
work_data);
work_data,
mt);
}
else
#endif // LEO_HAS_FF8
@ -189,7 +196,8 @@ LEO_EXPORT LeopardResult leo_encode(
recovery_count,
m,
original_data,
work_data);
work_data,
mt);
}
else
#endif // LEO_HAS_FF16
@ -316,7 +324,8 @@ LEO_EXPORT LeopardResult leo_decode(
n,
original_data,
recovery_data,
work_data);
work_data,
mt);
}
else
#endif // LEO_HAS_FF8
@ -331,7 +340,8 @@ LEO_EXPORT LeopardResult leo_decode(
n,
original_data,
recovery_data,
work_data);
work_data,
mt);
}
else
#endif // LEO_HAS_FF16

View File

@ -51,11 +51,11 @@ struct TestParameters
unsigned buffer_bytes = 64000; // multiple of 64 bytes
unsigned loss_count = 32768; // some fraction of original_count
unsigned seed = 2;
bool multithreaded = true;
bool multithreaded = false;
};
static const unsigned kLargeTrialCount = 1;
static const unsigned kSmallTrialCount = 1;
static const unsigned kSmallTrialCount = 4;
//------------------------------------------------------------------------------