use sha256 digest as cache keys (#135)
* use sha256 digest as cache keys * rebasing master
This commit is contained in:
parent
9cf1fd0216
commit
f8029e7359
|
@ -391,7 +391,7 @@ method publish*(g: GossipSub,
|
||||||
await procCall PubSub(g).publish(topic, data)
|
await procCall PubSub(g).publish(topic, data)
|
||||||
|
|
||||||
trace "about to publish message on topic", name = topic,
|
trace "about to publish message on topic", name = topic,
|
||||||
data = data.toHex()
|
data = data.shortLog
|
||||||
if data.len > 0 and topic.len > 0:
|
if data.len > 0 and topic.len > 0:
|
||||||
var peers: HashSet[string]
|
var peers: HashSet[string]
|
||||||
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
|
if topic in g.topics: # if we're subscribed to the topic attempt to build a mesh
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import options, hashes, strutils, tables, hashes
|
import options, hashes, strutils, tables, hashes
|
||||||
import chronos, chronicles
|
import chronos, chronicles, nimcrypto/sha2
|
||||||
import rpc/[messages, message, protobuf],
|
import rpc/[messages, message, protobuf],
|
||||||
timedcache,
|
timedcache,
|
||||||
../../peer,
|
../../peer,
|
||||||
|
@ -57,9 +57,9 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||||
while not conn.closed:
|
while not conn.closed:
|
||||||
trace "waiting for data", peer = p.id, closed = conn.closed
|
trace "waiting for data", peer = p.id, closed = conn.closed
|
||||||
let data = await conn.readLp(64 * 1024)
|
let data = await conn.readLp(64 * 1024)
|
||||||
let hexData = data.toHex()
|
let digest = $(sha256.digest(data))
|
||||||
trace "read data from peer", peer = p.id, data = data.shortLog
|
trace "read data from peer", peer = p.id, data = data.shortLog
|
||||||
if $hexData.hash in p.recvdRpcCache:
|
if digest in p.recvdRpcCache:
|
||||||
trace "message already received, skipping", peer = p.id
|
trace "message already received, skipping", peer = p.id
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -69,7 +69,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
|
||||||
for obs in p.observers[]:
|
for obs in p.observers[]:
|
||||||
obs.onRecv(p, msg)
|
obs.onRecv(p, msg)
|
||||||
await p.handler(p, @[msg])
|
await p.handler(p, @[msg])
|
||||||
p.recvdRpcCache.put($hexData.hash)
|
p.recvdRpcCache.put(digest)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
|
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
|
||||||
finally:
|
finally:
|
||||||
|
@ -87,19 +87,21 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} =
|
||||||
var mm = m
|
var mm = m
|
||||||
for obs in p.observers[]:
|
for obs in p.observers[]:
|
||||||
obs.onSend(p, mm)
|
obs.onSend(p, mm)
|
||||||
let encodedHex = encoded.buffer.toHex()
|
|
||||||
if encoded.buffer.len <= 0:
|
if encoded.buffer.len <= 0:
|
||||||
trace "empty message, skipping", peer = p.id
|
trace "empty message, skipping", peer = p.id
|
||||||
return
|
return
|
||||||
|
|
||||||
if $encodedHex.hash in p.sentRpcCache:
|
let digest = $(sha256.digest(encoded.buffer))
|
||||||
|
if digest in p.sentRpcCache:
|
||||||
trace "message already sent to peer, skipping", peer = p.id
|
trace "message already sent to peer, skipping", peer = p.id
|
||||||
continue
|
continue
|
||||||
|
|
||||||
proc sendToRemote() {.async.} =
|
proc sendToRemote() {.async.} =
|
||||||
trace "sending encoded msgs to peer", peer = p.id, encoded = encoded.buffer.shortLog
|
trace "sending encoded msgs to peer", peer = p.id,
|
||||||
|
encoded = encoded.buffer.shortLog
|
||||||
await p.sendConn.writeLp(encoded.buffer)
|
await p.sendConn.writeLp(encoded.buffer)
|
||||||
p.sentRpcCache.put($encodedHex.hash)
|
p.sentRpcCache.put(digest)
|
||||||
|
|
||||||
# if no connection has been set,
|
# if no connection has been set,
|
||||||
# queue messages untill a connection
|
# queue messages untill a connection
|
||||||
|
|
Loading…
Reference in New Issue