diff --git a/src/main/java/org/flag4j/concurrency/Configurations.java b/src/main/java/org/flag4j/concurrency/Configurations.java index 38a3a3d53..ea14fc6a3 100644 --- a/src/main/java/org/flag4j/concurrency/Configurations.java +++ b/src/main/java/org/flag4j/concurrency/Configurations.java @@ -29,7 +29,7 @@ /** * Configurations for standard and concurrent operations. */ -public abstract class Configurations { +public final class Configurations { private Configurations() { throw new IllegalStateException(ErrorMessages.getUtilityClassErrMsg()); } @@ -41,29 +41,24 @@ private Configurations() { /** * The default block size for blocked algorithms. */ - private static final int DEFAULT_BLOCK_SIZE = 64; + public static final int DEFAULT_BLOCK_SIZE = 64; /** * The default minimum recursive size for recursive algorithms. */ - private static final int DEFAULT_MIN_RECURSIVE_SIZE = 128; - + public static final int DEFAULT_MIN_RECURSIVE_SIZE = 128; /** * The block size to use in blocked algorithms. */ private static int blockSize = DEFAULT_BLOCK_SIZE; - /** - * The minimum size of tensor/matrix/vector to make recursive calls on in recursive algorithms. - */ - private static int minRecursiveSize = DEFAULT_MIN_RECURSIVE_SIZE; - /** * Sets the number of threads for use in concurrent operations as the number of processors available to the Java * virtual machine. Note that this value may change during runtime. This method will include logical cores so the value * returned may be higher than the number of physical cores on the machine if hyper-threading is enabled. *

- * This is implemented as: numThreads = {@link Runtime#availableProcessors() Runtime.getRuntime().availableProcessors()}; + * @implNote This is implemented as: + * numThreads = {@link Runtime#availableProcessors() Runtime.getRuntime().availableProcessors()}; * @return The new value of numThreads, i.e. the number of available processors. */ public static int setNumThreadsAsAvailableProcessors() { @@ -108,30 +103,11 @@ public static void setBlockSize(int blockSize) { } - /** - * Gets the minimum size of tensor/matrix/vector to make recursive calls on in recursive algorithms. - * @return minimum size of tensor/matrix/vector to make recursive calls on in recursive algorithms. - */ - public static int getMinRecursiveSize() { - return minRecursiveSize; - } - - - /** - * Sets the minimum size of tensor/matrix/vector to make recursive calls on in recursive algorithms. - * @param minRecursiveSize New minimum size. - */ - public static void setMinRecursiveSize(int minRecursiveSize) { - Configurations.minRecursiveSize = Math.max(1, minRecursiveSize); - } - - /** * Resets all configurations to their default values. */ public static void resetAll() { ThreadManager.setParallelismLevel(DEFAULT_NUM_THREADS); blockSize = DEFAULT_BLOCK_SIZE; - minRecursiveSize = DEFAULT_MIN_RECURSIVE_SIZE; } } diff --git a/src/main/java/org/flag4j/concurrency/TensorOperation.java b/src/main/java/org/flag4j/concurrency/TensorOperation.java new file mode 100644 index 000000000..bed5f6e44 --- /dev/null +++ b/src/main/java/org/flag4j/concurrency/TensorOperation.java @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2024. 
Jacob Watters + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.flag4j.concurrency; + + +/** + * Functional interface for general tensor operation. + */ +@FunctionalInterface +public interface TensorOperation { + + /** + * Applies a tensor operation over the specified index range. + * @param startIdx Starting index for operation (inclusive). + * @param endIdx Ending index for operation (exclusive). 
+ */ + void apply(int startIdx, int endIdx); +} diff --git a/src/main/java/org/flag4j/concurrency/ThreadManager.java b/src/main/java/org/flag4j/concurrency/ThreadManager.java index 5d7a038a6..4012e9731 100644 --- a/src/main/java/org/flag4j/concurrency/ThreadManager.java +++ b/src/main/java/org/flag4j/concurrency/ThreadManager.java @@ -26,20 +26,16 @@ import org.flag4j.util.ErrorMessages; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.function.IntConsumer; -import java.util.logging.Level; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; import java.util.logging.Logger; -import java.util.stream.IntStream; /** * This class contains the base thread pool for all concurrent operations and several methods for managing the * pool. */ -public class ThreadManager { +public final class ThreadManager { private ThreadManager() { // Hide default constructor for utility class. throw new IllegalStateException(ErrorMessages.getUtilityClassErrMsg()); @@ -68,7 +64,7 @@ private ThreadManager() { /** * Thread pool for managing threads executing concurrent operations. */ - protected static ExecutorService threadPool = Executors.newFixedThreadPool(parallelismLevel, daemonFactory); + private static ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(parallelismLevel, daemonFactory); /** @@ -78,7 +74,7 @@ private ThreadManager() { */ protected static void setParallelismLevel(int parallelismLevel) { ThreadManager.parallelismLevel = Math.max(parallelismLevel, 1); - threadPool = Executors.newFixedThreadPool(parallelismLevel, daemonFactory); + threadPool.setCorePoolSize(parallelismLevel); } @@ -92,44 +88,110 @@ public static int getParallelismLevel() { /** - * Applies a concurrent loop to a function. - * @param startIndex Starting index for concurrent loop (inclusive). 
- * @param endIndex Ending index for concurrent loop (exclusive). - * @param function Function to apply each iteration. Function may be dependent on iteration index but should - * individual iterations should be independent of each other. + * Computes a specified tensor operation concurrently by evenly dividing work among available threads (specified by + * {@link Configurations#getNumThreads()}). + * @param totalSize Total size of the outer loop for the operation. + * @param operation Operation to be computed. */ - public static void concurrentLoop(int startIndex, int endIndex, IntConsumer function) { - try { - threadPool.submit(() -> IntStream.range(startIndex, endIndex).parallel().forEach(function)).get(); - } catch (InterruptedException | ExecutionException e) { - threadLogger.setLevel(Level.WARNING); - threadLogger.warning(e.getMessage()); - Thread.currentThread().interrupt(); + public static void concurrentOperation(final int totalSize, final TensorOperation operation) { + // Calculate chunk size. + int chunkSize = (totalSize + parallelismLevel - 1) / parallelismLevel; + List> futures = new ArrayList<>(parallelismLevel); + + for(int threadIndex = 0; threadIndex < parallelismLevel; threadIndex++) { + final int startIdx = threadIndex * chunkSize; + final int endIdx = Math.min(startIdx + chunkSize, totalSize); + + if(startIdx >= endIdx) break; // No more indices to process. + + futures.add(ThreadManager.threadPool.submit(() -> { + operation.apply(startIdx, endIdx); + })); + } + + // Wait for all tasks to complete. + for(Future future : futures) { + try { + future.get(); // Ensure all tasks are complete. + } catch (InterruptedException | ExecutionException e) { + // An exception occurred. + threadLogger.warning(e.getMessage()); + Thread.currentThread().interrupt(); + } } } /** - * Applies a concurrent strided-loop to a function. - * @param startIndex Starting index for concurrent loop (inclusive). - * @param endIndex Ending index for concurrent loop (exclusive). 
- * @param step Step size for the index variable of the loop (i.e. the stride size). - * @param function Function to apply each iteration. Function may be dependent on iteration index but should - * individual iterations should be independent of each other. + * Computes a specified blocked tensor operation concurrently by evenly dividing work among available threads (specified by + * {@link Configurations#getNumThreads()}). + * @param totalSize Total size of the outer loop for the operation. + * @param blockSize Size of the block used in the blockedOperation. + * @param blockedOperation Operation to be computed. + */ + public static void concurrentBlockedOperation(final int totalSize, final int blockSize, final TensorOperation blockedOperation) { + // Calculate chunk size for blocks. + int numBlocks = (totalSize + blockSize - 1) / blockSize; + List> futures = new ArrayList<>(parallelismLevel); + + for(int blockIndex = 0; blockIndex < numBlocks; blockIndex++) { + final int startBlock = blockIndex * blockSize; + final int endBlock = Math.min(startBlock + blockSize, totalSize); + + futures.add(threadPool.submit(() -> { + blockedOperation.apply(startBlock, endBlock); + })); + } + + // Wait for all tasks to complete. + for(Future future : futures) { + try { + future.get(); // Ensure all tasks are complete. + } catch (InterruptedException | ExecutionException e) { + // An exception occurred. + threadLogger.warning(e.getMessage()); + Thread.currentThread().interrupt(); + } + } + } + + // TODO: TEMP FOR TESTING. + /** + * Executes a concurrent operation on a given range of indices. + * The operation is split across multiple threads, each handling a subset of the range. + * + * @param totalTasks The total number of tasks (e.g., rows in a matrix) to be processed. + * @param task A lambda expression or function that takes three arguments: start index, end index, and thread ID. + * This function represents the work to be done by each thread for its assigned range. 
*/ - public static void concurrentLoop(int startIndex, int endIndex, int step, IntConsumer function) { - if(step <= 0) - throw new IllegalArgumentException(ErrorMessages.getNegValueErr(startIndex)); + public static void concurrentOperation(int totalTasks, TriConsumer task) { + int numThreads = Runtime.getRuntime().availableProcessors(); + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + + int tasksPerThread = (totalTasks + numThreads - 1) / numThreads; // Ceiling division + + for (int threadId = 0; threadId < numThreads; threadId++) { + int startIdx = threadId * tasksPerThread; + int endIdx = Math.min(startIdx + tasksPerThread, totalTasks); + + if (startIdx < endIdx) { + final int finalThreadId = threadId; + executor.submit(() -> task.accept(startIdx, endIdx, finalThreadId)); + } + } + + executor.shutdown(); + try { - int range = endIndex - startIndex; - int iterations = range/step + ((range%step == 0) ? 0 : 1); - threadPool.submit(() -> IntStream.range(0, iterations).parallel().forEach( - i -> function.accept(startIndex + i*step)) - ).get(); - } catch (InterruptedException | ExecutionException e) { - threadLogger.setLevel(Level.WARNING); - threadLogger.warning(e.getMessage()); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); + throw new RuntimeException("Thread execution interrupted", e); } } + + @FunctionalInterface + public interface TriConsumer { + void accept(T t, U u, V v); + } } diff --git a/src/main/java/org/flag4j/core/Shape.java b/src/main/java/org/flag4j/core/Shape.java index 5757e730d..ceb3d3d21 100644 --- a/src/main/java/org/flag4j/core/Shape.java +++ b/src/main/java/org/flag4j/core/Shape.java @@ -24,7 +24,6 @@ package org.flag4j.core; -import org.flag4j.arrays.dense.Tensor; import org.flag4j.util.ArrayUtils; import org.flag4j.util.ParameterChecks; @@ -311,12 +310,4 @@ public String toString() { return joiner.toString(); } - - - public 
static void main(String[] args) { - Shape s = new Shape(); - Tensor t = new Tensor(s); - - System.out.println(t.entries.length); - } } diff --git a/src/main/java/org/flag4j/operations/dense/complex/ComplexDenseElemDiv.java b/src/main/java/org/flag4j/operations/dense/complex/ComplexDenseElemDiv.java index 96beef75d..bffcc0037 100644 --- a/src/main/java/org/flag4j/operations/dense/complex/ComplexDenseElemDiv.java +++ b/src/main/java/org/flag4j/operations/dense/complex/ComplexDenseElemDiv.java @@ -81,8 +81,11 @@ public static CNumber[] elemDivConcurrent(CNumber[] src1, Shape shape1, CNumber[ ParameterChecks.assertEqualShape(shape1, shape2); CNumber[] product = new CNumber[src1.length]; - ThreadManager.concurrentLoop(0, product.length, - (i)->product[i] = src1[i].div(src2[i])); + ThreadManager.concurrentOperation(product.length, (start, end)->{ + for(int i=start; iproduct[i] = src1[i].mult(src2[i]) - ); + ThreadManager.concurrentOperation(product.length, ((startIdx, endIdx) -> { + for(int i=startIdx; i { - int src1IndexStart = i*cols2; - int destIndexStart = i*rows2; - int end = src1IndexStart + cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i{ - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=0; ii - * WARNING: These methods do not perform any sanity checks. + *

 <p>This class contains several low-level methods for computing complex matrix-matrix multiplications. This includes transpose + * multiplications.</p> + * + * <p>WARNING: These methods do not perform any sanity checks.</p>

*/ public final class ComplexDenseMatrixMultiplication { @@ -72,11 +73,14 @@ public static CNumber[] standard(CNumber[] src1, Shape shape1, CNumber[] src2, S src1Index = src1IndexStart; destIndex = destIndexStart + j; end = src1Index + rows2; + CNumber sum = dest[destIndex]; while(src1Index { - int src1IndexStart = i*cols1; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int src1IndexStart = i*rows2; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int iBound = Math.min(ii + blockSize, rows1); - - for(int kk = 0; kk { + for(int ii=startIdx; ii { - int src1Index = i*cols1; - int src2Index = 0; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int iBound = Math.min(ii+blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int[] destIndices = shape.getIndices(i); - ArrayUtils.swap(destIndices, axis1, axis2); // Compute destination indices. - dest[destShape.entriesIndex(destIndices)] = src[i]; // Apply transpose for the element + ThreadManager.concurrentOperation(src.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int[] destIndices = shape.getIndices(i); - ArrayUtils.swap(destIndices, axes); // Compute destination indices. - dest[destShape.entriesIndex(destIndices)] = src[i]; // Apply transpose for the element + // Compute transpose concurrently. 
+ ThreadManager.concurrentOperation(src.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int srcIndex = i; - int destIndex = i*numRows; - int end = destIndex + numRows; - - while (destIndex < end) { - dest[destIndex++] = src[srcIndex]; - srcIndex += numCols; + ThreadManager.concurrentOperation(numCols, (startIdx, endIdx) -> { + for(int i=startIdx; i { - for(int j=0; j { + for(int i=startIdx; i { - int[] destIndices = shape.getIndices(i); - ArrayUtils.swap(destIndices, axis1, axis2); // Compute destination indices. - dest[destShape.entriesIndex(destIndices)] = src[i].conj(); // Apply transpose for the element + ThreadManager.concurrentOperation(src.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int[] destIndices = shape.getIndices(i); - ArrayUtils.swap(destIndices, axes); // Compute destination indices. - dest[destShape.entriesIndex(destIndices)] = src[i].conj(); // Apply conjugate transpose for the element + ThreadManager.concurrentOperation(src.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int srcIndex = i; - int destIndex = i*numRows; - int end = destIndex + numRows; - - while (destIndex < end) { - dest[destIndex++] = src[srcIndex].conj(); - srcIndex += numCols; + ThreadManager.concurrentOperation(numCols, (startIdx, endIdx) -> { + for(int i=startIdx; i { - for(int j=0; j { + for(int i=startIdx; i product[i] = src1[i]/src2[i] - ); + ThreadManager.concurrentOperation(product.length, (startIdx, endIdx) -> { + for(int i=startIdx; i product[i] = src1[i]*src2[i] - ); + ThreadManager.concurrentOperation(product.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int src1IndexStart = i*cols2; - int destIndexStart = i*rows2; - int end = src1IndexStart + cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i{ - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int 
src1IndexStart = i*cols1; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startRow, endRow) -> { + for (int i = startRow; i < endRow; i++) { + int src1IndexStart = i * cols1; + int destIndexStart = i * cols2; - for(int j=0; j { - int src1IndexStart = i*rows2; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startRow, endRow) -> { + for(int i = startRow; i { - int iBound = Math.min(ii + blockSize, rows1); - - for(int jj = 0; jj { + for (int jj = 0; jj < cols2; jj += blockSize) { int jBound = Math.min(jj + blockSize, cols2); - for(int kk = 0; kk { - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (blockStart, blockEnd) -> { + for(int ii=blockStart; ii { - int src1Index = i*cols1; - int src2Index = 0; + ThreadManager.concurrentOperation(rows1, (rowStart, rowEnd) -> { + for(int i=rowStart; i { - int iBound = Math.min(ii+blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (rowStart, rowEnd) -> { + for(int ii=rowStart; ii { - int[] destIndices = shape.getIndices(i); - ArrayUtils.swap(destIndices, axes); // Compute destination indices. - dest[destShape.entriesIndex(destIndices)] = src[i]; // Apply transpose for the element + ThreadManager.concurrentOperation(src.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int[] destIndices = shape.getIndices(i); - ArrayUtils.swap(destIndices, axis1, axis2); // Compute destination indices. 
- dest[destShape.entriesIndex(destIndices)] = src[i]; // Apply transpose for the element + ThreadManager.concurrentOperation(src.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int srcIndex = i; - int destIndex = i*numRows; - int end = destIndex + numRows; - - while (destIndex < end) { - dest[destIndex++] = src[srcIndex]; - srcIndex += numCols; + ThreadManager.concurrentOperation(numCols, (startIdx, endIdx) -> { + for(int i=startIdx; i{ - int blockHeight = Math.min(ii+blockSize, numRows) - ii; - int srcIndexStart = ii*numCols; - int destIndexStart = ii; - - for(int jj=0; jj { + for(int ii=startIdx; ii= warmupRuns) { - bTime += (bEnd-bStart)*10e-6; - sTime += (sEnd-sStart)*10e-6; - } - } - - System.out.printf("Shape: (%d, %d)\n\n", rows, cols); - System.out.printf("Standard Time: %.5f ms\n", sTime/numRuns); - System.out.printf("Blocked Time: %.5f ms\n", bTime/numRuns); - } } diff --git a/src/main/java/org/flag4j/operations/dense/real/RealDenseVectorOperations.java b/src/main/java/org/flag4j/operations/dense/real/RealDenseVectorOperations.java index 81865c709..2dc0de633 100644 --- a/src/main/java/org/flag4j/operations/dense/real/RealDenseVectorOperations.java +++ b/src/main/java/org/flag4j/operations/dense/real/RealDenseVectorOperations.java @@ -97,12 +97,14 @@ public static double[] outerProduct(double[] src1, double[] src2) { public static double[] outerProductConcurrent(double[] src1, double[] src2) { double[] dest = new double[src1.length*src2.length]; - ThreadManager.concurrentLoop(0, src1.length, (int i)->{ - int destIndex = i*src2.length; - double v1 = src1[i]; - - for(double v2 : src2) { - dest[destIndex++] = v1*v2; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; iproduct[i] = src1[i].div(src2[i]) - ); + ThreadManager.concurrentOperation(product.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - double divisor = src2[i].re*src2[i].re + src2[i].im*src2[i].im; - quotient[i] = new 
CNumber(src1[i]*src2[i].re / divisor, -src1[i]*src2[i].im / divisor); + ThreadManager.concurrentOperation(quotient.length, (startIdx, endIdx) -> { + for(int i=startIdx; iproduct[i] = src1[i].mult(src2[i]) - ); + ThreadManager.concurrentOperation(product.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { + for(int i=startIdx; i { - int src1IndexStart = i*cols2; - int destIndexStart = i*rows2; - int end = src1IndexStart + cols2; - - for(int j=0; j{ - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int src1IndexStart = i*cols2; - int destIndexStart = i*rows2; - int end = src1IndexStart + cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { + for(int ii=startIdx; ii{ - int iBound = Math.min(ii + blockSize, rows1); + for(int jj = 0; jj { - int src1IndexStart = i*cols1; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int src1IndexStart = i*rows2; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int iBound = Math.min(ii + blockSize, rows1); + final int blockSize = Configurations.getBlockSize(); - for(int kk = 0; kk { + for(int ii=startIdx; ii { - int src1Index = i*cols1; - int src2Index = 0; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int iBound = Math.min(ii+blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int src1IndexStart = i*cols1; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - 
int src1IndexStart = i*rows2; - int destIndexStart = i*cols2; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int iBound = Math.min(ii + blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - int iBound = Math.min(ii + blockSize, rows1); + final int blockSize = Configurations.getBlockSize(); - for(int kk = 0; kk { + for(int ii=startIdx; ii { - int src1Index = i*cols1; - int src2Index = 0; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int iBound = Math.min(ii+blockSize, rows1); + ThreadManager.concurrentBlockedOperation(rows1, blockSize, (startIdx, endIdx) -> { + for(int ii=startIdx; ii { - // Loop over non-zero entries of sparse matrix. - for(int j=0; j { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - CNumber val = dest[i]; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - for(int jj=0; jj { + for(int ii=startIdx; ii { - double[] localResult = new double[cols2]; // Store the result for the local thread. 
- int destRow = i*cols2; - int src1Row = i*cols1; + ThreadManager.concurrentOperation(rows1, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int r1 = rowIndices[i]; - int c1 = colIndices[i]; - - int destRowStart = r1 * cols2; - int src2RowStart = c1 * cols2; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i destAtomic = new AtomicReferenceArray<>(rows1 * cols2); - for(int i=0; i { - int row = rowIndices1[i]; - int col = colIndices1[i]; - int destRow = row*cols2; - int src2Row = col*cols2; - for(int j=0; j { - for(int j=0; j { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - for(int jj=0; jj { + for(int ii=startIdx; ii { - // Loop over non-zero entries of sparse matrix. - for(int j=0; j { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(var i=startIdx; i { - // Loop over non-zero entries of sparse matrix. 
- for(int j=0; j { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - for(int j=0; j { + for(int i=startIdx; i { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; - CNumber product = src2[col].mult(src1[i]); - - synchronized (dest) { - dest[row] = dest[row].add(product); + synchronized (dest) { + dest[row] = dest[row].add(product); + } } }); @@ -516,13 +544,19 @@ public static CNumber[] concurrentBlockedVector(double[] src1, Shape shape1, CNu ArrayUtils.fill(dest, 0); // Blocked matrix-vector multiply - ThreadManager.concurrentLoop(0, rows1, bsize, ii -> { - for(int jj=0; jj { + for(int ii=startIdx; ii { - for(int j=0; j { + for(int i=startIdx; i { - int row = rowIndices[i]; - int col = colIndices[i]; - CNumber product = src1[i].mult(src2[col]); + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - for(int jj=0; jj { + for(int ii=startIdx; ii> map = SparseUtils.createMap(src2.length, rowIndices2); - ThreadManager.concurrentLoop(0, src1.length, (i)->{ - int c1 = colIndices1[i]; // = k - - // Check if any values in src2 have the same row index as the column index of the value in src1. 
- if(map.containsKey(c1)) { - int r1 = rowIndices1[i]; // = i - int rowIdx = r1*cols2; - - for(int j : map.get(c1)) { // Iterate over all entries in src2 where rowIndices[j] == colIndices[j] - int idx = rowIdx + colIndices2[j]; - destMap.put(idx, destMap.getOrDefault(idx, CNumber.ZERO).add(src1[i].mult(src2[j]))); + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int r1 = rowIndices1[i]; // = i - int c1 = colIndices1[i]; // = k + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i> map = SparseUtils.createMap(src2.length, rowIndices2); - ThreadManager.concurrentLoop(0, src1.length, (i)->{ - int c1 = colIndices1[i]; // = k + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int r1 = rowIndices1[i]; // = i - int c1 = colIndices1[i]; // = k - - for(int j=0; j { + for(int i=startIdx; i> map = SparseUtils.createMap(src2.length, rowIndices2); - ThreadManager.concurrentLoop(0, src1.length, (i)->{ - int c1 = colIndices1[i]; // = k + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int r1 = rowIndices1[i]; // = i - int c1 = colIndices1[i]; // = k - - for(int j=0; j { + for(int i=startIdx; i> map = SparseUtils.createMap(src2.length, rowIndices2); - ThreadManager.concurrentLoop(0, src1.length, (i)->{ - int c1 = colIndices1[i]; // = k + ThreadManager.concurrentOperation(src1.length, (startIdx, endIdx) -> { + for(int i=startIdx; i { - int r1 = rowIndices1[i]; // = i - int c1 = colIndices1[i]; // = k - - for(int j=0; j { + for(int i=startIdx; inot verify that {@code indices} is a permutation. + * + * @param src Array to swap elements within. + * @param indices Array containing indices of the permutation. If the {@code src} array has length {@code N}, then + * the array must be a permutation of {@code {0, 1, 2, ..., N-1}}. 
+ */ + public static void swapUnsafe(final int[] src, final int[] indices) { + int[] swapped = new int[src.length]; + int i = 0; + + for(int value : indices) + swapped[i++] = src[value]; + + System.arraycopy(swapped, 0, src, 0, swapped.length); + } + + /** * Swaps to elements in an array. This is done in place. * diff --git a/target/flag4j-v0.1.0-beta.jar b/target/flag4j-v0.1.0-beta.jar index c4c2f7ad1..af96840b8 100644 Binary files a/target/flag4j-v0.1.0-beta.jar and b/target/flag4j-v0.1.0-beta.jar differ