stackable_webhook/servers/
conversion.rs1use std::{fmt::Debug, net::SocketAddr};
2
3use axum::{Json, Router, routing::post};
4use k8s_openapi::{
5 ByteString,
6 apiextensions_apiserver::pkg::apis::apiextensions::v1::{
7 CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
8 WebhookConversion,
9 },
10};
11pub use kube::core::conversion::ConversionReview;
16use kube::{
17 Api, Client, ResourceExt,
18 api::{Patch, PatchParams},
19};
20use snafu::{OptionExt, ResultExt, Snafu};
21use tokio::{sync::mpsc, try_join};
22use tracing::instrument;
23use x509_cert::{
24 Certificate,
25 der::{EncodePem, pem::LineEnding},
26};
27
28use crate::{
29 WebhookError, WebhookHandler, WebhookServer, constants::CONVERSION_WEBHOOK_HTTPS_PORT,
30 options::WebhookOptions,
31};
32
33#[derive(Debug, Snafu)]
34pub enum ConversionWebhookError {
35 #[snafu(display("failed to create webhook server"))]
36 CreateWebhookServer { source: WebhookError },
37
38 #[snafu(display("failed to run webhook server"))]
39 RunWebhookServer { source: WebhookError },
40
41 #[snafu(display("failed to receive certificate from channel"))]
42 ReceiveCertificateFromChannel,
43
44 #[snafu(display("failed to convert CA certificate into PEM format"))]
45 ConvertCaToPem { source: x509_cert::der::Error },
46
47 #[snafu(display("failed to reconcile CRDs"))]
48 ReconcileCrds {
49 #[snafu(source(from(ConversionWebhookError, Box::new)))]
50 source: Box<ConversionWebhookError>,
51 },
52
53 #[snafu(display("failed to update CRD {crd_name:?}"))]
54 UpdateCrd {
55 source: kube::Error,
56 crd_name: String,
57 },
58}
59
60impl<F> WebhookHandler<ConversionReview, ConversionReview> for F
61where
62 F: FnOnce(ConversionReview) -> ConversionReview,
63{
64 fn call(self, req: ConversionReview) -> ConversionReview {
65 self(req)
66 }
67}
68
69#[derive(Debug)]
71pub struct ConversionWebhookOptions {
72 pub socket_addr: SocketAddr,
74
75 pub namespace: String,
77
78 pub service_name: String,
80
81 pub maintain_crds: bool,
86
87 pub field_manager: String,
90}
91
92pub struct ConversionWebhookServer {
96 crds: Vec<CustomResourceDefinition>,
97 options: ConversionWebhookOptions,
98 router: Router,
99 client: Client,
100}
101
102impl ConversionWebhookServer {
103 #[instrument(
173 name = "create_conversion_webhook_server",
174 skip(crds_and_handlers, client)
175 )]
176 pub async fn new<H>(
177 crds_and_handlers: impl IntoIterator<Item = (CustomResourceDefinition, H)>,
178 options: ConversionWebhookOptions,
179 client: Client,
180 ) -> Result<Self, ConversionWebhookError>
181 where
182 H: WebhookHandler<ConversionReview, ConversionReview> + Clone + Send + Sync + 'static,
183 {
184 tracing::debug!("create new conversion webhook server");
185
186 let mut router = Router::new();
187 let mut crds = Vec::new();
188 for (crd, handler) in crds_and_handlers {
189 let crd_name = crd.name_any();
190 let handler_fn = |Json(review): Json<ConversionReview>| async {
191 let review = handler.call(review);
192 Json(review)
193 };
194
195 let route = format!("/convert/{crd_name}");
196 router = router.route(&route, post(handler_fn));
197 crds.push(crd);
198 }
199
200 Ok(Self {
201 options,
202 router,
203 client,
204 crds,
205 })
206 }
207
208 pub async fn run(self) -> Result<(), ConversionWebhookError> {
209 tracing::info!("starting conversion webhook server");
210
211 let Self {
212 options,
213 router,
214 client,
215 crds,
216 } = self;
217
218 let ConversionWebhookOptions {
219 socket_addr,
220 namespace: operator_namespace,
221 service_name: operator_service_name,
222 maintain_crds,
223 field_manager,
224 } = &options;
225
226 let subject_alterative_dns_name =
229 format!("{operator_service_name}.{operator_namespace}.svc",);
230
231 let webhook_options = WebhookOptions {
232 subject_alterative_dns_names: vec![subject_alterative_dns_name],
233 socket_addr: *socket_addr,
234 };
235
236 let (server, mut cert_rx) = WebhookServer::new(router, webhook_options)
237 .await
238 .context(CreateWebhookServerSnafu)?;
239
240 let current_cert = cert_rx
246 .recv()
247 .await
248 .context(ReceiveCertificateFromChannelSnafu)?;
249
250 if *maintain_crds {
251 Self::reconcile_crds(
252 &client,
253 field_manager,
254 &crds,
255 operator_namespace,
256 operator_service_name,
257 current_cert,
258 )
259 .await
260 .context(ReconcileCrdsSnafu)?;
261
262 try_join!(
263 Self::run_webhook_server(server),
264 Self::run_crd_reconciliation_loop(
265 cert_rx,
266 &client,
267 field_manager,
268 &crds,
269 operator_namespace,
270 operator_service_name,
271 ),
272 )?;
273 } else {
274 Self::run_webhook_server(server).await?;
275 };
276
277 Ok(())
278 }
279
280 async fn run_webhook_server(server: WebhookServer) -> Result<(), ConversionWebhookError> {
281 server.run().await.context(RunWebhookServerSnafu)
282 }
283
284 async fn run_crd_reconciliation_loop(
285 mut cert_rx: mpsc::Receiver<Certificate>,
286 client: &Client,
287 field_manager: &str,
288 crds: &[CustomResourceDefinition],
289 operator_namespace: &str,
290 operator_service_name: &str,
291 ) -> Result<(), ConversionWebhookError> {
292 while let Some(current_cert) = cert_rx.recv().await {
293 Self::reconcile_crds(
294 client,
295 field_manager,
296 crds,
297 operator_namespace,
298 operator_service_name,
299 current_cert,
300 )
301 .await
302 .context(ReconcileCrdsSnafu)?;
303 }
304 Ok(())
305 }
306
307 #[instrument(skip_all)]
308 async fn reconcile_crds(
309 client: &Client,
310 field_manager: &str,
311 crds: &[CustomResourceDefinition],
312 operator_namespace: &str,
313 operator_service_name: &str,
314 current_cert: Certificate,
315 ) -> Result<(), ConversionWebhookError> {
316 tracing::info!(
317 crds = ?crds.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
318 "Reconciling CRDs"
319 );
320 let ca_bundle = current_cert
321 .to_pem(LineEnding::LF)
322 .context(ConvertCaToPemSnafu)?;
323
324 let crd_api: Api<CustomResourceDefinition> = Api::all(client.clone());
325 for mut crd in crds.iter().cloned() {
326 let crd_name = crd.name_any();
327
328 crd.spec.conversion = Some(CustomResourceConversion {
329 strategy: "Webhook".to_string(),
330 webhook: Some(WebhookConversion {
331 conversion_review_versions: vec!["v1".to_string()],
335 client_config: Some(WebhookClientConfig {
336 service: Some(ServiceReference {
337 name: operator_service_name.to_owned(),
338 namespace: operator_namespace.to_owned(),
339 path: Some(format!("/convert/{crd_name}")),
340 port: Some(CONVERSION_WEBHOOK_HTTPS_PORT.into()),
341 }),
342 ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
343 url: None,
344 }),
345 }),
346 });
347
348 let patch = Patch::Apply(&crd);
349 let patch_params = PatchParams::apply(field_manager);
350 crd_api
351 .patch(&crd_name, &patch_params, &patch)
352 .await
353 .with_context(|_| UpdateCrdSnafu {
354 crd_name: crd_name.to_string(),
355 })?;
356 }
357 Ok(())
358 }
359}