KEYCLOAK-2529 Concurrent startup by more cluster nodes at the same time. Added DBLockProvider

This commit is contained in:
mposolda 2016-03-02 12:04:10 +01:00
parent ad63b18781
commit 8da768a514
37 changed files with 1755 additions and 244 deletions

View file

@ -57,6 +57,29 @@
database. This can be a relational database or Mongo. To make sure your database doesn't become a single
point of failure you may also want to deploy your database to a cluster.
</para>
<section>
<title>DB lock</title>
<para>Note that Keycloak supports concurrent startup by more cluster nodes at the same. This is ensured by DB lock, which prevents that some
startup actions (migrating database from previous version, importing realms at startup, initial bootstrap of admin user) are always executed just by one
cluster node at a time and other cluster nodes need to wait until the current node finishes startup actions and release the DB lock.
</para>
<para>
By default, the maximum timeout for lock is 900 seconds, so in case that second node is not able to acquire the lock within 900 seconds, it fails to start.
The lock checking is done every 2 seconds by default. Typically you won't need to increase/decrease the default value, but just in case
it's possible to configure it in <literal>standalone/configuration/keycloak-server.json</literal>:
<programlisting>
<![CDATA[
"dblock": {
"jpa": {
"lockWaitTimeout": 900,
"lockRecheckTime": 2
}
}
]]>
</programlisting>
or similarly if you're using Mongo (just by replace <literal>jpa</literal> with <literal>mongo</literal>)
</para>
</section>
</section>
<section>

View file

@ -126,7 +126,7 @@ public class DefaultJpaConnectionProviderFactory implements JpaConnectionProvide
properties.put("hibernate.dialect", driverDialect);
}
String schema = config.get("schema");
String schema = getSchema();
if (schema != null) {
properties.put(JpaUtils.HIBERNATE_DEFAULT_SCHEMA, schema);
}
@ -167,7 +167,7 @@ public class DefaultJpaConnectionProviderFactory implements JpaConnectionProvide
}
if (currentVersion == null || !JpaUpdaterProvider.LAST_VERSION.equals(currentVersion)) {
updater.update(session, connection, schema);
updater.update(connection, schema);
} else {
logger.debug("Database is up to date");
}
@ -212,7 +212,8 @@ public class DefaultJpaConnectionProviderFactory implements JpaConnectionProvide
}
}
private Connection getConnection() {
@Override
public Connection getConnection() {
try {
String dataSourceLookup = config.get("dataSource");
if (dataSourceLookup != null) {
@ -226,6 +227,11 @@ public class DefaultJpaConnectionProviderFactory implements JpaConnectionProvide
throw new RuntimeException("Failed to connect to database", e);
}
}
@Override
public String getSchema() {
return config.get("schema");
}
@Override
public Map<String,String> getOperationalInfo() {

View file

@ -17,6 +17,8 @@
package org.keycloak.connections.jpa;
import java.sql.Connection;
import org.keycloak.provider.ProviderFactory;
/**
@ -24,4 +26,9 @@ import org.keycloak.provider.ProviderFactory;
*/
public interface JpaConnectionProviderFactory extends ProviderFactory<JpaConnectionProvider> {
// Caller is responsible for closing connection
Connection getConnection();
String getSchema();
}

View file

@ -17,7 +17,6 @@
package org.keycloak.connections.jpa.updater;
import org.keycloak.models.KeycloakSession;
import org.keycloak.provider.Provider;
import java.sql.Connection;
@ -33,7 +32,7 @@ public interface JpaUpdaterProvider extends Provider {
public String getCurrentVersionSql(String defaultSchema);
public void update(KeycloakSession session, Connection connection, String defaultSchema);
public void update(Connection connection, String defaultSchema);
public void validate(Connection connection, String defaultSchema);

View file

@ -20,18 +20,10 @@ package org.keycloak.connections.jpa.updater.liquibase;
import liquibase.Contexts;
import liquibase.Liquibase;
import liquibase.changelog.ChangeSet;
import liquibase.changelog.DatabaseChangeLog;
import liquibase.changelog.RanChangeSet;
import liquibase.database.Database;
import liquibase.database.DatabaseFactory;
import liquibase.database.core.DB2Database;
import liquibase.database.jvm.JdbcConnection;
import liquibase.logging.LogFactory;
import liquibase.logging.LogLevel;
import liquibase.resource.ClassLoaderResourceAccessor;
import liquibase.servicelocator.ServiceLocator;
import org.jboss.logging.Logger;
import org.keycloak.connections.jpa.updater.JpaUpdaterProvider;
import org.keycloak.connections.jpa.updater.liquibase.conn.LiquibaseConnectionProvider;
import org.keycloak.models.KeycloakSession;
import java.sql.Connection;
@ -46,8 +38,14 @@ public class LiquibaseJpaUpdaterProvider implements JpaUpdaterProvider {
private static final Logger logger = Logger.getLogger(LiquibaseJpaUpdaterProvider.class);
private static final String CHANGELOG = "META-INF/jpa-changelog-master.xml";
private static final String DB2_CHANGELOG = "META-INF/db2-jpa-changelog-master.xml";
public static final String CHANGELOG = "META-INF/jpa-changelog-master.xml";
public static final String DB2_CHANGELOG = "META-INF/db2-jpa-changelog-master.xml";
private final KeycloakSession session;
public LiquibaseJpaUpdaterProvider(KeycloakSession session) {
this.session = session;
}
@Override
public String getCurrentVersionSql(String defaultSchema) {
@ -55,7 +53,7 @@ public class LiquibaseJpaUpdaterProvider implements JpaUpdaterProvider {
}
@Override
public void update(KeycloakSession session, Connection connection, String defaultSchema) {
public void update(Connection connection, String defaultSchema) {
logger.debug("Starting database update");
// Need ThreadLocal as liquibase doesn't seem to have API to inject custom objects into tasks
@ -108,145 +106,14 @@ public class LiquibaseJpaUpdaterProvider implements JpaUpdaterProvider {
}
private Liquibase getLiquibase(Connection connection, String defaultSchema) throws Exception {
ServiceLocator sl = ServiceLocator.getInstance();
if (!System.getProperties().containsKey("liquibase.scan.packages")) {
if (sl.getPackages().remove("liquibase.core")) {
sl.addPackageToScan("liquibase.core.xml");
}
if (sl.getPackages().remove("liquibase.parser")) {
sl.addPackageToScan("liquibase.parser.core.xml");
}
if (sl.getPackages().remove("liquibase.serializer")) {
sl.addPackageToScan("liquibase.serializer.core.xml");
}
sl.getPackages().remove("liquibase.ext");
sl.getPackages().remove("liquibase.sdk");
}
LogFactory.setInstance(new LogWrapper());
// Adding PostgresPlus support to liquibase
DatabaseFactory.getInstance().register(new PostgresPlusDatabase());
Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnection(connection));
if (defaultSchema != null) {
database.setDefaultSchemaName(defaultSchema);
}
String changelog = (database instanceof DB2Database) ? DB2_CHANGELOG : CHANGELOG;
logger.debugf("Using changelog file: %s", changelog);
return new Liquibase(changelog, new ClassLoaderResourceAccessor(getClass().getClassLoader()), database);
LiquibaseConnectionProvider liquibaseProvider = session.getProvider(LiquibaseConnectionProvider.class);
return liquibaseProvider.getLiquibase(connection, defaultSchema);
}
@Override
public void close() {
}
private static class LogWrapper extends LogFactory {
private liquibase.logging.Logger logger = new liquibase.logging.Logger() {
@Override
public void setName(String name) {
}
@Override
public void setLogLevel(String level) {
}
@Override
public void setLogLevel(LogLevel level) {
}
@Override
public void setLogLevel(String logLevel, String logFile) {
}
@Override
public void severe(String message) {
LiquibaseJpaUpdaterProvider.logger.error(message);
}
@Override
public void severe(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.error(message, e);
}
@Override
public void warning(String message) {
// Ignore this warning as cascaded drops doesn't work anyway with all DBs, which we need to support
if ("Database does not support drop with cascade".equals(message)) {
LiquibaseJpaUpdaterProvider.logger.debug(message);
} else {
LiquibaseJpaUpdaterProvider.logger.warn(message);
}
}
@Override
public void warning(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.warn(message, e);
}
@Override
public void info(String message) {
LiquibaseJpaUpdaterProvider.logger.debug(message);
}
@Override
public void info(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.debug(message, e);
}
@Override
public void debug(String message) {
LiquibaseJpaUpdaterProvider.logger.trace(message);
}
@Override
public LogLevel getLogLevel() {
if (LiquibaseJpaUpdaterProvider.logger.isTraceEnabled()) {
return LogLevel.DEBUG;
} else if (LiquibaseJpaUpdaterProvider.logger.isDebugEnabled()) {
return LogLevel.INFO;
} else {
return LogLevel.WARNING;
}
}
@Override
public void debug(String message, Throwable e) {
LiquibaseJpaUpdaterProvider.logger.trace(message, e);
}
@Override
public void setChangeLog(DatabaseChangeLog databaseChangeLog) {
}
@Override
public void setChangeSet(ChangeSet changeSet) {
}
@Override
public int getPriority() {
return 0;
}
};
@Override
public liquibase.logging.Logger getLog(String name) {
return logger;
}
@Override
public liquibase.logging.Logger getLog() {
return logger;
}
}
public static String getTable(String table, String defaultSchema) {
return defaultSchema != null ? defaultSchema + "." + table : table;
}

View file

@ -30,7 +30,7 @@ public class LiquibaseJpaUpdaterProviderFactory implements JpaUpdaterProviderFac
@Override
public JpaUpdaterProvider create(KeycloakSession session) {
return new LiquibaseJpaUpdaterProvider();
return new LiquibaseJpaUpdaterProvider(session);
}
@Override

View file

@ -0,0 +1,235 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.conn;
import java.sql.Connection;
import liquibase.Liquibase;
import liquibase.changelog.ChangeSet;
import liquibase.changelog.DatabaseChangeLog;
import liquibase.database.Database;
import liquibase.database.DatabaseFactory;
import liquibase.database.core.DB2Database;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.LiquibaseException;
import liquibase.lockservice.LockService;
import liquibase.lockservice.LockServiceFactory;
import liquibase.logging.LogFactory;
import liquibase.logging.LogLevel;
import liquibase.resource.ClassLoaderResourceAccessor;
import liquibase.servicelocator.ServiceLocator;
import liquibase.sqlgenerator.SqlGeneratorFactory;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.connections.jpa.updater.liquibase.LiquibaseJpaUpdaterProvider;
import org.keycloak.connections.jpa.updater.liquibase.PostgresPlusDatabase;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomInsertLockRecordGenerator;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomLockService;
import org.keycloak.connections.jpa.updater.liquibase.lock.DummyLockService;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class DefaultLiquibaseConnectionProvider implements LiquibaseConnectionProviderFactory, LiquibaseConnectionProvider {
private static final Logger logger = Logger.getLogger(DefaultLiquibaseConnectionProvider.class);
private volatile boolean initialized = false;
@Override
public LiquibaseConnectionProvider create(KeycloakSession session) {
if (!initialized) {
synchronized (this) {
if (!initialized) {
baseLiquibaseInitialization();
initialized = true;
}
}
}
return this;
}
protected void baseLiquibaseInitialization() {
ServiceLocator sl = ServiceLocator.getInstance();
if (!System.getProperties().containsKey("liquibase.scan.packages")) {
if (sl.getPackages().remove("liquibase.core")) {
sl.addPackageToScan("liquibase.core.xml");
}
if (sl.getPackages().remove("liquibase.parser")) {
sl.addPackageToScan("liquibase.parser.core.xml");
}
if (sl.getPackages().remove("liquibase.serializer")) {
sl.addPackageToScan("liquibase.serializer.core.xml");
}
sl.getPackages().remove("liquibase.ext");
sl.getPackages().remove("liquibase.sdk");
}
LogFactory.setInstance(new LogWrapper());
// Adding PostgresPlus support to liquibase
DatabaseFactory.getInstance().register(new PostgresPlusDatabase());
// Change command for creating lock and drop DELETE lock record from it
SqlGeneratorFactory.getInstance().register(new CustomInsertLockRecordGenerator());
// We wrap liquibase update in CustomLockService provided by DBLockProvider. No need to lock inside liquibase itself.
LockServiceFactory.getInstance().register(new DummyLockService());
}
@Override
public void init(Config.Scope config) {
}
@Override
public void postInit(KeycloakSessionFactory factory) {
}
@Override
public void close() {
}
@Override
public String getId() {
return "default";
}
@Override
public Liquibase getLiquibase(Connection connection, String defaultSchema) throws LiquibaseException {
Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(new JdbcConnection(connection));
if (defaultSchema != null) {
database.setDefaultSchemaName(defaultSchema);
}
String changelog = (database instanceof DB2Database) ? LiquibaseJpaUpdaterProvider.DB2_CHANGELOG : LiquibaseJpaUpdaterProvider.CHANGELOG;
logger.debugf("Using changelog file: %s", changelog);
return new Liquibase(changelog, new ClassLoaderResourceAccessor(getClass().getClassLoader()), database);
}
private static class LogWrapper extends LogFactory {
private liquibase.logging.Logger logger = new liquibase.logging.Logger() {
@Override
public void setName(String name) {
}
@Override
public void setLogLevel(String level) {
}
@Override
public void setLogLevel(LogLevel level) {
}
@Override
public void setLogLevel(String logLevel, String logFile) {
}
@Override
public void severe(String message) {
DefaultLiquibaseConnectionProvider.logger.error(message);
}
@Override
public void severe(String message, Throwable e) {
DefaultLiquibaseConnectionProvider.logger.error(message, e);
}
@Override
public void warning(String message) {
// Ignore this warning as cascaded drops doesn't work anyway with all DBs, which we need to support
if ("Database does not support drop with cascade".equals(message)) {
DefaultLiquibaseConnectionProvider.logger.debug(message);
} else {
DefaultLiquibaseConnectionProvider.logger.warn(message);
}
}
@Override
public void warning(String message, Throwable e) {
DefaultLiquibaseConnectionProvider.logger.warn(message, e);
}
@Override
public void info(String message) {
DefaultLiquibaseConnectionProvider.logger.debug(message);
}
@Override
public void info(String message, Throwable e) {
DefaultLiquibaseConnectionProvider.logger.debug(message, e);
}
@Override
public void debug(String message) {
if (DefaultLiquibaseConnectionProvider.logger.isTraceEnabled()) {
DefaultLiquibaseConnectionProvider.logger.trace(message);
}
}
@Override
public LogLevel getLogLevel() {
if (DefaultLiquibaseConnectionProvider.logger.isTraceEnabled()) {
return LogLevel.DEBUG;
} else if (DefaultLiquibaseConnectionProvider.logger.isDebugEnabled()) {
return LogLevel.INFO;
} else {
return LogLevel.WARNING;
}
}
@Override
public void debug(String message, Throwable e) {
DefaultLiquibaseConnectionProvider.logger.trace(message, e);
}
@Override
public void setChangeLog(DatabaseChangeLog databaseChangeLog) {
}
@Override
public void setChangeSet(ChangeSet changeSet) {
}
@Override
public int getPriority() {
return 0;
}
};
@Override
public liquibase.logging.Logger getLog(String name) {
return logger;
}
@Override
public liquibase.logging.Logger getLog() {
return logger;
}
}
}

View file

@ -0,0 +1,33 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.conn;
import java.sql.Connection;
import liquibase.Liquibase;
import liquibase.exception.LiquibaseException;
import org.keycloak.provider.Provider;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface LiquibaseConnectionProvider extends Provider {
Liquibase getLiquibase(Connection connection, String defaultSchema) throws LiquibaseException;
}

View file

@ -0,0 +1,26 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.conn;
import org.keycloak.provider.ProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface LiquibaseConnectionProviderFactory extends ProviderFactory<LiquibaseConnectionProvider> {
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.conn;
import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderFactory;
import org.keycloak.provider.Spi;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LiquibaseConnectionSpi implements Spi {
@Override
public boolean isInternal() {
return true;
}
@Override
public String getName() {
return "connectionsLiquibase";
}
@Override
public Class<? extends Provider> getProviderClass() {
return LiquibaseConnectionProvider.class;
}
@Override
public Class<? extends ProviderFactory> getProviderFactoryClass() {
return LiquibaseConnectionProviderFactory.class;
}
}

View file

@ -0,0 +1,64 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
import java.util.ArrayList;
import java.util.List;
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.sql.Sql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.AbstractSqlGenerator;
import liquibase.statement.core.DeleteStatement;
import liquibase.statement.core.InitializeDatabaseChangeLogLockTableStatement;
/**
* We need to remove DELETE SQL command, which liquibase adds by default when inserting record to table lock. This is causing buggy behaviour
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class CustomInsertLockRecordGenerator extends AbstractSqlGenerator<InitializeDatabaseChangeLogLockTableStatement> {
@Override
public int getPriority() {
return super.getPriority() + 1; // Ensure bigger priority than InitializeDatabaseChangeLogLockTableGenerator
}
@Override
public ValidationErrors validate(InitializeDatabaseChangeLogLockTableStatement initializeDatabaseChangeLogLockTableStatement, Database database, SqlGeneratorChain sqlGeneratorChain) {
return new ValidationErrors();
}
@Override
public Sql[] generateSql(InitializeDatabaseChangeLogLockTableStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {
// Generated by InitializeDatabaseChangeLogLockTableGenerator
Sql[] sqls = sqlGeneratorChain.generateSql(statement, database);
// Removing delete statement
List<Sql> result = new ArrayList<>();
for (Sql sql : sqls) {
String sqlCommand = sql.toSql();
if (!sqlCommand.toUpperCase().contains("DELETE")) {
result.add(sql);
}
}
return result.toArray(new Sql[result.size()]);
}
}

View file

@ -0,0 +1,175 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
import java.lang.reflect.Field;
import java.text.DateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import liquibase.database.Database;
import liquibase.database.core.DerbyDatabase;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.executor.Executor;
import liquibase.executor.ExecutorService;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
import liquibase.logging.LogFactory;
import liquibase.sql.visitor.AbstractSqlVisitor;
import liquibase.sql.visitor.SqlVisitor;
import liquibase.statement.core.CreateDatabaseChangeLogLockTableStatement;
import liquibase.statement.core.DropTableStatement;
import liquibase.statement.core.InitializeDatabaseChangeLogLockTableStatement;
import liquibase.statement.core.RawSqlStatement;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
import org.keycloak.common.util.reflections.Reflections;
/**
* Liquibase lock service, which has some bugfixes and assumes timeouts to be configured in milliseconds
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class CustomLockService extends StandardLockService {
private static final Logger log = Logger.getLogger(CustomLockService.class);
private long changeLogLocRecheckTimeMillis = -1;
@Override
public void setChangeLogLockRecheckTime(long changeLogLocRecheckTime) {
super.setChangeLogLockRecheckTime(changeLogLocRecheckTime);
this.changeLogLocRecheckTimeMillis = changeLogLocRecheckTime;
}
// Bug in StandardLockService.getChangeLogLockRecheckTime()
@Override
public Long getChangeLogLockRecheckTime() {
if (changeLogLocRecheckTimeMillis == -1) {
return super.getChangeLogLockRecheckTime();
} else {
return changeLogLocRecheckTimeMillis;
}
}
@Override
public void init() throws DatabaseException {
boolean createdTable = false;
Executor executor = ExecutorService.getInstance().getExecutor(database);
if (!hasDatabaseChangeLogLockTable()) {
try {
if (log.isTraceEnabled()) {
log.trace("Create Database Lock Table");
}
executor.execute(new CreateDatabaseChangeLogLockTableStatement());
database.commit();
} catch (DatabaseException de) {
log.warn("Failed to create lock table. Maybe other transaction created in the meantime. Retrying...");
if (log.isDebugEnabled()) {
log.debug(de.getMessage(), de); //Log details at debug level
}
database.rollback();
throw new LockRetryException(de);
}
log.debugf("Created database lock table with name: %s", database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName()));
try {
Field field = Reflections.findDeclaredField(StandardLockService.class, "hasDatabaseChangeLogLockTable");
Reflections.setAccessible(field);
field.set(CustomLockService.this, true);
} catch (IllegalAccessException iae) {
throw new RuntimeException(iae);
}
createdTable = true;
}
if (!isDatabaseChangeLogLockTableInitialized(createdTable)) {
try {
if (log.isTraceEnabled()) {
log.trace("Initialize Database Lock Table");
}
executor.execute(new InitializeDatabaseChangeLogLockTableStatement());
database.commit();
} catch (DatabaseException de) {
log.warn("Failed to insert first record to the lock table. Maybe other transaction inserted in the meantime. Retrying...");
if (log.isDebugEnabled()) {
log.debug(de.getMessage(), de); // Log details at debug level
}
database.rollback();
throw new LockRetryException(de);
}
log.debug("Initialized record in the database lock table");
}
// Keycloak doesn't support Derby, but keep it for sure...
if (executor.updatesDatabase() && database instanceof DerbyDatabase && ((DerbyDatabase) database).supportsBooleanDataType()) { //check if the changelog table is of an old smallint vs. boolean format
String lockTable = database.escapeTableName(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName());
Object obj = executor.queryForObject(new RawSqlStatement("select min(locked) as test from " + lockTable + " fetch first row only"), Object.class);
if (!(obj instanceof Boolean)) { //wrong type, need to recreate table
executor.execute(new DropTableStatement(database.getLiquibaseCatalogName(), database.getLiquibaseSchemaName(), database.getDatabaseChangeLogLockTableName(), false));
executor.execute(new CreateDatabaseChangeLogLockTableStatement());
executor.execute(new InitializeDatabaseChangeLogLockTableStatement());
}
}
}
@Override
public void waitForLock() throws LockException {
boolean locked = false;
long startTime = Time.toMillis(Time.currentTime());
long timeToGiveUp = startTime + (getChangeLogLockWaitTime());
while (!locked && Time.toMillis(Time.currentTime()) < timeToGiveUp) {
locked = acquireLock();
if (!locked) {
int remainingTime = ((int)(timeToGiveUp / 1000)) - Time.currentTime();
log.debugf("Waiting for changelog lock... Remaining time: %d seconds", remainingTime);
try {
Thread.sleep(getChangeLogLockRecheckTime());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if (!locked) {
DatabaseChangeLogLock[] locks = listLocks();
String lockedBy;
if (locks.length > 0) {
DatabaseChangeLogLock lock = locks[0];
lockedBy = lock.getLockedBy() + " since " + DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.SHORT).format(lock.getLockGranted());
} else {
lockedBy = "UNKNOWN";
}
throw new LockException("Could not acquire change log lock. Currently locked by " + lockedBy);
}
}
}

View file

@ -0,0 +1,38 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
import liquibase.exception.LockException;
import liquibase.lockservice.StandardLockService;
/**
* Dummy lock service injected to Liquibase. Doesn't need to do anything as we already have a lock when Liquibase update is called.
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class DummyLockService extends StandardLockService {
@Override
public void waitForLock() throws LockException {
}
@Override
public void releaseLock() throws LockException {
}
}

View file

@ -0,0 +1,159 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
import java.sql.Connection;
import java.sql.SQLException;
import liquibase.Liquibase;
import liquibase.exception.DatabaseException;
import liquibase.exception.LiquibaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.LockService;
import org.jboss.logging.Logger;
import org.keycloak.connections.jpa.JpaConnectionProvider;
import org.keycloak.connections.jpa.JpaConnectionProviderFactory;
import org.keycloak.connections.jpa.updater.liquibase.conn.LiquibaseConnectionProvider;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.dblock.DBLockProvider;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LiquibaseDBLockProvider implements DBLockProvider {
private static final Logger logger = Logger.getLogger(LiquibaseDBLockProvider.class);
// 3 should be sufficient (Potentially one failure for createTable and one for insert record)
private int DEFAULT_MAX_ATTEMPTS = 3;
private final LiquibaseDBLockProviderFactory factory;
private final KeycloakSession session;
private LockService lockService;
private Connection dbConnection;
private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
public LiquibaseDBLockProvider(LiquibaseDBLockProviderFactory factory, KeycloakSession session) {
this.factory = factory;
this.session = session;
init();
}
private void init() {
LiquibaseConnectionProvider liquibaseProvider = session.getProvider(LiquibaseConnectionProvider.class);
JpaConnectionProviderFactory jpaProviderFactory = (JpaConnectionProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(JpaConnectionProvider.class);
this.dbConnection = jpaProviderFactory.getConnection();
String defaultSchema = jpaProviderFactory.getSchema();
try {
Liquibase liquibase = liquibaseProvider.getLiquibase(dbConnection, defaultSchema);
this.lockService = new CustomLockService();
lockService.setChangeLogLockWaitTime(factory.getLockWaitTimeoutMillis());
lockService.setChangeLogLockRecheckTime(factory.getLockRecheckTimeMillis());
lockService.setDatabase(liquibase.getDatabase());
} catch (LiquibaseException exception) {
safeRollbackConnection();
safeCloseConnection();
throw new IllegalStateException(exception);
}
}
// Assumed transaction was rolled-back and we want to start with new DB connection
private void restart() {
safeCloseConnection();
this.dbConnection = null;
this.lockService = null;
init();
}
@Override
public void waitForLock() {
while (maxAttempts > 0) {
try {
lockService.waitForLock();
this.maxAttempts = DEFAULT_MAX_ATTEMPTS;
return;
} catch (LockException le) {
if (le.getCause() != null && le.getCause() instanceof LockRetryException) {
// Indicates we should try to acquire lock again in different transaction
restart();
maxAttempts--;
} else {
throw new IllegalStateException("Failed to retrieve lock", le);
// TODO: Possibility to forcefully retrieve lock after timeout instead of just give-up?
}
}
}
}
@Override
public void releaseLock() {
try {
lockService.releaseLock();
} catch (LockException e) {
logger.error("Could not release lock", e);
}
lockService.reset();
}
@Override
public void destroyLockInfo() {
try {
this.lockService.destroy();
dbConnection.commit();
logger.debug("Destroyed lock table");
} catch (DatabaseException | SQLException de) {
logger.error("Failed to destroy lock table");
safeRollbackConnection();
}
}
@Override
public void close() {
safeCloseConnection();
}
private void safeRollbackConnection() {
if (dbConnection != null) {
try {
this.dbConnection.rollback();
} catch (SQLException se) {
logger.warn("Failed to rollback connection after error", se);
}
}
}
private void safeCloseConnection() {
// Close after creating EntityManagerFactory to prevent in-mem databases from closing
if (dbConnection != null) {
try {
dbConnection.close();
} catch (SQLException e) {
logger.warn("Failed to close connection", e);
}
}
}
}

View file

@ -0,0 +1,79 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.dblock.DBLockProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LiquibaseDBLockProviderFactory implements DBLockProviderFactory {
private static final Logger logger = Logger.getLogger(LiquibaseDBLockProviderFactory.class);
private long lockRecheckTimeMillis;
private long lockWaitTimeoutMillis;
protected long getLockRecheckTimeMillis() {
return lockRecheckTimeMillis;
}
protected long getLockWaitTimeoutMillis() {
return lockWaitTimeoutMillis;
}
@Override
public void init(Config.Scope config) {
int lockRecheckTime = config.getInt("lockRecheckTime", 2);
int lockWaitTimeout = config.getInt("lockWaitTimeout", 900);
this.lockRecheckTimeMillis = Time.toMillis(lockRecheckTime);
this.lockWaitTimeoutMillis = Time.toMillis(lockWaitTimeout);
logger.debugf("Liquibase lock provider configured with lockWaitTime: %d seconds, lockRecheckTime: %d seconds", lockWaitTimeout, lockRecheckTime);
}
@Override
public void postInit(KeycloakSessionFactory factory) {
}
@Override
public LiquibaseDBLockProvider create(KeycloakSession session) {
return new LiquibaseDBLockProvider(this, session);
}
@Override
public void setTimeouts(long lockRecheckTimeMillis, long lockWaitTimeoutMillis) {
this.lockRecheckTimeMillis = lockRecheckTimeMillis;
this.lockWaitTimeoutMillis = lockWaitTimeoutMillis;
}
@Override
public void close() {
}
@Override
public String getId() {
return "jpa";
}
}

View file

@ -0,0 +1,39 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
/**
* Indicates that retrieve lock wasn't successful, but it worth to retry it in different transaction (For example if we were trying to create LOCK table, but other transaction
* created the table in the meantime etc)
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class LockRetryException extends RuntimeException {
public LockRetryException(String message) {
super(message);
}
public LockRetryException(Throwable cause) {
super(cause);
}
public LockRetryException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@ -30,4 +30,5 @@
<include file="META-INF/jpa-changelog-1.7.0.xml"/>
<include file="META-INF/db2-jpa-changelog-1.8.0.xml"/>
<include file="META-INF/jpa-changelog-1.9.0.xml"/>
<include file="META-INF/jpa-changelog-1.9.1.xml"/>
</databaseChangeLog>

View file

@ -0,0 +1,18 @@
#
# Copyright 2016 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.keycloak.connections.jpa.updater.liquibase.conn.DefaultLiquibaseConnectionProvider

View file

@ -0,0 +1,18 @@
#
# Copyright 2016 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.keycloak.connections.jpa.updater.liquibase.lock.LiquibaseDBLockProviderFactory

View file

@ -16,4 +16,5 @@
#
org.keycloak.connections.jpa.JpaConnectionSpi
org.keycloak.connections.jpa.updater.JpaUpdaterSpi
org.keycloak.connections.jpa.updater.JpaUpdaterSpi
org.keycloak.connections.jpa.updater.liquibase.conn.LiquibaseConnectionSpi

View file

@ -78,7 +78,13 @@ public class DefaultMongoConnectionFactoryProvider implements MongoConnectionPro
private static final Logger logger = Logger.getLogger(DefaultMongoConnectionFactoryProvider.class);
private volatile MongoClient client;
private static final int STATE_BEFORE_INIT = 0; // Even before MongoClient is created
private static final int STATE_BEFORE_UPDATE = 1; // Mongo client was created, but DB is not yet updated to last version
private static final int STATE_AFTER_UPDATE = 2; // Mongo client was created and DB updated. DB is fully initialized now
private volatile int state = STATE_BEFORE_INIT;
private MongoClient client;
private MongoStore mongoStore;
private DB db;
@ -86,15 +92,6 @@ public class DefaultMongoConnectionFactoryProvider implements MongoConnectionPro
private Map<String,String> operationalInfo;
@Override
public MongoConnectionProvider create(KeycloakSession session) {
lazyInit(session);
TransactionMongoStoreInvocationContext invocationContext = new TransactionMongoStoreInvocationContext(mongoStore);
session.getTransaction().enlist(new MongoKeycloakTransaction(invocationContext));
return new DefaultMongoConnectionProvider(db, mongoStore, invocationContext);
}
@Override
public void init(Config.Scope config) {
this.config = config;
@ -105,33 +102,22 @@ public class DefaultMongoConnectionFactoryProvider implements MongoConnectionPro
}
@Override
public DB getDBBeforeUpdate() {
lazyInitBeforeUpdate();
return db;
}
private void lazyInit(KeycloakSession session) {
if (client == null) {
private void lazyInitBeforeUpdate() {
if (state == STATE_BEFORE_INIT) {
synchronized (this) {
if (client == null) {
if (state == STATE_BEFORE_INIT) {
try {
this.client = createMongoClient();
String dbName = config.get("db", "keycloak");
this.db = client.getDB(dbName);
String databaseSchema = config.get("databaseSchema");
if (databaseSchema != null) {
if (databaseSchema.equals("update")) {
MongoUpdaterProvider mongoUpdater = session.getProvider(MongoUpdaterProvider.class);
if (mongoUpdater == null) {
throw new RuntimeException("Can't update database: Mongo updater provider not found");
}
mongoUpdater.update(session, db);
} else {
throw new RuntimeException("Invalid value for databaseSchema: " + databaseSchema);
}
}
this.mongoStore = new MongoStoreImpl(db, getManagedEntities());
state = STATE_BEFORE_UPDATE;
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -140,6 +126,53 @@ public class DefaultMongoConnectionFactoryProvider implements MongoConnectionPro
}
}
@Override
public MongoConnectionProvider create(KeycloakSession session) {
lazyInit(session);
TransactionMongoStoreInvocationContext invocationContext = new TransactionMongoStoreInvocationContext(mongoStore);
session.getTransaction().enlist(new MongoKeycloakTransaction(invocationContext));
return new DefaultMongoConnectionProvider(db, mongoStore, invocationContext);
}
private void lazyInit(KeycloakSession session) {
lazyInitBeforeUpdate();
if (state == STATE_BEFORE_UPDATE) {
synchronized (this) {
if (state == STATE_BEFORE_UPDATE) {
try {
update(session);
this.mongoStore = new MongoStoreImpl(db, getManagedEntities());
state = STATE_AFTER_UPDATE;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
private void update(KeycloakSession session) {
String databaseSchema = config.get("databaseSchema");
if (databaseSchema != null) {
if (databaseSchema.equals("update")) {
MongoUpdaterProvider mongoUpdater = session.getProvider(MongoUpdaterProvider.class);
if (mongoUpdater == null) {
throw new RuntimeException("Can't update database: Mongo updater provider not found");
}
mongoUpdater.update(session, db);
} else {
throw new RuntimeException("Invalid value for databaseSchema: " + databaseSchema);
}
}
}
private Class[] getManagedEntities() throws ClassNotFoundException {
Class[] entityClasses = new Class[entities.length];
for (int i = 0; i < entities.length; i++) {
@ -160,6 +193,7 @@ public class DefaultMongoConnectionFactoryProvider implements MongoConnectionPro
return "default";
}
/**
* Override this method if you want more possibility to configure Mongo client. It can be also used to inject mongo client
* from different source.

View file

@ -27,6 +27,9 @@ import org.keycloak.provider.Provider;
*/
public interface MongoConnectionProvider extends Provider {
/**
* @return Fully updated and initialized DB
*/
DB getDB();
MongoStore getMongoStore();

View file

@ -17,10 +17,17 @@
package org.keycloak.connections.mongo;
import com.mongodb.DB;
import org.keycloak.provider.ProviderFactory;
/**
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
*/
public interface MongoConnectionProviderFactory extends ProviderFactory<MongoConnectionProvider> {
/**
* @return DB object, which may not be yet updated to current Keycloak version. Useful just if something needs to be done even before DB update (for example acquire DB lock)
*/
DB getDBBeforeUpdate();
}

View file

@ -0,0 +1,137 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.mongo.lock;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.DuplicateKeyException;
import com.mongodb.WriteResult;
import org.jboss.logging.Logger;
import org.keycloak.common.util.HostUtils;
import org.keycloak.common.util.Time;
import org.keycloak.models.dblock.DBLockProvider;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class MongoDBLockProvider implements DBLockProvider {
private static final String DB_LOCK_COLLECTION = "dblock";
private static final Logger logger = Logger.getLogger(MongoDBLockProvider .class);
private final MongoDBLockProviderFactory factory;
private final DB db;
public MongoDBLockProvider(MongoDBLockProviderFactory factory, DB db) {
this.factory = factory;
this.db = db;
}
@Override
public void waitForLock() {
boolean locked = false;
long startTime = Time.toMillis(Time.currentTime());
long timeToGiveUp = startTime + (factory.getLockWaitTimeoutMillis());
while (!locked && Time.toMillis(Time.currentTime()) < timeToGiveUp) {
locked = acquireLock();
if (!locked) {
int remainingTime = ((int)(timeToGiveUp / 1000)) - Time.currentTime();
logger.debugf("Waiting for changelog lock... Remaining time: %d seconds", remainingTime);
try {
Thread.sleep(factory.getLockRecheckTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if (!locked) {
DBObject query = new BasicDBObject("_id", 1);
DBCursor cursor = db.getCollection(DB_LOCK_COLLECTION).find(query);
String lockedBy;
if (cursor.hasNext()) {
DBObject dbObj = cursor.next();
lockedBy = dbObj.get("lockedBy") + " since " + Time.toDate(((int)((long) dbObj.get("lockedSince") / 1000)));
} else {
lockedBy = "UNKNOWN";
}
throw new IllegalStateException("Could not acquire change log lock. Currently locked by " + lockedBy);
}
}
private boolean acquireLock() {
DBObject query = new BasicDBObject("locked", false);
BasicDBObject update = new BasicDBObject("locked", true);
update.append("_id", 1);
update.append("lockedSince", Time.toMillis(Time.currentTime()));
update.append("lockedBy", HostUtils.getHostName()); // Maybe replace with something better, but doesn't matter for now
try {
WriteResult wr = db.getCollection(DB_LOCK_COLLECTION).update(query, update, true, false);
if (wr.getN() == 1) {
logger.debugf("Successfully acquired DB lock");
return true;
} else {
return false;
}
} catch (DuplicateKeyException dke) {
logger.debugf("Failed acquire lock. Reason: %s", dke.getMessage());
}
return false;
}
@Override
public void releaseLock() {
DBObject query = new BasicDBObject("locked", true);
BasicDBObject update = new BasicDBObject("locked", false);
update.append("_id", 1);
update.append("lockedBy", null);
update.append("lockedSince", null);
try {
WriteResult wr = db.getCollection(DB_LOCK_COLLECTION).update(query, update, true, false);
if (wr.getN() > 0) {
logger.debugf("Successfully released DB lock");
} else {
logger.warnf("Attempt to release DB lock, but nothing was released");
}
} catch (DuplicateKeyException dke) {
logger.debugf("Failed release lock. Reason: %s", dke.getMessage());
}
}
@Override
public void destroyLockInfo() {
db.getCollection(DB_LOCK_COLLECTION).remove(new BasicDBObject());
logger.debugf("Destroyed lock collection");
}
@Override
public void close() {
}
}

View file

@ -0,0 +1,84 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.mongo.lock;
import com.mongodb.DB;
import org.jboss.logging.Logger;
import org.keycloak.Config;
import org.keycloak.common.util.Time;
import org.keycloak.connections.mongo.MongoConnectionProvider;
import org.keycloak.connections.mongo.MongoConnectionProviderFactory;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.dblock.DBLockProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class MongoDBLockProviderFactory implements DBLockProviderFactory {
private static final Logger logger = Logger.getLogger(MongoDBLockProviderFactory.class);
private long lockRecheckTimeMillis;
private long lockWaitTimeoutMillis;
protected long getLockRecheckTimeMillis() {
return lockRecheckTimeMillis;
}
protected long getLockWaitTimeoutMillis() {
return lockWaitTimeoutMillis;
}
@Override
public void init(Config.Scope config) {
int lockRecheckTime = config.getInt("lockRecheckTime", 2);
int lockWaitTimeout = config.getInt("lockWaitTimeout", 900);
this.lockRecheckTimeMillis = Time.toMillis(lockRecheckTime);
this.lockWaitTimeoutMillis = Time.toMillis(lockWaitTimeout);
logger.debugf("Mongo lock provider configured with lockWaitTime: %d seconds, lockRecheckTime: %d seconds", lockWaitTimeout, lockRecheckTime);
}
@Override
public void postInit(KeycloakSessionFactory factory) {
}
@Override
public MongoDBLockProvider create(KeycloakSession session) {
MongoConnectionProviderFactory mongoConnectionFactory = (MongoConnectionProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(MongoConnectionProvider.class);
DB db = mongoConnectionFactory.getDBBeforeUpdate();
return new MongoDBLockProvider(this, db);
}
@Override
public void setTimeouts(long lockRecheckTimeMillis, long lockWaitTimeoutMillis) {
this.lockRecheckTimeMillis = lockRecheckTimeMillis;
this.lockWaitTimeoutMillis = lockWaitTimeoutMillis;
}
@Override
public void close() {
}
@Override
public String getId() {
return "mongo";
}
}

View file

@ -0,0 +1,35 @@
#
# Copyright 2016 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Copyright 2016 Red Hat, Inc. and/or its affiliates
# and other contributors as indicated by the @author tags.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.keycloak.connections.mongo.lock.MongoDBLockProviderFactory

View file

@ -0,0 +1,44 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.dblock;
import org.keycloak.provider.Provider;
/**
* Global database lock to ensure that some actions in DB can be done just be one cluster node at a time.
*
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface DBLockProvider extends Provider {
/**
* Try to retrieve DB lock or wait if retrieve was unsuccessful. Throw exception if lock can't be retrieved within specified timeout (900 seconds by default)
*/
void waitForLock();
void releaseLock();
/**
* Will destroy whole state of DB lock (drop table/collection to track locking).
* */
void destroyLockInfo();
}

View file

@ -0,0 +1,31 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.dblock;
import org.keycloak.provider.ProviderFactory;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public interface DBLockProviderFactory extends ProviderFactory<DBLockProvider> {
/**
* Useful for testing to override provided configuration
*/
void setTimeouts(long lockRecheckTimeMillis, long lockWaitTimeoutMillis);
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.models.dblock;
import org.keycloak.provider.Provider;
import org.keycloak.provider.ProviderFactory;
import org.keycloak.provider.Spi;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class DBLockSpi implements Spi {
@Override
public boolean isInternal() {
return true;
}
@Override
public String getName() {
return "dblock";
}
@Override
public Class<? extends Provider> getProviderClass() {
return DBLockProvider.class;
}
@Override
public Class<? extends ProviderFactory> getProviderFactoryClass() {
return DBLockProviderFactory.class;
}
}

View file

@ -21,6 +21,7 @@ org.keycloak.models.RealmSpi
org.keycloak.models.UserSessionSpi
org.keycloak.models.UserSpi
org.keycloak.models.session.UserSessionPersisterSpi
org.keycloak.models.dblock.DBLockSpi
org.keycloak.migration.MigrationSpi
org.keycloak.hash.PasswordHashSpi
org.keycloak.events.EventListenerSpi

View file

@ -401,4 +401,8 @@ public interface ServicesLogger extends BasicLogger {
@LogMessage(level = ERROR)
@Message(id=90, value="Failed to close ProviderSession")
void failedToCloseProviderSession(@Cause Throwable t);
@LogMessage(level = WARN)
@Message(id=91, value="Forced release of DB lock at startup requested by System property. Make sure to not use this in production environment! And especially when more cluster nodes are started concurrently.")
void forcedReleaseDBLock();
}

View file

@ -0,0 +1,87 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.services.managers;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.RealmProvider;
import org.keycloak.models.RealmProviderFactory;
import org.keycloak.models.dblock.DBLockProvider;
import org.keycloak.models.dblock.DBLockProviderFactory;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.services.ServicesLogger;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class DBLockManager {
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
public void waitForLock(KeycloakSessionFactory sessionFactory) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
DBLockProvider lock = getDBLock(session);
lock.waitForLock();
}
});
}
public void releaseLock(KeycloakSessionFactory sessionFactory) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
DBLockProvider lock = getDBLock(session);
lock.releaseLock();
}
});
}
public void checkForcedUnlock(KeycloakSessionFactory sessionFactory) {
if (Boolean.getBoolean("keycloak.dblock.forceUnlock")) {
logger.forcedReleaseDBLock();
releaseLock(sessionFactory);
}
}
// Try to detect ID from realmProvider
public DBLockProvider getDBLock(KeycloakSession session) {
String realmProviderId = getRealmProviderId(session);
return session.getProvider(DBLockProvider.class, realmProviderId);
}
public DBLockProviderFactory getDBLockFactory(KeycloakSession session) {
String realmProviderId = getRealmProviderId(session);
return (DBLockProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(DBLockProvider.class, realmProviderId);
}
private String getRealmProviderId(KeycloakSession session) {
RealmProviderFactory realmProviderFactory = (RealmProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(RealmProvider.class);
return realmProviderFactory.getId();
}
}

View file

@ -25,6 +25,7 @@ import org.keycloak.Config;
import org.keycloak.exportimport.ExportImportManager;
import org.keycloak.migration.MigrationModelManager;
import org.keycloak.models.*;
import org.keycloak.services.managers.DBLockManager;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.models.utils.RepresentationToModel;
import org.keycloak.representations.idm.RealmRepresentation;
@ -39,7 +40,6 @@ import org.keycloak.services.resources.admin.AdminRoot;
import org.keycloak.services.scheduled.ClearExpiredEvents;
import org.keycloak.services.scheduled.ClearExpiredUserSessions;
import org.keycloak.services.scheduled.ClusterAwareScheduledTaskRunner;
import org.keycloak.services.scheduled.ScheduledTaskRunner;
import org.keycloak.services.util.JsonConfigProvider;
import org.keycloak.services.util.ObjectMapperResolver;
import org.keycloak.timer.TimerProvider;
@ -91,44 +91,56 @@ public class KeycloakApplication extends Application {
singletons.add(new ObjectMapperResolver(Boolean.parseBoolean(System.getProperty("keycloak.jsonPrettyPrint", "false"))));
migrateModel();
boolean bootstrapAdminUser = false;
KeycloakSession session = sessionFactory.create();
ExportImportManager exportImportManager;
DBLockManager dbLockManager = new DBLockManager();
dbLockManager.checkForcedUnlock(sessionFactory);
dbLockManager.waitForLock(sessionFactory);
try {
session.getTransaction().begin();
migrateModel();
ApplianceBootstrap applianceBootstrap = new ApplianceBootstrap(session);
exportImportManager = new ExportImportManager(session);
KeycloakSession session = sessionFactory.create();
try {
session.getTransaction().begin();
boolean createMasterRealm = applianceBootstrap.isNewInstall();
if (exportImportManager.isRunImport() && exportImportManager.isImportMasterIncluded()) {
createMasterRealm = false;
ApplianceBootstrap applianceBootstrap = new ApplianceBootstrap(session);
exportImportManager = new ExportImportManager(session);
boolean createMasterRealm = applianceBootstrap.isNewInstall();
if (exportImportManager.isRunImport() && exportImportManager.isImportMasterIncluded()) {
createMasterRealm = false;
}
if (createMasterRealm) {
applianceBootstrap.createMasterRealm(contextPath);
}
session.getTransaction().commit();
} catch (RuntimeException re) {
if (session.getTransaction().isActive()) {
session.getTransaction().rollback();
}
throw re;
} finally {
session.close();
}
if (createMasterRealm) {
applianceBootstrap.createMasterRealm(contextPath);
if (exportImportManager.isRunImport()) {
exportImportManager.runImport();
} else {
importRealms();
}
session.getTransaction().commit();
importAddUser();
} finally {
session.close();
dbLockManager.releaseLock(sessionFactory);
}
if (exportImportManager.isRunImport()) {
exportImportManager.runImport();
} else {
importRealms();
}
importAddUser();
if (exportImportManager.isRunExport()) {
exportImportManager.runExport();
}
session = sessionFactory.create();
boolean bootstrapAdminUser = false;
KeycloakSession session = sessionFactory.create();
try {
session.getTransaction().begin();
bootstrapAdminUser = new ApplianceBootstrap(session).isNoMasterUser();

View file

@ -237,6 +237,16 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
@ -468,41 +478,6 @@
</profile>
<!-- MySQL -->
<profile>
<activation>
<property>
<name>keycloak.connectionsJpa.driver</name>
<value>com.mysql.jdbc.Driver</value>
</property>
</activation>
<id>mysql</id>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
</profile>
<!-- PostgreSQL -->
<profile>
<activation>
<property>
<name>keycloak.connectionsJpa.driver</name>
<value>org.postgresql.Driver</value>
</property>
</activation>
<id>postgresql</id>
<dependencies>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>clean-jpa</id>
<build>

View file

@ -0,0 +1,173 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.testsuite.model;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.services.managers.DBLockManager;
import org.keycloak.models.dblock.DBLockProvider;
import org.keycloak.models.dblock.DBLockProviderFactory;
import org.keycloak.models.utils.KeycloakModelUtils;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class DBLockTest extends AbstractModelTest {
private static final Logger log = Logger.getLogger(DBLockTest.class);
private static final int SLEEP_TIME_MILLIS = 10;
private static final int THREADS_COUNT = 20;
private static final int ITERATIONS_PER_THREAD = 2;
private static final int LOCK_TIMEOUT_MILLIS = 240000; // Rather bigger to handle slow DB connections in testing env
private static final int LOCK_RECHECK_MILLIS = 10;
@Before
@Override
public void before() throws Exception {
super.before();
// Set timeouts for testing
DBLockManager lockManager = new DBLockManager();
DBLockProviderFactory lockFactory = lockManager.getDBLockFactory(session);
lockFactory.setTimeouts(LOCK_RECHECK_MILLIS, LOCK_TIMEOUT_MILLIS);
// Drop lock table, just to simulate racing threads for create lock table and insert lock record into it.
lockManager.getDBLock(session).destroyLockInfo();
commit();
}
@Test
public void testLockConcurrently() throws Exception {
long startupTime = System.currentTimeMillis();
final Semaphore semaphore = new Semaphore();
final KeycloakSessionFactory sessionFactory = realmManager.getSession().getKeycloakSessionFactory();
List<Thread> threads = new LinkedList<>();
for (int i=0 ; i<THREADS_COUNT ; i++) {
Thread thread = new Thread() {
@Override
public void run() {
for (int i=0 ; i<ITERATIONS_PER_THREAD ; i++) {
try {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
lock(session, semaphore);
}
});
} catch (RuntimeException e) {
semaphore.setException(e);
throw e;
}
}
}
};
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
long took = (System.currentTimeMillis() - startupTime);
log.infof("DBLockTest executed in %d ms with total counter %d. THREADS_COUNT=%d, ITERATIONS_PER_THREAD=%d", took, semaphore.getTotal(), THREADS_COUNT, ITERATIONS_PER_THREAD);
Assert.assertEquals(semaphore.getTotal(), THREADS_COUNT * ITERATIONS_PER_THREAD);
Assert.assertNull(semaphore.getException());
}
private void lock(KeycloakSession session, Semaphore semaphore) {
DBLockProvider dbLock = new DBLockManager().getDBLock(session);
dbLock.waitForLock();
try {
semaphore.increase();
Thread.sleep(SLEEP_TIME_MILLIS);
semaphore.decrease();
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} finally {
dbLock.releaseLock();
}
}
// Ensure just one thread is allowed to run at the same time
private class Semaphore {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicInteger totalIncreases = new AtomicInteger(0);
private volatile Exception exception = null;
private void increase() {
int current = counter.incrementAndGet();
if (current != 1) {
IllegalStateException ex = new IllegalStateException("Counter has illegal value: " + current);
setException(ex);
throw ex;
}
totalIncreases.incrementAndGet();
}
private void decrease() {
int current = counter.decrementAndGet();
if (current != 0) {
IllegalStateException ex = new IllegalStateException("Counter has illegal value: " + current);
setException(ex);
throw ex;
}
}
private synchronized void setException(Exception exception) {
if (this.exception == null) {
this.exception = exception;
}
}
private synchronized Exception getException() {
return exception;
}
private int getTotal() {
return totalIncreases.get();
}
}
}

View file

@ -83,7 +83,7 @@ public class ModelTest extends AbstractModelTest {
Assert.assertEquals(expected.getPublicKeyPem(), actual.getPublicKeyPem());
Assert.assertEquals(expected.getPrivateKeyPem(), actual.getPrivateKeyPem());
Assert.assertEquals(expected.getDefaultRoles(), actual.getDefaultRoles());
Assert.assertEquals(new HashSet<>(expected.getDefaultRoles()), new HashSet<>(actual.getDefaultRoles()));
Assert.assertEquals(expected.getSmtpConfig(), actual.getSmtpConfig());

View file

@ -39,7 +39,7 @@ log4j.logger.org.keycloak.testsuite=${keycloak.testsuite.logging.level}
# Liquibase updates logged with "info" by default. Logging level can be changed by system property "keycloak.liquibase.logging.level"
keycloak.liquibase.logging.level=info
log4j.logger.org.keycloak.connections.jpa.updater.liquibase.LiquibaseJpaUpdaterProvider=${keycloak.liquibase.logging.level}
log4j.logger.org.keycloak.connections.jpa.updater.liquibase=${keycloak.liquibase.logging.level}
# Enable to view infinispan initialization
# log4j.logger.org.keycloak.models.sessions.infinispan.initializer=trace