297 lines
10 KiB
Python
297 lines
10 KiB
Python
|
"""
|
||
|
Helper functions to read Flux Kustomizations and HelmReleases from the cluster,
and to create the Kubernetes secrets and Flux Kustomizations needed by an app.
|
||
|
"""
|
||
|
import crypt
|
||
|
import secrets
|
||
|
import string
|
||
|
|
||
|
import jinja2
|
||
|
import yaml
|
||
|
from kubernetes import client, config
|
||
|
from kubernetes.client import api_client
|
||
|
from kubernetes.client.exceptions import ApiException
|
||
|
from kubernetes.utils import create_from_yaml
|
||
|
from kubernetes.utils.create_from_yaml import FailToCreateError
|
||
|
|
||
|
|
||
|
def create_variables_secret(app_slug, variables_filepath):
    """Make sure the variables secret for an app is present in the cluster.

    The secret is rendered from the template at ``variables_filepath`` and
    compared with the data of the same-named secret in the cluster:

    * no secret data found in the cluster: the rendered secret is created;
    * secret found with a different set of keys: the secret is patched with
      the union of both key sets, keeping the values already in the cluster;
    * secret found with the same set of keys: nothing is stored.

    :param app_slug: The slug of the app, passed to the template as ``app``
    :type app_slug: string
    :param variables_filepath: The path to an existing jinja2 template
    :type variables_filepath: string
    :return: Always True
    :rtype: bool
    """
    desired = read_template_to_dict(variables_filepath, {"app": app_slug})
    name, namespace = get_secret_metadata(desired)
    existing = get_kubernetes_secret_data(name, namespace)

    # Nothing to do when the cluster already holds exactly the wanted keys.
    if existing is not None and existing.keys() == desired["data"].keys():
        print(
            f"Secret {name} in namespace {namespace}"
            " is already in a good state, doing nothing."
        )
        return True

    # An existing secret is patched, a missing one is created.
    patch_existing = existing is not None
    if patch_existing:
        print(
            f"Secret {name} in namespace {namespace}"
            " already exists. Merging..."
        )
        # Values that are already in the cluster win over freshly rendered
        # ones, so existing credentials are never overwritten.
        desired["data"] |= existing

    print(
        f"Storing secret {name} in namespace"
        f" {namespace} in cluster."
    )
    store_kubernetes_secret(desired, namespace, update=patch_existing)
    return True
|
||
|
|
||
|
|
||
|
def get_secret_metadata(secret_dict):
    """Returns secret name and namespace from the metadata field of a secret dict.

    :param secret_dict: Secret manifest with at least ``metadata.name``
    :type secret_dict: dict
    :return: Tuple ``(name, namespace)``; the namespace falls back to
        ``flux-system`` when the metadata does not contain one
    :rtype: tuple
    """
    name = secret_dict["metadata"]["name"]
    metadata = secret_dict["metadata"]
    # flux-system is the default, a namespace in the metadata overrides it
    return name, metadata.get("namespace", "flux-system")
|
||
|
|
||
|
|
||
|
def get_kubernetes_secret_data(secret_name, namespace):
    """Returns the contents of a kubernetes secret or None if the secret does not exist.

    :param secret_name: Name of the secret to read
    :type secret_name: string
    :param namespace: Namespace the secret lives in
    :type namespace: string
    :return: The ``data`` attribute of the secret, or None when the API
        answers with status 404
    :raises ApiException: for every API error other than 404
    """
    core_api = client.CoreV1Api(api_client.ApiClient())
    try:
        found = core_api.read_namespaced_secret(secret_name, namespace)
    except ApiException as ex:
        # 404 is expected when the optional secret does not exist.
        if ex.status == 404:
            return None
        raise ex
    return found.data
|
||
|
|
||
|
|
||
|
def store_kubernetes_secret(secret_dict, namespace, update=False):
    """Stores either a new secret in the cluster, or updates an existing one.

    :param secret_dict: Secret manifest to store
    :type secret_dict: dict
    :param namespace: Namespace to store the secret in
    :type namespace: string
    :param update: Patch the existing secret when True, create a new one
        when False
    :type update: bool
    :return: None; the outcome is printed
    """
    k8s_client = api_client.ApiClient()
    verb = "updated" if update else "created"
    if update:
        api_response = patch_kubernetes_secret(secret_dict, namespace)
    else:
        try:
            api_response = create_from_yaml(
                k8s_client,
                yaml_objects=[secret_dict],
                namespace=namespace,
            )
        except FailToCreateError as ex:
            # Creation failures are reported but not raised.
            print(f"Secret not {verb} because of exception {ex}")
            return None
    print(f"Secret {verb} with api response: {api_response}")
    return None
|
||
|
|
||
|
|
||
|
def store_kustomization(kustomization_template_filepath, app_slug):
    """Add a kustomization that installs app {app_slug} to the cluster.

    The Kustomization is rendered from a Jinja2 template and created as a
    custom object in the ``flux-system`` namespace. If the API refuses the
    request, the error is printed and the function returns without raising.

    :param kustomization_template_filepath: The path to an existing Jinja2
        template that renders to a Kustomization manifest
    :type kustomization_template_filepath: string
    :param app_slug: The slug of the app, passed to the template as ``app``
    :type app_slug: string
    :return: None; the outcome is printed
    """
    kustomization_dict = read_template_to_dict(
        kustomization_template_filepath, {"app": app_slug})
    api_client_instance = api_client.ApiClient()
    custom_objects_api = client.CustomObjectsApi(api_client_instance)
    try:
        # NOTE(review): this call uses API version v1beta2 while the read
        # helpers in this module query v1beta1 -- confirm which one the
        # cluster is expected to serve.
        api_response = custom_objects_api.create_namespaced_custom_object(
            group="kustomize.toolkit.fluxcd.io",
            version="v1beta2",
            # All kustomizations live in the flux-system namespace
            namespace="flux-system",
            plural="kustomizations",
            body=kustomization_dict)
    except (ApiException, FailToCreateError) as ex:
        # CustomObjectsApi reports failures (e.g. 409 when the object already
        # exists) as ApiException. FailToCreateError only comes from
        # create_from_yaml and is kept for backward compatibility.
        print(f"Could not create {app_slug} Kustomization because of exception {ex}")
        return
    print(f"Kustomization created with api response: {api_response}")
|
||
|
|
||
|
|
||
|
def read_template_to_dict(template_filepath, template_globals):
    """Reads a Jinja2 template that contains yaml and turns it into a dict

    The template is rendered with the Base64 filters extension and an extra
    ``generate_password`` filter, then parsed with ``yaml.safe_load``.

    Errors while opening the file, rendering the template or parsing the
    YAML are not caught here; they propagate to the caller.

    :param template_filepath: The path to an existing Jinja2 template
    :type template_filepath: string
    :param template_globals: The variables substituted in the template
    :type template_globals: dict
    :return: The parsed YAML document (a dict when the template renders to a
        YAML mapping)
    """
    env = jinja2.Environment(
        extensions=["jinja2_base64_filters.Base64Filters"])
    # Let templates generate passwords with `{{ <length> | generate_password }}`
    env.filters["generate_password"] = generate_password

    with open(template_filepath, encoding="UTF-8") as template_file:
        template_source = template_file.read()

    rendered = env.from_string(
        template_source, globals=template_globals).render()
    return yaml.safe_load(rendered)
|
||
|
|
||
|
|
||
|
def patch_kubernetes_secret(secret_dict, namespace):
    """Patches secret in the cluster with new data.

    Only the ``data`` field of ``secret_dict`` is sent; the secret to patch
    is identified by ``metadata.name`` and ``namespace``.

    :param secret_dict: Secret manifest holding ``metadata.name`` and ``data``
    :type secret_dict: dict
    :param namespace: Namespace of the secret to patch
    :type namespace: string
    :return: The response of ``CoreV1Api.patch_namespaced_secret``
    """
    core_api = client.CoreV1Api(api_client.ApiClient())
    secret_name = secret_dict["metadata"]["name"]
    patch = {"data": secret_dict["data"]}
    return core_api.patch_namespaced_secret(secret_name, namespace, patch)
|
||
|
|
||
|
|
||
|
def generate_password(length):
    """Generates a password of "length" characters.

    The characters are drawn from ASCII letters with the ``secrets`` module.

    :param length: Number of characters; anything ``int()`` accepts
    :return: The generated password
    :rtype: string
    """
    alphabet = string.ascii_letters
    chars = [secrets.choice(alphabet) for _ in range(int(length))]
    return "".join(chars)
|
||
|
|
||
|
|
||
|
def gen_htpasswd(user, password):
    """Generate htpasswd entry for user with password.

    :param user: User name placed before the colon
    :type user: string
    :param password: Plain-text password, hashed with SHA-512 crypt and a
        freshly generated salt
    :type password: string
    :return: Entry in the form ``user:hash``
    :rtype: string
    """
    salt = crypt.mksalt(crypt.METHOD_SHA512)
    hashed = crypt.crypt(password, salt)
    return f"{user}:{hashed}"
|
||
|
|
||
|
def get_all_kustomization_names(namespace='flux-system'):
    """
    Returns the names of all flux kustomizations in a namespace.

    :param namespace: namespace that contains kustomizations. Default: `flux-system`
    :type namespace: str
    :return: List of names for kustomizations in namespace
    :rtype: list
    """
    listing = get_all_kustomizations(namespace)
    return [item['metadata']['name'] for item in listing['items']]
|
||
|
|
||
|
|
||
|
def get_all_kustomizations(namespace='flux-system'):
    """
    Returns all flux kustomizations in a namespace.

    The kube config is loaded on every call before the cluster is queried.

    :param namespace: namespace that contains kustomizations. Default: `flux-system`
    :type namespace: str
    :return: Kustomizations as returned by CustomObjectsApi.list_namespaced_custom_object()
    :rtype: object
    """
    config.load_kube_config()
    custom_objects = client.CustomObjectsApi()
    query = {
        "group": "kustomize.toolkit.fluxcd.io",
        "version": "v1beta1",
        "plural": "kustomizations",
        "namespace": namespace,
    }
    return custom_objects.list_namespaced_custom_object(**query)
|
||
|
|
||
|
|
||
|
def get_all_helmrelease_names(namespace='stackspin'):
    """
    Returns names of all helmreleases in a namespace.

    :param namespace: namespace that contains helmreleases. Default: `stackspin`
    :type namespace: str
    :return: List of names for helmreleases in namespace
    :rtype: list
    """
    listing = get_all_helmreleases(namespace)
    return [item['metadata']['name'] for item in listing['items']]
|
||
|
|
||
|
def get_all_helmreleases(namespace='stackspin'):
    """
    Returns all helmreleases in a namespace.

    The kube config is loaded on every call before the cluster is queried.

    :param namespace: namespace that contains helmreleases. Default: `stackspin`
    :type namespace: str
    :return: Helmreleases as returned by CustomObjectsApi.list_namespaced_custom_object()
    :rtype: object
    """
    config.load_kube_config()
    custom_objects = client.CustomObjectsApi()
    query = {
        "group": "helm.toolkit.fluxcd.io",
        "version": "v2beta1",
        "plural": "helmreleases",
        "namespace": namespace,
    }
    return custom_objects.list_namespaced_custom_object(**query)
|
||
|
|
||
|
|
||
|
def get_kustomization(name, namespace='flux-system'):
    """Returns all info of a Flux kustomization with name 'name'

    :param name: Name of the kustomization
    :type name: str
    :param namespace: Namespace to look in. Default: `flux-system`
    :type namespace: str
    :return: The kustomization as returned by
        CustomObjectsApi.get_namespaced_custom_object(), or None when the API
        answers with status 404
    :raises ApiException: for every API error other than 404
    """
    config.load_kube_config()
    custom_objects = client.CustomObjectsApi()
    try:
        return custom_objects.get_namespaced_custom_object(
            group="kustomize.toolkit.fluxcd.io",
            version="v1beta1",
            name=name,
            namespace=namespace,
            plural="kustomizations",
        )
    except client.exceptions.ApiException as error:
        # Only "not found" is translated into None
        if error.status != 404:
            raise error
    return None
|
||
|
|
||
|
|
||
|
def get_helmrelease(name, namespace='stackspin-apps'):
    """Returns all info of a Flux helmrelease with name 'name'

    :param name: Name of the helmrelease
    :type name: str
    :param namespace: Namespace to look in. Default: `stackspin-apps`
    :type namespace: str
    :return: The helmrelease as returned by
        CustomObjectsApi.get_namespaced_custom_object(), or None when the API
        answers with status 404
    :raises ApiException: for every API error other than 404
    """
    config.load_kube_config()
    custom_objects = client.CustomObjectsApi()
    try:
        return custom_objects.get_namespaced_custom_object(
            group="helm.toolkit.fluxcd.io",
            version="v2beta1",
            name=name,
            namespace=namespace,
            plural="helmreleases",
        )
    except client.exceptions.ApiException as error:
        # Only "not found" is translated into None
        if error.status != 404:
            raise error
    return None
|
||
|
|
||
|
|
||
|
def get_readiness(app_status):
    """
    Parses an app status's 'conditions' to find a type field called 'Ready' and
    returns its status. Works for Kustomizations as well as Helmreleases.

    :param app_status: The ``status`` part of a Kustomization or Helmrelease
    :type app_status: dict
    :return: The ``status`` value of the condition with type ``Ready``,
        exactly as stored in the condition, or False when there is no such
        condition or when ``conditions`` is missing or empty
    """
    # A status without a 'conditions' entry is treated like an empty list
    # instead of raising a KeyError: no conditions means not ready.
    conditions = app_status.get('conditions') or []
    for condition in conditions:
        if condition['type'] == 'Ready':
            return condition['status']
    # If this point is reached, no condition "Ready" exists, so the application
    # is not ready.
    return False
|