Added a protocol for the network traffic between client and daemon.

Implemented a protocol layer above twisted.internet.protocol.Protocol
which guarantees correct transfer of RPC messages. The network messages
are transfered with a header containing the length of the message.
This commit is contained in:
bendikro 2012-06-18 20:28:32 +02:00
parent 5dc6dbf216
commit 8e7432e71c
4 changed files with 639 additions and 148 deletions

View File

@ -60,6 +60,8 @@ from deluge.core.authmanager import (AUTH_LEVEL_NONE, AUTH_LEVEL_DEFAULT,
from deluge.error import (DelugeError, NotAuthorizedError, WrappedException, from deluge.error import (DelugeError, NotAuthorizedError, WrappedException,
_ClientSideRecreateError, IncompatibleClient) _ClientSideRecreateError, IncompatibleClient)
from deluge.transfer import DelugeTransferProtocol
RPC_RESPONSE = 1 RPC_RESPONSE = 1
RPC_ERROR = 2 RPC_ERROR = 2
RPC_EVENT = 3 RPC_EVENT = 3
@ -134,54 +136,34 @@ class ServerContextFactory(object):
ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey")) ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey"))
return ctx return ctx
class DelugeRPCProtocol(Protocol): class DelugeRPCProtocol(DelugeTransferProtocol):
__buffer = None
def dataReceived(self, data): def message_received(self, request):
""" """
This method is called whenever data is received from a client. The This method is called whenever a message is received from a client. The
only message that a client sends to the server is a RPC Request message. only message that a client sends to the server is a RPC Request message.
If the RPC Request message is valid, then the method is called in If the RPC Request message is valid, then the method is called in
:meth:`dispatch`. :meth:`dispatch`.
:param data: the data from the client. It should be a zlib compressed :param request: the request from the client.
rencoded string. :type data: tuple
:type data: str
""" """
if self.__buffer: if type(request) is not tuple:
# We have some data from the last dataReceived() so lets prepend it log.debug("Received invalid message: type is not tuple")
data = self.__buffer + data return
self.__buffer = None
while data: if len(request) < 1:
dobj = zlib.decompressobj() log.debug("Received invalid message: there are no items")
try: return
request = rencode.loads(dobj.decompress(data))
except Exception, e:
#log.debug("Received possible invalid message (%r): %s", data, e)
# This could be cut-off data, so we'll save this in the buffer
# and try to prepend it on the next dataReceived()
self.__buffer = data
return
else:
data = dobj.unused_data
if type(request) is not tuple: for call in request:
log.debug("Received invalid message: type is not tuple") if len(call) != 4:
return log.debug("Received invalid rpc request: number of items "
"in request is %s", len(call))
if len(request) < 1: continue
log.debug("Received invalid message: there are no items") #log.debug("RPCRequest: %s", format_request(call))
return reactor.callLater(0, self.dispatch, *call)
for call in request:
if len(call) != 4:
log.debug("Received invalid rpc request: number of items "
"in request is %s", len(call))
continue
#log.debug("RPCRequest: %s", format_request(call))
reactor.callLater(0, self.dispatch, *call)
def sendData(self, data): def sendData(self, data):
""" """
@ -192,7 +174,7 @@ class DelugeRPCProtocol(Protocol):
:type data: object :type data: object
""" """
self.transport.write(zlib.compress(rencode.dumps(data))) self.transfer_message(data)
def connectionMade(self): def connectionMade(self):
""" """

170
deluge/transfer.py Normal file
View File

@ -0,0 +1,170 @@
# -*- coding: utf-8 -*-
#
# transfer.py
#
# Copyright (C) 2012 Bro <bro.development@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
#
try:
import rencode
except ImportError:
import deluge.rencode as rencode
import zlib
import struct
from twisted.internet.protocol import Protocol
from deluge.log import LOG as log
MESSAGE_HEADER_SIZE = 5
class DelugeTransferProtocol(Protocol):
"""
Data messages are transfered using very a simple protocol.
Data messages are transfered with a header containing
the length of the data to be transfered (payload).
"""
def __init__(self):
self._buffer = ""
self._message_length = 0
self._bytes_received = 0
self._bytes_sent = 0
def transfer_message(self, data):
"""
Transfer the data.
The data will be serialized and compressed before being sent.
First a header is sent - containing the length of the compressed payload
to come as a signed integer. After the header, the payload is transfered.
:param data: data to be transfered in a data structure serializable by rencode.
"""
compressed = zlib.compress(rencode.dumps(data))
size_data = len(compressed)
# Store length as a signed integer (using 4 bytes). "!" denotes network byte order.
payload_len = struct.pack("!i", size_data)
header = "D" + payload_len
self._bytes_sent += len(header) + len(compressed)
self.transport.write(header)
self.transport.write(compressed)
def dataReceived(self, data):
"""
This method is called whenever data is received.
:param data: a message as transfered by transfer_message, or a part of such
a messsage.
Global variables:
_buffer - contains the data received
_message_length - the length of the payload of the current message.
"""
self._buffer += data
self._bytes_received += len(data)
while len(self._buffer) >= MESSAGE_HEADER_SIZE:
if self._message_length == 0:
self._handle_new_message()
# We have a complete packet
if len(self._buffer) >= self._message_length:
self._handle_complete_message(self._buffer[:self._message_length])
# Remove message data from buffer
self._buffer = self._buffer[self._message_length:]
self._message_length = 0
else:
break
def _handle_new_message(self):
"""
Handle the start of a new message. This method is called only when the
beginning of the buffer contains data from a new message (i.e. the header).
"""
try:
# Read the first bytes of the message (MESSAGE_HEADER_SIZE bytes)
header = self._buffer[:MESSAGE_HEADER_SIZE]
payload_len = header[1:MESSAGE_HEADER_SIZE]
if header[0] != 'D':
raise Exception("Invalid header format. First byte is %d" % ord(header[0]))
# Extract the length stored as a signed integer (using 4 bytes)
self._message_length = struct.unpack("!i", payload_len)[0]
if self._message_length < 0:
raise Exception("Message length is negative: %d" % self._message_length)
# Remove the header from the buffer
self._buffer = self._buffer[MESSAGE_HEADER_SIZE:]
except Exception, e:
log.warn("Error occured when parsing message header: %s." % str(e))
log.warn("This version of Deluge cannot communicate with the sender of this data.")
self._message_length = 0
self._buffer = ""
def _handle_complete_message(self, data):
"""
Handles a complete message as it is transfered on the network.
:param data: a zlib compressed string encoded with rencode.
"""
try:
self.message_received(rencode.loads(zlib.decompress(data)))
except Exception, e:
log.warn("Failed to decompress (%d bytes) and load serialized data "\
"with rencode: %s" % (len(data), str(e)))
def get_bytes_recv(self):
"""
Returns the number of bytes received.
:returns: the number of bytes received
:rtype: int
"""
return self._bytes_received
def get_bytes_sent(self):
"""
Returns the number of bytes sent.
:returns: the number of bytes sent
:rtype: int
"""
return self._bytes_sent
def message_received(self, message):
"""Override this method to receive the complete message"""
pass

View File

@ -35,18 +35,13 @@
# #
import logging import logging
from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet.protocol import ClientFactory
from twisted.internet import reactor, ssl, defer from twisted.internet import reactor, ssl, defer
try:
import rencode
except ImportError:
import deluge.rencode as rencode
import zlib
import deluge.common import deluge.common
from deluge import error from deluge import error
from deluge.event import known_events from deluge.event import known_events
from deluge.transfer import DelugeTransferProtocol
if deluge.common.windows_check(): if deluge.common.windows_check():
import win32api import win32api
@ -104,11 +99,11 @@ class DelugeRPCRequest(object):
return (self.request_id, self.method, self.args, self.kwargs) return (self.request_id, self.method, self.args, self.kwargs)
class DelugeRPCProtocol(Protocol):
class DelugeRPCProtocol(DelugeTransferProtocol):
def connectionMade(self): def connectionMade(self):
self.__rpc_requests = {} self.__rpc_requests = {}
self.__buffer = None
# Set the protocol in the daemon so it can send data # Set the protocol in the daemon so it can send data
self.factory.daemon.protocol = self self.factory.daemon.protocol = self
# Get the address of the daemon that we've connected to # Get the address of the daemon that we've connected to
@ -116,102 +111,80 @@ class DelugeRPCProtocol(Protocol):
self.factory.daemon.host = peer.host self.factory.daemon.host = peer.host
self.factory.daemon.port = peer.port self.factory.daemon.port = peer.port
self.factory.daemon.connected = True self.factory.daemon.connected = True
log.info("Connected to daemon at %s:%s..", peer.host, peer.port) log.info("Connected to daemon at %s:%s..", peer.host, peer.port)
self.factory.daemon.connect_deferred.callback((peer.host, peer.port)) self.factory.daemon.connect_deferred.callback((peer.host, peer.port))
def dataReceived(self, data): def message_received(self, request):
""" """
This method is called whenever we receive data from the daemon. This method is called whenever we receive a message from the daemon.
:param request: a tuple that should be either a RPCResponse, RCPError or RPCSignal
:param data: a zlib compressed and rencoded string that should be either
a RPCResponse, RCPError or RPCSignal
""" """
if self.__buffer: if type(request) is not tuple:
# We have some data from the last dataReceived() so lets prepend it log.debug("Received invalid message: type is not tuple")
data = self.__buffer + data return
self.__buffer = None if len(request) < 3:
log.debug("Received invalid message: number of items in "
"response is %s", len(request))
return
while data: message_type = request[0]
# Increase the byte counter
self.factory.bytes_recv += len(data)
dobj = zlib.decompressobj() if message_type == RPC_EVENT:
try: event = request[1]
request = rencode.loads(dobj.decompress(data)) #log.debug("Received RPCEvent: %s", event)
except Exception, e: # A RPCEvent was received from the daemon so run any handlers
#log.debug("Received possible invalid message (%r): %s", data, e) # associated with it.
# This could be cut-off data, so we'll save this in the buffer if event in self.factory.event_handlers:
# and try to prepend it on the next dataReceived() for handler in self.factory.event_handlers[event]:
self.__buffer = data reactor.callLater(0, handler, *request[2])
return return
request_id = request[1]
# We get the Deferred object for this request_id to either run the
# callbacks or the errbacks dependent on the response from the daemon.
d = self.factory.daemon.pop_deferred(request_id)
if message_type == RPC_RESPONSE:
# Run the callbacks registered with this Deferred object
d.callback(request[2])
elif message_type == RPC_ERROR:
# Recreate exception and errback'it
exception_cls = getattr(error, request[2])
exception = exception_cls(*request[3], **request[4])
# Ideally we would chain the deferreds instead of instance
# checking just to log them. But, that would mean that any
# errback on the fist deferred should returns it's failure
# so it could pass back to the 2nd deferred on the chain. But,
# that does not always happen.
# So, just do some instance checking and just log rpc error at
# diferent levels.
r = self.__rpc_requests[request_id]
msg = "RPCError Message Received!"
msg += "\n" + "-" * 80
msg += "\n" + "RPCRequest: " + r.__repr__()
msg += "\n" + "-" * 80
if isinstance(exception, error.WrappedException):
msg += "\n" + exception.type + "\n" + exception.message + ": "
msg += exception.traceback
else: else:
data = dobj.unused_data msg += "\n" + request[5] + "\n" + request[2] + ": "
msg += str(exception)
msg += "\n" + "-" * 80
if type(request) is not tuple: if not isinstance(exception, error._ClientSideRecreateError):
log.debug("Received invalid message: type is not tuple") # Let's log these as errors
return log.error(msg)
if len(request) < 3: else:
log.debug("Received invalid message: number of items in " # The rest just get's logged in debug level, just to log
"response is %s", len(3)) # what's happening
return log.debug(msg)
message_type = request[0] d.errback(exception)
del self.__rpc_requests[request_id]
if message_type == RPC_EVENT:
event = request[1]
#log.debug("Received RPCEvent: %s", event)
# A RPCEvent was received from the daemon so run any handlers
# associated with it.
if event in self.factory.event_handlers:
for handler in self.factory.event_handlers[event]:
reactor.callLater(0, handler, *request[2])
continue
request_id = request[1]
# We get the Deferred object for this request_id to either run the
# callbacks or the errbacks dependent on the response from the daemon.
d = self.factory.daemon.pop_deferred(request_id)
if message_type == RPC_RESPONSE:
# Run the callbacks registered with this Deferred object
d.callback(request[2])
elif message_type == RPC_ERROR:
# Recreate exception and errback'it
exception_cls = getattr(error, request[2])
exception = exception_cls(*request[3], **request[4])
# Ideally we would chain the deferreds instead of instance
# checking just to log them. But, that would mean that any
# errback on the fist deferred should returns it's failure
# so it could pass back to the 2nd deferred on the chain. But,
# that does not always happen.
# So, just do some instance checking and just log rpc error at
# diferent levels.
r = self.__rpc_requests[request_id]
msg = "RPCError Message Received!"
msg += "\n" + "-" * 80
msg += "\n" + "RPCRequest: " + r.__repr__()
msg += "\n" + "-" * 80
if isinstance(exception, error.WrappedException):
msg += "\n" + exception.type + "\n" + exception.message + ": "
msg += exception.traceback
else:
msg += "\n" + request[5] + "\n" + request[2] + ": "
msg += str(exception)
msg += "\n" + "-" * 80
if not isinstance(exception, error._ClientSideRecreateError):
# Let's log these as errors
log.error(msg)
else:
# The rest just get's logged in debug level, just to log
# what's happening
log.debug(msg)
d.errback(exception)
del self.__rpc_requests[request_id]
def send_request(self, request): def send_request(self, request):
""" """
@ -220,15 +193,16 @@ class DelugeRPCProtocol(Protocol):
:param request: RPCRequest :param request: RPCRequest
""" """
# Store the DelugeRPCRequest object just in case a RPCError is sent in try:
# response to this request. We use the extra information when printing # Store the DelugeRPCRequest object just in case a RPCError is sent in
# out the error for debugging purposes. # response to this request. We use the extra information when printing
self.__rpc_requests[request.request_id] = request # out the error for debugging purposes.
#log.debug("Sending RPCRequest %s: %s", request.request_id, request) self.__rpc_requests[request.request_id] = request
# Send the request in a tuple because multiple requests can be sent at once #log.debug("Sending RPCRequest %s: %s", request.request_id, request)
data = zlib.compress(rencode.dumps((request.format_message(),))) # Send the request in a tuple because multiple requests can be sent at once
self.factory.bytes_sent += len(data) self.transfer_message((request.format_message(),))
self.transport.write(data) except Exception, e:
log.warn("Error occured when sending message:" + str(e))
class DelugeRPCClientFactory(ClientFactory): class DelugeRPCClientFactory(ClientFactory):
protocol = DelugeRPCProtocol protocol = DelugeRPCProtocol
@ -237,9 +211,6 @@ class DelugeRPCClientFactory(ClientFactory):
self.daemon = daemon self.daemon = daemon
self.event_handlers = event_handlers self.event_handlers = event_handlers
self.bytes_recv = 0
self.bytes_sent = 0
def startedConnecting(self, connector): def startedConnecting(self, connector):
log.info("Connecting to daemon at \"%s:%s\"...", log.info("Connecting to daemon at \"%s:%s\"...",
connector.host, connector.port) connector.host, connector.port)
@ -462,10 +433,10 @@ class DaemonSSLProxy(DaemonProxy):
self.disconnect_callback = cb self.disconnect_callback = cb
def get_bytes_recv(self): def get_bytes_recv(self):
return self.__factory.bytes_recv return self.protocol.get_bytes_recv()
def get_bytes_sent(self): def get_bytes_sent(self):
return self.__factory.bytes_sent return self.protocol.get_bytes_sent()
class DaemonClassicProxy(DaemonProxy): class DaemonClassicProxy(DaemonProxy):
def __init__(self, event_handlers=None): def __init__(self, event_handlers=None):

368
tests/test_transfer.py Normal file
View File

@ -0,0 +1,368 @@
# -*- coding: utf-8 -*-
#
# test_transfer.py
#
# Copyright (C) 2012 Bro <bro.development@gmail.com>
#
# Deluge is free software.
#
# You may redistribute it and/or modify it under the terms of the
# GNU General Public License, as published by the Free Software
# Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# deluge is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with deluge. If not, write to:
# The Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor
# Boston, MA 02110-1301, USA.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the OpenSSL
# library.
# You must obey the GNU General Public License in all respects for all of
# the code used other than OpenSSL. If you modify file(s) with this
# exception, you may extend this exception to your version of the file(s),
# but you are not obligated to do so. If you do not wish to do so, delete
# this exception statement from your version. If you delete this exception
# statement from all source files in the program, then also delete it here.
#
from twisted.trial import unittest
from deluge.transfer import DelugeTransferProtocol
import base64
import deluge.rencode as rencode
class TransferTestClass(DelugeTransferProtocol):
def __init__(self):
DelugeTransferProtocol.__init__(self)
self.transport = self
self.messages_out = []
self.messages_in = []
self.packet_count = 0
def write(self, message):
"""
Called by DelugeTransferProtocol class
This simulates the write method of the self.transport in DelugeTransferProtocol.
"""
self.messages_out.append(message)
def message_received(self, message):
"""
This method overrides message_received is DelugeTransferProtocol and is
called with the complete message as it was sent by DelugeRPCProtocol
"""
self.messages_in.append(message)
def get_messages_out_joined(self):
return b"".join(self.messages_out)
def get_messages_in(self):
return self.messages_in
def dataReceived_old_protocol(self, data):
"""
This is the original method logic (as close as possible) for handling data receival on the client
:param data: a zlib compressed string encoded with rencode.
"""
from datetime import timedelta
import zlib
print "\n=== New Data Received ===\nBytes received:", len(data)
if self._buffer:
# We have some data from the last dataReceived() so lets prepend it
print "Current buffer:", len(self._buffer) if self._buffer else "0"
data = self._buffer + data
self._buffer = None
self.packet_count += 1
self._bytes_received += len(data)
while data:
print "\n-- Handle packet data --"
print "Bytes received:", self._bytes_received
print "Current data:", len(data)
if self._message_length == 0:
# handle_new_message uses _buffer so set data to _buffer.
self._buffer = data
self._handle_new_message()
data = self._buffer
self._buffer = None
self.packet_count = 1
print "New message of length:", self._message_length
dobj = zlib.decompressobj()
try:
request = rencode.loads(dobj.decompress(data))
print "Successfully loaded message",
print " - Buffer length: %d, data length: %d, unused length: %d" % (len(data), \
len(data) - len(dobj.unused_data), len(dobj.unused_data))
print "Packet count:", self.packet_count
except Exception, e:
#log.debug("Received possible invalid message (%r): %s", data, e)
# This could be cut-off data, so we'll save this in the buffer
# and try to prepend it on the next dataReceived()
self._buffer = data
print "Failed to load buffer (size %d): %s" % (len(self._buffer), str(e))
return
else:
data = dobj.unused_data
self._message_length = 0
self.message_received(request)
class DelugeTransferProtocolTestCase(unittest.TestCase):
def setUp(self):
"""
The expected messages corresponds to the test messages (msg1, msg2) after they've been processed
by DelugeTransferProtocol.send, which means that they've first been encoded with pickle,
and then compressed with zlib.
The expected messages are encoded in base64 to easily including it here in the source.
So before comparing the results with the expected messages, the expected messages must be decoded,
or the result message be encoded in base64.
"""
self.transfer = TransferTestClass()
self.msg1 = (0, 1, {"key_int": 1242429423}, {"key_str": "some string"}, {"key_bool": True})
self.msg2 = (2, 3, {"key_float": 12424.29423},
{"key_unicode": u"some string"},
{"key_dict_with_tuple": {"key_tuple": (1, 2, 3)}},
{"keylist": [4, "5", 6.7]})
self.msg1_expected_compressed_base64 = "RAAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU"\
"XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ"
self.msg2_expected_compressed_base64 = "RAAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3i"\
"l+ZlJuenpHYX5+emKhSXFGXmpadPBkmkZCaXxJdn"\
"lmTEl5QW5KRCdIOZhxmBhrUDuTmZxSWHWRpNnRyu"\
"paUBAHYlJxI="
def test_send_one_message(self):
"""
Send one message and test that it has been sent correctoly to the
method 'write' in self.transport.
"""
self.transfer.transfer_message(self.msg1)
# Get the data as sent by DelugeTransferProtocol
messages = self.transfer.get_messages_out_joined()
base64_encoded = base64.b64encode(messages)
self.assertEquals(base64_encoded, self.msg1_expected_compressed_base64)
def test_receive_one_message(self):
"""
Receive one message and test that it has been sent to the
method 'message_received'.
"""
self.transfer.dataReceived(base64.b64decode(self.msg1_expected_compressed_base64))
# Get the data as sent by DelugeTransferProtocol
messages = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(messages))
def test_receive_old_message(self):
"""
Receive an old message (with no header) and verify that the data is discarded.
"""
self.transfer.dataReceived(rencode.dumps(self.msg1))
self.assertEquals(len(self.transfer.get_messages_in()), 0)
self.assertEquals(self.transfer._message_length, 0)
self.assertEquals(len(self.transfer._buffer), 0)
def test_receive_two_concatenated_messages(self):
"""
This test simply concatenates two messsages (as they're sent over the network),
and lets DelugeTransferProtocol receive the data as one string.
"""
two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64)
self.transfer.dataReceived(two_concatenated)
# Get the data as sent by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
message2 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
def test_receive_three_messages_in_parts(self):
"""
This test concatenates three messsages (as they're sent over the network),
and lets DelugeTransferProtocol receive the data in multiple parts.
"""
msg_bytes = base64.b64decode(self.msg1_expected_compressed_base64) + \
base64.b64decode(self.msg2_expected_compressed_base64) + \
base64.b64decode(self.msg1_expected_compressed_base64)
packet_size = 40
one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64))
two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64))
three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64))
for d in self.receive_parts_helper(msg_bytes, packet_size):
bytes_received = self.transfer.get_bytes_recv()
if bytes_received >= three_messages_byte_count:
expected_msgs_received_count = 3
elif bytes_received >= two_messages_byte_count:
expected_msgs_received_count = 2
elif bytes_received >= one_message_byte_count:
expected_msgs_received_count = 1
else:
expected_msgs_received_count = 0
# Verify that the expected number of complete messages has arrived
self.assertEquals(expected_msgs_received_count, len(self.transfer.get_messages_in()))
# Get the data as received by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
message2 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
message3 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3))
# Remove underscore to enable test, or run the test directly:
# tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol
def _test_rencode_fail_protocol(self):
"""
This test tries to test the protocol that relies on errors from rencode.
"""
msg_bytes = base64.b64decode(self.msg1_expected_compressed_base64) + \
base64.b64decode(self.msg2_expected_compressed_base64) + \
base64.b64decode(self.msg1_expected_compressed_base64)
packet_size = 149
one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64))
two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64))
three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64))
print
print "Msg1 size:", len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4
print "Msg2 size:", len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4
print "Msg3 size:", len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4
print "one_message_byte_count:", one_message_byte_count
print "two_messages_byte_count:", two_messages_byte_count
print "three_messages_byte_count:", three_messages_byte_count
for d in self.receive_parts_helper(msg_bytes, packet_size, self.transfer.dataReceived_old_protocol):
bytes_received = self.transfer.get_bytes_recv()
if bytes_received >= three_messages_byte_count:
expected_msgs_received_count = 3
elif bytes_received >= two_messages_byte_count:
expected_msgs_received_count = 2
elif bytes_received >= one_message_byte_count:
expected_msgs_received_count = 1
else:
expected_msgs_received_count = 0
# Verify that the expected number of complete messages has arrived
if expected_msgs_received_count != len(self.transfer.get_messages_in()):
print "Expected number of messages received is %d, but %d have been received."\
% (expected_msgs_received_count, len(self.transfer.get_messages_in()))
# Get the data as received by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
message2 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
message3 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3))
def test_receive_middle_of_header(self):
"""
This test concatenates two messsages (as they're sent over the network),
and lets DelugeTransferProtocol receive the data in two parts.
The first part contains the first message, plus two bytes of the next message.
The next part contains the rest of the message.
This is a special case, as DelugeTransferProtocol can't start parsing
a message until it has at least 4 bytes (the size of the header) to be able
to read and parse the size of the payload.
"""
two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64)
first_len = len(base64.b64decode(self.msg1_expected_compressed_base64))
# Now found the entire first message, and half the header of the next message (2 bytes into the header)
self.transfer.dataReceived(two_concatenated[:first_len+2])
# Should be 1 message in the list
self.assertEquals(1, len(self.transfer.get_messages_in()))
# Send the rest
self.transfer.dataReceived(two_concatenated[first_len+2:])
# Should be 2 messages in the list
self.assertEquals(2, len(self.transfer.get_messages_in()))
# Get the data as sent by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
message2 = self.transfer.get_messages_in().pop(0)
self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
# Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon
#def test_simulate_big_transfer(self):
# filename = "../deluge.torrentlist"
#
# f = open(filename, "r")
# data = f.read()
# message_to_send = eval(data)
# self.transfer.transfer_message(message_to_send)
#
# # Get the data as sent to the network by DelugeTransferProtocol
# compressed_data = self.transfer.get_messages_out_joined()
# packet_size = 16000 # Or something smaller...
#
# for d in self.receive_parts_helper(compressed_data, packet_size):
# bytes_recv = self.transfer.get_bytes_recv()
# if bytes_recv < len(compressed_data):
# self.assertEquals(len(self.transfer.get_messages_in()), 0)
# else:
# self.assertEquals(len(self.transfer.get_messages_in()), 1)
# # Get the data as received by DelugeTransferProtocol
# transfered_message = self.transfer.get_messages_in().pop(0)
# # Test that the data structures are equal
# #self.assertEquals(transfered_message, message_to_send)
# #self.assertTrue(transfered_message == message_to_send)
#
# #f.close()
# #f = open("rencode.torrentlist", "w")
# #f.write(str(transfered_message))
# #f.close()
def receive_parts_helper(self, data, packet_size, receive_func=None):
byte_count = len(data)
sent_bytes = 0
while byte_count > 0:
to_receive = packet_size if byte_count > packet_size else byte_count
sent_bytes += to_receive
byte_count -= to_receive
if receive_func:
receive_func(data[:to_receive])
else:
self.transfer.dataReceived(data[:to_receive])
data = data[to_receive:]
yield