parent
4c4015cf0b
commit
e02c95f9d3
1 changed files with 64 additions and 48 deletions
|
@ -34,6 +34,7 @@ import java.util.Random;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
@ -128,73 +129,88 @@ public class GlobalLocksTest extends KeycloakModelTest {
|
|||
|
||||
@Test
|
||||
public void testReleaseAllLocksMethod() throws InterruptedException {
|
||||
final int NUMBER_OF_THREADS = 4;
|
||||
ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
|
||||
final int numberOfThreads = 4;
|
||||
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
|
||||
|
||||
CountDownLatch locksAcquired = new CountDownLatch(NUMBER_OF_THREADS);
|
||||
CountDownLatch locksAcquired = new CountDownLatch(numberOfThreads);
|
||||
CountDownLatch testFinished = new CountDownLatch(1);
|
||||
|
||||
LOG.info("Initial locks acquiring phase.");
|
||||
try {
|
||||
// Acquire locks and let the threads wait until the end of this test method
|
||||
executor.submit(() -> {
|
||||
IntStream.range(0, NUMBER_OF_THREADS).parallel()
|
||||
.forEach(i ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
lockProvider.withLock("LOCK_" + i, session -> {
|
||||
locksAcquired.countDown();
|
||||
try {
|
||||
testFinished.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
})
|
||||
);
|
||||
});
|
||||
for (int index = 0; index < numberOfThreads; index++) {
|
||||
final int i = index;
|
||||
executor.submit(() ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
LOG.infof("Acquiring LOCK_%d", i);
|
||||
lockProvider.withLock("LOCK_" + i, session -> {
|
||||
LOG.infof("Lock LOCK_%d acquired.", i);
|
||||
locksAcquired.countDown();
|
||||
try {
|
||||
testFinished.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
LOG.infof("Initial acquiring tx finished for lock LOCK_%d", i);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
locksAcquired.await();
|
||||
if (!locksAcquired.await(5, TimeUnit.MINUTES)) {
|
||||
throw new RuntimeException("Acquiring locks phase took too long.");
|
||||
}
|
||||
|
||||
LOG.info("Expecting timeouts for each lock.");
|
||||
// Test no lock can be acquired because all are still hold by the executor above
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
IntStream.range(0, NUMBER_OF_THREADS).parallel()
|
||||
.forEach(i ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
lockProvider.withLock("LOCK_" + i, Duration.ofSeconds(1), is -> {
|
||||
throw new RuntimeException("Acquiring lock should not succeed as it was acquired in the first transaction");
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
})
|
||||
);
|
||||
assertThat(counter.get(), Matchers.equalTo(NUMBER_OF_THREADS));
|
||||
for (int index = 0; index < numberOfThreads; index++) {
|
||||
final int i = index;
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
LOG.infof("Attempt to acquire LOCK_%d.", i);
|
||||
lockProvider.withLock("LOCK_" + i, Duration.ofSeconds(1), is -> {
|
||||
throw new RuntimeException("Acquiring lock should not succeed as it was acquired in the first transaction");
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
LOG.infof("Timeout was successfully received for LOCK_%d", i);
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertThat(counter.get(), Matchers.equalTo(numberOfThreads));
|
||||
|
||||
// Unlock all locks forcefully
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
LOG.infof("Releasing all locks", Thread.currentThread().getName());
|
||||
lockProvider.forceReleaseAllLocks();
|
||||
});
|
||||
|
||||
// Test all locks can be acquired again
|
||||
counter.set(0);
|
||||
IntStream.range(0, NUMBER_OF_THREADS).parallel()
|
||||
.forEach(i ->
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
lockProvider.withLock("LOCK_" + i, Duration.ofSeconds(1), is -> counter.incrementAndGet());
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
);
|
||||
for (int index = 0; index < numberOfThreads; index++) {
|
||||
final int i = index;
|
||||
inComittedTransaction(s -> {
|
||||
GlobalLockProvider lockProvider = s.getProvider(GlobalLockProvider.class);
|
||||
try {
|
||||
lockProvider.withLock("LOCK_" + i, Duration.ofSeconds(1), is -> {
|
||||
LOG.infof("Lock LOCK_%d acquired again.", i);
|
||||
counter.incrementAndGet();
|
||||
return null;
|
||||
});
|
||||
} catch (LockAcquiringTimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertThat(counter.get(), Matchers.equalTo(NUMBER_OF_THREADS));
|
||||
assertThat(counter.get(), Matchers.equalTo(numberOfThreads));
|
||||
} finally {
|
||||
testFinished.countDown();
|
||||
executor.shutdown();
|
||||
|
|
Loading…
Reference in a new issue