""" List of functions to get data from Flux Kustomizations and Helmreleases """ import crypt import secrets import string import jinja2 import yaml from kubernetes import client, config from kubernetes.client import api_client from kubernetes.client.exceptions import ApiException from kubernetes.utils import create_from_yaml from kubernetes.utils.create_from_yaml import FailToCreateError from flask import current_app # Load the kube config once # # By default this loads whatever we define in the `KUBECONFIG` env variable, # otherwise loads the config from default locations, similar to what kubectl # does. config.load_kube_config() def create_variables_secret(app_slug, variables_filepath): """Checks if a variables secret for app_name already exists, generates it if necessary. If a secret already exists, loops through keys from the template, and adds values for keys that miss in the Kubernetes secret, but are available in the template. :param app_slug: The slug of the app, used in the oauth secrets :type app_slug: string :param variables_filepath: The path to an existing jinja2 template :type variables_filepath: string :return: returns True, unless an exception gets raised by the Kubernetes API :rtype: boolean """ new_secret_dict = read_template_to_dict( variables_filepath, {"app": app_slug}) secret_name, secret_namespace = get_secret_metadata(new_secret_dict) current_secret_data = get_kubernetes_secret_data( secret_name, secret_namespace ) if current_secret_data is None: # Create new secret update_secret = False elif current_secret_data.keys() != new_secret_dict["data"].keys(): # Update current secret with new keys update_secret = True current_app.logger.info( f"Secret {secret_name} in namespace {secret_namespace}" " already exists. Merging..." ) # Merge dicts. Values from current_secret_data take precedence new_secret_dict["data"] |= current_secret_data else: # Do Nothing current_app.logger.info( f"Secret {secret_name} in namespace {secret_namespace}" " is already in a good state, doing nothing." ) return True current_app.logger.info( f"Storing secret {secret_name} in namespace" f" {secret_namespace} in cluster." ) store_kubernetes_secret( new_secret_dict, secret_namespace, update=update_secret ) return True def get_secret_metadata(secret_dict): """ Returns secret name and namespace from metadata field in a yaml string. :param secret_dict: Dictionary of the secret as returned by read_namespaced_secret :type secret_dict: dict :return: Tuple containing secret name and secret namespace :rtype: tuple """ secret_name = secret_dict["metadata"]["name"] # default namespace is flux-system, but other namespace can be # provided in secret metadata if "namespace" in secret_dict["metadata"]: secret_namespace = secret_dict["metadata"]["namespace"] else: secret_namespace = "flux-system" return secret_name, secret_namespace def get_kubernetes_secret_data(secret_name, namespace): """ Get secret from Kubernetes :param secret_name: Name of the secret :type secret_name: string :param namespace: Namespace of the secret :type namespace: string :return: The contents of a kubernetes secret or None if the secret does not exist. :rtype: dict or None """ api_client_instance = api_client.ApiClient() api_instance = client.CoreV1Api(api_client_instance) try: secret = api_instance.read_namespaced_secret(secret_name, namespace).data except ApiException as ex: # 404 is expected when the optional secret does not exist. if ex.status != 404: raise ex return None return secret def store_kubernetes_secret(secret_dict, namespace, update=False): """ Stores either a new secret in the cluster, or updates an existing one. :param secret_dict: Dictionary of the secret as returned by read_namespaced_secret :type secret_dict: dict :param namespace: Namespace of the secret :type namespace: string :param update: If True, use `patch_kubernetes_secret`, otherwise use `create_from_yaml` (default: False) :type update: boolean :return: None :rtype: None """ api_client_instance = api_client.ApiClient() if update: verb = "updated" api_response = patch_kubernetes_secret(secret_dict, namespace) else: verb = "created" try: api_response = create_from_yaml( api_client_instance, yaml_objects=[secret_dict], namespace=namespace ) except FailToCreateError as ex: current_app.logger.info(f"Secret not created because of exception {ex}") raise ex current_app.logger.info(f"Secret {verb} with api response: {api_response}") def store_kustomization(kustomization_template_filepath, app_slug): """ Add a kustomization that installs app {app_slug} to the cluster. :param kustomization_template_filepath: Path to the template that describes the kustomization. The template should have an `{{ app }}` entry. :type kustomization_template_filepath: string :param app_slug: Slug for the app, used to replace `{{ app }}` in the template :return: True on success :rtype: boolean """ kustomization_dict = read_template_to_dict(kustomization_template_filepath, {"app": app_slug}) custom_objects_api = client.CustomObjectsApi() try: api_response = custom_objects_api.create_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta2", namespace="flux-system", plural="kustomizations", body=kustomization_dict) except FailToCreateError as ex: current_app.logger.info( f"Could not create {app_slug} Kustomization because of exception {ex}") raise ex current_app.logger.debug(f"Kustomization created with api response: {api_response}") return True def delete_kustomization(kustomization_name): """ Deletes a kustomization. Note that if the kustomization has `prune: true` in its spec, this will trigger deletion of other elements generated by the Kustomizartion. See App.uninstall() to learn what implications this has for what will and will not be deleted by the kustomize-controller. :param kustomization_name: name of the kustomization to delete :type kustomization_name: string :return: Response of delete API call :rtype: dict """ custom_objects_api = client.CustomObjectsApi() body = client.V1DeleteOptions() try: api_response = custom_objects_api.delete_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta2", namespace="flux-system", plural="kustomizations", name=kustomization_name, body=body) except ApiException as ex: current_app.logger.info( f"Could not delete {kustomization_name} Kustomization because of exception {ex}") raise ex current_app.logger.debug(f"Kustomization deleted with api response: {api_response}") return api_response def read_template_to_dict(template_filepath, template_globals): """ Reads a Jinja2 template that contains yaml and turns it into a dict. :param template_filepath: The path to an existing Jinja2 template :type template_filepath: string :param template_globals: The variables substituted in the template :type template_globals: dict :return: dict, or None if anything fails """ env = jinja2.Environment( extensions=["jinja2_base64_filters.Base64Filters"]) env.filters["generate_password"] = generate_password # Check if k8s secret already exists, if not, generate it with open(template_filepath, encoding="UTF-8") as template_file: lines = template_file.read() templated_dict = yaml.safe_load( env.from_string(lines, globals=template_globals).render() ) return templated_dict return None def patch_kubernetes_secret(secret_dict, namespace): """ Patches secret in the cluster with new data. Warning: currently ignores everything that's not in secret_dict["data"] :param secret_dict: Dictionary of the secret as returned by read_namespaced_secret :type secret_dict: dict :param namespace: Namespace of the secret :type namespace: string :return: Response of the patch API call """ api_client_instance = api_client.ApiClient() api_instance = client.CoreV1Api(api_client_instance) name = secret_dict["metadata"]["name"] body = {} body["data"] = secret_dict["data"] return api_instance.patch_namespaced_secret(name, namespace, body) def generate_password(length): """ Generates a password with letters and digits. :param length: The amount of characters in the password :type length: int :return: Generated password :rtype: string """ length = int(length) password = "".join((secrets.choice(string.ascii_letters + string.digits) for i in range(length))) return password def gen_htpasswd(user, password): """ Generate htpasswd entry for user with password. :param user: Username used in the htpasswd entry :type user: string :param password: Password for the user, will get encrypted. :type password: string :return: htpassword line entry :rtype: string """ return f"{user}:{crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512))}" def get_all_kustomizations(namespace='flux-system'): """ Returns all flux kustomizations in a namespace. :param namespace: namespace that contains kustomizations. Default: `flux-system` :type namespace: str :return: 'items' in dict returned by CustomObjectsApi.list_namespaced_custom_object() :rtype: dict[] """ api = client.CustomObjectsApi() api_response = api.list_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta1", plural="kustomizations", namespace=namespace, ) return api_response def get_all_helmreleases(namespace='stackspin', label_selector=""): """ Lists all helmreleases in a certain namespace (stackspin by default) :param namespace: namespace that contains helmreleases. Default: `stackspin-apps` :type namespace: str :param label_selector: a label selector to limit the list (optional) :type label_selector: str :return: List of helmreleases :rtype: dict[] """ api_instance = client.CustomObjectsApi() try: api_response = api_instance.list_namespaced_custom_object( group="helm.toolkit.fluxcd.io", version="v2beta1", namespace=namespace, plural="helmreleases", label_selector=label_selector) except ApiException as error: if error.status == 404: return None # Raise all non-404 errors raise error return api_response['items'] def get_kustomization(name, namespace='flux-system'): """ Returns all info of a Flux kustomization with name 'name' :param name: Name of the kustomizatoin :type name: string :param namespace: Namespace of the kustomization :type namespace: string :return: kustomization as returned by the API :rtype: dict """ api = client.CustomObjectsApi() try: resource = api.get_namespaced_custom_object( group="kustomize.toolkit.fluxcd.io", version="v1beta1", name=name, namespace=namespace, plural="kustomizations", ) except client.exceptions.ApiException as error: if error.status == 404: return None # Raise all non-404 errors raise error return resource