diff --git a/libtorrent/include/libtorrent/asio/buffer.hpp b/libtorrent/include/libtorrent/asio/buffer.hpp
index 384e8d70a..7e5dc76c8 100644
--- a/libtorrent/include/libtorrent/asio/buffer.hpp
+++ b/libtorrent/include/libtorrent/asio/buffer.hpp
@@ -117,7 +117,7 @@ namespace detail {
 inline void* buffer_cast_helper(const mutable_buffer& b)
 {
 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
-  if (b.debug_check_)
+  if (b.size_ && b.debug_check_)
     b.debug_check_();
 #endif // ASIO_ENABLE_BUFFER_DEBUGGING
   return b.data_;
@@ -281,7 +281,7 @@ namespace detail {
 inline const void* buffer_cast_helper(const const_buffer& b)
 {
 #if defined(ASIO_ENABLE_BUFFER_DEBUGGING)
-  if (b.debug_check_)
+  if (b.size_ && b.debug_check_)
     b.debug_check_();
 #endif // ASIO_ENABLE_BUFFER_DEBUGGING
   return b.data_;
diff --git a/libtorrent/include/libtorrent/asio/completion_condition.hpp b/libtorrent/include/libtorrent/asio/completion_condition.hpp
index 42696d599..939a4a8f3 100644
--- a/libtorrent/include/libtorrent/asio/completion_condition.hpp
+++ b/libtorrent/include/libtorrent/asio/completion_condition.hpp
@@ -71,6 +71,28 @@ private:
 /// Return a completion condition function object that indicates that a read or
 /// write operation should continue until all of the data has been transferred,
 /// or until an error occurs.
+/**
+ * This function is used to create an object, of unspecified type, that meets
+ * CompletionCondition requirements.
+ *
+ * @par Example
+ * Reading until a buffer is full:
+ * @code
+ * boost::array<char, 128> buf;
+ * asio::error_code ec;
+ * std::size_t n = asio::read(
+ *     sock, asio::buffer(buf),
+ *     asio::transfer_all(), ec);
+ * if (ec)
+ * {
+ *   // An error occurred.
+ * }
+ * else
+ * {
+ *   // n == 128
+ * }
+ * @endcode
+ */
 #if defined(GENERATING_DOCUMENTATION)
 unspecified transfer_all();
 #else
@@ -83,6 +105,28 @@ inline detail::transfer_all_t transfer_all()
 /// Return a completion condition function object that indicates that a read or
 /// write operation should continue until a minimum number of bytes has been
 /// transferred, or until an error occurs.
+/**
+ * This function is used to create an object, of unspecified type, that meets
+ * CompletionCondition requirements.
+ *
+ * @par Example
+ * Reading until a buffer is full or contains at least 64 bytes:
+ * @code
+ * boost::array<char, 128> buf;
+ * asio::error_code ec;
+ * std::size_t n = asio::read(
+ *     sock, asio::buffer(buf),
+ *     asio::transfer_at_least(64), ec);
+ * if (ec)
+ * {
+ *   // An error occurred.
+ * }
+ * else
+ * {
+ *   // n >= 64 && n <= 128
+ * }
+ * @endcode
+ */
 #if defined(GENERATING_DOCUMENTATION)
 unspecified transfer_at_least(std::size_t minimum);
 #else
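
An aside on the CompletionCondition requirements the new documentation refers to: in this vintage of asio, a completion condition is a function object called after each partial transfer with the error so far and the running byte count, and it returns true when the composed read or write should stop. A minimal sketch of a custom condition written against that bool-returning protocol follows; the name transfer_exactly_t is hypothetical and not part of this patch:

    // Hypothetical user-defined completion condition (sketch only).
    // Protocol assumed here: return true to stop the composed
    // operation, false to issue another underlying transfer.
    class transfer_exactly_t
    {
    public:
      typedef bool result_type;

      explicit transfer_exactly_t(std::size_t size) : size_(size) {}

      template <typename Error>
      bool operator()(const Error& err, std::size_t bytes_transferred)
      {
        // Stop on error, or once at least size_ bytes have moved.
        return !!err || bytes_transferred >= size_;
      }

    private:
      std::size_t size_;
    };

It would be used in the same way as the documented conditions, e.g. asio::read(sock, asio::buffer(buf), transfer_exactly_t(64), ec).
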
diff --git a/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp b/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp
index d55e86454..9c406c3fa 100644
--- a/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp
@@ -331,7 +331,10 @@ public:
   std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
   {
     asio::detail::mutex::scoped_lock lock(mutex_);
-    return timer_queue.cancel_timer(token);
+    std::size_t n = timer_queue.cancel_timer(token);
+    if (n > 0)
+      interrupter_.interrupt();
+    return n;
   }
 
 private:
@@ -347,16 +350,13 @@
     read_op_queue_.dispatch_cancellations();
     write_op_queue_.dispatch_cancellations();
     except_op_queue_.dispatch_cancellations();
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+      timer_queues_[i]->dispatch_cancellations();
 
     // Check if the thread is supposed to stop.
     if (stop_thread_)
     {
-      // Clean up operations. We must not hold the lock since the operations may
-      // make calls back into this reactor.
-      lock.unlock();
-      read_op_queue_.cleanup_operations();
-      write_op_queue_.cleanup_operations();
-      except_op_queue_.cleanup_operations();
+      cleanup_operations_and_timers(lock);
       return;
     }
 
@@ -365,12 +365,7 @@
     if (!block && read_op_queue_.empty() && write_op_queue_.empty()
         && except_op_queue_.empty() && all_timer_queues_are_empty())
     {
-      // Clean up operations. We must not hold the lock since the operations may
-      // make calls back into this reactor.
-      lock.unlock();
-      read_op_queue_.cleanup_operations();
-      write_op_queue_.cleanup_operations();
-      except_op_queue_.cleanup_operations();
+      cleanup_operations_and_timers(lock);
       return;
     }
 
@@ -398,59 +393,44 @@
       }
       else
       {
-        if (events[i].events & (EPOLLERR | EPOLLHUP))
+        bool more_reads = false;
+        bool more_writes = false;
+        bool more_except = false;
+        asio::error_code ec;
+
+        // Exception operations must be processed first to ensure that any
+        // out-of-band data is read before normal data.
+        if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))
+          more_except = except_op_queue_.dispatch_operation(descriptor, ec);
+        else
+          more_except = except_op_queue_.has_operation(descriptor);
+
+        if (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+          more_reads = read_op_queue_.dispatch_operation(descriptor, ec);
+        else
+          more_reads = read_op_queue_.has_operation(descriptor);
+
+        if (events[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP))
+          more_writes = write_op_queue_.dispatch_operation(descriptor, ec);
+        else
+          more_writes = write_op_queue_.has_operation(descriptor);
+
+        epoll_event ev = { 0, { 0 } };
+        ev.events = EPOLLERR | EPOLLHUP;
+        if (more_reads)
+          ev.events |= EPOLLIN;
+        if (more_writes)
+          ev.events |= EPOLLOUT;
+        if (more_except)
+          ev.events |= EPOLLPRI;
+        ev.data.fd = descriptor;
+        int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
+        if (result != 0)
         {
-          asio::error_code ec;
-          except_op_queue_.dispatch_all_operations(descriptor, ec);
+          ec = asio::error_code(errno, asio::native_ecat);
           read_op_queue_.dispatch_all_operations(descriptor, ec);
           write_op_queue_.dispatch_all_operations(descriptor, ec);
-
-          epoll_event ev = { 0, { 0 } };
-          ev.events = 0;
-          ev.data.fd = descriptor;
-          epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
-        }
-        else
-        {
-          bool more_reads = false;
-          bool more_writes = false;
-          bool more_except = false;
-          asio::error_code ec;
-
-          // Exception operations must be processed first to ensure that any
-          // out-of-band data is read before normal data.
-          if (events[i].events & EPOLLPRI)
-            more_except = except_op_queue_.dispatch_operation(descriptor, ec);
-          else
-            more_except = except_op_queue_.has_operation(descriptor);
-
-          if (events[i].events & EPOLLIN)
-            more_reads = read_op_queue_.dispatch_operation(descriptor, ec);
-          else
-            more_reads = read_op_queue_.has_operation(descriptor);
-
-          if (events[i].events & EPOLLOUT)
-            more_writes = write_op_queue_.dispatch_operation(descriptor, ec);
-          else
-            more_writes = write_op_queue_.has_operation(descriptor);
-
-          epoll_event ev = { 0, { 0 } };
-          ev.events = EPOLLERR | EPOLLHUP;
-          if (more_reads)
-            ev.events |= EPOLLIN;
-          if (more_writes)
-            ev.events |= EPOLLOUT;
-          if (more_except)
-            ev.events |= EPOLLPRI;
-          ev.data.fd = descriptor;
-          int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
-          if (result != 0)
-          {
-            ec = asio::error_code(errno, asio::native_ecat);
-            read_op_queue_.dispatch_all_operations(descriptor, ec);
-            write_op_queue_.dispatch_all_operations(descriptor, ec);
-            except_op_queue_.dispatch_all_operations(descriptor, ec);
-          }
+          except_op_queue_.dispatch_all_operations(descriptor, ec);
         }
       }
     }
@@ -458,19 +438,17 @@
     write_op_queue_.dispatch_cancellations();
     except_op_queue_.dispatch_cancellations();
     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
       timer_queues_[i]->dispatch_timers();
+      timer_queues_[i]->dispatch_cancellations();
+    }
 
     // Issue any pending cancellations.
     for (size_t i = 0; i < pending_cancellations_.size(); ++i)
       cancel_ops_unlocked(pending_cancellations_[i]);
     pending_cancellations_.clear();
 
-    // Clean up operations. We must not hold the lock since the operations may
-    // make calls back into this reactor.
-    lock.unlock();
-    read_op_queue_.cleanup_operations();
-    write_op_queue_.cleanup_operations();
-    except_op_queue_.cleanup_operations();
+    cleanup_operations_and_timers(lock);
   }
 
   // Run the select loop in the thread.
@@ -566,6 +544,22 @@
     interrupter_.interrupt();
   }
 
+  // Clean up operations and timers. We must not hold the lock since the
+  // destructors may make calls back into this reactor. We make a copy of the
+  // vector of timer queues since the original may be modified while the lock
+  // is not held.
+  void cleanup_operations_and_timers(
+      asio::detail::mutex::scoped_lock& lock)
+  {
+    timer_queues_for_cleanup_ = timer_queues_;
+    lock.unlock();
+    read_op_queue_.cleanup_operations();
+    write_op_queue_.cleanup_operations();
+    except_op_queue_.cleanup_operations();
+    for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
+      timer_queues_for_cleanup_[i]->cleanup_timers();
+  }
+
   // Mutex to protect access to internal data.
   asio::detail::mutex mutex_;
 
@@ -590,6 +584,10 @@
   // The timer queues.
   std::vector<timer_queue_base*> timer_queues_;
 
+  // A copy of the timer queues, used when cleaning up timers. The copy is
+  // stored as a class data member to avoid unnecessary memory allocation.
+  std::vector<timer_queue_base*> timer_queues_for_cleanup_;
+
   // The descriptors that are pending cancellation.
   std::vector<socket_type> pending_cancellations_;
 
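
The cleanup_operations_and_timers() helper introduced above is the heart of this change: completion handlers (and their destructors) may call back into the reactor, so nothing user-supplied may be destroyed while mutex_ is held, and any container walked after unlocking must be a private copy. A standalone sketch of that pattern, using hypothetical names rather than asio's types:

    #include <mutex>
    #include <vector>

    struct queue { void cleanup() {} }; // stand-in for an op/timer queue

    class reactor_sketch
    {
      std::mutex mutex_;
      std::vector<queue*> queues_;       // protected by mutex_
      std::vector<queue*> cleanup_copy_; // member, so no per-call allocation

      void cleanup_outside_lock(std::unique_lock<std::mutex>& lock)
      {
        // Copy while still locked: the handlers run below may register
        // or remove queues, mutating queues_ under a re-taken lock.
        cleanup_copy_ = queues_;
        lock.unlock();
        for (std::size_t i = 0; i < cleanup_copy_.size(); ++i)
          cleanup_copy_[i]->cleanup(); // may re-enter the reactor
      }
    };
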
diff --git a/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp b/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp
index 6628803af..18a842b46 100644
--- a/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp
@@ -321,7 +321,10 @@ public:
   std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
   {
     asio::detail::mutex::scoped_lock lock(mutex_);
-    return timer_queue.cancel_timer(token);
+    std::size_t n = timer_queue.cancel_timer(token);
+    if (n > 0)
+      interrupter_.interrupt();
+    return n;
   }
 
 private:
@@ -337,16 +340,13 @@
     read_op_queue_.dispatch_cancellations();
     write_op_queue_.dispatch_cancellations();
     except_op_queue_.dispatch_cancellations();
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+      timer_queues_[i]->dispatch_cancellations();
 
     // Check if the thread is supposed to stop.
     if (stop_thread_)
     {
-      // Clean up operations. We must not hold the lock since the operations may
-      // make calls back into this reactor.
-      lock.unlock();
-      read_op_queue_.cleanup_operations();
-      write_op_queue_.cleanup_operations();
-      except_op_queue_.cleanup_operations();
+      cleanup_operations_and_timers(lock);
       return;
     }
 
@@ -355,12 +355,7 @@
     if (!block && read_op_queue_.empty() && write_op_queue_.empty()
         && except_op_queue_.empty() && all_timer_queues_are_empty())
     {
-      // Clean up operations. We must not hold the lock since the operations may
-      // make calls back into this reactor.
-      lock.unlock();
-      read_op_queue_.cleanup_operations();
-      write_op_queue_.cleanup_operations();
-      except_op_queue_.cleanup_operations();
+      cleanup_operations_and_timers(lock);
       return;
     }
 
@@ -466,19 +461,17 @@
     write_op_queue_.dispatch_cancellations();
     except_op_queue_.dispatch_cancellations();
     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
       timer_queues_[i]->dispatch_timers();
+      timer_queues_[i]->dispatch_cancellations();
+    }
 
     // Issue any pending cancellations.
     for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
       cancel_ops_unlocked(pending_cancellations_[i]);
     pending_cancellations_.clear();
 
-    // Clean up operations. We must not hold the lock since the operations may
-    // make calls back into this reactor.
-    lock.unlock();
-    read_op_queue_.cleanup_operations();
-    write_op_queue_.cleanup_operations();
-    except_op_queue_.cleanup_operations();
+    cleanup_operations_and_timers(lock);
   }
 
   // Run the select loop in the thread.
@@ -573,6 +566,22 @@
     interrupter_.interrupt();
   }
 
+  // Clean up operations and timers. We must not hold the lock since the
+  // destructors may make calls back into this reactor. We make a copy of the
+  // vector of timer queues since the original may be modified while the lock
+  // is not held.
+  void cleanup_operations_and_timers(
+      asio::detail::mutex::scoped_lock& lock)
+  {
+    timer_queues_for_cleanup_ = timer_queues_;
+    lock.unlock();
+    read_op_queue_.cleanup_operations();
+    write_op_queue_.cleanup_operations();
+    except_op_queue_.cleanup_operations();
+    for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
+      timer_queues_for_cleanup_[i]->cleanup_timers();
+  }
+
   // Mutex to protect access to internal data.
   asio::detail::mutex mutex_;
 
@@ -597,6 +606,10 @@
   // The timer queues.
   std::vector<timer_queue_base*> timer_queues_;
 
+  // A copy of the timer queues, used when cleaning up timers. The copy is
+  // stored as a class data member to avoid unnecessary memory allocation.
+  std::vector<timer_queue_base*> timer_queues_for_cleanup_;
+
   // The descriptors that are pending cancellation.
   std::vector<socket_type> pending_cancellations_;
 
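
The kqueue reactor receives the same timer treatment as the epoll reactor. The mechanism both now rely on (defined in timer_queue.hpp further down) is deferred cancellation through an intrusive singly-linked list: cancel_timer() merely unlinks timers from the heap and chains them up, and their handlers are invoked later from dispatch_cancellations() at a point where the reactor is in a consistent state. Reduced to a sketch with hypothetical names:

    struct timer_node // stand-in for asio's timer_base
    {
      timer_node* next_;
      void invoke_aborted() {} // would run the handler with operation_aborted
    };

    timer_node* cancelled_timers = 0; // intrusive list head

    // Cancellation only relinks the node; no user code runs here.
    void cancel(timer_node* t)
    {
      t->next_ = cancelled_timers;
      cancelled_timers = t;
    }

    // The reactor thread later drains the list at a safe point.
    void dispatch_cancellations()
    {
      while (cancelled_timers)
      {
        timer_node* t = cancelled_timers;
        cancelled_timers = t->next_;
        t->invoke_aborted();
      }
    }
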
diff --git a/libtorrent/include/libtorrent/asio/detail/select_reactor.hpp b/libtorrent/include/libtorrent/asio/detail/select_reactor.hpp
index 83f093ae6..079ec2de8 100644
--- a/libtorrent/include/libtorrent/asio/detail/select_reactor.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/select_reactor.hpp
@@ -229,7 +229,10 @@ public:
   std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
   {
     asio::detail::mutex::scoped_lock lock(mutex_);
-    return timer_queue.cancel_timer(token);
+    std::size_t n = timer_queue.cancel_timer(token);
+    if (n > 0)
+      interrupter_.interrupt();
+    return n;
   }
 
 private:
@@ -245,16 +248,13 @@
     read_op_queue_.dispatch_cancellations();
     write_op_queue_.dispatch_cancellations();
     except_op_queue_.dispatch_cancellations();
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+      timer_queues_[i]->dispatch_cancellations();
 
     // Check if the thread is supposed to stop.
     if (stop_thread_)
     {
-      // Clean up operations. We must not hold the lock since the operations may
-      // make calls back into this reactor.
-      lock.unlock();
-      read_op_queue_.cleanup_operations();
-      write_op_queue_.cleanup_operations();
-      except_op_queue_.cleanup_operations();
+      cleanup_operations_and_timers(lock);
       return;
     }
 
@@ -263,12 +263,7 @@
     if (!block && read_op_queue_.empty() && write_op_queue_.empty()
         && except_op_queue_.empty() && all_timer_queues_are_empty())
     {
-      // Clean up operations. We must not hold the lock since the operations may
-      // make calls back into this reactor.
-      lock.unlock();
-      read_op_queue_.cleanup_operations();
-      write_op_queue_.cleanup_operations();
-      except_op_queue_.cleanup_operations();
+      cleanup_operations_and_timers(lock);
       return;
     }
 
@@ -321,19 +316,17 @@
       write_op_queue_.dispatch_cancellations();
     }
     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
       timer_queues_[i]->dispatch_timers();
+      timer_queues_[i]->dispatch_cancellations();
+    }
 
     // Issue any pending cancellations.
     for (size_t i = 0; i < pending_cancellations_.size(); ++i)
       cancel_ops_unlocked(pending_cancellations_[i]);
     pending_cancellations_.clear();
 
-    // Clean up operations. We must not hold the lock since the operations may
-    // make calls back into this reactor.
-    lock.unlock();
-    read_op_queue_.cleanup_operations();
-    write_op_queue_.cleanup_operations();
-    except_op_queue_.cleanup_operations();
+    cleanup_operations_and_timers(lock);
   }
 
   // Run the select loop in the thread.
@@ -414,6 +407,22 @@
     interrupter_.interrupt();
   }
 
+  // Clean up operations and timers. We must not hold the lock since the
+  // destructors may make calls back into this reactor. We make a copy of the
+  // vector of timer queues since the original may be modified while the lock
+  // is not held.
+  void cleanup_operations_and_timers(
+      asio::detail::mutex::scoped_lock& lock)
+  {
+    timer_queues_for_cleanup_ = timer_queues_;
+    lock.unlock();
+    read_op_queue_.cleanup_operations();
+    write_op_queue_.cleanup_operations();
+    except_op_queue_.cleanup_operations();
+    for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
+      timer_queues_for_cleanup_[i]->cleanup_timers();
+  }
+
   // Mutex to protect access to internal data.
   asio::detail::mutex mutex_;
 
@@ -435,6 +444,10 @@
   // The timer queues.
   std::vector<timer_queue_base*> timer_queues_;
 
+  // A copy of the timer queues, used when cleaning up timers. The copy is
+  // stored as a class data member to avoid unnecessary memory allocation.
+  std::vector<timer_queue_base*> timer_queues_for_cleanup_;
+
   // The descriptors that are pending cancellation.
   std::vector<socket_type> pending_cancellations_;
 
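
All three reactors now call interrupter_.interrupt() whenever cancel_timer() removes at least one timer. Without it, a thread blocked in select()/epoll_wait()/kevent() with a timeout computed from the previously earliest expiry would not notice the freshly queued cancellations until that timeout elapsed. Such interrupters are conventionally built on the self-pipe trick; a minimal POSIX sketch of the idea (not asio's actual implementation; interrupt_pipe is hypothetical):

    #include <unistd.h>

    int interrupt_pipe[2]; // created with pipe(); read end sits in the fd set

    // Called from any thread: wakes a thread blocked on the read end.
    void interrupt()
    {
      char byte = 0;
      write(interrupt_pipe[1], &byte, 1);
    }

    // Called by the reactor thread after waking: drain, then re-check queues.
    void reset()
    {
      char buf[64];
      while (read(interrupt_pipe[0], buf, sizeof(buf)) > 0)
        ; // read end assumed to be non-blocking
    }
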
diff --git a/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp b/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp
index 4b38c6ee8..7f4e15edf 100644
--- a/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp
@@ -1504,6 +1504,12 @@ inline asio::error_code translate_addrinfo_error(int error)
   case EAI_MEMORY:
     return asio::error::no_memory;
   case EAI_NONAME:
+#if defined(EAI_ADDRFAMILY)
+  case EAI_ADDRFAMILY:
+#endif
+#if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
+  case EAI_NODATA:
+#endif
     return asio::error::host_not_found;
   case EAI_SERVICE:
     return asio::error::service_not_found;
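
For context on the widened mapping above: depending on the platform, getaddrinfo() may report a name with no usable address records as EAI_NODATA or EAI_ADDRFAMILY instead of EAI_NONAME, so all three now collapse to asio::error::host_not_found. The preprocessor guards matter because EAI_ADDRFAMILY is not defined everywhere, and some systems define EAI_NODATA as an alias of EAI_NONAME, which would otherwise produce a duplicate case label. The same check in isolation (a sketch assuming a POSIX getaddrinfo(); the function name is hypothetical):

    #include <netdb.h>

    // True if a getaddrinfo() failure should be reported as host-not-found.
    bool is_host_not_found(int error)
    {
      switch (error)
      {
      case EAI_NONAME:
    #if defined(EAI_ADDRFAMILY)
      case EAI_ADDRFAMILY: // name exists, but not in the requested family
    #endif
    #if defined(EAI_NODATA) && (EAI_NODATA != EAI_NONAME)
      case EAI_NODATA:     // name exists, but carries no address records
    #endif
        return true;
      default:
        return false;
      }
    }
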
diff --git a/libtorrent/include/libtorrent/asio/detail/timer_queue.hpp b/libtorrent/include/libtorrent/asio/detail/timer_queue.hpp
index af1e36bd5..7735e87cf 100644
--- a/libtorrent/include/libtorrent/asio/detail/timer_queue.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/timer_queue.hpp
@@ -48,7 +48,9 @@ public:
   // Constructor.
   timer_queue()
     : timers_(),
-      heap_()
+      heap_(),
+      cancelled_timers_(0),
+      cleanup_timers_(0)
   {
   }
 
@@ -111,12 +113,17 @@
     {
       timer_base* t = heap_[0];
       remove_timer(t);
+      t->prev_ = 0;
+      t->next_ = cleanup_timers_;
+      cleanup_timers_ = t;
       t->invoke(asio::error_code());
     }
   }
 
-  // Cancel the timer with the given token. The handler will be invoked
-  // immediately with the result operation_aborted.
+  // Cancel the timers with the given token. Any timers pending for the token
+  // will be notified that they have been cancelled next time
+  // dispatch_cancellations is called. Returns the number of timers that were
+  // cancelled.
   std::size_t cancel_timer(void* timer_token)
   {
     std::size_t num_cancelled = 0;
@@ -129,7 +136,9 @@
     {
       timer_base* next = t->next_;
       remove_timer(t);
-      t->invoke(asio::error::operation_aborted);
+      t->prev_ = 0;
+      t->next_ = cancelled_timers_;
+      cancelled_timers_ = t;
       t = next;
       ++num_cancelled;
     }
@@ -137,6 +146,31 @@
     return num_cancelled;
   }
 
+  // Dispatch any pending cancels for timers.
+  virtual void dispatch_cancellations()
+  {
+    while (cancelled_timers_)
+    {
+      timer_base* this_timer = cancelled_timers_;
+      cancelled_timers_ = this_timer->next_;
+      this_timer->next_ = cleanup_timers_;
+      cleanup_timers_ = this_timer;
+      this_timer->invoke(asio::error::operation_aborted);
+    }
+  }
+
+  // Destroy timers that are waiting to be cleaned up.
+  virtual void cleanup_timers()
+  {
+    while (cleanup_timers_)
+    {
+      timer_base* next_timer = cleanup_timers_->next_;
+      cleanup_timers_->next_ = 0;
+      cleanup_timers_->destroy();
+      cleanup_timers_ = next_timer;
+    }
+  }
+
   // Destroy all timers.
   virtual void destroy_timers()
   {
@@ -151,6 +185,7 @@
     }
     heap_.clear();
     timers_.clear();
+    cleanup_timers();
   }
 
 private:
@@ -238,8 +273,7 @@
     static void invoke_handler(timer_base* base,
         const asio::error_code& result)
     {
-      std::auto_ptr<timer<Handler> > t(static_cast<timer<Handler>*>(base));
-      t->handler_(result);
+      static_cast<timer<Handler>*>(base)->handler_(result);
     }
 
     // Destroy the handler.
@@ -338,6 +372,12 @@
 
   // The heap of timers, with the earliest timer at the front.
   std::vector<timer_base*> heap_;
+
+  // The list of timers to be cancelled.
+  timer_base* cancelled_timers_;
+
+  // The list of timers to be destroyed.
+  timer_base* cleanup_timers_;
 };
 
 } // namespace detail
diff --git a/libtorrent/include/libtorrent/asio/detail/timer_queue_base.hpp b/libtorrent/include/libtorrent/asio/detail/timer_queue_base.hpp
index c8be49748..6cf25747a 100644
--- a/libtorrent/include/libtorrent/asio/detail/timer_queue_base.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/timer_queue_base.hpp
@@ -44,6 +44,12 @@ public:
   // Dispatch all ready timers.
   virtual void dispatch_timers() = 0;
 
+  // Dispatch any pending cancels for timers.
+  virtual void dispatch_cancellations() = 0;
+
+  // Destroy timers that are waiting to be cleaned up.
+  virtual void cleanup_timers() = 0;
+
   // Destroy all timers.
   virtual void destroy_timers() = 0;
 };
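
The invoke_handler() change is the companion to the new lists: previously the timer object destroyed itself (via std::auto_ptr) while its handler ran; now invoke() runs the handler with the node safely parked on cleanup_timers_, and destruction is deferred to cleanup_timers(), which the reactors call only after releasing their lock. The two-phase lifetime, reduced to a sketch with hypothetical names:

    struct op_node // stand-in for timer_base
    {
      op_node* next_;
      void invoke() {}  // runs the user handler; node must stay alive
      void destroy() {} // frees the node; may run user destructors
    };

    op_node* cleanup_list = 0;

    // Phase 1: park the node first, then run the handler. The handler may
    // re-enter the timer queue without pulling the node out from under us.
    void fire(op_node* t)
    {
      t->next_ = cleanup_list;
      cleanup_list = t;
      t->invoke();
    }

    // Phase 2: destroy parked nodes, called with the reactor lock released.
    void cleanup()
    {
      while (cleanup_list)
      {
        op_node* next = cleanup_list->next_;
        cleanup_list->destroy();
        cleanup_list = next;
      }
    }
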
diff --git a/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service_fwd.hpp b/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service_fwd.hpp
index 184fdfa18..26eacae2a 100644
--- a/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service_fwd.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service_fwd.hpp
@@ -21,6 +21,8 @@
 #include <boost/config.hpp>
 #include "asio/detail/pop_options.hpp"
 
+#include "asio/detail/socket_types.hpp"
+
 // This service is only supported on Win32 (NT4 and later).
 #if !defined(ASIO_DISABLE_IOCP)
 #if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
diff --git a/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp b/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp
index 007286e8d..d0078978a 100644
--- a/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp
+++ b/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp
@@ -1950,26 +1950,29 @@
   }
 
 private:
-  // Helper function to provide InterlockedCompareExchangePointer functionality
-  // on very old Platform SDKs.
+  // Helper function to emulate InterlockedCompareExchangePointer functionality
+  // for:
+  // - very old Platform SDKs; and
+  // - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
   void* interlocked_compare_exchange_pointer(void** dest, void* exch, void* cmp)
   {
-#if defined(_WIN32_WINNT) && (_WIN32_WINNT <= 0x400) && (_M_IX86)
+#if defined(_M_IX86)
     return reinterpret_cast<void*>(InterlockedCompareExchange(
-        reinterpret_cast<LONG*>(dest), reinterpret_cast<LONG>(exch),
+        reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(exch),
         reinterpret_cast<LONG>(cmp)));
 #else
     return InterlockedCompareExchangePointer(dest, exch, cmp);
 #endif
   }
 
-  // Helper function to provide InterlockedExchangePointer functionality on very
-  // old Platform SDKs.
+  // Helper function to emulate InterlockedExchangePointer functionality for:
+  // - very old Platform SDKs; and
+  // - platform SDKs where MSVC's /Wp64 option causes spurious warnings.
   void* interlocked_exchange_pointer(void** dest, void* val)
   {
-#if defined(_WIN32_WINNT) && (_WIN32_WINNT <= 0x400) && (_M_IX86)
+#if defined(_M_IX86)
     return reinterpret_cast<void*>(InterlockedExchange(
-        reinterpret_cast<LONG*>(dest), reinterpret_cast<LONG>(val)));
+        reinterpret_cast<PLONG>(dest), reinterpret_cast<LONG>(val)));
 #else
     return InterlockedExchangePointer(dest, val);
 #endif
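
A closing note on the interlocked helpers: on 32-bit x86 builds they fall back to the LONG-based InterlockedExchange/InterlockedCompareExchange, both because InterlockedExchangePointer was absent from very old Platform SDKs and because the pointer variants provoke spurious truncation warnings under MSVC's /Wp64. In portable modern C++ the same operation is simply std::atomic's compare-exchange; a rough equivalent of what interlocked_compare_exchange_pointer provides (a sketch, not part of the patch):

    #include <atomic>

    // Atomically: if dest == cmp, store exch; always return the prior value.
    void* compare_exchange_pointer(std::atomic<void*>& dest,
        void* exch, void* cmp)
    {
      void* expected = cmp;
      dest.compare_exchange_strong(expected, exch);
      return expected; // previous value whether or not the swap happened
    }
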