move everything to backend folder for migration to dashboard repository

This commit is contained in:
Maarten de Waard 2022-10-12 13:38:51 +02:00
parent af6b006409
commit 92ec7c653d
No known key found for this signature in database
GPG key ID: 1D3E893A657CC8DA
89 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,4 @@
from .kratos_api import *
from .error_handler import *
from .hydra_oauth import *
from .kratos_user import *

View file

@ -0,0 +1,24 @@
from functools import wraps
from areas.roles.role_service import RoleService
from flask_jwt_extended import get_jwt, verify_jwt_in_request
from helpers import Unauthorized
def admin_required():
def wrapper(fn):
@wraps(fn)
def decorator(*args, **kwargs):
verify_jwt_in_request()
claims = get_jwt()
user_id = claims["user_id"]
is_admin = RoleService.is_user_admin(user_id)
if is_admin:
return fn(*args, **kwargs)
else:
raise Unauthorized("You need to have admin permissions.")
return decorator
return wrapper

View file

@ -0,0 +1,17 @@
"""Generic classes used by different parts of the application"""
import urllib.request
# Instead of processing the redirect, we return, so the application
# can handle the redirect itself. This is needed to extract cookies
# etc.
class RedirectFilter(urllib.request.HTTPRedirectHandler):
"""Overrides the standard redirect handler so it does not automatically
redirect. This allows for inspecting the return values before redirecting or
override the redirect action"""
# pylint: disable=too-many-arguments
# This amount of arguments is expected by the HTTPRedirectHandler
def redirect_request(self, req, fp, code, msg, headers, newurl):
return None

View file

@ -0,0 +1,50 @@
from flask import jsonify
from jsonschema import ValidationError
class KratosError(Exception):
pass
class HydraError(Exception):
pass
class BadRequest(Exception):
pass
class Unauthorized(Exception):
pass
def bad_request_error(e):
message = e.args[0] if e.args else "Bad request to the server."
return jsonify({"errorMessage": message}), 400
def validation_error(e):
original_error = e.description
return (
jsonify({"errorMessage": "{} is not valid.".format(original_error.path[0])}),
400,
)
def kratos_error(e):
message = "[KratosError] " + e.args[0] if e.args else "Failed to contact Kratos."
status_code = e.args[1] if e.args else 500
return jsonify({"errorMessage": message}), status_code
def hydra_error(e):
message = "[HydraError] " + e.args[0] if e.args else "Failed to contact Hydra."
status_code = e.args[1] if e.args else 500
return jsonify({"errorMessage": message}), status_code
def global_error(e):
message = str(e)
return jsonify({"errorMessage": message}), 500
def unauthorized_error(e):
message = str(e)
return jsonify({"errorMessage": message}), 403

View file

@ -0,0 +1,8 @@
"""Custom exception handler to raise consistent exceptions, as different backend
raise different exceptions"""
class BackendError(Exception):
"""The backend error is raised when interacting with
the backend fails or gives an unexpected result. The
error contains a oneliner description of the problem"""

View file

@ -0,0 +1,50 @@
from flask import request, session
from requests_oauthlib import OAuth2Session
from config import *
from helpers import HydraError
class HydraOauth:
@staticmethod
def authorize():
try:
hydra = OAuth2Session(HYDRA_CLIENT_ID)
authorization_url, state = hydra.authorization_url(
HYDRA_AUTHORIZATION_BASE_URL
)
return authorization_url
except Exception as err:
raise HydraError(str(err), 500)
@staticmethod
def get_token(state, code):
try:
hydra = OAuth2Session(
client_id=HYDRA_CLIENT_ID,
state=state,
)
token = hydra.fetch_token(
token_url=TOKEN_URL,
code=code,
client_secret=HYDRA_CLIENT_SECRET,
include_client_id=True,
)
session["hydra_token"] = token
return token
except Exception as err:
raise HydraError(str(err), 500)
@staticmethod
def get_user_info():
try:
hydra = OAuth2Session(
client_id=HYDRA_CLIENT_ID, token=session["hydra_token"]
)
user_info = hydra.get("{}/userinfo".format(HYDRA_PUBLIC_URL))
return user_info.json()
except Exception as err:
raise HydraError(str(err), 500)

View file

@ -0,0 +1,57 @@
from logging import error
import requests
from config import *
from .error_handler import KratosError
class KratosApi:
@staticmethod
def __handleError(res):
if res.status_code >= 400:
message = res.json()["error"]["message"]
raise KratosError(message, res.status_code)
@staticmethod
def get(url):
try:
res = requests.get("{}{}".format(KRATOS_ADMIN_URL, url))
KratosApi.__handleError(res)
return res
except KratosError as err:
raise err
except:
raise KratosError()
@staticmethod
def post(url, data):
try:
res = requests.post("{}{}".format(KRATOS_ADMIN_URL, url), json=data)
KratosApi.__handleError(res)
return res
except KratosError as err:
raise err
except:
raise KratosError()
@staticmethod
def put(url, data):
try:
res = requests.put("{}{}".format(KRATOS_ADMIN_URL, url), json=data)
KratosApi.__handleError(res)
return res
except KratosError as err:
raise err
except:
raise KratosError()
@staticmethod
def delete(url):
try:
res = requests.delete("{}{}".format(KRATOS_ADMIN_URL, url))
KratosApi.__handleError(res)
return res
except KratosError as err:
raise err
except:
raise KratosError()

View file

@ -0,0 +1,392 @@
"""
Implement the Kratos model to interact with kratos users
"""
import json
import re
import urllib.parse
import urllib.request
from typing import Dict
from urllib.request import Request
# Some imports commented out to satisfy pylint. They will be used once more
# functions are migrated to this model
from ory_kratos_client.model.admin_create_identity_body import AdminCreateIdentityBody
from ory_kratos_client.model.admin_create_self_service_recovery_link_body \
import AdminCreateSelfServiceRecoveryLinkBody
from ory_kratos_client.model.admin_update_identity_body import AdminUpdateIdentityBody
from ory_kratos_client.rest import ApiException as KratosApiException
from .classes import RedirectFilter
from .exceptions import BackendError
# pylint: disable=too-many-instance-attributes
class KratosUser():
"""
The User object, interact with the User. It both calls to Kratos as to
the database for storing and retrieving data.
"""
api = None
__uuid = None
email = None
name = None
username = None
state = None
created_at = None
updated_at = None
def __init__(self, api, uuid = None):
self.api = api
self.state = 'active'
if uuid:
try:
obj = api.admin_get_identity(uuid)
if obj:
self.__uuid = uuid
try:
self.name = obj.traits['name']
except KeyError:
self.name = ""
try:
self.username = obj.traits['username']
except KeyError:
self.username = ""
self.email = obj.traits['email']
self.state = obj.state
self.created_at = obj.created_at
self.updated_at = obj.updated_at
except KratosApiException as error:
raise BackendError(f"Unable to get entry, kratos replied with: {error}") from error
def __repr__(self):
return f"\"{self.name}\" <{self.email}>"
@property
def uuid(self):
"""Gets the protected UUID propery"""
return self.__uuid
def save(self):
"""Saves this object into the kratos backend database. If the object
is new, it will create, otherwise update an entry.
:raise: BackendError is an error with Kratos happened.
"""
# Traits are the "profile" values we will set, kratos will complain on
# empty values, so we check if "name" is set and only add it if so.
traits = {'email':self.email}
if self.name:
traits['name'] = self.name
# If we have a UUID, we are updating
if self.__uuid:
body = AdminUpdateIdentityBody(
schema_id="default",
state=self.state,
traits=traits,
)
try:
api_response = self.api.admin_update_identity(self.__uuid,
admin_update_identity_body=body)
except KratosApiException as error:
raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error
else:
body = AdminCreateIdentityBody(
schema_id="default",
traits=traits,
)
try:
# Create an Identity
api_response = self.api.admin_create_identity(
admin_create_identity_body=body)
if api_response.id:
self.__uuid = api_response.id
except KratosApiException as error:
raise BackendError(f"Unable to save entry, kratos replied with:{error}") from error
def delete(self):
"""Deletes the object from kratos
:raise: BackendError if Krator API call fails
"""
if self.__uuid:
try:
self.api.admin_delete_identity(self.__uuid)
return True
except KratosApiException as error:
raise BackendError(
f"Unable to delete entry, kratos replied with: {error}"
) from error
return False
@staticmethod
def find_by_email(api, email):
"""Queries Kratos to find kratos ID for this given identifier
:param api: Kratos ADMIN API Object
:param email: Identifier to look for
:return: Return none or string with ID
"""
kratos_id = None
# Get out user ID by iterating over all available IDs
data = api.admin_list_identities()
for kratos_obj in data.value:
# Unique identifier we use
if kratos_obj.traits['email'] == email:
kratos_id = str(kratos_obj.id)
return KratosUser(api, kratos_id)
return None
@staticmethod
def find_all(api):
"""Queries Kratos to find all kratos users and return them
as a list of KratosUser objects
:return: Return list
"""
kratos_id = None
return_list = []
# Get out user ID by iterating over all available IDs
data = api.admin_list_identities()
for kratos_obj in data.value:
kratos_id = str(kratos_obj.id)
return_list.append(KratosUser(api, kratos_id))
return return_list
@staticmethod
def extract_cookies(cookies):
"""Extract session and CSRF cookie from a list of cookies.
Iterate over a list of cookies and extract the session
cookies required for Kratos User Panel UI
:param cookies: str[], list of cookies
:return: Cookies concatenated as string
:rtype: str
"""
# Find kratos session cookie & csrf
cookie_csrf = None
cookie_session = None
for cookie in cookies:
search = re.match(r'ory_kratos_session=([^;]*);.*$', cookie)
if search:
cookie_session = "ory_kratos_session=" + search.group(1)
search = re.match(r'(csrf_token[^;]*);.*$', cookie)
if search:
cookie_csrf = search.group(1)
if not cookie_csrf or not cookie_session:
raise BackendError("Flow started, but expected cookies not found")
# Combined the relevant cookies
cookie = cookie_csrf + "; " + cookie_session
return cookie
def get_recovery_link(self):
"""Call the kratos API to create a recovery URL for a kratos ID
:param: api Kratos ADMIN API Object
:param: kratos_id UUID of kratos object
:return: Return none or string with recovery URL
"""
try:
# Create body request to get recovery link with admin API
body = AdminCreateSelfServiceRecoveryLinkBody(
expires_in="15m",
identity_id=self.__uuid
)
# Get recovery link from admin API
call = self.api.admin_create_self_service_recovery_link(
admin_create_self_service_recovery_link_body=body)
url = call.recovery_link
except KratosApiException:
return None
return url
def ui_set_password(self, api_url, recovery_url, password):
"""Follow a Kratos UI sequence to set password
Kratos does not provide an interface to set a password directly. However
we still can set a password by following the UI sequence. To so so we
to follow the steps which are normally done in a browser once someone
clicks the recovery link.
:param: api_url URL to public endpoint of API
:param: recovery_url Recovery URL as generated by Kratos
:param: password Password
:raise: Exception with error message as first argument
:return: boolean True on success, False on failure (usualy password
to simple)
"""
# Step 1: Open the recovery link and extract the cookies, as we need them
# for the next steps
try:
# We override the default Redirect handler with our custom handler to
# be able to catch the cookies.
opener = urllib.request.build_opener(RedirectFilter)
# We rewrite the URL we got. It can be we run this from an enviroment
# with different KRATUS_PUBLIC_URL API endpoint then kratos provide
# itself. For example in the case running as a job to create an admin
# account before TLS is setup/working
search = re.match(r'.*(self-service.recovery.flow.*)$', recovery_url)
if search:
recovery_url = api_url + search.group(1)
else:
raise BackendError('Did not find recovery flow')
opener.open(recovery_url)
# If we do not have a 2xx status, urllib throws an error, as we "stopped"
# at our redirect, we expect a 3xx status
except urllib.error.HTTPError as http_error:
# Kratos pre-0.8 returned 302, kratos 0.8 returns 303
if http_error.status in (302, 303):
# Get the cookie and redirect location from the response
# headers
cookies = http_error.headers.get_all('Set-Cookie')
url = http_error.headers.get('Location')
else:
raise BackendError('Unable to fetch recovery link') from http_error
else:
raise BackendError('Recovery link returned unexpected data')
# Step 2: Extract cookies and data for next step. We expect to have an
# authorized session now. We need the cookies for followup calls
# to make changes to the account (set password)
# Get flow id
search = re.match(r'.*\?flow=(.*)', url)
if search:
flow = search.group(1)
else:
raise BackendError('No Flow ID found for recovery sequence')
# Extract cookies with helper function
cookie = self.extract_cookies(cookies)
# Step 3: Get the "UI", kratos expect us to call the API to get the UI
# elements which contains the CSRF token, which is needed when
# posting the password data
try:
url = api_url + "/self-service/settings/flows?id=" + flow
req = Request(url, headers={'Cookie':cookie})
opener = urllib.request.build_opener()
# Execute the request, read the data, decode the JSON, get the
# right CSRF token out of the decoded JSON
obj = json.loads(opener.open(req).read())
csrf_token = obj['ui']['nodes'][0]['attributes']['value']
except Exception as error:
raise BackendError("Unable to get password reset UI") from error
# Step 4: Post out password
url = api_url + "self-service/settings?flow=" + flow
# Create POST data as form data
data = {
'method': 'password',
'password': password,
'csrf_token': csrf_token
}
data = urllib.parse.urlencode(data)
data = data.encode('ascii')
# POST the new password
try:
req = Request(url, data = data, headers={'Cookie':cookie}, method="POST")
opener = urllib.request.build_opener(RedirectFilter)
opener.open(req)
# If we do not have a 2xx status, urllib throws an error, as we "stopped"
# at our redirect, we expect a 3xx status
except urllib.error.HTTPError as http_error:
# Kratos pre-0.8 returned 302, kratos 0.8 returns 303
if http_error.status in (302, 303):
# Kratos only sends HTTP codes after our submission. We should
# now call the `settings` endpoint to see if our call
# succeeded, or else, if there are any messages about why it
# failed
try:
url = api_url + "/self-service/settings/flows?id=" + flow
req = Request(url, headers={'Cookie':cookie, "Accept": "application/json"})
opener = urllib.request.build_opener()
# Execute the request, read the data, decode the JSON
obj = json.loads(opener.open(req).read())
# If the 'state' has changed to 'success', the password was
# set successfully
if obj['state'] == 'success':
return True
# Failure: we check if there are error messages
for node in obj['ui']['nodes']:
if node['messages']:
print(f"Problems with field '{node['meta']['label']['text']}':")
for message in node['messages']:
print(message['text'])
raise BackendError("Password not set") from http_error
except Exception as error:
raise BackendError("Unable to get password reset UI") from error
return False
raise BackendError("Unable to set password by submitting form")
# Pylint complains about app not used. That is correct, but we will use that
# in the future. Ignore this error
# pylint: disable=unused-argument
def get_claims(self, app, roles, mapping=None) -> Dict[str, Dict[str, str]]:
"""Create openID Connect token
Use the userdata stored in the user object to create an OpenID Connect token.
The token returned by this function can be passed to Hydra,
which will store it and serve it to OpenID Connect Clients to retrieve user information.
If you need to relabel a field pass an array of tuples to mapping.
Example: getClaims('nextcloud', mapping=[("name", "username"),("roles", "groups")])
Attributes:
appname - Name or ID of app to connect to
roles - List of roles to add to the `stackspin_roles` claim
mapping - Mapping of the fields
Returns:
OpenID Connect token of type dict
"""
# Name should be set, however, we do not enforce this yet.
# if somebody does not set it's name, we use the email address
# as name
if self.name:
name = self.name
else:
name = self.email
if self.username:
username = self.username
else:
username = self.email
token = {
"name": name,
"preferred_username": username,
"email": self.email,
"stackspin_roles": roles,
}
# Relabel field names
if mapping:
for old_field_name, new_field_name in mapping:
token[new_field_name] = token[old_field_name]
del token[old_field_name]
return dict(id_token=token)

View file

@ -0,0 +1,384 @@
"""
List of functions to get data from Flux Kustomizations and Helmreleases
"""
import crypt
import secrets
import string
import jinja2
import yaml
from kubernetes import client, config
from kubernetes.client import api_client
from kubernetes.client.exceptions import ApiException
from kubernetes.utils import create_from_yaml
from kubernetes.utils.create_from_yaml import FailToCreateError
from flask import current_app
from config import LOAD_INCLUSTER_CONFIG
# Load the kube config once
#
# By default this loads whatever we define in the `KUBECONFIG` env variable,
# otherwise loads the config from default locations, similar to what kubectl
# does.
if LOAD_INCLUSTER_CONFIG:
config.load_incluster_config()
else:
config.load_kube_config()
def create_variables_secret(app_slug, variables_filepath):
"""Checks if a variables secret for app_name already exists, generates it if necessary.
If a secret already exists, loops through keys from the template, and adds
values for keys that miss in the Kubernetes secret, but are available in
the template.
:param app_slug: The slug of the app, used in the oauth secrets
:type app_slug: string
:param variables_filepath: The path to an existing jinja2 template
:type variables_filepath: string
:return: returns True, unless an exception gets raised by the Kubernetes API
:rtype: boolean
"""
new_secret_dict = read_template_to_dict(
variables_filepath,
{"app": app_slug})
secret_name, secret_namespace = get_secret_metadata(new_secret_dict)
current_secret_data = get_kubernetes_secret_data(
secret_name, secret_namespace
)
if current_secret_data is None:
# Create new secret
update_secret = False
elif current_secret_data.keys() != new_secret_dict["data"].keys():
# Update current secret with new keys
update_secret = True
current_app.logger.info(
f"Secret {secret_name} in namespace {secret_namespace}"
" already exists. Merging..."
)
# Merge dicts. Values from current_secret_data take precedence
new_secret_dict["data"] |= current_secret_data
else:
# Do Nothing
current_app.logger.info(
f"Secret {secret_name} in namespace {secret_namespace}"
" is already in a good state, doing nothing."
)
return True
current_app.logger.info(
f"Storing secret {secret_name} in namespace"
f" {secret_namespace} in cluster."
)
store_kubernetes_secret(
new_secret_dict, secret_namespace, update=update_secret
)
return True
def get_secret_metadata(secret_dict):
"""
Returns secret name and namespace from metadata field in a yaml string.
:param secret_dict: Dictionary of the secret as returned by read_namespaced_secret
:type secret_dict: dict
:return: Tuple containing secret name and secret namespace
:rtype: tuple
"""
secret_name = secret_dict["metadata"]["name"]
# default namespace is flux-system, but other namespace can be
# provided in secret metadata
if "namespace" in secret_dict["metadata"]:
secret_namespace = secret_dict["metadata"]["namespace"]
else:
secret_namespace = "flux-system"
return secret_name, secret_namespace
def get_kubernetes_secret_data(secret_name, namespace):
"""
Get secret from Kubernetes
:param secret_name: Name of the secret
:type secret_name: string
:param namespace: Namespace of the secret
:type namespace: string
:return: The contents of a kubernetes secret or None if the secret does not exist.
:rtype: dict or None
"""
api_client_instance = api_client.ApiClient()
api_instance = client.CoreV1Api(api_client_instance)
try:
secret = api_instance.read_namespaced_secret(secret_name, namespace).data
except ApiException as ex:
# 404 is expected when the optional secret does not exist.
if ex.status != 404:
raise ex
return None
return secret
def get_kubernetes_config_map_data(config_map_name, namespace):
"""
Get ConfigMap from Kubernetes
:param config_map_name: Name of the ConfigMap
:type config_map_name: string
:param namespace: Namespace of the ConfigMap
:type namespace: string
:return: The contents of a kubernetes ConfigMap or None if the cm does not exist.
:rtype: dict or None
"""
api_instance = client.CoreV1Api()
try:
config_map = api_instance.read_namespaced_config_map(config_map_name, namespace).data
except ApiException as ex:
# 404 is expected when the optional secret does not exist.
if ex.status != 404:
raise ex
return None
return config_map
def store_kubernetes_secret(secret_dict, namespace, update=False):
"""
Stores either a new secret in the cluster, or updates an existing one.
:param secret_dict: Dictionary of the secret as returned by read_namespaced_secret
:type secret_dict: dict
:param namespace: Namespace of the secret
:type namespace: string
:param update: If True, use `patch_kubernetes_secret`,
otherwise use `create_from_yaml` (default: False)
:type update: boolean
:return: None
:rtype: None
"""
api_client_instance = api_client.ApiClient()
if update:
verb = "updated"
api_response = patch_kubernetes_secret(secret_dict, namespace)
else:
verb = "created"
try:
api_response = create_from_yaml(
api_client_instance,
yaml_objects=[secret_dict],
namespace=namespace
)
except FailToCreateError as ex:
current_app.logger.info(f"Secret not created because of exception {ex}")
raise ex
current_app.logger.info(f"Secret {verb} with api response: {api_response}")
def store_kustomization(kustomization_template_filepath, app_slug):
"""
Add a kustomization that installs app {app_slug} to the cluster.
:param kustomization_template_filepath: Path to the template that describes
the kustomization. The template should have an `{{ app }}` entry.
:type kustomization_template_filepath: string
:param app_slug: Slug for the app, used to replace `{{ app }}` in the
template
:return: True on success
:rtype: boolean
"""
kustomization_dict = read_template_to_dict(kustomization_template_filepath,
{"app": app_slug})
custom_objects_api = client.CustomObjectsApi()
try:
api_response = custom_objects_api.create_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta2",
namespace="flux-system",
plural="kustomizations",
body=kustomization_dict)
except FailToCreateError as ex:
current_app.logger.info(
f"Could not create {app_slug} Kustomization because of exception {ex}")
raise ex
current_app.logger.debug(f"Kustomization created with api response: {api_response}")
return True
def delete_kustomization(kustomization_name):
"""
Deletes a kustomization.
Note that if the kustomization has `prune: true` in its spec, this will
trigger deletion of other elements generated by the Kustomizartion. See
App.uninstall() to learn what implications this has for what will and will
not be deleted by the kustomize-controller.
:param kustomization_name: name of the kustomization to delete
:type kustomization_name: string
:return: Response of delete API call
:rtype: dict
"""
custom_objects_api = client.CustomObjectsApi()
body = client.V1DeleteOptions()
try:
api_response = custom_objects_api.delete_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta2",
namespace="flux-system",
plural="kustomizations",
name=kustomization_name,
body=body)
except ApiException as ex:
current_app.logger.info(
f"Could not delete {kustomization_name} Kustomization because of exception {ex}")
raise ex
current_app.logger.debug(f"Kustomization deleted with api response: {api_response}")
return api_response
def read_template_to_dict(template_filepath, template_globals):
"""
Reads a Jinja2 template that contains yaml and turns it into a dict.
:param template_filepath: The path to an existing Jinja2 template
:type template_filepath: string
:param template_globals: The variables substituted in the template
:type template_globals: dict
:return: dict, or None if anything fails
"""
env = jinja2.Environment(
extensions=["jinja2_base64_filters.Base64Filters"])
env.filters["generate_password"] = generate_password
# Check if k8s secret already exists, if not, generate it
with open(template_filepath, encoding="UTF-8") as template_file:
lines = template_file.read()
templated_dict = yaml.safe_load(
env.from_string(lines, globals=template_globals).render()
)
return templated_dict
return None
def patch_kubernetes_secret(secret_dict, namespace):
"""
Patches secret in the cluster with new data.
Warning: currently ignores everything that's not in secret_dict["data"]
:param secret_dict: Dictionary of the secret as returned by read_namespaced_secret
:type secret_dict: dict
:param namespace: Namespace of the secret
:type namespace: string
:return: Response of the patch API call
"""
api_client_instance = api_client.ApiClient()
api_instance = client.CoreV1Api(api_client_instance)
name = secret_dict["metadata"]["name"]
body = {}
body["data"] = secret_dict["data"]
return api_instance.patch_namespaced_secret(name, namespace, body)
def generate_password(length):
"""
Generates a password with letters and digits.
:param length: The amount of characters in the password
:type length: int
:return: Generated password
:rtype: string
"""
length = int(length)
password = "".join((secrets.choice(string.ascii_letters + string.digits)
for i in range(length)))
return password
def gen_htpasswd(user, password):
"""
Generate htpasswd entry for user with password.
:param user: Username used in the htpasswd entry
:type user: string
:param password: Password for the user, will get encrypted.
:type password: string
:return: htpassword line entry
:rtype: string
"""
return f"{user}:{crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512))}"
def get_all_kustomizations(namespace='flux-system'):
"""
Returns all flux kustomizations in a namespace.
:param namespace: namespace that contains kustomizations. Default: `flux-system`
:type namespace: str
:return: 'items' in dict returned by CustomObjectsApi.list_namespaced_custom_object()
:rtype: dict[]
"""
api = client.CustomObjectsApi()
api_response = api.list_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
plural="kustomizations",
namespace=namespace,
)
return api_response
def get_all_helmreleases(namespace='stackspin', label_selector=""):
"""
Lists all helmreleases in a certain namespace (stackspin by default)
:param namespace: namespace that contains helmreleases. Default: `stackspin-apps`
:type namespace: str
:param label_selector: a label selector to limit the list (optional)
:type label_selector: str
:return: List of helmreleases
:rtype: dict[]
"""
api_instance = client.CustomObjectsApi()
try:
api_response = api_instance.list_namespaced_custom_object(
group="helm.toolkit.fluxcd.io",
version="v2beta1",
namespace=namespace,
plural="helmreleases",
label_selector=label_selector)
except ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return api_response['items']
def get_kustomization(name, namespace='flux-system'):
"""
Returns all info of a Flux kustomization with name 'name'
:param name: Name of the kustomizatoin
:type name: string
:param namespace: Namespace of the kustomization
:type namespace: string
:return: kustomization as returned by the API
:rtype: dict
"""
api = client.CustomObjectsApi()
try:
resource = api.get_namespaced_custom_object(
group="kustomize.toolkit.fluxcd.io",
version="v1beta1",
name=name,
namespace=namespace,
plural="kustomizations",
)
except client.exceptions.ApiException as error:
if error.status == 404:
return None
# Raise all non-404 errors
raise error
return resource