asio sync

This commit is contained in:
Marcos Pinto 2007-08-08 01:29:13 +00:00
parent 174ff4ee70
commit ce513e5725
10 changed files with 248 additions and 123 deletions

View File

@ -117,7 +117,7 @@ namespace detail {
inline void* buffer_cast_helper(const mutable_buffer& b)
{
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
if (b.debug_check_)
if (b.size_ && b.debug_check_)
b.debug_check_();
#endif // ASIO_ENABLE_BUFFER_DEBUGGING
return b.data_;
@ -281,7 +281,7 @@ namespace detail {
inline const void* buffer_cast_helper(const const_buffer& b)
{
#if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
if (b.debug_check_)
if (b.size_ && b.debug_check_)
b.debug_check_();
#endif // ASIO_ENABLE_BUFFER_DEBUGGING
return b.data_;

View File

@ -71,6 +71,28 @@ private:
/// Return a completion condition function object that indicates that a read or
/// write operation should continue until all of the data has been transferred,
/// or until an error occurs.
/**
* This function is used to create an object, of unspecified type, that meets
* CompletionCondition requirements.
*
* @par Example
* Reading until a buffer is full:
* @code
* boost::array<char, 128> buf;
* asio::error_code ec;
* std::size_t n = asio::read(
* sock, asio::buffer(buf),
* asio::transfer_all(), ec);
* if (ec)
* {
* // An error occurred.
* }
* else
* {
* // n == 128
* }
* @endcode
*/
#if defined(GENERATING_DOCUMENTATION)
unspecified transfer_all();
#else
@ -83,6 +105,28 @@ inline detail::transfer_all_t transfer_all()
/// Return a completion condition function object that indicates that a read or
/// write operation should continue until a minimum number of bytes has been
/// transferred, or until an error occurs.
/**
* This function is used to create an object, of unspecified type, that meets
* CompletionCondition requirements.
*
* @par Example
* Reading until a buffer is full or contains at least 64 bytes:
* @code
* boost::array<char, 128> buf;
* asio::error_code ec;
* std::size_t n = asio::read(
* sock, asio::buffer(buf),
* asio::transfer_at_least(64), ec);
* if (ec)
* {
* // An error occurred.
* }
* else
* {
* // n >= 64 && n <= 128
* }
* @endcode
*/
#if defined(GENERATING_DOCUMENTATION)
unspecified transfer_at_least(std::size_t minimum);
#else

View File

@ -331,7 +331,10 @@ public:
std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
{
asio::detail::mutex::scoped_lock lock(mutex_);
return timer_queue.cancel_timer(token);
std::size_t n = timer_queue.cancel_timer(token);
if (n > 0)
interrupter_.interrupt();
return n;
}
private:
@ -347,16 +350,13 @@ private:
read_op_queue_.dispatch_cancellations();
write_op_queue_.dispatch_cancellations();
except_op_queue_.dispatch_cancellations();
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
timer_queues_[i]->dispatch_cancellations();
// Check if the thread is supposed to stop.
if (stop_thread_)
{
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
return;
}
@ -365,12 +365,7 @@ private:
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
&& except_op_queue_.empty() && all_timer_queues_are_empty())
{
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
return;
}
@ -398,59 +393,44 @@ private:
}
else
{
if (events[i].events & (EPOLLERR | EPOLLHUP))
bool more_reads = false;
bool more_writes = false;
bool more_except = false;
asio::error_code ec;
// Exception operations must be processed first to ensure that any
// out-of-band data is read before normal data.
if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))
more_except = except_op_queue_.dispatch_operation(descriptor, ec);
else
more_except = except_op_queue_.has_operation(descriptor);
if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
more_reads = read_op_queue_.dispatch_operation(descriptor, ec);
else
more_reads = read_op_queue_.has_operation(descriptor);
if (events[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
more_writes = write_op_queue_.dispatch_operation(descriptor, ec);
else
more_writes = write_op_queue_.has_operation(descriptor);
epoll_event ev = { 0, { 0 } };
ev.events = EPOLLERR | EPOLLHUP;
if (more_reads)
ev.events |= EPOLLIN;
if (more_writes)
ev.events |= EPOLLOUT;
if (more_except)
ev.events |= EPOLLPRI;
ev.data.fd = descriptor;
int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
if (result != 0)
{
asio::error_code ec;
except_op_queue_.dispatch_all_operations(descriptor, ec);
ec = asio::error_code(errno, asio::native_ecat);
read_op_queue_.dispatch_all_operations(descriptor, ec);
write_op_queue_.dispatch_all_operations(descriptor, ec);
epoll_event ev = { 0, { 0 } };
ev.events = 0;
ev.data.fd = descriptor;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
}
else
{
bool more_reads = false;
bool more_writes = false;
bool more_except = false;
asio::error_code ec;
// Exception operations must be processed first to ensure that any
// out-of-band data is read before normal data.
if (events[i].events & EPOLLPRI)
more_except = except_op_queue_.dispatch_operation(descriptor, ec);
else
more_except = except_op_queue_.has_operation(descriptor);
if (events[i].events & EPOLLIN)
more_reads = read_op_queue_.dispatch_operation(descriptor, ec);
else
more_reads = read_op_queue_.has_operation(descriptor);
if (events[i].events & EPOLLOUT)
more_writes = write_op_queue_.dispatch_operation(descriptor, ec);
else
more_writes = write_op_queue_.has_operation(descriptor);
epoll_event ev = { 0, { 0 } };
ev.events = EPOLLERR | EPOLLHUP;
if (more_reads)
ev.events |= EPOLLIN;
if (more_writes)
ev.events |= EPOLLOUT;
if (more_except)
ev.events |= EPOLLPRI;
ev.data.fd = descriptor;
int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
if (result != 0)
{
ec = asio::error_code(errno, asio::native_ecat);
read_op_queue_.dispatch_all_operations(descriptor, ec);
write_op_queue_.dispatch_all_operations(descriptor, ec);
except_op_queue_.dispatch_all_operations(descriptor, ec);
}
except_op_queue_.dispatch_all_operations(descriptor, ec);
}
}
}
@ -458,19 +438,17 @@ private:
write_op_queue_.dispatch_cancellations();
except_op_queue_.dispatch_cancellations();
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
{
timer_queues_[i]->dispatch_timers();
timer_queues_[i]->dispatch_cancellations();
}
// Issue any pending cancellations.
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
cancel_ops_unlocked(pending_cancellations_[i]);
pending_cancellations_.clear();
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
}
// Run the select loop in the thread.
@ -566,6 +544,22 @@ private:
interrupter_.interrupt();
}
// Clean up operations and timers. We must not hold the lock since the
// destructors may make calls back into this reactor. We make a copy of the
// vector of timer queues since the original may be modified while the lock
// is not held.
void cleanup_operations_and_timers(
asio::detail::mutex::scoped_lock& lock)
{
timer_queues_for_cleanup_ = timer_queues_;
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
timer_queues_for_cleanup_[i]->cleanup_timers();
}
// Mutex to protect access to internal data.
asio::detail::mutex mutex_;
@ -590,6 +584,10 @@ private:
// The timer queues.
std::vector<timer_queue_base*> timer_queues_;
// A copy of the timer queues, used when cleaning up timers. The copy is
// stored as a class data member to avoid unnecessary memory allocation.
std::vector<timer_queue_base*> timer_queues_for_cleanup_;
// The descriptors that are pending cancellation.
std::vector<socket_type> pending_cancellations_;

View File

@ -321,7 +321,10 @@ public:
std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
{
asio::detail::mutex::scoped_lock lock(mutex_);
return timer_queue.cancel_timer(token);
std::size_t n = timer_queue.cancel_timer(token);
if (n > 0)
interrupter_.interrupt();
return n;
}
private:
@ -337,16 +340,13 @@ private:
read_op_queue_.dispatch_cancellations();
write_op_queue_.dispatch_cancellations();
except_op_queue_.dispatch_cancellations();
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
timer_queues_[i]->dispatch_cancellations();
// Check if the thread is supposed to stop.
if (stop_thread_)
{
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
return;
}
@ -355,12 +355,7 @@ private:
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
&& except_op_queue_.empty() && all_timer_queues_are_empty())
{
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
return;
}
@ -466,19 +461,17 @@ private:
write_op_queue_.dispatch_cancellations();
except_op_queue_.dispatch_cancellations();
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
{
timer_queues_[i]->dispatch_timers();
timer_queues_[i]->dispatch_cancellations();
}
// Issue any pending cancellations.
for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
cancel_ops_unlocked(pending_cancellations_[i]);
pending_cancellations_.clear();
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
}
// Run the select loop in the thread.
@ -573,6 +566,22 @@ private:
interrupter_.interrupt();
}
// Clean up operations and timers. We must not hold the lock since the
// destructors may make calls back into this reactor. We make a copy of the
// vector of timer queues since the original may be modified while the lock
// is not held.
void cleanup_operations_and_timers(
asio::detail::mutex::scoped_lock& lock)
{
timer_queues_for_cleanup_ = timer_queues_;
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
timer_queues_for_cleanup_[i]->cleanup_timers();
}
// Mutex to protect access to internal data.
asio::detail::mutex mutex_;
@ -597,6 +606,10 @@ private:
// The timer queues.
std::vector<timer_queue_base*> timer_queues_;
// A copy of the timer queues, used when cleaning up timers. The copy is
// stored as a class data member to avoid unnecessary memory allocation.
std::vector<timer_queue_base*> timer_queues_for_cleanup_;
// The descriptors that are pending cancellation.
std::vector<socket_type> pending_cancellations_;

View File

@ -229,7 +229,10 @@ public:
std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
{
asio::detail::mutex::scoped_lock lock(mutex_);
return timer_queue.cancel_timer(token);
std::size_t n = timer_queue.cancel_timer(token);
if (n > 0)
interrupter_.interrupt();
return n;
}
private:
@ -245,16 +248,13 @@ private:
read_op_queue_.dispatch_cancellations();
write_op_queue_.dispatch_cancellations();
except_op_queue_.dispatch_cancellations();
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
timer_queues_[i]->dispatch_cancellations();
// Check if the thread is supposed to stop.
if (stop_thread_)
{
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
return;
}
@ -263,12 +263,7 @@ private:
if (!block && read_op_queue_.empty() && write_op_queue_.empty()
&& except_op_queue_.empty() && all_timer_queues_are_empty())
{
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
return;
}
@ -321,19 +316,17 @@ private:
write_op_queue_.dispatch_cancellations();
}
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
{
timer_queues_[i]->dispatch_timers();
timer_queues_[i]->dispatch_cancellations();
}
// Issue any pending cancellations.
for (size_t i = 0; i < pending_cancellations_.size(); ++i)
cancel_ops_unlocked(pending_cancellations_[i]);
pending_cancellations_.clear();
// Clean up operations. We must not hold the lock since the operations may
// make calls back into this reactor.
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
cleanup_operations_and_timers(lock);
}
// Run the select loop in the thread.
@ -414,6 +407,22 @@ private:
interrupter_.interrupt();
}
// Clean up operations and timers. We must not hold the lock since the
// destructors may make calls back into this reactor. We make a copy of the
// vector of timer queues since the original may be modified while the lock
// is not held.
void cleanup_operations_and_timers(
asio::detail::mutex::scoped_lock& lock)
{
timer_queues_for_cleanup_ = timer_queues_;
lock.unlock();
read_op_queue_.cleanup_operations();
write_op_queue_.cleanup_operations();
except_op_queue_.cleanup_operations();
for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
timer_queues_for_cleanup_[i]->cleanup_timers();
}
// Mutex to protect access to internal data.
asio::detail::mutex mutex_;
@ -435,6 +444,10 @@ private:
// The timer queues.
std::vector<timer_queue_base*> timer_queues_;
// A copy of the timer queues, used when cleaning up timers. The copy is
// stored as a class data member to avoid unnecessary memory allocation.
std::vector<timer_queue_base*> timer_queues_for_cleanup_;
// The descriptors that are pending cancellation.
std::vector<socket_type> pending_cancellations_;

View File

@ -1504,6 +1504,12 @@ inline asio::error_code translate_addrinfo_error(int error)
case EAI_MEMORY:
return asio::error::no_memory;
case EAI_NONAME:
#if defined(EAI_ADDRFAMILY)
case EAI_ADDRFAMILY:
#endif
#if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
case EAI_NODATA:
#endif
return asio::error::host_not_found;
case EAI_SERVICE:
return asio::error::service_not_found;

View File

@ -48,7 +48,9 @@ public:
// Constructor.
timer_queue()
: timers_(),
heap_()
heap_(),
cancelled_timers_(0),
cleanup_timers_(0)
{
}
@ -111,12 +113,17 @@ public:
{
timer_base* t = heap_[0];
remove_timer(t);
t->prev_ = 0;
t->next_ = cleanup_timers_;
cleanup_timers_ = t;
t->invoke(asio::error_code());
}
}
// Cancel the timer with the given token. The handler will be invoked
// immediately with the result operation_aborted.
// Cancel the timers with the given token. Any timers pending for the token
// will be notified that they have been cancelled next time
// dispatch_cancellations is called. Returns the number of timers that were
// cancelled.
std::size_t cancel_timer(void* timer_token)
{
std::size_t num_cancelled = 0;
@ -129,7 +136,9 @@ public:
{
timer_base* next = t->next_;
remove_timer(t);
t->invoke(asio::error::operation_aborted);
t->prev_ = 0;
t->next_ = cancelled_timers_;
cancelled_timers_ = t;
t = next;
++num_cancelled;
}
@ -137,6 +146,31 @@ public:
return num_cancelled;
}
// Dispatch any pending cancels for timers.
virtual void dispatch_cancellations()
{
while (cancelled_timers_)
{
timer_base* this_timer = cancelled_timers_;
cancelled_timers_ = this_timer->next_;
this_timer->next_ = cleanup_timers_;
cleanup_timers_ = this_timer;
this_timer->invoke(asio::error::operation_aborted);
}
}
// Destroy timers that are waiting to be cleaned up.
virtual void cleanup_timers()
{
while (cleanup_timers_)
{
timer_base* next_timer = cleanup_timers_->next_;
cleanup_timers_->next_ = 0;
cleanup_timers_->destroy();
cleanup_timers_ = next_timer;
}
}
// Destroy all timers.
virtual void destroy_timers()
{
@ -151,6 +185,7 @@ public:
}
heap_.clear();
timers_.clear();
cleanup_timers();
}
private:
@ -238,8 +273,7 @@ private:
static void invoke_handler(timer_base* base,
const asio::error_code& result)
{
std::auto_ptr<timer<Handler> > t(static_cast<timer<Handler>*>(base));
t->handler_(result);
static_cast<timer<Handler>*>(base)->handler_(result);
}
// Destroy the handler.
@ -338,6 +372,12 @@ private:
// The heap of timers, with the earliest timer at the front.
std::vector<timer_base*> heap_;
// The list of timers to be cancelled.
timer_base* cancelled_timers_;
// The list of timers to be destroyed.
timer_base* cleanup_timers_;
};
} // namespace detail

View File

@ -44,6 +44,12 @@ public:
// Dispatch all ready timers.
virtual void dispatch_timers() = 0;
// Dispatch any pending cancels for timers.
virtual void dispatch_cancellations() = 0;
// Destroy timers that are waiting to be cleaned up.
virtual void cleanup_timers() = 0;
// Destroy all timers.
virtual void destroy_timers() = 0;
};

View File

@ -21,6 +21,8 @@
#include <boost/config.hpp>
#include "asio/detail/pop_options.hpp"
#include "asio/detail/socket_types.hpp"
// This service is only supported on Win32 (NT4 and later).
#if !defined(ASIO_DISABLE_IOCP)
#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)

View File

@ -1950,26 +1950,29 @@ public:
}
private:
// Helper function to provide InterlockedCompareExchangePointer functionality
// on very old Platform SDKs.
// Helper function to emulate InterlockedCompareExchangePointer functionality
// for:
// - very old Platform SDKs; and
// - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp)
{
#if defined(_WIN32_WINNT) && (_WIN32_WINNT <= 0x400) && (_M_IX86)
#if defined(_M_IX86)
return reinterpret_cast<void*>(InterlockedCompareExchange(
reinterpret_cast<LONG*>(dest), reinterpret_cast<LONG>(exch),
reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch),
reinterpret_cast<LONG>(cmp)));
#else
return InterlockedCompareExchangePointer(dest, exch, cmp);
#endif
}
// Helper function to provide InterlockedExchangePointer functionality on very
// old Platform SDKs.
// Helper function to emulate InterlockedExchangePointer functionality for:
// - very old Platform SDKs; and
// - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
void* interlocked_exchange_pointer(void** dest, void* val)
{
#if defined(_WIN32_WINNT) && (_WIN32_WINNT <= 0x400) && (_M_IX86)
#if defined(_M_IX86)
return reinterpret_cast<void*>(InterlockedExchange(
reinterpret_cast<LONG*>(dest), reinterpret_cast<LONG>(val)));
reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));
#else
return InterlockedExchangePointer(dest, val);
#endif