Merge pull request #45 from kdeme/bug/fix-sync-observer

Fix to only allow sync for peer with eth support
This commit is contained in:
Yuriy Glukhov 2019-04-19 11:51:09 +03:00 committed by GitHub
commit e1dbd76e1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 9 deletions

View File

@ -336,6 +336,7 @@ proc startSync(ctx: SyncContext) =
po.onPeerDisconnected = proc(p: Peer) {.gcsafe.} = po.onPeerDisconnected = proc(p: Peer) {.gcsafe.} =
ctx.onPeerDisconnected(p) ctx.onPeerDisconnected(p)
po.setProtocol eth
ctx.peerPool.addObserver(ctx, po) ctx.peerPool.addObserver(ctx, po)
proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) = proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =

View File

@ -35,7 +35,8 @@ proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
p.observers[observerId] = observer p.observers[observerId] = observer
if not observer.onPeerConnected.isNil: if not observer.onPeerConnected.isNil:
for peer in p.connectedNodes.values: for peer in p.connectedNodes.values:
observer.onPeerConnected(peer) if observer.protocol.isNil or peer.supports(observer.protocol):
observer.onPeerConnected(peer)
proc delObserver(p: PeerPool, observerId: int) = proc delObserver(p: PeerPool, observerId: int) =
p.observers.del(observerId) p.observers.del(observerId)
@ -46,6 +47,9 @@ proc addObserver*(p: PeerPool, observerId: ref, observer: PeerObserver) {.inline
proc delObserver*(p: PeerPool, observerId: ref) {.inline.} = proc delObserver*(p: PeerPool, observerId: ref) {.inline.} =
p.delObserver(cast[int](observerId)) p.delObserver(cast[int](observerId))
template setProtocol*(observer: PeerObserver, Protocol: type) =
observer.protocol = Protocol.protocolInfo
proc stopAllPeers(p: PeerPool) {.async.} = proc stopAllPeers(p: PeerPool) {.async.} =
debug "Stopping all peers ..." debug "Stopping all peers ..."
# TODO: ... # TODO: ...
@ -108,7 +112,8 @@ proc addPeer*(pool: PeerPool, peer: Peer): bool =
pool.connectedNodes[peer.remote] = peer pool.connectedNodes[peer.remote] = peer
for o in pool.observers.values: for o in pool.observers.values:
if not o.onPeerConnected.isNil: if not o.onPeerConnected.isNil:
o.onPeerConnected(peer) if o.protocol.isNil or peer.supports(o.protocol):
o.onPeerConnected(peer)
return true return true
else: return false else: return false

View File

@ -59,6 +59,7 @@ type
PeerObserver* = object PeerObserver* = object
onPeerConnected*: proc(p: Peer) {.gcsafe.} onPeerConnected*: proc(p: Peer) {.gcsafe.}
onPeerDisconnected*: proc(p: Peer) {.gcsafe.} onPeerDisconnected*: proc(p: Peer) {.gcsafe.}
protocol*: ProtocolInfo
Capability* = object Capability* = object
name*: string name*: string

View File

@ -227,9 +227,6 @@ proc registerProtocol(protocol: ProtocolInfo) =
# Message composition and encryption # Message composition and encryption
# #
template protocolOffset(peer: Peer, Protocol: type): int =
peer.dispatcher.protocolOffsets[Protocol.protocolInfo.index]
proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline.} = proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline.} =
result = msgId result = msgId
if not peer.dispatcher.isNil: if not peer.dispatcher.isNil:
@ -239,9 +236,12 @@ template getPeer(peer: Peer): auto = peer
template getPeer(response: Response): auto = Peer(response) template getPeer(response: Response): auto = Peer(response)
template getPeer(response: ResponseWithId): auto = response.peer template getPeer(response: ResponseWithId): auto = response.peer
proc supports*(peer: Peer, proto: ProtocolInfo): bool {.inline.} =
peer.dispatcher.protocolOffsets[proto.index] != -1
proc supports*(peer: Peer, Protocol: type): bool {.inline.} = proc supports*(peer: Peer, Protocol: type): bool {.inline.} =
## Checks whether a Peer supports a particular protocol ## Checks whether a Peer supports a particular protocol
peer.protocolOffset(Protocol) != -1 peer.supports(Protocol.protocolInfo)
template perPeerMsgId(peer: Peer, MsgType: type): int = template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId) perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)
@ -1122,9 +1122,13 @@ proc removePeer(network: EthereumNode, peer: Peer) =
if network.peerPool != nil and not peer.remote.isNil: if network.peerPool != nil and not peer.remote.isNil:
network.peerPool.connectedNodes.del(peer.remote) network.peerPool.connectedNodes.del(peer.remote)
for observer in network.peerPool.observers.values: # Note: we need to do this check as disconnect (and thus removePeer)
if not observer.onPeerDisconnected.isNil: # currently can get called before the dispatcher is initialized.
observer.onPeerDisconnected(peer) if not peer.dispatcher.isNil:
for observer in network.peerPool.observers.values:
if not observer.onPeerDisconnected.isNil:
if observer.protocol.isNil or peer.supports(observer.protocol):
observer.onPeerDisconnected(peer)
proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[void] = proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[void] =
var futures = newSeqOfCap[Future[void]](allProtocols.len) var futures = newSeqOfCap[Future[void]](allProtocols.len)