Fix gossip messages seqno according to spec (#253)

* Fix gossip messages seqno according to spec

* Add peers back to gossipsub table, slow down heartbeat

* Revert "Add peers back to gossipsub table, slow down heartbeat"

This reverts commit 01e2e62172.

* make seqno a threadvar, remove from peerinfo

* seqno refactor, into pubsub
This commit is contained in:
Giovanni Petrantoni 2020-07-15 12:51:33 +09:00 committed by GitHub
parent b8b0a2b4bc
commit d7bab37119
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 13 deletions

View File

@ -136,7 +136,8 @@ method publish*(f: FloodSub,
return return
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
let msg = Message.init(f.peerInfo, data, topic, f.sign) inc f.msgSeqno
let msg = Message.init(f.peerInfo, data, topic, f.msgSeqno, f.sign)
# start the future but do not wait yet # start the future but do not wait yet
let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg]) let (published, failed) = await f.sendHelper(f.floodsub.getOrDefault(topic), @[msg])
for p in failed: for p in failed:

View File

@ -506,8 +506,9 @@ method publish*(g: GossipSub,
# time # time
g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL) g.lastFanoutPubSub[topic] = Moment.fromNow(GossipSubFanoutTTL)
inc g.msgSeqno
let let
msg = Message.init(g.peerInfo, data, topic, g.sign) msg = Message.init(g.peerInfo, data, topic, g.msgSeqno, g.sign)
msgId = g.msgIdProvider(msg) msgId = g.msgIdProvider(msg)
trace "publishing on topic", trace "publishing on topic",

View File

@ -60,6 +60,7 @@ type
validators*: Table[string, HashSet[ValidatorHandler]] validators*: Table[string, HashSet[ValidatorHandler]]
observers: ref seq[PubSubObserver] # ref as in smart_ptr observers: ref seq[PubSubObserver] # ref as in smart_ptr
msgIdProvider*: MsgIdProvider # Turn message into message id (not nil) msgIdProvider*: MsgIdProvider # Turn message into message id (not nil)
msgSeqno*: uint64
proc hasPeerID*(t: PeerTable, topic, peerId: string): bool = proc hasPeerID*(t: PeerTable, topic, peerId: string): bool =
# unefficient but used only in tests! # unefficient but used only in tests!

View File

@ -187,8 +187,9 @@ proc sendMsg*(p: PubSubPeer,
peerId: PeerID, peerId: PeerID,
topic: string, topic: string,
data: seq[byte], data: seq[byte],
seqno: uint64,
sign: bool): Future[void] {.gcsafe.} = sign: bool): Future[void] {.gcsafe.} =
p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, sign)])]) p.send(@[RPCMsg(messages: @[Message.init(p.peerInfo, data, topic, seqno, sign)])])
proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} = proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
try: try:

View File

@ -19,6 +19,7 @@ import messages, protobuf,
../../../peerinfo, ../../../peerinfo,
../../../crypto/crypto, ../../../crypto/crypto,
../../../protobuf/minprotobuf ../../../protobuf/minprotobuf
import stew/endians2
logScope: logScope:
topics = "pubsubmessage" topics = "pubsubmessage"
@ -56,15 +57,12 @@ proc init*(
p: PeerInfo, p: PeerInfo,
data: seq[byte], data: seq[byte],
topic: string, topic: string,
seqno: uint64,
sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} = sign: bool = true): Message {.gcsafe, raises: [CatchableError, Defect].} =
var seqno: seq[byte] = newSeq[byte](8)
if randomBytes(addr seqno[0], 8) <= 0:
raise (ref CatchableError)(msg: "Cannot get randomness for message")
result = Message( result = Message(
fromPeer: p.peerId, fromPeer: p.peerId,
data: data, data: data,
seqno: seqno, seqno: @(seqno.toBytesBE), # unefficient, fine for now
topicIDs: @[topic]) topicIDs: @[topic])
if sign and p.publicKey.isSome: if sign and p.publicKey.isSome:

View File

@ -227,12 +227,14 @@ suite "GossipSub internal":
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
# generate messages # generate messages
var seqno = 0'u64
for i in 0..5: for i in 0..5:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) inc seqno
let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
check gossipSub.fanout[topic].len == 15 check gossipSub.fanout[topic].len == 15
@ -276,12 +278,14 @@ suite "GossipSub internal":
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
# generate messages # generate messages
var seqno = 0'u64
for i in 0..5: for i in 0..5:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) inc seqno
let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
@ -318,12 +322,14 @@ suite "GossipSub internal":
gossipSub.gossipsub[topic].incl(peer) gossipSub.gossipsub[topic].incl(peer)
# generate messages # generate messages
var seqno = 0'u64
for i in 0..5: for i in 0..5:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, false) inc seqno
let msg = Message.init(peerInfo, ("HELLO" & $i).toBytes(), topic, seqno, false)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()
@ -360,12 +366,14 @@ suite "GossipSub internal":
gossipSub.fanout[topic].incl(peer) gossipSub.fanout[topic].incl(peer)
# generate messages # generate messages
var seqno = 0'u64
for i in 0..5: for i in 0..5:
let conn = newBufferStream(noop) let conn = newBufferStream(noop)
conns &= conn conns &= conn
let peerInfo = randomPeerInfo() let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo conn.peerInfo = peerInfo
let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, false) inc seqno
let msg = Message.init(peerInfo, ("bar" & $i).toBytes(), topic, seqno, false)
gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg) gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)
let peers = gossipSub.getGossipPeers() let peers = gossipSub.getGossipPeers()

View File

@ -11,8 +11,9 @@ let rng = newRng()
suite "Message": suite "Message":
test "signature": test "signature":
var seqno = 11'u64
let let
peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get()) peer = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
msg = Message.init(peer, @[], "topic", sign = true) msg = Message.init(peer, @[], "topic", seqno, sign = true)
check verify(msg, peer) check verify(msg, peer)