Making the readiness probe non-blocking, and backport non-blocking liveness and readiness probes

Closes #22109

Co-authored-by: Martin Bartoš <mabartos@redhat.com>
This commit is contained in:
Alexander Schwartz 2023-07-29 13:17:01 +02:00 committed by Alexander Schwartz
parent 9674e85296
commit 6a78e36d25
8 changed files with 303 additions and 8 deletions

View file

@ -2,10 +2,19 @@ package org.keycloak.config;
/**
 * Declares the configuration options that control the health check endpoints.
 */
public class HealthOptions {
// NOTE(review): the raw-typed line below and the Option<Boolean> line after it look like the
// removed and added sides of the same diff line; only the typed declaration should remain.
public static final Option HEALTH_ENABLED = new OptionBuilder<>("health-enabled", Boolean.class)
// Switches the health endpoints on or off; defaults to FALSE.
public static final Option<Boolean> HEALTH_ENABLED = new OptionBuilder<>("health-enabled", Boolean.class)
.category(OptionCategory.HEALTH)
.description("If the server should expose health check endpoints. If enabled, health checks are available at the '/health', '/health/ready' and '/health/live' endpoints.")
.defaultValue(Boolean.FALSE)
.buildTime(true)
.build();
// Opt-in switch back to the original blocking Quarkus handlers for '/health/ready' and
// '/health/live'; defaults to FALSE.
// NOTE(review): hidden() presumably keeps this option out of the CLI help output - confirm
// against OptionBuilder.
public static final Option<Boolean> HEALTH_CLASSIC_PROBES_ENABLED = new OptionBuilder<>("health-classic-probes-enabled", Boolean.class)
.category(OptionCategory.HEALTH)
.description("If enabled, use the original Quarkus blocking handlers for '/health/ready' and '/health/live' endpoints.")
.defaultValue(Boolean.FALSE)
.buildTime(true)
.hidden()
.build();
}

View file

@ -84,6 +84,8 @@ import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderFactory;
import org.keycloak.provider.ProviderManager;
import org.keycloak.provider.Spi;
import org.keycloak.quarkus.runtime.integration.health.ReactiveLivenessHandler;
import org.keycloak.quarkus.runtime.integration.health.ReactiveReadinessHandler;
import org.keycloak.quarkus.runtime.Environment;
import org.keycloak.quarkus.runtime.KeycloakRecorder;
import org.keycloak.quarkus.runtime.configuration.Configuration;
@ -95,6 +97,7 @@ import org.keycloak.quarkus.runtime.configuration.mappers.PropertyMapper;
import org.keycloak.quarkus.runtime.configuration.mappers.PropertyMappers;
import org.keycloak.quarkus.runtime.integration.resteasy.KeycloakHandlerChainCustomizer;
import org.keycloak.quarkus.runtime.integration.web.NotFoundHandler;
import org.keycloak.quarkus.runtime.services.health.KeycloakReadyAsyncHealthCheck;
import org.keycloak.quarkus.runtime.services.health.KeycloakReadyHealthCheck;
import org.keycloak.quarkus.runtime.storage.database.jpa.NamedJpaConnectionProviderFactory;
import org.keycloak.quarkus.runtime.themes.FlatClasspathThemeResourceProviderFactory;
@ -604,15 +607,36 @@ class KeycloakProcessor {
if (healthDisabled) {
routes.produce(RouteBuildItem.builder().route(DEFAULT_HEALTH_ENDPOINT.concat("/*")).handler(new NotFoundHandler()).build());
} else {
// local solution until https://github.com/quarkusio/quarkus/issues/35099 is available in Quarkus
if (!isHealthClassicProbesEnabled()) {
routes.produce(RouteBuildItem.builder().route(DEFAULT_HEALTH_ENDPOINT.concat("/live")).handler(new ReactiveLivenessHandler()).build());
routes.produce(RouteBuildItem.builder().route(DEFAULT_HEALTH_ENDPOINT.concat("/ready")).handler(new ReactiveReadinessHandler()).build());
}
}
boolean metricsDisabled = !isMetricsEnabled();
if (healthDisabled || metricsDisabled) {
// disables the single check we provide which depends on metrics enabled
ClassInfo disabledBean = index.getIndex()
ClassInfo disabledBean1 = index.getIndex()
.getClassByName(DotName.createSimple(KeycloakReadyHealthCheck.class.getName()));
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean.asClass(), false));
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean1.asClass(), false));
ClassInfo disabledBean2 = index.getIndex()
.getClassByName(DotName.createSimple(KeycloakReadyAsyncHealthCheck.class.getName()));
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean2.asClass(), false));
} else {
if (isHealthClassicProbesEnabled()) {
// disable new async check
ClassInfo disabledBean2 = index.getIndex()
.getClassByName(DotName.createSimple(KeycloakReadyAsyncHealthCheck.class.getName()));
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean2.asClass(), false));
} else {
// disable old classic check
ClassInfo disabledBean1 = index.getIndex()
.getClassByName(DotName.createSimple(KeycloakReadyHealthCheck.class.getName()));
removeBeans.produce(new BuildTimeConditionBuildItem(disabledBean1.asClass(), false));
}
}
}
@ -851,6 +875,10 @@ class KeycloakProcessor {
return Configuration.getOptionalBooleanValue(NS_KEYCLOAK_PREFIX.concat("health-enabled")).orElse(false);
}
/**
 * Tells whether the classic (blocking) Quarkus health probes were requested.
 *
 * @return the configured value of {@code health-classic-probes-enabled} under the Keycloak
 *         namespace prefix, or {@code false} when the option is not set
 */
private boolean isHealthClassicProbesEnabled() {
    final String propertyName = NS_KEYCLOAK_PREFIX.concat("health-classic-probes-enabled");
    return Configuration.getOptionalBooleanValue(propertyName).orElse(false);
}
static JdbcDataSourceBuildItem getDefaultDataSource(List<JdbcDataSourceBuildItem> jdbcDataSources) {
for (JdbcDataSourceBuildItem jdbcDataSource : jdbcDataSources) {
if (jdbcDataSource.isDefault()) {

View file

@ -14,6 +14,8 @@ final class HealthPropertyMappers {
fromOption(HealthOptions.HEALTH_ENABLED)
.to("quarkus.health.extensions.enabled")
.paramLabel(Boolean.TRUE + "|" + Boolean.FALSE)
.build(),
fromOption(HealthOptions.HEALTH_CLASSIC_PROBES_ENABLED)
.build()
};
}

View file

@ -0,0 +1,57 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.quarkus.runtime.integration.health;
import io.quarkus.smallrye.health.runtime.SmallRyeLivenessHandler;
import io.smallrye.health.SmallRyeHealth;
import io.smallrye.health.SmallRyeHealthReporter;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;
/**
 * This adds the possibility to have a non-blocking health handler in Quarkus.
 * <p>
 * Without a non-blocking health check, all liveness and readiness probes will enqueue in the worker thread pool. Under high load
 * or if there is a lot of blocking IO happening (for example, during Keycloak cluster rebalancing), this leads to probes being queued.
 * Queued probes would lead to timeouts unless the timeouts are configured to 10-20 seconds. Reactive probes avoid the enqueueing
 * in the worker thread pool for all non-blocking probes, which will be the default for the (otherwise empty) liveness probe.
 * For the readiness probe, this depends on the implementation of the specific readiness probes.
 * <p>
 * This is a workaround until <a href="https://github.com/quarkusio/quarkus/pull/35100">quarkusio/quarkus#35100</a> is available
 * in a regular Quarkus version. Then these classes can be removed.
 *
 * @author Alexander Schwartz
 */
public abstract class ReactiveHealthHandler implements Handler<RoutingContext> {

    /**
     * Subscribes to the health result supplied by {@link #getHealth()} and writes it to the
     * response once it is available.
     * <p>
     * If the health {@link Uni} fails, the routing context is failed with that error; without
     * the failure callback nothing in this handler would ever answer the request.
     *
     * @param context the routing context of the probe request
     */
    @Override
    public void handle(RoutingContext context) {
        getHealth().subscribe().with(
                smallRyeHealth -> writeResponse(smallRyeHealth, context),
                failure -> context.fail(failure));
    }

    /**
     * Writes an already computed health result to the response.
     * <p>
     * A {@link SmallRyeLivenessHandler} is used here only to produce the response: its
     * {@code getHealth} is overridden to return the given result, so the same code path serves
     * both the liveness and the readiness subclasses.
     *
     * @param health  the computed health result
     * @param context the routing context to respond on
     */
    private static void writeResponse(SmallRyeHealth health, RoutingContext context) {
        new SmallRyeLivenessHandler() {
            @Override
            protected SmallRyeHealth getHealth(SmallRyeHealthReporter reporter, RoutingContext ctx) {
                return health;
            }
        }.handle(context);
    }

    /**
     * Supplies the health result this handler should report.
     *
     * @return a {@link Uni} emitting the health result
     */
    protected abstract Uni<SmallRyeHealth> getHealth();
}

View file

@ -0,0 +1,35 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.quarkus.runtime.integration.health;
import io.quarkus.arc.Arc;
import io.smallrye.health.SmallRyeHealth;
import io.smallrye.health.SmallRyeHealthReporter;
import io.smallrye.mutiny.Uni;
/**
 * Health handler that reports the asynchronous liveness result of the
 * {@link SmallRyeHealthReporter} bean.
 *
 * @author Alexander Schwartz
 */
public class ReactiveLivenessHandler extends ReactiveHealthHandler {

    /**
     * Looks up the {@link SmallRyeHealthReporter} in the Arc container on each call and asks it
     * for the liveness status.
     *
     * @return the asynchronous liveness result
     */
    @Override
    protected Uni<SmallRyeHealth> getHealth() {
        return Arc.container()
                .instance(SmallRyeHealthReporter.class)
                .get()
                .getLivenessAsync();
    }
}

View file

@ -0,0 +1,35 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.quarkus.runtime.integration.health;
import io.quarkus.arc.Arc;
import io.smallrye.health.SmallRyeHealth;
import io.smallrye.health.SmallRyeHealthReporter;
import io.smallrye.mutiny.Uni;
/**
 * Health handler that reports the asynchronous readiness result of the
 * {@link SmallRyeHealthReporter} bean.
 *
 * @author Alexander Schwartz
 */
public class ReactiveReadinessHandler extends ReactiveHealthHandler {

    /**
     * Looks up the {@link SmallRyeHealthReporter} in the Arc container on each call and asks it
     * for the readiness status.
     *
     * @return the asynchronous readiness result
     */
    @Override
    protected Uni<SmallRyeHealth> getHealth() {
        return Arc.container()
                .instance(SmallRyeHealthReporter.class)
                .get()
                .getReadinessAsync();
    }
}

View file

@ -0,0 +1,111 @@
/*
* Copyright 2020 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.quarkus.runtime.services.health;
import io.agroal.api.AgroalDataSource;
import io.quarkus.agroal.runtime.health.DataSourceHealthCheck;
import io.quarkus.smallrye.health.runtime.QuarkusAsyncHealthCheckFactory;
import io.smallrye.health.api.AsyncHealthCheck;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.eclipse.microprofile.health.Readiness;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Keycloak Healthcheck Readiness Probe.
 * <p>
 * Performs a hybrid between the passive and the active mode. If there are no healthy connections in the pool,
 * it invokes the standard <code>DataSourceHealthCheck</code> that creates a new connection and checks if it's valid.
 * <p>
 * While the check for healthy connections is non-blocking, the standard check is blocking, so it needs to be wrapped.
 * <p>
 * When NON_BLOCKING_PROBES is no longer behind a feature flag but the only option, it will replace the
 * {@link KeycloakReadyHealthCheck}.
 *
 * @see <a href="https://github.com/keycloak/keycloak-community/pull/55">Healthcheck API Design</a>
 */
@Readiness
@ApplicationScoped
public class KeycloakReadyAsyncHealthCheck implements AsyncHealthCheck {

    /**
     * Date formatter, the same as used by Quarkus. This enables users to quickly compare the date printed
     * by the probe with the logs.
     */
    static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS").withZone(ZoneId.systemDefault());

    /** As the DataSourceHealthCheck doesn't exist as an application scoped bean,
     * create our own instance here which exposes the <code>init()</code> call for the delegate. */
    MyDataSourceHealthCheck delegate;

    @Inject
    AgroalDataSource agroalDataSource;

    @Inject
    QuarkusAsyncHealthCheckFactory healthCheckFactory;

    /** Start of the current failure streak; holds {@code null} while the check is passing. */
    AtomicReference<Instant> failingSince = new AtomicReference<>();

    /** Subclass whose only purpose is to make {@code init()} callable from this class. */
    private static class MyDataSourceHealthCheck extends DataSourceHealthCheck {
        @Override
        public void init() {
            super.init();
        }
    }

    /** Creates and initializes the delegate data source check after injection has completed. */
    @PostConstruct
    protected void init() {
        delegate = new MyDataSourceHealthCheck();
        delegate.init();
    }

    /**
     * Reports UP straight away while the pool metrics show at least one active and no invalid
     * connection. Otherwise the delegate's active check is run and the result is DOWN, with a
     * "Failing since" timestamp, when that check reports DOWN.
     *
     * @return the asynchronous readiness response
     */
    @Override
    public Uni<HealthCheckResponse> call() {
        HealthCheckResponseBuilder builder = HealthCheckResponse.named("Keycloak database connections async health check").up();
        long activeCount = agroalDataSource.getMetrics().activeCount();
        long invalidCount = agroalDataSource.getMetrics().invalidCount();
        if (activeCount < 1 || invalidCount > 0) {
            // The delegate check is blocking (see class documentation), hence callSync.
            return healthCheckFactory.callSync(() -> {
                HealthCheckResponse activeCheckResult = delegate.call();
                if (activeCheckResult.getStatus() == HealthCheckResponse.Status.DOWN) {
                    builder.down();
                    Instant failingTime = failingSince.updateAndGet(this::createInstanceIfNeeded);
                    builder.withData("Failing since", DATE_FORMATTER.format(failingTime));
                } else {
                    // The active check passed, so the failure streak is over. Without this reset
                    // a later outage would report the start of the previous one.
                    failingSince.set(null);
                }
                return builder.build();
            });
        } else {
            failingSince.set(null);
            return healthCheckFactory.callAsync(() -> Uni.createFrom().item(builder.build()));
        }
    }

    /**
     * Keeps an existing failure timestamp, or starts a new one.
     *
     * @param instant the currently stored timestamp, may be {@code null}
     * @return {@code instant} when it is non-null, otherwise the current time
     */
    Instant createInstanceIfNeeded(Instant instant) {
        if (instant == null) {
            return Instant.now();
        }
        return instant;
    }
}

View file

@ -23,15 +23,11 @@ import org.keycloak.it.junit5.extension.DistributionTest;
import org.keycloak.it.utils.KeycloakDistribution;
import static io.restassured.RestAssured.when;
import static org.hamcrest.CoreMatchers.equalTo;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@DistributionTest(keepAlive =true)
public class HealthDistTest {
@ -67,6 +63,28 @@ public class HealthDistTest {
.statusCode(404);
}
/**
 * With health and metrics enabled and no further options, the liveness probe answers 200 and
 * the readiness probe exposes exactly one check, the async database connection check.
 */
@Test
// Each CLI option is its own array element so the test does not depend on the launcher
// re-splitting a single argument on whitespace.
@Launch({ "start-dev", "--health-enabled=true", "--metrics-enabled=true" })
void testNonBlockingProbes() {
    when().get("/health/live").then()
            .statusCode(200);
    when().get("/health/ready").then()
            .statusCode(200)
            .body("checks[0].name", equalTo("Keycloak database connections async health check"))
            .body("checks.size()", equalTo(1));
}
/**
 * With the classic probes option enabled, the liveness probe answers 200 and the readiness
 * probe exposes exactly one check, the classic (blocking) database connection check.
 */
@Test
// Each CLI option is its own array element so the test does not depend on the launcher
// re-splitting a single argument on whitespace.
@Launch({ "start-dev", "--health-enabled=true", "--metrics-enabled=true", "--health-classic-probes-enabled=true" })
void testBlockingProbes() {
    when().get("/health/live").then()
            .statusCode(200);
    when().get("/health/ready").then()
            .statusCode(200)
            .body("checks[0].name", equalTo("Keycloak database connections health check"))
            .body("checks.size()", equalTo(1));
}
@Test
void testUsingRelativePath(KeycloakDistribution distribution) {
for (String relativePath : List.of("/auth", "/auth/", "auth")) {