KEYCLOAK-2614 Refactor database lock to use 'SELECT FOR UPDATE' pessimistic locking

This commit is contained in:
mposolda 2016-04-08 07:20:54 +02:00
parent ed97a9b6f3
commit 90fc721315
11 changed files with 233 additions and 112 deletions

View file

@ -39,6 +39,7 @@ import org.keycloak.Config;
import org.keycloak.connections.jpa.updater.liquibase.LiquibaseJpaUpdaterProvider;
import org.keycloak.connections.jpa.updater.liquibase.PostgresPlusDatabase;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomInsertLockRecordGenerator;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomLockDatabaseChangeLogGenerator;
import org.keycloak.connections.jpa.updater.liquibase.lock.CustomLockService;
import org.keycloak.connections.jpa.updater.liquibase.lock.DummyLockService;
import org.keycloak.models.KeycloakSession;
@ -93,6 +94,9 @@ public class DefaultLiquibaseConnectionProvider implements LiquibaseConnectionPr
// Change command for creating lock and drop DELETE lock record from it
SqlGeneratorFactory.getInstance().register(new CustomInsertLockRecordGenerator());
// Use "SELECT FOR UPDATE" for locking database
SqlGeneratorFactory.getInstance().register(new CustomLockDatabaseChangeLogGenerator());
}

View file

@ -0,0 +1,86 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.connections.jpa.updater.liquibase.lock;
import liquibase.database.Database;
import liquibase.database.core.DB2Database;
import liquibase.database.core.H2Database;
import liquibase.database.core.MSSQLDatabase;
import liquibase.database.core.MySQLDatabase;
import liquibase.database.core.OracleDatabase;
import liquibase.database.core.PostgresDatabase;
import liquibase.sql.Sql;
import liquibase.sql.UnparsedSql;
import liquibase.sqlgenerator.SqlGeneratorChain;
import liquibase.sqlgenerator.core.LockDatabaseChangeLogGenerator;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
import org.jboss.logging.Logger;
/**
* We use "SELECT FOR UPDATE" pessimistic locking (Same algorithm like Hibernate LockMode.PESSIMISTIC_WRITE )
*
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class CustomLockDatabaseChangeLogGenerator extends LockDatabaseChangeLogGenerator {
private static final Logger logger = Logger.getLogger(CustomLockDatabaseChangeLogGenerator.class);
@Override
public int getPriority() {
return super.getPriority() + 1; // Ensure bigger priority than LockDatabaseChangeLogGenerator
}
@Override
public Sql[] generateSql(LockDatabaseChangeLogStatement statement, Database database, SqlGeneratorChain sqlGeneratorChain) {
Sql selectForUpdateSql = generateSelectForUpdate(database);
return new Sql[] { selectForUpdateSql };
}
private Sql generateSelectForUpdate(Database database) {
String catalog = database.getLiquibaseCatalogName();
String schema = database.getLiquibaseSchemaName();
String rawLockTableName = database.getDatabaseChangeLogLockTableName();
String lockTableName = database.escapeTableName(catalog, schema, rawLockTableName);
String idColumnName = database.escapeColumnName(catalog, schema, rawLockTableName, "ID");
String sqlBase = "SELECT " + idColumnName + " FROM " + lockTableName;
String sqlWhere = " WHERE " + idColumnName + "=1";
String sql;
if (database instanceof MySQLDatabase || database instanceof PostgresDatabase || database instanceof H2Database ||
database instanceof OracleDatabase) {
sql = sqlBase + sqlWhere + " FOR UPDATE";
} else if (database instanceof MSSQLDatabase) {
sql = sqlBase + " WITH (UPDLOCK, ROWLOCK)" + sqlWhere;
} else if (database instanceof DB2Database) {
sql = sqlBase + sqlWhere + " FOR READ ONLY WITH RS USE AND KEEP UPDATE LOCKS";
} else {
sql = sqlBase + sqlWhere;
logger.warnf("No direct support for database %s . Database lock may not work correctly", database.getClass().getName());
}
logger.debugf("SQL command for pessimistic lock: %s", sql);
return new UnparsedSql(sql);
}
}

View file

@ -18,25 +18,16 @@
package org.keycloak.connections.jpa.updater.liquibase.lock;
import java.lang.reflect.Field;
import java.text.DateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import liquibase.database.Database;
import liquibase.database.core.DerbyDatabase;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.executor.Executor;
import liquibase.executor.ExecutorService;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
import liquibase.logging.LogFactory;
import liquibase.sql.visitor.AbstractSqlVisitor;
import liquibase.sql.visitor.SqlVisitor;
import liquibase.statement.core.CreateDatabaseChangeLogLockTableStatement;
import liquibase.statement.core.DropTableStatement;
import liquibase.statement.core.InitializeDatabaseChangeLogLockTableStatement;
import liquibase.statement.core.LockDatabaseChangeLogStatement;
import liquibase.statement.core.RawSqlStatement;
import org.jboss.logging.Logger;
import org.keycloak.common.util.Time;
@ -51,24 +42,6 @@ public class CustomLockService extends StandardLockService {
private static final Logger log = Logger.getLogger(CustomLockService.class);
private long changeLogLocRecheckTimeMillis = -1;
@Override
public void setChangeLogLockRecheckTime(long changeLogLocRecheckTime) {
super.setChangeLogLockRecheckTime(changeLogLocRecheckTime);
this.changeLogLocRecheckTimeMillis = changeLogLocRecheckTime;
}
// Bug in StandardLockService.getChangeLogLockRecheckTime()
@Override
public Long getChangeLogLockRecheckTime() {
if (changeLogLocRecheckTimeMillis == -1) {
return super.getChangeLogLockRecheckTime();
} else {
return changeLogLocRecheckTimeMillis;
}
}
@Override
public void init() throws DatabaseException {
boolean createdTable = false;
@ -84,8 +57,8 @@ public class CustomLockService extends StandardLockService {
database.commit();
} catch (DatabaseException de) {
log.warn("Failed to create lock table. Maybe other transaction created in the meantime. Retrying...");
if (log.isDebugEnabled()) {
log.debug(de.getMessage(), de); //Log details at debug level
if (log.isTraceEnabled()) {
log.trace(de.getMessage(), de); //Log details at trace level
}
database.rollback();
throw new LockRetryException(de);
@ -115,8 +88,8 @@ public class CustomLockService extends StandardLockService {
} catch (DatabaseException de) {
log.warn("Failed to insert first record to the lock table. Maybe other transaction inserted in the meantime. Retrying...");
if (log.isDebugEnabled()) {
log.debug(de.getMessage(), de); // Log details at debug level
if (log.isTraceEnabled()) {
log.trace(de.getMessage(), de); // Log details at trace level
}
database.rollback();
throw new LockRetryException(de);
@ -140,34 +113,88 @@ public class CustomLockService extends StandardLockService {
}
@Override
public void waitForLock() throws LockException {
public void waitForLock() {
boolean locked = false;
long startTime = Time.toMillis(Time.currentTime());
long timeToGiveUp = startTime + (getChangeLogLockWaitTime());
boolean nextAttempt = true;
while (!locked && Time.toMillis(Time.currentTime()) < timeToGiveUp) {
while (nextAttempt) {
locked = acquireLock();
if (!locked) {
int remainingTime = ((int)(timeToGiveUp / 1000)) - Time.currentTime();
log.debugf("Waiting for changelog lock... Remaining time: %d seconds", remainingTime);
try {
Thread.sleep(getChangeLogLockRecheckTime());
} catch (InterruptedException e) {
e.printStackTrace();
if (remainingTime > 0) {
log.debugf("Will try to acquire log another time. Remaining time: %d seconds", remainingTime);
} else {
nextAttempt = false;
}
} else {
nextAttempt = false;
}
}
if (!locked) {
DatabaseChangeLogLock[] locks = listLocks();
String lockedBy;
if (locks.length > 0) {
DatabaseChangeLogLock lock = locks[0];
lockedBy = lock.getLockedBy() + " since " + DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.SHORT).format(lock.getLockGranted());
int timeout = ((int)(getChangeLogLockWaitTime() / 1000));
throw new IllegalStateException("Could not acquire change log lock within specified timeout " + timeout + " seconds. Currently locked by other transaction");
}
}
@Override
public boolean acquireLock() {
if (hasChangeLogLock) {
// We already have a lock
return true;
}
Executor executor = ExecutorService.getInstance().getExecutor(database);
try {
database.rollback();
// Ensure table created and lock record inserted
this.init();
} catch (DatabaseException de) {
throw new IllegalStateException("Failed to retrieve lock", de);
}
try {
log.debug("Trying to lock database");
executor.execute(new LockDatabaseChangeLogStatement());
log.debug("Successfully acquired database lock");
hasChangeLogLock = true;
database.setCanCacheLiquibaseTableInfo(true);
return true;
} catch (DatabaseException de) {
log.warn("Lock didn't yet acquired. Will possibly retry to acquire lock. Details: " + de.getMessage());
if (log.isTraceEnabled()) {
log.debug(de.getMessage(), de);
}
return false;
}
}
@Override
public void releaseLock() {
try {
if (hasChangeLogLock) {
log.debug("Going to release database lock");
database.commit();
} else {
lockedBy = "UNKNOWN";
log.warn("Attempt to release lock, which is not owned by current transaction");
}
} catch (Exception e) {
log.error("Database error during release lock", e);
} finally {
try {
hasChangeLogLock = false;
database.setCanCacheLiquibaseTableInfo(false);
database.rollback();
} catch (DatabaseException e) {
;
}
throw new LockException("Could not acquire change log lock. Currently locked by " + lockedBy);
}
}

View file

@ -17,6 +17,7 @@
package org.keycloak.connections.jpa.updater.liquibase.lock;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.StandardLockService;
@ -27,6 +28,15 @@ import liquibase.lockservice.StandardLockService;
*/
public class DummyLockService extends StandardLockService {
@Override
public int getPriority() {
return Integer.MAX_VALUE;
}
@Override
public void init() throws DatabaseException {
}
@Override
public void waitForLock() throws LockException {
}

View file

@ -46,7 +46,7 @@ public class LiquibaseDBLockProvider implements DBLockProvider {
private final LiquibaseDBLockProviderFactory factory;
private final KeycloakSession session;
private LockService lockService;
private CustomLockService lockService;
private Connection dbConnection;
private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
@ -69,7 +69,6 @@ public class LiquibaseDBLockProvider implements DBLockProvider {
this.lockService = new CustomLockService();
lockService.setChangeLogLockWaitTime(factory.getLockWaitTimeoutMillis());
lockService.setChangeLogLockRecheckTime(factory.getLockRecheckTimeMillis());
lockService.setDatabase(liquibase.getDatabase());
} catch (LiquibaseException exception) {
safeRollbackConnection();
@ -94,16 +93,15 @@ public class LiquibaseDBLockProvider implements DBLockProvider {
lockService.waitForLock();
this.maxAttempts = DEFAULT_MAX_ATTEMPTS;
return;
} catch (LockException le) {
if (le.getCause() != null && le.getCause() instanceof LockRetryException) {
} catch (LockRetryException le) {
// Indicates we should try to acquire lock again in different transaction
safeRollbackConnection();
restart();
maxAttempts--;
} else {
throw new IllegalStateException("Failed to retrieve lock", le);
// TODO: Possibility to forcefully retrieve lock after timeout instead of just give-up?
}
} catch (RuntimeException re) {
safeRollbackConnection();
safeCloseConnection();
throw re;
}
}
}
@ -111,14 +109,16 @@ public class LiquibaseDBLockProvider implements DBLockProvider {
@Override
public void releaseLock() {
try {
lockService.releaseLock();
} catch (LockException e) {
logger.error("Could not release lock", e);
}
lockService.reset();
}
@Override
public boolean supportsForcedUnlock() {
// Implementation based on "SELECT FOR UPDATE" can't force unlock as it's locked by other transaction
return false;
}
@Override
public void destroyLockInfo() {
try {

View file

@ -31,24 +31,17 @@ public class LiquibaseDBLockProviderFactory implements DBLockProviderFactory {
private static final Logger logger = Logger.getLogger(LiquibaseDBLockProviderFactory.class);
private long lockRecheckTimeMillis;
private long lockWaitTimeoutMillis;
protected long getLockRecheckTimeMillis() {
return lockRecheckTimeMillis;
}
protected long getLockWaitTimeoutMillis() {
return lockWaitTimeoutMillis;
}
@Override
public void init(Config.Scope config) {
int lockRecheckTime = config.getInt("lockRecheckTime", 2);
int lockWaitTimeout = config.getInt("lockWaitTimeout", 900);
this.lockRecheckTimeMillis = Time.toMillis(lockRecheckTime);
this.lockWaitTimeoutMillis = Time.toMillis(lockWaitTimeout);
logger.debugf("Liquibase lock provider configured with lockWaitTime: %d seconds, lockRecheckTime: %d seconds", lockWaitTimeout, lockRecheckTime);
logger.debugf("Liquibase lock provider configured with lockWaitTime: %d seconds", lockWaitTimeout);
}
@Override
@ -63,7 +56,6 @@ public class LiquibaseDBLockProviderFactory implements DBLockProviderFactory {
@Override
public void setTimeouts(long lockRecheckTimeMillis, long lockWaitTimeoutMillis) {
this.lockRecheckTimeMillis = lockRecheckTimeMillis;
this.lockWaitTimeoutMillis = lockWaitTimeoutMillis;
}

View file

@ -124,6 +124,11 @@ public class MongoDBLockProvider implements DBLockProvider {
}
}
@Override
public boolean supportsForcedUnlock() {
return true;
}
@Override
public void destroyLockInfo() {
db.getCollection(DB_LOCK_COLLECTION).remove(new BasicDBObject());

View file

@ -34,9 +34,18 @@ public interface DBLockProvider extends Provider {
void waitForLock();
/**
* Release previously acquired lock
*/
void releaseLock();
/**
* @return true if provider supports forced unlock at startup
*/
boolean supportsForcedUnlock();
/**
* Will destroy whole state of DB lock (drop table/collection to track locking).
* */

View file

@ -34,52 +34,38 @@ public class DBLockManager {
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
public void waitForLock(KeycloakSessionFactory sessionFactory) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
private final KeycloakSession session;
@Override
public void run(KeycloakSession session) {
DBLockProvider lock = getDBLock(session);
lock.waitForLock();
}
});
public DBLockManager(KeycloakSession session) {
this.session = session;
}
public void releaseLock(KeycloakSessionFactory sessionFactory) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
DBLockProvider lock = getDBLock(session);
lock.releaseLock();
}
});
}
public void checkForcedUnlock(KeycloakSessionFactory sessionFactory) {
public void checkForcedUnlock() {
if (Boolean.getBoolean("keycloak.dblock.forceUnlock")) {
DBLockProvider lock = getDBLock();
if (lock.supportsForcedUnlock()) {
logger.forcedReleaseDBLock();
releaseLock(sessionFactory);
lock.releaseLock();
} else {
throw new IllegalStateException("Forced unlock requested, but provider " + lock + " doesn't support it");
}
}
}
// Try to detect ID from realmProvider
public DBLockProvider getDBLock(KeycloakSession session) {
String realmProviderId = getRealmProviderId(session);
public DBLockProvider getDBLock() {
String realmProviderId = getRealmProviderId();
return session.getProvider(DBLockProvider.class, realmProviderId);
}
public DBLockProviderFactory getDBLockFactory(KeycloakSession session) {
String realmProviderId = getRealmProviderId(session);
public DBLockProviderFactory getDBLockFactory() {
String realmProviderId = getRealmProviderId();
return (DBLockProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(DBLockProvider.class, realmProviderId);
}
private String getRealmProviderId(KeycloakSession session) {
private String getRealmProviderId() {
RealmProviderFactory realmProviderFactory = (RealmProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(RealmProvider.class);
return realmProviderFactory.getId();
}

View file

@ -25,6 +25,7 @@ import org.keycloak.Config;
import org.keycloak.exportimport.ExportImportManager;
import org.keycloak.migration.MigrationModelManager;
import org.keycloak.models.*;
import org.keycloak.models.dblock.DBLockProvider;
import org.keycloak.services.managers.DBLockManager;
import org.keycloak.models.utils.PostMigrationEvent;
import org.keycloak.models.utils.RepresentationToModel;
@ -91,9 +92,10 @@ public class KeycloakApplication extends Application {
ExportImportManager exportImportManager;
DBLockManager dbLockManager = new DBLockManager();
dbLockManager.checkForcedUnlock(sessionFactory);
dbLockManager.waitForLock(sessionFactory);
DBLockManager dbLockManager = new DBLockManager(sessionFactory.create());
dbLockManager.checkForcedUnlock();
DBLockProvider dbLock = dbLockManager.getDBLock();
dbLock.waitForLock();
try {
migrateModel();
@ -130,7 +132,7 @@ public class KeycloakApplication extends Application {
importAddUser();
} finally {
dbLockManager.releaseLock(sessionFactory);
dbLock.releaseLock();
}
if (exportImportManager.isRunExport()) {

View file

@ -54,17 +54,17 @@ public class DBLockTest extends AbstractModelTest {
super.before();
// Set timeouts for testing
DBLockManager lockManager = new DBLockManager();
DBLockProviderFactory lockFactory = lockManager.getDBLockFactory(session);
DBLockManager lockManager = new DBLockManager(session);
DBLockProviderFactory lockFactory = lockManager.getDBLockFactory();
lockFactory.setTimeouts(LOCK_RECHECK_MILLIS, LOCK_TIMEOUT_MILLIS);
// Drop lock table, just to simulate racing threads for create lock table and insert lock record into it.
lockManager.getDBLock(session).destroyLockInfo();
lockManager.getDBLock().destroyLockInfo();
commit();
}
// @Test // TODO: Running -Dtest=DBLockTest,UserModelTest might cause issues sometimes. Reenable this once DB lock is refactored.
@Test
public void testLockConcurrently() throws Exception {
long startupTime = System.currentTimeMillis();
@ -112,7 +112,7 @@ public class DBLockTest extends AbstractModelTest {
}
private void lock(KeycloakSession session, Semaphore semaphore) {
DBLockProvider dbLock = new DBLockManager().getDBLock(session);
DBLockProvider dbLock = new DBLockManager(session).getDBLock();
dbLock.waitForLock();
try {
semaphore.increase();