"""
Helper functions to read and manage Flux Kustomizations and HelmReleases, and
the Kubernetes secrets that hold their variables.
"""
|
|
import crypt
|
|
import secrets
|
|
import string
|
|
|
|
import jinja2
|
|
import yaml
|
|
from kubernetes import client, config
|
|
from kubernetes.client import api_client
|
|
from kubernetes.client.exceptions import ApiException
|
|
from kubernetes.utils import create_from_yaml
|
|
from kubernetes.utils.create_from_yaml import FailToCreateError
|
|
from flask import current_app
|
|
|
|
from config import LOAD_INCLUSTER_CONFIG
|
|
|
|
# Load the Kubernetes client configuration once, when this module is imported.
#
# If LOAD_INCLUSTER_CONFIG is set, the in-cluster loader is used. Otherwise
# `load_kube_config` loads whatever we define in the `KUBECONFIG` env
# variable, or falls back to the default locations, similar to what kubectl
# does.
if not LOAD_INCLUSTER_CONFIG:
    config.load_kube_config()
else:
    config.load_incluster_config()
|
|
|
|
def create_variables_secret(app_slug, variables_filepath):
    """Make sure the variables secret described by a template exists in the cluster.

    The template is rendered with `app` set to `app_slug`. Depending on the
    state of the cluster one of three things happens:

    * the secret does not exist yet: it is created from the template;
    * the secret exists but its keys differ from the template's keys: the
      existing values are merged into the templated ones (existing values
      take precedence) and the secret is patched;
    * the secret exists with the same keys: nothing is stored.

    :param app_slug: The slug of the app, used in the oauth secrets
    :type app_slug: string
    :param variables_filepath: The path to an existing jinja2 template
    :type variables_filepath: string
    :return: returns True, unless an exception gets raised by the Kubernetes API
    :rtype: boolean
    """
    desired_secret = read_template_to_dict(variables_filepath, {"app": app_slug})
    secret_name, secret_namespace = get_secret_metadata(desired_secret)
    existing_data = get_kubernetes_secret_data(secret_name, secret_namespace)

    # `None` means the secret is not in the cluster yet, so it gets created
    # instead of patched.
    secret_exists = existing_data is not None

    if secret_exists:
        if existing_data.keys() == desired_secret["data"].keys():
            current_app.logger.info(
                f"Secret {secret_name} in namespace {secret_namespace}"
                " is already in a good state, doing nothing."
            )
            return True

        current_app.logger.info(
            f"Secret {secret_name} in namespace {secret_namespace}"
            " already exists. Merging..."
        )
        # In-place merge: values already stored in the cluster win over the
        # freshly templated ones, so existing values are never overwritten.
        desired_secret["data"] |= existing_data

    current_app.logger.info(
        f"Storing secret {secret_name} in namespace"
        f" {secret_namespace} in cluster."
    )
    store_kubernetes_secret(
        desired_secret, secret_namespace, update=secret_exists
    )
    return True
|
|
|
|
|
|
def get_secret_metadata(secret_dict):
    """
    Returns the secret name and namespace found in the secret's metadata.

    :param secret_dict: Dictionary describing the secret; must contain a
        `metadata` mapping with at least a `name` entry
    :type secret_dict: dict
    :return: Tuple containing secret name and secret namespace
    :rtype: tuple
    """
    metadata = secret_dict["metadata"]
    # The namespace is optional in the metadata; secrets that do not provide
    # one are placed in flux-system.
    return metadata["name"], metadata.get("namespace", "flux-system")
|
|
|
|
|
|
def get_kubernetes_secret_data(secret_name, namespace):
    """
    Reads the `data` field of a secret from Kubernetes.

    :param secret_name: Name of the secret
    :type secret_name: string
    :param namespace: Namespace of the secret
    :type namespace: string

    :return: The `data` of the kubernetes secret, or None if the secret does
        not exist.
    :rtype: dict or None
    :raises ApiException: for every API error other than a 404
    """
    core_api = client.CoreV1Api(api_client.ApiClient())
    try:
        return core_api.read_namespaced_secret(secret_name, namespace).data
    except ApiException as ex:
        if ex.status == 404:
            # A missing secret is an expected situation, not an error.
            return None
        raise ex
|
|
|
|
|
|
def store_kubernetes_secret(secret_dict, namespace, update=False):
    """
    Creates a new secret in the cluster, or patches an existing one.

    :param secret_dict: Dictionary describing the secret
    :type secret_dict: dict
    :param namespace: Namespace of the secret
    :type namespace: string
    :param update: If True, use `patch_kubernetes_secret`,
        otherwise use `create_from_yaml` (default: False)
    :type update: boolean

    :return: None
    :rtype: None
    :raises FailToCreateError: when `create_from_yaml` fails to create the
        secret
    """
    api_client_instance = api_client.ApiClient()
    if not update:
        try:
            api_response = create_from_yaml(
                api_client_instance,
                yaml_objects=[secret_dict],
                namespace=namespace,
            )
        except FailToCreateError as ex:
            current_app.logger.info(f"Secret not created because of exception {ex}")
            raise ex
        verb = "created"
    else:
        api_response = patch_kubernetes_secret(secret_dict, namespace)
        verb = "updated"
    current_app.logger.info(f"Secret {verb} with api response: {api_response}")
|
|
|
|
|
|
def store_kustomization(kustomization_template_filepath, app_slug):
    """
    Add a kustomization that installs app {app_slug} to the cluster.

    The kustomization is always created in the `flux-system` namespace.

    :param kustomization_template_filepath: Path to the template that describes
        the kustomization. The template should have an `{{ app }}` entry.
    :type kustomization_template_filepath: string
    :param app_slug: Slug for the app, used to replace `{{ app }}` in the
        template
    :type app_slug: string
    :return: True on success
    :rtype: boolean
    :raises ApiException: when the Kubernetes API rejects the request
    """
    kustomization_dict = read_template_to_dict(
        kustomization_template_filepath, {"app": app_slug}
    )
    custom_objects_api = client.CustomObjectsApi()
    try:
        api_response = custom_objects_api.create_namespaced_custom_object(
            group="kustomize.toolkit.fluxcd.io",
            version="v1beta2",
            namespace="flux-system",
            plural="kustomizations",
            body=kustomization_dict,
        )
    except ApiException as ex:
        # CustomObjectsApi reports failures as ApiException; FailToCreateError
        # is only raised by `create_from_yaml`, so catching that here meant
        # the failure was never logged.
        current_app.logger.info(
            f"Could not create {app_slug} Kustomization because of exception {ex}")
        raise
    current_app.logger.debug(f"Kustomization created with api response: {api_response}")
    return True
|
|
|
|
def delete_kustomization(kustomization_name):
    """
    Deletes a kustomization from the `flux-system` namespace.

    Note that if the kustomization has `prune: true` in its spec, this will
    trigger deletion of other elements generated by the Kustomization. See
    App.uninstall() to learn what implications this has for what will and will
    not be deleted by the kustomize-controller.

    :param kustomization_name: name of the kustomization to delete
    :type kustomization_name: string

    :return: Response of delete API call
    :rtype: dict
    :raises ApiException: when the Kubernetes API rejects the request
    """
    custom_objects_api = client.CustomObjectsApi()
    delete_request = {
        "group": "kustomize.toolkit.fluxcd.io",
        "version": "v1beta2",
        "namespace": "flux-system",
        "plural": "kustomizations",
        "name": kustomization_name,
        "body": client.V1DeleteOptions(),
    }
    try:
        api_response = custom_objects_api.delete_namespaced_custom_object(
            **delete_request
        )
    except ApiException as ex:
        current_app.logger.info(
            f"Could not delete {kustomization_name} Kustomization because of exception {ex}")
        raise ex
    current_app.logger.debug(f"Kustomization deleted with api response: {api_response}")
    return api_response
|
|
|
|
|
|
def read_template_to_dict(template_filepath, template_globals):
    """
    Reads a Jinja2 template that contains yaml and turns it into a dict.

    The template is rendered with the `jinja2_base64_filters` extension
    enabled and with `generate_password` registered as an extra filter.

    Nothing is caught here: errors while opening the file, rendering the
    template or parsing the yaml propagate to the caller.

    :param template_filepath: The path to an existing Jinja2 template
    :type template_filepath: string
    :param template_globals: The variables substituted in the template
    :type template_globals: dict
    :return: The result of `yaml.safe_load` on the rendered template
        (presumably a dict for the templates used by callers; not enforced
        here)
    :rtype: dict
    """
    env = jinja2.Environment(
        extensions=["jinja2_base64_filters.Base64Filters"])
    # Allow templates to generate passwords with `{{ <length> | generate_password }}`.
    env.filters["generate_password"] = generate_password

    with open(template_filepath, encoding="UTF-8") as template_file:
        template_source = template_file.read()

    rendered = env.from_string(template_source, globals=template_globals).render()
    return yaml.safe_load(rendered)
|
|
|
|
|
|
def patch_kubernetes_secret(secret_dict, namespace):
    """
    Patches an existing secret in the cluster with new data.

    Warning: only secret_dict["data"] is sent in the patch; every other field
    of secret_dict (apart from the name used to address the secret) is
    ignored.

    :param secret_dict: Dictionary describing the secret; `metadata.name` and
        `data` are read from it
    :type secret_dict: dict
    :param namespace: Namespace of the secret
    :type namespace: string
    :return: Response of the patch API call
    """
    core_api = client.CoreV1Api(api_client.ApiClient())
    secret_name = secret_dict["metadata"]["name"]
    patch_body = {"data": secret_dict["data"]}
    return core_api.patch_namespaced_secret(secret_name, namespace, patch_body)
|
|
|
|
|
|
def generate_password(length):
    """
    Generates a random password consisting of ASCII letters and digits.

    Characters are drawn with `secrets.choice`.

    :param length: The amount of characters in the password; anything
        accepted by `int()` works
    :type length: int
    :return: Generated password
    :rtype: string
    """
    alphabet = string.ascii_letters + string.digits
    characters = [secrets.choice(alphabet) for _ in range(int(length))]
    return "".join(characters)
|
|
|
|
|
|
def gen_htpasswd(user, password):
    """
    Generate htpasswd entry for user with password.

    The password is hashed with `crypt` using a freshly generated SHA-512
    salt, so two calls with the same arguments give different entries.

    NOTE(review): the stdlib `crypt` module is deprecated since Python 3.11
    and removed in 3.13; this needs a replacement before upgrading.

    :param user: Username used in the htpasswd entry
    :type user: string
    :param password: Password for the user, will get hashed.
    :type password: string
    :return: htpasswd line entry in the form `user:hash`
    :rtype: string
    """
    salt = crypt.mksalt(crypt.METHOD_SHA512)
    hashed_password = crypt.crypt(password, salt)
    return f"{user}:{hashed_password}"
|
|
|
|
|
|
def get_all_kustomizations(namespace='flux-system'):
    """
    Returns all flux kustomizations in a namespace.

    Note that the complete API response is returned, not only its 'items'
    entry.

    :param namespace: namespace that contains kustomizations. Default: `flux-system`
    :type namespace: str
    :return: dict returned by CustomObjectsApi.list_namespaced_custom_object()
    :rtype: dict
    """
    custom_objects_api = client.CustomObjectsApi()
    return custom_objects_api.list_namespaced_custom_object(
        group="kustomize.toolkit.fluxcd.io",
        version="v1beta1",
        plural="kustomizations",
        namespace=namespace,
    )
|
|
|
|
|
|
def get_all_helmreleases(namespace='stackspin', label_selector=""):
    """
    Lists all helmreleases in a certain namespace (stackspin by default)

    :param namespace: namespace that contains helmreleases. Default: `stackspin`
    :type namespace: str
    :param label_selector: a label selector to limit the list (optional)
    :type label_selector: str

    :return: List of helmreleases (the 'items' of the API response), or None
        if the API answers with a 404
    :rtype: dict[] or None
    :raises ApiException: for every API error other than a 404
    """
    custom_objects_api = client.CustomObjectsApi()

    try:
        api_response = custom_objects_api.list_namespaced_custom_object(
            group="helm.toolkit.fluxcd.io",
            version="v2beta1",
            namespace=namespace,
            plural="helmreleases",
            label_selector=label_selector,
        )
    except ApiException as error:
        # Only a 404 is tolerated; everything else is passed on to the caller.
        if error.status != 404:
            raise error
        return None
    return api_response['items']
|
|
|
|
|
|
def get_kustomization(name, namespace='flux-system'):
    """
    Returns all info of a Flux kustomization with name 'name'

    :param name: Name of the kustomization
    :type name: string
    :param namespace: Namespace of the kustomization
    :type namespace: string
    :return: kustomization as returned by the API, or None if the API answers
        with a 404
    :rtype: dict or None
    :raises ApiException: for every API error other than a 404
    """
    custom_objects_api = client.CustomObjectsApi()
    try:
        return custom_objects_api.get_namespaced_custom_object(
            group="kustomize.toolkit.fluxcd.io",
            version="v1beta1",
            name=name,
            namespace=namespace,
            plural="kustomizations",
        )
    except ApiException as error:
        # Only a 404 is tolerated; everything else is passed on to the caller.
        if error.status != 404:
            raise error
        return None
|