diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 745dd01ec..2d9ef3f98 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -9,7 +9,8 @@ import options, sets, hashes, strutils import chronos, chronicles -import rpcmsg, +import rpcmsg, + timedcache, ../../peer, ../../peerinfo, ../../connection, @@ -21,12 +22,12 @@ logScope: type PubSubPeer* = ref object of RootObj + id*: string # base58 peer id string peerInfo*: PeerInfo conn*: Connection handler*: RPCHandler topics*: seq[string] - id*: string # base58 peer id string - seen: HashSet[string] # list of messages forwarded to peers + seen: TimedCache[string] # list of messages forwarded to peers RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -64,7 +65,7 @@ proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async, gcsafe.} = trace "sending encoded msgs to peer", peer = p.id, encoded = encodedHex await p.conn.writeLp(encoded.buffer) - p.seen.incl(encodedHex) + p.seen.put(encodedHex) proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer = new result @@ -72,4 +73,4 @@ proc newPubSubPeer*(conn: Connection, handler: RPCHandler): PubSubPeer = result.conn = conn result.peerInfo = conn.peerInfo result.id = conn.peerInfo.peerId.get().pretty() - result.seen = initSet[string]() + result.seen = newTimedCache[string]() diff --git a/libp2p/protocols/pubsub/rpcmsg.nim b/libp2p/protocols/pubsub/rpcmsg.nim index 6fb344476..9ca5f3ab7 100644 --- a/libp2p/protocols/pubsub/rpcmsg.nim +++ b/libp2p/protocols/pubsub/rpcmsg.nim @@ -9,7 +9,7 @@ import sequtils, options import chronos, nimcrypto/sysrand, chronicles -import ../../peerinfo, +import ../../peerinfo, ../../peer, ../../crypto/crypto, ../../protobuf/minprotobuf diff --git a/libp2p/protocols/pubsub/timedcache.nim b/libp2p/protocols/pubsub/timedcache.nim new file mode 100644 index 000000000..8b40e255d --- /dev/null +++ b/libp2p/protocols/pubsub/timedcache.nim @@ -0,0 +1,56 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import tables, hashes +import chronos, chronicles + +logScope: + topic = "TimedCache" + +const Timeout* = 5 * 1000 # default timeout in ms + +type + ExpireHandler*[V] = proc(val: V) {.gcsafe.} + TimedEntry*[V] = object of RootObj + val: V + handler: ExpireHandler[V] + + TimedCache*[V] = ref object of RootObj + cache*: Table[string, TimedEntry[V]] + onExpire*: ExpireHandler[V] + +proc newTimedCache*[V](): TimedCache[V] = + new result + result.cache = initTable[string, TimedEntry[V]]() + +proc put*[V](t: TimedCache[V], + key: string, + val: V = "", + timeout: uint64 = Timeout, + handler: ExpireHandler[V] = nil) = + trace "adding entry to timed cache", key = key, val = val + t.cache[key] = TimedEntry[V](val: val, handler: handler) + + # TODO: addTimer with param Duration is missing from chronos, needs to be added + addTimer( + timeout, + proc (arg: pointer = nil) {.gcsafe.} = + trace "deleting expired entry from timed cache", key = key, val = val + var entry = t.cache[key] + t.cache.del(key) + if not isNil(entry.handler): + entry.handler(entry.val) + ) + +proc contains*[V](t: TimedCache[V], key: string): bool = + t.cache.contains(key) + +proc del*[V](t: TimedCache[V], key: string) = + trace "deleting entry from timed cache", key = key + t.cache.del(key)