From 7d4ecb2c25ecf4958ae6e8dd38ff8f5b498f22b7 Mon Sep 17 00:00:00 2001 From: Oskar Thoren Date: Wed, 6 Feb 2019 07:27:31 -0500 Subject: [PATCH] WIP --- data_sync/networkwhisper.py | 66 +++++++++++++++++----------- data_sync/sync.py | 86 ++++++++++++++++++++++++++++++++++++- data_sync/whisper.py | 24 ++++++----- 3 files changed, 138 insertions(+), 38 deletions(-) diff --git a/data_sync/networkwhisper.py b/data_sync/networkwhisper.py index 31d78e2..b2af1dd 100644 --- a/data_sync/networkwhisper.py +++ b/data_sync/networkwhisper.py @@ -2,62 +2,72 @@ from web3 import Web3, HTTPProvider from web3.shh import Shh import random -# XXX use these -a_keyPair = "0x57083392b29bdf24512c93cfdf45d38c87d9d882da3918c59f4406445ea976a4" -b_keyPair= "0x7b5c5af9736d9f1773f2020dd0fef0bc3c8aeaf147d2bf41961e766588e086e7" - -# Derived, used for addressing -a_pubKey = "0x04d94a1a01872b598c7cdc5aca2358d35eb91cd8a91eaea8da277451bb71d45c0d1eb87a31ea04e32f537e90165c870b3e115a12438c754d507ac75bddd6ecacd5" -b_pubKey = "0x04ff921ddf78b5ed4537402f59a150caf9d96a83f2a345a1ddf9df12e99e7778f314c9ca72e8285eb213af84f5a7b01aabb62c67e46657976ded6658e1b9e83c73" - +# Temp +import sync_pb2 # XXX: This assumes a node is actually running - shell out to boot geth? # At least error if proc not running class WhisperNodeHelper(): - def __init__(self): + def __init__(self, keypair): # XXX: Whisper specific, but this host should be unique per node self.host = "http://localhost:8500" - web3 = Web3(HTTPProvider(host)) - Shh.attach(web3, "shh") + self.web3 = Web3(HTTPProvider(self.host)) + Shh.attach(self.web3, "shh") self.topic="0xf8946aac" # discovery-topic - self.keyPair = "XXX" - self.myFilter = self.pollFilter(self.topic, self.keyPair) + self.keyPair = keypair + self.myFilter = self.poll_filter(self.topic, self.keyPair) # XXX: Prune this self.nodes = [] self.time = 0 - self.queue = {} - self.peers = {} + #self.queue = {} + #self.peers = {} # Global network reliability self.reliability = 1 # 0.95? Dunno. def poll_filter(self, topic, keyPair): # XXX: Doesn't belong here - kId = web3.shh.addPrivateKey(keyPair) - pubKey = web3.shh.getPublicKey(kId) + kId = self.web3.shh.addPrivateKey(keyPair) + pubKey = self.web3.shh.getPublicKey(kId) #print("***PUBKEY", pubKey) - myFilter = web3.shh.newMessageFilter({'topic': topic, + myFilter = self.web3.shh.newMessageFilter({'topic': topic, 'privateKeyID': kId}) # Purpose of this if we do getMessages? myFilter.poll_interval = 600; return myFilter - def tick(self): - myFilter = "XXX" - filterID = myFilter.filter_id - retreived_messages = web3.shh.getMessages(filterID) + filterID = self.myFilter.filter_id + retreived_messages = self.web3.shh.getMessages(filterID) # TODO: Deal with these messages similar to simulation # receiver.on_receive(sender, msg) for i in range(0, len(retreived_messages)): #print(retreived_messages[i]['payload']) - print("\nRECV " + retreived_messages[i]['payload'].decode("utf-8")) - #print(retreived_messages[i]) + #print("\nRECV TYPE", type(retreived_messages[i]['payload'])) + #print("\nRECV payload", retreived_messages[i]['payload']) + + # XXX: This parsing should probably happen elsewhere + msg = sync_pb2.Record() + payload = retreived_messages[i]['payload'] + #print("\nRECV payload", payload) + msg.ParseFromString(payload) + # XXX correct way to refer to MESSAGE + if msg.header.type == 1: + print("\nRECV parse", msg.payload.message.body.decode()) + + # XXX: what do we actually do with this? on receive + # Hmmmm how should this work? + # HEREATM + receiver.on_receive(sender, msg) + #print "" print("tick", self.time + 1) #print "-----------" + + # XXX: This is ugly, why is this ticking nodes? + # NOTE: Only self is ticking for n in self.nodes: n.tick() self.time += 1 @@ -83,14 +93,18 @@ class WhisperNodeHelper(): # topic assumed to be hardcoded def send_message(self, sender_id, address_to, msg): print("*** (whisper-network) send_message", address_to) + # XXX: Is this what we want to do? + payload = msg.SerializeToString() + print("*** (whisper-network) send_message payload", payload) + #print("*** (whisper-network) send_message hex", self.web3.toHex(payload)) topic = self.topic - web3.shh.post({ + self.web3.shh.post({ 'pubKey': address_to, 'topic': topic, 'powTarget': 2.01, 'powTime': 2, 'ttl': 10, - 'payload': web3.toHex(text=msg) + 'payload': self.web3.toHex(payload) }); # NetworkSim stub diff --git a/data_sync/sync.py b/data_sync/sync.py index 1cacab7..e5c9c43 100644 --- a/data_sync/sync.py +++ b/data_sync/sync.py @@ -2,6 +2,7 @@ import hashlib import networksim +import networkwhisper import random import sync_pb2 import time @@ -353,6 +354,28 @@ class Node(): log(line) #log("-" * 60) + # Shorter names for pubkey + def print_sync_state2(self): + log("\n{} POV @{}".format(self.name[-4:], self.time)) + log("-" * 60) + n = self.name[-4:] + for message_id, x in self.sync_state.items(): + line = message_id[:4] + " | " + for peer, flags in x.items(): + line += peer[-4:] + ": " + if flags['hold_flag']: + line += "hold " + if flags['ack_flag']: + line += "ack " + if flags['request_flag']: + line += "req " + line += "@" + str(flags['send_time']) + line += "(" + str(flags['send_count']) + ")" + line += " | " + + log(line) + #log("-" * 60) + def update_availability(self): #arr = [1, 1, 1, 1, 1, 0, 0, 0, 0, 0] arr = [1, 1, 0, 0, 1, 1, 0, 0] @@ -437,6 +460,9 @@ def new_req_record(ids): # Mocking ################################################################################ +# TODO: For whisper nodes should be public keys +# What about keypair to try to decrypt? should be in node + def run(steps=10): n = networksim.NetworkSimulator() @@ -502,4 +528,62 @@ def run(steps=10): c.print_sync_state() d.print_sync_state() -run(30) +def whisperRun(steps=10): + a_keyPair = "0x57083392b29bdf24512c93cfdf45d38c87d9d882da3918c59f4406445ea976a4" + b_keyPair= "0x7b5c5af9736d9f1773f2020dd0fef0bc3c8aeaf147d2bf41961e766588e086e7" + + # TODO: should be node names + # Derived, used for addressing + a_pubKey = "0x04d94a1a01872b598c7cdc5aca2358d35eb91cd8a91eaea8da277451bb71d45c0d1eb87a31ea04e32f537e90165c870b3e115a12438c754d507ac75bddd6ecacd5" + b_pubKey = "0x04ff921ddf78b5ed4537402f59a150caf9d96a83f2a345a1ddf9df12e99e7778f314c9ca72e8285eb213af84f5a7b01aabb62c67e46657976ded6658e1b9e83c73" + + aNode = networkwhisper.WhisperNodeHelper(a_keyPair) + bNode = networkwhisper.WhisperNodeHelper(b_keyPair) + + # XXX: Not clear to me what's best here + # Interactive: less BW, Batch: less coordination + a = Node(a_pubKey, aNode, 'burstyMobile', 'batch') + b = Node(b_pubKey, bNode, 'burstyMobile', 'batch') + + # XXX: Not clear this is needed for Whisper, since all nodes should be part of network + # Possibly analog with topics? + #n.peers["A"] = a + #n.peers["B"] = b + aNode.nodes = [a] + bNode.nodes = [b] + + a.addPeer(b_pubKey, b) + b.addPeer(a_pubKey, a) + + # NOTE: Client should decide policy, implict group + a.share(b_pubKey) + b.share(a_pubKey) + + print("\nAssuming one group context (A-B) share):") + + # XXX: Conditional append to get message graph? + # TODO: Actually need to encode graph, client concern + local_appends = { + 1: [[a, "A: hello world"]], + 2: [[b, "B: hello!"]], + } + + # XXX: what is this again? should be for both nodes + for i in range(steps): + # NOTE: include signature and parent message + if aNode.time in local_appends: + for peer, msg in local_appends[aNode.time]: + rec = new_message_record(msg) + peer.append_message(rec) + + # XXX: Why discrete time model here? + aNode.tick() + bNode.tick() + #a.print_sync_state() + #b.print_sync_state() + + a.print_sync_state2() + b.print_sync_state2() + +#run(30) +whisperRun(30) diff --git a/data_sync/whisper.py b/data_sync/whisper.py index 96d751d..c858d94 100644 --- a/data_sync/whisper.py +++ b/data_sync/whisper.py @@ -31,6 +31,8 @@ def newKeyPair(): # privateKeyID - String: ID of private (asymmetric) key for message decryption. def pollFilter(topic, keyPair): kId = web3.shh.addPrivateKey(keyPair) + pubKey = web3.shh.getPublicKey(kId) + print("***PUBKEY", pubKey) myFilter = web3.shh.newMessageFilter({'topic': topic, 'privateKeyID': kId}) myFilter.poll_interval = 600; @@ -68,7 +70,7 @@ class Daemon: #sendMessage(address_to, topic, "hello") getMessages(myFilter) #print("tick") - time.sleep(0.3) + time.sleep(1) # Args #--------------------------------------------------------------------- @@ -88,16 +90,20 @@ if(node == "a"): print("a") keyPair = a_keyPair # XXX: Seems weird, should be b_pubkey? - address_to = oskar - #address_to = a_pubKey # Works + #address_to = oskar + address_to = a_pubKey # Works #address_to = b_pubKey host = "http://localhost:8500" elif(node == "b"): print("b") keyPair = b_keyPair - address_to = oskar + #address_to = oskar + # XXX #address_to = a_pubKey - host = "http://localhost:8501" + address_to = b_pubKey + #address_to = a_pubKey + host = "http://localhost:8500" + #host = "http://localhost:8501" else: print("Unknown node") sys.exit(0) @@ -169,12 +175,8 @@ def run(): #daemon.start() #repl.start() -b = Thread(name='background', target=Daemon().run) -f = Thread(name='foreground', target=run) - -b.start() -f.start() - +#b = Thread(name='background', target=Daemon().run) +#f = Thread(name='foreground', target=run) # TODO # Usage: