# Sync protocol PoC import hashlib import networksim import networkwhisper import random import sync_pb2 import time # Each group belongs to a client. # Hardcoded for now. # group\_id = HASH("GROUP\_ID", client\_id, group\_descriptor) GROUP_ID = "0xdeadbeef" # TODO: Introduce exponential back-off for send_time based on send_count # XXX: Hardcoded for logging, cba #NODE = 'xxx' # XXX: Add debug log level #def log(message): # XXX: Instead of this, how about printing this to a sync log? #print(message) # XXX: Don't know which node! Oops. # with open(NODE + 'sync.log', 'w') as f: # f.write(message + '\n') # print(message) def merge_two_dicts(x, y): z = x.copy() z.update(y) return z class Node(): def __init__(self, logfn, name, network, profile, mode='batch'): self.name = name self.log = [] self.messages = {} self.sync_state = {} self.peers = {} self.network = network self.time = 0 self.mode = mode self.offeredMessages = {} # XXX: Should be bounded self.logger = logfn # XXX: Assumes only one group self.group_id = GROUP_ID self.sharing = {GROUP_ID: set()} # Network should be aware of sync node so it can call it network.sync_node = self self.profile = profile # for index in pulsating reseries if mobile node # XXX: Hacky if (self.name == 'A'): self.randomSeed = 0 elif (self.name == 'B'): self.randomSeed = 1 else: self.randomSeed = random.randint(1,10) if profile == 'burstyMobile': self.reliability = 0.1 self.update_availability() elif profile == 'onlineDesktop': self.reliability = 1 # or 0.9 else: self.reliability = 1 self.availability = self.reliability def tick(self): # XXX: What else do? # TODO: Send message if reached send time self.time += 1 if (self.profile == 'burstyMobile'): self.update_availability() if (self.availability == 1): if (self.mode == 'batch'): self.ack_sent_messages() self.ack_offered_messages() self.req_offered_messages() self.send_requested_messages() self.send_messages() elif (self.mode == 'interactive'): self.ack_received_messages() self.ack_offered_messages() self.req_offered_messages() self.send_requested_messages() self.offer_messages() #elif (self.availability == 0): #print "*** node NOT available", self.name #else: # print "*** conflation overload, reliability/availability mismatch" # NOTE: Assuming same semantics for batch and interactive mode. #- **Acknowledge** any messages **received** from the peer that the device has #not yet acknowledged def ack_received_messages(self): self.ack_sent_messages() # - **Acknowledge** any messages **sent** by the peer that the device has not yet # acknowledged def ack_sent_messages(self): # TODO: Accumulate and send all in one go # XXX: Better Pythonesque way to do this for mid, x in self.sync_state.items(): for peer, flags in x.items(): if flags['ack_flag'] == 1: ack_rec = new_ack_record([mid]) self.network.send_message(self.name, peer, ack_rec) self.sync_state[mid][peer]['ack_flag'] = 0 self.logger(" ACK ({} -> {}): {}".format(self.name[-4:], peer[-4:], mid[-4:])) # - **Acknowledge** any messages **offered** by the peer that the device holds, # and has not yet acknowledged # ACK maybe once?! def ack_offered_messages(self): for peer, message_ids in self.offeredMessages.items(): for message_id in message_ids: if (message_id in self.messages and # XXX: What if they didn't receive ACK? self.sync_state[message_id][peer]['ack_flag'] == 1): # XXX: Slurp up ack_rec = new_ack_record([message_id]) self.sync_state[message_id][peer]['ack_flag'] = 0 self.network.send_message(self.name, peer, ack_rec) # **Request** any messages **offered** by the peer that the device does not # hold, and has not yet requested # NOTE: (Overloaded?) use of send_time and send_count for reqs. # Seems OK since hold flag clarifies if you need to offer/send or ack. def req_offered_messages(self): # XXX: Not removing from cache, instead letting it grow indefinitely # (later: bounded) UNLESS ACK etc is received for peer_id, message_ids in self.offeredMessages.items(): for message_id in message_ids: if (message_id not in self.messages and # XXX: Not clear this is part of spec self.sync_state[message_id][peer_id]['send_time'] <= self.time ): # XXX: Slurp up req_rec = new_req_record([message_id]) self.network.send_message(self.name, peer_id, req_rec) n = self.sync_state[message_id][peer_id]["send_count"] + 1 self.update_sync_state(message_id, peer_id, { 'hold_flag': 1, 'send_count': n, 'send_time': self.time + int(n**2) + 1 }) self.logger("REQUEST ({} -> {}): {}".format(self.name[-4:], peer_id[-4:], message_id[-4:])) # XXX: It is double requesting, should be polite # - **Send** any messages that the device is **sharing** with the peer, that have # been **requested** by the peer, and that have reached their send times def send_requested_messages(self): for message_id, x in self.sync_state.items(): for peer_id, flags in x.items(): if (peer_id in self.sharing[self.group_id] and flags['request_flag'] == 1 and flags['send_time'] <= self.time): message = self.messages[message_id] send_count = self.sync_state[message_id][peer_id]["send_count"] + 1 self.sync_state[message_id][peer_id]["send_count"] = send_count self.sync_state[message_id][peer_id]["send_time"] += self.time + send_count*2 self.sync_state[message_id][peer_id]["request_flag"] = 0 self.logger('MESSAGE ({} -> {}): {} requested and sent'.format(self.name[-4:], peer_id[-4:], message_id[-4:])) # XXX: Can introduce latency here self.network.send_message(self.name, peer_id, message) # When turn off request flag? #- **Offer** any messages that the device is **sharing** with the peer, and does # not know whether the peer holds, and that have reached their send times # XXX: Not tested yet, interactive mode def offer_messages(self): for message_id, x in self.sync_state.items(): for peer_id, flags in x.items(): ids = [] if (peer_id in self.sharing[self.group_id] and flags['hold_flag'] == 0 and flags['send_time'] <= self.time): # TODO: Extend to slurp up all, need index peer->message offer_rec = new_offer_record([message_id]) # HERE we send # XXX: peer_id, should be pbukey self.network.send_message(self.name, peer_id, offer_rec) send_count = self.sync_state[message_id][peer_id]["send_count"] + 1 self.sync_state[message_id][peer_id]["send_count"] = send_count self.sync_state[message_id][peer_id]["send_time"] += self.time + send_count*2 self.logger(" OFFER ({} -> {}): {}".format(self.name[-4:], peer_id[-4:], message_id[-4:])) # - **Send** any messages that the device is **sharing** with the peer, and does # not know whether the peer holds, and that have reached their send times def send_messages(self): for message_id, x in self.sync_state.items(): for peer_id, flags in x.items(): # Should be case for B no? if (peer_id in self.sharing[self.group_id] and flags['hold_flag'] == 0 and flags['send_time'] <= self.time): message = self.messages[message_id] send_count = self.sync_state[message_id][peer_id]["send_count"] + 1 self.sync_state[message_id][peer_id]["send_count"] = send_count self.sync_state[message_id][peer_id]["send_time"] += self.time + send_count*2 self.logger('MESSAGE ({} -> {}): {} sent'.format(self.name[-4:], peer_id[-4:], message_id[-4:])) # XXX: Can introduce latency here self.network.send_message(self.name, peer_id, message) # XXX: Why would node know about peer and not just name? # TODO: Refactor this to illustrate that it is just a set of pubkeys def addPeer(self, peer_id, peer): self.peers[peer_id] = peer def share(self, peer_id): self.sharing[self.group_id].add(peer_id) # Helper method def update_sync_state(self, message_id, peer_id, new_state): if message_id not in self.sync_state: self.sync_state[message_id] = {} if peer_id not in self.sync_state[message_id]: self.sync_state[message_id][peer_id] = { "hold_flag": 0, "ack_flag": 0, "request_flag": 0, "send_count": 0, "send_time": self.time + 1 } current = self.sync_state[message_id][peer_id] new = merge_two_dicts(current, new_state) self.sync_state[message_id][peer_id] = new def append_message(self, message): message_id = get_message_id(message) #print("*** append", message) self.log.append({"id": message_id, "message": message}) # XXX: Ugly but easier access while keeping log order self.messages[message_id] = message self.sync_state[message_id] = {} # Ensure added for each peer # If we add peer at different time, ensure state init # TODO: Only share with certain peers, e.g. clientPolicy # XXX here we go, probably #print("**SHARE1 SHDNOTBEEMPTY", self.peers) # TODO: Problem - this shouldn't be empty # Where does this come from? for peer in self.peers.keys(): if peer in self.sharing[self.group_id]: #print("**SHARE2", peer) # ok, then what? self.sync_state[message_id][peer] = { "hold_flag": 0, "ack_flag": 0, "request_flag": 0, "send_count": 0, "send_time": self.time + 1 } # TODO: Probably something more here for message parsing # TODO: Need to switch from object to pubkey here with name etc def on_receive(self, sender, message): if random.random() < self.reliability: #print "*** {} received message from {}".format(self.name, sender.name) if (message.header.type == 1): self.on_receive_message(sender, message) elif (message.header.type == 0): self.on_receive_ack(sender, message) elif (message.header.type == 2): self.on_receive_offer(sender, message) elif (message.header.type == 3): self.on_receive_request(sender, message) else: print("XXX: unknown message type") else: self.logger("*** node {} offline, dropping message".format(self.name)) # TODO: Problem: It assumes there's a name, as opposed to a pubkey def on_receive_message(self, sender_pubkey, message): message_id = get_message_id(message) self.logger('MESSAGE ({} -> {}): {} received'.format(sender_pubkey[-4:], self.name[-4:], message_id[-4:])) if message_id not in self.sync_state: self.sync_state[message_id] = {} if sender_pubkey in self.sync_state[message_id]: self.sync_state[message_id][sender_pubkey]['hold_flag'] == 1 self.sync_state[message_id][sender_pubkey]['ack_flag'] == 1 # XXX: ACK again here? self.sync_state[message_id][sender_pubkey] = { "hold_flag": 1, "ack_flag": 1, "request_flag": 0, "send_count": 0, "send_time": 0 } # XXX: If multiple group id, dispatch per group id for peer in self.sharing[self.group_id]: if peer not in self.sync_state[message_id]: self.sync_state[message_id][peer] = { "hold_flag": 0, "ack_flag": 0, "request_flag": 0, "send_count": 0, "send_time": 0 } # XXX: Huh, This is MESSAGE, shouldn't it be PAYLOAD inside it? assert message.header.type == 1, "Type should be MESSAGE, possible faulty logic" self.messages[message_id] = message # XXX: Shortcuts, lets take 'em text = message.payload.message.body.decode('utf-8') short_sender = sender_pubkey[-4:] print(short_sender + ": " + text) def on_receive_ack(self, sender_pubkey, message): for ack in message.payload.ack.id: self.logger(' ACK ({} -> {}): {} received'.format(sender_pubkey[-4:], self.name[-4:], ack[-4:])) self.sync_state[ack][sender_pubkey]["hold_flag"] = 1 def on_receive_offer(self, sender_pubkey, message): for message_id in message.payload.offer.id: self.logger(' OFFER ({} -> {}): {} received'.format(sender_pubkey[-4:], self.name[-4:], message_id[-4:])) if (message_id in self.sync_state and sender_pubkey in self.sync_state[message_id] and self.sync_state[message_id][sender_pubkey]['ack_flag'] == 1): print("Have message, not ACKED yet, add to list", sender_pubkey, message_id) if sender_pubkey not in self.offeredMessages: self.offeredMessages[sender_pubkey] = [] self.offeredMessages[sender_pubkey].append(message_id) elif message_id not in self.sync_state: #print "*** {} on_receive_offer from {} not holding {}".format(self.name, sender_pubkey, message_id) if sender_pubkey not in self.offeredMessages: self.offeredMessages[sender_pubkey] = [] self.offeredMessages[sender_pubkey].append(message_id) #else: # print "*** {} on_receive_offer have {} and ACKd OR peer {} unknown".format(self.name, message_id, sender_pubkey) # XXX: Init fn to wrap updates if message_id not in self.sync_state: self.sync_state[message_id] = {} if sender_pubkey not in self.sync_state[message_id]: self.sync_state[message_id][sender_pubkey] = { "hold_flag": 1, "ack_flag": 0, "request_flag": 0, "send_count": 0, "send_time": 0 } self.sync_state[message_id][sender_pubkey]['hold_flag'] = 1 #print "*** {} offeredMessages {}".format(self.name, self.offeredMessages) def on_receive_request(self, sender_pubkey, message): for req in message.payload.request.id: self.logger('REQUEST ({} -> {}): {} received'.format(sender_pubkey[-4:], self.name[-4:], req[-4:])) self.sync_state[req][sender_pubkey]["request_flag"] = 1 def print_sync_state(self): print("\n{} POV @{}".format(self.name[-4:], self.time)) print("-" * 60) n = self.name for message_id, x in self.sync_state.items(): line = message_id[-4:] + " | " for peer, flags in x.items(): line += peer[-4:] + ": " if flags['hold_flag']: line += "hold " if flags['ack_flag']: line += "ack " if flags['request_flag']: line += "req " line += "@" + str(flags['send_time']) line += "(" + str(flags['send_count']) + ")" line += " | " print(line) #log("-" * 60) # Shorter names for pubkey def print_sync_state2(self): print("\n{} POV @{}".format(self.name[-4:], self.time)) print("-" * 60) n = self.name[-4:] for message_id, x in self.sync_state.items(): line = message_id[-4:] + " | " for peer, flags in x.items(): line += peer[-4:] + ": " if flags['hold_flag']: line += "hold " if flags['ack_flag']: line += "ack " if flags['request_flag']: line += "req " line += "@" + str(flags['send_time']) line += "(" + str(flags['send_count']) + ")" line += " | " print(line) #log("-" * 60) def update_availability(self): #arr = [1, 1, 1, 1, 1, 0, 0, 0, 0, 0] arr = [1, 1, 0, 0, 1, 1, 0, 0] idx = (self.time + self.randomSeed) % 8 # 10 self.reliability = arr[idx] # XXX conflating these for now, depends on POV/agency self.availability = arr[idx] # XXX: Self-describing better in practice, format? def sha1(message): # XXX correct encoding? sha = hashlib.sha1(message.encode('utf-8')) return sha.hexdigest() #- message\_id = HASH("MESSAGE\_ID", group\_id, timestamp, message\_body) # TODO: Create a message def create_message(body): group_id = "0xdeadbeef" timestamp = time.time() message_body = body message = {"group_id": group_id, "timestamp": timestamp, "message_body": message_body} return message # XXX: Is this hashing correctly? def get_message_id(message_record): msg = message_record.payload.message s = "MESSAGE_ID" + msg.group_id + str(msg.timestamp) + msg.body.decode() #print("***", s) return sha1(s) # TODO: Move these protobuf helpers somewhere better # XXX: where is the message id? def new_message_record(body): msg = sync_pb2.Record() msg.header.version = 1 # assert based on type and length msg.header.type = 1 # MESSAGE type # XXX: Should be inferred msg.header.length = 10 # XXX: Hardcoded for now msg.payload.message.group_id = "foo" # XXX: Should be 64 bit integer ms msg.payload.message.timestamp = int(time.time()) msg.payload.message.body = str.encode(body) return msg def new_ack_record(ids): msg = sync_pb2.Record() msg.header.version = 1 # assert based on type and length msg.header.type = 0 # ACK type # XXX: Should be inferred msg.header.length = 10 for id in ids: msg.payload.ack.id.append(id) return msg def new_offer_record(ids): msg = sync_pb2.Record() msg.header.version = 1 # assert based on type and length msg.header.type = 2 # OFFER type # XXX: Should be inferred msg.header.length = 10 for id in ids: msg.payload.offer.id.append(id) return msg def new_req_record(ids): msg = sync_pb2.Record() msg.header.version = 1 # assert based on type and length msg.header.type = 3 # REQUEST type # XXX: Should be inferred msg.header.length = 10 for id in ids: msg.payload.request.id.append(id) return msg # Mocking ################################################################################ # TODO: For whisper nodes should be public keys # What about keypair to try to decrypt? should be in node def run(steps=10): n = networksim.NetworkSimulator() # XXX: Not clear to me what's best here # Interactive: less BW, Batch: less coordination a = Node("A", n, 'burstyMobile', 'batch') b = Node("B", n, 'burstyMobile', 'batch') c = Node("C", n, 'desktop', 'interactive') d = Node("D", n, 'desktop', 'batch') n.peers["A"] = a n.peers["B"] = b n.peers["C"] = c n.peers["D"] = d n.nodes = [a, b, c, d] a.addPeer("B", b) a.addPeer("C", c) b.addPeer("A", a) c.addPeer("A", a) #b.addPeer("C", c) # hm #c.addPeer("B", b) b.addPeer("D", d) c.addPeer("D", d) # NOTE: Client should decide policy, implict group a.share("B") b.share("A") # C and D participating # a.share("C") b.share("D") c.share("A") c.share("D") d.share("B") d.share("C") print("\nAssuming one group context (A-B (C-D) share):") # XXX: Conditional append to get message graph? # TODO: Actually need to encode graph, client concern local_appends = { 1: [[a, "A: hello world"]], 2: [[b, "B: hello!"]], } for i in range(steps): # NOTE: include signature and parent message if n.time in local_appends: for peer, msg in local_appends[n.time]: rec = new_message_record(msg) peer.append_message(rec) n.tick() #a.print_sync_state() #b.print_sync_state() #c.print_sync_state() # a.print_sync_state() # b.print_sync_state() # c.print_sync_state() # d.print_sync_state() def whisperRun(steps=10): a_keyPair = "0x57083392b29bdf24512c93cfdf45d38c87d9d882da3918c59f4406445ea976a4" b_keyPair= "0x7b5c5af9736d9f1773f2020dd0fef0bc3c8aeaf147d2bf41961e766588e086e7" # TODO: should be node names # Derived, used for addressing a_pubKey = "0x04d94a1a01872b598c7cdc5aca2358d35eb91cd8a91eaea8da277451bb71d45c0d1eb87a31ea04e32f537e90165c870b3e115a12438c754d507ac75bddd6ecacd5" b_pubKey = "0x04ff921ddf78b5ed4537402f59a150caf9d96a83f2a345a1ddf9df12e99e7778f314c9ca72e8285eb213af84f5a7b01aabb62c67e46657976ded6658e1b9e83c73" aNode = networkwhisper.WhisperNodeHelper(a_keyPair) bNode = networkwhisper.WhisperNodeHelper(b_keyPair) # XXX: Not clear to me what's best here # Interactive: less BW, Batch: less coordination a = Node(a_pubKey, aNode, 'burstyMobile', 'batch') b = Node(b_pubKey, bNode, 'burstyMobile', 'batch') # XXX: Not clear this is needed for Whisper, since all nodes should be part of network # Possibly analog with topics? #n.peers["A"] = a #n.peers["B"] = b aNode.nodes = [a] bNode.nodes = [b] a.addPeer(b_pubKey, b) b.addPeer(a_pubKey, a) # NOTE: Client should decide policy, implict group a.share(b_pubKey) b.share(a_pubKey) print("\nAssuming one group context (A-B) share):") # XXX: Conditional append to get message graph? # TODO: Actually need to encode graph, client concern local_appends = { 1: [[a, "A: hello world"]], 2: [[b, "B: hello!"]], } # XXX: what is this again? should be for both nodes for i in range(steps): # NOTE: include signature and parent message if aNode.time in local_appends: for peer, msg in local_appends[aNode.time]: rec = new_message_record(msg) peer.append_message(rec) # XXX: Why discrete time model here? aNode.tick() bNode.tick() #a.print_sync_state() #b.print_sync_state() # a.print_sync_state2() # b.print_sync_state2() # TODO: With Whisper branch this one breaks, probably due to sender{,.name} => sender_pubkey mismatch. #run(30) #whisperRun(30)