stackable_webhook/servers/
conversion.rs

1use std::{fmt::Debug, net::SocketAddr};
2
3use axum::{Json, Router, routing::post};
4use k8s_openapi::{
5    ByteString,
6    apiextensions_apiserver::pkg::apis::apiextensions::v1::{
7        CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
8        WebhookConversion,
9    },
10};
11// Re-export this type because users of the conversion webhook server require
12// this type to write the handler function. Instead of importing this type from
13// kube directly, consumers can use this type instead. This also eliminates
14// keeping the kube dependency version in sync between here and the operator.
15pub use kube::core::conversion::ConversionReview;
16use kube::{
17    Api, Client, ResourceExt,
18    api::{Patch, PatchParams},
19};
20use snafu::{OptionExt, ResultExt, Snafu};
21use tokio::{sync::mpsc, try_join};
22use tracing::instrument;
23use x509_cert::{
24    Certificate,
25    der::{EncodePem, pem::LineEnding},
26};
27
28use crate::{
29    WebhookError, WebhookHandler, WebhookServer, constants::CONVERSION_WEBHOOK_HTTPS_PORT,
30    options::WebhookOptions,
31};
32
33#[derive(Debug, Snafu)]
34pub enum ConversionWebhookError {
35    #[snafu(display("failed to create webhook server"))]
36    CreateWebhookServer { source: WebhookError },
37
38    #[snafu(display("failed to run webhook server"))]
39    RunWebhookServer { source: WebhookError },
40
41    #[snafu(display("failed to receive certificate from channel"))]
42    ReceiveCertificateFromChannel,
43
44    #[snafu(display("failed to convert CA certificate into PEM format"))]
45    ConvertCaToPem { source: x509_cert::der::Error },
46
47    #[snafu(display("failed to reconcile CRDs"))]
48    ReconcileCrds {
49        #[snafu(source(from(ConversionWebhookError, Box::new)))]
50        source: Box<ConversionWebhookError>,
51    },
52
53    #[snafu(display("failed to update CRD {crd_name:?}"))]
54    UpdateCrd {
55        source: kube::Error,
56        crd_name: String,
57    },
58}
59
60impl<F> WebhookHandler<ConversionReview, ConversionReview> for F
61where
62    F: FnOnce(ConversionReview) -> ConversionReview,
63{
64    fn call(self, req: ConversionReview) -> ConversionReview {
65        self(req)
66    }
67}
68
// TODO: Add a builder, maybe with `bon`.
/// Configuration of a [`ConversionWebhookServer`].
#[derive(Debug)]
pub struct ConversionWebhookOptions {
    /// Socket address the HTTPS server binds to.
    pub socket_addr: SocketAddr,

    /// Namespace in which the operator/webhook is running.
    pub namespace: String,

    /// Name of the Kubernetes service pointing to the operator/webhook.
    pub service_name: String,

    /// Whether the CRDs should be maintained automatically. Pass the (negated) value of
    /// `stackable_operator::cli::ProductOperatorRun::disable_crd_maintenance`
    /// here.
    // # Because of https://github.com/rust-lang/cargo/issues/3475 we can not use a real link here
    pub maintain_crds: bool,

    /// Field manager used when applying Kubernetes objects. This is typically the operator
    /// name, e.g. `airflow-operator`.
    pub field_manager: String,
}
91
92/// A ready-to-use CRD conversion webhook server.
93///
94/// See [`ConversionWebhookServer::new()`] for usage examples.
95pub struct ConversionWebhookServer {
96    crds: Vec<CustomResourceDefinition>,
97    options: ConversionWebhookOptions,
98    router: Router,
99    client: Client,
100}
101
102impl ConversionWebhookServer {
103    /// Creates a new conversion webhook server, which expects POST requests being made to the
104    /// `/convert/{crd name}` endpoint.
105    ///
106    /// You need to provide a few things for every CRD passed in via the `crds_and_handlers` argument:
107    ///
108    /// 1. The CRD
109    /// 2. A conversion function to convert between CRD versions. Typically you would use the
110    ///    the auto-generated `try_convert` function on CRD spec definition structs for this.
111    /// 3. A [`kube::Client`] used to create/update the CRDs.
112    ///
113    /// The [`ConversionWebhookServer`] takes care of reconciling the CRDs into the Kubernetes
114    /// cluster and takes care of adding itself as conversion webhook. This includes TLS
115    /// certificates and CA bundles.
116    ///
117    /// # Example
118    ///
119    /// ```no_run
120    /// use clap::Parser;
121    /// use stackable_webhook::{
122    ///     servers::{ConversionWebhookServer, ConversionWebhookOptions},
123    ///     constants::CONVERSION_WEBHOOK_HTTPS_PORT,
124    ///     WebhookOptions
125    /// };
126    /// use stackable_operator::{
127    ///     kube::Client,
128    ///     crd::s3::{S3Connection, S3ConnectionVersion},
129    ///     cli::ProductOperatorRun,
130    /// };
131    ///
132    /// # async fn test() {
133    /// // Things that should already be in you operator:
134    /// const OPERATOR_NAME: &str = "product-operator";
135    /// let client = Client::try_default().await.expect("failed to create Kubernetes client");
136    /// let ProductOperatorRun {
137    ///     operator_environment,
138    ///     disable_crd_maintenance,
139    ///     ..
140    /// } = ProductOperatorRun::parse();
141    ///
142    ///  let crds_and_handlers = [
143    ///     (
144    ///         S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1)
145    ///             .expect("failed to merge S3Connection CRD"),
146    ///         S3Connection::try_convert as fn(_) -> _,
147    ///     ),
148    /// ];
149    ///
150    /// let options = ConversionWebhookOptions {
151    ///     socket_addr: format!("0.0.0.0:{CONVERSION_WEBHOOK_HTTPS_PORT}")
152    ///         .parse()
153    ///         .expect("static address is always valid"),
154    ///     namespace: operator_environment.operator_namespace,
155    ///     service_name: operator_environment.operator_service_name,
156    ///     maintain_crds: !disable_crd_maintenance,
157    ///     field_manager: OPERATOR_NAME.to_owned(),
158    /// };
159    ///
160    /// // Construct the conversion webhook server
161    /// let conversion_webhook = ConversionWebhookServer::new(
162    ///     crds_and_handlers,
163    ///     options,
164    ///     client,
165    /// )
166    /// .await
167    /// .expect("failed to create ConversionWebhookServer");
168    ///
169    /// conversion_webhook.run().await.expect("failed to run ConversionWebhookServer");
170    /// # }
171    /// ```
172    #[instrument(
173        name = "create_conversion_webhook_server",
174        skip(crds_and_handlers, client)
175    )]
176    pub async fn new<H>(
177        crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)>,
178        options: ConversionWebhookOptions,
179        client: Client,
180    ) -> Result<Self, ConversionWebhookError>
181    where
182        H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
183    {
184        tracing::debug!("create new conversion webhook server");
185
186        let mut router = Router::new();
187        let mut crds = Vec::new();
188        for (crd, handler) in crds_and_handlers {
189            let crd_name = crd.name_any();
190            let handler_fn = |Json(review): Json<ConversionReview>| async {
191                let review = handler.call(review);
192                Json(review)
193            };
194
195            let route = format!("/convert/{crd_name}");
196            router = router.route(&route, post(handler_fn));
197            crds.push(crd);
198        }
199
200        Ok(Self {
201            options,
202            router,
203            client,
204            crds,
205        })
206    }
207
208    pub async fn run(self) -> Result<(), ConversionWebhookError> {
209        tracing::info!("starting conversion webhook server");
210
211        let Self {
212            options,
213            router,
214            client,
215            crds,
216        } = self;
217
218        let ConversionWebhookOptions {
219            socket_addr,
220            namespace: operator_namespace,
221            service_name: operator_service_name,
222            maintain_crds,
223            field_manager,
224        } = &options;
225
226        // This is how Kubernetes calls us, so it decides about the naming.
227        // AFAIK we can not influence this, so this is the only SAN entry needed.
228        let subject_alterative_dns_name =
229            format!("{operator_service_name}.{operator_namespace}.svc",);
230
231        let webhook_options = WebhookOptions {
232            subject_alterative_dns_names: vec![subject_alterative_dns_name],
233            socket_addr: *socket_addr,
234        };
235
236        let (server, mut cert_rx) = WebhookServer::new(router, webhook_options)
237            .await
238            .context(CreateWebhookServerSnafu)?;
239
240        // We block the ConversionWebhookServer creation until the certificates have been generated.
241        // This way we
242        // 1. Are able to apply the CRDs before we start the actual controllers relying on them
243        // 2. Avoid updating them shortly after as cert have been generated. Doing so would cause
244        // unnecessary "too old resource version" errors in the controllers as the CRD was updated.
245        let current_cert = cert_rx
246            .recv()
247            .await
248            .context(ReceiveCertificateFromChannelSnafu)?;
249
250        if *maintain_crds {
251            Self::reconcile_crds(
252                &client,
253                field_manager,
254                &crds,
255                operator_namespace,
256                operator_service_name,
257                current_cert,
258            )
259            .await
260            .context(ReconcileCrdsSnafu)?;
261
262            try_join!(
263                Self::run_webhook_server(server),
264                Self::run_crd_reconciliation_loop(
265                    cert_rx,
266                    &client,
267                    field_manager,
268                    &crds,
269                    operator_namespace,
270                    operator_service_name,
271                ),
272            )?;
273        } else {
274            Self::run_webhook_server(server).await?;
275        };
276
277        Ok(())
278    }
279
280    async fn run_webhook_server(server: WebhookServer) -> Result<(), ConversionWebhookError> {
281        server.run().await.context(RunWebhookServerSnafu)
282    }
283
284    async fn run_crd_reconciliation_loop(
285        mut cert_rx: mpsc::Receiver<Certificate>,
286        client: &Client,
287        field_manager: &str,
288        crds: &[CustomResourceDefinition],
289        operator_namespace: &str,
290        operator_service_name: &str,
291    ) -> Result<(), ConversionWebhookError> {
292        while let Some(current_cert) = cert_rx.recv().await {
293            Self::reconcile_crds(
294                client,
295                field_manager,
296                crds,
297                operator_namespace,
298                operator_service_name,
299                current_cert,
300            )
301            .await
302            .context(ReconcileCrdsSnafu)?;
303        }
304        Ok(())
305    }
306
307    #[instrument(skip_all)]
308    async fn reconcile_crds(
309        client: &Client,
310        field_manager: &str,
311        crds: &[CustomResourceDefinition],
312        operator_namespace: &str,
313        operator_service_name: &str,
314        current_cert: Certificate,
315    ) -> Result<(), ConversionWebhookError> {
316        tracing::info!(
317            crds = ?crds.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
318            "Reconciling CRDs"
319        );
320        let ca_bundle = current_cert
321            .to_pem(LineEnding::LF)
322            .context(ConvertCaToPemSnafu)?;
323
324        let crd_api: Api<CustomResourceDefinition> = Api::all(client.clone());
325        for mut crd in crds.iter().cloned() {
326            let crd_name = crd.name_any();
327
328            crd.spec.conversion = Some(CustomResourceConversion {
329                strategy: "Webhook".to_string(),
330                webhook: Some(WebhookConversion {
331                    // conversionReviewVersions indicates what ConversionReview versions are understood/preferred by the webhook.
332                    // The first version in the list understood by the API server is sent to the webhook.
333                    // The webhook must respond with a ConversionReview object in the same version it received.
334                    conversion_review_versions: vec!["v1".to_string()],
335                    client_config: Some(WebhookClientConfig {
336                        service: Some(ServiceReference {
337                            name: operator_service_name.to_owned(),
338                            namespace: operator_namespace.to_owned(),
339                            path: Some(format!("/convert/{crd_name}")),
340                            port: Some(CONVERSION_WEBHOOK_HTTPS_PORT.into()),
341                        }),
342                        ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
343                        url: None,
344                    }),
345                }),
346            });
347
348            let patch = Patch::Apply(&crd);
349            let patch_params = PatchParams::apply(field_manager);
350            crd_api
351                .patch(&crd_name, &patch_params, &patch)
352                .await
353                .with_context(|_| UpdateCrdSnafu {
354                    crd_name: crd_name.to_string(),
355                })?;
356        }
357        Ok(())
358    }
359}