# research/data_sync/sync.py
# Sync protocol PoC
2019-01-03 02:59:33 +00:00
import hashlib
2019-01-09 12:59:16 +00:00
import random
2019-01-03 02:59:33 +00:00
import sync_pb2
import time
2019-01-02 12:59:38 +00:00
# TODO: Expand message to be a payload with message hash
2019-01-09 05:28:12 +00:00
# TODO: Encode group with message graph and immutable messages
2019-01-02 12:59:38 +00:00
# TODO: Introduce latency and unreliability
# TODO: Encode mobile node bursty behavior better
2019-01-09 05:28:12 +00:00
# TODO: Encode things like client, group scope, etc.

# Placeholder client identifier. Eventually: client_id = R(HASH_LEN)
CLIENT_ID = "0xdeadbeef"

# Each group belongs to a client; hardcoded for now. Eventually:
# group_id = HASH("GROUP_ID", client_id, group_descriptor)
GROUP_ID = "0xdeadbeef"

# TODO: Sync state: Store bounded cache list of messages ids
# offered by peer and not ack/req by device
2019-01-09 04:11:25 +00:00
2019-01-09 12:16:46 +00:00
# XXX: Add debug log level
def log(message):
    """Write ``message`` to stdout, followed by a newline.

    Every simulator trace line goes through here, so this is the single
    place to add filtering or log levels later.
    """
    print(message)
# NOTE: Inspired by github.com/ethereum/research networksim.py.
# XXX: Break this out into separate namespace
class NetworkSimulator():
    """Discrete-time network that delivers point-to-point messages with latency.

    Messages are bucketed by delivery tick. Each ``tick`` does three things:
    it delivers the bucket due at the current time (each message survives
    with probability ``reliability``), ticks every node in ``nodes``, and
    then advances time.

    Args:
        reliability: Probability that a queued message is delivered.
            Defaults to 1 (lossless).
    """

    def __init__(self, reliability=1):
        self.nodes = []
        self.time = 0
        # delivery tick -> list of (sender, receiver, message) tuples
        self.queue = {}
        # peer id -> node object; used to resolve ids in send_message
        self.peers = {}
        # Global network reliability (0.95? Dunno.)
        self.reliability = reliability

    def tick(self):
        """Deliver the messages due now, tick every node, then advance time."""
        # Pop rather than read, so delivered buckets don't accumulate forever.
        # Messages sent during delivery land at time + latency (>= 1), never
        # in the bucket being drained here.
        # XXX: Should sender be here?
        for sender, receiver, msg in self.queue.pop(self.time, []):
            if random.random() < self.reliability:
                receiver.on_receive(sender, msg)
        print("tick {}".format(self.time + 1))
        for node in self.nodes:
            node.tick()
        self.time += 1

    # XXX: This should be normal distribution or Poisson
    def latency_uniform_random(self):
        """Return a delivery delay in ticks, uniform over 1..3 inclusive."""
        return random.randint(1, 3)

    # NOTE: Direct message, no broadcast etc
    def send_message(self, sender_id, receiver_id, message):
        """Queue ``message`` from ``sender_id`` to ``receiver_id``.

        The message is delivered ``latency_uniform_random()`` ticks from now.

        Raises:
            KeyError: if either id is not registered in ``self.peers``.
        """
        sender = self.peers[sender_id]
        receiver = self.peers[receiver_id]
        recv_time = self.time + self.latency_uniform_random()
        self.queue.setdefault(recv_time, []).append((sender, receiver, message))
2019-01-02 12:59:38 +00:00
class Node():
    """A simulated device participating in the sync protocol.

    A node keeps three stores:
    - an ordered log of messages it authored;
    - a lookup of every message it knows;
    - per-message, per-peer sync flags.
    It talks to other nodes only through ``network.send_message``.

    Args:
        name: Identifier used in logs and as this node's key at its peers.
        network: The NetworkSimulator carrying this node's traffic.
        profile: 'burstyMobile' nodes follow the on/off pattern in
            update_availability; 'onlineDesktop' and any other value are
            always online (reliability 1).
    """

    # Duty cycle for bursty mobile nodes: online five ticks, offline five.
    _AVAILABILITY_PATTERN = (1, 1, 1, 1, 1, 0, 0, 0, 0, 0)

    def __init__(self, name, network, profile):
        self.name = name
        # Locally authored messages in append order: {"id": ..., "message": ...}
        self.log = []
        # message_id -> message record, for every message this node knows
        self.messages = {}
        # message_id -> peer_id -> flags dict (see _new_sync_flags)
        self.sync_state = {}
        self.peers = {}
        self.network = network
        self.time = 0
        # XXX: Assumes only one group
        self.group_id = GROUP_ID
        self.sharing = {GROUP_ID: set()}
        self.profile = profile
        # Offset into _AVAILABILITY_PATTERN.
        # XXX: Hacky. A and B are half a cycle apart, so as bursty nodes one is
        # online exactly when the other is offline; others get a random offset.
        if self.name == 'A':
            self.randomSeed = 0
        elif self.name == 'B':
            self.randomSeed = 5
        else:
            self.randomSeed = random.randint(1, 10)
        if profile == 'burstyMobile':
            self.reliability = 0.1
            # Immediately overridden by the on/off availability pattern.
            self.update_availability()
        elif profile == 'onlineDesktop':
            self.reliability = 1  # or 0.9
        else:
            self.reliability = 1
        self.availability = self.reliability

    def tick(self):
        """Advance local time by one step and, if online, send due messages."""
        # XXX: What else do?
        self.time += 1
        if self.profile == 'burstyMobile':
            self.update_availability()
        if self.availability == 1:
            # TODO: Do stuff like actions here; depending on sync mode,
            # do appropriate actions.
            self.send_messages()

    def send_messages(self):
        """Send each shared message that is due and not known to be held.

        A message is sent to a peer when all of these hold:
        - this node shares the group with that peer;
        - the peer is not known to hold the message;
        - the message's send time has been reached.

        "Reached" means ``send_time <= self.time``, not equality. A node that
        was offline at the exact send tick must still send once it is back
        online; otherwise the message would never be retransmitted.
        """
        shared_peers = self.sharing[self.group_id]
        for message_id, per_peer in list(self.sync_state.items()):
            for peer, flags in list(per_peer.items()):
                if (peer in shared_peers and
                        flags['hold_flag'] == 0 and
                        flags['send_time'] <= self.time):
                    self.send_message(peer, self.messages[message_id])

    # XXX: Why would node know about peer and not just name?
    def addPeer(self, peer_id, peer):
        """Register ``peer`` under ``peer_id``."""
        self.peers[peer_id] = peer

    def share(self, peer_id):
        """Start sharing this node's (single) group with ``peer_id``."""
        self.sharing[self.group_id].add(peer_id)

    def _new_sync_flags(self, hold_flag, send_time):
        """Return a fresh per-peer sync state record for one message."""
        return {
            "hold_flag": hold_flag,
            "ack_flag": 0,
            "request_flag": 0,
            "send_count": 0,
            "send_time": send_time,
        }

    def append_message(self, message):
        """Append a locally authored message and schedule it for sharing peers.

        Every known peer this node shares with gets state due next tick.
        """
        message_id = get_message_id(message)
        self.log.append({"id": message_id, "message": message})
        # XXX: Ugly but easier access while keeping log order
        self.messages[message_id] = message
        # XXX: Peers added after this call get no sync state for this message.
        # TODO: Only share with certain peers, e.g. clientPolicy
        shared_peers = self.sharing[self.group_id]
        self.sync_state[message_id] = {
            peer: self._new_sync_flags(hold_flag=0, send_time=self.time + 1)
            for peer in self.peers
            if peer in shared_peers
        }

    def send_message(self, peer_id, message):
        """Transmit ``message`` to ``peer_id`` and schedule its retransmission.

        Bumps the send counter and sets the next send time two ticks from
        now. While this node is online, the message is resent until the
        peer's ack sets the hold flag.
        """
        message_id = get_message_id(message)
        peer = self.peers[peer_id]
        flags = self.sync_state[message_id][peer_id]
        flags["send_count"] += 1
        # Reschedule relative to now, not the old send time, so a message sent
        # late (after the node was offline) isn't immediately due again.
        flags["send_time"] = self.time + 2
        log('MESSAGE ({} -> {}): {} sent'.format(self.name, peer.name, message_id[:4]))
        # XXX: Can introduce latency here
        self.network.send_message(self.name, peer_id, message)

    def on_receive(self, sender, message):
        """Handle a record delivered by the network.

        With probability ``self.reliability`` the record is dispatched by
        header type (1 = MESSAGE, 0 = ACK). Otherwise it is dropped as if
        the node were offline.
        """
        if random.random() >= self.reliability:
            log("*** node {} offline, dropping message".format(self.name))
            return
        record_type = message.header.type
        if record_type == 1:
            self.on_receive_message(sender, message)
        elif record_type == 0:
            self.on_receive_ack(sender, message)
        else:
            log("XXX: unknown message type")

    def on_receive_message(self, sender, message):
        """Store a MESSAGE record, mark the sender as holding it, and ack it."""
        message_id = get_message_id(message)
        log('MESSAGE ({} -> {}): {} received'.format(sender.name, self.name, message_id[:4]))
        # The sender evidently holds the message, so it is never sent back.
        per_peer = self.sync_state.setdefault(message_id, {})
        per_peer[sender.name] = self._new_sync_flags(hold_flag=1, send_time=0)
        self.messages[message_id] = message
        ack_rec = new_ack_record(message_id)
        self.network.send_message(self.name, sender.name, ack_rec)
        log(" ACK ({} -> {}): {}".format(self.name, sender.name, message_id[:4]))

    def on_receive_ack(self, sender, message):
        """Mark each acked message as held by ``sender`` so it stops being resent.

        An ack for a message or peer with no local sync state is logged and
        ignored instead of raising KeyError.
        """
        for ack in message.payload.ack.id:
            log(' ACK ({} -> {}): {} received'.format(sender.name, self.name, ack[:4]))
            flags = self.sync_state.get(ack, {}).get(sender.name)
            if flags is None:
                log("*** node {} has no sync state for {} from {}, ignoring ack".format(
                    self.name, ack[:4], sender.name))
                continue
            flags["hold_flag"] = 1

    def print_sync_state(self):
        """Log one line per known message summarising its per-peer flags.

        Line format: ``<id[:4]> | <peer>: [hold ][ack ][req ]@<send_time>(<send_count>) | ...``
        """
        for message_id, per_peer in self.sync_state.items():
            parts = [message_id[:4], " | "]
            for peer, flags in per_peer.items():
                parts.append(peer + ": ")
                if flags['hold_flag']:
                    parts.append("hold ")
                if flags['ack_flag']:
                    parts.append("ack ")
                if flags['request_flag']:
                    parts.append("req ")
                parts.append("@{}({}) | ".format(flags['send_time'], flags['send_count']))
            log("".join(parts))

    def update_availability(self):
        """Set reliability and availability from this node's duty-cycle position."""
        pattern = self._AVAILABILITY_PATTERN
        state = pattern[(self.time + self.randomSeed) % len(pattern)]
        self.reliability = state
        # XXX conflating these for now, depends on POV/agency
        self.availability = state
# XXX: Self-describing better in practice, format?
def sha1(message):
    """Return the hex SHA-1 digest of ``message``.

    Bytes-like input is hashed as-is. Text is UTF-8 encoded first, because
    ``hashlib`` only accepts bytes: Python 3 ``str`` raises TypeError and
    Python 2 non-ASCII ``unicode`` raises UnicodeEncodeError.
    """
    if not isinstance(message, (bytes, bytearray, memoryview)):
        message = message.encode("utf-8")
    return hashlib.sha1(message).hexdigest()
2019-01-02 12:59:38 +00:00
2019-01-08 03:20:24 +00:00
# message_id = HASH("MESSAGE_ID", group_id, timestamp, message_body)
# TODO: Create a message
def create_message(body, group_id=None, timestamp=None):
    """Build a plain-dict message (not called elsewhere in this file).

    Args:
        body: The message body.
        group_id: Group the message belongs to; defaults to ``GROUP_ID``.
        timestamp: Creation time in seconds; defaults to ``time.time()``.

    Returns:
        dict with keys "group_id", "timestamp" and "message_body".
    """
    if group_id is None:
        group_id = GROUP_ID
    if timestamp is None:
        timestamp = time.time()
    return {"group_id": group_id, "timestamp": timestamp, "message_body": body}
2019-01-02 12:59:38 +00:00
2019-01-08 03:20:24 +00:00
def get_message_id(message_record):
    """Derive the id of a MESSAGE record.

    The id is the SHA-1 hex digest of the literal prefix "MESSAGE_ID",
    followed by the record's group id, decimal timestamp and body,
    concatenated with no separators.
    """
    # XXX: Is this hashing correctly?
    # NOTE(review): fields are joined without delimiters, so distinct
    # (group_id, timestamp, body) triples can share a preimage, e.g.
    # ("foo", 1, "23x") and ("foo", 12, "3x"). Kept as-is so ids are stable.
    content = message_record.payload.message
    preimage = "".join(["MESSAGE_ID", content.group_id,
                        str(content.timestamp), content.body])
    return sha1(preimage)
2019-01-03 02:59:33 +00:00
# XXX: where is the message id?
def new_message_record(body):
    """Build a MESSAGE ``sync_pb2.Record`` carrying ``body``.

    The group id is hard-coded to "foo". The timestamp is the current
    time truncated to whole seconds.
    """
    record = sync_pb2.Record()
    header = record.header
    header.version = 1
    header.type = 1  # MESSAGE type; assert based on type and length
    header.length = 10  # XXX: Should be inferred
    content = record.payload.message
    content.group_id = "foo"  # XXX: Hardcoded for now
    content.timestamp = int(time.time())  # XXX: Should be 64 bit integer ms
    content.body = body
    return record
def new_ack_record(id, *more_ids):
    """Build an ACK ``sync_pb2.Record`` acknowledging one or more message ids.

    Args:
        id: First message id to acknowledge (positional, as before).
        *more_ids: Further message ids to acknowledge in the same record.
            Receivers already iterate over every id in ``payload.ack.id``.
    """
    record = sync_pb2.Record()
    record.header.version = 1
    # assert based on type and length
    record.header.type = 0  # ACK type
    # XXX: Should be inferred
    record.header.length = 10
    record.payload.ack.id.append(id)
    record.payload.ack.id.extend(more_ids)
    return record
2019-01-08 03:20:24 +00:00
# Mocking
################################################################################

def run(steps=10):
    """Simulate three nodes sharing one group for ``steps`` network ticks.

    Setup:
    - A and B are bursty mobile nodes; C is an always-online desktop.
    - A authors a message when the network time is 1, and B when it is 2.
    """
    n = NetworkSimulator()
    a = Node("A", n, 'burstyMobile')
    b = Node("B", n, 'burstyMobile')
    # 'onlineDesktop' is the desktop profile Node recognises; the old
    # 'desktop' value fell through to the same always-online default.
    c = Node("C", n, 'onlineDesktop')

    nodes = [a, b, c]
    for node in nodes:
        n.peers[node.name] = node
    n.nodes = nodes

    a.addPeer("B", b)
    a.addPeer("C", c)
    b.addPeer("A", a)
    c.addPeer("A", a)

    # NOTE: Client should decide policy, implicit group
    a.share("B")
    b.share("A")
    # XXX: Hm, a lot of coordination here? Weird?
    a.share("C")
    b.share("C")
    c.share("A")
    c.share("B")
    print("\nAssuming one group context (A-B-C share):")

    # XXX: Conditional append to get message graph?
    # TODO: Actually need to encode graph, client concern
    # network time -> [(author node, message body), ...]
    local_appends = {
        1: [[a, "A: hello world"]],
        2: [[b, "B: hello!"]],
    }

    for _ in range(steps):
        # NOTE: include signature and parent message
        for peer, body in local_appends.get(n.time, []):
            peer.append_message(new_message_record(body))
        n.tick()
        #a.print_sync_state()
        #b.print_sync_state()
        #c.print_sync_state()

    # TODO: Move to client
    # XXX: This confuses things somewhat, as this is
    # client concerns
    #acc = "\n"
    #for _, msg in a.messages.items():
    #    acc += msg.payload.message.body + "\n"
    ## XXX: Where is the sender stored? in msg?
    #print "A POV:", acc
    #acc = "\n"
    #for _, msg in b.messages.items():
    #    acc += msg.payload.message.body + "\n"
    #print "B POV:", acc
2019-01-09 13:16:52 +00:00
2019-01-09 05:28:12 +00:00
## TODO: Sync modes, interactive (+bw -latency) and batch (v.v.)
# Need to encode logic for actions taken at given time,
# respect send_time etc,
# WRT offer messages and so on
# Batch mode first (less coordination):
# - **Acknowledge** any messages **sent** by the peer that the device has not yet
# acknowledged
# - **Acknowledge** any messages **offered** by the peer that the device holds,
# and has not yet acknowledged
# - **Request** any messages **offered** by the peer that the device does not
# hold, and has not yet requested
# - **Send** any messages that the device is **sharing** with the peer, that have
# been **requested** by the peer, and that have reached their send times
# - **Send** any messages that the device is **sharing** with the peer, and does
# not know whether the peer holds, and that have reached their send times
2019-01-09 12:16:46 +00:00
# each tick
2019-01-09 05:28:12 +00:00
# SEND messages the device is SHARING with the peer, that it doesn't know
# whether the peer holds, and that have reached their send time.
# What do we mean by sharing?
# Member of a group:
# Two peers sync a group's messages if they both share that group.
# Membership and sharing are dynamic.
# How do we determine whether two peers synchronize a
# specific group's messages with each other?
# For any given (data) group, a device can decide
# whether or not it wants to share it with a peer.
# TODO: ACK should also be share policy
# XXX: Encode offline mostly
# XXX: How does B look at the message?
2019-01-09 13:16:52 +00:00
# XXX: If A,B reliability 0.1 and C 0.9
# How does that actually realistically look?
# Need to encode the other actions and offer etc
# Exponential backoff too probably
# Then C can offer messages to B, e.g.
# How viz message graph?
# TODO: Move into separate client namespace
# Rough shape of a message. This is actually client specific!
ex = dict([
    ('payload', "hello_world"),
    ('content-type', "text/plain"),
    ('signed_sender', 'A-signed-pubkey'),
    ('dependencies', ["foo_message_id"]),
])
# XXX: How will C receive the message from A to B?
# TODO: Requires offering to B, e.g.
# Or B requesting it
2019-01-09 13:56:36 +00:00
# Why is A sending same message to C over again
# and again, with reliability = 1,1?
# C no op? it acks but A ignores it?
# Duh, C sends but A doesn't see it...
2019-01-09 14:01:11 +00:00
# TODO: When A comes online, whenever that is,
# it needs to request messages,
# i.e. ask C (or whoever) for messages.
# How?
# - **Request** any messages **offered** by the peer that the device does not
#   hold, and has not yet requested
# But how would the peer know to offer messages?
# A could play nice and offer all the messages it has to C, but if C already
# has all of them, that is wasted bandwidth.
# There needs to be some way for C to take the initiative, unless A is going
# to over-offer with a bloom filter or similar.
# Look into this a bit more; the naive way is a signal.
#
# Also, unreliability isn't necessarily the same as being offline and online:
# once online a node is reliable, so it is bursty - how best to encode that?
# Ok let's stop for now
2019-01-11 03:47:10 +00:00
run(steps=10)
2019-01-11 03:47:10 +00:00
# Scenario:
# A online / offline / online
# B vice versa
# Why is B sending ACK to A if it is offline? not true?