Remove local providers (#53)

* refactor to use batch deletes

* add ability to remove local providers
This commit is contained in:
Dmitriy Ryajov 2022-10-03 15:14:47 -06:00 committed by GitHub
parent 4b9fa0356e
commit 08928e57d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 34 deletions

View File

@ -700,7 +700,7 @@ proc addProvider*(
var res = await d.lookup(cId) var res = await d.lookup(cId)
trace "lookup returned:", res trace "lookup returned:", res
# TODO: lookup is sepcified as not returning local, even if that is the closest. Is this OK? # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK?
if res.len == 0: if res.len == 0:
res.add(d.localNode) res.add(d.localNode)
for toNode in res: for toNode in res:
@ -711,7 +711,6 @@ proc addProvider*(
return res return res
proc sendGetProviders(d: Protocol, toNode: Node, proc sendGetProviders(d: Protocol, toNode: Node,
cId: NodeId): Future[DiscResult[ProvidersMessage]] cId: NodeId): Future[DiscResult[ProvidersMessage]]
{.async.} = {.async.} =
@ -749,6 +748,16 @@ proc getProvidersLocal*(
return provs.get return provs.get
proc removeProvidersLocal*(
d: Protocol,
peerId: PeerId) {.async.} =
trace "Removing local provider", peerId
if(
let res = await d.providers.remove(peerId);
res.isErr):
trace "Error removing provider", err = res.error.msg
proc getProviders*( proc getProviders*(
d: Protocol, d: Protocol,
cId: NodeId, cId: NodeId,
@ -785,7 +794,8 @@ proc getProviders*(
error "Sending of GetProviders message failed", error = providersMsgRes.error error "Sending of GetProviders message failed", error = providersMsgRes.error
# TODO: should we consider this as an error result if all GetProviders # TODO: should we consider this as an error result if all GetProviders
# requests fail?? # requests fail??
trace "getProviders collected: ", res=res.mapIt(it.data)
trace "getProviders collected: ", res = res.mapIt(it.data)
return ok res return ok res
@ -1026,7 +1036,9 @@ proc newProtocol*(
enrAutoUpdate = false, enrAutoUpdate = false,
config = defaultDiscoveryConfig, config = defaultDiscoveryConfig,
rng = newRng(), rng = newRng(),
providers: ProvidersManager = nil): providers = ProvidersManager.new(
SQLiteDatastore.new(Memory)
.expect("Should not fail!"))):
Protocol = Protocol =
# TODO: Tried adding bindPort = udpPort as parameter but that gave # TODO: Tried adding bindPort = udpPort as parameter but that gave
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain. # "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
@ -1070,13 +1082,6 @@ proc newProtocol*(
config.tableIpLimits, config.tableIpLimits,
rng) rng)
providers =
if providers.isNil:
# TODO: There should be a passthrough datastore
ProvidersManager.new(SQLiteDatastore.new(Memory).expect("Should not fail!"))
else:
providers
result = Protocol( result = Protocol(
privateKey: privKey, privateKey: privKey,
localNode: node, localNode: node,
@ -1097,7 +1102,9 @@ proc newProtocol*(
bindIp = IPv4_any(), bindIp = IPv4_any(),
config = defaultDiscoveryConfig, config = defaultDiscoveryConfig,
rng = newRng(), rng = newRng(),
providers: ProvidersManager = nil): providers = ProvidersManager.new(
SQLiteDatastore.new(Memory)
.expect("Should not fail!"))):
Protocol = Protocol =
info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record) info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record)
let node = newNode(record).expect("Properly initialized record") let node = newNode(record).expect("Properly initialized record")
@ -1105,14 +1112,6 @@ proc newProtocol*(
# TODO Consider whether this should be a Defect # TODO Consider whether this should be a Defect
doAssert rng != nil, "RNG initialization failed" doAssert rng != nil, "RNG initialization failed"
let
providers =
if providers.isNil:
# TODO: There should be a passthrough datastore
ProvidersManager.new(SQLiteDatastore.new(Memory).expect("Should not fail!"))
else:
providers
result = Protocol( result = Protocol(
privateKey: privKey, privateKey: privKey,
localNode: node, localNode: node,

View File

@ -213,32 +213,38 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
block: block:
without iter =? (await self.store.query(q)), err: without iter =? (await self.store.query(q)), err:
trace "Unable to obtain record for key", key = cidKey trace "Unable to obtain record for key", key = cidKey
return failure(err.msg) return failure err
defer: defer:
if not isNil(iter): if not isNil(iter):
trace "Cleaning up query iterator" trace "Cleaning up query iterator"
discard (await iter.dispose()) discard (await iter.dispose())
var
keys: seq[Key]
for item in iter: for item in iter:
if pair =? (await item) and pair.key.isSome: if pair =? (await item) and pair.key.isSome:
let key = pair.key.get() let
if err =? (await self.store.delete(key)).errorOption: key = pair.key.get()
trace "Error deleting record from persistent store", err = err.msg
continue
keys.add(key)
without pairs =? key.fromCidKey, err: without pairs =? key.fromCidKey, err:
trace "Unable to parse peer id from key", key trace "Unable to parse peer id from key", key
continue return failure err
self.cache.remove(cid, pairs.peerId) self.cache.remove(cid, pairs.peerId)
trace "Deleted record from store", key trace "Deleted record from store", key
if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption:
trace "Error deleting record from persistent store", err = err.msg
return failure err
return success() return success()
proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} = proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} =
without cidKey =? (CidKey / "*" / $peerId), err: without cidKey =? (CidKey / "*" / $peerId), err:
return failure(err.msg) return failure err
let let
q = Query.init(cidKey) q = Query.init(cidKey)
@ -246,31 +252,36 @@ proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} =
block: block:
without iter =? (await self.store.query(q)), err: without iter =? (await self.store.query(q)), err:
trace "Unable to obtain record for key", key = cidKey trace "Unable to obtain record for key", key = cidKey
return failure(err.msg) return failure err
defer: defer:
if not isNil(iter): if not isNil(iter):
trace "Cleaning up query iterator" trace "Cleaning up query iterator"
discard (await iter.dispose()) discard (await iter.dispose())
var
keys: seq[Key]
for item in iter: for item in iter:
if pair =? (await item) and pair.key.isSome: if pair =? (await item) and pair.key.isSome:
let let
key = pair.key.get() key = pair.key.get()
if err =? (await self.store.delete(key)).errorOption: keys.add(key)
trace "Error deleting record from persistent store", err = err.msg
continue
trace "Deleted record from store", key
let let
parts = key.id.split(datastore.Separator) parts = key.id.split(datastore.Separator)
self.cache.remove(NodeId.fromHex(parts[2]), peerId) self.cache.remove(NodeId.fromHex(parts[2]), peerId)
if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption:
trace "Error deleting record from persistent store", err = err.msg
return failure err
trace "Deleted records from store"
without provKey =? makeProviderKey(peerId), err: without provKey =? makeProviderKey(peerId), err:
return failure(err.msg) return failure err
trace "Removing provider record", key = provKey trace "Removing provider record", key = provKey
return (await self.store.delete(provKey)) return (await self.store.delete(provKey))