""" List of functions to get data from Flux Kustomizations and Helmreleases """ import crypt import secrets import string import jinja2 import yaml from kubernetes import client, config from kubernetes.client import api_client from kubernetes.client.exceptions import ApiException from kubernetes.utils import create_from_yaml from kubernetes.utils.create_from_yaml import FailToCreateError # Load the kube config once config.load_kube_config() def create_variables_secret(app_slug, variables_filepath): """Checks if a variables secret for app_name already exists, generates it if necessary. :param app_slug: The slug of the app, used in the oauth secrets :type app_slug: string :param variables_filepath: The path to an existing jinja2 template :type variables_filepath: string """ new_secret_dict = read_template_to_dict( variables_filepath, {"app": app_slug}) secret_name, secret_namespace = get_secret_metadata(new_secret_dict) current_secret_data = get_kubernetes_secret_data( secret_name, secret_namespace ) if current_secret_data is None: # Create new secret update_secret = False elif current_secret_data.keys() != new_secret_dict["data"].keys(): # Update current secret with new keys update_secret = True print( f"Secret {secret_name} in namespace {secret_namespace}" " already exists. Merging..." ) # Merge dicts. Values from current_secret_data take precedence new_secret_dict["data"] |= current_secret_data else: # Do Nothing print( f"Secret {secret_name} in namespace {secret_namespace}" " is already in a good state, doing nothing." ) return True print( f"Storing secret {secret_name} in namespace" f" {secret_namespace} in cluster." ) store_kubernetes_secret( new_secret_dict, secret_namespace, update=update_secret ) return True def get_secret_metadata(secret_dict): """Returns secret name and namespace from metadata field in a yaml string.""" secret_name = secret_dict["metadata"]["name"] # default namespace is flux-system, but other namespace can be # provided in secret metadata if "namespace" in secret_dict["metadata"]: secret_namespace = secret_dict["metadata"]["namespace"] else: secret_namespace = "flux-system" return secret_name, secret_namespace def get_kubernetes_secret_data(secret_name, namespace): """Returns the contents of a kubernetes secret or None if the secret does not exist.""" api_client_instance = api_client.ApiClient() api_instance = client.CoreV1Api(api_client_instance) try: secret = api_instance.read_namespaced_secret(secret_name, namespace).data except ApiException as ex: # 404 is expected when the optional secret does not exist. if ex.status != 404: raise ex return None return secret def store_kubernetes_secret(secret_dict, namespace, update=False): """Stores either a new secret in the cluster, or updates an existing one.""" api_client_instance = api_client.ApiClient() if update: verb = "updated" api_response = patch_kubernetes_secret(secret_dict, namespace) else: verb = "created" try: api_response = create_from_yaml( api_client_instance, yaml_objects=[secret_dict], namespace=namespace ) except FailToCreateError as ex: print(f"Secret not {verb} because of exception {ex}") return print(f"Secret {verb} with api response: {api_response}") def store_kustomization(kustomization_template_filepath, app_slug): """Add a kustomization that installs app {app_slug} to the cluster""" kustomization_dict = read_template_to_dict(kustomization_template_filepath, {"app": app_slug}) custom_objects_api = client.CustomObjectsApi() try: api_response = custom_objects_api.create_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta2", namespace="flux-system", plural="kustomizations", body=kustomization_dict) except FailToCreateError as ex: print(f"Could not create {app_slug} Kustomization because of exception {ex}") return print(f"Kustomization created with api response: {api_response}") def delete_kustomization(kustomization_name): """Deletes kustomization for an app_slug. Should also result in the deletion of the app's HelmReleases, PVCs, OAuth2Client, etc. Nothing will remain""" custom_objects_api = client.CustomObjectsApi() body = client.V1DeleteOptions() try: api_response = custom_objects_api.delete_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta2", namespace="flux-system", plural="kustomizations", name=kustomization_name, body=body) except ApiException as ex: print(f"Could not delete {kustomization_name} Kustomization because of exception {ex}") return False print(f"Kustomization deleted with api response: {api_response}") return api_response def read_template_to_dict(template_filepath, template_globals): """Reads a Jinja2 template that contains yaml and turns it into a dict :param template_filepath: The path to an existing Jinja2 template :type template_filepath: string :param template_globals: The variables substituted in the template :type template_globals: dict :return: dict, or None if anything fails """ env = jinja2.Environment( extensions=["jinja2_base64_filters.Base64Filters"]) env.filters["generate_password"] = generate_password # Check if k8s secret already exists, if not, generate it with open(template_filepath, encoding="UTF-8") as template_file: lines = template_file.read() templated_dict = yaml.safe_load( env.from_string(lines, globals=template_globals).render() ) return templated_dict return None def patch_kubernetes_secret(secret_dict, namespace): """Patches secret in the cluster with new data.""" api_client_instance = api_client.ApiClient() api_instance = client.CoreV1Api(api_client_instance) name = secret_dict["metadata"]["name"] body = {} body["data"] = secret_dict["data"] return api_instance.patch_namespaced_secret(name, namespace, body) def generate_password(length): """Generates a password of "length" characters.""" length = int(length) password = "".join((secrets.choice(string.ascii_letters) for i in range(length))) return password def gen_htpasswd(user, password): """Generate htpasswd entry for user with password.""" return f"{user}:{crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512))}" def get_all_kustomization_names(namespace='flux-system'): """ Returns all flux kustomizations in a namespace. :param namespace: namespace that contains kustomizations. Default: `flux-system` :type namespace: str :return: List of names for kustomizations in namespace :rtype: list """ kustomizations = get_all_kustomizations(namespace) return_kustomizations = [] for kustomization in kustomizations['items']: return_kustomizations.append(kustomization['metadata']['name']) return return_kustomizations def get_all_kustomizations(namespace='flux-system'): """ Returns all flux kustomizations in a namespace. :param namespace: namespace that contains kustomizations. Default: `flux-system` :type namespace: str :return: Kustomizations as returned by CustomObjectsApi.list_namespaced_custom_object() :rtype: object """ api = client.CustomObjectsApi() api_response = api.list_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta1", plural="kustomizations", namespace=namespace, ) return api_response def get_all_helmrelease_names(namespace='stackspin'): """ Returns names of all helmreleases in a namespace. :param namespace: namespace that contains kustomizations. Default: `stackspin` :type namespace: str :return: List of names for helmreleases in namespace :rtype: list """ helmreleases = get_all_helmreleases(namespace) return_helmreleases = [] for helmrelease in helmreleases['items']: return_helmreleases.append(helmrelease['metadata']['name']) return return_helmreleases def get_all_helmreleases(namespace='stackspin'): """ Returns all helmreleases in a namespace. :param namespace: namespace that contains kustomizations. Default: `stackspin` :type namespace: str :return: Helmreleases as returned by CustomObjectsApi.list_namespaced_custom_object() :rtype: object """ api = client.CustomObjectsApi() api_response = api.list_namespaced_custom_object( group="helm.toolkit.fluxcd.io", version="v2beta1", plural="helmreleases", namespace=namespace, ) return api_response def get_kustomization(name, namespace='flux-system'): """Returns all info of a Flux kustomization with name 'name'""" api = client.CustomObjectsApi() try: resource = api.get_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta1", name=name, namespace=namespace, plural="kustomizations", ) except client.exceptions.ApiException as error: if error.status == 404: return None # Raise all non-404 errors raise error return resource def get_helmrelease(name, namespace='stackspin-apps'): """Returns all info of a Flux helmrelease with name 'name'""" api = client.CustomObjectsApi() try: resource = api.get_namespaced_custom_object( group="helm.toolkit.fluxcd.io", version="v2beta1", name=name, namespace=namespace, plural="helmreleases", ) except client.exceptions.ApiException as error: if error.status == 404: return None # Raise all non-404 errors raise error return resource def list_helmreleases(namespace='stackspin-apps', label_selector=""): """ Lists all helmreleases in a certain namespace (stackspin-apps by default) Optionally takes a label selector to limit the list. """ api_instance = client.CustomObjectsApi() try: api_response = api_instance.list_namespaced_custom_object( group="helm.toolkit.fluxcd.io", version="v2beta1", namespace=namespace, plural="helmreleases", label_selector=label_selector) except ApiException as error: if error.status == 404: return None # Raise all non-404 errors raise error return api_response def get_readiness(app_status): """ Parses an app status's 'conditions' to find a type field called 'Ready' and returns its status. Works for Kustomizations as well as Helmreleases. """ for condition in app_status['conditions']: if condition['type'] == 'Ready': return condition['status'] # If this point is reached, no condition "Ready" exists, so the application # is not ready. return False