mirror of
https://github.com/logos-storage/logos-storage-nim-cs-dist-tests.git
synced 2026-05-26 02:59:26 +00:00
Replace the indirect `SetSchedulingAffinity(notIn: "false")` / `allow-tests-pods` mechanism with `ScheduleInPoolsWithLabel(key, value)` and `AddToleration(key, value, effect)` in ContainerRecipeFactory. This is much more readable from an API perspective: `SetSchedulingAffinity(notIn: "false")` was a double negative (hard to reason about), and it was not clear that it was meant to schedule on pools labelled `allow-tests-pods=true`.

Previously, pods were steered to the spot node pool via a node affinity exclusion on a boolean label (`allow-tests-pods NotIn ["false"]`), and the spot taint toleration was added implicitly by using the `system-node-critical` priority class. The priority class was removed earlier because it caused a ResourceQuota admission error in GCP, which silently broke spot node scheduling.

The new API is explicit: recipes call `ScheduleInPoolsWithLabel` to set a nodeSelector label that targets the intended pool, and `AddToleration` to declare any taints the pool carries. Tolerations are set at the recipe level so that a recipe can move back to DigitalOcean if needed (by removing the unneeded toleration). All four recipes (storage, prometheus, discord bot, rewarder bot) now call both.

Cleanup applied alongside:
- `PodToleration` converted to a record for structural equality and simpler deduplication
- `ExposedPorts`, `InternalPorts`, `EnvVars`, `Volumes` on `ContainerRecipe` changed to `IReadOnlyList<T>` for consistent immutable typing
- `SetCriticalPriority` property renamed to `IsCriticalPriority`
- `GetPriorityClassName` returns `string?` instead of `null!`
- `Reset()` extracted in `ContainerRecipeFactory` to consolidate post-create state reset
- Fixed bug: `nodePoolLabels` and `tolerations` were passed by reference and then cleared, leaving the recipe with empty collections; now snapshotted before clearing
- `SchedulingAffinity.cs` deleted (no remaining callers)
970 lines
36 KiB
C#
970 lines
36 KiB
C#
using k8s;
|
|
using k8s.Models;
|
|
using KubernetesWorkflow.Recipe;
|
|
using KubernetesWorkflow.Types;
|
|
using Logging;
|
|
using Utils;
|
|
|
|
namespace KubernetesWorkflow
{
    /// <summary>
    /// Drives the Kubernetes API for one workflow namespace.
    /// It creates the namespace and its network policy, and brings recipes online as a single
    /// deployment plus optional internal/external services. It also runs commands in and downloads
    /// logs from containers, and tears those resources down again.
    /// </summary>
    public class K8sController
    {
        private readonly ILog log;
        private readonly K8sCluster cluster;
        private readonly WorkflowNumberSource workflowNumberSource;
        private readonly K8sClient client;

        /// <summary>
        /// Pod label key whose (per-deployment, GUID-derived) value identifies the pods of one deployment.
        /// </summary>
        public const string PodLabelKey = "pod-uuid";

        /// <summary>
        /// Creates a controller that manages resources inside <paramref name="k8sNamespace"/>.
        /// A Kubernetes client is created from the cluster's client configuration.
        /// </summary>
        public K8sController(ILog log, K8sCluster cluster, WorkflowNumberSource workflowNumberSource, string k8sNamespace)
        {
            this.log = log;
            this.cluster = cluster;
            this.workflowNumberSource = workflowNumberSource;
            client = new K8sClient(cluster.GetK8sClientConfig());

            K8sNamespace = k8sNamespace;
        }

        /// <summary>Disposes the underlying Kubernetes client.</summary>
        public void Dispose()
        {
            client.Dispose();
        }

        /// <summary>
        /// Ensures the namespace exists, then creates one deployment that contains every recipe as a container.
        /// An internal (ClusterIP) service and an external (NodePort) service are also created, each only
        /// when the recipes declare ports for it.
        /// </summary>
        public StartResult BringOnline(ContainerRecipe[] containerRecipes, ILocation location)
        {
            log.Debug();
            EnsureNamespace();

            // Unique per deployment; used later to find exactly this deployment's pod.
            var podLabel = K8sNameUtils.Format(Guid.NewGuid().ToString());
            var deployment = CreateDeployment(containerRecipes, location, podLabel);
            var internalService = CreateInternalService(containerRecipes);
            var externalService = CreateExternalService(containerRecipes);

            return new StartResult(cluster, containerRecipes, deployment, internalService, externalService);
        }

        /// <summary>
        /// Blocks until the deployment hosting <paramref name="container"/> reports an available replica.
        /// Throws if a container restart (crash) is detected while waiting.
        /// </summary>
        public void WaitUntilOnline(RunningContainer container)
        {
            WaitUntilDeploymentOnline(container);
        }

        /// <summary>Returns name, IP and node name of the (single) pod belonging to <paramref name="deployment"/>.</summary>
        public PodInfo GetPodInfo(RunningDeployment deployment)
        {
            var pod = GetPodForDeployment(deployment);
            return CreatePodInfo(pod);
        }

        /// <summary>
        /// Deletes the services (if any) and the deployment of <paramref name="startResult"/>.
        /// When <paramref name="waitTillStopped"/> is true, also waits until no pod carries the deployment's pod label.
        /// </summary>
        public void Stop(StartResult startResult, bool waitTillStopped)
        {
            log.Debug();
            if (startResult.InternalService != null) DeleteService(startResult.InternalService);
            if (startResult.ExternalService != null) DeleteService(startResult.ExternalService);
            DeleteDeployment(startResult.Deployment);

            if (waitTillStopped) WaitUntilPodsForDeploymentAreOffline(startResult.Deployment);
        }

        /// <summary>
        /// Streams the log of <paramref name="container"/> into <paramref name="logHandler"/>.
        /// <paramref name="tailLines"/> and <paramref name="previous"/> are passed through to the pod-log API.
        /// </summary>
        public void DownloadPodLog(RunningContainer container, ILogHandler logHandler, int? tailLines, bool? previous)
        {
            log.Debug();

            var podName = GetPodName(container);
            // Containers are named after their recipe (see CreateDeploymentContainer).
            var recipeName = container.Recipe.Name;

            using var stream = client.Run(c => c.ReadNamespacedPodLog(podName, K8sNamespace, recipeName, tailLines: tailLines, previous: previous));
            logHandler.Log(stream);
        }

        /// <summary>Executes <paramref name="command"/> with <paramref name="args"/> inside the container and returns its stdout.</summary>
        public string ExecuteCommand(RunningContainer container, string command, params string[] args)
        {
            var containerName = container.Recipe.Name;
            var cmdAndArgs = $"{containerName}: {command} ({string.Join(",", args)})";
            log.Debug(cmdAndArgs);

            var podName = GetPodName(container);
            var runner = new CommandRunner(client, K8sNamespace, podName, containerName, command, args);
            runner.Run();
            var result = runner.GetStdOut();

            log.Debug($"{cmdAndArgs} = '{result}'");
            return result;
        }

        /// <summary>Returns every node port in use by NodePort services across all namespaces.</summary>
        public int[] GetUsedExternalPorts()
        {
            return client.Run(c =>
            {
                var services = c.ListServiceForAllNamespaces();
                return services.Items
                    .Where(s => s.Spec.Type == "NodePort")
                    .SelectMany(s => s.Spec.Ports)
                    .Where(p => p.NodePort.HasValue)
                    .Select(p => p.NodePort.GetValueOrDefault())
                    .ToArray();
            });
        }

        /// <summary>
        /// Deletes every namespace whose name starts with <paramref name="prefix"/> (ordinal comparison).
        /// When <paramref name="wait"/> is true, all deletions are triggered first and then awaited one by one.
        /// </summary>
        public void DeleteAllNamespacesStartingWith(string prefix, bool wait)
        {
            log.Debug();

            var all = client.Run(c => c.ListNamespace().Items);
            // Materialize once: the list is iterated twice below.
            var namespaces = all
                .Select(n => n.Name())
                .Where(n => n.StartsWith(prefix, StringComparison.Ordinal))
                .ToArray();

            // Trigger every deletion immediately so the namespaces are torn down concurrently.
            foreach (var ns in namespaces)
            {
                DeleteNamespace(ns, false);
            }

            if (!wait) return;

            // Only wait here. Issuing a second delete for a namespace that is already terminating
            // is redundant and may be rejected by the API server.
            foreach (var ns in namespaces)
            {
                WaitUntilNamespaceDeleted(ns);
            }
        }

        /// <summary>Deletes this controller's own namespace, if it exists, optionally waiting until it is gone.</summary>
        public void DeleteNamespace(bool wait)
        {
            DeleteNamespace(K8sNamespace, wait);
        }

        /// <summary>Deletes namespace <paramref name="ns"/> (grace period 0) if it exists, optionally waiting until it is gone.</summary>
        public void DeleteNamespace(string ns, bool wait)
        {
            log.Debug();
            if (!IsNamespaceOnline(ns)) return;

            client.Run(c => c.DeleteNamespace(ns, null, null, gracePeriodSeconds: 0));
            if (wait) WaitUntilNamespaceDeleted(ns);
        }

        #region Discover K8s Nodes

        /// <summary>
        /// Returns, for every cluster node that has one, the node label identifying its hostname.
        /// Nodes without such a label are skipped.
        /// </summary>
        public K8sNodeLabel[] GetAvailableK8sNodes()
        {
            var nodes = client.Run(c => c.ListNode());

            return nodes.Items
                .Select(CreateNodeLabel)
                .OfType<K8sNodeLabel>()
                .ToArray();
        }

        /// <summary>
        /// Picks the hostname label of <paramref name="node"/>. It prefers the well-known
        /// "kubernetes.io/hostname" key and otherwise takes the first label key containing "hostname"
        /// (case-insensitive). Returns null when the node has no labels or no such key.
        /// </summary>
        private K8sNodeLabel? CreateNodeLabel(V1Node node)
        {
            const string wellKnownHostnameKey = "kubernetes.io/hostname";

            var labels = node.Metadata?.Labels;
            if (labels == null) return null;

            if (labels.TryGetValue(wellKnownHostnameKey, out var wellKnownValue))
            {
                return new K8sNodeLabel(wellKnownHostnameKey, wellKnownValue);
            }

            // Nodes can carry several hostname-like labels; take the first instead of throwing.
            var fallbackKey = labels.Keys.FirstOrDefault(k => k.Contains("hostname", StringComparison.OrdinalIgnoreCase));
            if (fallbackKey == null) return null;

            return new K8sNodeLabel(fallbackKey, labels[fallbackKey]);
        }

        #endregion

        #region Namespace management

        private string K8sNamespace { get; }

        /// <summary>
        /// Creates this controller's namespace, labelled name=&lt;namespace&gt;, if it does not exist yet.
        /// After creating it, waits for it to appear and installs the isolation network policy.
        /// </summary>
        private void EnsureNamespace()
        {
            if (IsNamespaceOnline(K8sNamespace)) return;

            var namespaceSpec = new V1Namespace
            {
                ApiVersion = "v1",
                Metadata = new V1ObjectMeta
                {
                    Name = K8sNamespace,
                    Labels = new Dictionary<string, string> { { "name", K8sNamespace } }
                }
            };
            client.Run(c => c.CreateNamespace(namespaceSpec));
            WaitUntilNamespaceCreated();

            CreatePolicy();
        }

        private bool IsNamespaceOnline(string name)
        {
            return client.Run(c => c.ListNamespace().Items.Any(n => n.Metadata.Name == name));
        }

        /// <summary>
        /// Installs the "isolate-policy" network policy for all pods in this namespace.
        /// Ingress is allowed from pods in this namespace and from the "default" and "monitoring" namespaces.
        /// Egress is allowed to pods in this namespace, to DNS on UDP 53, and to any IP on TCP 80/443.
        /// </summary>
        private void CreatePolicy()
        {
            client.Run(c =>
            {
                var body = new V1NetworkPolicy
                {
                    Metadata = new V1ObjectMeta
                    {
                        Name = "isolate-policy",
                        NamespaceProperty = K8sNamespace
                    },
                    Spec = new V1NetworkPolicySpec
                    {
                        // Empty selector: applies to every pod in the namespace.
                        PodSelector = new V1LabelSelector {},
                        PolicyTypes = new[]
                        {
                            "Ingress",
                            "Egress"
                        },
                        Ingress = new List<V1NetworkPolicyIngressRule>
                        {
                            new V1NetworkPolicyIngressRule
                            {
                                FromProperty = new List<V1NetworkPolicyPeer>
                                {
                                    new V1NetworkPolicyPeer
                                    {
                                        PodSelector = new V1LabelSelector {}
                                    }
                                }
                            },
                            new V1NetworkPolicyIngressRule
                            {
                                FromProperty = new List<V1NetworkPolicyPeer>
                                {
                                    new V1NetworkPolicyPeer
                                    {
                                        NamespaceSelector = new V1LabelSelector
                                        {
                                            MatchLabels = GetRunnerNamespaceSelector()
                                        }
                                    }
                                }
                            },
                            new V1NetworkPolicyIngressRule
                            {
                                FromProperty = new List<V1NetworkPolicyPeer>
                                {
                                    new V1NetworkPolicyPeer
                                    {
                                        NamespaceSelector = new V1LabelSelector
                                        {
                                            MatchLabels = GetPrometheusNamespaceSelector()
                                        }
                                    }
                                }
                            }
                        },
                        Egress = new List<V1NetworkPolicyEgressRule>
                        {
                            new V1NetworkPolicyEgressRule
                            {
                                To = new List<V1NetworkPolicyPeer>
                                {
                                    new V1NetworkPolicyPeer
                                    {
                                        PodSelector = new V1LabelSelector {}
                                    }
                                }
                            },
                            // NOTE(review): these are two separate peers (OR), so UDP 53 is allowed to any pod in
                            // kube-system, or to pods labelled k8s-app=kube-dns in this namespace. Confirm whether a
                            // single combined peer was intended before tightening it.
                            new V1NetworkPolicyEgressRule
                            {
                                To = new List<V1NetworkPolicyPeer>
                                {
                                    new V1NetworkPolicyPeer
                                    {
                                        NamespaceSelector = new V1LabelSelector
                                        {
                                            MatchLabels = new Dictionary<string, string> { { "kubernetes.io/metadata.name", "kube-system" } }
                                        }
                                    },
                                    new V1NetworkPolicyPeer
                                    {
                                        PodSelector = new V1LabelSelector
                                        {
                                            MatchLabels = new Dictionary<string, string> { { "k8s-app", "kube-dns" } }
                                        }
                                    }
                                },
                                Ports = new List<V1NetworkPolicyPort>
                                {
                                    new V1NetworkPolicyPort
                                    {
                                        Port = 53,
                                        Protocol = "UDP"
                                    }
                                }
                            },
                            new V1NetworkPolicyEgressRule
                            {
                                To = new List<V1NetworkPolicyPeer>
                                {
                                    new V1NetworkPolicyPeer
                                    {
                                        IpBlock = new V1IPBlock
                                        {
                                            Cidr = "0.0.0.0/0"
                                        }
                                    }
                                },
                                Ports = new List<V1NetworkPolicyPort>
                                {
                                    new V1NetworkPolicyPort
                                    {
                                        Port = 80,
                                        Protocol = "TCP"
                                    },
                                    new V1NetworkPolicyPort
                                    {
                                        Port = 443,
                                        Protocol = "TCP"
                                    }
                                }
                            }
                        }
                    }
                };

                c.CreateNamespacedNetworkPolicy(body, K8sNamespace);
            });
        }

        #endregion

        #region Deployment management

        /// <summary>
        /// Creates a single-replica deployment whose pod runs one container per recipe.
        /// The pod template is labelled with <see cref="PodLabelKey"/>=<paramref name="podLabel"/>.
        /// </summary>
        private RunningDeployment CreateDeployment(ContainerRecipe[] containerRecipes, ILocation location, string podLabel)
        {
            var deploymentSpec = new V1Deployment
            {
                ApiVersion = "apps/v1",
                Metadata = CreateDeploymentMetadata(containerRecipes),
                Spec = new V1DeploymentSpec
                {
                    Replicas = 1,
                    Selector = new V1LabelSelector
                    {
                        MatchLabels = GetSelector(containerRecipes)
                    },
                    Template = new V1PodTemplateSpec
                    {
                        Metadata = new V1ObjectMeta
                        {
                            Labels = GetSelector(containerRecipes, podLabel),
                            Annotations = GetAnnotations(containerRecipes)
                        },
                        Spec = new V1PodSpec
                        {
                            PriorityClassName = GetPriorityClassName(containerRecipes),
                            NodeSelector = CreateNodeSelector(location, containerRecipes),
                            Tolerations = CreateTolerations(containerRecipes),
                            Containers = CreateDeploymentContainers(containerRecipes),
                            Volumes = CreateVolumes(containerRecipes)
                        }
                    }
                }
            };

            client.Run(c => c.CreateNamespacedDeployment(deploymentSpec, K8sNamespace));

            var name = deploymentSpec.Metadata.Name;
            return new RunningDeployment(name, podLabel);
        }

        private void DeleteDeployment(RunningDeployment deployment)
        {
            client.Run(c => c.DeleteNamespacedDeployment(deployment.Name, K8sNamespace));
            WaitUntilDeploymentOffline(deployment.Name);
        }

        /// <summary>
        /// Builds the pod's nodeSelector from the location's node label (if any) and every recipe's node-pool labels.
        /// Later entries overwrite earlier ones for the same key: recipes in order, applied after the location.
        /// </summary>
        private IDictionary<string, string> CreateNodeSelector(ILocation location, ContainerRecipe[] recipes)
        {
            var result = new Dictionary<string, string>();

            var nodeLabel = GetNodeLabelForLocation(location);
            if (nodeLabel != null) result[nodeLabel.Key] = nodeLabel.Value;

            foreach (var recipe in recipes)
            {
                foreach (var kvp in recipe.NodePoolLabels)
                {
                    result[kvp.Key] = kvp.Value;
                }
            }

            return result;
        }

        /// <summary>
        /// Merges the tolerations of all recipes (duplicates removed via the toleration type's equality) into
        /// "Equal"-operator tolerations. Returns null when no recipe declares any, so the field is omitted.
        /// </summary>
        private IList<V1Toleration>? CreateTolerations(ContainerRecipe[] recipes)
        {
            var distinct = recipes.SelectMany(r => r.Tolerations).Distinct().ToList();
            if (!distinct.Any()) return null;

            return distinct.Select(t => new V1Toleration
            {
                Key = t.Key,
                OperatorProperty = "Equal",
                Value = t.Value,
                Effect = t.Effect
            }).ToList();
        }

        private K8sNodeLabel? GetNodeLabelForLocation(ILocation location)
        {
            var l = (Location)location;
            return l.NodeLabel;
        }

        /// <summary>Returns "system-node-critical" when any recipe is marked critical; otherwise null (no priority class).</summary>
        private string? GetPriorityClassName(ContainerRecipe[] containerRecipes)
        {
            return containerRecipes.Any(c => c.IsCriticalPriority) ? "system-node-critical" : null;
        }

        /// <summary>Deployment/service selector: the pod labels of the first recipe.</summary>
        private IDictionary<string, string> GetSelector(ContainerRecipe[] containerRecipes)
        {
            return containerRecipes.First().PodLabels.GetLabels();
        }

        /// <summary>Pod template labels: the first recipe's pod labels plus <see cref="PodLabelKey"/>=<paramref name="podLabel"/>.</summary>
        private IDictionary<string, string> GetSelector(ContainerRecipe[] containerRecipes, string podLabel)
        {
            var labels = containerRecipes.First().PodLabels.Clone();
            labels.Add(PodLabelKey, podLabel);
            return labels.GetLabels();
        }

        private IDictionary<string, string> GetRunnerNamespaceSelector()
        {
            return new Dictionary<string, string> { { "kubernetes.io/metadata.name", "default" } };
        }

        private IDictionary<string, string> GetPrometheusNamespaceSelector()
        {
            return new Dictionary<string, string> { { "kubernetes.io/metadata.name", "monitoring" } };
        }

        private IDictionary<string, string> GetAnnotations(ContainerRecipe[] containerRecipes)
        {
            return containerRecipes.First().PodAnnotations.GetAnnotations();
        }

        /// <summary>Deployment metadata; the deployment is named by joining all recipe names with '-'.</summary>
        private V1ObjectMeta CreateDeploymentMetadata(ContainerRecipe[] containerRecipes)
        {
            return new V1ObjectMeta
            {
                Name = string.Join('-', containerRecipes.Select(r => r.Name)),
                NamespaceProperty = K8sNamespace,
                Labels = GetSelector(containerRecipes),
                Annotations = GetAnnotations(containerRecipes)
            };
        }

        private List<V1Container> CreateDeploymentContainers(ContainerRecipe[] containerRecipes)
        {
            return containerRecipes.Select(CreateDeploymentContainer).ToList();
        }

        /// <summary>Builds the container spec for one recipe; the container is named after the recipe.</summary>
        private V1Container CreateDeploymentContainer(ContainerRecipe recipe)
        {
            return new V1Container
            {
                Name = recipe.Name,
                Image = recipe.Image,
                ImagePullPolicy = "Always",
                Ports = CreateContainerPorts(recipe),
                Env = CreateEnv(recipe),
                VolumeMounts = CreateContainerVolumeMounts(recipe),
                Resources = CreateResourceLimits(recipe),
                Command = CreateCommandList(recipe)
            };
        }

        /// <summary>Returns the recipe's command override, or null (use the image's entrypoint) when none is set.</summary>
        private IList<string>? CreateCommandList(ContainerRecipe recipe)
        {
            if (recipe.CommandOverride == null || !recipe.CommandOverride.Command.Any()) return null;
            return recipe.CommandOverride.Command.ToList();
        }

        private V1ResourceRequirements CreateResourceLimits(ContainerRecipe recipe)
        {
            return new V1ResourceRequirements
            {
                Requests = CreateResourceQuantities(recipe.Resources.Requests),
                Limits = CreateResourceQuantities(recipe.Resources.Limits)
            };
        }

        /// <summary>Converts a resource set to k8s quantities; zero values are omitted rather than sent as 0.</summary>
        private Dictionary<string, ResourceQuantity> CreateResourceQuantities(ContainerResourceSet set)
        {
            var result = new Dictionary<string, ResourceQuantity>();
            if (set.MilliCPUs != 0)
            {
                result.Add("cpu", new ResourceQuantity($"{set.MilliCPUs}m"));
            }
            if (set.Memory.SizeInBytes != 0)
            {
                result.Add("memory", new ResourceQuantity(set.Memory.SizeInBytes.ToString()));
            }
            return result;
        }

        private List<V1VolumeMount> CreateContainerVolumeMounts(ContainerRecipe recipe)
        {
            return recipe.Volumes.Select(CreateContainerVolumeMount).ToList();
        }

        private V1VolumeMount CreateContainerVolumeMount(VolumeMount v)
        {
            return new V1VolumeMount
            {
                Name = v.VolumeName,
                MountPath = v.MountPath,
                SubPath = v.SubPath,
            };
        }

        private List<V1Volume> CreateVolumes(ContainerRecipe[] containerRecipes)
        {
            return containerRecipes.SelectMany(CreateVolumes).ToList();
        }

        private List<V1Volume> CreateVolumes(ContainerRecipe recipe)
        {
            return recipe.Volumes.Select(CreateVolume).ToList();
        }

        /// <summary>
        /// Builds the pod volume for a mount: host-path if set, otherwise secret if set, otherwise a PVC named
        /// after the volume. The PVC is created on demand only in the last case.
        /// </summary>
        private V1Volume CreateVolume(VolumeMount v)
        {
            if (!string.IsNullOrEmpty(v.HostPath))
            {
                return new V1Volume
                {
                    Name = v.VolumeName,
                    HostPath = new V1HostPathVolumeSource
                    {
                        Path = v.HostPath
                    }
                };
            }

            if (!string.IsNullOrEmpty(v.Secret))
            {
                return new V1Volume
                {
                    Name = v.VolumeName,
                    Secret = CreateVolumeSecret(v)
                };
            }

            // Only claim-backed volumes reference a PVC, so only they need one to exist.
            CreatePersistentVolumeClaimIfNeeded(v);

            return new V1Volume
            {
                Name = v.VolumeName,
                PersistentVolumeClaim = new V1PersistentVolumeClaimVolumeSource
                {
                    ClaimName = v.VolumeName
                }
            };
        }

        /// <summary>Creates a ReadWriteOnce PVC named after the volume unless one with that name already exists in the namespace.</summary>
        private void CreatePersistentVolumeClaimIfNeeded(VolumeMount v)
        {
            var pvcs = client.Run(c => c.ListNamespacedPersistentVolumeClaim(K8sNamespace));
            if (pvcs != null && pvcs.Items.Any(i => i.Name() == v.VolumeName)) return;

            client.Run(c => c.CreateNamespacedPersistentVolumeClaim(new V1PersistentVolumeClaim
            {
                ApiVersion = "v1",
                Metadata = new V1ObjectMeta
                {
                    Name = v.VolumeName,
                },
                Spec = new V1PersistentVolumeClaimSpec
                {
                    AccessModes = new List<string>
                    {
                        "ReadWriteOnce"
                    },
                    Resources = CreateVolumeResourceRequirements(v),
                },
            }, K8sNamespace));
        }

        private V1SecretVolumeSource? CreateVolumeSecret(VolumeMount v)
        {
            if (string.IsNullOrWhiteSpace(v.Secret)) return null;
            return new V1SecretVolumeSource
            {
                SecretName = v.Secret
            };
        }

        /// <summary>Storage request for the PVC, or null when the mount specifies no quantity.</summary>
        private V1VolumeResourceRequirements? CreateVolumeResourceRequirements(VolumeMount v)
        {
            if (v.ResourceQuantity == null) return null;
            return new V1VolumeResourceRequirements
            {
                Requests = new Dictionary<string, ResourceQuantity>()
                {
                    {"storage", new ResourceQuantity(v.ResourceQuantity) }
                }
            };
        }

        private List<V1EnvVar> CreateEnv(ContainerRecipe recipe)
        {
            return recipe.EnvVars.Select(CreateEnvVar).ToList();
        }

        private V1EnvVar CreateEnvVar(EnvVar envVar)
        {
            return new V1EnvVar
            {
                Name = envVar.Name,
                Value = envVar.Value,
            };
        }

        /// <summary>Container ports for both exposed and internal recipe ports (one entry per protocol).</summary>
        private List<V1ContainerPort> CreateContainerPorts(ContainerRecipe recipe)
        {
            var exposedPorts = recipe.ExposedPorts.SelectMany(p => CreateContainerPort(recipe, p));
            var internalPorts = recipe.InternalPorts.SelectMany(p => CreateContainerPort(recipe, p));
            return exposedPorts.Concat(internalPorts).ToList();
        }

        private List<V1ContainerPort> CreateContainerPort(ContainerRecipe recipe, Port port)
        {
            var result = new List<V1ContainerPort>();
            if (port.IsTcp()) CreateTcpContainerPort(result, recipe, port);
            if (port.IsUdp()) CreateUdpContainerPort(result, recipe, port);
            return result;
        }

        private void CreateUdpContainerPort(List<V1ContainerPort> result, ContainerRecipe recipe, Port port)
        {
            result.Add(CreateContainerPort(recipe, port, "UDP"));
        }

        private void CreateTcpContainerPort(List<V1ContainerPort> result, ContainerRecipe recipe, Port port)
        {
            result.Add(CreateContainerPort(recipe, port, "TCP"));
        }

        private V1ContainerPort CreateContainerPort(ContainerRecipe recipe, Port port, string protocol)
        {
            return new V1ContainerPort
            {
                Name = GetNameForPort(recipe, port),
                ContainerPort = port.Number,
                Protocol = protocol
            };
        }

        /// <summary>
        /// Port name derived from workflow number, recipe number, port number and protocol.
        /// Shared by container ports and service ports so the two can be matched by name.
        /// </summary>
        private string GetNameForPort(ContainerRecipe recipe, Port port)
        {
            var inputs = new[]
            {
                $"p{workflowNumberSource.WorkflowNumber}",
                recipe.Number.ToString(),
                port.Number.ToString(),
                port.Protocol.ToString().ToLowerInvariant()
            };

            return K8sNameUtils.FormatPortName(string.Join(",", inputs));
        }

        private string GetPodName(RunningContainer container)
        {
            return GetPodForDeployment(container.RunningPod.StartResult.Deployment).Metadata.Name;
        }

        private V1Pod GetPodForDeployment(RunningDeployment deployment)
        {
            return Time.Retry(() => GetPodForDeploymentInternal(deployment),
                // K8s might be moving pods around. If it's scaling the cluster
                // to handle the increased load, it might take a while before the new
                // VMs are up and ready. So we use a generous timeout.
                maxTimeout: TimeSpan.FromMinutes(15.0),
                retryTime: TimeSpan.FromSeconds(30.0),
                description: "Find pod by label for deployment.");
        }

        /// <summary>
        /// Returns the single pod carrying the deployment's pod label.
        /// Throws when zero or several pods match, or when the pod has no status or IP yet.
        /// </summary>
        private V1Pod GetPodForDeploymentInternal(RunningDeployment deployment)
        {
            var allPods = client.Run(c => c.ListNamespacedPod(K8sNamespace));
            var pods = allPods.Items.Where(p => p.GetLabel(PodLabelKey) == deployment.PodLabel).ToArray();

            if (pods.Length != 1)
            {
                var allLabels = allPods.Items.Select(p =>
                {
                    var labels = string.Join(",", p.Labels().Select(l => $"{l.Key}={l.Value}"));
                    return $"pod:'{p.Name()}' has labels: [{labels}]";
                });
                throw new Exception($"Expected to find 1 pod by podLabel '{deployment.PodLabel}'. Found: {pods.Length}. " +
                    $"Total number of pods: {allPods.Items.Count}. Their labels: {string.Join(Environment.NewLine, allLabels)}");
            }

            var pod = pods[0];
            if (pod.Status == null) throw new Exception("Pod status unknown");
            if (string.IsNullOrEmpty(pod.Status.PodIP))
            {
                // Pod exists but has no IP yet (still being scheduled/networked).
                // Sleep briefly so this failure takes >1s, preventing the failFast
                // short-circuit in Retry.CheckMaximums from firing after 6 tries.
                // The caller retries for up to 15 minutes, which is needed when the
                // spot node pool is scaling up to accommodate new pods.
                Time.Sleep(TimeSpan.FromSeconds(2));
                throw new Exception("Pod IP unknown");
            }
            return pod;
        }

        #endregion

        #region Service management

        /// <summary>ClusterIP service covering both internal and exposed ports; null when there are none.</summary>
        private RunningService? CreateInternalService(ContainerRecipe[] recipes)
        {
            return CreateService(recipes, r => r.InternalPorts.Concat(r.ExposedPorts), "ClusterIP", "int", false);
        }

        /// <summary>NodePort service covering exposed ports only; null when there are none.</summary>
        private RunningService? CreateExternalService(ContainerRecipe[] recipes)
        {
            return CreateService(recipes, r => r.ExposedPorts, "NodePort", "ext", true);
        }

        /// <summary>
        /// Creates a service of <paramref name="serviceType"/> for the ports chosen by <paramref name="portSelector"/>.
        /// Reads the service back to learn the assigned ports. Returns null when no ports were selected.
        /// </summary>
        private RunningService? CreateService(ContainerRecipe[] recipes, Func<ContainerRecipe, IEnumerable<Port>> portSelector, string serviceType, string namePostfix, bool isNodePort)
        {
            var ports = CreateServicePorts(recipes, portSelector, isNodePort);
            if (!ports.Any()) return null;

            var serviceSpec = new V1Service
            {
                ApiVersion = "v1",
                Metadata = CreateServiceMetadata(recipes, namePostfix),
                Spec = new V1ServiceSpec
                {
                    Type = serviceType,
                    Selector = GetSelector(recipes),
                    Ports = ports,
                }
            };

            client.Run(c => c.CreateNamespacedService(serviceSpec, K8sNamespace));

            var result = ReadBackServiceAndMapPorts(serviceSpec, recipes);
            var name = serviceSpec.Metadata.Name;

            return new RunningService(name, result);
        }

        /// <summary>
        /// Re-reads the created service and maps each recipe port (matched by port name) to the port K8s assigned.
        /// Node ports are preferred over service ports.
        /// </summary>
        private List<ContainerRecipePortMapEntry> ReadBackServiceAndMapPorts(V1Service serviceSpec, ContainerRecipe[] containerRecipes)
        {
            var result = new List<ContainerRecipePortMapEntry>();

            // For each container-recipe-port, we need to figure out which service-ports it was assigned by K8s.
            var readback = client.Run(c => c.ReadNamespacedService(serviceSpec.Metadata.Name, K8sNamespace));
            foreach (var r in containerRecipes)
            {
                var recipePorts = r.ExposedPorts.Concat(r.InternalPorts).ToArray();
                foreach (var port in recipePorts)
                {
                    var portName = GetNameForPort(r, port);

                    var matchingServicePorts = readback.Spec.Ports.Where(p => p.Name == portName);
                    var ports = matchingServicePorts.Select(p => MapPortIfAble(p, port.Tag, port.Protocol)).ToArray();

                    if (ports.Any())
                    {
                        result.Add(new ContainerRecipePortMapEntry(r.Number, ports));
                        log.Debug($"Service Readback: {portName} found: {string.Join(",", ports.Select(p => p.ToString()))}");
                    }
                }
            }

            return result;
        }

        private Port MapPortIfAble(V1ServicePort p, string tag, PortProtocol protocol)
        {
            if (p.NodePort != null) return new Port(p.NodePort.Value, tag, protocol);
            if (p.Port > 0) return new Port(p.Port, tag, protocol);
            throw new Exception("Unable to map port.");
        }

        private void DeleteService(RunningService service)
        {
            client.Run(c => c.DeleteNamespacedService(service.Name, K8sNamespace));
        }

        private V1ObjectMeta CreateServiceMetadata(ContainerRecipe[] containerRecipes, string namePostfix)
        {
            var recipeName = containerRecipes.First().Name;
            var name = K8sNameUtils.Format($"{recipeName}-{workflowNumberSource.WorkflowNumber}-{namePostfix}");
            log.Debug("Creating service: " + name);
            return new V1ObjectMeta
            {
                Name = name,
                NamespaceProperty = K8sNamespace,
            };
        }

        private List<V1ServicePort> CreateServicePorts(ContainerRecipe[] recipes, Func<ContainerRecipe, IEnumerable<Port>> portSelector, bool isNodePort)
        {
            var result = new List<V1ServicePort>();
            foreach (var recipe in recipes)
            {
                foreach (var port in portSelector(recipe))
                {
                    result.AddRange(CreateServicePorts(recipe, port, isNodePort));
                }
            }
            return result;
        }

        private List<V1ServicePort> CreateServicePorts(ContainerRecipe recipe, Port recipePort, bool isNodePort)
        {
            var result = new List<V1ServicePort>();
            if (recipePort.IsTcp()) CreateServicePort(result, recipe, recipePort, "TCP", isNodePort);
            if (recipePort.IsUdp()) CreateServicePort(result, recipe, recipePort, "UDP", isNodePort);
            return result;
        }

        /// <summary>
        /// Adds a service port that targets the container port of the same name.
        /// For NodePort services, the node port is pinned to the recipe's port number.
        /// </summary>
        private void CreateServicePort(List<V1ServicePort> result, ContainerRecipe recipe, Port port, string protocol, bool isNodePort)
        {
            var portName = GetNameForPort(recipe, port);
            var p = new V1ServicePort
            {
                Name = portName,
                Protocol = protocol,
                Port = port.Number,
                TargetPort = portName
            };

            if (isNodePort) p.NodePort = port.Number;

            result.Add(p);
        }

        #endregion

        #region Waiting

        private void WaitUntilNamespaceCreated()
        {
            WaitUntil(() => IsNamespaceOnline(K8sNamespace), nameof(WaitUntilNamespaceCreated));
        }

        private void WaitUntilNamespaceDeleted(string @namespace)
        {
            WaitUntil(() => !IsNamespaceOnline(@namespace), nameof(WaitUntilNamespaceDeleted));
        }

        /// <summary>
        /// Waits until the container's deployment has at least one available replica, checking for crashes on every poll.
        /// </summary>
        private void WaitUntilDeploymentOnline(RunningContainer container)
        {
            // The deployment is named after all of its recipes (see CreateDeploymentMetadata), so the
            // recipe name alone only matches for single-container deployments.
            var deploymentName = container.RunningPod.StartResult.Deployment.Name;

            WaitUntil(() =>
            {
                CheckForCrash(container);

                var deployment = client.Run(c => c.ReadNamespacedDeployment(deploymentName, K8sNamespace));
                return deployment?.Status?.AvailableReplicas > 0;
            }, nameof(WaitUntilDeploymentOnline));
        }

        /// <summary>
        /// Throws (after logging and downloading the previous container log) if any container in the pod has restarted.
        /// </summary>
        private void CheckForCrash(RunningContainer container)
        {
            var deploymentName = container.Recipe.Name;
            var podName = GetPodName(container);

            var podInfo = client.Run(c => c.ReadNamespacedPod(podName, K8sNamespace));
            if (podInfo == null) return;
            if (podInfo.Status == null) return;
            if (podInfo.Status.ContainerStatuses == null) return;

            var result = podInfo.Status.ContainerStatuses.Any(c => c.RestartCount > 0);
            if (result)
            {
                var msg = $"Pod crash detected for deployment {deploymentName} (pod:{podName})";
                log.Error(msg);

                DownloadPodLog(container, new WriteToFileLogHandler(log, msg, deploymentName), tailLines: null, previous: true);

                throw new Exception(msg);
            }
        }

        /// <summary>Waits until the deployment is gone or reports no available replicas (a missing count counts as 0).</summary>
        private void WaitUntilDeploymentOffline(string deploymentName)
        {
            WaitUntil(() =>
            {
                var deployments = client.Run(c => c.ListNamespacedDeployment(K8sNamespace));
                var deployment = deployments.Items.SingleOrDefault(d => d.Metadata.Name == deploymentName);
                return deployment == null || (deployment.Status?.AvailableReplicas ?? 0) == 0;
            }, nameof(WaitUntilDeploymentOffline));
        }

        private void WaitUntilPodsForDeploymentAreOffline(RunningDeployment deployment)
        {
            WaitUntil(() =>
            {
                var pods = FindPodsByLabel(deployment.PodLabel);
                return !pods.Any();
            }, nameof(WaitUntilPodsForDeploymentAreOffline));
        }

        /// <summary>Polls <paramref name="predicate"/> using the cluster's operation timeout and retry delay, timing the wait.</summary>
        private void WaitUntil(Func<bool> predicate, string msg)
        {
            var sw = Stopwatch.Begin(log, true);
            try
            {
                Time.WaitUntil(predicate, cluster.K8sOperationTimeout(), cluster.K8sOperationRetryDelay(), msg);
            }
            finally
            {
                sw.End(msg, 1);
            }
        }

        #endregion

        /// <summary>Creates a crash watcher for the pod/container hosting <paramref name="container"/>.</summary>
        public ContainerCrashWatcher CreateCrashWatcher(RunningContainer container)
        {
            var containerName = container.Name;
            var podName = GetPodName(container);
            var recipeName = container.Recipe.Name;

            return new ContainerCrashWatcher(log, cluster.GetK8sClientConfig(), containerName, podName, recipeName, K8sNamespace);
        }

        private V1Pod[] FindPodsByLabel(string podLabel)
        {
            var pods = client.Run(c => c.ListNamespacedPod(K8sNamespace));
            return pods.Items.Where(p => p.GetLabel(PodLabelKey) == podLabel).ToArray();
        }

        /// <summary>Extracts pod name, IP and node name; throws if name or IP is missing.</summary>
        private PodInfo CreatePodInfo(V1Pod pod)
        {
            var name = pod.Name();
            var ip = pod.Status.PodIP;
            var k8sNodeName = pod.Spec.NodeName;

            if (string.IsNullOrEmpty(name)) throw new InvalidOperationException("Invalid pod name received. Test infra failure.");
            if (string.IsNullOrEmpty(ip)) throw new InvalidOperationException("Invalid pod IP received. Test infra failure.");

            return new PodInfo(name, ip, k8sNodeName);
        }
    }
}
|