From 33facb5d833feb4346e7b156b800fd0c168070b8 Mon Sep 17 00:00:00 2001 From: Oskar Thoren Date: Wed, 9 Jan 2019 07:16:46 -0500 Subject: [PATCH] Send based on send_time --- data_sync/sync.py | 46 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/data_sync/sync.py b/data_sync/sync.py index 26fed2e..1c8f1a7 100644 --- a/data_sync/sync.py +++ b/data_sync/sync.py @@ -22,10 +22,12 @@ GROUP_ID = "0xdeadbeef" # TODO: Sync state: Store bounded cache list of messages ids # offered by peer and not ack/req by device +# XXX: Add debug log level def log(message): print message # NOTE: Inspired by github.com/ethereum/research networksim.py. +# XXX: Break this out into separate namespace class NetworkSimulator(): def __init__(self): self.nodes = [] @@ -39,14 +41,16 @@ class NetworkSimulator(): for sender, receiver, msg in self.queue[self.time]: # NOTE: Assumes 100% reliability receiver.on_receive(sender, msg) - # Discrete time model - print "tick", self.time + print "" + print "tick", self.time + 1 + print "-----------" for n in self.nodes: n.tick() self.time += 1 # NOTE: Direct message, no broadcast etc def send_message(self, sender_id, receiver_id, message): + #print "*** (network) send_message", sender_id, receiver_id # XXX: Assuming sender exists sender = self.peers[sender_id] receiver = self.peers[receiver_id] @@ -59,6 +63,7 @@ class Node(): def __init__(self, name, network): self.name = name self.log = [] + self.messages = {} self.sync_state = {} self.peers = {} self.network = network @@ -73,6 +78,18 @@ class Node(): # TODO: Send message if reached send time self.time += 1 + # Depending on sync mode, do appropriate actions + self.send_messages() + + def send_messages(self): + for message_id, x in self.sync_state.items(): + for peer, flags in x.items(): + if (peer in self.sharing[self.group_id] and + flags['hold_flag'] == 0 and + flags['send_time'] == self.time): + msg = self.messages[message_id] + self.send_message(peer, msg) + # XXX: Why would node know about peer and not just name? def addPeer(self, peer_id, peer): self.peers[peer_id] = peer @@ -81,9 +98,12 @@ class Node(): self.sharing[self.group_id].add(peer_id) def append_message(self, message): + #print "*** append_message", self.name message_id = get_message_id(message) self.log.append({"id": message_id, "message": message}) + # XXX: Ugly but easier access while keeping log order + self.messages[message_id] = message self.sync_state[message_id] = {} # XXX: For each peer # Ensure added for each peer @@ -96,22 +116,21 @@ class Node(): "ack_flag": 0, "request_flag": 0, "send_count": 0, - "send_time": 0 + "send_time": self.time + 1 } def send_message(self, peer_id, message): message_id = get_message_id(message) peer = self.peers[peer_id] - # XXX: Use tick clock for this - self.sync_state[message_id][peer_id]["send_count"] = 1 - self.sync_state[message_id][peer_id]["send_time"] = 1 - + self.sync_state[message_id][peer_id]["send_count"] += 1 + self.sync_state[message_id][peer_id]["send_time"] += 2 log('MESSAGE ({} -> {}): {}'.format(self.name, peer.name, message_id)) # XXX: Can introduce latency here self.network.send_message(self.name, peer_id, message) def on_receive(self, sender, message): + #print "*** {} received message from {}".format(self.name, sender.name) if (message.header.type == 1): self.on_receive_message(sender, message) elif (message.header.type == 0): @@ -132,7 +151,6 @@ class Node(): "send_time": 0 } - # XXX How is this sent? ack_rec = new_ack_record(message_id) self.network.send_message(self.name, sender.name, ack_rec) log("ACK ({} -> {}): {}".format(self.name, sender.name, message_id)) @@ -245,12 +263,17 @@ print "Assuming one group context (A-B share):" a0 = new_message_record("hello world") +# XXX: remove +# TODO: send_message should be based on send_time and sharing +#a.send_message("B", a0) + +n.tick() +a.print_sync_state() +b.print_sync_state() + # Local append a.append_message(a0) -# TODO: send_message should be based on send_time and sharing -a.send_message("B", a0) - n.tick() a.print_sync_state() b.print_sync_state() @@ -285,6 +308,7 @@ print "\n" # - **Send** any messages that the device is **sharing** with the peer, and does # not know whether the peer holds, and that have reached their send times +# each tick # SEND messages device is SHARING with the peer, doesn't know if peer holds, # and reached send time # What mean by sharing