initial sync/import spi

This commit is contained in:
Bill Burke 2016-10-06 14:48:53 -04:00
parent 67d3730c6d
commit 74325fe133
6 changed files with 585 additions and 3 deletions

View file

@ -109,7 +109,7 @@ public interface KeycloakSession {
UserCache getUserCache();
/**
* A possibly cached view of all users in system.
* A cached view of all users in system including deprecated UserFederationProvider SPI
*
* @return
*/
@ -126,12 +126,12 @@ public interface KeycloakSession {
UserCredentialManager userCredentialManager();
/**
* A possibly cached view of all users in system that does NOT include users available from the deprecated UserFederationProvider SPI.
* A cached view of all users in system that does NOT include users available from the deprecated UserFederationProvider SPI.
*/
UserProvider userStorage();
/**
* Keycloak specific local storage for users. No cache in front, this api talks directly to database.
* Keycloak specific local storage for users. No cache in front, this api talks directly to database configured for Keycloak
*
* @return
*/

View file

@ -36,4 +36,65 @@ public class UserStorageProviderModel extends PrioritizedComponentModel {
super(copy);
}
private transient Integer fullSyncPeriod;
private transient Integer changedSyncPeriod;
private transient Integer lastSync;
private transient Boolean importEnabled;
public boolean isImportEnabled() {
if (importEnabled == null) {
String val = getConfig().getFirst("importEnabled");
if (val == null) importEnabled = false;
importEnabled = Boolean.valueOf(val);
}
return importEnabled;
}
public void setImportEnabled(boolean flag) {
importEnabled = flag;
getConfig().putSingle("importEnabled", Boolean.toString(flag));
}
public int getFullSyncPeriod() {
if (fullSyncPeriod == null) {
String val = getConfig().getFirst("fullSyncPeriod");
if (val == null) fullSyncPeriod = -1;
fullSyncPeriod = Integer.valueOf(val);
}
return fullSyncPeriod;
}
public void setFullSyncPeriod(int fullSyncPeriod) {
this.fullSyncPeriod = fullSyncPeriod;
getConfig().putSingle("fullSyncPeriod", Integer.toString(fullSyncPeriod));
}
public int getChangedSyncPeriod() {
if (changedSyncPeriod == null) {
String val = getConfig().getFirst("changedSyncPeriod");
if (val == null) changedSyncPeriod = -1;
changedSyncPeriod = Integer.valueOf(val);
}
return changedSyncPeriod;
}
public void setChangedSyncPeriod(int changedSyncPeriod) {
this.changedSyncPeriod = changedSyncPeriod;
getConfig().putSingle("changedSyncPeriod", Integer.toString(changedSyncPeriod));
}
public int getLastSync() {
if (lastSync == null) {
String val = getConfig().getFirst("lastSync");
if (val == null) lastSync = 0;
lastSync = Integer.valueOf(val);
}
return lastSync;
}
public void setLastSync(int lastSync) {
this.lastSync = lastSync;
getConfig().putSingle("lastSync", Integer.toString(lastSync));
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.storage.user;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.storage.UserStorageProviderModel;
import java.util.Date;
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
*/
public interface ImportSynchronization {
SynchronizationResult sync(KeycloakSessionFactory sessionFactory, String realmId, UserStorageProviderModel model);
SynchronizationResult syncSince(Date lastSync, KeycloakSessionFactory sessionFactory, String realmId, UserStorageProviderModel model);
}

View file

@ -0,0 +1,28 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.storage.user;
import org.keycloak.models.RealmModel;
import org.keycloak.models.UserModel;
/**
* @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
* @version $Revision: 1 $
*/
public interface ImportedUserValidation {
UserModel validate(RealmModel realmm, UserModel user);
}

View file

@ -0,0 +1,124 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.storage.user;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class SynchronizationResult {
private boolean ignored;
private int added;
private int updated;
private int removed;
private int failed;
public boolean isIgnored() {
return ignored;
}
public void setIgnored(boolean ignored) {
this.ignored = ignored;
}
public int getAdded() {
return added;
}
public void setAdded(int added) {
this.added = added;
}
public int getUpdated() {
return updated;
}
public void setUpdated(int updated) {
this.updated = updated;
}
public int getRemoved() {
return removed;
}
public void setRemoved(int removed) {
this.removed = removed;
}
public int getFailed() {
return failed;
}
public void setFailed(int failed) {
this.failed = failed;
}
public void increaseAdded() {
added++;
}
public void increaseUpdated() {
updated++;
}
public void increaseRemoved() {
removed++;
}
public void increaseFailed() {
failed++;
}
public void add(SynchronizationResult other) {
added += other.added;
updated += other.updated;
removed += other.removed;
failed += other.failed;
}
public String getStatus() {
if (ignored) {
return "Synchronization ignored as it's already in progress";
} else {
String status = String.format("%d imported users, %d updated users", added, updated);
if (removed > 0) {
status += String.format(", %d removed users", removed);
}
if (failed != 0) {
status += String.format(", %d users failed sync! See server log for more details", failed);
}
return status;
}
}
@Override
public String toString() {
return String.format("UserFederationSyncResult [ %s ]", getStatus());
}
public static SynchronizationResult empty() {
return new SynchronizationResult();
}
public static SynchronizationResult ignored() {
SynchronizationResult result = new SynchronizationResult();
result.setIgnored(true);
return result;
}
}

View file

@ -0,0 +1,338 @@
/*
* Copyright 2016 Red Hat, Inc. and/or its affiliates
* and other contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.keycloak.services.managers;
import org.keycloak.cluster.ClusterEvent;
import org.keycloak.cluster.ClusterListener;
import org.keycloak.cluster.ClusterProvider;
import org.keycloak.cluster.ExecutionResult;
import org.keycloak.common.util.Time;
import org.keycloak.models.KeycloakSession;
import org.keycloak.models.KeycloakSessionFactory;
import org.keycloak.models.KeycloakSessionTask;
import org.keycloak.models.RealmModel;
import org.keycloak.models.utils.KeycloakModelUtils;
import org.keycloak.services.ServicesLogger;
import org.keycloak.storage.UserStorageProvider;
import org.keycloak.storage.UserStorageProviderFactory;
import org.keycloak.storage.UserStorageProviderModel;
import org.keycloak.storage.user.ImportSynchronization;
import org.keycloak.storage.user.SynchronizationResult;
import org.keycloak.timer.TimerProvider;
import java.util.List;
import java.util.concurrent.Callable;
/**
* @author <a href="mailto:mposolda@redhat.com">Marek Posolda</a>
*/
public class UserStorageSyncManager {
private static final String USER_STORAGE_TASK_KEY = "user-storage";
protected static final ServicesLogger logger = ServicesLogger.ROOT_LOGGER;
/**
* Check federationProviderModel of all realms and possibly start periodic sync for them
*
* @param sessionFactory
* @param timer
*/
public void bootstrapPeriodic(final KeycloakSessionFactory sessionFactory, final TimerProvider timer) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
List<RealmModel> realms = session.realms().getRealms();
for (final RealmModel realm : realms) {
List<UserStorageProviderModel> providers = realm.getUserStorageProviders();
for (final UserStorageProviderModel provider : providers) {
UserStorageProviderFactory factory = (UserStorageProviderFactory) session.getKeycloakSessionFactory().getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (factory instanceof ImportSynchronization && provider.isImportEnabled()) {
refreshPeriodicSyncForProvider(sessionFactory, timer, provider, realm.getId());
}
}
}
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
clusterProvider.registerListener(USER_STORAGE_TASK_KEY, new UserStorageClusterListener(sessionFactory));
}
});
}
private class Holder {
ExecutionResult<SynchronizationResult> result;
}
public SynchronizationResult syncAllUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserStorageProviderModel provider) {
UserStorageProviderFactory factory = (UserStorageProviderFactory) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (!(factory instanceof ImportSynchronization) || !provider.isImportEnabled()) {
return SynchronizationResult.ignored();
}
final Holder holder = new Holder();
// Ensure not executed concurrently on this or any other cluster node
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
// shared key for "full" and "changed" . Improve if needed
String taskKey = provider.getId() + "::sync";
// 30 seconds minimal timeout for now
int timeout = Math.max(30, provider.getFullSyncPeriod());
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<SynchronizationResult>() {
@Override
public SynchronizationResult call() throws Exception {
updateLastSyncInterval(sessionFactory, provider, realmId);
return ((ImportSynchronization)factory).sync(sessionFactory, realmId, provider);
}
});
}
});
if (holder.result == null || !holder.result.isExecuted()) {
logger.debugf("syncAllUsers for federation provider %s was ignored as it's already in progress", provider.getName());
return SynchronizationResult.ignored();
} else {
return holder.result.getResult();
}
}
public SynchronizationResult syncChangedUsers(final KeycloakSessionFactory sessionFactory, final String realmId, final UserStorageProviderModel provider) {
UserStorageProviderFactory factory = (UserStorageProviderFactory) sessionFactory.getProviderFactory(UserStorageProvider.class, provider.getProviderId());
if (!(factory instanceof ImportSynchronization) || !provider.isImportEnabled()) {
return SynchronizationResult.ignored();
}
final Holder holder = new Holder();
// Ensure not executed concurrently on this or any other cluster node
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
ClusterProvider clusterProvider = session.getProvider(ClusterProvider.class);
// shared key for "full" and "changed" . Improve if needed
String taskKey = provider.getId() + "::sync";
// 30 seconds minimal timeout for now
int timeout = Math.max(30, provider.getChangedSyncPeriod());
holder.result = clusterProvider.executeIfNotExecuted(taskKey, timeout, new Callable<SynchronizationResult>() {
@Override
public SynchronizationResult call() throws Exception {
// See when we did last sync.
int oldLastSync = provider.getLastSync();
updateLastSyncInterval(sessionFactory, provider, realmId);
return ((ImportSynchronization)factory).syncSince(Time.toDate(oldLastSync), sessionFactory, realmId, provider);
}
});
}
});
if (holder.result == null || !holder.result.isExecuted()) {
logger.debugf("syncChangedUsers for federation provider %s was ignored as it's already in progress", provider.getName());
return SynchronizationResult.ignored();
} else {
return holder.result.getResult();
}
}
// Ensure all cluster nodes are notified
public void notifyToRefreshPeriodicSync(KeycloakSession session, RealmModel realm, UserStorageProviderModel provider, boolean removed) {
UserStorageProviderClusterEvent event = UserStorageProviderClusterEvent.createEvent(removed, realm.getId(), provider);
session.getProvider(ClusterProvider.class).notify(USER_STORAGE_TASK_KEY, event);
}
// Executed once it receives notification that some UserFederationProvider was created or updated
protected void refreshPeriodicSyncForProvider(final KeycloakSessionFactory sessionFactory, TimerProvider timer, final UserStorageProviderModel provider, final String realmId) {
logger.debugf("Going to refresh periodic sync for provider '%s' . Full sync period: %d , changed users sync period: %d",
provider.getName(), provider.getFullSyncPeriod(), provider.getChangedSyncPeriod());
if (provider.getFullSyncPeriod() > 0) {
// We want periodic full sync for this provider
timer.schedule(new Runnable() {
@Override
public void run() {
try {
boolean shouldPerformSync = shouldPerformNewPeriodicSync(provider.getLastSync(), provider.getChangedSyncPeriod());
if (shouldPerformSync) {
syncAllUsers(sessionFactory, realmId, provider);
} else {
logger.debugf("Ignored periodic full sync with storage provider %s due small time since last sync", provider.getName());
}
} catch (Throwable t) {
logger.errorDuringFullUserSync(t);
}
}
}, provider.getFullSyncPeriod() * 1000, provider.getId() + "-FULL");
} else {
timer.cancelTask(provider.getId() + "-FULL");
}
if (provider.getChangedSyncPeriod() > 0) {
// We want periodic sync of just changed users for this provider
timer.schedule(new Runnable() {
@Override
public void run() {
try {
boolean shouldPerformSync = shouldPerformNewPeriodicSync(provider.getLastSync(), provider.getChangedSyncPeriod());
if (shouldPerformSync) {
syncChangedUsers(sessionFactory, realmId, provider);
} else {
logger.debugf("Ignored periodic changed-users sync with storage provider %s due small time since last sync", provider.getName());
}
} catch (Throwable t) {
logger.errorDuringChangedUserSync(t);
}
}
}, provider.getChangedSyncPeriod() * 1000, provider.getId() + "-CHANGED");
} else {
timer.cancelTask(provider.getId() + "-CHANGED");
}
}
// Skip syncing if there is short time since last sync time.
private boolean shouldPerformNewPeriodicSync(int lastSyncTime, int period) {
if (lastSyncTime <= 0) {
return true;
}
int currentTime = Time.currentTime();
int timeSinceLastSync = currentTime - lastSyncTime;
return (timeSinceLastSync * 2 > period);
}
// Executed once it receives notification that some UserFederationProvider was removed
protected void removePeriodicSyncForProvider(TimerProvider timer, UserStorageProviderModel fedProvider) {
logger.debugf("Removing periodic sync for provider %s", fedProvider.getName());
timer.cancelTask(fedProvider.getId() + "-FULL");
timer.cancelTask(fedProvider.getId() + "-CHANGED");
}
// Update interval of last sync for given UserFederationProviderModel. Do it in separate transaction
private void updateLastSyncInterval(final KeycloakSessionFactory sessionFactory, UserStorageProviderModel provider, final String realmId) {
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
RealmModel persistentRealm = session.realms().getRealm(realmId);
List<UserStorageProviderModel> persistentFedProviders = persistentRealm.getUserStorageProviders();
for (UserStorageProviderModel persistentFedProvider : persistentFedProviders) {
if (provider.getId().equals(persistentFedProvider.getId())) {
// Update persistent provider in DB
int lastSync = Time.currentTime();
persistentFedProvider.setLastSync(lastSync);
persistentRealm.updateComponent(persistentFedProvider);
// Update "cached" reference
provider.setLastSync(lastSync);
}
}
}
});
}
private class UserStorageClusterListener implements ClusterListener {
private final KeycloakSessionFactory sessionFactory;
public UserStorageClusterListener(KeycloakSessionFactory sessionFactory) {
this.sessionFactory = sessionFactory;
}
@Override
public void run(ClusterEvent event) {
final UserStorageProviderClusterEvent fedEvent = (UserStorageProviderClusterEvent) event;
KeycloakModelUtils.runJobInTransaction(sessionFactory, new KeycloakSessionTask() {
@Override
public void run(KeycloakSession session) {
TimerProvider timer = session.getProvider(TimerProvider.class);
if (fedEvent.isRemoved()) {
removePeriodicSyncForProvider(timer, fedEvent.getStorageProvider());
} else {
refreshPeriodicSyncForProvider(sessionFactory, timer, fedEvent.getStorageProvider(), fedEvent.getRealmId());
}
}
});
}
}
// Send to cluster during each update or remove of federationProvider, so all nodes can update sync periods
public static class UserStorageProviderClusterEvent implements ClusterEvent {
private boolean removed;
private String realmId;
private UserStorageProviderModel storageProvider;
public boolean isRemoved() {
return removed;
}
public void setRemoved(boolean removed) {
this.removed = removed;
}
public String getRealmId() {
return realmId;
}
public void setRealmId(String realmId) {
this.realmId = realmId;
}
public UserStorageProviderModel getStorageProvider() {
return storageProvider;
}
public void setStorageProvider(UserStorageProviderModel federationProvider) {
this.storageProvider = federationProvider;
}
public static UserStorageProviderClusterEvent createEvent(boolean removed, String realmId, UserStorageProviderModel provider) {
UserStorageProviderClusterEvent notification = new UserStorageProviderClusterEvent();
notification.setRemoved(removed);
notification.setRealmId(realmId);
notification.setStorageProvider(provider);
return notification;
}
}
}