// // strand_service.hpp // ~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2007 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_DETAIL_STRAND_SERVICE_HPP #define ASIO_DETAIL_STRAND_SERVICE_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp" #include #include #include #include "asio/detail/pop_options.hpp" #include "asio/io_service.hpp" #include "asio/detail/bind_handler.hpp" #include "asio/detail/call_stack.hpp" #include "asio/detail/handler_alloc_helpers.hpp" #include "asio/detail/handler_invoke_helpers.hpp" #include "asio/detail/mutex.hpp" #include "asio/detail/noncopyable.hpp" #include "asio/detail/service_base.hpp" namespace asio { namespace detail { // Default service implementation for a strand. class strand_service : public asio::detail::service_base { public: class handler_base; class invoke_current_handler; class post_next_waiter_on_exit; // The underlying implementation of a strand. class strand_impl { #if defined (__BORLANDC__) public: #else private: #endif void add_ref() { asio::detail::mutex::scoped_lock lock(mutex_); ++ref_count_; } void release() { asio::detail::mutex::scoped_lock lock(mutex_); --ref_count_; if (ref_count_ == 0) { lock.unlock(); delete this; } } private: // Only this service will have access to the internal values. friend class strand_service; friend class post_next_waiter_on_exit; friend class invoke_current_handler; strand_impl(strand_service& owner) : owner_(owner), current_handler_(0), first_waiter_(0), last_waiter_(0), ref_count_(0) { // Insert implementation into linked list of all implementations. asio::detail::mutex::scoped_lock lock(owner_.mutex_); next_ = owner_.impl_list_; prev_ = 0; if (owner_.impl_list_) owner_.impl_list_->prev_ = this; owner_.impl_list_ = this; } ~strand_impl() { // Remove implementation from linked list of all implementations. asio::detail::mutex::scoped_lock lock(owner_.mutex_); if (owner_.impl_list_ == this) owner_.impl_list_ = next_; if (prev_) prev_->next_ = next_; if (next_) next_->prev_= prev_; next_ = 0; prev_ = 0; lock.unlock(); if (current_handler_) { current_handler_->destroy(); } while (first_waiter_) { handler_base* next = first_waiter_->next_; first_waiter_->destroy(); first_waiter_ = next; } } // Mutex to protect access to internal data. asio::detail::mutex mutex_; // The service that owns this implementation. strand_service& owner_; // The handler that is ready to execute. If this pointer is non-null then it // indicates that a handler holds the lock. handler_base* current_handler_; // The start of the list of waiting handlers for the strand. handler_base* first_waiter_; // The end of the list of waiting handlers for the strand. handler_base* last_waiter_; // Storage for posted handlers. typedef boost::aligned_storage<64> handler_storage_type; #if defined(__BORLANDC__) boost::aligned_storage<64> handler_storage_; #else handler_storage_type handler_storage_; #endif // Pointers to adjacent socket implementations in linked list. strand_impl* next_; strand_impl* prev_; // The reference count on the strand implementation. size_t ref_count_; #if !defined(__BORLANDC__) friend void intrusive_ptr_add_ref(strand_impl* p) { p->add_ref(); } friend void intrusive_ptr_release(strand_impl* p) { p->release(); } #endif }; friend class strand_impl; typedef boost::intrusive_ptr implementation_type; // Base class for all handler types. class handler_base { public: typedef void (*invoke_func_type)(handler_base*, strand_service&, implementation_type&); typedef void (*destroy_func_type)(handler_base*); handler_base(invoke_func_type invoke_func, destroy_func_type destroy_func) : next_(0), invoke_func_(invoke_func), destroy_func_(destroy_func) { } void invoke(strand_service& service_impl, implementation_type& impl) { invoke_func_(this, service_impl, impl); } void destroy() { destroy_func_(this); } protected: ~handler_base() { } private: friend class strand_service; friend class strand_impl; friend class post_next_waiter_on_exit; handler_base* next_; invoke_func_type invoke_func_; destroy_func_type destroy_func_; }; // Helper class to allow handlers to be dispatched. class invoke_current_handler { public: invoke_current_handler(strand_service& service_impl, const implementation_type& impl) : service_impl_(service_impl), impl_(impl) { } void operator()() { impl_->current_handler_->invoke(service_impl_, impl_); } friend void* asio_handler_allocate(std::size_t size, invoke_current_handler* this_handler) { return this_handler->do_handler_allocate(size); } friend void asio_handler_deallocate(void*, std::size_t, invoke_current_handler*) { } void* do_handler_allocate(std::size_t size) { #if defined(__BORLANDC__) BOOST_ASSERT(size <= boost::aligned_storage<64>::size); #else BOOST_ASSERT(size <= strand_impl::handler_storage_type::size); #endif return impl_->handler_storage_.address(); } // The asio_handler_invoke hook is not defined here since the default one // provides the correct behaviour, and including it here breaks MSVC 7.1 // in some situations. private: strand_service& service_impl_; implementation_type impl_; }; // Helper class to automatically enqueue next waiter on block exit. class post_next_waiter_on_exit { public: post_next_waiter_on_exit(strand_service& service_impl, implementation_type& impl) : service_impl_(service_impl), impl_(impl), cancelled_(false) { } ~post_next_waiter_on_exit() { if (!cancelled_) { asio::detail::mutex::scoped_lock lock(impl_->mutex_); impl_->current_handler_ = impl_->first_waiter_; if (impl_->current_handler_) { impl_->first_waiter_ = impl_->first_waiter_->next_; if (impl_->first_waiter_ == 0) impl_->last_waiter_ = 0; lock.unlock(); service_impl_.io_service().post( invoke_current_handler(service_impl_, impl_)); } } } void cancel() { cancelled_ = true; } private: strand_service& service_impl_; implementation_type& impl_; bool cancelled_; }; // Class template for a waiter. template class handler_wrapper : public handler_base { public: handler_wrapper(Handler handler) : handler_base(&handler_wrapper::do_invoke, &handler_wrapper::do_destroy), handler_(handler) { } static void do_invoke(handler_base* base, strand_service& service_impl, implementation_type& impl) { // Take ownership of the handler object. typedef handler_wrapper this_type; this_type* h(static_cast(base)); typedef handler_alloc_traits alloc_traits; handler_ptr ptr(h->handler_, h); post_next_waiter_on_exit p1(service_impl, impl); // Make a copy of the handler so that the memory can be deallocated before // the upcall is made. Handler handler(h->handler_); // A handler object must still be valid when the next waiter is posted // since destroying the last handler might cause the strand object to be // destroyed. Therefore we create a second post_next_waiter_on_exit object // that will be destroyed before the handler object. p1.cancel(); post_next_waiter_on_exit p2(service_impl, impl); // Free the memory associated with the handler. ptr.reset(); // Indicate that this strand is executing on the current thread. call_stack::context ctx(impl.get()); // Make the upcall. asio_handler_invoke_helpers::invoke(handler, &handler); } static void do_destroy(handler_base* base) { // Take ownership of the handler object. typedef handler_wrapper this_type; this_type* h(static_cast(base)); typedef handler_alloc_traits alloc_traits; handler_ptr ptr(h->handler_, h); } private: Handler handler_; }; // Construct a new strand service for the specified io_service. explicit strand_service(asio::io_service& io_service) : asio::detail::service_base(io_service), mutex_(), impl_list_(0) { } // Destroy all user-defined handler objects owned by the service. void shutdown_service() { // Construct a list of all handlers to be destroyed. asio::detail::mutex::scoped_lock lock(mutex_); strand_impl* impl = impl_list_; handler_base* first_handler = 0; while (impl) { if (impl->current_handler_) { impl->current_handler_->next_ = first_handler; first_handler = impl->current_handler_; impl->current_handler_ = 0; } if (impl->first_waiter_) { impl->last_waiter_->next_ = first_handler; first_handler = impl->first_waiter_; impl->first_waiter_ = 0; impl->last_waiter_ = 0; } impl = impl->next_; } // Destroy all handlers without holding the lock. lock.unlock(); while (first_handler) { handler_base* next = first_handler->next_; first_handler->destroy(); first_handler = next; } } // Construct a new strand implementation. void construct(implementation_type& impl) { impl = implementation_type(new strand_impl(*this)); } // Destroy a strand implementation. void destroy(implementation_type& impl) { implementation_type().swap(impl); } // Request the io_service to invoke the given handler. template void dispatch(implementation_type& impl, Handler handler) { if (call_stack::contains(impl.get())) { asio_handler_invoke_helpers::invoke(handler, &handler); } else { asio::detail::mutex::scoped_lock lock(impl->mutex_); // Allocate and construct an object to wrap the handler. typedef handler_wrapper value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); handler_ptr ptr(raw_ptr, handler); if (impl->current_handler_ == 0) { // This handler now has the lock, so can be dispatched immediately. impl->current_handler_ = ptr.get(); lock.unlock(); this->io_service().dispatch(invoke_current_handler(*this, impl)); ptr.release(); } else { // Another handler already holds the lock, so this handler must join // the list of waiters. The handler will be posted automatically when // its turn comes. if (impl->last_waiter_) { impl->last_waiter_->next_ = ptr.get(); impl->last_waiter_ = impl->last_waiter_->next_; } else { impl->first_waiter_ = ptr.get(); impl->last_waiter_ = ptr.get(); } ptr.release(); } } } // Request the io_service to invoke the given handler and return immediately. template void post(implementation_type& impl, Handler handler) { asio::detail::mutex::scoped_lock lock(impl->mutex_); // Allocate and construct an object to wrap the handler. typedef handler_wrapper value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); handler_ptr ptr(raw_ptr, handler); if (impl->current_handler_ == 0) { // This handler now has the lock, so can be dispatched immediately. impl->current_handler_ = ptr.get(); lock.unlock(); this->io_service().post(invoke_current_handler(*this, impl)); ptr.release(); } else { // Another handler already holds the lock, so this handler must join the // list of waiters. The handler will be posted automatically when its turn // comes. if (impl->last_waiter_) { impl->last_waiter_->next_ = ptr.get(); impl->last_waiter_ = impl->last_waiter_->next_; } else { impl->first_waiter_ = ptr.get(); impl->last_waiter_ = ptr.get(); } ptr.release(); } } private: // Mutex to protect access to the linked list of implementations. asio::detail::mutex mutex_; // The head of a linked list of all implementations. strand_impl* impl_list_; }; } // namespace detail } // namespace asio #if defined(__BORLANDC__) namespace boost { inline void intrusive_ptr_add_ref( asio::detail::strand_service::strand_impl* p) { p->add_ref(); } inline void intrusive_ptr_release( asio::detail::strand_service::strand_impl* p) { p->release(); } } // namespace boost #endif // defined(__BORLANDC__) #include "asio/detail/pop_options.hpp" #endif // ASIO_DETAIL_STRAND_SERVICE_HPP