diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index eba31bcbb..9e73fe409 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -9,7 +9,13 @@ All notable changes to this project will be documented in this file. - Add support for specifying a `clientAuthenticationMethod` for OIDC ([#1178]). This was originally done in [#1158] and had been reverted in [#1170]. +### Changed + +- Check self RBAC permissions before attempting to list types when garbage collecting possible orphans ([#1179]). + This clears up an error message that was logged when operators tried to list types when they had no permission to do so. + [#1178]: https://github.com/stackabletech/operator-rs/pull/1178 +[#1179]: https://github.com/stackabletech/operator-rs/pull/1179 ## [0.108.0] - 2026-03-10 diff --git a/crates/stackable-operator/src/client.rs b/crates/stackable-operator/src/client.rs index bb9ceb122..52fa77717 100644 --- a/crates/stackable-operator/src/client.rs +++ b/crates/stackable-operator/src/client.rs @@ -1,12 +1,20 @@ use std::{ + any::TypeId, + collections::HashMap, convert::TryFrom, fmt::{Debug, Display}, + sync::{Arc, RwLock}, + time::{Duration, Instant}, }; use either::Either; use futures::StreamExt; use k8s_openapi::{ - ClusterResourceScope, NamespaceResourceScope, apimachinery::pkg::apis::meta::v1::LabelSelector, + ClusterResourceScope, NamespaceResourceScope, + api::authorization::v1::{ + ResourceAttributes, SelfSubjectAccessReview, SelfSubjectAccessReviewSpec, + }, + apimachinery::pkg::apis::meta::v1::LabelSelector, }; use kube::{ Api, Config, @@ -28,6 +36,15 @@ pub type Result = std::result::Result; #[derive(Debug, Snafu)] pub enum Error { + // The following two variants cannot have a `source` field, + // because that would require a leak of the `self` reference outside of can_list() function + // which would not compile. 
+ #[snafu(display("unable to write cache of object list permissions"))] + ListCachePermissionsWrite, + + #[snafu(display("unable to read cache of object list permissions"))] + ListCachePermissionsRead, + #[snafu(display("unable to get resource {resource_name:?}"))] GetResource { source: kube::Error, @@ -91,6 +108,10 @@ pub enum Error { }, } +// Type that maps (resource type, namespace) to (can list, cached at). +// This type is needed to silence clippy warnings about type complexity of the `list_permissions` field in `Client`. +type ListableResourceMap = HashMap<(TypeId, String), (bool, Instant)>; + /// This `Client` can be used to access Kubernetes. /// It wraps an underlying [kube::client::Client] and provides some common functionality. #[derive(Clone)] @@ -103,8 +124,17 @@ pub struct Client { pub default_namespace: String, pub kubernetes_cluster_info: KubernetesClusterInfo, + + /// Cache of `SelfSubjectAccessReview` results keyed by (resource type, namespace). + list_permissions: Arc>, } +/// How long a cached `SelfSubjectAccessReview` result is considered valid. +/// A TTL is used rather than caching indefinitely because RBAC rules can change at runtime +/// (e.g. an admin updates a `ClusterRole`), and we want to pick up such changes eventually +/// without requiring an operator restart. +const LIST_PERMISSION_TTL: Duration = Duration::from_secs(300); + impl Client { pub fn new( client: KubeClient, @@ -125,6 +155,7 @@ impl Client { delete_params: DeleteParams::default(), default_namespace, kubernetes_cluster_info, + list_permissions: Arc::default(), } } @@ -520,6 +551,88 @@ impl Client { Api::all(self.client.clone()) } + /// Returns whether the current service account is allowed to `list` resources of type `T` + /// in the given `namespace`, by performing a [`SelfSubjectAccessReview`]. + /// + /// Results are cached per (resource type, namespace) pair to avoid a SAR API call on every + /// reconciliation. 
The cache has a TTL of [`LIST_PERMISSION_TTL`] so that RBAC changes made + /// at runtime are eventually picked up without requiring an operator restart. + /// + /// If the review request itself fails (e.g. due to a network error), this returns `true` so + /// that callers fall back to attempting the operation and handling any resulting error. + /// Failures are intentionally not cached: a transient error should not suppress deletion + /// for the full TTL duration. + pub async fn can_list(&self, namespace: &str) -> Result + where + T: Resource + 'static, + { + let key = (TypeId::of::(), namespace.to_string()); + + // This nested block is necessary to ensure the cache lock is dropped before the write() call below to avoid deadlocks. + // Alternatively a drop(cache) call could be used but this is more idiomatic. + { + let cache = self + .list_permissions + .read() + .map_err(|_| Error::ListCachePermissionsRead)?; + if let Some(&(allowed, cached_at)) = cache.get(&key) + && cached_at.elapsed() < LIST_PERMISSION_TTL + { + tracing::debug!( + allowed = allowed, + namespace = namespace, + type_name = std::any::type_name::(), + "object list permission from cache", + ); + + return Ok(allowed); + } + } + + let sar = SelfSubjectAccessReview { + spec: SelfSubjectAccessReviewSpec { + resource_attributes: Some(ResourceAttributes { + namespace: Some(namespace.to_string()), + verb: Some("list".to_string()), + group: Some(T::group(&()).to_string()), + resource: Some(T::plural(&()).to_string()), + ..Default::default() + }), + ..Default::default() + }, + ..Default::default() + }; + + let api: Api = Api::all(self.client.clone()); + let allowed = match api.create(&PostParams::default(), &sar).await { + Ok(response) => { + let allowed = response.status.map(|s| s.allowed).unwrap_or(false); + self.list_permissions + .write() + .map_err(|_| Error::ListCachePermissionsWrite)? 
+ .insert(key, (allowed, Instant::now())); + allowed + } + Err(err) => { + tracing::error!( + namespace = namespace, + type_name = std::any::type_name::(), + error = ?err, + "failed to perform SelfSubjectAccessReview, assuming list is allowed", + ); + true + } + }; + + tracing::debug!( + allowed = allowed, + namespace = namespace, + type_name = std::any::type_name::(), + "object list permissions", + ); + Ok(allowed) + } + #[deprecated(note = "Use Api::get_api instead", since = "0.26.0")] pub fn get_namespaced_api(&self, namespace: &str) -> Api where diff --git a/crates/stackable-operator/src/cluster_resources.rs b/crates/stackable-operator/src/cluster_resources.rs index 6bb0474c0..6c9e8f37f 100644 --- a/crates/stackable-operator/src/cluster_resources.rs +++ b/crates/stackable-operator/src/cluster_resources.rs @@ -25,7 +25,7 @@ use k8s_openapi::{ use kube::{Resource, ResourceExt}; use serde::{Serialize, de::DeserializeOwned}; use snafu::{OptionExt, ResultExt, Snafu}; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use crate::{ client::{Client, GetApi}, @@ -105,6 +105,7 @@ pub trait ClusterResource: + Resource + GetApi + Serialize + + 'static { /// This must be implemented for any [`ClusterResources`] that should be adapted before /// applying depending on the chosen [`ClusterResourceApplyStrategy`]. 
@@ -713,54 +714,56 @@ impl<'a> ClusterResources<'a> { client: &Client, ) -> Result<()> { if !self.apply_strategy.delete_orphans() { - debug!( - "Skip deleting orphaned resources because of [{}] strategy.", - self.apply_strategy + tracing::debug!( + apply_strategy = ?self.apply_strategy, + "skip deleting orphaned resources because of strategy.", ); return Ok(()); } - match self.list_deployed_cluster_resources::(client).await { - Ok(deployed_cluster_resources) => { - let mut orphaned_resources = Vec::new(); - - for resource in deployed_cluster_resources { - let resource_id = resource.uid().context(MissingObjectKeySnafu { - key: "metadata/uid", - })?; - if !self.resource_ids.contains(&resource_id) { - orphaned_resources.push(resource); - } - } - - if !orphaned_resources.is_empty() { - info!( - "Deleting orphaned {}: {}", - T::plural(&()), - ClusterResources::print_resources(&orphaned_resources), - ); - for resource in orphaned_resources.iter() { - client - .delete(resource) - .await - .context(DeleteOrphanedResourceSnafu)?; - } - } + if !client + .can_list::(&self.namespace) + .await + .context(ListClusterResourcesSnafu)? 
+ { + tracing::debug!( + type_name = std::any::type_name::(), + "skipping deletion of orphans of this type because the operator is not allowed to list them", + ); + return Ok(()); + } - Ok(()) + let deployed_cluster_resources = self + .list_deployed_cluster_resources::(client) + .await + .context(ListClusterResourcesSnafu)?; + + let mut orphaned_resources = Vec::new(); + + for resource in deployed_cluster_resources { + let resource_id = resource.uid().context(MissingObjectKeySnafu { + key: "metadata/uid", + })?; + if !self.resource_ids.contains(&resource_id) { + orphaned_resources.push(resource); } - Err(crate::client::Error::ListResources { - source: kube::Error::Api(s), - }) if s.is_forbidden() => { - debug!( - "Skipping deletion of orphaned {} because the operator is not allowed to list \ - them and is therefore probably not in charge of them.", - T::plural(&()) - ); - Ok(()) + } + + if !orphaned_resources.is_empty() { + tracing::info!( + type_name = std::any::type_name::(), + orphans = ClusterResources::print_resources(&orphaned_resources), + "deleting orphans", + ); + for resource in orphaned_resources.iter() { + client + .delete(resource) + .await + .context(DeleteOrphanedResourceSnafu)?; } - Err(error) => Err(error).context(ListClusterResourcesSnafu), } + + Ok(()) } /// Creates a string containing the names and if present namespaces of the given resources