mirror of
https://github.com/logos-storage/codex-factory.git
synced 2026-01-11 01:13:08 +00:00
feat: specify workers count (#111)
This commit is contained in:
parent
00a6f38585
commit
81ed316b38
@ -19,6 +19,7 @@ export const ENV_ENV_PREFIX_KEY = 'FACTORY_ENV_PREFIX'
|
||||
const ENV_IMAGE_PREFIX_KEY = 'FACTORY_IMAGE_PREFIX'
|
||||
const ENV_REPO_KEY = 'FACTORY_DOCKER_REPO'
|
||||
const ENV_DETACH_KEY = 'FACTORY_DETACH'
|
||||
const ENV_WORKERS_KEY = 'FACTORY_WORKERS'
|
||||
const ENV_FRESH_KEY = 'FACTORY_FRESH'
|
||||
|
||||
export class Start extends RootCommand implements LeafCommand {
|
||||
@ -46,6 +47,16 @@ export class Start extends RootCommand implements LeafCommand {
|
||||
})
|
||||
public detach!: boolean
|
||||
|
||||
@Option({
|
||||
key: 'workers',
|
||||
alias: 'w',
|
||||
type: 'number',
|
||||
description: `Number of workers to spin. Value between 0 and ${WORKER_COUNT} including.`,
|
||||
envKey: ENV_WORKERS_KEY,
|
||||
default: WORKER_COUNT,
|
||||
})
|
||||
public workers!: number
|
||||
|
||||
@Option({
|
||||
key: 'repo',
|
||||
type: 'string',
|
||||
@ -79,6 +90,10 @@ export class Start extends RootCommand implements LeafCommand {
|
||||
public async run(): Promise<void> {
|
||||
await super.init()
|
||||
|
||||
if (this.workers < 0 || this.workers > WORKER_COUNT) {
|
||||
throw new Error(`Worker count has to be between 0 and ${WORKER_COUNT} including.`)
|
||||
}
|
||||
|
||||
if (!this.beeVersion) {
|
||||
this.beeVersion = await findBeeVersion()
|
||||
this.console.log('Bee version not specified. Found it configured externally.')
|
||||
@ -169,25 +184,27 @@ export class Start extends RootCommand implements LeafCommand {
|
||||
throw e
|
||||
}
|
||||
|
||||
const workerSpinner = ora({
|
||||
text: 'Starting worker Bee nodes...',
|
||||
spinner: 'point',
|
||||
color: 'yellow',
|
||||
isSilent: this.verbosity === VerbosityLevel.Quiet,
|
||||
}).start()
|
||||
if (this.workers > 0) {
|
||||
const workerSpinner = ora({
|
||||
text: 'Starting worker Bee nodes...',
|
||||
spinner: 'point',
|
||||
color: 'yellow',
|
||||
isSilent: this.verbosity === VerbosityLevel.Quiet,
|
||||
}).start()
|
||||
|
||||
try {
|
||||
for (let i = 1; i <= WORKER_COUNT; i++) {
|
||||
await docker.startWorkerNode(this.beeVersion, i, queenAddress, dockerOptions)
|
||||
try {
|
||||
for (let i = 1; i <= this.workers; i++) {
|
||||
await docker.startWorkerNode(this.beeVersion, i, queenAddress, dockerOptions)
|
||||
}
|
||||
|
||||
workerSpinner.text = 'Waiting until all workers connect to queen...'
|
||||
await waitForWorkers(this.workers, docker.getAllStatus.bind(docker))
|
||||
workerSpinner.succeed('Worker nodes are up and listening')
|
||||
} catch (e) {
|
||||
workerSpinner.fail(`It was not possible to start worker nodes!`)
|
||||
await this.stopDocker(docker)
|
||||
throw e
|
||||
}
|
||||
|
||||
workerSpinner.text = 'Waiting until all workers connect to queen...'
|
||||
await waitForWorkers(async () => Object.values(await docker.getAllStatus()).every(node => node === 'running'))
|
||||
workerSpinner.succeed('Worker nodes are up and listening')
|
||||
} catch (e) {
|
||||
workerSpinner.fail(`It was not possible to start worker nodes!`)
|
||||
await this.stopDocker(docker)
|
||||
throw e
|
||||
}
|
||||
|
||||
if (!this.detach) {
|
||||
|
||||
@ -2,7 +2,7 @@ import fetch, { FetchError } from 'node-fetch'
|
||||
import { sleep } from './index'
|
||||
import { TimeoutError } from './error'
|
||||
import { BeeDebug } from '@ethersphere/bee-js'
|
||||
import { WORKER_COUNT } from './docker'
|
||||
import { AllStatus } from './docker'
|
||||
|
||||
const AWAIT_SLEEP = 3_000
|
||||
|
||||
@ -80,20 +80,24 @@ export async function waitForQueen(verifyQueenIsUp: () => Promise<boolean>, wait
|
||||
}
|
||||
|
||||
export async function waitForWorkers(
|
||||
verifyWorkersAreUp: () => Promise<boolean>,
|
||||
workerCount: number,
|
||||
getStatus: () => Promise<AllStatus>,
|
||||
waitingIterations = 120,
|
||||
): Promise<void> {
|
||||
const beeDebug = new BeeDebug('http://127.0.0.1:1635')
|
||||
|
||||
const status = await getStatus()
|
||||
for (let i = 1; i <= workerCount; i++) {
|
||||
if (status[`worker${i}` as keyof AllStatus] !== 'running') {
|
||||
throw new Error('Some of the workers node is not running!')
|
||||
}
|
||||
}
|
||||
|
||||
for (let i = 0; i < waitingIterations; i++) {
|
||||
try {
|
||||
if (!(await verifyWorkersAreUp())) {
|
||||
throw new Error('Some of the workers node is not running!')
|
||||
}
|
||||
|
||||
const peers = await beeDebug.getPeers()
|
||||
|
||||
if (peers.length >= WORKER_COUNT) {
|
||||
if (peers.length >= workerCount) {
|
||||
return
|
||||
}
|
||||
} catch (e) {
|
||||
|
||||
@ -64,6 +64,29 @@ describe('start command', () => {
|
||||
}),
|
||||
)
|
||||
|
||||
describe('should start cluster with just few workers', () => {
|
||||
beforeAll(async () => {
|
||||
await run(['stop', '--rm']) // Cleanup the testing containers
|
||||
})
|
||||
|
||||
it(
|
||||
'',
|
||||
wrapper(async () => {
|
||||
// As spinning the cluster with --detach the command will exit once the cluster is up and running
|
||||
await run(['start', '--workers', '2'])
|
||||
|
||||
await expect(findContainer(docker, 'queen')).resolves.toBeDefined()
|
||||
await expect(findContainer(docker, 'blockchain')).resolves.toBeDefined()
|
||||
await expect(findContainer(docker, 'worker-1')).resolves.toBeDefined()
|
||||
await expect(findContainer(docker, 'worker-2')).resolves.toBeDefined()
|
||||
await expect(findContainer(docker, 'worker-3')).rejects.toHaveProperty('statusCode', 404)
|
||||
await expect(findContainer(docker, 'worker-4')).rejects.toHaveProperty('statusCode', 404)
|
||||
|
||||
await expect(beeDebug.getHealth()).resolves.toHaveProperty('status')
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
describe('should create docker network', () => {
|
||||
beforeAll(async () => {
|
||||
await run(['stop', '--rm']) // Cleanup the testing containers
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user