2
0
mirror of synced 2025-01-11 09:06:56 +00:00

removes dependency on static pod name and address info

This commit is contained in:
benbierens 2023-11-06 14:33:47 +01:00
parent db0a21bc60
commit dc9f3ab090
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
26 changed files with 451 additions and 236 deletions

View File

@ -6,17 +6,17 @@ namespace KubernetesWorkflow
{
private readonly K8sClient client;
private readonly string k8sNamespace;
private readonly RunningPod pod;
private readonly string podName;
private readonly string containerName;
private readonly string command;
private readonly string[] arguments;
private readonly List<string> lines = new List<string>();
public CommandRunner(K8sClient client, string k8sNamespace, RunningPod pod, string containerName, string command, string[] arguments)
public CommandRunner(K8sClient client, string k8sNamespace, string podName, string containerName, string command, string[] arguments)
{
this.client = client;
this.k8sNamespace = k8sNamespace;
this.pod = pod;
this.podName = podName;
this.containerName = containerName;
this.command = command;
this.arguments = arguments;
@ -27,7 +27,7 @@ namespace KubernetesWorkflow
var input = new[] { command }.Concat(arguments).ToArray();
Time.Wait(client.Run(c => c.NamespacedPodExecAsync(
pod.PodInfo.Name, k8sNamespace, containerName, input, false, Callback, new CancellationToken())));
podName, k8sNamespace, containerName, input, false, Callback, new CancellationToken())));
}
public string GetStdOut()

View File

@ -64,6 +64,11 @@
Number = number;
Tag = tag;
Protocol = protocol;
if (string.IsNullOrWhiteSpace(Tag))
{
throw new Exception("A unique port tag is required");
}
}
public int Number { get; }

View File

@ -7,19 +7,23 @@ namespace KubernetesWorkflow
{
private readonly ILog log;
private readonly KubernetesClientConfiguration config;
private readonly string containerName;
private readonly string podName;
private readonly string recipeName;
private readonly string k8sNamespace;
private readonly RunningContainer container;
private ILogHandler? logHandler;
private CancellationTokenSource cts;
private Task? worker;
private Exception? workerException;
public CrashWatcher(ILog log, KubernetesClientConfiguration config, string k8sNamespace, RunningContainer container)
public CrashWatcher(ILog log, KubernetesClientConfiguration config, string containerName, string podName, string recipeName, string k8sNamespace)
{
this.log = log;
this.config = config;
this.containerName = containerName;
this.podName = podName;
this.recipeName = recipeName;
this.k8sNamespace = k8sNamespace;
this.container = container;
cts = new CancellationTokenSource();
}
@ -46,7 +50,7 @@ namespace KubernetesWorkflow
public bool HasContainerCrashed()
{
using var client = new Kubernetes(config);
return HasContainerBeenRestarted(client, container.Pod.PodInfo.Name);
return HasContainerBeenRestarted(client);
}
private void Worker()
@ -66,29 +70,26 @@ namespace KubernetesWorkflow
using var client = new Kubernetes(config);
while (!token.IsCancellationRequested)
{
token.WaitHandle.WaitOne(TimeSpan.FromSeconds(1));
token.WaitHandle.WaitOne(TimeSpan.FromSeconds(10));
var pod = container.Pod;
var recipe = container.Recipe;
var podName = pod.PodInfo.Name;
if (HasContainerBeenRestarted(client, podName))
if (HasContainerBeenRestarted(client))
{
DownloadCrashedContainerLogs(client, podName, recipe);
DownloadCrashedContainerLogs(client);
return;
}
}
}
private bool HasContainerBeenRestarted(Kubernetes client, string podName)
private bool HasContainerBeenRestarted(Kubernetes client)
{
var podInfo = client.ReadNamespacedPod(podName, k8sNamespace);
return podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0);
}
private void DownloadCrashedContainerLogs(Kubernetes client, string podName, ContainerRecipe recipe)
private void DownloadCrashedContainerLogs(Kubernetes client)
{
log.Log("Pod crash detected for " + container.Name);
using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipe.Name, previous: true);
log.Log("Pod crash detected for " + containerName);
using var stream = client.ReadNamespacedPodLog(podName, k8sNamespace, recipeName, previous: true);
logHandler!.Log(stream);
}
}

View File

@ -11,7 +11,7 @@ namespace KubernetesWorkflow
private readonly K8sCluster cluster;
private readonly WorkflowNumberSource workflowNumberSource;
private readonly K8sClient client;
private const string podLabelKey = "pod-uuid";
public const string PodLabelKey = "pod-uuid";
public K8sController(ILog log, K8sCluster cluster, WorkflowNumberSource workflowNumberSource, string k8sNamespace)
{
@ -28,57 +28,63 @@ namespace KubernetesWorkflow
client.Dispose();
}
public RunningPod BringOnline(ContainerRecipe[] containerRecipes, ILocation location)
public StartResult BringOnline(ContainerRecipe[] containerRecipes, ILocation location)
{
log.Debug();
EnsureNamespace();
var podLabel = K8sNameUtils.Format(Guid.NewGuid().ToString());
var deploymentName = CreateDeployment(containerRecipes, location, podLabel);
var (serviceName, servicePortsMap) = CreateService(containerRecipes);
var deployment = CreateDeployment(containerRecipes, location, podLabel);
var internalService = CreateInternalService(containerRecipes);
var externalService = CreateExternalService(containerRecipes);
var runnerLocation = DetermineRunnerLocation(deployment);
var pod = FindPodByLabel(podLabel);
var podInfo = CreatePodInfo(pod);
return new RunningPod(cluster, podInfo, deploymentName, serviceName, servicePortsMap.ToArray());
}
private V1Pod FindPodByLabel(string podLabel)
{
var pods = client.Run(c => c.ListNamespacedPod(K8sNamespace));
foreach (var pod in pods.Items)
return new StartResult(cluster, containerRecipes, deployment, internalService, externalService)
{
var label = pod.GetLabel(podLabelKey);
if (label == podLabel)
{
return pod;
}
}
throw new Exception("Unable to find pod by label.");
RunnerLocation = runnerLocation
};
}
public void Stop(RunningPod pod)
private RunnerLocation DetermineRunnerLocation(RunningDeployment deployment)
{
log.Debug();
if (!string.IsNullOrEmpty(pod.ServiceName)) DeleteService(pod.ServiceName);
DeleteDeployment(pod.DeploymentName);
WaitUntilDeploymentOffline(pod.DeploymentName);
WaitUntilPodOffline(pod.PodInfo.Name);
var podInfo = GetPodInfo(deployment);
return RunnerLocationUtils.DetermineRunnerLocation(podInfo, cluster);
}
public void DownloadPodLog(RunningPod pod, ContainerRecipe recipe, ILogHandler logHandler, int? tailLines)
public PodInfo GetPodInfo(RunningDeployment deployment)
{
var pod = GetPodForDeployment(deployment);
return CreatePodInfo(pod);
}
public void Stop(StartResult startResult)
{
log.Debug();
using var stream = client.Run(c => c.ReadNamespacedPodLog(pod.PodInfo.Name, K8sNamespace, recipe.Name, tailLines: tailLines));
if (startResult.InternalService != null) DeleteService(startResult.InternalService);
if (startResult.ExternalService != null) DeleteService(startResult.ExternalService);
DeleteDeployment(startResult.Deployment);
WaitUntilPodsForDeploymentAreOffline(startResult.Deployment);
}
public void DownloadPodLog(RunningContainer container, ILogHandler logHandler, int? tailLines)
{
log.Debug();
var podName = GetPodName(container);
var recipeName = container.Recipe.Name;
using var stream = client.Run(c => c.ReadNamespacedPodLog(podName, K8sNamespace, recipeName, tailLines: tailLines));
logHandler.Log(stream);
}
public string ExecuteCommand(RunningPod pod, string containerName, string command, params string[] args)
public string ExecuteCommand(RunningContainer container, string command, params string[] args)
{
var containerName = container.Name;
var cmdAndArgs = $"{containerName}: {command} ({string.Join(",", args)})";
log.Debug(cmdAndArgs);
var runner = new CommandRunner(client, K8sNamespace, pod, containerName, command, args);
var podName = GetPodName(container);
var runner = new CommandRunner(client, K8sNamespace, podName, containerName, command, args);
runner.Run();
var result = runner.GetStdOut();
@ -315,7 +321,7 @@ namespace KubernetesWorkflow
#region Deployment management
private string CreateDeployment(ContainerRecipe[] containerRecipes, ILocation location, string podLabel)
private RunningDeployment CreateDeployment(ContainerRecipe[] containerRecipes, ILocation location, string podLabel)
{
var deploymentSpec = new V1Deployment
{
@ -348,13 +354,14 @@ namespace KubernetesWorkflow
client.Run(c => c.CreateNamespacedDeployment(deploymentSpec, K8sNamespace));
WaitUntilDeploymentOnline(deploymentSpec.Metadata.Name);
return deploymentSpec.Metadata.Name;
var name = deploymentSpec.Metadata.Name;
return new RunningDeployment(name, podLabel);
}
private void DeleteDeployment(string deploymentName)
private void DeleteDeployment(RunningDeployment deployment)
{
client.Run(c => c.DeleteNamespacedDeployment(deploymentName, K8sNamespace));
WaitUntilDeploymentOffline(deploymentName);
client.Run(c => c.DeleteNamespacedDeployment(deployment.Name, K8sNamespace));
WaitUntilDeploymentOffline(deployment.Name);
}
private IDictionary<string, string> CreateNodeSelector(ILocation location)
@ -382,7 +389,7 @@ namespace KubernetesWorkflow
private IDictionary<string, string> GetSelector(ContainerRecipe[] containerRecipes, string podLabel)
{
var labels = containerRecipes.First().PodLabels.Clone();
labels.Add(podLabelKey, podLabel);
labels.Add(PodLabelKey, podLabel);
return labels.GetLabels();
}
@ -609,113 +616,140 @@ namespace KubernetesWorkflow
private string GetNameForPort(ContainerRecipe recipe, Port port)
{
return $"p{workflowNumberSource.WorkflowNumber}-{recipe.Number}-{port.Number}-{port.Protocol.ToString().ToLowerInvariant()}";
var inputs = new[]
{
$"p{workflowNumberSource.WorkflowNumber}",
recipe.Number.ToString(),
port.Number.ToString(),
port.Protocol.ToString().ToLowerInvariant()
};
return K8sNameUtils.FormatPortName(string.Join(",", inputs));
}
private string GetPodName(RunningContainer container)
{
return GetPodForDeployment(container.RunningContainers.StartResult.Deployment).Metadata.Name;
}
private V1Pod GetPodForDeployment(RunningDeployment deployment)
{
var allPods = client.Run(c => c.ListNamespacedPod(K8sNamespace));
var pods = allPods.Items.Where(p => p.GetLabel(PodLabelKey) == deployment.PodLabel).ToArray();
if (pods.Length != 1) throw new Exception("Expected to find only 1 pod by podLabel.");
return pods[0];
}
#endregion
#region Service management
private (string, List<ContainerRecipePortMapEntry>) CreateService(ContainerRecipe[] containerRecipes)
private RunningService? CreateInternalService(ContainerRecipe[] recipes)
{
var result = new List<ContainerRecipePortMapEntry>();
return CreateService(recipes, r => r.InternalPorts, "ClusterIP", "int");
}
var ports = CreateServicePorts(containerRecipes);
private RunningService? CreateExternalService(ContainerRecipe[] recipes)
{
return CreateService(recipes, r => r.ExposedPorts, "NodePort", "ext");
}
if (!ports.Any())
{
// None of these container-recipes wish to expose anything via a service port.
// So, we don't have to create a service.
return (string.Empty, result);
}
private RunningService? CreateService(ContainerRecipe[] recipes, Func<ContainerRecipe, Port[]> portSelector, string serviceType, string namePostfix)
{
var ports = CreateServicePorts(recipes, portSelector);
if (!ports.Any()) return null;
var serviceSpec = new V1Service
{
ApiVersion = "v1",
Metadata = CreateServiceMetadata(containerRecipes),
Metadata = CreateServiceMetadata(recipes, namePostfix),
Spec = new V1ServiceSpec
{
Type = "NodePort",
Selector = GetSelector(containerRecipes),
Ports = ports
Type = serviceType,
Selector = GetSelector(recipes),
Ports = ports,
}
};
client.Run(c => c.CreateNamespacedService(serviceSpec, K8sNamespace));
ReadBackServiceAndMapPorts(serviceSpec, containerRecipes, result);
var result = ReadBackServiceAndMapPorts(serviceSpec, recipes);
var name = serviceSpec.Metadata.Name;
return (serviceSpec.Metadata.Name, result);
return new RunningService(name, result);
}
private void ReadBackServiceAndMapPorts(V1Service serviceSpec, ContainerRecipe[] containerRecipes, List<ContainerRecipePortMapEntry> result)
private List<ContainerRecipePortMapEntry> ReadBackServiceAndMapPorts(V1Service serviceSpec, ContainerRecipe[] containerRecipes)
{
// For each container-recipe, we need to figure out which service-ports it was assigned by K8s.
var result = new List<ContainerRecipePortMapEntry>();
// For each container-recipe-port, we need to figure out which service-ports it was assigned by K8s.
var readback = client.Run(c => c.ReadNamespacedService(serviceSpec.Metadata.Name, K8sNamespace));
foreach (var r in containerRecipes)
{
foreach (var port in r.ExposedPorts)
var recipePorts = r.ExposedPorts.Concat(r.InternalPorts).ToArray();
foreach (var port in recipePorts)
{
var portName = GetNameForPort(r, port);
var matchingServicePorts = readback.Spec.Ports.Where(p => p.Name == portName);
if (matchingServicePorts.Any())
{
// These service ports belongs to this recipe.
var optionals = matchingServicePorts.Select(p => MapNodePortIfAble(p, port.Tag, port.Protocol));
var ports = optionals.Where(p => p != null).Select(p => p!).ToArray();
var ports = matchingServicePorts.Select(p => MapPortIfAble(p, port.Tag, port.Protocol)).ToArray();
if (ports.Any())
{
result.Add(new ContainerRecipePortMapEntry(r.Number, ports));
log.Debug($"Service Readback: {portName} found: {string.Join(",", ports.Select(p => p.ToString()))}");
}
}
}
return result;
}
private Port? MapNodePortIfAble(V1ServicePort p, string tag, PortProtocol protocol)
private Port MapPortIfAble(V1ServicePort p, string tag, PortProtocol protocol)
{
if (p.NodePort == null) return null;
return new Port(p.NodePort.Value, tag, protocol);
if (p.NodePort != null) return new Port(p.NodePort.Value, tag, protocol);
if (p.Port > 0) return new Port(p.Port, tag, protocol);
throw new Exception("Unable to map port.");
}
private void DeleteService(string serviceName)
private void DeleteService(RunningService service)
{
client.Run(c => c.DeleteNamespacedService(serviceName, K8sNamespace));
client.Run(c => c.DeleteNamespacedService(service.Name, K8sNamespace));
}
private V1ObjectMeta CreateServiceMetadata(ContainerRecipe[] containerRecipes)
private V1ObjectMeta CreateServiceMetadata(ContainerRecipe[] containerRecipes, string namePostfix)
{
var exposedRecipe = containerRecipes.FirstOrDefault(c => c.ExposedPorts.Any());
var name = "service-" + workflowNumberSource.WorkflowNumber;
if (exposedRecipe != null)
{
name = K8sNameUtils.Format(exposedRecipe.Name) + "-" + workflowNumberSource.WorkflowNumber;
}
var recipeName = containerRecipes.First().Name;
var name = K8sNameUtils.Format($"{recipeName}-{workflowNumberSource.WorkflowNumber}-{namePostfix}");
log.Debug("Creating service: " + name);
return new V1ObjectMeta
{
Name = name,
NamespaceProperty = K8sNamespace
NamespaceProperty = K8sNamespace,
};
}
private List<V1ServicePort> CreateServicePorts(ContainerRecipe[] recipes)
private List<V1ServicePort> CreateServicePorts(ContainerRecipe[] recipes, Func<ContainerRecipe, Port[]> portSelector)
{
var result = new List<V1ServicePort>();
foreach (var recipe in recipes)
{
result.AddRange(CreateServicePorts(recipe));
var ports = portSelector(recipe);
foreach (var port in ports)
{
result.AddRange(CreateServicePorts(recipe, port));
}
}
return result;
}
private List<V1ServicePort> CreateServicePorts(ContainerRecipe recipe)
private List<V1ServicePort> CreateServicePorts(ContainerRecipe recipe, Port recipePort)
{
var result = new List<V1ServicePort>();
foreach (var port in recipe.ExposedPorts)
{
if (port.IsTcp()) CreateServicePort(result, recipe, port, "TCP");
if (port.IsUdp()) CreateServicePort(result, recipe, port, "UDP");
}
if (recipePort.IsTcp()) CreateServicePort(result, recipe, recipePort, "TCP");
if (recipePort.IsUdp()) CreateServicePort(result, recipe, recipePort, "UDP");
return result;
}
@ -726,7 +760,7 @@ namespace KubernetesWorkflow
Name = GetNameForPort(recipe, port),
Protocol = protocol,
Port = port.Number,
TargetPort = GetNameForPort(recipe, port),
TargetPort = GetNameForPort(recipe, port)
});
}
@ -758,13 +792,12 @@ namespace KubernetesWorkflow
});
}
private void WaitUntilPodOffline(string podName)
private void WaitUntilPodsForDeploymentAreOffline(RunningDeployment deployment)
{
WaitUntil(() =>
{
var pods = client.Run(c => c.ListNamespacedPod(K8sNamespace)).Items;
var pod = pods.SingleOrDefault(p => p.Metadata.Name == podName);
return pod == null;
var pods = FindPodsByLabel(deployment.PodLabel);
return !pods.Any();
});
}
@ -785,7 +818,17 @@ namespace KubernetesWorkflow
public CrashWatcher CreateCrashWatcher(RunningContainer container)
{
return new CrashWatcher(log, cluster.GetK8sClientConfig(), K8sNamespace, container);
var containerName = container.Name;
var podName = GetPodName(container);
var recipeName = container.Recipe.Name;
return new CrashWatcher(log, cluster.GetK8sClientConfig(), containerName, podName, recipeName, K8sNamespace);
}
private V1Pod[] FindPodsByLabel(string podLabel)
{
var pods = client.Run(c => c.ListNamespacedPod(K8sNamespace));
return pods.Items.Where(p => p.GetLabel(PodLabelKey) == podLabel).ToArray();
}
private PodInfo CreatePodInfo(V1Pod pod)

View File

@ -3,8 +3,19 @@
public static class K8sNameUtils
{
public static string Format(string s)
{
return Format(s, 62);
}
public static string FormatPortName(string s)
{
return Format(s, 15);
}
private static string Format(string s, int maxLength)
{
var result = s.ToLowerInvariant()
.Replace("_", "-")
.Replace(" ", "-")
.Replace(":", "-")
.Replace("/", "-")
@ -14,7 +25,7 @@
.Replace(",", "-");
result = result.Trim('-');
if (result.Length > 62) result = result.Substring(0, 62);
if (result.Length > maxLength) result = result.Substring(0, maxLength);
return result;
}

View File

@ -13,34 +13,31 @@ namespace KubernetesWorkflow
{
private static RunnerLocation? knownLocation = null;
internal static RunnerLocation DetermineRunnerLocation(RunningContainer container)
internal static RunnerLocation DetermineRunnerLocation(PodInfo info, K8sCluster cluster)
{
if (knownLocation != null) return knownLocation.Value;
knownLocation = PingForLocation(container);
knownLocation = PingForLocation(info, cluster);
return knownLocation.Value;
}
private static RunnerLocation PingForLocation(RunningContainer container)
private static RunnerLocation PingForLocation(PodInfo podInfo, K8sCluster cluster)
{
if (PingHost(container.Pod.PodInfo.Ip))
if (PingHost(podInfo.Ip))
{
return RunnerLocation.InternalToCluster;
}
foreach (var port in container.ContainerPorts)
if (PingHost(Format(cluster.HostAddress)))
{
if (port.ExternalAddress.IsValid() && PingHost(Format(port.ExternalAddress)))
{
return RunnerLocation.ExternalToCluster;
}
return RunnerLocation.ExternalToCluster;
}
throw new Exception("Unable to determine location relative to kubernetes cluster.");
}
private static string Format(Address host)
private static string Format(string host)
{
return host.Host
return host
.Replace("http://", "")
.Replace("https://", "");
}

View File

@ -1,20 +1,29 @@
using Utils;
using Newtonsoft.Json;
using Utils;
namespace KubernetesWorkflow
{
public class RunningContainers
{
public RunningContainers(StartupConfig startupConfig, RunningPod runningPod, RunningContainer[] containers)
public RunningContainers(StartupConfig startupConfig, StartResult startResult, RunningContainer[] containers)
{
StartupConfig = startupConfig;
RunningPod = runningPod;
StartResult = startResult;
Containers = containers;
foreach (var c in containers) c.RunningContainers = this;
}
public StartupConfig StartupConfig { get; }
public RunningPod RunningPod { get; }
public StartResult StartResult { get; }
public RunningContainer[] Containers { get; }
[JsonIgnore]
public string Name
{
get { return $"{Containers.Length}x '{Containers.First().Name}'"; }
}
public string Describe()
{
return string.Join(",", Containers.Select(c => c.Name));
@ -23,50 +32,49 @@ namespace KubernetesWorkflow
public class RunningContainer
{
public RunningContainer(RunningPod pod, ContainerRecipe recipe, Port[] servicePorts, string name, ContainerPort[] containerPorts)
public RunningContainer(string name, ContainerRecipe recipe, ContainerAddress[] addresses)
{
Pod = pod;
Recipe = recipe;
ServicePorts = servicePorts;
Name = name;
ContainerPorts = containerPorts;
Recipe = recipe;
Addresses = addresses;
}
public string Name { get; }
public RunningPod Pod { get; }
public ContainerRecipe Recipe { get; }
public Port[] ServicePorts { get; }
public ContainerPort[] ContainerPorts { get; }
public ContainerAddress[] Addresses { get; }
public ContainerPort GetContainerPort(string portTag)
{
return ContainerPorts.Single(c => c.Port.Tag == portTag);
}
[JsonIgnore]
public RunningContainers RunningContainers { get; internal set; } = null!;
public Address GetAddress(string portTag)
{
var containerPort = GetContainerPort(portTag);
if (RunnerLocationUtils.DetermineRunnerLocation(this) == RunnerLocation.InternalToCluster)
var containerAddress = Addresses.Single(a => a.PortTag == portTag);
if (containerAddress.IsInteral && RunningContainers.StartResult.RunnerLocation == RunnerLocation.ExternalToCluster)
{
return containerPort.InternalAddress;
throw new Exception("Attempt to access a container address created from an Internal port, " +
"while runner is located external to the cluster.");
}
if (!containerPort.ExternalAddress.IsValid()) throw new Exception($"Getting address by tag {portTag} resulted in an invalid address.");
return containerPort.ExternalAddress;
return containerAddress.Address;
}
}
public class ContainerPort
public class ContainerAddress
{
public ContainerPort(Port port, Address externalAddress, Address internalAddress)
public ContainerAddress(string portTag, Address address, bool isInteral)
{
Port = port;
ExternalAddress = externalAddress;
InternalAddress = internalAddress;
PortTag = portTag;
Address = address;
IsInteral = isInteral;
}
public Port Port { get; }
public Address ExternalAddress { get; }
public Address InternalAddress { get; }
public string PortTag { get; }
public Address Address { get; }
public bool IsInteral { get; }
public override string ToString()
{
return $"{PortTag} -> '{Address}'";
}
}
public static class RunningContainersExtensions

View File

@ -1,5 +1,114 @@
namespace KubernetesWorkflow
using k8s;
using k8s.Models;
using Newtonsoft.Json;
namespace KubernetesWorkflow
{
public class StartResult
{
public StartResult(
K8sCluster cluster,
ContainerRecipe[] containerRecipes,
RunningDeployment deployment,
RunningService? internalService,
RunningService? externalService)
{
Cluster = cluster;
ContainerRecipes = containerRecipes;
Deployment = deployment;
InternalService = internalService;
ExternalService = externalService;
}
public K8sCluster Cluster { get; }
public ContainerRecipe[] ContainerRecipes { get; }
public RunningDeployment Deployment { get; }
public RunningService? InternalService { get; }
public RunningService? ExternalService { get; }
[JsonIgnore]
internal RunnerLocation RunnerLocation { get; set; }
public Port GetServicePorts(ContainerRecipe recipe, string tag)
{
if (InternalService != null)
{
var p = InternalService.GetServicePortForRecipeAndTag(recipe, tag);
if (p != null) return p;
}
if (ExternalService != null)
{
var p = ExternalService.GetServicePortForRecipeAndTag(recipe, tag);
if (p != null) return p;
}
throw new Exception($"Unable to find port by tag '{tag}' for recipe '{recipe.Name}'.");
}
public Port[] GetServicePortsForContainer(ContainerRecipe recipe)
{
if (InternalService != null)
{
var p = InternalService.GetServicePortsForRecipe(recipe);
if (p.Any()) return p;
}
if (ExternalService != null)
{
var p = ExternalService.GetServicePortsForRecipe(recipe);
if (p.Any()) return p;
}
return Array.Empty<Port>();
}
}
public class RunningDeployment
{
public RunningDeployment(string name, string podLabel)
{
Name = name;
PodLabel = podLabel;
}
public string Name { get; }
public string PodLabel { get; }
public V1Pod GetPod(K8sClient client, string k8sNamespace)
{
var allPods = client.Run(c => c.ListNamespacedPod(k8sNamespace));
var pods = allPods.Items.Where(p => p.GetLabel(K8sController.PodLabelKey) == PodLabel).ToArray();
if (pods.Length != 1) throw new Exception("Expected to find only 1 pod by podLabel.");
return pods[0];
}
}
public class RunningService
{
public RunningService(string name, List<ContainerRecipePortMapEntry> result)
{
Name = name;
Result = result;
}
public string Name { get; }
public List<ContainerRecipePortMapEntry> Result { get; }
public Port? GetServicePortForRecipeAndTag(ContainerRecipe recipe, string tag)
{
return GetServicePortsForRecipe(recipe).SingleOrDefault(p => p.Tag == tag);
}
public Port[] GetServicePortsForRecipe(ContainerRecipe recipe)
{
return Result
.Where(p => p.RecipeNumber == recipe.Number)
.SelectMany(p => p.Ports)
.ToArray();
}
}
public class RunningPod
{
public RunningPod(K8sCluster cluster, PodInfo podInfo, string deploymentName, string serviceName, ContainerRecipePortMapEntry[] portMapEntries)
@ -20,7 +129,7 @@
public Port[] GetServicePortsForContainerRecipe(ContainerRecipe containerRecipe)
{
return PortMapEntries
.Where(p => p.ContainerNumber == containerRecipe.Number)
.Where(p => p.RecipeNumber == containerRecipe.Number)
.SelectMany(p => p.Ports)
.ToArray();
}
@ -28,13 +137,13 @@
public class ContainerRecipePortMapEntry
{
public ContainerRecipePortMapEntry(int containerNumber, Port[] ports)
public ContainerRecipePortMapEntry(int recipeNumber, Port[] ports)
{
ContainerNumber = containerNumber;
RecipeNumber = recipeNumber;
Ports = ports;
}
public int ContainerNumber { get; }
public int RecipeNumber { get; }
public Port[] Ports { get; }
}

View File

@ -9,8 +9,10 @@ namespace KubernetesWorkflow
IKnownLocations GetAvailableLocations();
RunningContainers Start(int numberOfContainers, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
RunningContainers Start(int numberOfContainers, ILocation location, ContainerRecipeFactory recipeFactory, StartupConfig startupConfig);
PodInfo GetPodInfo(RunningContainer container);
PodInfo GetPodInfo(RunningContainers containers);
CrashWatcher CreateCrashWatcher(RunningContainer container);
void Stop(RunningContainers runningContainers);
void Stop(RunningContainers containers);
void DownloadContainerLog(RunningContainer container, ILogHandler logHandler, int? tailLines = null);
string ExecuteCommand(RunningContainer container, string command, params string[] args);
void DeleteNamespace();
@ -51,15 +53,25 @@ namespace KubernetesWorkflow
return K8s(controller =>
{
var recipes = CreateRecipes(numberOfContainers, recipeFactory, startupConfig);
var runningPod = controller.BringOnline(recipes, location);
var containers = CreateContainers(runningPod, recipes, startupConfig);
var startResult = controller.BringOnline(recipes, location);
var containers = CreateContainers(startResult, recipes, startupConfig);
var rc = new RunningContainers(startupConfig, runningPod, containers);
var rc = new RunningContainers(startupConfig, startResult, containers);
cluster.Configuration.Hooks.OnContainersStarted(rc);
return rc;
});
}
public PodInfo GetPodInfo(RunningContainer container)
{
return K8s(c => c.GetPodInfo(container.RunningContainers.StartResult.Deployment));
}
public PodInfo GetPodInfo(RunningContainers containers)
{
return K8s(c => c.GetPodInfo(containers.StartResult.Deployment));
}
public CrashWatcher CreateCrashWatcher(RunningContainer container)
{
return K8s(c => c.CreateCrashWatcher(container));
@ -69,7 +81,7 @@ namespace KubernetesWorkflow
{
K8s(controller =>
{
controller.Stop(runningContainers.RunningPod);
controller.Stop(runningContainers.StartResult);
cluster.Configuration.Hooks.OnContainersStopped(runningContainers);
});
}
@ -78,7 +90,7 @@ namespace KubernetesWorkflow
{
K8s(controller =>
{
controller.DownloadPodLog(container.Pod, container.Recipe, logHandler, tailLines);
controller.DownloadPodLog(container, logHandler, tailLines);
});
}
@ -86,7 +98,7 @@ namespace KubernetesWorkflow
{
return K8s(controller =>
{
return controller.ExecuteCommand(container.Pod, container.Recipe.Name, command, args);
return controller.ExecuteCommand(container, command, args);
});
}
@ -106,18 +118,16 @@ namespace KubernetesWorkflow
});
}
private RunningContainer[] CreateContainers(RunningPod runningPod, ContainerRecipe[] recipes, StartupConfig startupConfig)
private RunningContainer[] CreateContainers(StartResult startResult, ContainerRecipe[] recipes, StartupConfig startupConfig)
{
log.Debug();
return recipes.Select(r =>
{
var servicePorts = runningPod.GetServicePortsForContainerRecipe(r);
log.Debug($"{r} -> service ports: {string.Join(",", servicePorts.Select(p => p.Number))}");
var name = GetContainerName(r, startupConfig);
var addresses = CreateContainerAddresses(startResult, r);
log.Debug($"{r}={name} -> container addresses: {string.Join(Environment.NewLine, addresses.Select(a => a.ToString()))}");
return new RunningContainer(runningPod, r, servicePorts, name,
CreateContainerPorts(runningPod, r, servicePorts));
return new RunningContainer(name, r, addresses);
}).ToArray();
}
@ -135,43 +145,36 @@ namespace KubernetesWorkflow
}
}
private ContainerPort[] CreateContainerPorts(RunningPod pod, ContainerRecipe recipe, Port[] servicePorts)
private ContainerAddress[] CreateContainerAddresses(StartResult startResult, ContainerRecipe recipe)
{
var result = new List<ContainerPort>();
var result = new List<ContainerAddress>();
foreach (var exposedPort in recipe.ExposedPorts)
{
result.Add(new ContainerPort(
exposedPort,
GetContainerExternalAddress(pod, servicePorts, exposedPort),
GetContainerInternalAddress(pod, exposedPort)));
result.Add(new ContainerAddress(exposedPort.Tag, GetContainerExternalAddress(startResult, recipe, exposedPort.Tag), false));
}
foreach (var internalPort in recipe.InternalPorts)
{
if (!string.IsNullOrEmpty(internalPort.Tag))
{
result.Add(new ContainerPort(
internalPort,
new Address(string.Empty, 0),
GetContainerInternalAddress(pod, internalPort)));
}
result.Add(new ContainerAddress(internalPort.Tag, GetContainerInternalAddress(startResult, recipe, internalPort.Tag), true));
}
return result.ToArray();
}
private static Address GetContainerExternalAddress(RunningPod pod, Port[] servicePorts, Port exposedPort)
private static Address GetContainerExternalAddress(StartResult startResult, ContainerRecipe recipe, string tag)
{
var servicePort = servicePorts.Single(p => p.Tag == exposedPort.Tag);
var port = startResult.GetServicePorts(recipe, tag);
return new Address(
pod.Cluster.HostAddress,
servicePort.Number);
startResult.Cluster.HostAddress,
port.Number);
}
private Address GetContainerInternalAddress(RunningPod pod, Port port)
private Address GetContainerInternalAddress(StartResult startResult, ContainerRecipe recipe, string tag)
{
var serviceName = startResult.InternalService!.Name;
var port = startResult.GetServicePorts(recipe, tag);
return new Address(
$"http://{pod.PodInfo.Ip}",
$"http://{serviceName}",
port.Number);
}
@ -182,6 +185,8 @@ namespace KubernetesWorkflow
for (var i = 0; i < numberOfContainers; i++)
{
var recipe = recipeFactory.CreateRecipe(i, numberSource.GetContainerNumber(), componentFactory, startupConfig);
CheckPorts(recipe);
if (cluster.Configuration.AddAppPodLabel) recipe.PodLabels.Add("app", recipeFactory.AppName);
cluster.Configuration.Hooks.OnContainerRecipeCreated(recipe);
result.Add(recipe);
@ -190,6 +195,18 @@ namespace KubernetesWorkflow
return result.ToArray();
}
private void CheckPorts(ContainerRecipe recipe)
{
var allTags =
recipe.ExposedPorts.Concat(recipe.InternalPorts)
.Select(p => K8sNameUtils.Format(p.Tag)).ToArray();
if (allTags.Length != allTags.Distinct().Count())
{
throw new Exception("Duplicate port tags found in recipe for " + recipe.Name);
}
}
private void K8s(Action<K8sController> action)
{
try

View File

@ -17,8 +17,7 @@ namespace CodexContractsPlugin
{
var config = startupConfig.Get<CodexContractsContainerConfig>();
var containerPort = config.GethNode.StartResult.Container.GetContainerPort(GethContainerRecipe.HttpPortTag);
var address = containerPort.InternalAddress;
var address = config.GethNode.StartResult.Container.GetAddress(GethContainerRecipe.HttpPortTag);
AddEnvVar("DISTTEST_NETWORK_URL", address.ToString());
AddEnvVar("HARDHAT_NETWORK", "codexdisttestnetwork");

View File

@ -97,6 +97,12 @@ namespace CodexPlugin
return Container.Name;
}
public PodInfo GetPodInfo()
{
var workflow = tools.CreateWorkflow();
return workflow.GetPodInfo(Container);
}
private IHttp Http()
{
return tools.CreateHttp(GetAddress(), baseUrl: "/api/codex/v1", CheckContainerCrashed, Container.Name);

View File

@ -1,4 +1,5 @@
using KubernetesWorkflow;
using GethPlugin;
using KubernetesWorkflow;
using Utils;
namespace CodexPlugin
@ -94,11 +95,10 @@ namespace CodexPlugin
{
var mconfig = config.MarketplaceConfig;
var gethStart = mconfig.GethNode.StartResult;
var ip = gethStart.Container.Pod.PodInfo.Ip;
var port = gethStart.WsPort.Number;
var wsAddress = gethStart.Container.GetAddress(GethContainerRecipe.WsPortTag);
var marketplaceAddress = mconfig.CodexContracts.Deployment.MarketplaceAddress;
AddEnvVar("CODEX_ETH_PROVIDER", $"ws://{ip}:{port}");
AddEnvVar("CODEX_ETH_PROVIDER", $"ws://{wsAddress.Host}:{wsAddress.Port}");
AddEnvVar("CODEX_MARKETPLACE_ADDRESS", marketplaceAddress);
AddEnvVar("CODEX_PERSISTENCE", "true");

View File

@ -26,13 +26,13 @@ namespace CodexPlugin
public class CodexInstance
{
public CodexInstance(RunningContainer container, CodexDebugResponse info)
public CodexInstance(RunningContainers containers, CodexDebugResponse info)
{
Container = container;
Containers = containers;
Info = info;
}
public RunningContainer Container { get; }
public RunningContainers Containers { get; }
public CodexDebugResponse Info { get; }
}

View File

@ -22,6 +22,7 @@ namespace CodexPlugin
CodexDebugVersionResponse Version { get; }
IMarketplaceAccess Marketplace { get; }
CrashWatcher CrashWatcher { get; }
PodInfo GetPodInfo();
void Stop();
}
@ -52,9 +53,7 @@ namespace CodexPlugin
{
get
{
var port = CodexAccess.Container.Recipe.GetPortByTag(CodexContainerRecipe.MetricsPortTag);
if (port == null) throw new Exception("Metrics is not available for this Codex node. Please start it with the option '.EnableMetrics()' to enable it.");
return new MetricsScrapeTarget(CodexAccess.Container, port);
return new MetricsScrapeTarget(CodexAccess.Container, CodexContainerRecipe.MetricsPortTag);
}
}
public EthAddress EthAddress
@ -134,6 +133,11 @@ namespace CodexPlugin
Log($"Successfully connected to peer {peer.GetName()}.");
}
public PodInfo GetPodInfo()
{
return CodexAccess.GetPodInfo();
}
public void Stop()
{
if (Group.Count() > 1) throw new InvalidOperationException("Codex-nodes that are part of a group cannot be " +
@ -166,7 +170,10 @@ namespace CodexPlugin
// The peer we want to connect is in a different pod.
// We must replace the default IP with the pod IP in the multiAddress.
return multiAddress.Replace("0.0.0.0", peer.CodexAccess.Container.Pod.PodInfo.Ip);
var workflow = tools.CreateWorkflow();
var podInfo = workflow.GetPodInfo(peer.Container);
return multiAddress.Replace("0.0.0.0", podInfo.Ip);
}
private void DownloadToFile(string contentId, TrackedFile file)

View File

@ -24,8 +24,12 @@ namespace CodexPlugin
var containers = StartCodexContainers(startupConfig, codexSetup.NumberOfNodes, codexSetup.Location);
var podInfos = string.Join(", ", containers.Containers().Select(c => $"Container: '{c.Name}' runs at '{c.Pod.PodInfo.K8SNodeName}'={c.Pod.PodInfo.Ip}"));
Log($"Started {codexSetup.NumberOfNodes} nodes of image '{containers.Containers().First().Recipe.Image}'. ({podInfos})");
foreach (var rc in containers)
{
var podInfo = GetPodInfo(rc);
var podInfos = string.Join(", ", rc.Containers.Select(c => $"Container: '{c.Name}' runs at '{podInfo.K8SNodeName}'={podInfo.Ip}"));
Log($"Started {codexSetup.NumberOfNodes} nodes of image '{containers.Containers().First().Recipe.Image}'. ({podInfos})");
}
LogSeparator();
return containers;
@ -80,6 +84,12 @@ namespace CodexPlugin
return result.ToArray();
}
private PodInfo GetPodInfo(RunningContainers rc)
{
var workflow = pluginTools.CreateWorkflow();
return workflow.GetPodInfo(rc);
}
private CodexNodeGroup CreateCodexGroup(CoreInterface coreInterface, RunningContainers[] runningContainers, CodexNodeFactory codexNodeFactory)
{
var group = new CodexNodeGroup(this, pluginTools, runningContainers, codexNodeFactory);

View File

@ -10,13 +10,6 @@ namespace CodexPlugin
return Plugin(ci).DeployCodexNodes(number, setup);
}
public static ICodexNodeGroup WrapCodexContainers(this CoreInterface ci, RunningContainer[] containers)
{
// ew, clean this up.
var rcs = new RunningContainers(null!, containers.First().Pod, containers);
return WrapCodexContainers(ci, new[] { rcs });
}
public static ICodexNodeGroup WrapCodexContainers(this CoreInterface ci, RunningContainers[] containers)
{
return Plugin(ci).WrapCodexContainers(ci, containers);

View File

@ -122,7 +122,7 @@ namespace MetricsPlugin
private string GetInstanceNameForNode(IMetricsScrapeTarget target)
{
return $"{target.Ip}:{target.Port}";
return target.Address.ToString();
}
private string GetInstanceStringForNode(IMetricsScrapeTarget target)

View File

@ -1,12 +1,12 @@
using KubernetesWorkflow;
using Utils;
namespace MetricsPlugin
{
public interface IMetricsScrapeTarget
{
string Name { get; }
string Ip { get; }
int Port { get; }
Address Address { get; }
}
public interface IHasMetricsScrapeTarget
@ -21,25 +21,23 @@ namespace MetricsPlugin
public class MetricsScrapeTarget : IMetricsScrapeTarget
{
public MetricsScrapeTarget(string ip, int port, string name)
public MetricsScrapeTarget(Address address, string name)
{
Ip = ip;
Port = port;
Address = address;
Name = name;
}
public MetricsScrapeTarget(RunningContainer container, int port)
: this(container.Pod.PodInfo.Ip, port, container.Name)
public MetricsScrapeTarget(string ip, int port, string name)
: this(new Address("http://" + ip, port), name)
{
}
public MetricsScrapeTarget(RunningContainer container, Port port)
: this(container, port.Number)
public MetricsScrapeTarget(RunningContainer container, string portTag)
: this(container.GetAddress(portTag), container.Name)
{
}
public string Name { get; }
public string Ip { get; }
public int Port { get; }
public Address Address { get; }
}
}

View File

@ -59,7 +59,7 @@ namespace MetricsPlugin
foreach (var target in targets)
{
config += $" - '{target.Ip}:{target.Port}'\n";
config += $" - '{target.Address.Host}:{target.Address.Port}'\n";
}
var bytes = Encoding.ASCII.GetBytes(config);

View File

@ -46,7 +46,9 @@ namespace ContinuousTests
private string CreateQueryTemplate(RunningContainer container, DateTime startUtc, DateTime endUtc)
{
var podName = container.Pod.PodInfo.Name;
var workflow = tools.CreateWorkflow();
var podInfo = workflow.GetPodInfo(container);
var podName = podInfo.Name;
var start = startUtc.ToString("o");
var end = endUtc.ToString("o");

View File

@ -111,9 +111,11 @@ namespace ContinuousTests
var effectiveEnd = DateTime.UtcNow;
var elasticSearchLogDownloader = new ElasticSearchLogDownloader(entryPoint.Tools, fixtureLog);
var workflow = entryPoint.Tools.CreateWorkflow();
foreach (var node in nodes)
{
var openingLine = $"{node.Container.Pod.PodInfo.Name} = {node.Container.Name} = {node.GetDebugInfo().id}";
var podInfo = workflow.GetPodInfo(node.Container);
var openingLine = $"{podInfo.Name} = {node.Container.Name} = {node.GetDebugInfo().id}";
elasticSearchLogDownloader.Download(fixtureLog.CreateSubfile(), node.Container, effectiveStart, effectiveEnd, openingLine);
}
}
@ -244,13 +246,13 @@ namespace ContinuousTests
return entryPoint.CreateInterface().WrapCodexContainers(containers).ToArray();
}
private RunningContainer[] SelectRandomContainers()
private RunningContainers[] SelectRandomContainers()
{
var number = handle.Test.RequiredNumberOfNodes;
var containers = config.CodexDeployment.CodexInstances.Select(i => i.Container).ToList();
var containers = config.CodexDeployment.CodexInstances.Select(i => i.Containers).ToList();
if (number == -1) return containers.ToArray();
var result = new RunningContainer[number];
var result = new RunningContainers[number];
for (var i = 0; i < number; i++)
{
result[i] = containers.PickOneRandom();

View File

@ -39,14 +39,18 @@ namespace ContinuousTests
{
log.Log("");
var deployment = config.CodexDeployment;
var workflow = entryPoint.Tools.CreateWorkflow();
foreach (var instance in deployment.CodexInstances)
{
var container = instance.Container;
log.Log($"Codex environment variables for '{container.Name}':");
log.Log($"Pod name: {container.Pod.PodInfo.Name} - Deployment name: {container.Pod.DeploymentName}");
var codexVars = container.Recipe.EnvVars;
foreach (var vars in codexVars) log.Log(vars.ToString());
log.Log("");
foreach (var container in instance.Containers.Containers)
{
var podInfo = workflow.GetPodInfo(container);
log.Log($"Codex environment variables for '{container.Name}':");
log.Log($"Pod name: {podInfo.Name} - Deployment name: {instance.Containers.StartResult.Deployment.Name}");
var codexVars = container.Recipe.EnvVars;
foreach (var vars in codexVars) log.Log(vars.ToString());
log.Log("");
}
}
log.Log($"Deployment metadata: {JsonConvert.SerializeObject(deployment.Metadata)}");
log.Log("");
@ -82,7 +86,7 @@ namespace ContinuousTests
private void CheckCodexNodes(BaseLog log, Configuration config)
{
var nodes = entryPoint.CreateInterface().WrapCodexContainers(config.CodexDeployment.CodexInstances.Select(i => i.Container).ToArray());
var nodes = entryPoint.CreateInterface().WrapCodexContainers(config.CodexDeployment.CodexInstances.Select(i => i.Containers).ToArray());
var pass = true;
foreach (var n in nodes)
{

View File

@ -59,7 +59,8 @@ namespace DistTestCore.Helpers
if (peer == null) return $"peerId: {node.peerId} is not known.";
var container = peer.Node.Container;
var ip = container.Pod.PodInfo.Ip;
var podInfo = peer.Node.GetPodInfo();
var ip = podInfo.Ip;
var discPort = container.Recipe.GetPortByTag(CodexContainerRecipe.DiscoveryPortTag)!;
return $"{ip}:{discPort.Number}";
}

View File

@ -15,7 +15,7 @@ namespace BiblioTech
protected override async Task ExecuteDeploymentCommand(CommandContext context, CodexDeployment codexDeployment)
{
var codexContainers = codexDeployment.CodexInstances.Select(c => c.Container).ToArray();
var codexContainers = codexDeployment.CodexInstances.Select(c => c.Containers).ToArray();
var group = ci.WrapCodexContainers(codexContainers);

View File

@ -133,6 +133,8 @@ namespace BiblioTech.Commands
{
var deployments = Program.DeploymentFilesMonitor.GetDeployments();
//todo shows old deployments
if (!deployments.Any())
{
await context.Followup("No deployments available.");
@ -266,7 +268,7 @@ namespace BiblioTech.Commands
try
{
var group = ci.WrapCodexContainers(deployment.CodexInstances.Select(i => i.Container).ToArray());
var group = ci.WrapCodexContainers(deployment.CodexInstances.Select(i => i.Containers).ToArray());
await action(group, deployment.Metadata.Name);
}
catch (Exception ex)

View File

@ -165,7 +165,7 @@ namespace CodexNetDeployer
private CodexInstance CreateCodexInstance(ICodexNode node)
{
return new CodexInstance(node.Container, node.GetDebugInfo());
return new CodexInstance(node.Container.RunningContainers, node.GetDebugInfo());
}
private string? GetKubeConfig(string kubeConfigFile)