> mvccEntryFlowable = toMvccEntryFlowable(ctx, commandKeyPredicate);
- return batchOperation(mvccEntryFlowable, ctx, NonBlockingStore::batch);
- }
-
- /**
- * Takes all the modified entries in the flowable and writes or removes them from the stores in a single batch
- * operation per store.
- *
- * The {@link HandleFlowables} is provided for the sole reason of allowing reuse of this method by different callers.
- * @param mvccEntryFlowable flowable containing modified entries
- * @param ctx the context with modifications
- * @param flowableHandler callback handler that actually should subscribe to the underlying store
- * @param key type
- * @param value type
- * @return a stage that when complete will contain how many write operations were done
- */
- private CompletionStage batchOperation(Flowable> mvccEntryFlowable, InvocationContext ctx,
- HandleFlowables flowableHandler) {
- return Single.using(
- this::acquireReadLock,
- ignore -> {
- checkStoreAvailability();
- if (log.isTraceEnabled()) {
- log.trace("Writing batch to stores");
- }
-
- return Flowable.fromIterable(stores)
- .filter(storeStatus -> !storeStatus.characteristics.contains(Characteristic.READ_ONLY))
- .flatMapSingle(storeStatus -> {
- Flowable> flowableToUse;
- boolean shared = storeStatus.config.shared();
- if (shared) {
- if (log.isTraceEnabled()) {
- log.tracef("Store %s is shared, checking skip shared stores and ignoring entries not" +
- " primarily owned by this node", storeStatus.store);
- }
- flowableToUse = mvccEntryFlowable.filter(mvccEntry -> !mvccEntry.isSkipSharedStore());
- } else {
- flowableToUse = mvccEntryFlowable;
- }
-
- boolean segmented = storeStatus.config.segmented();
-
- // Now we have to split this stores' flowable into two (one for remove and one for put)
- flowableToUse = flowableToUse.publish().autoConnect(2);
-
- Flowable> removeFlowable = createRemoveFlowable(
- flowableToUse, shared, segmented, storeStatus);
-
- ByRef.Long writeCount = new ByRef.Long(0);
-
- Flowable>> writeFlowable =
- createWriteFlowable(flowableToUse, ctx, shared, segmented, writeCount, storeStatus);
-
- CompletionStage storeBatchStage = flowableHandler.handleFlowables(storeStatus.store(),
- segmentCount(segmented), removeFlowable, writeFlowable);
-
- return Single.fromCompletionStage(storeBatchStage
- .thenApply(ignore2 -> writeCount.get()));
- // Only take the last element for the count - ensures all stores are completed
- }).last(0L);
-
- },
- this::releaseReadLock
- ).toCompletionStage();
- }
-
- private Flowable> createRemoveFlowable(
- Flowable> flowableToUse, boolean shared, boolean segmented, StoreStatus storeStatus) {
-
- Flowable keyRemoveFlowable = flowableToUse
- .filter(MVCCEntry::isRemoved)
- .map(MVCCEntry::getKey);
-
- Flowable> flowable;
- if (segmented) {
- flowable = keyRemoveFlowable
- .groupBy(keyPartitioner::getSegment)
- .map(SegmentPublisherWrapper::wrap);
- flowable = filterSharedSegments(flowable, null, shared);
- } else {
- if (shared && !isInvalidationCache) {
- keyRemoveFlowable = keyRemoveFlowable.filter(k ->
- distributionManager.getCacheTopology().getDistribution(k).isPrimary());
- }
- flowable = Flowable.just(SingleSegmentPublisher.singleSegment(keyRemoveFlowable));
- }
-
- if (log.isTraceEnabled()) {
- flowable = flowable.doOnSubscribe(sub ->
- log.tracef("Store %s has subscribed to remove batch", storeStatus.store));
- flowable = flowable.map(sp -> {
- int segment = sp.getSegment();
- return SingleSegmentPublisher.singleSegment(segment, Flowable.fromPublisher(sp)
- .doOnNext(keyToRemove -> log.tracef("Emitting key %s for removal from segment %s",
- keyToRemove, segment)));
- });
- }
-
- return flowable;
- }
-
- private Flowable>> createWriteFlowable(
- Flowable> flowableToUse, InvocationContext ctx, boolean shared, boolean segmented,
- ByRef.Long writeCount, StoreStatus storeStatus) {
-
- Flowable> entryWriteFlowable = flowableToUse
- .filter(mvccEntry -> !mvccEntry.isRemoved())
- .map(mvcEntry -> {
- K key = mvcEntry.getKey();
- InternalCacheValue sv = internalEntryFactory.getValueFromCtx(key, ctx);
- //noinspection unchecked
- return (MarshallableEntry) marshallableEntryFactory.create(key, (InternalCacheValue) sv);
- });
-
- Flowable>> flowable;
- if (segmented) {
- // Note the writeCount includes entries that aren't written due to being shared
- // at this point
- entryWriteFlowable = entryWriteFlowable.doOnNext(obj -> writeCount.inc());
- flowable = entryWriteFlowable
- .groupBy(me -> keyPartitioner.getSegment(me.getKey()))
- .map(SegmentPublisherWrapper::wrap);
- // The writeCount will be decremented for each grouping of values ignored
- flowable = filterSharedSegments(flowable, writeCount, shared);
- } else {
- if (shared && !isInvalidationCache) {
- entryWriteFlowable = entryWriteFlowable.filter(me ->
- distributionManager.getCacheTopology().getDistribution(me.getKey()).isPrimary());
- }
- entryWriteFlowable = entryWriteFlowable.doOnNext(obj -> writeCount.inc());
- flowable = Flowable.just(SingleSegmentPublisher.singleSegment(entryWriteFlowable));
- }
-
- if (log.isTraceEnabled()) {
- flowable = flowable.doOnSubscribe(sub ->
- log.tracef("Store %s has subscribed to write batch", storeStatus.store));
- flowable = flowable.map(sp -> {
- int segment = sp.getSegment();
- return SingleSegmentPublisher.singleSegment(segment, Flowable.fromPublisher(sp)
- .doOnNext(me -> log.tracef("Emitting entry %s for write to segment %s",
- me, segment)));
- });
- }
- return flowable;
- }
-
- private Flowable> filterSharedSegments(
- Flowable> flowable, ByRef.Long writeCount, boolean shared) {
- if (!shared || isInvalidationCache) {
- return flowable;
- }
- return flowable.map(sp -> {
- if (distributionManager.getCacheTopology().getSegmentDistribution(sp.getSegment()).isPrimary()) {
- return sp;
- }
- Flowable emptyFlowable = Flowable.fromPublisher(sp);
- if (writeCount != null) {
- emptyFlowable = emptyFlowable.doOnNext(ignore -> writeCount.dec())
- .ignoreElements()
- .toFlowable();
- } else {
- emptyFlowable = emptyFlowable.take(0);
- }
- // Unfortunately we need to still need to subscribe to the publisher even though we don't want
- // the store to use its values. Thus we just return them an empty SegmentPublisher.
- return SingleSegmentPublisher.singleSegment(sp.getSegment(), emptyFlowable);
- });
- }
-
- /**
- * Creates a Flowable of MVCCEntry(s) that were modified due to the commands in the transactional context
- * @param ctx the transactional context
- * @param commandKeyPredicate predicate to test if a key/command combination should be written
- * @param key type
- * @param value type
- * @return a Flowable containing MVCCEntry(s) for the modifications in the tx context
- */
- private Flowable> toMvccEntryFlowable(TxInvocationContext ctx,
- BiPredicate super WriteCommand, Object> commandKeyPredicate) {
- return Flowable.fromIterable(ctx.getCacheTransaction().getAllModifications())
- .filter(writeCommand -> !writeCommand.hasAnyFlag(FlagBitSets.SKIP_CACHE_STORE | FlagBitSets.ROLLING_UPGRADE))
- .concatMap(writeCommand -> entriesFromCommand(writeCommand, ctx, commandKeyPredicate));
- }
-
- private Flowable> entriesFromCommand(WCT writeCommand, InvocationContext ctx,
- BiPredicate super WCT, Object> commandKeyPredicate) {
- if (writeCommand instanceof DataWriteCommand) {
- Object key = ((DataWriteCommand) writeCommand).getKey();
- MVCCEntry entry = acquireKeyFromContext(ctx, writeCommand, key, commandKeyPredicate);
- return entry != null ? Flowable.just(entry) : Flowable.empty();
- } else {
- if (writeCommand instanceof InvalidateCommand) {
- return Flowable.empty();
- }
- // Assume multiple key command
- return Flowable.fromIterable(writeCommand.getAffectedKeys())
- .concatMapMaybe(key -> {
- MVCCEntry entry = acquireKeyFromContext(ctx, writeCommand, key, commandKeyPredicate);
- // We use an empty Flowable to symbolize a miss - which is filtered by ofType just below
- return entry != null ? Maybe.just(entry) : Maybe.empty();
- });
- }
- }
-
- private MVCCEntry acquireKeyFromContext(InvocationContext ctx, WCT command, Object key,
- BiPredicate super WCT, Object> commandKeyPredicate) {
- if (commandKeyPredicate == null || commandKeyPredicate.test(command, key)) {
- //noinspection unchecked
- MVCCEntry entry = (MVCCEntry) ctx.lookupEntry(key);
- if (entry.isChanged()) {
- return entry;
- }
- }
- return null;
- }
-
- /**
- * Here just to create a lambda for method reuse of
- * {@link #batchOperation(Flowable, InvocationContext, HandleFlowables)}
- */
- interface HandleFlowables {
- CompletionStage handleFlowables(NonBlockingStore store, int publisherCount,
- Flowable> removeFlowable,
- Flowable>> putFlowable);
- };
-
- /**
- * Provides a function that groups entries by their segments (via keyPartitioner).
- */
- private Function groupingFunction(Function toKeyFunction) {
- return value -> keyPartitioner.getSegment(toKeyFunction.apply(value));
- }
-
- /**
- * Returns how many segments the user must worry about when segmented or not.
- * @param segmented whether the store is segmented
- * @return how many segments the store must worry about
- */
- private int segmentCount(boolean segmented) {
- return segmented ? segmentCount : 1;
- }
-
- @Override
- public boolean isAvailable() {
- return unavailableExceptionMessage == null;
- }
-
- @Override
- public CompletionStage addSegments(IntSet segments) {
- return Completable.using(
- this::acquireReadLock,
- ignore -> {
- checkStoreAvailability();
- if (log.isTraceEnabled()) {
- log.tracef("Adding segments %s to stores", segments);
- }
- return Flowable.fromIterable(stores)
- .filter(PersistenceManagerImpl::shouldInvokeSegmentMethods)
- .flatMapCompletable(storeStatus -> Completable.fromCompletionStage(storeStatus.store.addSegments(segments)));
- },
- this::releaseReadLock
- ).toCompletionStage(allSegmentedOrShared);
- }
-
- @Override
- public CompletionStage removeSegments(IntSet segments) {
- return Completable.using(
- this::acquireReadLock,
- ignore -> {
- checkStoreAvailability();
- if (log.isTraceEnabled()) {
- log.tracef("Removing segments %s from stores", segments);
- }
- return Flowable.fromIterable(stores)
- .filter(PersistenceManagerImpl::shouldInvokeSegmentMethods)
- .flatMapCompletable(storeStatus -> Completable.fromCompletionStage(storeStatus.store.removeSegments(segments)));
- },
- this::releaseReadLock
- ).toCompletionStage(allSegmentedOrShared);
- }
-
- private static boolean shouldInvokeSegmentMethods(StoreStatus storeStatus) {
- return storeStatus.characteristics.contains(Characteristic.SEGMENTABLE) &&
- !storeStatus.characteristics.contains(Characteristic.SHAREABLE);
- }
-
- public List> getAllStores(Predicate> predicate) {
- long stamp = acquireReadLock();
- try {
- return stores.stream()
- .filter(storeStatus -> predicate.test(storeStatus.characteristics))
- .map(StoreStatus::store)
- .collect(Collectors.toCollection(ArrayList::new));
- } finally {
- releaseReadLock(stamp);
- }
- }
-
- /**
- * Method must be here for augmentation to tell blockhound this method is okay to block
- */
- private long acquireReadLock() {
- return lock.readLock();
- }
-
- /**
- * Opposite of acquireReadLock here for symmetry
- */
- private void releaseReadLock(long stamp) {
- lock.unlockRead(stamp);
- }
-
- private void checkStoreAvailability() {
- if (!enabled) return;
-
- String message = unavailableExceptionMessage;
- if (message != null) {
- throw new StoreUnavailableException(message);
- }
- }
-
- static class StoreStatus {
- final NonBlockingStore, ?> store;
- final StoreConfiguration config;
- final Set characteristics;
- // This variable is protected by PersistenceManagerImpl#lock and also the fact that availability check can
- // only be ran one at a time
- boolean availability = true;
-
- StoreStatus(NonBlockingStore, ?> store, StoreConfiguration config, Set characteristics) {
- this.store = store;
- this.config = config;
- this.characteristics = characteristics;
- }
-
- NonBlockingStore store() {
- return (NonBlockingStore) store;
- }
- }
-
- boolean anyLocksHeld() {
- return lock.isReadLocked() || lock.isWriteLocked();
- }
-}