// stackable_webhook/servers/conversion.rs
use std::{fmt::Debug, net::SocketAddr};

use axum::{Json, Router, routing::post};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
// Re-export this type because users of the conversion webhook server require
// this type to write the handler function. Instead of importing this type from
// kube directly, consumers can use this type instead. This also eliminates
// keeping the kube dependency version in sync between here and the operator.
pub use kube::core::conversion::ConversionReview;
use kube::{Client, ResourceExt};
use snafu::{ResultExt, Snafu};
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;
use x509_cert::Certificate;

use crate::{
    WebhookError, WebhookHandler, WebhookServer,
    maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions},
    options::WebhookOptions,
};
21
22#[derive(Debug, Snafu)]
23pub enum ConversionWebhookError {
24 #[snafu(display("failed to create webhook server"))]
25 CreateWebhookServer { source: WebhookError },
26
27 #[snafu(display("failed to run webhook server"))]
28 RunWebhookServer { source: WebhookError },
29
30 #[snafu(display("failed to receive certificate from channel"))]
31 ReceiveCertificateFromChannel,
32
33 #[snafu(display("failed to convert CA certificate into PEM format"))]
34 ConvertCaToPem { source: x509_cert::der::Error },
35
36 #[snafu(display("failed to reconcile CRDs"))]
37 ReconcileCrds {
38 #[snafu(source(from(ConversionWebhookError, Box::new)))]
39 source: Box<ConversionWebhookError>,
40 },
41
42 #[snafu(display("failed to update CRD {crd_name:?}"))]
43 UpdateCrd {
44 source: kube::Error,
45 crd_name: String,
46 },
47}
48
49impl<F> WebhookHandler<ConversionReview, ConversionReview> for F
50where
51 F: FnOnce(ConversionReview) -> ConversionReview,
52{
53 fn call(self, req: ConversionReview) -> ConversionReview {
54 self(req)
55 }
56}
57
// TODO: Add a builder, maybe with `bon`.
/// Options used to customize a [`ConversionWebhookServer`].
///
/// The `namespace` and `service_name` are combined into the
/// `{service_name}.{namespace}.svc` DNS name which is used as the subject
/// alternative name of the server's TLS certificate.
#[derive(Debug)]
pub struct ConversionWebhookOptions<'a> {
    /// The bind address to bind the HTTPS server to.
    pub socket_addr: SocketAddr,

    /// The namespace the operator/webhook is running in.
    pub namespace: &'a str,

    /// The name of the Kubernetes service which points to the operator/webhook.
    pub service_name: &'a str,
}
70
71/// A ready-to-use CRD conversion webhook server.
72///
73/// See [`ConversionWebhookServer::new()`] for usage examples.
74pub struct ConversionWebhookServer(WebhookServer);
75
76impl ConversionWebhookServer {
77 /// The default socket address the conversion webhook server binds to, see
78 /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`].
79 pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS;
80
81 /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being
82 /// made to the `/convert/{CRD_NAME}` endpoint.
83 ///
84 /// The TLS certificate is automatically generated and rotated.
85 ///
86 /// ## Parameters
87 ///
88 /// This function expects the following parameters:
89 ///
90 /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`]
91 /// to a handler function. In most cases, the generated `CustomResource::try_merge` function
92 /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview`
93 /// signature.
94 /// - `options`: Provides [`ConversionWebhookOptions`] to customize various parts of the
95 /// webhook server, eg. the socket address used to listen on.
96 ///
97 /// ## Return Values
98 ///
99 /// This function returns a [`Result`] which contains a 2-tuple (pair) of values for the [`Ok`]
100 /// variant:
101 ///
102 /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See
103 /// [`ConversionWebhookServer::run`] for more details.
104 /// - The [`mpsc::Receiver`] which will be used to send out messages containing the newly
105 /// generated TLS certificate. This channel is used by the CRD maintainer to trigger a
106 /// reconcile of the CRDs it maintains.
107 ///
108 /// ## Example
109 ///
110 /// ```no_run
111 /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider};
112 /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions};
113 /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion};
114 ///
115 /// # #[tokio::main]
116 /// # async fn main() {
117 /// # CryptoProvider::install_default(default_provider()).unwrap();
118 /// let crds_and_handlers = vec![
119 /// (
120 /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1)
121 /// .expect("the S3Connection CRD must be merged"),
122 /// S3Connection::try_convert,
123 /// )
124 /// ];
125 ///
126 /// let options = ConversionWebhookOptions {
127 /// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS,
128 /// namespace: "stackable-operators",
129 /// service_name: "product-operator",
130 /// };
131 ///
132 /// let (conversion_webhook_server, _certificate_rx) =
133 /// ConversionWebhookServer::new(crds_and_handlers, options)
134 /// .await
135 /// .unwrap();
136 ///
137 /// conversion_webhook_server.run().await.unwrap();
138 /// # }
139 /// ```
140 #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))]
141 pub async fn new<H>(
142 crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)>,
143 options: ConversionWebhookOptions<'_>,
144 ) -> Result<(Self, mpsc::Receiver<Certificate>), ConversionWebhookError>
145 where
146 H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
147 {
148 tracing::debug!("create new conversion webhook server");
149
150 let mut router = Router::new();
151
152 for (crd, handler) in crds_and_handlers {
153 let crd_name = crd.name_any();
154 let handler_fn = |Json(review): Json<ConversionReview>| async {
155 let review = handler.call(review);
156 Json(review)
157 };
158
159 // TODO (@Techassi): Make this part of the trait mentioned above
160 let route = format!("/convert/{crd_name}");
161 router = router.route(&route, post(handler_fn));
162 }
163
164 let ConversionWebhookOptions {
165 socket_addr,
166 namespace: operator_namespace,
167 service_name: operator_service_name,
168 } = &options;
169
170 // This is how Kubernetes calls us, so it decides about the naming.
171 // AFAIK we can not influence this, so this is the only SAN entry needed.
172 // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes)
173 // HTTP clients can use the FQDN of the service for testing or user use-cases.
174 let subject_alterative_dns_name =
175 format!("{operator_service_name}.{operator_namespace}.svc",);
176
177 let webhook_options = WebhookOptions {
178 subject_alterative_dns_names: vec![subject_alterative_dns_name],
179 socket_addr: *socket_addr,
180 };
181
182 let (server, certificate_rx) = WebhookServer::new(router, webhook_options)
183 .await
184 .context(CreateWebhookServerSnafu)?;
185
186 Ok((Self(server), certificate_rx))
187 }
188
189 /// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`],
190 /// and a [`oneshot::Receiver`].
191 ///
192 /// ## Parameters
193 ///
194 /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`]
195 /// to a handler function. In most cases, the generated `CustomResource::try_merge` function
196 /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview`
197 /// signature.
198 /// - `operator_service_name`: The name of the Kubernetes service name which points to the
199 /// operator/conversion webhook. This is used to construct the service reference in the CRD
200 /// `spec.conversion` field.
201 /// - `operator_namespace`: The namespace the operator runs in. This is used to construct the
202 /// service reference in the CRD `spec.conversion` field.
203 /// - `disable_maintainer`: A boolean value to indicate if the [`CustomResourceDefinitionMaintainer`]
204 /// should be disabled.
205 /// - `client`: A [`kube::Client`] used to maintain the custom resource definitions.
206 ///
207 /// See the referenced items for more details on usage.
208 ///
209 /// ## Return Values
210 ///
211 /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See
212 /// [`ConversionWebhookServer::run`] for more details.
213 /// - The [`CustomResourceDefinitionMaintainer`] which is used to run the maintainer. See
214 /// [`CustomResourceDefinitionMaintainer::run`] for more details.
215 /// - A [`oneshot::Receiver`] which is triggered after the initial reconciliation of the CRDs
216 /// succeeded. This signal can be used to deploy any custom resources defined by these CRDs.
217 ///
218 /// ## Example
219 ///
220 /// ```no_run
221 /// # use futures_util::TryFutureExt;
222 /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider};
223 /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions};
224 /// use stackable_operator::{kube::Client, crd::s3::{S3Connection, S3ConnectionVersion}};
225 ///
226 /// # #[tokio::main]
227 /// # async fn main() {
228 /// # CryptoProvider::install_default(default_provider()).unwrap();
229 /// let client = Client::try_default().await.unwrap();
230 ///
231 /// let crds_and_handlers = vec![
232 /// (
233 /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1)
234 /// .expect("the S3Connection CRD must be merged"),
235 /// S3Connection::try_convert,
236 /// )
237 /// ];
238 ///
239 /// let (conversion_webhook_server, crd_maintainer, _initial_reconcile_rx) =
240 /// ConversionWebhookServer::with_maintainer(
241 /// crds_and_handlers,
242 /// "my-operator",
243 /// "my-namespace",
244 /// "my-field-manager",
245 /// false,
246 /// client,
247 /// )
248 /// .await
249 /// .unwrap();
250 ///
251 /// let conversion_webhook_server = conversion_webhook_server
252 /// .run()
253 /// .map_err(|err| err.to_string());
254 ///
255 /// let crd_maintainer = crd_maintainer
256 /// .run()
257 /// .map_err(|err| err.to_string());
258 ///
259 /// // Run both the conversion webhook server and crd_maintainer concurrently, eg. with
260 /// // futures::try_join!.
261 /// futures_util::try_join!(conversion_webhook_server, crd_maintainer).unwrap();
262 /// # }
263 /// ```
264 pub async fn with_maintainer<'a, H>(
265 // TODO (@Techassi): Use a trait type here which can be used to build all part of the
266 // conversion webhook server and a CRD maintainer.
267 crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)> + Clone,
268 operator_service_name: &'a str,
269 operator_namespace: &'a str,
270 field_manager: &'a str,
271 disable_maintainer: bool,
272 client: Client,
273 ) -> Result<
274 (
275 Self,
276 CustomResourceDefinitionMaintainer<'a>,
277 oneshot::Receiver<()>,
278 ),
279 ConversionWebhookError,
280 >
281 where
282 H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
283 {
284 let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS;
285
286 // TODO (@Techassi): These should be moved into a builder
287 let webhook_options = ConversionWebhookOptions {
288 service_name: operator_service_name,
289 namespace: operator_namespace,
290 socket_addr,
291 };
292
293 let (conversion_webhook_server, certificate_rx) =
294 Self::new(crds_and_handlers.clone(), webhook_options).await?;
295
296 let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd);
297
298 // TODO (@Techassi): These should be moved into a builder
299 let maintainer_options = CustomResourceDefinitionMaintainerOptions {
300 webhook_https_port: socket_addr.port(),
301 disabled: disable_maintainer,
302 operator_service_name,
303 operator_namespace,
304 field_manager,
305 };
306
307 let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new(
308 client,
309 certificate_rx,
310 definitions,
311 maintainer_options,
312 );
313
314 Ok((conversion_webhook_server, maintainer, initial_reconcile_rx))
315 }
316
317 /// Runs the [`ConversionWebhookServer`] asynchronously.
318 pub async fn run(self) -> Result<(), ConversionWebhookError> {
319 tracing::info!("run conversion webhook server");
320 self.0.run().await.context(RunWebhookServerSnafu)
321 }
322}