rename bitswap and move to stores
This commit is contained in:
parent
bd86433679
commit
163fcefb31
|
@ -14,18 +14,18 @@ import pkg/chronos
|
|||
import pkg/libp2p
|
||||
import pkg/libp2p/errors
|
||||
|
||||
import ./bitswap/protobuf/bitswap as pb
|
||||
import ./blockexc/protobuf/blockexc as pb
|
||||
import ./blocktype as bt
|
||||
import ./stores/blockstore
|
||||
import ./utils/asyncheapqueue
|
||||
|
||||
import ./bitswap/network
|
||||
import ./bitswap/engine
|
||||
import ./blockexc/network
|
||||
import ./blockexc/engine
|
||||
|
||||
export network, blockstore, asyncheapqueue, engine
|
||||
|
||||
logScope:
|
||||
topics = "dagger bitswap"
|
||||
topics = "dagger blockexc"
|
||||
|
||||
const
|
||||
DefaultTaskQueueSize = 100
|
||||
|
@ -33,58 +33,58 @@ const
|
|||
DefaultMaxRetries = 3
|
||||
|
||||
type
|
||||
Bitswap* = ref object of BlockStore
|
||||
engine*: BitswapEngine # bitswap decision engine
|
||||
taskQueue*: AsyncHeapQueue[BitswapPeerCtx] # peers we're currently processing tasks for
|
||||
bitswapTasks: seq[Future[void]] # future to control bitswap task
|
||||
bitswapRunning: bool # indicates if the bitswap task is running
|
||||
BlockExc* = ref object of BlockStore
|
||||
engine*: BlockExcEngine # blockexc decision engine
|
||||
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # peers we're currently processing tasks for
|
||||
blockexcTasks: seq[Future[void]] # future to control blockexc task
|
||||
blockexcRunning: bool # indicates if the blockexc task is running
|
||||
concurrentTasks: int # number of concurrent peers we're serving at any given time
|
||||
maxRetries: int # max number of tries for a failed block
|
||||
taskHandler: TaskHandler # handler provided by the engine called by the runner
|
||||
|
||||
proc bitswapTaskRunner(b: Bitswap) {.async.} =
|
||||
proc blockexcTaskRunner(b: BlockExc) {.async.} =
|
||||
## process tasks
|
||||
##
|
||||
|
||||
while b.bitswapRunning:
|
||||
while b.blockexcRunning:
|
||||
let peerCtx = await b.taskQueue.pop()
|
||||
asyncSpawn b.taskHandler(peerCtx)
|
||||
|
||||
trace "Exiting bitswap task runner"
|
||||
trace "Exiting blockexc task runner"
|
||||
|
||||
proc start*(b: Bitswap) {.async.} =
|
||||
## Start the bitswap task
|
||||
proc start*(b: BlockExc) {.async.} =
|
||||
## Start the blockexc task
|
||||
##
|
||||
|
||||
trace "bitswap start"
|
||||
trace "blockexc start"
|
||||
|
||||
if b.bitswapTasks.len > 0:
|
||||
warn "Starting bitswap twice"
|
||||
if b.blockexcTasks.len > 0:
|
||||
warn "Starting blockexc twice"
|
||||
return
|
||||
|
||||
b.bitswapRunning = true
|
||||
b.blockexcRunning = true
|
||||
for i in 0..<b.concurrentTasks:
|
||||
b.bitswapTasks.add(b.bitswapTaskRunner)
|
||||
b.blockexcTasks.add(b.blockexcTaskRunner)
|
||||
|
||||
proc stop*(b: Bitswap) {.async.} =
|
||||
## Stop the bitswap bitswap
|
||||
proc stop*(b: BlockExc) {.async.} =
|
||||
## Stop the blockexc blockexc
|
||||
##
|
||||
|
||||
trace "Bitswap stop"
|
||||
if b.bitswapTasks.len <= 0:
|
||||
warn "Stopping bitswap without starting it"
|
||||
trace "BlockExc stop"
|
||||
if b.blockexcTasks.len <= 0:
|
||||
warn "Stopping blockexc without starting it"
|
||||
return
|
||||
|
||||
b.bitswapRunning = false
|
||||
for t in b.bitswapTasks:
|
||||
b.blockexcRunning = false
|
||||
for t in b.blockexcTasks:
|
||||
if not t.finished:
|
||||
trace "Awaiting task to stop"
|
||||
t.cancel()
|
||||
trace "Task stopped"
|
||||
|
||||
trace "Bitswap stopped"
|
||||
trace "BlockExc stopped"
|
||||
|
||||
method getBlocks*(b: Bitswap, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
|
||||
method getBlocks*(b: BlockExc, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
|
||||
## Get a block from a remote peer
|
||||
##
|
||||
|
||||
|
@ -95,42 +95,40 @@ method getBlocks*(b: Bitswap, cid: seq[Cid]): Future[seq[bt.Block]] {.async.} =
|
|||
it.read
|
||||
)
|
||||
|
||||
method putBlocks*(b: Bitswap, blocks: seq[bt.Block]) =
|
||||
method putBlocks*(b: BlockExc, blocks: seq[bt.Block]) =
|
||||
b.engine.resolveBlocks(blocks)
|
||||
|
||||
procCall BlockStore(b).putBlocks(blocks)
|
||||
|
||||
proc new*(
|
||||
T: type Bitswap,
|
||||
func new*(
|
||||
T: type BlockExc,
|
||||
localStore: BlockStore,
|
||||
wallet: WalletRef,
|
||||
network: BitswapNetwork,
|
||||
network: BlockExcNetwork,
|
||||
concurrentTasks = DefaultConcurrentTasks,
|
||||
maxRetries = DefaultMaxRetries,
|
||||
peersPerRequest = DefaultMaxPeersPerRequest): T =
|
||||
|
||||
let engine = BitswapEngine.new(
|
||||
let engine = BlockExcEngine.new(
|
||||
localStore = localStore,
|
||||
wallet = wallet,
|
||||
peersPerRequest = peersPerRequest,
|
||||
request = network.request,
|
||||
)
|
||||
|
||||
let b = Bitswap(
|
||||
let b = BlockExc(
|
||||
engine: engine,
|
||||
taskQueue: newAsyncHeapQueue[BitswapPeerCtx](DefaultTaskQueueSize),
|
||||
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
|
||||
concurrentTasks: concurrentTasks,
|
||||
maxRetries: maxRetries,
|
||||
)
|
||||
|
||||
# attach engine's task handler
|
||||
b.taskHandler = proc(task: BitswapPeerCtx):
|
||||
Future[void] {.gcsafe.} =
|
||||
b.taskHandler = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} =
|
||||
engine.taskHandler(task)
|
||||
|
||||
# attach task scheduler to engine
|
||||
engine.scheduleTask = proc(task: BitswapPeerCtx):
|
||||
bool {.gcsafe} =
|
||||
engine.scheduleTask = proc(task: BlockExcPeerCtx): bool {.gcsafe} =
|
||||
b.taskQueue.pushOrUpdateNoWait(task).isOk()
|
||||
|
||||
proc peerEventHandler(peerInfo: PeerInfo, event: PeerEvent) {.async.} =
|
||||
|
@ -167,7 +165,7 @@ proc new*(
|
|||
proc paymentHandler(peer: PeerId, payment: SignedState) =
|
||||
engine.paymentHandler(peer, payment)
|
||||
|
||||
network.handlers = BitswapHandlers(
|
||||
network.handlers = BlockExcHandlers(
|
||||
onWantList: blockWantListHandler,
|
||||
onBlocks: blocksHandler,
|
||||
onPresence: blockPresenceHandler,
|
||||
|
|
|
@ -54,7 +54,7 @@ method delBlocks*(s: MemoryStore, cids: seq[Cid]) =
|
|||
|
||||
procCall BlockStore(s).delBlocks(cids)
|
||||
|
||||
proc new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
|
||||
func new*(T: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
|
||||
MemoryStore(
|
||||
blocks: @blocks
|
||||
)
|
||||
|
|
|
@ -15,11 +15,11 @@ import pkg/chronicles
|
|||
import pkg/libp2p
|
||||
import pkg/libp2p/errors
|
||||
|
||||
import ./protobuf/bitswap as pb
|
||||
import ./protobuf/blockexc
|
||||
import ./protobuf/presence
|
||||
import ../blocktype as bt
|
||||
import ../stores/blockstore
|
||||
import ../utils/asyncheapqueue
|
||||
import ../blockstore
|
||||
import ../../utils/asyncheapqueue
|
||||
|
||||
import ./network
|
||||
import ./pendingblocks
|
||||
|
@ -29,24 +29,24 @@ import ./engine/payments
|
|||
export peercontext
|
||||
|
||||
logScope:
|
||||
topics = "dagger bitswap engine"
|
||||
topics = "dagger blockexc engine"
|
||||
|
||||
const
|
||||
DefaultTimeout* = 5.seconds
|
||||
DefaultMaxPeersPerRequest* = 10
|
||||
|
||||
type
|
||||
TaskHandler* = proc(task: BitswapPeerCtx): Future[void] {.gcsafe.}
|
||||
TaskScheduler* = proc(task: BitswapPeerCtx): bool {.gcsafe.}
|
||||
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
|
||||
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
|
||||
|
||||
BitswapEngine* = ref object of RootObj
|
||||
BlockExcEngine* = ref object of RootObj
|
||||
localStore*: BlockStore # where we localStore blocks for this instance
|
||||
peers*: seq[BitswapPeerCtx] # peers we're currently actively exchanging with
|
||||
peers*: seq[BlockExcPeerCtx] # peers we're currently actively exchanging with
|
||||
wantList*: seq[Cid] # local wants list
|
||||
pendingBlocks*: PendingBlocksManager # blocks we're awaiting to be resolved
|
||||
peersPerRequest: int # max number of peers to request from
|
||||
scheduleTask*: TaskScheduler # schedule a new task with the task runner
|
||||
request*: BitswapRequest # bitswap network requests
|
||||
request*: BlockExcRequest # block exchange network requests
|
||||
wallet*: WalletRef # nitro wallet for micropayments
|
||||
pricing*: ?Pricing # optional bandwidth pricing
|
||||
|
||||
|
@ -60,7 +60,7 @@ proc contains*(a: AsyncHeapQueue[Entry], b: Cid): bool =
|
|||
|
||||
a.anyIt( it.cid == b )
|
||||
|
||||
proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx =
|
||||
proc getPeerCtx*(b: BlockExcEngine, peerId: PeerID): BlockExcPeerCtx =
|
||||
## Get the peer's context
|
||||
##
|
||||
|
||||
|
@ -69,7 +69,7 @@ proc getPeerCtx*(b: BitswapEngine, peerId: PeerID): BitswapPeerCtx =
|
|||
return peer[0]
|
||||
|
||||
proc requestBlocks*(
|
||||
b: BitswapEngine,
|
||||
b: BlockExcEngine,
|
||||
cids: seq[Cid],
|
||||
timeout = DefaultTimeout): seq[Future[bt.Block]] =
|
||||
## Request a block from remotes
|
||||
|
@ -91,12 +91,11 @@ proc requestBlocks*(
|
|||
blocks.add(
|
||||
b.pendingBlocks.addOrAwait(c).wait(timeout))
|
||||
|
||||
|
||||
var peers = b.peers
|
||||
|
||||
# get the first peer with at least one (any)
|
||||
# matching cid
|
||||
var blockPeer: BitswapPeerCtx
|
||||
var blockPeer: BlockExcPeerCtx
|
||||
for i, p in peers:
|
||||
let has = cids.anyIt(
|
||||
it in p.peerHave
|
||||
|
@ -125,7 +124,7 @@ proc requestBlocks*(
|
|||
if peers.len == 0:
|
||||
return blocks # no peers to send wants to
|
||||
|
||||
template sendWants(ctx: BitswapPeerCtx) =
|
||||
template sendWants(ctx: BlockExcPeerCtx) =
|
||||
# just send wants
|
||||
b.request.sendWantList(
|
||||
ctx.id,
|
||||
|
@ -142,7 +141,7 @@ proc requestBlocks*(
|
|||
return blocks
|
||||
|
||||
proc blockPresenceHandler*(
|
||||
b: BitswapEngine,
|
||||
b: BlockExcEngine,
|
||||
peer: PeerID,
|
||||
blocks: seq[BlockPresence]) =
|
||||
## Handle block presence
|
||||
|
@ -156,7 +155,7 @@ proc blockPresenceHandler*(
|
|||
if presence =? Presence.init(blk):
|
||||
peerCtx.updatePresence(presence)
|
||||
|
||||
proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) =
|
||||
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
|
||||
trace "Schedule a task for new blocks"
|
||||
|
||||
let cids = blocks.mapIt( it.cid )
|
||||
|
@ -170,7 +169,7 @@ proc scheduleTasks(b: BitswapEngine, blocks: seq[bt.Block]) =
|
|||
trace "Unable to schedule task for peer", peer = p.id
|
||||
break # do next peer
|
||||
|
||||
proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) =
|
||||
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
|
||||
## Resolve pending blocks from the pending blocks manager
|
||||
## and schedule any new task to be ran
|
||||
##
|
||||
|
@ -179,8 +178,8 @@ proc resolveBlocks*(b: BitswapEngine, blocks: seq[bt.Block]) =
|
|||
b.pendingBlocks.resolve(blocks)
|
||||
b.scheduleTasks(blocks)
|
||||
|
||||
proc payForBlocks(engine: BitswapEngine,
|
||||
peer: BitswapPeerCtx,
|
||||
proc payForBlocks(engine: BlockExcEngine,
|
||||
peer: BlockExcPeerCtx,
|
||||
blocks: seq[bt.Block]) =
|
||||
let sendPayment = engine.request.sendPayment
|
||||
if sendPayment.isNil:
|
||||
|
@ -191,7 +190,7 @@ proc payForBlocks(engine: BitswapEngine,
|
|||
sendPayment(peer.id, payment)
|
||||
|
||||
proc blocksHandler*(
|
||||
b: BitswapEngine,
|
||||
b: BlockExcEngine,
|
||||
peer: PeerID,
|
||||
blocks: seq[bt.Block]) =
|
||||
## handle incoming blocks
|
||||
|
@ -206,7 +205,7 @@ proc blocksHandler*(
|
|||
b.payForBlocks(peerCtx, blocks)
|
||||
|
||||
proc wantListHandler*(
|
||||
b: BitswapEngine,
|
||||
b: BlockExcEngine,
|
||||
peer: PeerID,
|
||||
wantList: WantList) =
|
||||
## Handle incoming want lists
|
||||
|
@ -250,14 +249,14 @@ proc wantListHandler*(
|
|||
if not b.scheduleTask(peerCtx):
|
||||
trace "Unable to schedule task for peer", peer
|
||||
|
||||
proc accountHandler*(engine: BitswapEngine, peer: PeerID, account: Account) =
|
||||
proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) =
|
||||
let context = engine.getPeerCtx(peer)
|
||||
if context.isNil:
|
||||
return
|
||||
|
||||
context.account = account.some
|
||||
|
||||
proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState) =
|
||||
proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) =
|
||||
without context =? engine.getPeerCtx(peer).option and
|
||||
account =? context.account:
|
||||
return
|
||||
|
@ -268,14 +267,14 @@ proc paymentHandler*(engine: BitswapEngine, peer: PeerId, payment: SignedState)
|
|||
else:
|
||||
context.paymentChannel = engine.wallet.acceptChannel(payment).option
|
||||
|
||||
proc setupPeer*(b: BitswapEngine, peer: PeerID) =
|
||||
proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
|
||||
## Perform initial setup, such as want
|
||||
## list exchange
|
||||
##
|
||||
|
||||
trace "Setting up new peer", peer
|
||||
if peer notin b.peers:
|
||||
b.peers.add(BitswapPeerCtx(
|
||||
b.peers.add(BlockExcPeerCtx(
|
||||
id: peer
|
||||
))
|
||||
|
||||
|
@ -286,7 +285,7 @@ proc setupPeer*(b: BitswapEngine, peer: PeerID) =
|
|||
if address =? b.pricing.?address:
|
||||
b.request.sendAccount(peer, Account(address: address))
|
||||
|
||||
proc dropPeer*(b: BitswapEngine, peer: PeerID) =
|
||||
proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
|
||||
## Cleanup disconnected peer
|
||||
##
|
||||
|
||||
|
@ -295,7 +294,7 @@ proc dropPeer*(b: BitswapEngine, peer: PeerID) =
|
|||
# drop the peer from the peers table
|
||||
b.peers.keepItIf( it.id != peer )
|
||||
|
||||
proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} =
|
||||
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
||||
trace "Handling task for peer", peer = task.id
|
||||
|
||||
var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max)
|
||||
|
@ -334,19 +333,19 @@ proc taskHandler*(b: BitswapEngine, task: BitswapPeerCtx) {.gcsafe, async.} =
|
|||
if wants.len > 0:
|
||||
b.request.sendPresence(task.id, wants)
|
||||
|
||||
proc new*(
|
||||
T: type BitswapEngine,
|
||||
func new*(
|
||||
T: type BlockExcEngine,
|
||||
localStore: BlockStore,
|
||||
wallet: WalletRef,
|
||||
request: BitswapRequest = BitswapRequest(),
|
||||
request: BlockExcRequest = BlockExcRequest(),
|
||||
scheduleTask: TaskScheduler = nil,
|
||||
peersPerRequest = DefaultMaxPeersPerRequest): T =
|
||||
|
||||
proc taskScheduler(task: BitswapPeerCtx): bool =
|
||||
proc taskScheduler(task: BlockExcPeerCtx): bool =
|
||||
if not isNil(scheduleTask):
|
||||
return scheduleTask(task)
|
||||
|
||||
let b = BitswapEngine(
|
||||
let b = BlockExcEngine(
|
||||
localStore: localStore,
|
||||
pendingBlocks: PendingBlocksManager.new(),
|
||||
peersPerRequest: peersPerRequest,
|
|
@ -18,7 +18,7 @@ func openLedgerChannel*(wallet: WalletRef,
|
|||
asset: EthAddress): ?!ChannelId =
|
||||
wallet.openLedgerChannel(hub, ChainId, asset, AmountPerChannel)
|
||||
|
||||
func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId =
|
||||
func getOrOpenChannel(wallet: WalletRef, peer: BlockExcPeerCtx): ?!ChannelId =
|
||||
if channel =? peer.paymentChannel:
|
||||
success channel
|
||||
elif account =? peer.account:
|
||||
|
@ -29,7 +29,7 @@ func getOrOpenChannel(wallet: WalletRef, peer: BitswapPeerCtx): ?!ChannelId =
|
|||
failure "no account set for peer"
|
||||
|
||||
func pay*(wallet: WalletRef,
|
||||
peer: BitswapPeerCtx,
|
||||
peer: BlockExcPeerCtx,
|
||||
amount: UInt256): ?!SignedState =
|
||||
if account =? peer.account:
|
||||
let asset = Asset
|
|
@ -17,7 +17,7 @@ import pkg/questionable
|
|||
import pkg/questionable/results
|
||||
|
||||
import ../blocktype as bt
|
||||
import ./protobuf/bitswap as pb
|
||||
import ./protobuf/blockexc
|
||||
import ./protobuf/payments
|
||||
import ./networkpeer
|
||||
|
||||
|
@ -25,9 +25,9 @@ export pb, networkpeer
|
|||
export payments
|
||||
|
||||
logScope:
|
||||
topics = "dagger bitswap network"
|
||||
topics = "dagger blockexc network"
|
||||
|
||||
const Codec* = "/ipfs/bitswap/1.2.0"
|
||||
const Codec* = "/dagger/blockexc/1.0.0"
|
||||
|
||||
type
|
||||
WantListHandler* = proc(peer: PeerID, wantList: WantList) {.gcsafe.}
|
||||
|
@ -36,7 +36,7 @@ type
|
|||
AccountHandler* = proc(peer: PeerID, account: Account) {.gcsafe.}
|
||||
PaymentHandler* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
|
||||
|
||||
BitswapHandlers* = object
|
||||
BlockExcHandlers* = object
|
||||
onWantList*: WantListHandler
|
||||
onBlocks*: BlocksHandler
|
||||
onPresence*: BlockPresenceHandler
|
||||
|
@ -57,22 +57,22 @@ type
|
|||
AccountBroadcaster* = proc(peer: PeerID, account: Account) {.gcsafe.}
|
||||
PaymentBroadcaster* = proc(peer: PeerID, payment: SignedState) {.gcsafe.}
|
||||
|
||||
BitswapRequest* = object
|
||||
BlockExcRequest* = object
|
||||
sendWantList*: WantListBroadcaster
|
||||
sendBlocks*: BlocksBroadcaster
|
||||
sendPresence*: PresenceBroadcaster
|
||||
sendAccount*: AccountBroadcaster
|
||||
sendPayment*: PaymentBroadcaster
|
||||
|
||||
BitswapNetwork* = ref object of LPProtocol
|
||||
BlockExcNetwork* = ref object of LPProtocol
|
||||
peers*: Table[PeerID, NetworkPeer]
|
||||
switch*: Switch
|
||||
handlers*: BitswapHandlers
|
||||
request*: BitswapRequest
|
||||
handlers*: BlockExcHandlers
|
||||
request*: BlockExcRequest
|
||||
getConn: ConnProvider
|
||||
|
||||
proc handleWantList(
|
||||
b: BitswapNetwork,
|
||||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
list: WantList) =
|
||||
## Handle incoming want list
|
||||
|
@ -104,7 +104,7 @@ proc makeWantList*(
|
|||
WantList(entries: entries, full: full)
|
||||
|
||||
proc broadcastWantList*(
|
||||
b: BitswapNetwork,
|
||||
b: BlockExcNetwork,
|
||||
id: PeerID,
|
||||
cids: seq[Cid],
|
||||
priority: int32 = 0,
|
||||
|
@ -130,7 +130,7 @@ proc broadcastWantList*(
|
|||
asyncSpawn b.peers[id].send(Message(wantlist: wantList))
|
||||
|
||||
proc handleBlocks(
|
||||
b: BitswapNetwork,
|
||||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
blocks: seq[auto]) =
|
||||
## Handle incoming blocks
|
||||
|
@ -159,7 +159,6 @@ template makeBlocks*(
|
|||
seq[pb.Block] =
|
||||
var blks: seq[pb.Block]
|
||||
for blk in blocks:
|
||||
# for now only send bitswap `1.1.0`
|
||||
blks.add(pb.Block(
|
||||
prefix: blk.cid.data.buffer,
|
||||
data: blk.data
|
||||
|
@ -168,7 +167,7 @@ template makeBlocks*(
|
|||
blks
|
||||
|
||||
proc broadcastBlocks*(
|
||||
b: BitswapNetwork,
|
||||
b: BlockExcNetwork,
|
||||
id: PeerID,
|
||||
blocks: seq[bt.Block]) =
|
||||
## Send blocks to remote
|
||||
|
@ -181,7 +180,7 @@ proc broadcastBlocks*(
|
|||
asyncSpawn b.peers[id].send(pb.Message(payload: makeBlocks(blocks)))
|
||||
|
||||
proc handleBlockPresence(
|
||||
b: BitswapNetwork,
|
||||
b: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
presence: seq[BlockPresence]) =
|
||||
## Handle block presence
|
||||
|
@ -194,7 +193,7 @@ proc handleBlockPresence(
|
|||
b.handlers.onPresence(peer.id, presence)
|
||||
|
||||
proc broadcastBlockPresence*(
|
||||
b: BitswapNetwork,
|
||||
b: BlockExcNetwork,
|
||||
id: PeerID,
|
||||
presence: seq[BlockPresence]) =
|
||||
## Send presence to remote
|
||||
|
@ -206,14 +205,14 @@ proc broadcastBlockPresence*(
|
|||
trace "Sending presence to peer", peer = id
|
||||
asyncSpawn b.peers[id].send(Message(blockPresences: presence))
|
||||
|
||||
proc handleAccount(network: BitswapNetwork,
|
||||
proc handleAccount(network: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
account: Account) =
|
||||
if network.handlers.onAccount.isNil:
|
||||
return
|
||||
network.handlers.onAccount(peer.id, account)
|
||||
|
||||
proc broadcastAccount*(network: BitswapNetwork,
|
||||
proc broadcastAccount*(network: BlockExcNetwork,
|
||||
id: PeerId,
|
||||
account: Account) =
|
||||
if id notin network.peers:
|
||||
|
@ -222,7 +221,7 @@ proc broadcastAccount*(network: BitswapNetwork,
|
|||
let message = Message(account: AccountMessage.init(account))
|
||||
asyncSpawn network.peers[id].send(message)
|
||||
|
||||
proc broadcastPayment*(network: BitswapNetwork,
|
||||
proc broadcastPayment*(network: BlockExcNetwork,
|
||||
id: PeerId,
|
||||
payment: SignedState) =
|
||||
if id notin network.peers:
|
||||
|
@ -231,14 +230,14 @@ proc broadcastPayment*(network: BitswapNetwork,
|
|||
let message = Message(payment: StateChannelUpdate.init(payment))
|
||||
asyncSpawn network.peers[id].send(message)
|
||||
|
||||
proc handlePayment(network: BitswapNetwork,
|
||||
proc handlePayment(network: BlockExcNetwork,
|
||||
peer: NetworkPeer,
|
||||
payment: SignedState) =
|
||||
if network.handlers.onPayment.isNil:
|
||||
return
|
||||
network.handlers.onPayment(peer.id, payment)
|
||||
|
||||
proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
|
||||
proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.async.} =
|
||||
try:
|
||||
if msg.wantlist.entries.len > 0:
|
||||
b.handleWantList(peer, msg.wantlist)
|
||||
|
@ -259,10 +258,10 @@ proc rpcHandler(b: BitswapNetwork, peer: NetworkPeer, msg: Message) {.async.} =
|
|||
b.handlePayment(peer, payment)
|
||||
|
||||
except CatchableError as exc:
|
||||
trace "Exception in bitswap rpc handler", exc = exc.msg
|
||||
trace "Exception in blockexc rpc handler", exc = exc.msg
|
||||
|
||||
proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer =
|
||||
## Creates or retrieves a BitswapNetwork Peer
|
||||
proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer =
|
||||
## Creates or retrieves a BlockExcNetwork Peer
|
||||
##
|
||||
|
||||
if peer in b.peers:
|
||||
|
@ -272,7 +271,7 @@ proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer =
|
|||
try:
|
||||
return await b.switch.dial(peer, Codec)
|
||||
except CatchableError as exc:
|
||||
trace "unable to connect to bitswap peer", exc = exc.msg
|
||||
trace "unable to connect to blockexc peer", exc = exc.msg
|
||||
|
||||
if not isNil(b.getConn):
|
||||
getConn = b.getConn
|
||||
|
@ -281,27 +280,27 @@ proc getOrCreatePeer(b: BitswapNetwork, peer: PeerID): NetworkPeer =
|
|||
b.rpcHandler(p, msg)
|
||||
|
||||
# create new pubsub peer
|
||||
let bitSwapPeer = NetworkPeer.new(peer, getConn, rpcHandler)
|
||||
debug "created new bitswap peer", peer
|
||||
let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler)
|
||||
debug "created new blockexc peer", peer
|
||||
|
||||
b.peers[peer] = bitSwapPeer
|
||||
b.peers[peer] = blockExcPeer
|
||||
|
||||
return bitSwapPeer
|
||||
return blockExcPeer
|
||||
|
||||
proc setupPeer*(b: BitswapNetwork, peer: PeerID) =
|
||||
proc setupPeer*(b: BlockExcNetwork, peer: PeerID) =
|
||||
## Perform initial setup, such as want
|
||||
## list exchange
|
||||
##
|
||||
|
||||
discard b.getOrCreatePeer(peer)
|
||||
|
||||
proc dropPeer*(b: BitswapNetwork, peer: PeerID) =
|
||||
proc dropPeer*(b: BlockExcNetwork, peer: PeerID) =
|
||||
## Cleanup disconnected peer
|
||||
##
|
||||
|
||||
b.peers.del(peer)
|
||||
|
||||
method init*(b: BitswapNetwork) =
|
||||
method init*(b: BlockExcNetwork) =
|
||||
## Perform protocol initialization
|
||||
##
|
||||
|
||||
|
@ -320,20 +319,20 @@ method init*(b: BitswapNetwork) =
|
|||
|
||||
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} =
|
||||
let peerId = conn.peerInfo.peerId
|
||||
let bitswapPeer = b.getOrCreatePeer(peerId)
|
||||
await bitswapPeer.readLoop(conn) # attach read loop
|
||||
let blockexcPeer = b.getOrCreatePeer(peerId)
|
||||
await blockexcPeer.readLoop(conn) # attach read loop
|
||||
|
||||
b.handler = handle
|
||||
b.codec = Codec
|
||||
|
||||
proc new*(
|
||||
T: type BitswapNetwork,
|
||||
func new*(
|
||||
T: type BlockExcNetwork,
|
||||
switch: Switch,
|
||||
connProvider: ConnProvider = nil): T =
|
||||
## Create a new BitswapNetwork instance
|
||||
## Create a new BlockExcNetwork instance
|
||||
##
|
||||
|
||||
let b = BitswapNetwork(
|
||||
let b = BlockExcNetwork(
|
||||
switch: switch,
|
||||
getConn: connProvider)
|
||||
|
||||
|
@ -361,7 +360,7 @@ proc new*(
|
|||
proc sendPayment(id: PeerID, payment: SignedState) =
|
||||
b.broadcastPayment(id, payment)
|
||||
|
||||
b.request = BitswapRequest(
|
||||
b.request = BlockExcRequest(
|
||||
sendWantList: sendWantList,
|
||||
sendBlocks: sendBlocks,
|
||||
sendPresence: sendPresence,
|
|
@ -12,10 +12,10 @@ import pkg/chronicles
|
|||
import pkg/protobuf_serialization
|
||||
import pkg/libp2p
|
||||
|
||||
import ./protobuf/bitswap
|
||||
import ./protobuf/blockexc
|
||||
|
||||
logScope:
|
||||
topics = "dagger bitswap networkpeer"
|
||||
topics = "dagger blockexc networkpeer"
|
||||
|
||||
const MaxMessageSize = 8 * 1024 * 1024
|
||||
|
||||
|
@ -43,7 +43,7 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
|
|||
trace "Got message for peer", peer = b.id, msg
|
||||
await b.handler(b, msg)
|
||||
except CatchableError as exc:
|
||||
trace "Exception in bitswap read loop", exc = exc.msg
|
||||
trace "Exception in blockexc read loop", exc = exc.msg
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
|
@ -65,7 +65,7 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
|
|||
trace "Sending message to remote", peer = b.id, msg = $msg
|
||||
await conn.writeLp(Protobuf.encode(msg))
|
||||
|
||||
proc new*(
|
||||
func new*(
|
||||
T: type NetworkPeer,
|
||||
peer: PeerId,
|
||||
connProvider: ConnProvider,
|
|
@ -4,7 +4,7 @@ import pkg/libp2p
|
|||
import pkg/chronos
|
||||
import pkg/nitro
|
||||
import pkg/questionable
|
||||
import ./protobuf/bitswap
|
||||
import ./protobuf/blockexc
|
||||
import ./protobuf/payments
|
||||
import ./protobuf/presence
|
||||
|
||||
|
@ -12,7 +12,7 @@ export payments
|
|||
export nitro
|
||||
|
||||
type
|
||||
BitswapPeerCtx* = ref object of RootObj
|
||||
BlockExcPeerCtx* = ref object of RootObj
|
||||
id*: PeerID
|
||||
peerPrices*: Table[Cid, UInt256] # remote peer have list including price
|
||||
peerWants*: seq[Entry] # remote peers want lists
|
||||
|
@ -21,16 +21,16 @@ type
|
|||
account*: ?Account # ethereum account of this peer
|
||||
paymentChannel*: ?ChannelId # payment channel id
|
||||
|
||||
proc peerHave*(context: BitswapPeerCtx): seq[Cid] =
|
||||
proc peerHave*(context: BlockExcPeerCtx): seq[Cid] =
|
||||
toSeq(context.peerPrices.keys)
|
||||
|
||||
proc contains*(a: openArray[BitswapPeerCtx], b: PeerID): bool =
|
||||
proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool =
|
||||
## Convenience method to check for peer prepense
|
||||
##
|
||||
|
||||
a.anyIt( it.id == b )
|
||||
|
||||
func updatePresence*(context: BitswapPeerCtx, presence: Presence) =
|
||||
func updatePresence*(context: BlockExcPeerCtx, presence: Presence) =
|
||||
let cid = presence.cid
|
||||
let price = presence.price
|
||||
|
||||
|
@ -39,7 +39,7 @@ func updatePresence*(context: BitswapPeerCtx, presence: Presence) =
|
|||
elif cid in context.peerHave and not presence.have:
|
||||
context.peerPrices.del(cid)
|
||||
|
||||
func price*(context: BitswapPeerCtx, cids: seq[Cid]): UInt256 =
|
||||
func price*(context: BlockExcPeerCtx, cids: seq[Cid]): UInt256 =
|
||||
for cid in cids:
|
||||
if price =? context.peerPrices.?[cid]:
|
||||
result += price
|
|
@ -16,7 +16,7 @@ import pkg/libp2p
|
|||
import ../blocktype
|
||||
|
||||
logScope:
|
||||
topics = "dagger bitswap pendingblocks"
|
||||
topics = "dagger blockexc pendingblocks"
|
||||
|
||||
type
|
||||
PendingBlocksManager* = ref object of RootObj
|
||||
|
@ -67,7 +67,7 @@ proc contains*(
|
|||
p: PendingBlocksManager,
|
||||
cid: Cid): bool = p.pending(cid)
|
||||
|
||||
proc new*(T: type PendingBlocksManager): T =
|
||||
func new*(T: type PendingBlocksManager): T =
|
||||
T(
|
||||
blocks: initTable[Cid, Future[Block]]()
|
||||
)
|
|
@ -1,6 +1,6 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package bitswap.message.pb;
|
||||
package blockexc.message.pb;
|
||||
|
||||
message Message {
|
||||
|
||||
|
@ -11,7 +11,7 @@ message Message {
|
|||
}
|
||||
|
||||
message Entry {
|
||||
bytes block = 1; // the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0)
|
||||
bytes block = 1; // the block cid
|
||||
int32 priority = 2; // the priority (normalized). default to 1
|
||||
bool cancel = 3; // whether this revokes an entry
|
||||
WantType wantType = 4; // Note: defaults to enum 0, ie Block
|
||||
|
@ -47,10 +47,9 @@ message Message {
|
|||
}
|
||||
|
||||
Wantlist wantlist = 1;
|
||||
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
|
||||
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
|
||||
repeated BlockPresence blockPresences = 4;
|
||||
int32 pendingBytes = 5;
|
||||
AccountMessage account = 6;
|
||||
StateChannelUpdate payment = 7;
|
||||
repeated Block payload = 2;
|
||||
repeated BlockPresence blockPresences = 3;
|
||||
int32 pendingBytes = 4;
|
||||
AccountMessage account = 5;
|
||||
StateChannelUpdate payment = 6;
|
||||
}
|
|
@ -4,7 +4,7 @@ import pkg/stint
|
|||
import pkg/nitro
|
||||
import pkg/questionable
|
||||
import pkg/upraises
|
||||
import ./bitswap
|
||||
import ./blockexc
|
||||
|
||||
export AccountMessage
|
||||
export StateChannelUpdate
|
|
@ -3,7 +3,7 @@ import pkg/stint
|
|||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
import ./bitswap
|
||||
import ./blockexc
|
||||
|
||||
export questionable
|
||||
export stint
|
||||
|
@ -12,7 +12,7 @@ export BlockPresenceType
|
|||
upraises.push: {.upraises: [].}
|
||||
|
||||
type
|
||||
PresenceMessage* = bitswap.BlockPresence
|
||||
PresenceMessage* = blockexc.BlockPresence
|
||||
Presence* = object
|
||||
cid*: Cid
|
||||
have*: bool
|
Loading…
Reference in New Issue