Changing Quarkus transaction handling for JPA map storage to JTA

This has been recommended as the supported way of transaction handling by the Quarkus team.
Adding handling of exceptions thrown when committing JTA.
Re-adding handling of exceptions when interacting with the entity manager, plus wrapping access to queries to map exceptions during auto-flushing.

Closes #13222
This commit is contained in:
Alexander Schwartz 2022-12-06 15:06:10 +01:00 committed by Pedro Igor
parent 1073a342cf
commit e4804de9e3
12 changed files with 185 additions and 58 deletions

View file

@ -16,6 +16,8 @@
*/
package org.keycloak.connections.jpa;
import org.keycloak.common.Profile;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ExceptionConverter;
import javax.persistence.PersistenceException;
@ -24,7 +26,7 @@ import javax.persistence.PersistenceException;
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
*/
public class JpaExceptionConverter implements ExceptionConverter {
public class JpaExceptionConverter implements ExceptionConverter, EnvironmentDependentProviderFactory {
@Override
public Throwable convert(Throwable e) {
if (!(e instanceof PersistenceException)) return null;
@ -35,4 +37,9 @@ public class JpaExceptionConverter implements ExceptionConverter {
public String getId() {
return "jpa";
}
@Override
public boolean isSupported() {
return !Profile.isFeatureEnabled(Profile.Feature.MAP_STORAGE);
}
}

View file

@ -26,6 +26,7 @@ import java.util.stream.Stream;
import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.persistence.Parameter;
import javax.persistence.PersistenceException;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaDelete;
@ -194,12 +195,17 @@ public abstract class JpaMapKeycloakTransaction<RE extends JpaRootEntity, E exte
emQuery = emQuery.setLockMode(LockModeType.PESSIMISTIC_WRITE);
}
// In order to cache the result, the full result needs to be retrieved.
// There is also no difference to that in Hibernate, as Hibernate will first retrieve all elements from the ResultSet.
List<RE> resultList = emQuery.getResultList();
cache.put(queryCacheKey, resultList);
try {
// In order to cache the result, the full result needs to be retrieved.
// There is also no difference to that in Hibernate, as Hibernate will first retrieve all elements from the ResultSet.
List<RE> resultList = emQuery.getResultList();
cache.put(queryCacheKey, resultList);
return closing(resultList.stream()).map(this::mapToEntityDelegateUnique);
return closing(resultList.stream()).map(this::mapToEntityDelegateUnique);
} catch (PersistenceException pe) {
// handle exception that could occur on auto-flush when the query is executed
throw PersistenceExceptionConverter.convert(pe.getCause() != null ? pe.getCause() : pe);
}
}
private Map<QueryCacheKey, List<RE>> getQueryCache() {
@ -238,7 +244,12 @@ public abstract class JpaMapKeycloakTransaction<RE extends JpaRootEntity, E exte
JpaPredicateFunction<RE> predicateFunc = mcb.getPredicateFunc();
if (predicateFunc != null) countQuery.where(predicateFunc.apply(cb, countQuery::subquery, root));
return em.createQuery(countQuery).getSingleResult();
try {
return em.createQuery(countQuery).getSingleResult();
} catch (PersistenceException pe) {
// handle exception that could occur on auto-flush when the query is executed
throw PersistenceExceptionConverter.convert(pe.getCause() != null ? pe.getCause() : pe);
}
}
@Override
@ -285,7 +296,12 @@ public abstract class JpaMapKeycloakTransaction<RE extends JpaRootEntity, E exte
JpaPredicateFunction<RE> predicateFunc = mcb.getPredicateFunc();
if (predicateFunc != null) deleteQuery.where(predicateFunc.apply(cb, deleteQuery::subquery, root));
return em.createQuery(deleteQuery).executeUpdate() + removed[0];
try {
return em.createQuery(deleteQuery).executeUpdate() + removed[0];
} catch (PersistenceException pe) {
// handle exception that could occur on auto-flush when the query is executed
throw PersistenceExceptionConverter.convert(pe.getCause() != null ? pe.getCause() : pe);
}
}
private MapModelCriteriaBuilder<String, E, M> createCriteriaBuilderMap() {

View file

@ -32,12 +32,14 @@ public class JpaMapStorageProvider implements MapStorageProvider {
private final KeycloakSession session;
private final EntityManager em;
private final String sessionTxKey;
private final boolean jtaEnabled;
public JpaMapStorageProvider(JpaMapStorageProviderFactory factory, KeycloakSession session, EntityManager em, String sessionTxKey) {
public JpaMapStorageProvider(JpaMapStorageProviderFactory factory, KeycloakSession session, EntityManager em, String sessionTxKey, boolean jtaEnabled) {
this.factory = factory;
this.session = session;
this.em = em;
this.sessionTxKey = sessionTxKey;
this.jtaEnabled = jtaEnabled;
}
@Override
@ -50,8 +52,9 @@ public class JpaMapStorageProvider implements MapStorageProvider {
public <V extends AbstractEntity, M> MapStorage<V, M> getStorage(Class<M> modelType, Flag... flags) {
// validate and update the schema for the storage.
this.factory.validateAndUpdateSchema(this.session, modelType);
// create the JPA transaction and enlist it if needed.
if (session.getAttribute(this.sessionTxKey) == null) {
// Create the JPA transaction and enlist it if needed.
// Don't enlist if JTA is enabled as it has been enlisted with JTA automatically.
if (!jtaEnabled && session.getAttribute(this.sessionTxKey) == null) {
KeycloakTransaction jpaTransaction = new JpaTransactionWrapper(em.getTransaction());
session.getTransactionManager().enlist(jpaTransaction);
session.setAttribute(this.sessionTxKey, jpaTransaction);

View file

@ -35,9 +35,18 @@ import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.spi.PersistenceUnitTransactionType;
import javax.sql.DataSource;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.internal.SessionImpl;
import org.hibernate.jpa.boot.internal.ParsedPersistenceXmlDescriptor;
import org.hibernate.jpa.boot.internal.PersistenceXmlParser;
import org.hibernate.jpa.boot.spi.Bootstrap;
@ -54,6 +63,7 @@ import org.keycloak.common.util.StringPropertyReplacer;
import org.keycloak.component.AmphibianProviderFactory;
import org.keycloak.events.Event;
import org.keycloak.events.admin.AdminEvent;
import org.keycloak.models.ModelException;
import org.keycloak.models.SingleUseObjectValueModel;
import org.keycloak.models.ClientModel;
import org.keycloak.models.ClientScopeModel;
@ -139,6 +149,7 @@ import org.keycloak.models.map.user.MapUserCredentialEntityImpl;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.sessions.RootAuthenticationSessionModel;
import org.keycloak.transaction.JtaTransactionManagerLookup;
public class JpaMapStorageProviderFactory implements
AmphibianProviderFactory<MapStorageProvider>,
@ -244,6 +255,9 @@ public class JpaMapStorageProviderFactory implements
MODEL_TO_TX.put(UserSessionModel.class, JpaUserSessionMapKeycloakTransaction::new);
}
private boolean jtaEnabled;
private JtaTransactionManagerLookup jtaLookup;
public JpaMapStorageProviderFactory() {
int index = ENUMERATOR.getAndIncrement();
this.sessionProviderKey = PROVIDER_ID + "-" + index;
@ -260,7 +274,7 @@ public class JpaMapStorageProviderFactory implements
// check the session for a cached provider before creating a new one.
JpaMapStorageProvider provider = session.getAttribute(this.sessionProviderKey, JpaMapStorageProvider.class);
if (provider == null) {
provider = new JpaMapStorageProvider(this, session, getEntityManager(), this.sessionTxKey);
provider = new JpaMapStorageProvider(this, session, PersistenceExceptionConverter.create(session, getEntityManager()), this.sessionTxKey, this.jtaEnabled);
session.setAttribute(this.sessionProviderKey, provider);
}
return provider;
@ -277,6 +291,8 @@ public class JpaMapStorageProviderFactory implements
@Override
public void postInit(KeycloakSessionFactory factory) {
jtaLookup = (JtaTransactionManagerLookup) factory.getProviderFactory(JtaTransactionManagerLookup.class);
jtaEnabled = jtaLookup != null && jtaLookup.getTransactionManager() != null;
}
@Override
@ -308,6 +324,20 @@ public class JpaMapStorageProviderFactory implements
if (emf == null) {
this.emf = createEntityManagerFactory();
JpaMapUtils.addSpecificNamedQueries(emf);
// consistency check for transaction handling, as this would lead to data-inconsistencies as changes wouldn't commit when expected
if (jtaEnabled && !this.emf.getProperties().get(AvailableSettings.JPA_TRANSACTION_TYPE).equals(PersistenceUnitTransactionType.JTA.name())) {
throw new ModelException("Consistency check failed: If Keycloak is run with JTA, the Entity Manager for JPA map storage should be run with JTA as well.");
}
// consistency check for auto-commit, as this would lead to data-inconsistencies as changes wouldn't roll back when expected
EntityManager em = getEntityManager();
em.unwrap(SessionImpl.class).doWork(connection -> {
if (connection.getAutoCommit()) {
throw new ModelException("The database connection must not use auto-commit. For Quarkus, auto-commit was off once JTA was enabled for the EntityManager.");
}
});
em.close();
}
}
}
@ -376,22 +406,52 @@ public class JpaMapStorageProviderFactory implements
if (!this.validatedModels.contains(modelType)) {
synchronized (SYNC_MODELS.computeIfAbsent(modelType, mc -> new Object())) {
if (!this.validatedModels.contains(modelType)) {
Connection connection = getConnection();
Transaction suspended = null;
try {
if (logger.isDebugEnabled()) printOperationalInfo(connection);
MapJpaUpdaterProvider updater = session.getProvider(MapJpaUpdaterProvider.class);
MapJpaUpdaterProvider.Status status = updater.validate(modelType, connection, config.get("schema"));
if (!status.equals(VALID)) {
update(modelType, connection, session);
if (jtaEnabled) {
suspended = jtaLookup.getTransactionManager().suspend();
jtaLookup.getTransactionManager().begin();
}
} finally {
if (connection != null) {
Connection connection = getConnection();
try {
if (logger.isDebugEnabled()) printOperationalInfo(connection);
MapJpaUpdaterProvider updater = session.getProvider(MapJpaUpdaterProvider.class);
MapJpaUpdaterProvider.Status status = updater.validate(modelType, connection, config.get("schema"));
if (!status.equals(VALID)) {
update(modelType, connection, session);
}
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.warn("Can't close connection", e);
}
}
}
if (jtaEnabled) {
jtaLookup.getTransactionManager().commit();
}
} catch (SystemException | NotSupportedException | RollbackException | HeuristicMixedException |
HeuristicRollbackException e) {
if (jtaEnabled) {
try {
connection.close();
} catch (SQLException e) {
logger.warn("Can't close connection", e);
jtaLookup.getTransactionManager().rollback();
} catch (SystemException ex) {
logger.error("Unable to roll back JTA transaction, e");
}
}
throw new RuntimeException(e);
} finally {
if (suspended != null) {
try {
jtaLookup.getTransactionManager().resume(suspended);
} catch (InvalidTransactionException | SystemException e) {
throw new RuntimeException(e);
}
}
}

View file

@ -0,0 +1,49 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.map.storage.jpa.hibernate;
import org.keycloak.common.Profile;
import org.keycloak.models.map.storage.jpa.PersistenceExceptionConverter;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.provider.ExceptionConverter;
import javax.persistence.PersistenceException;
/**
* This is needed for example by <code>org.keycloak.transaction.JtaTransactionWrapper</code> to map an exception
* that occurs on commit.
*
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @author Alexander Schwartz
*/
public class JpaMapExceptionConverter implements ExceptionConverter, EnvironmentDependentProviderFactory {
@Override
public Throwable convert(Throwable e) {
if (!(e instanceof PersistenceException)) return null;
return PersistenceExceptionConverter.convert(e.getCause() != null ? e.getCause() : e);
}
@Override
public String getId() {
return "jpa-map";
}
@Override
public boolean isSupported() {
return Profile.isFeatureEnabled(Profile.Feature.MAP_STORAGE);
}
}

View file

@ -18,6 +18,7 @@
package org.keycloak.models.map.storage.jpa.liquibase.connection;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -67,7 +68,10 @@ public class DefaultLiquibaseConnectionProvider implements MapLiquibaseConnectio
String scopeId = enterLiquibaseScope();
try {
Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnectionFromPool(connection));
// This acts on the unwrapped database connection as Liquibase will commit and rollback the transaction as needed.
// Otherwise, the connection will not recover from an SQL error when running for example on a PostgreSQL database.
// This was needed when adding support for JTA
Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnectionFromPool(connection.unwrap(Connection.class)));
if (defaultSchema != null) {
database.setDefaultSchemaName(defaultSchema);
}
@ -88,6 +92,9 @@ public class DefaultLiquibaseConnectionProvider implements MapLiquibaseConnectio
// If it returns the Liquibase object, the scope will be closed once the Liquibase object is being closed.
exitLiquibaseScope(scopeId);
throw ex;
} catch (SQLException e) {
exitLiquibaseScope(scopeId);
throw new LiquibaseException(e);
}
}

View file

@ -181,13 +181,15 @@ public class MapJpaLiquibaseUpdaterProvider implements MapJpaUpdaterProvider {
if (modelName.equals("auth-events") || modelName.equals("admin-events"))
modelName = "events";
Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnectionFromPool(connection));
try {
// This acts on the unwrapped database connection as Liquibase will commit and rollback the transaction as needed.
// Otherwise, the connection will not recover from an SQL error when running for example on a PostgreSQL database.
// This was needed when adding support for JTA
try (Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnectionFromPool(connection.unwrap(Connection.class)))) {
// if the database is cockroachdb, use the aggregate changelog (see GHI #11230).
String changelog = database instanceof CockroachDatabase ? "META-INF/jpa-aggregate-changelog.xml" : "META-INF/jpa-" + modelName + "-changelog.xml";
return liquibaseProvider.getLiquibaseForCustomUpdate(connection, defaultSchema, changelog, this.getClass().getClassLoader(), "databasechangelog");
} finally {
database.close();
} catch (SQLException e) {
throw new LiquibaseException(e);
}
}

View file

@ -0,0 +1 @@
org.keycloak.models.map.storage.jpa.hibernate.JpaMapExceptionConverter

View file

@ -39,7 +39,8 @@ public class TransactionPropertyMappers {
ConfigValue storage = context.proceed(NS_KEYCLOAK_PREFIX.concat(STORAGE.getKey()));
if (storage != null && StorageOptions.StorageType.jpa.name().equals(storage.getValue())) {
isJtaEnabled = false;
isJtaEnabled = true;
isXaEnabled = false;
}
if (!isJtaEnabled) {

View file

@ -50,24 +50,6 @@ public class QuarkusJpaMapStorageProviderFactory extends JpaMapStorageProviderFa
return getEntityManagerFactory("keycloak-default").orElseThrow(() -> new IllegalStateException("Failed to resolve the default entity manager factory"));
}
@Override
protected EntityManager getEntityManager() {
EntityManager em = super.getEntityManager();
em.unwrap(SessionImpl.class).doWork(connection -> {
// In the Undertow setup, Hibernate sets the connection to non-autocommit, and in the Quarkus setup the XA transaction manager does this.
// For the Quarkus setup without a XA transaction manager, we didn't find a way to have this setup automatically.
// There is also no known option to configure this in the Agroal DB connection pool in a Quarkus setup:
// While the connection pool supports it, it hasn't been exposed as a Quarkus configuration option.
// At the same time, disabling autocommit is essential to keep the transactional boundaries of the application.
// The failure we've seen is the failed unique constraints that are usually deferred (for example, for client attributes).
// A follow-up issue to track this is here: https://github.com/keycloak/keycloak/issues/13222
if (connection.getAutoCommit()) {
connection.setAutoCommit(false);
}
});
return em;
}
@Override
protected Connection getConnection() {
SessionFactoryImpl entityManagerFactory = getEntityManagerFactory().unwrap(SessionFactoryImpl.class);

View file

@ -22,12 +22,10 @@ import javax.transaction.TransactionManager;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.Profile;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.provider.EnvironmentDependentProviderFactory;
import org.keycloak.transaction.JtaTransactionManagerLookup;
public class QuarkusJtaTransactionManagerLookup implements JtaTransactionManagerLookup, EnvironmentDependentProviderFactory {
public class QuarkusJtaTransactionManagerLookup implements JtaTransactionManagerLookup {
private static final Logger logger = Logger.getLogger(QuarkusJtaTransactionManagerLookup.class);
@ -67,9 +65,4 @@ public class QuarkusJtaTransactionManagerLookup implements JtaTransactionManager
public int order() {
return 100;
}
@Override
public boolean isSupported() {
return !Profile.isFeatureEnabled(Profile.Feature.MAP_STORAGE);
}
}

View file

@ -356,13 +356,19 @@ public final class KeycloakModelUtils {
* @param exception the exception to be checked.
* @return {@code true} if the exception is retriable; {@code false} otherwise.
*/
public static boolean isExceptionRetriable(final Exception exception) {
public static boolean isExceptionRetriable(final Throwable exception) {
Objects.requireNonNull(exception);
// first find the root cause and check if it is a SQLException
Throwable rootCause = exception;
while (rootCause.getCause() != null && rootCause.getCause() != rootCause) {
rootCause = rootCause.getCause();
}
// JTA transaction handler might add multiple suppressed exceptions to the root cause, evaluate each of those
for (Throwable suppressed : rootCause.getSuppressed()) {
if (isExceptionRetriable(suppressed)) {
return true;
}
};
if (rootCause instanceof SQLException) {
// check if the exception state is a recoverable one (40001)
return "40001".equals(((SQLException) rootCause).getSQLState());