diff --git a/libtorrent/src/disk_io_thread.cpp b/libtorrent/src/disk_io_thread.cpp new file mode 100644 index 000000000..6f97ce7df --- /dev/null +++ b/libtorrent/src/disk_io_thread.cpp @@ -0,0 +1,241 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/storage.hpp" +#include +#include "libtorrent/disk_io_thread.hpp" + +namespace libtorrent +{ + + disk_io_thread::disk_io_thread(int block_size) + : m_abort(false) + , m_queue_buffer_size(0) + , m_pool(block_size) + , m_disk_io_thread(boost::ref(*this)) + {} + + disk_io_thread::~disk_io_thread() + { + boost::mutex::scoped_lock l(m_mutex); + m_abort = true; + m_signal.notify_all(); + l.unlock(); + + m_disk_io_thread.join(); + } + + // aborts read operations + void disk_io_thread::stop(boost::intrusive_ptr s) + { + boost::mutex::scoped_lock l(m_mutex); + // read jobs are aborted, write and move jobs are syncronized + for (std::deque::iterator i = m_jobs.begin(); + i != m_jobs.end();) + { + if (i->storage != s) + { + ++i; + continue; + } + if (i->action == disk_io_job::read) + { + i->callback(-1, *i); + m_jobs.erase(i++); + continue; + } + ++i; + } + m_signal.notify_all(); + } + + bool range_overlap(int start1, int length1, int start2, int length2) + { + return (start1 <= start2 && start1 + length1 > start2) + || (start2 <= start1 && start2 + length2 > start1); + } + + namespace + { + bool operator<(disk_io_job const& lhs, disk_io_job const& rhs) + { + if (lhs.storage.get() < rhs.storage.get()) return true; + if (lhs.storage.get() > rhs.storage.get()) return false; + if (lhs.piece < rhs.piece) return true; + if (lhs.piece > rhs.piece) return false; + if (lhs.offset < rhs.offset) return true; +// if (lhs.offset > rhs.offset) return false; + return false; + } + } + + void disk_io_thread::add_job(disk_io_job const& j + , boost::function const& f) + { + assert(!j.callback); + boost::mutex::scoped_lock l(m_mutex); + + std::deque::reverse_iterator i = m_jobs.rbegin(); + if (j.action == disk_io_job::read) + { + // when we're reading, we may not skip + // ahead of any write operation that overlaps + // the region we're reading + for (; i != m_jobs.rend(); ++i) + { + if (i->action == disk_io_job::read && *i < j) + break; + if (i->action == disk_io_job::write + && i->storage == j.storage + && i->piece == j.piece + && range_overlap(i->offset, i->buffer_size + , j.offset, j.buffer_size)) + { + // we have to stop, and we haven't + // found a suitable place for this job + // so just queue it up at the end + i = m_jobs.rbegin(); + break; + } + } + } + else if (j.action == disk_io_job::write) + { + for (; i != m_jobs.rend(); ++i) + { + if (i->action == disk_io_job::write && *i < j) + { + if (i != m_jobs.rbegin() + && i.base()->storage.get() != j.storage.get()) + i = m_jobs.rbegin(); + break; + } + } + } + + if (i == m_jobs.rend()) i = m_jobs.rbegin(); + + std::deque::iterator k = m_jobs.insert(i.base(), j); + k->callback.swap(const_cast&>(f)); + if (j.action == disk_io_job::write) + m_queue_buffer_size += j.buffer_size; + assert(j.storage.get()); + m_signal.notify_all(); + } + + char* disk_io_thread::allocate_buffer() + { + boost::mutex::scoped_lock l(m_mutex); + return (char*)m_pool.ordered_malloc(); + } + + void disk_io_thread::operator()() + { + for (;;) + { + boost::mutex::scoped_lock l(m_mutex); + while (m_jobs.empty() && !m_abort) + m_signal.wait(l); + if (m_abort && m_jobs.empty()) return; + + boost::function handler; + handler.swap(m_jobs.front().callback); + disk_io_job j = m_jobs.front(); + m_jobs.pop_front(); + m_queue_buffer_size -= j.buffer_size; + l.unlock(); + + int ret = 0; + + try + { +// std::cerr << "DISK THREAD: executing job: " << j.action << std::endl; + switch (j.action) + { + case disk_io_job::read: + l.lock(); + j.buffer = (char*)m_pool.ordered_malloc(); + l.unlock(); + if (j.buffer == 0) + { + ret = -1; + j.str = "out of memory"; + } + else + { + ret = j.storage->read_impl(j.buffer, j.piece, j.offset + , j.buffer_size); + + // simulates slow drives + // usleep(300); + } + break; + case disk_io_job::write: + assert(j.buffer); + j.storage->write_impl(j.buffer, j.piece, j.offset + , j.buffer_size); + + // simulates a slow drive + // usleep(300); + break; + case disk_io_job::hash: + { + sha1_hash h = j.storage->hash_for_piece_impl(j.piece); + j.str.resize(20); + std::memcpy(&j.str[0], &h[0], 20); + } + break; + case disk_io_job::move_storage: + ret = j.storage->move_storage_impl(j.str) ? 1 : 0; + break; + case disk_io_job::release_files: + j.storage->release_files_impl(); + break; + } + } + catch (std::exception& e) + { +// std::cerr << "DISK THREAD: exception: " << e.what() << std::endl; + j.str = e.what(); + ret = -1; + } + +// if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl; +// else std::cerr << "DISK THREAD: invoking callback" << std::endl; + try { if (handler) handler(ret, j); } + catch (std::exception&) {} + + if (j.buffer) m_pool.ordered_free(j.buffer); + } + } +} + diff --git a/libtorrent/src/socks4_stream.cpp b/libtorrent/src/socks4_stream.cpp new file mode 100644 index 000000000..3a31b2375 --- /dev/null +++ b/libtorrent/src/socks4_stream.cpp @@ -0,0 +1,147 @@ +/* + +Copyright (c) 2007, Arvid Norberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +*/ + +#include "libtorrent/pch.hpp" + +#include "libtorrent/socks4_stream.hpp" + +namespace libtorrent +{ + + void socks4_stream::name_lookup(asio::error_code const& e, tcp::resolver::iterator i + , boost::shared_ptr h) + { + if (e) + { + (*h)(e); + close(); + return; + } + + // SOCKS4 doesn't support IPv6 addresses + while (i != tcp::resolver::iterator() && i->endpoint().address().is_v6()) + ++i; + + if (i == tcp::resolver::iterator()) + { + asio::error_code ec = asio::error::operation_not_supported; + (*h)(e); + close(); + return; + } + + m_sock.async_connect(i->endpoint(), boost::bind( + &socks4_stream::connected, this, _1, h)); + } + + void socks4_stream::connected(asio::error_code const& e, boost::shared_ptr h) + { + if (e) + { + (*h)(e); + close(); + return; + } + + using namespace libtorrent::detail; + + m_buffer.resize(m_user.size() + 9); + char* p = &m_buffer[0]; + write_uint8(4, p); // SOCKS VERSION 4 + write_uint8(1, p); // SOCKS CONNECT + write_uint16(m_remote_endpoint.port(), p); + write_uint32(m_remote_endpoint.address().to_v4().to_ulong(), p); + std::copy(m_user.begin(), m_user.end(), p); + p += m_user.size(); + write_uint8(0, p); // NULL terminator + + asio::async_write(m_sock, asio::buffer(m_buffer) + , boost::bind(&socks4_stream::handshake1, this, _1, h)); + } + + void socks4_stream::handshake1(asio::error_code const& e, boost::shared_ptr h) + { + if (e) + { + (*h)(e); + close(); + return; + } + + m_buffer.resize(8); + asio::async_read(m_sock, asio::buffer(m_buffer) + , boost::bind(&socks4_stream::handshake2, this, _1, h)); + } + + void socks4_stream::handshake2(asio::error_code const& e, boost::shared_ptr h) + { + if (e) + { + (*h)(e); + close(); + return; + } + + using namespace libtorrent::detail; + + char* p = &m_buffer[0]; + int reply_version = read_uint8(p); + int status_code = read_uint8(p); + + if (reply_version != 0) + { + (*h)(asio::error::operation_not_supported); + close(); + return; + } + + // access granted + if (status_code == 90) + { + std::vector().swap(m_buffer); + (*h)(e); + return; + } + + asio::error_code ec = asio::error::fault; + switch (status_code) + { + case 91: ec = asio::error::connection_refused; break; + case 92: ec = asio::error::no_permission; break; + case 93: ec = asio::error::no_permission; break; + } + (*h)(ec); + close(); + } + +} +