From 494c0cc558d7098feb802fb8fa50d283d440b258 Mon Sep 17 00:00:00 2001 From: Marcos Pinto Date: Sun, 10 Feb 2008 22:07:23 +0000 Subject: [PATCH] sync asio --- libtorrent/include/asio/buffer.hpp | 5 + .../include/asio/detail/epoll_reactor.hpp | 14 ++- .../include/asio/detail/kqueue_reactor.hpp | 14 ++- .../asio/detail/old_win_sdk_compat.hpp | 16 ++- .../asio/detail/posix_fd_set_adapter.hpp | 13 ++- .../include/asio/detail/push_options.hpp | 6 + .../include/asio/detail/reactor_op_queue.hpp | 7 +- libtorrent/include/asio/detail/socket_ops.hpp | 21 +++- .../include/asio/detail/socket_types.hpp | 15 ++- .../asio/detail/win_fd_set_adapter.hpp | 8 +- .../asio/detail/win_iocp_io_service.hpp | 104 ++++++++++++++---- .../asio/detail/win_iocp_socket_service.hpp | 49 ++++----- libtorrent/include/asio/error.hpp | 10 +- libtorrent/include/asio/error_code.hpp | 8 +- libtorrent/include/asio/impl/error_code.ipp | 2 + .../include/asio/ssl/detail/openssl_init.hpp | 16 +++ .../asio/ssl/detail/openssl_operation.hpp | 81 ++++++++++---- .../ssl/detail/openssl_stream_service.hpp | 53 ++++++--- libtorrent/include/asio/ssl/stream.hpp | 7 +- libtorrent/include/asio/version.hpp | 2 +- 20 files changed, 347 insertions(+), 104 deletions(-) diff --git a/libtorrent/include/asio/buffer.hpp b/libtorrent/include/asio/buffer.hpp index 881bc1176..57df33edf 100644 --- a/libtorrent/include/asio/buffer.hpp +++ b/libtorrent/include/asio/buffer.hpp @@ -392,7 +392,12 @@ public: ~buffer_debug_check() { +#if BOOST_WORKAROUND(BOOST_MSVC, >= 1400) + // MSVC's string iterator checking may crash in a std::string::iterator + // object's destructor when the iterator points to an already-destroyed + // std::string object, unless the iterator is cleared first. iter_ = Iterator(); +#endif // BOOST_WORKAROUND(BOOST_MSVC, >= 1400) } void operator()() diff --git a/libtorrent/include/asio/detail/epoll_reactor.hpp b/libtorrent/include/asio/detail/epoll_reactor.hpp index 93d39a23c..68b6fff4c 100644 --- a/libtorrent/include/asio/detail/epoll_reactor.hpp +++ b/libtorrent/include/asio/detail/epoll_reactor.hpp @@ -66,7 +66,8 @@ public: pending_cancellations_(), stop_thread_(false), thread_(0), - shutdown_(false) + shutdown_(false), + need_epoll_wait_(true) { // Start the reactor's internal thread only if needed. if (Own_Thread) @@ -387,7 +388,9 @@ private: // Block on the epoll descriptor. epoll_event events[128]; - int num_events = epoll_wait(epoll_fd_, events, 128, timeout); + int num_events = (block || need_epoll_wait_) + ? epoll_wait(epoll_fd_, events, 128, timeout) + : 0; lock.lock(); wait_in_progress_ = false; @@ -478,6 +481,10 @@ private: cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + // Determine whether epoll_wait should be called when the reactor next runs. + need_epoll_wait_ = !read_op_queue_.empty() + || !write_op_queue_.empty() || !except_op_queue_.empty(); + cleanup_operations_and_timers(lock); } @@ -632,6 +639,9 @@ private: // Whether the service has been shut down. bool shutdown_; + + // Whether we need to call epoll_wait the next time the reactor is run. + bool need_epoll_wait_; }; } // namespace detail diff --git a/libtorrent/include/asio/detail/kqueue_reactor.hpp b/libtorrent/include/asio/detail/kqueue_reactor.hpp index 896ff5403..6b09d4dbc 100644 --- a/libtorrent/include/asio/detail/kqueue_reactor.hpp +++ b/libtorrent/include/asio/detail/kqueue_reactor.hpp @@ -74,7 +74,8 @@ public: pending_cancellations_(), stop_thread_(false), thread_(0), - shutdown_(false) + shutdown_(false), + need_kqueue_wait_(true) { // Start the reactor's internal thread only if needed. if (Own_Thread) @@ -373,7 +374,9 @@ private: // Block on the kqueue descriptor. struct kevent events[128]; - int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout); + int num_events = (block || need_kqueue_wait_) + ? kevent(kqueue_fd_, 0, 0, events, 128, timeout) + : 0; lock.lock(); wait_in_progress_ = false; @@ -478,6 +481,10 @@ private: cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + // Determine whether kqueue needs to be called next time the reactor is run. + need_kqueue_wait_ = !read_op_queue_.empty() + || !write_op_queue_.empty() || !except_op_queue_.empty(); + cleanup_operations_and_timers(lock); } @@ -630,6 +637,9 @@ private: // Whether the service has been shut down. bool shutdown_; + + // Whether we need to call kqueue the next time the reactor is run. + bool need_kqueue_wait_; }; } // namespace detail diff --git a/libtorrent/include/asio/detail/old_win_sdk_compat.hpp b/libtorrent/include/asio/detail/old_win_sdk_compat.hpp index 2fb957aa9..e32552197 100644 --- a/libtorrent/include/asio/detail/old_win_sdk_compat.hpp +++ b/libtorrent/include/asio/detail/old_win_sdk_compat.hpp @@ -31,6 +31,10 @@ #if defined(ASIO_HAS_OLD_WIN_SDK) // Emulation of types that are missing from old Platform SDKs. +// +// N.B. this emulation is also used if building for a Windows 2000 target with +// a recent (i.e. Vista or later) SDK, as the SDK does not provide IPv6 support +// in that case. namespace asio { namespace detail { @@ -54,9 +58,19 @@ struct sockaddr_storage_emulation struct in6_addr_emulation { - u_char s6_addr[16]; + union + { + u_char Byte[16]; + u_short Word[8]; + } u; }; +#if !defined(s6_addr) +# define _S6_un u +# define _S6_u8 Byte +# define s6_addr _S6_un._S6_u8 +#endif // !defined(s6_addr) + struct sockaddr_in6_emulation { short sin6_family; diff --git a/libtorrent/include/asio/detail/posix_fd_set_adapter.hpp b/libtorrent/include/asio/detail/posix_fd_set_adapter.hpp index c43904f4d..28e717bb1 100644 --- a/libtorrent/include/asio/detail/posix_fd_set_adapter.hpp +++ b/libtorrent/include/asio/detail/posix_fd_set_adapter.hpp @@ -35,11 +35,16 @@ public: FD_ZERO(&fd_set_); } - void set(socket_type descriptor) + bool set(socket_type descriptor) { - if (max_descriptor_ == invalid_socket || descriptor > max_descriptor_) - max_descriptor_ = descriptor; - FD_SET(descriptor, &fd_set_); + if (descriptor < (socket_type)FD_SETSIZE) + { + if (max_descriptor_ == invalid_socket || descriptor > max_descriptor_) + max_descriptor_ = descriptor; + FD_SET(descriptor, &fd_set_); + return true; + } + return false; } bool is_set(socket_type descriptor) const diff --git a/libtorrent/include/asio/detail/push_options.hpp b/libtorrent/include/asio/detail/push_options.hpp index 0b68d2933..96ddf7d04 100644 --- a/libtorrent/include/asio/detail/push_options.hpp +++ b/libtorrent/include/asio/detail/push_options.hpp @@ -90,6 +90,12 @@ # pragma warning (disable:4244) # pragma warning (disable:4355) # pragma warning (disable:4675) +# if defined(_M_IX86) && defined(_Wp64) +// The /Wp64 option is broken. If you want to check 64 bit portability, use a +// 64 bit compiler! +# pragma warning (disable:4311) +# pragma warning (disable:4312) +# endif // defined(_M_IX86) && defined(_Wp64) # pragma pack (push, 8) // Note that if the /Og optimisation flag is enabled with MSVC6, the compiler // has a tendency to incorrectly optimise away some calls to member template diff --git a/libtorrent/include/asio/detail/reactor_op_queue.hpp b/libtorrent/include/asio/detail/reactor_op_queue.hpp index b2d1054c6..583636047 100644 --- a/libtorrent/include/asio/detail/reactor_op_queue.hpp +++ b/libtorrent/include/asio/detail/reactor_op_queue.hpp @@ -173,8 +173,13 @@ public: typename operation_map::iterator i = operations_.begin(); while (i != operations_.end()) { - descriptors.set(i->first); + Descriptor descriptor = i->first; ++i; + if (!descriptors.set(descriptor)) + { + asio::error_code ec(error::fd_set_failure); + dispatch_all_operations(descriptor, ec); + } } } diff --git a/libtorrent/include/asio/detail/socket_ops.hpp b/libtorrent/include/asio/detail/socket_ops.hpp index ee0b4b582..06ffe9f91 100644 --- a/libtorrent/include/asio/detail/socket_ops.hpp +++ b/libtorrent/include/asio/detail/socket_ops.hpp @@ -1124,8 +1124,12 @@ inline void gai_free(void* p) inline void gai_strcpy(char* target, const char* source, std::size_t max_size) { using namespace std; +#if BOOST_WORKAROUND(BOOST_MSVC, >= 1400) && !defined(UNDER_CE) + strcpy_s(target, max_size, source); +#else *target = 0; strncat(target, source, max_size); +#endif } enum { gai_clone_flag = 1 << 30 }; @@ -1658,7 +1662,11 @@ inline asio::error_code getnameinfo_emulation( { return ec = asio::error::no_buffer_space; } +#if BOOST_WORKAROUND(BOOST_MSVC, >= 1400) && !defined(UNDER_CE) + sprintf_s(serv, servlen, "%u", ntohs(port)); +#else sprintf(serv, "%u", ntohs(port)); +#endif } else { @@ -1677,7 +1685,11 @@ inline asio::error_code getnameinfo_emulation( { return ec = asio::error::no_buffer_space; } +#if BOOST_WORKAROUND(BOOST_MSVC, >= 1400) && !defined(UNDER_CE) + sprintf_s(serv, servlen, "%u", ntohs(port)); +#else sprintf(serv, "%u", ntohs(port)); +#endif } #if defined(BOOST_HAS_THREADS) && defined(BOOST_HAS_PTHREADS) ::pthread_mutex_unlock(&mutex); @@ -1799,19 +1811,22 @@ inline asio::error_code getnameinfo(const socket_addr_type* addr, # if defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0501) || defined(UNDER_CE) // Building for Windows XP, Windows Server 2003, or later. clear_error(ec); - int error = ::getnameinfo(addr, addrlen, host, static_cast(hostlen), + int error = ::getnameinfo(addr, static_cast(addrlen), + host, static_cast(hostlen), serv, static_cast(servlen), flags); return ec = translate_addrinfo_error(error); # else // Building for Windows 2000 or earlier. typedef int (WSAAPI *gni_t)(const socket_addr_type*, - int, char*, std::size_t, char*, std::size_t, int); + int, char*, DWORD, char*, DWORD, int); if (HMODULE winsock_module = ::GetModuleHandleA("ws2_32")) { if (gni_t gni = (gni_t)::GetProcAddress(winsock_module, "getnameinfo")) { clear_error(ec); - int error = gni(addr, addrlen, host, hostlen, serv, servlen, flags); + int error = gni(addr, static_cast(addrlen), + host, static_cast(hostlen), + serv, static_cast(servlen), flags); return ec = translate_addrinfo_error(error); } } diff --git a/libtorrent/include/asio/detail/socket_types.hpp b/libtorrent/include/asio/detail/socket_types.hpp index e2b68910c..d1497a345 100644 --- a/libtorrent/include/asio/detail/socket_types.hpp +++ b/libtorrent/include/asio/detail/socket_types.hpp @@ -92,7 +92,11 @@ # include # include # include -# include +# if defined(__hpux) && !defined(__HP_aCC) +# include +# else +# include +# endif # include # include # include @@ -156,7 +160,16 @@ const int max_addr_v4_str_len = INET_ADDRSTRLEN; const int max_addr_v6_str_len = INET6_ADDRSTRLEN + 1 + IF_NAMESIZE; typedef sockaddr socket_addr_type; typedef in_addr in4_addr_type; +# if defined(__hpux) +// HP-UX doesn't provide ip_mreq when _XOPEN_SOURCE_EXTENDED is defined. +struct in4_mreq_type +{ + struct in_addr imr_multiaddr; + struct in_addr imr_interface; +}; +# else typedef ip_mreq in4_mreq_type; +# endif typedef sockaddr_in sockaddr_in4_type; typedef in6_addr in6_addr_type; typedef ipv6_mreq in6_mreq_type; diff --git a/libtorrent/include/asio/detail/win_fd_set_adapter.hpp b/libtorrent/include/asio/detail/win_fd_set_adapter.hpp index f2632c4d0..c97063a53 100644 --- a/libtorrent/include/asio/detail/win_fd_set_adapter.hpp +++ b/libtorrent/include/asio/detail/win_fd_set_adapter.hpp @@ -36,13 +36,17 @@ public: fd_set_.fd_count = 0; } - void set(socket_type descriptor) + bool set(socket_type descriptor) { for (u_int i = 0; i < fd_set_.fd_count; ++i) if (fd_set_.fd_array[i] == descriptor) - return; + return true; if (fd_set_.fd_count < win_fd_set_size) + { fd_set_.fd_array[fd_set_.fd_count++] = descriptor; + return true; + } + return false; } bool is_set(socket_type descriptor) const diff --git a/libtorrent/include/asio/detail/win_iocp_io_service.hpp b/libtorrent/include/asio/detail/win_iocp_io_service.hpp index 46e651653..fe2e58a0e 100644 --- a/libtorrent/include/asio/detail/win_iocp_io_service.hpp +++ b/libtorrent/include/asio/detail/win_iocp_io_service.hpp @@ -34,7 +34,6 @@ #include "asio/detail/service_base.hpp" #include "asio/detail/socket_types.hpp" #include "asio/detail/timer_queue.hpp" -#include "asio/detail/win_iocp_operation.hpp" #include "asio/detail/mutex.hpp" namespace asio { @@ -44,14 +43,64 @@ class win_iocp_io_service : public asio::detail::service_base { public: - // Base class for all operations. - typedef win_iocp_operation operation; + // Base class for all operations. A function pointer is used instead of + // virtual functions to avoid the associated overhead. + // + // This class inherits from OVERLAPPED so that we can downcast to get back to + // the operation pointer from the LPOVERLAPPED out parameter of + // GetQueuedCompletionStatus. + class operation + : public OVERLAPPED + { + public: + typedef void (*invoke_func_type)(operation*, DWORD, size_t); + typedef void (*destroy_func_type)(operation*); + + operation(win_iocp_io_service& iocp_service, + invoke_func_type invoke_func, destroy_func_type destroy_func) + : outstanding_operations_(&iocp_service.outstanding_operations_), + invoke_func_(invoke_func), + destroy_func_(destroy_func) + { + Internal = 0; + InternalHigh = 0; + Offset = 0; + OffsetHigh = 0; + hEvent = 0; + + ::InterlockedIncrement(outstanding_operations_); + } + + void do_completion(DWORD last_error, size_t bytes_transferred) + { + invoke_func_(this, last_error, bytes_transferred); + } + + void destroy() + { + destroy_func_(this); + } + + protected: + // Prevent deletion through this type. + ~operation() + { + ::InterlockedDecrement(outstanding_operations_); + } + + private: + long* outstanding_operations_; + invoke_func_type invoke_func_; + destroy_func_type destroy_func_; + }; + // Constructor. win_iocp_io_service(asio::io_service& io_service) : asio::detail::service_base(io_service), iocp_(), outstanding_work_(0), + outstanding_operations_(0), stopped_(0), shutdown_(0), timer_thread_(0), @@ -79,7 +128,7 @@ public: { ::InterlockedExchange(&shutdown_, 1); - for (;;) + while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0) { DWORD bytes_transferred = 0; #if (WINVER < 0x0500) @@ -88,12 +137,8 @@ public: DWORD_PTR completion_key = 0; #endif LPOVERLAPPED overlapped = 0; - ::SetLastError(0); - BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, - &bytes_transferred, &completion_key, &overlapped, 0); - DWORD last_error = ::GetLastError(); - if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT) - break; + ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, + &completion_key, &overlapped, INFINITE); if (overlapped) static_cast(overlapped)->destroy(); } @@ -249,7 +294,7 @@ public: } // Request invocation of the given OVERLAPPED-derived operation. - void post_completion(win_iocp_operation* op, DWORD op_last_error, + void post_completion(operation* op, DWORD op_last_error, DWORD bytes_transferred) { // Enqueue the operation on the I/O completion port. @@ -347,7 +392,7 @@ private: &timer_thread_, this_thread_id, 0) == 0); // Calculate timeout for GetQueuedCompletionStatus call. - DWORD timeout = max_timeout; + DWORD timeout = INFINITE; if (dispatching_timers) { asio::detail::mutex::scoped_lock lock(timer_mutex_); @@ -371,13 +416,28 @@ private: // Dispatch any pending timers. if (dispatching_timers) { - asio::detail::mutex::scoped_lock lock(timer_mutex_); - timer_queues_copy_ = timer_queues_; - for (std::size_t i = 0; i < timer_queues_.size(); ++i) + try { - timer_queues_[i]->dispatch_timers(); - timer_queues_[i]->dispatch_cancellations(); - timer_queues_[i]->cleanup_timers(); + asio::detail::mutex::scoped_lock lock(timer_mutex_); + timer_queues_copy_ = timer_queues_; + for (std::size_t i = 0; i < timer_queues_.size(); ++i) + { + timer_queues_[i]->dispatch_timers(); + timer_queues_[i]->dispatch_cancellations(); + timer_queues_[i]->cleanup_timers(); + } + } + catch (...) + { + // Transfer responsibility for dispatching timers to another thread. + if (::InterlockedCompareExchange(&timer_thread_, + 0, this_thread_id) == this_thread_id) + { + ::PostQueuedCompletionStatus(iocp_.handle, + 0, transfer_timer_dispatching, 0); + } + + throw; } } @@ -532,7 +592,7 @@ private: { handler_operation(win_iocp_io_service& io_service, Handler handler) - : operation(&handler_operation::do_completion_impl, + : operation(io_service, &handler_operation::do_completion_impl, &handler_operation::destroy_impl), io_service_(io_service), handler_(handler) @@ -593,6 +653,10 @@ private: // The count of unfinished work. long outstanding_work_; + // The count of unfinished operations. + long outstanding_operations_; + friend class operation; + // Flag to indicate whether the event loop has been stopped. long stopped_; @@ -602,7 +666,7 @@ private: enum { // Maximum GetQueuedCompletionStatus timeout, in milliseconds. - max_timeout = 1000, + max_timeout = 500, // Completion key value to indicate that responsibility for dispatching // timers is being cooperatively transferred from one thread to another. diff --git a/libtorrent/include/asio/detail/win_iocp_socket_service.hpp b/libtorrent/include/asio/detail/win_iocp_socket_service.hpp index cb3640638..faf6a5c1e 100644 --- a/libtorrent/include/asio/detail/win_iocp_socket_service.hpp +++ b/libtorrent/include/asio/detail/win_iocp_socket_service.hpp @@ -56,7 +56,7 @@ public: typedef typename Protocol::endpoint endpoint_type; // Base class for all operations. - typedef win_iocp_operation operation; + typedef win_iocp_io_service::operation operation; struct noop_deleter { void operator()(void*) {} }; typedef boost::shared_ptr shared_cancel_token_type; @@ -680,13 +680,13 @@ public: : public operation { public: - send_operation(asio::io_service& io_service, + send_operation(win_iocp_io_service& io_service, weak_cancel_token_type cancel_token, const ConstBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &send_operation::do_completion_impl, &send_operation::destroy_impl), - work_(io_service), + work_(io_service.get_io_service()), cancel_token_(cancel_token), buffers_(buffers), handler_(handler) @@ -782,8 +782,8 @@ public: typedef send_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), impl.cancel_token_, buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, + impl.cancel_token_, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -860,7 +860,7 @@ public: // Send the data. DWORD bytes_transferred = 0; int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, - flags, destination.data(), destination.size(), 0, 0); + flags, destination.data(), static_cast(destination.size()), 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); @@ -880,12 +880,12 @@ public: : public operation { public: - send_to_operation(asio::io_service& io_service, + send_to_operation(win_iocp_io_service& io_service, const ConstBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &send_to_operation::do_completion_impl, &send_to_operation::destroy_impl), - work_(io_service), + work_(io_service.get_io_service()), buffers_(buffers), handler_(handler) { @@ -973,8 +973,7 @@ public: typedef send_to_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -991,8 +990,8 @@ public: // Send the data. DWORD bytes_transferred = 0; - int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, - flags, destination.data(), destination.size(), ptr.get(), 0); + int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags, + destination.data(), static_cast(destination.size()), ptr.get(), 0); DWORD last_error = ::WSAGetLastError(); // Check if the operation completed immediately. @@ -1074,15 +1073,15 @@ public: : public operation { public: - receive_operation(asio::io_service& io_service, + receive_operation(win_iocp_io_service& io_service, weak_cancel_token_type cancel_token, const MutableBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &receive_operation< MutableBufferSequence, Handler>::do_completion_impl, &receive_operation< MutableBufferSequence, Handler>::destroy_impl), - work_(io_service), + work_(io_service.get_io_service()), cancel_token_(cancel_token), buffers_(buffers), handler_(handler) @@ -1185,8 +1184,8 @@ public: typedef receive_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), impl.cancel_token_, buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, + impl.cancel_token_, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -1290,17 +1289,17 @@ public: : public operation { public: - receive_from_operation(asio::io_service& io_service, + receive_from_operation(win_iocp_io_service& io_service, endpoint_type& endpoint, const MutableBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &receive_from_operation< MutableBufferSequence, Handler>::do_completion_impl, &receive_from_operation< MutableBufferSequence, Handler>::destroy_impl), endpoint_(endpoint), endpoint_size_(static_cast(endpoint.capacity())), - work_(io_service), + work_(io_service.get_io_service()), buffers_(buffers), handler_(handler) { @@ -1405,8 +1404,8 @@ public: typedef receive_from_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), sender_endp, buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, + sender_endp, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -1508,7 +1507,7 @@ public: socket_type socket, socket_type new_socket, Socket& peer, const protocol_type& protocol, endpoint_type* peer_endpoint, bool enable_connection_aborted, Handler handler) - : operation( + : operation(io_service, &accept_operation::do_completion_impl, &accept_operation::destroy_impl), io_service_(io_service), diff --git a/libtorrent/include/asio/error.hpp b/libtorrent/include/asio/error.hpp index c65599c91..1c1946af6 100644 --- a/libtorrent/include/asio/error.hpp +++ b/libtorrent/include/asio/error.hpp @@ -70,6 +70,11 @@ enum basic_errors /// Operation already in progress. already_started = ASIO_SOCKET_ERROR(EALREADY), + /// Broken pipe. + broken_pipe = ASIO_WIN_OR_POSIX( + ASIO_NATIVE_ERROR(ERROR_BROKEN_PIPE), + ASIO_NATIVE_ERROR(EPIPE)), + /// A connection has been aborted. connection_aborted = ASIO_SOCKET_ERROR(ECONNABORTED), @@ -194,7 +199,10 @@ enum misc_errors eof, /// Element not found. - not_found + not_found, + + /// The descriptor cannot fit into the select system call's fd_set. + fd_set_failure }; enum ssl_errors diff --git a/libtorrent/include/asio/error_code.hpp b/libtorrent/include/asio/error_code.hpp index 516d599d7..6c8370d42 100644 --- a/libtorrent/include/asio/error_code.hpp +++ b/libtorrent/include/asio/error_code.hpp @@ -112,7 +112,11 @@ public: { }; - typedef unspecified_bool_type_t* unspecified_bool_type; + typedef void (*unspecified_bool_type)(unspecified_bool_type_t); + + static void unspecified_bool_true(unspecified_bool_type_t) + { + } /// Operator returns non-null if there is a non-success error code. operator unspecified_bool_type() const @@ -120,7 +124,7 @@ public: if (value_ == 0) return 0; else - return reinterpret_cast(1); + return &error_code::unspecified_bool_true; } /// Operator to test if the error represents success. diff --git a/libtorrent/include/asio/impl/error_code.ipp b/libtorrent/include/asio/impl/error_code.ipp index 9925cb484..52aef2851 100644 --- a/libtorrent/include/asio/impl/error_code.ipp +++ b/libtorrent/include/asio/impl/error_code.ipp @@ -35,6 +35,8 @@ inline std::string error_code::message() const return "Already open."; if (*this == error::not_found) return "Not found."; + if (*this == error::fd_set_failure) + return "The descriptor does not fit into the select call's fd_set."; if (category_ == error::get_ssl_category()) return "SSL error."; #if defined(BOOST_WINDOWS) || defined(__CYGWIN__) diff --git a/libtorrent/include/asio/ssl/detail/openssl_init.hpp b/libtorrent/include/asio/ssl/detail/openssl_init.hpp index 06fbf86fe..e646acf48 100755 --- a/libtorrent/include/asio/ssl/detail/openssl_init.hpp +++ b/libtorrent/include/asio/ssl/detail/openssl_init.hpp @@ -20,10 +20,12 @@ #include "asio/detail/push_options.hpp" #include +#include #include #include "asio/detail/pop_options.hpp" #include "asio/detail/mutex.hpp" +#include "asio/detail/tss_ptr.hpp" #include "asio/ssl/detail/openssl_types.hpp" namespace asio { @@ -51,6 +53,7 @@ private: for (size_t i = 0; i < mutexes_.size(); ++i) mutexes_[i].reset(new asio::detail::mutex); ::CRYPTO_set_locking_callback(&do_init::openssl_locking_func); + ::CRYPTO_set_id_callback(&do_init::openssl_id_func); } } @@ -58,6 +61,7 @@ private: { if (Do_Init) { + ::CRYPTO_set_id_callback(0); ::CRYPTO_set_locking_callback(0); ::ERR_free_strings(); ::ERR_remove_state(0); @@ -80,6 +84,15 @@ private: } private: + static unsigned long openssl_id_func() + { + void* id = instance()->thread_id_; + if (id == 0) + instance()->thread_id_ = id = &id; // Ugh. + BOOST_ASSERT(sizeof(unsigned long) >= sizeof(void*)); + return reinterpret_cast(id); + } + static void openssl_locking_func(int mode, int n, const char *file, int line) { @@ -91,6 +104,9 @@ private: // Mutexes to be used in locking callbacks. std::vector > mutexes_; + + // The thread identifiers to be used by openssl. + asio::detail::tss_ptr thread_id_; }; public: diff --git a/libtorrent/include/asio/ssl/detail/openssl_operation.hpp b/libtorrent/include/asio/ssl/detail/openssl_operation.hpp index c254aceb2..503559e7f 100755 --- a/libtorrent/include/asio/ssl/detail/openssl_operation.hpp +++ b/libtorrent/include/asio/ssl/detail/openssl_operation.hpp @@ -19,6 +19,7 @@ #include "asio/detail/push_options.hpp" #include +#include #include #include "asio/detail/pop_options.hpp" @@ -87,10 +88,12 @@ public: net_buffer& recv_buf, SSL* session, BIO* ssl_bio, - user_handler_func handler + user_handler_func handler, + asio::io_service::strand& strand ) : primitive_(primitive) , user_handler_(handler) + , strand_(&strand) , recv_buf_(recv_buf) , socket_(socket) , ssl_bio_(ssl_bio) @@ -100,6 +103,10 @@ public: &openssl_operation::do_async_write, this, boost::arg<1>(), boost::arg<2>() ); + read_ = boost::bind( + &openssl_operation::do_async_read, + this + ); handler_= boost::bind( &openssl_operation::async_user_handler, this, boost::arg<1>(), boost::arg<2>() @@ -113,6 +120,7 @@ public: SSL* session, BIO* ssl_bio) : primitive_(primitive) + , strand_(0) , recv_buf_(recv_buf) , socket_(socket) , ssl_bio_(ssl_bio) @@ -122,6 +130,10 @@ public: &openssl_operation::do_sync_write, this, boost::arg<1>(), boost::arg<2>() ); + read_ = boost::bind( + &openssl_operation::do_sync_read, + this + ); handler_ = boost::bind( &openssl_operation::sync_user_handler, this, boost::arg<1>(), boost::arg<2>() @@ -134,7 +146,7 @@ public: int start() { int rc = primitive_( session_ ); - int sys_error_code = ERR_get_error(); + bool is_operation_done = (rc > 0); // For connect/accept/shutdown, the operation // is done, when return code is 1 @@ -144,6 +156,8 @@ public: int error_code = !is_operation_done ? ::SSL_get_error( session_, rc ) : 0; + int sys_error_code = ERR_get_error(); + bool is_read_needed = (error_code == SSL_ERROR_WANT_READ); bool is_write_needed = (error_code == SSL_ERROR_WANT_WRITE || ::BIO_ctrl_pending( ssl_bio_ )); @@ -211,6 +225,10 @@ public: return start(); } + else if (is_read_needed) + { + return read_(); + } } // Continue with operation, flush any SSL data out to network... @@ -222,10 +240,13 @@ private: typedef boost::function int_handler_func; typedef boost::function write_func; + typedef boost::function read_func; ssl_primitive_func primitive_; user_handler_func user_handler_; + asio::io_service::strand* strand_; write_func write_; + read_func read_; int_handler_func handler_; net_buffer send_buf_; // buffers for network IO @@ -249,8 +270,15 @@ private: throw asio::system_error(error); } - int async_user_handler(const asio::error_code& error, int rc) + int async_user_handler(asio::error_code error, int rc) { + if (rc < 0) + { + if (!error) + error = asio::error::no_recovery; + rc = 0; + } + user_handler_(error, rc); return 0; } @@ -280,19 +308,23 @@ private: { unsigned char *data_start = send_buf_.get_unused_start(); send_buf_.data_added(len); - + + BOOST_ASSERT(strand_); asio::async_write ( socket_, asio::buffer(data_start, len), - boost::bind + strand_->wrap ( - &openssl_operation::async_write_handler, - this, - is_operation_done, - rc, - asio::placeholders::error, - asio::placeholders::bytes_transferred + boost::bind + ( + &openssl_operation::async_write_handler, + this, + is_operation_done, + rc, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) ) ); @@ -315,8 +347,8 @@ private: } // OPeration is not done and writing to net has been made... - // start reading... - do_async_read(); + // start operation again + start(); return 0; } @@ -339,21 +371,26 @@ private: handler_(error, rc); } - void do_async_read() + int do_async_read() { // Wait for new data + BOOST_ASSERT(strand_); socket_.async_read_some ( asio::buffer(recv_buf_.get_unused_start(), recv_buf_.get_unused_len()), - boost::bind + strand_->wrap ( - &openssl_operation::async_read_handler, - this, - asio::placeholders::error, - asio::placeholders::bytes_transferred - ) + boost::bind + ( + &openssl_operation::async_read_handler, + this, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) + ) ); + return 0; } void async_read_handler(const asio::error_code& error, @@ -432,8 +469,8 @@ private: // Finish the operation, with success return rc; - // Operation is not finished, read data from net... - return do_sync_read(); + // Operation is not finished, start again. + return start(); } int do_sync_read() diff --git a/libtorrent/include/asio/ssl/detail/openssl_stream_service.hpp b/libtorrent/include/asio/ssl/detail/openssl_stream_service.hpp index 3812deb80..ea0339326 100644 --- a/libtorrent/include/asio/ssl/detail/openssl_stream_service.hpp +++ b/libtorrent/include/asio/ssl/detail/openssl_stream_service.hpp @@ -20,6 +20,7 @@ #include "asio/detail/push_options.hpp" #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "asio/error.hpp" #include "asio/io_service.hpp" +#include "asio/strand.hpp" #include "asio/detail/service_base.hpp" #include "asio/ssl/basic_context.hpp" #include "asio/ssl/stream_base.hpp" @@ -42,6 +44,8 @@ class openssl_stream_service : public asio::detail::service_base { private: + enum { max_buffer_size = INT_MAX }; + //Base handler for asyncrhonous operations template class base_handler @@ -160,7 +164,8 @@ public: // Construct a new stream socket service for the specified io_service. explicit openssl_stream_service(asio::io_service& io_service) - : asio::detail::service_base(io_service) + : asio::detail::service_base(io_service), + strand_(io_service) { } @@ -255,11 +260,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Shut down SSL on the stream. @@ -309,11 +315,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Write some data to the stream. @@ -324,10 +331,14 @@ public: size_t bytes_transferred = 0; try { + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function send_func = boost::bind(&::SSL_write, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - static_cast(asio::buffer_size(*buffers.begin()))); + static_cast(buffer_size)); openssl_operation op( send_func, next_layer, @@ -356,10 +367,14 @@ public: send_handler* local_handler = new send_handler(handler, get_io_service()); + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function send_func = boost::bind(&::SSL_write, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - static_cast(asio::buffer_size(*buffers.begin()))); + static_cast(buffer_size)); openssl_operation* op = new openssl_operation ( @@ -374,11 +389,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Read some data from the stream. @@ -389,10 +405,14 @@ public: size_t bytes_transferred = 0; try { + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function recv_func = boost::bind(&::SSL_read, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - asio::buffer_size(*buffers.begin())); + static_cast(buffer_size)); openssl_operation op(recv_func, next_layer, impl->recv_buf, @@ -421,10 +441,14 @@ public: recv_handler* local_handler = new recv_handler(handler, get_io_service()); + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function recv_func = boost::bind(&::SSL_read, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - asio::buffer_size(*buffers.begin())); + static_cast(buffer_size)); openssl_operation* op = new openssl_operation ( @@ -439,11 +463,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Peek at the incoming data on the stream. @@ -465,6 +490,8 @@ public: } private: + asio::io_service::strand strand_; + typedef asio::detail::mutex mutex_type; template diff --git a/libtorrent/include/asio/ssl/stream.hpp b/libtorrent/include/asio/ssl/stream.hpp index 9ee48fef1..e22f6b06c 100644 --- a/libtorrent/include/asio/ssl/stream.hpp +++ b/libtorrent/include/asio/ssl/stream.hpp @@ -44,16 +44,15 @@ namespace ssl { * @e Shared @e objects: Unsafe. * * @par Example - * To use the SSL stream template with a stream_socket, you would write: + * To use the SSL stream template with an ip::tcp::socket, you would write: * @code * asio::io_service io_service; * asio::ssl::context context(io_service, asio::ssl::context::sslv23); - * asio::ssl::stream sock(io_service, context); + * asio::ssl::stream sock(io_service, context); * @endcode * * @par Concepts: - * Async_Object, Async_Read_Stream, Async_Write_Stream, Error_Source, Stream, - * Sync_Read_Stream, Sync_Write_Stream. + * AsyncReadStream, AsyncWriteStream, Stream, SyncRead_Stream, SyncWriteStream. */ template class stream diff --git a/libtorrent/include/asio/version.hpp b/libtorrent/include/asio/version.hpp index 82cf4e15a..f2d68f986 100644 --- a/libtorrent/include/asio/version.hpp +++ b/libtorrent/include/asio/version.hpp @@ -18,6 +18,6 @@ // ASIO_VERSION % 100 is the sub-minor version // ASIO_VERSION / 100 % 1000 is the minor version // ASIO_VERSION / 100000 is the major version -#define ASIO_VERSION 308 // 0.3.8 +#define ASIO_VERSION 309 // 0.3.9 #endif // ASIO_VERSION_HPP