Implements KubernetesWorkflow assembly.

This commit is contained in:
benbierens 2023-04-12 15:11:36 +02:00
parent 7b91c83f5b
commit 2bcf512737
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
12 changed files with 516 additions and 68 deletions

View File

@ -341,44 +341,6 @@ namespace CodexDistTestCore
#endregion
#region Namespace management
private void EnsureTestNamespace()
{
if (IsTestNamespaceOnline()) return;
var namespaceSpec = new V1Namespace
{
ApiVersion = "v1",
Metadata = new V1ObjectMeta
{
Name = K8sNamespace,
Labels = new Dictionary<string, string> { { "name", K8sNamespace } }
}
};
client.CreateNamespace(namespaceSpec);
}
private void DeleteNamespace()
{
if (IsTestNamespaceOnline())
{
client.DeleteNamespace(K8sNamespace, null, null, gracePeriodSeconds: 0);
}
}
private string K8sNamespace
{
get { return K8sCluster.K8sNamespace; }
}
#endregion
private bool IsTestNamespaceOnline()
{
return client.ListNamespace().Items.Any(n => n.Metadata.Name == K8sNamespace);
}
private class CommandRunner
{
private readonly Kubernetes client;

View File

@ -2,16 +2,17 @@
{
public class ContainerRecipe
{
public ContainerRecipe(string name, string image, Port[] exposedPorts, Port[] internalPorts, EnvVar[] envVars)
public ContainerRecipe(int number, string image, Port[] exposedPorts, Port[] internalPorts, EnvVar[] envVars)
{
Name = name;
Number = number;
Image = image;
ExposedPorts = exposedPorts;
InternalPorts = internalPorts;
EnvVars = envVars;
}
public string Name { get; }
public string Name { get { return $"ctnr{Number}"; } }
public int Number { get; }
public string Image { get; }
public Port[] ExposedPorts { get; }
public Port[] InternalPorts { get; }

View File

@ -14,9 +14,7 @@
Initialize(config);
var name = $"ctnr{containerNumber}";
return new ContainerRecipe(name, Image, exposedPorts.ToArray(), internalPorts.ToArray(), envVars.ToArray());
return new ContainerRecipe(containerNumber, Image, exposedPorts.ToArray(), internalPorts.ToArray(), envVars.ToArray());
}
protected abstract string Image { get; }

View File

@ -36,5 +36,16 @@ namespace KubernetesWorkflow
if (location == Location.Unspecified) return string.Empty;
return K8sNodeLocationMap[location];
}
// make configurable from test env!
public TimeSpan K8sOperationTimeout()
{
return TimeSpan.FromMinutes(5);
}
public TimeSpan WaitForK8sServiceDelay()
{
return TimeSpan.FromSeconds(5);
}
}
}

View File

@ -1,25 +1,410 @@
namespace KubernetesWorkflow
using k8s;
using k8s.Models;
namespace KubernetesWorkflow
{
public class K8sController
{
private readonly K8sCluster cluster;
private readonly KnownK8sPods knownPods;
private readonly WorkflowNumberSource workflowNumberSource;
private readonly Kubernetes client;
public K8sController(K8sCluster cluster)
public K8sController(K8sCluster cluster, KnownK8sPods knownPods, WorkflowNumberSource workflowNumberSource)
{
this.cluster = cluster;
this.knownPods = knownPods;
this.workflowNumberSource = workflowNumberSource;
client = new Kubernetes(cluster.GetK8sClientConfig());
}
public RunningPod BringOnline(ContainerRecipe[] containerRecipes)
public void Dispose()
{
// Ensure namespace
// create deployment
// create service if necessary
// wait until deployment online
// fetch pod info
client.Dispose();
}
public RunningPod BringOnline(ContainerRecipe[] containerRecipes, Location location)
{
EnsureTestNamespace();
// for each container, there is now an array of service ports available.
CreateDeployment(containerRecipes, location);
var servicePortsMap = CreateService(containerRecipes);
var (podName, podIp) = FetchNewPod();
return null!;
return new RunningPod(cluster, podName, podIp, servicePortsMap);
}
public void DeleteAllResources()
{
DeleteNamespace();
WaitUntilNamespaceDeleted();
}
#region Namespace management
private void EnsureTestNamespace()
{
if (IsTestNamespaceOnline()) return;
var namespaceSpec = new V1Namespace
{
ApiVersion = "v1",
Metadata = new V1ObjectMeta
{
Name = K8sNamespace,
Labels = new Dictionary<string, string> { { "name", K8sNamespace } }
}
};
client.CreateNamespace(namespaceSpec);
WaitUntilNamespaceCreated();
}
private void DeleteNamespace()
{
if (IsTestNamespaceOnline())
{
client.DeleteNamespace(K8sNamespace, null, null, gracePeriodSeconds: 0);
}
}
private string K8sNamespace
{
get { return K8sCluster.K8sNamespace; }
}
private bool IsTestNamespaceOnline()
{
return client.ListNamespace().Items.Any(n => n.Metadata.Name == K8sNamespace);
}
#endregion
#region Deployment management
private void CreateDeployment(ContainerRecipe[] containerRecipes, Location location)
{
var deploymentSpec = new V1Deployment
{
ApiVersion = "apps/v1",
Metadata = CreateDeploymentMetadata(),
Spec = new V1DeploymentSpec
{
Replicas = 1,
Selector = new V1LabelSelector
{
MatchLabels = GetSelector()
},
Template = new V1PodTemplateSpec
{
Metadata = new V1ObjectMeta
{
Labels = GetSelector()
},
Spec = new V1PodSpec
{
NodeSelector = CreateNodeSelector(location),
Containers = CreateDeploymentContainers(containerRecipes)
}
}
}
};
client.CreateNamespacedDeployment(deploymentSpec, K8sNamespace);
WaitUntilDeploymentCreated(deploymentSpec);
}
private IDictionary<string, string> CreateNodeSelector(Location location)
{
if (location == Location.Unspecified) return new Dictionary<string, string>();
return new Dictionary<string, string>
{
{ "codex-test-location", cluster.GetNodeLabelForLocation(location) }
};
}
private IDictionary<string, string> GetSelector()
{
return new Dictionary<string, string> { { "codex-test-node", "dist-test-" + workflowNumberSource.WorkflowNumber } };
}
private V1ObjectMeta CreateDeploymentMetadata()
{
return new V1ObjectMeta
{
Name = "deploy-" + workflowNumberSource.WorkflowNumber,
NamespaceProperty = K8sCluster.K8sNamespace
};
}
private List<V1Container> CreateDeploymentContainers(ContainerRecipe[] containerRecipes)
{
return containerRecipes.Select(r => CreateDeploymentContainer(r)).ToList();
}
private V1Container CreateDeploymentContainer(ContainerRecipe recipe)
{
return new V1Container
{
Name = recipe.Name,
Image = recipe.Image,
Ports = CreateContainerPorts(recipe),
Env = CreateEnv(recipe)
};
}
private List<V1EnvVar> CreateEnv(ContainerRecipe recipe)
{
return recipe.EnvVars.Select(CreateEnvVar).ToList();
}
private V1EnvVar CreateEnvVar(EnvVar envVar)
{
return new V1EnvVar
{
Name = envVar.Name,
Value = envVar.Value,
};
}
private List<V1ContainerPort> CreateContainerPorts(ContainerRecipe recipe)
{
var exposedPorts = recipe.ExposedPorts.Select(p => CreateContainerPort(recipe, p));
var internalPorts = recipe.InternalPorts.Select(p => CreateContainerPort(recipe, p));
return exposedPorts.Concat(internalPorts).ToList();
}
private V1ContainerPort CreateContainerPort(ContainerRecipe recipe, Port port)
{
return new V1ContainerPort
{
Name = GetNameForPort(recipe, port),
ContainerPort = port.Number
};
}
private string GetNameForPort(ContainerRecipe recipe, Port port)
{
return $"P{workflowNumberSource.WorkflowNumber}-{recipe.Number}-{port.Number}";
}
//private void DeleteDeployment(CodexNodeGroup group)
//{
// if (group.Deployment == null) return;
// client.DeleteNamespacedDeployment(group.Deployment.Name(), K8sNamespace);
// group.Deployment = null;
//}
//private void CreatePrometheusDeployment(K8sPrometheusSpecs spec)
//{
// client.CreateNamespacedDeployment(spec.CreatePrometheusDeployment(), K8sNamespace);
//}
//private void CreateGethBootstrapDeployment(K8sGethBoostrapSpecs spec)
//{
// client.CreateNamespacedDeployment(spec.CreateGethBootstrapDeployment(), K8sNamespace);
//}
//private void CreateGethCompanionDeployment(GethBootstrapInfo info, GethCompanionGroup group)
//{
// client.CreateNamespacedDeployment(info.Spec.CreateGethCompanionDeployment(group, info), K8sNamespace);
//}
#endregion
#region Service management
private Dictionary<ContainerRecipe, Port[]> CreateService(ContainerRecipe[] containerRecipes)
{
var result = new Dictionary<ContainerRecipe, Port[]>();
var ports = CreateServicePorts(result, containerRecipes);
if (!ports.Any())
{
// None of these container-recipes wish to expose anything via a serice port.
// So, we don't have to create a service.
return result;
}
var serviceSpec = new V1Service
{
ApiVersion = "v1",
Metadata = CreateServiceMetadata(),
Spec = new V1ServiceSpec
{
Type = "NodePort",
Selector = GetSelector(),
Ports = ports
}
};
client.CreateNamespacedService(serviceSpec, K8sNamespace);
return result;
}
private V1ObjectMeta CreateServiceMetadata()
{
return new V1ObjectMeta
{
Name = "deploy-" + workflowNumberSource.WorkflowNumber,
NamespaceProperty = K8sCluster.K8sNamespace
};
}
private List<V1ServicePort> CreateServicePorts(Dictionary<ContainerRecipe, Port[]> servicePorts, ContainerRecipe[] recipes)
{
var result = new List<V1ServicePort>();
foreach (var recipe in recipes)
{
result.AddRange(CreateServicePorts(servicePorts, recipe));
}
return result;
}
private List<V1ServicePort> CreateServicePorts(Dictionary<ContainerRecipe, Port[]> servicePorts, ContainerRecipe recipe)
{
var result = new List<V1ServicePort>();
var usedPorts = new List<Port>();
foreach (var port in recipe.ExposedPorts)
{
var servicePort = workflowNumberSource.GetServicePort();
usedPorts.Add(new Port(servicePort));
result.Add(new V1ServicePort
{
Name = GetNameForPort(recipe, port),
Protocol = "TCP",
Port = port.Number,
TargetPort = GetNameForPort(recipe, port),
NodePort = servicePort
});
}
servicePorts.Add(recipe, usedPorts.ToArray());
return result;
}
//private void DeleteService(CodexNodeGroup online)
//{
// if (online.Service == null) return;
// client.DeleteNamespacedService(online.Service.Name(), K8sNamespace);
// online.Service = null;
//}
//private void CreatePrometheusService(K8sPrometheusSpecs spec)
//{
// client.CreateNamespacedService(spec.CreatePrometheusService(), K8sNamespace);
//}
//private void CreateGethBootstrapService(K8sGethBoostrapSpecs spec)
//{
// client.CreateNamespacedService(spec.CreateGethBootstrapService(), K8sNamespace);
//}
#endregion
#region Waiting
//private void WaitUntilOnline(CodexNodeGroup online)
//{
// WaitUntil(() =>
// {
// online.Deployment = client.ReadNamespacedDeployment(online.Deployment.Name(), K8sNamespace);
// return online.Deployment?.Status.AvailableReplicas != null && online.Deployment.Status.AvailableReplicas > 0;
// });
//}
//private void WaitUntilOffline(string deploymentName)
//{
// WaitUntil(() =>
// {
// var deployment = client.ReadNamespacedDeployment(deploymentName, K8sNamespace);
// return deployment == null || deployment.Status.AvailableReplicas == 0;
// });
//}
//private void WaitUntilZeroPods()
//{
// WaitUntil(() => !client.ListNamespacedPod(K8sNamespace).Items.Any());
//}
private void WaitUntilNamespaceCreated()
{
WaitUntil(() => IsTestNamespaceOnline());
}
private void WaitUntilNamespaceDeleted()
{
WaitUntil(() => !IsTestNamespaceOnline());
}
//private void WaitUntilPrometheusOnline(K8sPrometheusSpecs spec)
//{
// WaitUntilDeploymentOnline(spec.GetDeploymentName());
//}
//private void WaitUntilGethBootstrapOnline(K8sGethBoostrapSpecs spec)
//{
// WaitUntilDeploymentOnline(spec.GetBootstrapDeploymentName());
//}
//private void WaitUntilGethCompanionGroupOnline(K8sGethBoostrapSpecs spec, GethCompanionGroup group)
//{
// WaitUntilDeploymentOnline(spec.GetCompanionDeploymentName(group));
//}
private void WaitUntilDeploymentCreated(V1Deployment deploymentSpec)
{
WaitUntilDeploymentOnline(deploymentSpec.Metadata.Name);
}
private void WaitUntilDeploymentOnline(string deploymentName)
{
WaitUntil(() =>
{
var deployment = client.ReadNamespacedDeployment(deploymentName, K8sNamespace);
return deployment?.Status.AvailableReplicas != null && deployment.Status.AvailableReplicas > 0;
});
}
private void WaitUntil(Func<bool> predicate)
{
var start = DateTime.UtcNow;
var state = predicate();
while (!state)
{
if (DateTime.UtcNow - start > cluster.K8sOperationTimeout())
{
throw new TimeoutException("K8s operation timed out.");
}
cluster.WaitForK8sServiceDelay();
state = predicate();
}
}
#endregion
private (string, string) FetchNewPod()
{
var pods = client.ListNamespacedPod(K8sNamespace).Items;
var newPods = pods.Where(p => !knownPods.Contains(p.Name())).ToArray();
if (newPods.Length != 1) throw new InvalidOperationException("Expected only 1 pod to be created. Test infra failure.");
var newPod = newPods.Single();
var name = newPod.Name();
var ip = newPod.Status.PodIP;
if (string.IsNullOrEmpty(name)) throw new InvalidOperationException("Invalid pod name received. Test infra failure.");
if (string.IsNullOrEmpty(ip)) throw new InvalidOperationException("Invalid pod IP received. Test infra failure.");
knownPods.Add(name);
return (name, ip);
}
}
}

View File

@ -0,0 +1,17 @@
namespace KubernetesWorkflow
{
public class KnownK8sPods
{
private readonly List<string> knownActivePodNames = new List<string>();
public bool Contains(string name)
{
return knownActivePodNames.Contains(name);
}
public void Add(string name)
{
knownActivePodNames.Add(name);
}
}
}

View File

@ -11,4 +11,8 @@
<PackageReference Include="KubernetesClient" Version="10.1.4" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Utils\Utils.csproj" />
</ItemGroup>
</Project>

View File

@ -1,4 +1,5 @@
using System.Globalization;
using Utils;
namespace KubernetesWorkflow
{

View File

@ -2,23 +2,36 @@
{
public class StartupWorkflow
{
private readonly NumberSource containerNumberSource;
private readonly K8sController k8SController;
private readonly WorkflowNumberSource numberSource;
private readonly K8sCluster cluster;
private readonly KnownK8sPods knownK8SPods;
private readonly RecipeComponentFactory componentFactory = new RecipeComponentFactory();
public StartupWorkflow(NumberSource containerNumberSource, K8sController k8SController)
internal StartupWorkflow(WorkflowNumberSource numberSource, K8sCluster cluster, KnownK8sPods knownK8SPods)
{
this.containerNumberSource = containerNumberSource;
this.k8SController = k8SController;
this.numberSource = numberSource;
this.cluster = cluster;
this.knownK8SPods = knownK8SPods;
}
public RunningContainers Start(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
public RunningContainers Start(int numberOfContainers, Location location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig)
{
var recipes = CreateRecipes(numberOfContainers, recipeFactory, startupConfig);
return K8s(controller =>
{
var recipes = CreateRecipes(numberOfContainers, recipeFactory, startupConfig);
var runningPod = k8SController.BringOnline(recipes);
var runningPod = controller.BringOnline(recipes, location);
return new RunningContainers(startupConfig, runningPod, CreateContainers(runningPod, recipes));
return new RunningContainers(startupConfig, runningPod, CreateContainers(runningPod, recipes));
});
}
public void DeleteAllResources()
{
K8s(controller =>
{
controller.DeleteAllResources();
});
}
private static RunningContainer[] CreateContainers(RunningPod runningPod, ContainerRecipe[] recipes)
@ -31,10 +44,26 @@
var result = new List<ContainerRecipe>();
for (var i = 0; i < numberOfContainers; i++)
{
result.Add(recipeFactory.CreateRecipe(containerNumberSource.GetNextNumber(), componentFactory, startupConfig));
result.Add(recipeFactory.CreateRecipe(numberSource.GetContainerNumber(), componentFactory, startupConfig));
}
return result.ToArray();
}
private void K8s(Action<K8sController> action)
{
var controller = new K8sController(cluster, knownK8SPods, numberSource);
action(controller);
controller.Dispose();
}
private T K8s<T>(Func<K8sController, T> action)
{
var controller = new K8sController(cluster, knownK8SPods, numberSource);
var result = action(controller);
controller.Dispose();
return result;
}
}
}

View File

@ -1,13 +1,19 @@
namespace KubernetesWorkflow
using Utils;
namespace KubernetesWorkflow
{
public class WorkflowCreator
{
private readonly NumberSource containerNumberSource = new NumberSource(0);
private readonly K8sController controller = new K8sController(new K8sCluster());
private readonly NumberSource numberSource = new NumberSource(0);
private readonly NumberSource servicePortNumberSource = new NumberSource(30001);
private readonly K8sCluster cluster = new K8sCluster();
private readonly KnownK8sPods knownPods = new KnownK8sPods();
public StartupWorkflow CreateWorkflow()
{
return new StartupWorkflow(containerNumberSource, controller);
var workflowNumberSource = new WorkflowNumberSource(numberSource.GetNextNumber(), servicePortNumberSource);
return new StartupWorkflow(workflowNumberSource, cluster, knownPods);
}
}
}

View File

@ -0,0 +1,28 @@
using Utils;
namespace KubernetesWorkflow
{
public class WorkflowNumberSource
{
private readonly NumberSource containerNumberSource = new NumberSource(0);
private readonly NumberSource servicePortNumberSource;
public WorkflowNumberSource(int workflowNumber, NumberSource servicePortNumberSource)
{
WorkflowNumber = workflowNumber;
this.servicePortNumberSource = servicePortNumberSource;
}
public int WorkflowNumber { get; }
public int GetContainerNumber()
{
return containerNumberSource.GetNextNumber();
}
public int GetServicePort()
{
return servicePortNumberSource.GetNextNumber();
}
}
}

View File

@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KubernetesWorkflow", "Kuber
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Utils", "Utils\Utils.csproj", "{957DE3B8-9571-450A-8609-B267DCA8727C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Logging", "Logging\Logging.csproj", "{8481A4A6-4BDD-41B0-A3EB-EF53F7BD40D1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -45,6 +47,10 @@ Global
{957DE3B8-9571-450A-8609-B267DCA8727C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{957DE3B8-9571-450A-8609-B267DCA8727C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{957DE3B8-9571-450A-8609-B267DCA8727C}.Release|Any CPU.Build.0 = Release|Any CPU
{8481A4A6-4BDD-41B0-A3EB-EF53F7BD40D1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8481A4A6-4BDD-41B0-A3EB-EF53F7BD40D1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8481A4A6-4BDD-41B0-A3EB-EF53F7BD40D1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8481A4A6-4BDD-41B0-A3EB-EF53F7BD40D1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE