stackable_webhook/servers/
conversion.rs

1use std::{fmt::Debug, net::SocketAddr};
2
3use axum::{Json, Router, routing::post};
4use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
5// Re-export this type because users of the conversion webhook server require
6// this type to write the handler function. Instead of importing this type from
7// kube directly, consumers can use this type instead. This also eliminates
8// keeping the kube dependency version in sync between here and the operator.
9pub use kube::core::conversion::ConversionReview;
10use kube::{Client, ResourceExt};
11use snafu::{ResultExt, Snafu};
12use tokio::sync::{mpsc, oneshot};
13use tracing::instrument;
14use x509_cert::Certificate;
15
16use crate::{
17    WebhookError, WebhookHandler, WebhookServer,
18    maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions},
19    options::WebhookOptions,
20};
21
22#[derive(Debug, Snafu)]
23pub enum ConversionWebhookError {
24    #[snafu(display("failed to create webhook server"))]
25    CreateWebhookServer { source: WebhookError },
26
27    #[snafu(display("failed to run webhook server"))]
28    RunWebhookServer { source: WebhookError },
29
30    #[snafu(display("failed to receive certificate from channel"))]
31    ReceiveCertificateFromChannel,
32
33    #[snafu(display("failed to convert CA certificate into PEM format"))]
34    ConvertCaToPem { source: x509_cert::der::Error },
35
36    #[snafu(display("failed to reconcile CRDs"))]
37    ReconcileCrds {
38        #[snafu(source(from(ConversionWebhookError, Box::new)))]
39        source: Box<ConversionWebhookError>,
40    },
41
42    #[snafu(display("failed to update CRD {crd_name:?}"))]
43    UpdateCrd {
44        source: kube::Error,
45        crd_name: String,
46    },
47}
48
49impl<F> WebhookHandler<ConversionReview, ConversionReview> for F
50where
51    F: FnOnce(ConversionReview) -> ConversionReview,
52{
53    fn call(self, req: ConversionReview) -> ConversionReview {
54        self(req)
55    }
56}
57
// TODO: Add a builder, maybe with `bon`.
/// Settings used to customize a conversion webhook server.
///
/// The string fields are borrowed, so the options cannot outlive the data they point to.
#[derive(Debug)]
pub struct ConversionWebhookOptions<'a> {
    /// Socket address the HTTPS server binds to.
    pub socket_addr: SocketAddr,

    /// Namespace in which the operator/webhook is running.
    pub namespace: &'a str,

    /// Name of the Kubernetes service pointing to the operator/webhook.
    pub service_name: &'a str,
}
70
71/// A ready-to-use CRD conversion webhook server.
72///
73/// See [`ConversionWebhookServer::new()`] for usage examples.
74pub struct ConversionWebhookServer(WebhookServer);
75
76impl ConversionWebhookServer {
77    /// The default socket address the conversion webhook server binds to, see
78    /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`].
79    pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS;
80
81    /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being
82    /// made to the `/convert/{CRD_NAME}` endpoint.
83    ///
84    /// The TLS certificate is automatically generated and rotated.
85    ///
86    /// ## Parameters
87    ///
88    /// This function expects the following parameters:
89    ///
90    /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`]
91    ///   to a handler function. In most cases, the generated `CustomResource::try_merge` function
92    ///   should be used. It provides the expected `fn(ConversionReview) -> ConversionReview`
93    ///   signature.
94    /// - `options`: Provides [`ConversionWebhookOptions`] to customize various parts of the
95    ///   webhook server, eg. the socket address used to listen on.
96    ///
97    /// ## Return Values
98    ///
99    /// This function returns a [`Result`] which contains a 2-tuple (pair) of values for the [`Ok`]
100    /// variant:
101    ///
102    /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See
103    ///   [`ConversionWebhookServer::run`] for more details.
104    /// - The [`mpsc::Receiver`] which will be used to send out messages containing the newly
105    ///   generated TLS certificate. This channel is used by the CRD maintainer to trigger a
106    ///   reconcile of the CRDs it maintains.
107    ///
108    /// ## Example
109    ///
110    /// ```no_run
111    /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider};
112    /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions};
113    /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion};
114    ///
115    /// # #[tokio::main]
116    /// # async fn main() {
117    /// # CryptoProvider::install_default(default_provider()).unwrap();
118    /// let crds_and_handlers = vec![
119    ///     (
120    ///         S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1)
121    ///             .expect("the S3Connection CRD must be merged"),
122    ///         S3Connection::try_convert,
123    ///     )
124    /// ];
125    ///
126    /// let options = ConversionWebhookOptions {
127    ///     socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS,
128    ///     namespace: "stackable-operators",
129    ///     service_name: "product-operator",
130    /// };
131    ///
132    /// let (conversion_webhook_server, _certificate_rx) =
133    ///         ConversionWebhookServer::new(crds_and_handlers, options)
134    ///             .await
135    ///             .unwrap();
136    ///
137    /// conversion_webhook_server.run().await.unwrap();
138    /// # }
139    /// ```
140    #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))]
141    pub async fn new<H>(
142        crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)>,
143        options: ConversionWebhookOptions<'_>,
144    ) -> Result<(Self, mpsc::Receiver<Certificate>), ConversionWebhookError>
145    where
146        H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
147    {
148        tracing::debug!("create new conversion webhook server");
149
150        let mut router = Router::new();
151
152        for (crd, handler) in crds_and_handlers {
153            let crd_name = crd.name_any();
154            let handler_fn = |Json(review): Json<ConversionReview>| async {
155                let review = handler.call(review);
156                Json(review)
157            };
158
159            // TODO (@Techassi): Make this part of the trait mentioned above
160            let route = format!("/convert/{crd_name}");
161            router = router.route(&route, post(handler_fn));
162        }
163
164        let ConversionWebhookOptions {
165            socket_addr,
166            namespace: operator_namespace,
167            service_name: operator_service_name,
168        } = &options;
169
170        // This is how Kubernetes calls us, so it decides about the naming.
171        // AFAIK we can not influence this, so this is the only SAN entry needed.
172        // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes)
173        // HTTP clients can use the FQDN of the service for testing or user use-cases.
174        let subject_alterative_dns_name =
175            format!("{operator_service_name}.{operator_namespace}.svc",);
176
177        let webhook_options = WebhookOptions {
178            subject_alterative_dns_names: vec![subject_alterative_dns_name],
179            socket_addr: *socket_addr,
180        };
181
182        let (server, certificate_rx) = WebhookServer::new(router, webhook_options)
183            .await
184            .context(CreateWebhookServerSnafu)?;
185
186        Ok((Self(server), certificate_rx))
187    }
188
189    /// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`],
190    /// and a [`oneshot::Receiver`].
191    ///
192    /// ## Parameters
193    ///
194    /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`]
195    ///   to a handler function. In most cases, the generated `CustomResource::try_merge` function
196    ///   should be used. It provides the expected `fn(ConversionReview) -> ConversionReview`
197    ///   signature.
198    /// - `operator_service_name`: The name of the Kubernetes service name which points to the
199    ///   operator/conversion webhook. This is used to construct the service reference in the CRD
200    ///   `spec.conversion` field.
201    /// - `operator_namespace`: The namespace the operator runs in. This is used to construct the
202    ///   service reference in the CRD `spec.conversion` field.
203    /// - `disable_maintainer`: A boolean value to indicate if the [`CustomResourceDefinitionMaintainer`]
204    ///   should be disabled.
205    /// - `client`: A [`kube::Client`] used to maintain the custom resource definitions.
206    ///
207    /// See the referenced items for more details on usage.
208    ///
209    /// ## Return Values
210    ///
211    /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See
212    ///   [`ConversionWebhookServer::run`] for more details.
213    /// - The [`CustomResourceDefinitionMaintainer`] which is used to run the maintainer. See
214    ///   [`CustomResourceDefinitionMaintainer::run`] for more details.
215    /// - A [`oneshot::Receiver`] which is triggered after the initial reconciliation of the CRDs
216    ///   succeeded. This signal can be used to deploy any custom resources defined by these CRDs.
217    ///
218    /// ## Example
219    ///
220    /// ```no_run
221    /// # use futures_util::TryFutureExt;
222    /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider};
223    /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions};
224    /// use stackable_operator::{kube::Client, crd::s3::{S3Connection, S3ConnectionVersion}};
225    ///
226    /// # #[tokio::main]
227    /// # async fn main() {
228    /// # CryptoProvider::install_default(default_provider()).unwrap();
229    /// let client = Client::try_default().await.unwrap();
230    ///
231    /// let crds_and_handlers = vec![
232    ///     (
233    ///         S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1)
234    ///             .expect("the S3Connection CRD must be merged"),
235    ///         S3Connection::try_convert,
236    ///     )
237    /// ];
238    ///
239    /// let (conversion_webhook_server, crd_maintainer, _initial_reconcile_rx) =
240    ///     ConversionWebhookServer::with_maintainer(
241    ///         crds_and_handlers,
242    ///         "my-operator",
243    ///         "my-namespace",
244    ///         "my-field-manager",
245    ///         false,
246    ///         client,
247    ///     )
248    ///     .await
249    ///     .unwrap();
250    ///
251    /// let conversion_webhook_server = conversion_webhook_server
252    ///     .run()
253    ///     .map_err(|err| err.to_string());
254    ///
255    /// let crd_maintainer = crd_maintainer
256    ///     .run()
257    ///     .map_err(|err| err.to_string());
258    ///
259    /// // Run both the conversion webhook server and crd_maintainer concurrently, eg. with
260    /// // futures::try_join!.
261    /// futures_util::try_join!(conversion_webhook_server, crd_maintainer).unwrap();
262    /// # }
263    /// ```
264    pub async fn with_maintainer<'a, H>(
265        // TODO (@Techassi): Use a trait type here which can be used to build all part of the
266        // conversion webhook server and a CRD maintainer.
267        crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)> + Clone,
268        operator_service_name: &'a str,
269        operator_namespace: &'a str,
270        field_manager: &'a str,
271        disable_maintainer: bool,
272        client: Client,
273    ) -> Result<
274        (
275            Self,
276            CustomResourceDefinitionMaintainer<'a>,
277            oneshot::Receiver<()>,
278        ),
279        ConversionWebhookError,
280    >
281    where
282        H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
283    {
284        let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS;
285
286        // TODO (@Techassi): These should be moved into a builder
287        let webhook_options = ConversionWebhookOptions {
288            service_name: operator_service_name,
289            namespace: operator_namespace,
290            socket_addr,
291        };
292
293        let (conversion_webhook_server, certificate_rx) =
294            Self::new(crds_and_handlers.clone(), webhook_options).await?;
295
296        let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd);
297
298        // TODO (@Techassi): These should be moved into a builder
299        let maintainer_options = CustomResourceDefinitionMaintainerOptions {
300            webhook_https_port: socket_addr.port(),
301            disabled: disable_maintainer,
302            operator_service_name,
303            operator_namespace,
304            field_manager,
305        };
306
307        let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new(
308            client,
309            certificate_rx,
310            definitions,
311            maintainer_options,
312        );
313
314        Ok((conversion_webhook_server, maintainer, initial_reconcile_rx))
315    }
316
317    /// Runs the [`ConversionWebhookServer`] asynchronously.
318    pub async fn run(self) -> Result<(), ConversionWebhookError> {
319        tracing::info!("run conversion webhook server");
320        self.0.run().await.context(RunWebhookServerSnafu)
321    }
322}