Send based on send_time
This commit is contained in:
parent
70edbfc4b6
commit
33facb5d83
|
@ -22,10 +22,12 @@ GROUP_ID = "0xdeadbeef"
|
||||||
# TODO: Sync state: Store bounded cache list of messages ids
|
# TODO: Sync state: Store bounded cache list of messages ids
|
||||||
# offered by peer and not ack/req by device
|
# offered by peer and not ack/req by device
|
||||||
|
|
||||||
|
# XXX: Add debug log level
|
||||||
def log(message):
|
def log(message):
|
||||||
print message
|
print message
|
||||||
|
|
||||||
# NOTE: Inspired by github.com/ethereum/research networksim.py.
|
# NOTE: Inspired by github.com/ethereum/research networksim.py.
|
||||||
|
# XXX: Break this out into separate namespace
|
||||||
class NetworkSimulator():
|
class NetworkSimulator():
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.nodes = []
|
self.nodes = []
|
||||||
|
@ -39,14 +41,16 @@ class NetworkSimulator():
|
||||||
for sender, receiver, msg in self.queue[self.time]:
|
for sender, receiver, msg in self.queue[self.time]:
|
||||||
# NOTE: Assumes 100% reliability
|
# NOTE: Assumes 100% reliability
|
||||||
receiver.on_receive(sender, msg)
|
receiver.on_receive(sender, msg)
|
||||||
# Discrete time model
|
print ""
|
||||||
print "tick", self.time
|
print "tick", self.time + 1
|
||||||
|
print "-----------"
|
||||||
for n in self.nodes:
|
for n in self.nodes:
|
||||||
n.tick()
|
n.tick()
|
||||||
self.time += 1
|
self.time += 1
|
||||||
|
|
||||||
# NOTE: Direct message, no broadcast etc
|
# NOTE: Direct message, no broadcast etc
|
||||||
def send_message(self, sender_id, receiver_id, message):
|
def send_message(self, sender_id, receiver_id, message):
|
||||||
|
#print "*** (network) send_message", sender_id, receiver_id
|
||||||
# XXX: Assuming sender exists
|
# XXX: Assuming sender exists
|
||||||
sender = self.peers[sender_id]
|
sender = self.peers[sender_id]
|
||||||
receiver = self.peers[receiver_id]
|
receiver = self.peers[receiver_id]
|
||||||
|
@ -59,6 +63,7 @@ class Node():
|
||||||
def __init__(self, name, network):
|
def __init__(self, name, network):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.log = []
|
self.log = []
|
||||||
|
self.messages = {}
|
||||||
self.sync_state = {}
|
self.sync_state = {}
|
||||||
self.peers = {}
|
self.peers = {}
|
||||||
self.network = network
|
self.network = network
|
||||||
|
@ -73,6 +78,18 @@ class Node():
|
||||||
# TODO: Send message if reached send time
|
# TODO: Send message if reached send time
|
||||||
self.time += 1
|
self.time += 1
|
||||||
|
|
||||||
|
# Depending on sync mode, do appropriate actions
|
||||||
|
self.send_messages()
|
||||||
|
|
||||||
|
def send_messages(self):
|
||||||
|
for message_id, x in self.sync_state.items():
|
||||||
|
for peer, flags in x.items():
|
||||||
|
if (peer in self.sharing[self.group_id] and
|
||||||
|
flags['hold_flag'] == 0 and
|
||||||
|
flags['send_time'] == self.time):
|
||||||
|
msg = self.messages[message_id]
|
||||||
|
self.send_message(peer, msg)
|
||||||
|
|
||||||
# XXX: Why would node know about peer and not just name?
|
# XXX: Why would node know about peer and not just name?
|
||||||
def addPeer(self, peer_id, peer):
|
def addPeer(self, peer_id, peer):
|
||||||
self.peers[peer_id] = peer
|
self.peers[peer_id] = peer
|
||||||
|
@ -81,9 +98,12 @@ class Node():
|
||||||
self.sharing[self.group_id].add(peer_id)
|
self.sharing[self.group_id].add(peer_id)
|
||||||
|
|
||||||
def append_message(self, message):
|
def append_message(self, message):
|
||||||
|
#print "*** append_message", self.name
|
||||||
message_id = get_message_id(message)
|
message_id = get_message_id(message)
|
||||||
self.log.append({"id": message_id,
|
self.log.append({"id": message_id,
|
||||||
"message": message})
|
"message": message})
|
||||||
|
# XXX: Ugly but easier access while keeping log order
|
||||||
|
self.messages[message_id] = message
|
||||||
self.sync_state[message_id] = {}
|
self.sync_state[message_id] = {}
|
||||||
# XXX: For each peer
|
# XXX: For each peer
|
||||||
# Ensure added for each peer
|
# Ensure added for each peer
|
||||||
|
@ -96,22 +116,21 @@ class Node():
|
||||||
"ack_flag": 0,
|
"ack_flag": 0,
|
||||||
"request_flag": 0,
|
"request_flag": 0,
|
||||||
"send_count": 0,
|
"send_count": 0,
|
||||||
"send_time": 0
|
"send_time": self.time + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
def send_message(self, peer_id, message):
|
def send_message(self, peer_id, message):
|
||||||
message_id = get_message_id(message)
|
message_id = get_message_id(message)
|
||||||
peer = self.peers[peer_id]
|
peer = self.peers[peer_id]
|
||||||
# XXX: Use tick clock for this
|
self.sync_state[message_id][peer_id]["send_count"] += 1
|
||||||
self.sync_state[message_id][peer_id]["send_count"] = 1
|
self.sync_state[message_id][peer_id]["send_time"] += 2
|
||||||
self.sync_state[message_id][peer_id]["send_time"] = 1
|
|
||||||
|
|
||||||
log('MESSAGE ({} -> {}): {}'.format(self.name, peer.name, message_id))
|
log('MESSAGE ({} -> {}): {}'.format(self.name, peer.name, message_id))
|
||||||
|
|
||||||
# XXX: Can introduce latency here
|
# XXX: Can introduce latency here
|
||||||
self.network.send_message(self.name, peer_id, message)
|
self.network.send_message(self.name, peer_id, message)
|
||||||
|
|
||||||
def on_receive(self, sender, message):
|
def on_receive(self, sender, message):
|
||||||
|
#print "*** {} received message from {}".format(self.name, sender.name)
|
||||||
if (message.header.type == 1):
|
if (message.header.type == 1):
|
||||||
self.on_receive_message(sender, message)
|
self.on_receive_message(sender, message)
|
||||||
elif (message.header.type == 0):
|
elif (message.header.type == 0):
|
||||||
|
@ -132,7 +151,6 @@ class Node():
|
||||||
"send_time": 0
|
"send_time": 0
|
||||||
}
|
}
|
||||||
|
|
||||||
# XXX How is this sent?
|
|
||||||
ack_rec = new_ack_record(message_id)
|
ack_rec = new_ack_record(message_id)
|
||||||
self.network.send_message(self.name, sender.name, ack_rec)
|
self.network.send_message(self.name, sender.name, ack_rec)
|
||||||
log("ACK ({} -> {}): {}".format(self.name, sender.name, message_id))
|
log("ACK ({} -> {}): {}".format(self.name, sender.name, message_id))
|
||||||
|
@ -245,12 +263,17 @@ print "Assuming one group context (A-B share):"
|
||||||
a0 = new_message_record("hello world")
|
a0 = new_message_record("hello world")
|
||||||
|
|
||||||
|
|
||||||
|
# XXX: remove
|
||||||
|
# TODO: send_message should be based on send_time and sharing
|
||||||
|
#a.send_message("B", a0)
|
||||||
|
|
||||||
|
n.tick()
|
||||||
|
a.print_sync_state()
|
||||||
|
b.print_sync_state()
|
||||||
|
|
||||||
# Local append
|
# Local append
|
||||||
a.append_message(a0)
|
a.append_message(a0)
|
||||||
|
|
||||||
# TODO: send_message should be based on send_time and sharing
|
|
||||||
a.send_message("B", a0)
|
|
||||||
|
|
||||||
n.tick()
|
n.tick()
|
||||||
a.print_sync_state()
|
a.print_sync_state()
|
||||||
b.print_sync_state()
|
b.print_sync_state()
|
||||||
|
@ -285,6 +308,7 @@ print "\n"
|
||||||
# - **Send** any messages that the device is **sharing** with the peer, and does
|
# - **Send** any messages that the device is **sharing** with the peer, and does
|
||||||
# not know whether the peer holds, and that have reached their send times
|
# not know whether the peer holds, and that have reached their send times
|
||||||
|
|
||||||
|
# each tick
|
||||||
# SEND messages device is SHARING with the peer, doesn't know if peer holds,
|
# SEND messages device is SHARING with the peer, doesn't know if peer holds,
|
||||||
# and reached send time
|
# and reached send time
|
||||||
# What mean by sharing
|
# What mean by sharing
|
||||||
|
|
Loading…
Reference in New Issue