+ Flowable<MVCCEntry<K, V>> mvccEntryFlowable = toMvccEntryFlowable(ctx, commandKeyPredicate);
+ return batchOperation(mvccEntryFlowable, ctx, NonBlockingStore::batch);
+ }
+
+ /**
+ * Takes all the modified entries in the flowable and writes or removes them from the stores in a single batch
+ * operation per store.
+ * <p>
+ * The {@link HandleFlowables} is provided for the sole reason of allowing reuse of this method by different callers.
+ * @param mvccEntryFlowable flowable containing modified entries
+ * @param ctx the context with modifications
+ * @param flowableHandler callback handler that actually should subscribe to the underlying store
+ * @param <K> key type
+ * @param <V> value type
+ * @return a stage that when complete will contain how many write operations were done
+ */
+ private <K, V> CompletionStage<Long> batchOperation(Flowable<MVCCEntry<K, V>> mvccEntryFlowable, InvocationContext ctx,
+ HandleFlowables<K, V> flowableHandler) {
+ return Single.using(
+ this::acquireReadLock,
+ ignore -> {
+ checkStoreAvailability();
+ if (log.isTraceEnabled()) {
+ log.trace("Writing batch to stores");
+ }
+
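+ // Iterate every configured store, skipping read-only stores since they cannot be written to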
+ return Flowable.fromIterable(stores)
+ .filter(storeStatus -> !storeStatus.characteristics.contains(Characteristic.READ_ONLY))
+ .flatMapSingle(storeStatus -> {
+ Flowable<MVCCEntry<K, V>> flowableToUse;
+ boolean shared = storeStatus.config.shared();
+ if (shared) {
+ if (log.isTraceEnabled()) {
+ log.tracef("Store %s is shared, checking skip shared stores and ignoring entries not" +
+ " primarily owned by this node", storeStatus.store);
+ }
+ flowableToUse = mvccEntryFlowable.filter(mvccEntry -> !mvccEntry.isSkipSharedStore());
+ } else {
+ flowableToUse = mvccEntryFlowable;
+ }
+
+ boolean segmented = storeStatus.config.segmented();
+
+ // Now we have to split this store's flowable into two (one for remove and one for put)
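+ // publish().autoConnect(2) subscribes to the upstream only once both the remove and write
+ // flowables below have subscribed, so the context entries are traversed a single time per store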
+ flowableToUse = flowableToUse.publish().autoConnect(2);
+
+ Flowable<NonBlockingStore.SegmentedPublisher<Object>> removeFlowable = createRemoveFlowable(
+ flowableToUse, shared, segmented, storeStatus);
+
+ ByRef.Long writeCount = new ByRef.Long(0);
+
+ Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writeFlowable =
+ createWriteFlowable(flowableToUse, ctx, shared, segmented, writeCount, storeStatus);
+
+ CompletionStage<Void> storeBatchStage = flowableHandler.handleFlowables(storeStatus.<K, V>store(),
+ segmentCount(segmented), removeFlowable, writeFlowable);
+
+ return Single.fromCompletionStage(storeBatchStage
+ .thenApply(ignore2 -> writeCount.get()));
+ // Only take the last element for the count - ensures all stores are completed
+ }).last(0L);
+
+ },
+ this::releaseReadLock
+ ).toCompletionStage();
+ }
+
+ private <K, V> Flowable<NonBlockingStore.SegmentedPublisher<Object>> createRemoveFlowable(
+ Flowable<MVCCEntry<K, V>> flowableToUse, boolean shared, boolean segmented, StoreStatus storeStatus) {
+
+ Flowable<K> keyRemoveFlowable = flowableToUse
+ .filter(MVCCEntry::isRemoved)
+ .map(MVCCEntry::getKey);
+
+ Flowable<NonBlockingStore.SegmentedPublisher<Object>> flowable;
+ if (segmented) {
+ flowable = keyRemoveFlowable
+ .groupBy(keyPartitioner::getSegment)
+ .map(SegmentPublisherWrapper::wrap);
+ flowable = filterSharedSegments(flowable, null, shared);
+ } else {
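+ // For a shared store (other than an invalidation cache) only the key's primary owner issues the
+ // removal, so the shared store is not updated once per owner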
+ if (shared && !isInvalidationCache) {
+ keyRemoveFlowable = keyRemoveFlowable.filter(k ->
+ distributionManager.getCacheTopology().getDistribution(k).isPrimary());
+ }
+ flowable = Flowable.just(SingleSegmentPublisher.singleSegment(keyRemoveFlowable));
+ }
+
+ if (log.isTraceEnabled()) {
+ flowable = flowable.doOnSubscribe(sub ->
+ log.tracef("Store %s has subscribed to remove batch", storeStatus.store));
+ flowable = flowable.map(sp -> {
+ int segment = sp.getSegment();
+ return SingleSegmentPublisher.singleSegment(segment, Flowable.fromPublisher(sp)
+ .doOnNext(keyToRemove -> log.tracef("Emitting key %s for removal from segment %s",
+ keyToRemove, segment)));
+ });
+ }
+
+ return flowable;
+ }
+
+ private <K, V> Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> createWriteFlowable(
+ Flowable<MVCCEntry<K, V>> flowableToUse, InvocationContext ctx, boolean shared, boolean segmented,
+ ByRef.Long writeCount, StoreStatus storeStatus) {
+
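+ // Convert every modified, non-removed context entry into a MarshallableEntry the store can persist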
+ Flowable<MarshallableEntry<K, V>> entryWriteFlowable = flowableToUse
+ .filter(mvccEntry -> !mvccEntry.isRemoved())
+ .map(mvcEntry -> {
+ K key = mvcEntry.getKey();
+ InternalCacheValue<V> sv = internalEntryFactory.getValueFromCtx(key, ctx);
+ //noinspection unchecked
+ return (MarshallableEntry<K, V>) marshallableEntryFactory.create(key, (InternalCacheValue) sv);
+ });
+
+ Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> flowable;
+ if (segmented) {
+ // Note the writeCount includes entries that aren't written due to being shared
+ // at this point
+ entryWriteFlowable = entryWriteFlowable.doOnNext(obj -> writeCount.inc());
+ flowable = entryWriteFlowable
+ .groupBy(me -> keyPartitioner.getSegment(me.getKey()))
+ .map(SegmentPublisherWrapper::wrap);
+ // The writeCount will be decremented for each grouping of values ignored
+ flowable = filterSharedSegments(flowable, writeCount, shared);
+ } else {
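+ // For a shared store (other than an invalidation cache) only the key's primary owner writes the entry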
+ if (shared && !isInvalidationCache) {
+ entryWriteFlowable = entryWriteFlowable.filter(me ->
+ distributionManager.getCacheTopology().getDistribution(me.getKey()).isPrimary());
+ }
+ entryWriteFlowable = entryWriteFlowable.doOnNext(obj -> writeCount.inc());
+ flowable = Flowable.just(SingleSegmentPublisher.singleSegment(entryWriteFlowable));
+ }
+
+ if (log.isTraceEnabled()) {
+ flowable = flowable.doOnSubscribe(sub ->
+ log.tracef("Store %s has subscribed to write batch", storeStatus.store));
+ flowable = flowable.map(sp -> {
+ int segment = sp.getSegment();
+ return SingleSegmentPublisher.singleSegment(segment, Flowable.fromPublisher(sp)
+ .doOnNext(me -> log.tracef("Emitting entry %s for write to segment %s",
+ me, segment)));
+ });
+ }
+ return flowable;
+ }
+
+ private <I> Flowable<NonBlockingStore.SegmentedPublisher<I>> filterSharedSegments(
+ Flowable<NonBlockingStore.SegmentedPublisher<I>> flowable, ByRef.Long writeCount, boolean shared) {
+ if (!shared || isInvalidationCache) {
+ return flowable;
+ }
+ return flowable.map(sp -> {
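+ // Segments primarily owned by this node pass through untouched; other segments are drained below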
+ if (distributionManager.getCacheTopology().getSegmentDistribution(sp.getSegment()).isPrimary()) {
+ return sp;
+ }
+ Flowable<I> emptyFlowable = Flowable.fromPublisher(sp);
+ if (writeCount != null) {
+ emptyFlowable = emptyFlowable.doOnNext(ignore -> writeCount.dec())
+ .ignoreElements()
+ .toFlowable();
+ } else {
+ emptyFlowable = emptyFlowable.take(0);
+ }
+ // Unfortunately we still need to subscribe to the publisher even though we don't want the store
+ // to use its values, thus we just return an empty SegmentPublisher.
+ return SingleSegmentPublisher.singleSegment(sp.getSegment(), emptyFlowable);
+ });
+ }
+
+ /**
+ * Creates a Flowable of MVCCEntry(s) that were modified due to the commands in the transactional context
+ * @param ctx the transactional context
+ * @param commandKeyPredicate predicate to test if a key/command combination should be written
+ * @param <K> key type
+ * @param <V> value type
+ * @return a Flowable containing MVCCEntry(s) for the modifications in the tx context
+ */
+ private <K, V> Flowable<MVCCEntry<K, V>> toMvccEntryFlowable(TxInvocationContext<AbstractCacheTransaction> ctx,
+ BiPredicate<? super WriteCommand, Object> commandKeyPredicate) {
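+ // Commands flagged with SKIP_CACHE_STORE or ROLLING_UPGRADE must never be written to the stores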
+ return Flowable.fromIterable(ctx.getCacheTransaction().getAllModifications())
+ .filter(writeCommand -> !writeCommand.hasAnyFlag(FlagBitSets.SKIP_CACHE_STORE | FlagBitSets.ROLLING_UPGRADE))
+ .concatMap(writeCommand -> entriesFromCommand(writeCommand, ctx, commandKeyPredicate));
+ }
+
+ private <K, V, WCT extends WriteCommand> Flowable<MVCCEntry<K, V>> entriesFromCommand(WCT writeCommand, InvocationContext ctx,
+ BiPredicate<? super WCT, Object> commandKeyPredicate) {
+ if (writeCommand instanceof DataWriteCommand) {
+ Object key = ((DataWriteCommand) writeCommand).getKey();
+ MVCCEntry<K, V> entry = acquireKeyFromContext(ctx, writeCommand, key, commandKeyPredicate);
+ return entry != null ? Flowable.just(entry) : Flowable.empty();
+ } else {
+ if (writeCommand instanceof InvalidateCommand) {
+ return Flowable.empty();
+ }
+ // Assume multiple key command
+ return Flowable.fromIterable(writeCommand.getAffectedKeys())
+ .concatMapMaybe(key -> {
+ MVCCEntry<K, V> entry = acquireKeyFromContext(ctx, writeCommand, key, commandKeyPredicate);
+ // An empty Maybe signals a miss for this key, so nothing is emitted downstream
+ return entry != null ? Maybe.just(entry) : Maybe.empty();
+ });
+ }
+ }
+
+ private <K, V, WCT extends WriteCommand> MVCCEntry<K, V> acquireKeyFromContext(InvocationContext ctx, WCT command, Object key,
+ BiPredicate<? super WCT, Object> commandKeyPredicate) {
+ if (commandKeyPredicate == null || commandKeyPredicate.test(command, key)) {
+ //noinspection unchecked
+ MVCCEntry<K, V> entry = (MVCCEntry<K, V>) ctx.lookupEntry(key);
+ if (entry.isChanged()) {
+ return entry;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Exists solely so that different callers can reuse
+ * {@link #batchOperation(Flowable, InvocationContext, HandleFlowables)} by supplying a lambda.
+ */
+ interface HandleFlowables<K, V> {
+ CompletionStage<Void> handleFlowables(NonBlockingStore<K, V> store, int publisherCount,
+ Flowable<NonBlockingStore.SegmentedPublisher<Object>> removeFlowable,
+ Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> putFlowable);
+ }
+
+ /**
+ * Provides a function that groups entries by their segments (via keyPartitioner).
+ */
+ private <E> Function<E, Integer> groupingFunction(Function<E, Object> toKeyFunction) {
+ return value -> keyPartitioner.getSegment(toKeyFunction.apply(value));
+ }
+
+ /**
+ * Returns how many segments the caller must handle for the given store.
+ * @param segmented whether the store is segmented
+ * @return the cache's segment count if the store is segmented, otherwise 1
+ */
+ private int segmentCount(boolean segmented) {
+ return segmented ? segmentCount : 1;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return unavailableExceptionMessage == null;
+ }
+
+ @Override
+ public CompletionStage<Boolean> addSegments(IntSet segments) {
+ return Completable.using(
+ this::acquireReadLock,
+ ignore -> {
+ checkStoreAvailability();
+ if (log.isTraceEnabled()) {
+ log.tracef("Adding segments %s to stores", segments);
+ }
+ return Flowable.fromIterable(stores)
+ .filter(PersistenceManagerImpl::shouldInvokeSegmentMethods)
+ .flatMapCompletable(storeStatus -> Completable.fromCompletionStage(storeStatus.store.addSegments(segments)));
+ },
+ this::releaseReadLock
+ ).toCompletionStage(allSegmentedOrShared);
+ }
+
+ @Override
+ public CompletionStage<Boolean> removeSegments(IntSet segments) {
+ return Completable.using(
+ this::acquireReadLock,
+ ignore -> {
+ checkStoreAvailability();
+ if (log.isTraceEnabled()) {
+ log.tracef("Removing segments %s from stores", segments);
+ }
+ return Flowable.fromIterable(stores)
+ .filter(PersistenceManagerImpl::shouldInvokeSegmentMethods)
+ .flatMapCompletable(storeStatus -> Completable.fromCompletionStage(storeStatus.store.removeSegments(segments)));
+ },
+ this::releaseReadLock
+ ).toCompletionStage(allSegmentedOrShared);
+ }
+
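+ // Per-segment add/remove callbacks only apply to stores that are segmentable and not shared across nodes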
+ private static boolean shouldInvokeSegmentMethods(StoreStatus storeStatus) {
+ return storeStatus.characteristics.contains(Characteristic.SEGMENTABLE) &&
+ !storeStatus.characteristics.contains(Characteristic.SHAREABLE);
+ }
+
+ public <K, V> List<NonBlockingStore<K, V>> getAllStores(Predicate<Set<Characteristic>> predicate) {
+ long stamp = acquireReadLock();
+ try {
+ return stores.stream()
+ .filter(storeStatus -> predicate.test(storeStatus.characteristics))
+ .map(StoreStatus::<K, V>store)
+ .collect(Collectors.toCollection(ArrayList::new));
+ } finally {
+ releaseReadLock(stamp);
+ }
+ }
+
+ /**
+ * Method must be here so that augmentation can tell BlockHound this method is okay to block
+ */
+ private long acquireReadLock() {
+ return lock.readLock();
+ }
+
+ /**
+ * Opposite of {@link #acquireReadLock()}; defined here for symmetry
+ */
+ private void releaseReadLock(long stamp) {
+ lock.unlockRead(stamp);
+ }
+
+ private void checkStoreAvailability() {
+ if (!enabled) return;
+
+ String message = unavailableExceptionMessage;
+ if (message != null) {
+ throw new StoreUnavailableException(message);
+ }
+ }
+
+ static class StoreStatus {
+ final NonBlockingStore<?, ?> store;
+ final StoreConfiguration config;
+ final Set<Characteristic> characteristics;
+ // This variable is protected by PersistenceManagerImpl#lock and also the fact that the availability
+ // check can only be run one at a time
+ boolean availability = true;
+
+ StoreStatus(NonBlockingStore<?, ?> store, StoreConfiguration config, Set<Characteristic> characteristics) {
+ this.store = store;
+ this.config = config;
+ this.characteristics = characteristics;
+ }
+
+ <K, V> NonBlockingStore<K, V> store() {
+ return (NonBlockingStore<K, V>) store;
+ }
+ }
+
+ boolean anyLocksHeld() {
+ return lock.isReadLocked() || lock.isWriteLocked();
+ }
+}