Tune MT operations for different buffer sizes

This commit is contained in:
Christopher Taylor 2017-06-05 20:04:20 -07:00
parent 73a58a21b1
commit eef50925d9
3 changed files with 434 additions and 85 deletions

View File

@ -641,6 +641,11 @@ public:
WorkerPool();
void Stop();
/// Returns the degree of parallelism used to size work bundles:
/// the pool's WorkerCount plus one.
/// NOTE(review): the extra one presumably accounts for the calling thread,
/// which also participates via PoolInstance->Run() - confirm.
unsigned GetParallelism() const
{
    const unsigned parallelism = 1 + WorkerCount;
    return parallelism;
}
WorkBundle* GetBundle()
{
WorkBundle* back;
@ -702,9 +707,12 @@ inline void WorkBundle::Dispatch(const WorkerCallT& call)
/// Finishes this bundle and returns it to the pool.
///
/// When WorkCount is positive, the pool is run, OperationComplete() is
/// signalled and Join() is called before the bundle is released. When nothing
/// was dispatched, the bundle is released immediately.
inline void WorkBundle::Complete()
{
    // Run/OperationComplete/Join happen exactly once, and only when there is
    // work to wait for; an empty bundle goes straight back to the pool.
    if (WorkCount > 0)
    {
        PoolInstance->Run();
        OperationComplete();
        Join();
    }

    PoolInstance->FreeBundle(this);
}

View File

@ -897,6 +897,8 @@ static void IFFT_DIT_Decoder(
}
}
#ifdef LEO_ENABLE_MULTITHREADING_OPT
// Multithreaded decoder version
static void IFFT_DIT_Decoder_MT(
const uint64_t bytes,
@ -905,32 +907,85 @@ static void IFFT_DIT_Decoder_MT(
const unsigned m,
const ffe_t* skewLUT)
{
unsigned bundle_min = (unsigned)(8000 / bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
// Decimation in time: Unroll 2 layers at a time
unsigned dist = 1, dist4 = 4;
for (; dist4 <= m; dist = dist4, dist4 <<= 2)
{
WorkBundle* workBundle = PoolInstance->GetBundle();
// For each set of dist*4 elements:
for (unsigned r = 0; r < m_truncated; r += dist4)
if (m_truncated <= dist4)
{
const unsigned i_end = r + dist;
unsigned bundle_count = dist / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
// For each set of dist*4 elements:
const unsigned i_end = dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < i_end; ++i)
for (unsigned i = 0; i < i_end; i += bundle_count)
{
void** work_i = work + i;
workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work_i, dist, &workBundle]() {
IFFT_DIT4(
bytes,
work_i,
dist,
log_m01,
log_m23,
log_m02);
workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work, bundle_count, i, i_end, dist]() {
unsigned j_end = i + bundle_count;
if (j_end > i_end)
j_end = i_end;
for (unsigned j = i; j < j_end; ++j)
{
IFFT_DIT4(
bytes,
work + j,
dist,
log_m01,
log_m23,
log_m02);
}
});
}
}
else
{
unsigned bundle_count = (m_truncated / dist) / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
bundle_count /= 4;
if (bundle_count < 1)
bundle_count = 1;
const unsigned dist4_bundle_count = dist4 * bundle_count;
for (unsigned r = 0; r < m_truncated; r += dist4_bundle_count)
{
workBundle->Dispatch([bytes, work, skewLUT, dist4, m_truncated, r, dist4_bundle_count, dist]() {
unsigned s_end = r + dist4_bundle_count;
if (s_end > m_truncated)
s_end = m_truncated;
// For each set of dist*4 elements:
for (unsigned s = r; s < s_end; s += dist4)
{
const unsigned i_end = s + dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = s; i < i_end; ++i)
{
IFFT_DIT4(
bytes,
work + i,
dist,
log_m01,
log_m23,
log_m02);
}
}
});
}
}
@ -948,25 +1003,39 @@ static void IFFT_DIT_Decoder_MT(
const ffe_t log_m = skewLUT[dist];
unsigned bundle_count = dist / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
if (log_m == kModulus)
{
for (unsigned i = 0; i < dist; ++i)
for (unsigned i = 0; i < dist; i += bundle_count)
{
workBundle->Dispatch([work, i, dist, bytes, &workBundle]() {
xor_mem(work[i + dist], work[i], bytes);
workBundle->Dispatch([work, i, dist, bundle_count, bytes]() {
unsigned j_end = i + bundle_count;
if (j_end > dist)
j_end = dist;
for (unsigned j = i; j < j_end; ++j)
xor_mem(work[j + dist], work[j], bytes);
});
}
}
else
{
for (unsigned i = 0; i < dist; ++i)
for (unsigned i = 0; i < dist; i += bundle_count)
{
workBundle->Dispatch([work, i, dist, log_m, bytes, &workBundle]() {
IFFT_DIT2(
work[i],
work[i + dist],
log_m,
bytes);
workBundle->Dispatch([work, i, dist, log_m, bytes, bundle_count]() {
unsigned j_end = i + bundle_count;
if (j_end > dist)
j_end = dist;
for (unsigned j = i; j < j_end; ++j)
{
IFFT_DIT2(
work[j],
work[j + dist],
log_m,
bytes);
}
});
}
}
@ -975,6 +1044,8 @@ static void IFFT_DIT_Decoder_MT(
}
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
/*
Decimation in time FFT:
@ -1581,6 +1652,8 @@ static void FFT_DIT_ErrorBits(
}
}
#ifdef LEO_ENABLE_MULTITHREADING_OPT
static void FFT_DIT_ErrorBits_MT(
const uint64_t bytes,
void** work,
@ -1589,6 +1662,11 @@ static void FFT_DIT_ErrorBits_MT(
const ffe_t* skewLUT,
const ErrorBitfield& error_bits)
{
unsigned bundle_min = (unsigned)(8000 / bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
unsigned mip_level = LastNonzeroBit32(n);
// Decimation in time: Unroll 2 layers at a time
@ -1597,29 +1675,79 @@ static void FFT_DIT_ErrorBits_MT(
{
WorkBundle* workBundle = PoolInstance->GetBundle();
// For each set of dist*4 elements:
for (unsigned r = 0; r < n_truncated; r += dist4)
if (n_truncated <= dist4)
{
if (!error_bits.IsNeeded(mip_level, r))
continue;
const ffe_t log_m01 = skewLUT[r + dist];
const ffe_t log_m23 = skewLUT[r + dist * 3];
const ffe_t log_m02 = skewLUT[r + dist * 2];
// For each set of dist elements:
for (unsigned i = r; i < r + dist; ++i)
if (error_bits.IsNeeded(mip_level, 0))
{
void** work_i = work + i;
unsigned bundle_size = dist / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
workBundle->Dispatch([bytes, &workBundle, work_i, dist, log_m01, log_m02, log_m23]() {
FFT_DIT4(
bytes,
work_i,
dist,
log_m01,
log_m23,
log_m02);
const unsigned i_end = dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = 0; i < i_end; i += bundle_size)
{
workBundle->Dispatch([log_m01, log_m02, log_m23, bytes, work, bundle_size, i, i_end, dist]() {
unsigned j_end = i + bundle_size;
if (j_end > i_end)
j_end = i_end;
for (unsigned j = i; j < j_end; ++j)
{
FFT_DIT4(
bytes,
work + j,
dist,
log_m01,
log_m23,
log_m02);
}
});
}
}
}
else
{
unsigned bundle_count = (n_truncated / dist) / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
bundle_count /= 4;
if (bundle_count < 1)
bundle_count = 1;
const unsigned dist4_bundle_count = dist4 * bundle_count;
// For each set of dist*4 elements:
for (unsigned r = 0; r < n_truncated; r += dist4_bundle_count)
{
workBundle->Dispatch([bytes, work, skewLUT, &error_bits, mip_level, dist4, n_truncated, r, dist4_bundle_count, dist]() {
unsigned s_end = r + dist4_bundle_count;
if (s_end > n_truncated)
s_end = n_truncated;
for (unsigned s = r; s < s_end; s += dist4)
{
if (!error_bits.IsNeeded(mip_level, s))
continue;
const unsigned i_end = s + dist;
const ffe_t log_m01 = skewLUT[i_end];
const ffe_t log_m02 = skewLUT[i_end + dist];
const ffe_t log_m23 = skewLUT[i_end + dist * 2];
// For each set of dist elements:
for (unsigned i = s; i < i_end; ++i)
{
FFT_DIT4(
bytes,
work + i,
dist,
log_m01,
log_m23,
log_m02);
}
}
});
}
}
@ -1632,20 +1760,30 @@ static void FFT_DIT_ErrorBits_MT(
{
WorkBundle* workBundle = PoolInstance->GetBundle();
for (unsigned r = 0; r < n_truncated; r += 2)
{
workBundle->Dispatch([bytes, &workBundle, skewLUT, work, r]() {
const ffe_t log_m = skewLUT[r + 1];
unsigned bundle_count = (n_truncated / 2) / parallel_max;
if (bundle_count < bundle_min)
bundle_count = bundle_min;
if (log_m == kModulus)
xor_mem(work[r + 1], work[r], bytes);
else
for (unsigned s = 0; s < n_truncated; s += 2 * bundle_count)
{
workBundle->Dispatch([bytes, skewLUT, work, s, n_truncated, bundle_count]() {
unsigned r_end = s + 2 * bundle_count;
if (r_end > n_truncated)
r_end = n_truncated;
for (unsigned r = s; r < r_end; r += 2)
{
FFT_DIT2(
work[r],
work[r + 1],
log_m,
bytes);
const ffe_t log_m = skewLUT[r + 1];
if (log_m == kModulus)
xor_mem(work[r + 1], work[r], bytes);
else
{
FFT_DIT2(
work[r],
work[r + 1],
log_m,
bytes);
}
}
});
}
@ -1654,6 +1792,8 @@ static void FFT_DIT_ErrorBits_MT(
}
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
#endif // LEO_ERROR_BITFIELD_OPT
@ -1709,32 +1849,100 @@ void ReedSolomonDecode(
FWHT(error_locations, kOrder, kOrder);
// work <- recovery data
for (unsigned i = 0; i < recovery_count; ++i)
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
{
if (recovery[i])
mul_mem(work[i], recovery[i], error_locations[i], buffer_bytes);
else
unsigned bundle_min = (unsigned)(64000 / buffer_bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
unsigned bundle_size = recovery_count / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
WorkBundle* workBundle = PoolInstance->GetBundle();
// work <- recovery data
for (unsigned i = 0; i < recovery_count; i += bundle_size)
{
workBundle->Dispatch([i, recovery_count, recovery, &error_locations, bundle_size, work, buffer_bytes]() {
unsigned j_end = i + bundle_size;
if (j_end > recovery_count)
j_end = recovery_count;
for (unsigned j = i; j < j_end; ++j)
{
if (recovery[j])
mul_mem(work[j], recovery[j], error_locations[j], buffer_bytes);
else
memset(work[j], 0, buffer_bytes);
}
});
}
workBundle->Dispatch([recovery_count, m, buffer_bytes, work]() {
for (unsigned i = recovery_count; i < m; ++i)
memset(work[i], 0, buffer_bytes);
});
// work <- original data
bundle_size = original_count / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
for (unsigned i = 0; i < original_count; i += bundle_size)
{
workBundle->Dispatch([i, original_count, original, &error_locations, m, bundle_size, work, buffer_bytes]() {
unsigned j_end = i + bundle_size;
if (j_end > original_count)
j_end = original_count;
for (unsigned j = i; j < j_end; ++j)
{
if (original[j])
mul_mem(work[m + j], original[j], error_locations[m + j], buffer_bytes);
else
memset(work[m + j], 0, buffer_bytes);
}
});
}
workBundle->Dispatch([original_count, m, n, buffer_bytes, work]() {
for (unsigned i = m + original_count; i < n; ++i)
memset(work[i], 0, buffer_bytes);
});
workBundle->Complete();
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
// work <- recovery data
for (unsigned i = 0; i < recovery_count; ++i)
{
if (recovery[i])
mul_mem(work[i], recovery[i], error_locations[i], buffer_bytes);
else
memset(work[i], 0, buffer_bytes);
}
for (unsigned i = recovery_count; i < m; ++i)
memset(work[i], 0, buffer_bytes);
// work <- original data
for (unsigned i = 0; i < original_count; ++i)
{
if (original[i])
mul_mem(work[m + i], original[i], error_locations[m + i], buffer_bytes);
else
memset(work[m + i], 0, buffer_bytes);
}
for (unsigned i = m + original_count; i < n; ++i)
memset(work[i], 0, buffer_bytes);
}
for (unsigned i = recovery_count; i < m; ++i)
memset(work[i], 0, buffer_bytes);
// work <- original data
for (unsigned i = 0; i < original_count; ++i)
{
if (original[i])
mul_mem(work[m + i], original[i], error_locations[m + i], buffer_bytes);
else
memset(work[m + i], 0, buffer_bytes);
}
for (unsigned i = m + original_count; i < n; ++i)
memset(work[i], 0, buffer_bytes);
// work <- IFFT(work, n, 0)
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
{
IFFT_DIT_Decoder_MT(
@ -1745,6 +1953,7 @@ void ReedSolomonDecode(
FFTSkew - 1);
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
IFFT_DIT_Decoder(
buffer_bytes,
@ -1756,25 +1965,39 @@ void ReedSolomonDecode(
// work <- FormalDerivative(work, n)
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
{
unsigned bundle_min = (unsigned)(64000 / buffer_bytes);
if (bundle_min < 1)
bundle_min = 1;
const unsigned parallel_max = PoolInstance->GetParallelism();
for (unsigned i = 1; i < n; ++i)
{
const unsigned width = ((i ^ (i - 1)) + 1) >> 1;
if (width <= 4)
if (width <= 2 + bundle_min)
{
for (unsigned j = i - width; j < i; ++j)
xor_mem(work[j], work[j + width], buffer_bytes);
}
else
{
unsigned bundle_size = width / parallel_max;
if (bundle_size < bundle_min)
bundle_size = bundle_min;
WorkBundle* workBundle = PoolInstance->GetBundle();
for (unsigned j = i - width; j < i; ++j)
for (unsigned j = i - width; j < i; j += bundle_size)
{
workBundle->Dispatch([work, j, width, &workBundle, buffer_bytes]() {
xor_mem(work[j], work[j + width], buffer_bytes);
workBundle->Dispatch([work, i, j, width, bundle_size, buffer_bytes]() {
unsigned k_end = j + bundle_size;
if (k_end > i)
k_end = i;
for (unsigned k = j; k < k_end; ++k)
xor_mem(work[k], work[k + width], buffer_bytes);
});
}
@ -1783,6 +2006,7 @@ void ReedSolomonDecode(
}
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
for (unsigned i = 1; i < n; ++i)
{
@ -1801,11 +2025,13 @@ void ReedSolomonDecode(
const unsigned output_count = m + original_count;
#ifdef LEO_ERROR_BITFIELD_OPT
#ifdef LEO_ENABLE_MULTITHREADING_OPT
if (multithreaded)
{
FFT_DIT_ErrorBits_MT(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
}
else
#endif // LEO_ENABLE_MULTITHREADING_OPT
{
FFT_DIT_ErrorBits(buffer_bytes, work, output_count, n, FFTSkew - 1, error_bits);
}

View File

@ -48,14 +48,14 @@ struct TestParameters
unsigned original_count = 100; // under 65536
unsigned recovery_count = 20; // under 65536 - original_count
#endif
unsigned buffer_bytes = 64000; // multiple of 64 bytes
unsigned buffer_bytes = 6400; // multiple of 64 bytes
unsigned loss_count = 32768; // some fraction of original_count
unsigned seed = 2;
bool multithreaded = true;
};
static const unsigned kLargeTrialCount = 1;
static const unsigned kSmallTrialCount = 4;
static const unsigned kSmallTrialCount = 10;
//------------------------------------------------------------------------------
@ -528,6 +528,108 @@ static bool Benchmark(const TestParameters& params)
}
//------------------------------------------------------------------------------
// Multithreading Tests
#ifdef LEO_ENABLE_MULTITHREADING_OPT
/// Micro-benchmark for the worker pool.
///
/// Allocates 2*N buffers of kBytes each, then measures xor_mem() over the N
/// buffer pairs when the range is split into a varying number of dispatched
/// work items (j = 1..16, then 32, 64, ... up to N/2), followed by a plain
/// single-threaded loop as a baseline. Each configuration runs 40 trials and
/// reports throughput computed from the timer's MinCallUsec on cout.
void MultithreadingTests()
{
    static const unsigned kBytes = 100;
    static const unsigned N = 10000;

    std::vector<uint8_t*> Buffers(N * 2);

    for (unsigned i = 0; i < N * 2; ++i)
        Buffers[i] = leopard::SIMDSafeAllocate(kBytes);

    // Runs the 40-trial benchmark for one split factor j and prints the result.
    // Each work item covers split_j = N / j consecutive buffer pairs.
    // Because the division rounds down, the number of dispatched items can be
    // larger than j when j does not divide N evenly.
    const auto benchmarkSplit = [&Buffers](const unsigned j)
    {
        FunctionTimer trialTimer("trial");

        for (unsigned t = 0; t < 40; ++t)
        {
            trialTimer.BeginCall();

            leopard::WorkBundle* bundle = leopard::PoolInstance->GetBundle();

            const unsigned split_j = N / j;
            for (unsigned k = 0; k < N; k += split_j)
            {
                bundle->Dispatch([split_j, k, &Buffers]() {
                    for (unsigned m = k; m < N && m < (k + split_j); ++m)
                    {
                        leopard::xor_mem(Buffers[m + N], Buffers[m], kBytes);
                    }
                });
            }

            // Blocks until every dispatched item has finished, so capturing
            // Buffers by reference above is safe.
            bundle->Complete();

            trialTimer.EndCall();
        }

        // Bytes per microsecond is numerically equal to MB/s.
        float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec);
        cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) across " << j << " work-thread items: " << mbps << " MB/s" << endl;
    };

    // Fine-grained sweep over small split factors.
    for (unsigned j = 1; j <= 16; ++j)
        benchmarkSplit(j);

    // Coarse sweep: doubling split factors up to N / 2.
    for (unsigned j = 32; j <= N / 2; j *= 2)
        benchmarkSplit(j);

    // Single-threaded baseline over the same buffer pairs.
    {
        FunctionTimer trialTimer("trial");

        for (unsigned t = 0; t < 40; ++t)
        {
            trialTimer.BeginCall();

            for (unsigned k = 0; k < N; ++k)
            {
                leopard::xor_mem(Buffers[k + N], Buffers[k], kBytes);
            }

            trialTimer.EndCall();
        }

        float mbps = N * (uint64_t)kBytes / (float)(trialTimer.MinCallUsec);
        cout << "mem_xor(" << N * (uint64_t)kBytes / 1000000.f << " MB) single-threaded: " << mbps << " MB/s" << endl;
    }

    for (unsigned i = 0; i < N * 2; ++i)
        leopard::SIMDSafeFree(Buffers[i]);
}
#endif // LEO_ENABLE_MULTITHREADING_OPT
//------------------------------------------------------------------------------
// Entrypoint
@ -563,11 +665,24 @@ int main(int argc, char **argv)
if (params.loss_count > params.recovery_count)
params.loss_count = params.recovery_count;
cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "]" << endl;
params.multithreaded = false;
cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "] (multi-threading OFF)" << endl;
if (!Benchmark(params))
goto Failed;
params.multithreaded = true;
cout << "Parameters: [original count=" << params.original_count << "] [recovery count=" << params.recovery_count << "] [buffer bytes=" << params.buffer_bytes << "] [loss count=" << params.loss_count << "] [random seed=" << params.seed << "] (multi-threading ON)" << endl;
if (!Benchmark(params))
goto Failed;
#ifdef LEO_ENABLE_MULTITHREADING_OPT
MultithreadingTests();
#endif
#if 1
static const unsigned kMaxRandomData = 32768;