dashboard/helpers/kubernetes.py
2022-09-28 14:57:20 +02:00

306 lines
11 KiB
Python

"""
List of functions to get data from Flux Kustomizations and Helmreleases
"""
import crypt
import secrets
import string
import jinja2
import yaml
from kubernetes import client, config
from kubernetes.client import api_client
from kubernetes.client.exceptions import ApiException
from kubernetes.utils import create_from_yaml
from kubernetes.utils.create_from_yaml import FailToCreateError
# Load the kube config once
config.load_kube_config()
def create_variables_secret(app_slug, variables_filepath):
"""Checks if a variables secret for app_name already exists, generates it if necessary.
:param app_slug: The slug of the app, used in the oauth secrets
:type app_slug: string
:param variables_filepath: The path to an existing jinja2 template
:type variables_filepath: string
"""
new_secret_dict = read_template_to_dict(
variables_filepath,
{"app": app_slug})
secret_name, secret_namespace = get_secret_metadata(new_secret_dict)
current_secret_data = get_kubernetes_secret_data(
secret_name, secret_namespace
)
if current_secret_data is None:
# Create new secret
update_secret = False
elif current_secret_data.keys() != new_secret_dict["data"].keys():
# Update current secret with new keys
update_secret = True
print(
f"Secret {secret_name} in namespace {secret_namespace}"
" already exists. Merging..."
)
# Merge dicts. Values from current_secret_data take precedence
new_secret_dict["data"] |= current_secret_data
else:
# Do Nothing
print(
f"Secret {secret_name} in namespace {secret_namespace}"
" is already in a good state, doing nothing."
)
return True
print(
f"Storing secret {secret_name} in namespace"
f" {secret_namespace} in cluster."
)
store_kubernetes_secret(
new_secret_dict, secret_namespace, update=update_secret
)
return True
def get_secret_metadata(secret_dict):
"""Returns secret name and namespace from metadata field in a yaml string."""
secret_name = secret_dict["metadata"]["name"]
# default namespace is flux-system, but other namespace can be
# provided in secret metadata
if "namespace" in secret_dict["metadata"]:
secret_namespace = secret_dict["metadata"]["namespace"]
else:
secret_namespace = "flux-system"
return secret_name, secret_namespace
def get_kubernetes_secret_data(secret_name, namespace):
"""Returns the contents of a kubernetes secret or None if the secret does not exist."""
api_client_instance = api_client.ApiClient()
api_instance = client.CoreV1Api(api_client_instance)
try:
secret = api_instance.read_namespaced_secret(secret_name, namespace).data
except ApiException as ex:
# 404 is expected when the optional secret does not exist.
if ex.status != 404:
raise ex
return None
return secret
def store_kubernetes_secret(secret_dict, namespace, update=False):
"""Stores either a new secret in the cluster, or updates an existing one."""
api_client_instance = api_client.ApiClient()
if update:
verb = "updated"
api_response = patch_kubernetes_secret(secret_dict, namespace)
else:
verb = "created"
try:
api_response = create_from_yaml(
api_client_instance,
yaml_objects=[secret_dict],
namespace=namespace
)
except FailToCreateError as ex:
print(f"Secret not {verb} because of exception {ex}")
return
print(f"Secret {verb} with api response: {api_response}")
def store_kustomization(kustomization_template_filepath, app_slug):
"""Add a kustomization that installs app {app_slug} to the cluster"""
kustomization_dict = read_template_to_dict(kustomization_template_filepath,
{"app": app_slug})
api_client_instance = api_client.ApiClient()
custom_objects_api = client.CustomObjectsApi(api_client_instance)
try:
api_response = custom_objects_api.create_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta2",
namespace="flux-system",
plural="kustomizations",
body=kustomization_dict)
except FailToCreateError as ex:
print(f"Could not create {app_slug} Kustomization because of exception {ex}")
return
print(f"Kustomization created with api response: {api_response}")
def delete_kustomization(kustomization_name):
"""Deletes kustomization for an app_slug. Should also result in the
deletion of the app's HelmReleases, PVCs, OAuth2Client, etc. Nothing will
remain"""
api_client_instance = api_client.ApiClient()
custom_objects_api = client.CustomObjectsApi(api_client_instance)
body = client.V1DeleteOptions()
try:
api_response = custom_objects_api.delete_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta2",
namespace="flux-system",
plural="kustomizations",
name=kustomization_name,
body=body)
except ApiException as ex:
print(f"Could not delete {kustomization_name} Kustomization because of exception {ex}")
return False
print(f"Kustomization deleted with api response: {api_response}")
def read_template_to_dict(template_filepath, template_globals):
"""Reads a Jinja2 template that contains yaml and turns it into a dict
:param template_filepath: The path to an existing Jinja2 template
:type template_filepath: string
:param template_globals: The variables substituted in the template
:type template_globals: dict
:return: dict, or None if anything fails
"""
env = jinja2.Environment(
extensions=["jinja2_base64_filters.Base64Filters"])
env.filters["generate_password"] = generate_password
# Check if k8s secret already exists, if not, generate it
with open(template_filepath, encoding="UTF-8") as template_file:
lines = template_file.read()
templated_dict = yaml.safe_load(
env.from_string(lines, globals=template_globals).render()
)
return templated_dict
return None
def patch_kubernetes_secret(secret_dict, namespace):
"""Patches secret in the cluster with new data."""
api_client_instance = api_client.ApiClient()
api_instance = client.CoreV1Api(api_client_instance)
name = secret_dict["metadata"]["name"]
body = {}
body["data"] = secret_dict["data"]
return api_instance.patch_namespaced_secret(name, namespace, body)
def generate_password(length):
"""Generates a password of "length" characters."""
length = int(length)
password = "".join((secrets.choice(string.ascii_letters)
for i in range(length)))
return password
def gen_htpasswd(user, password):
"""Generate htpasswd entry for user with password."""
return f"{user}:{crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512))}"
def get_all_kustomization_names(namespace='flux-system'):
"""
Returns all flux kustomizations in a namespace.
:param namespace: namespace that contains kustomizations. Default: `flux-system`
:type namespace: str
:return: List of names for kustomizations in namespace
:rtype: list
"""
kustomizations = get_all_kustomizations(namespace)
return_kustomizations = []
for kustomization in kustomizations['items']:
return_kustomizations.append(kustomization['metadata']['name'])
return return_kustomizations
def get_all_kustomizations(namespace='flux-system'):
"""
Returns all flux kustomizations in a namespace.
:param namespace: namespace that contains kustomizations. Default: `flux-system`
:type namespace: str
:return: Kustomizations as returned by CustomObjectsApi.list_namespaced_custom_object()
:rtype: object
"""
api = client.CustomObjectsApi()
api_response = api.list_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
plural="kustomizations",
namespace=namespace,
)
return api_response
def get_all_helmrelease_names(namespace='stackspin'):
"""
Returns names of all helmreleases in a namespace.
:param namespace: namespace that contains kustomizations. Default: `stackspin`
:type namespace: str
:return: List of names for helmreleases in namespace
:rtype: list
"""
helmreleases = get_all_helmreleases(namespace)
return_helmreleases = []
for helmrelease in helmreleases['items']:
return_helmreleases.append(helmrelease['metadata']['name'])
return return_helmreleases
def get_all_helmreleases(namespace='stackspin'):
"""
Returns all helmreleases in a namespace.
:param namespace: namespace that contains kustomizations. Default: `stackspin`
:type namespace: str
:return: Helmreleases as returned by CustomObjectsApi.list_namespaced_custom_object()
:rtype: object
"""
api = client.CustomObjectsApi()
api_response = api.list_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
plural="helmreleases",
namespace=namespace,
)
return api_response
def get_kustomization(name, namespace='flux-system'):
"""Returns all info of a Flux kustomization with name 'name'"""
api = client.CustomObjectsApi()
try:
resource = api.get_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
name=name,
namespace=namespace,
plural="kustomizations",
)
except client.exceptions.ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return resource
def get_helmrelease(name, namespace='stackspin-apps'):
"""Returns all info of a Flux helmrelease with name 'name'"""
api = client.CustomObjectsApi()
try:
resource = api.get_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
name=name,
namespace=namespace,
plural="helmreleases",
)
except client.exceptions.ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return resource
def get_readiness(app_status):
"""
Parses an app status's 'conditions' to find a type field called 'Ready' and
returns its status. Works for Kustomizations as well as Helmreleases.
"""
for condition in app_status['conditions']:
if condition['type'] == 'Ready':
return condition['status']
# If this point is reached, no condition "Ready" exists, so the application
# is not ready.
return False