From 3184b9a19d4474e4c836b625092c7a5e59ca80fd Mon Sep 17 00:00:00 2001 From: e-strauss Date: Thu, 29 Jan 2026 19:49:15 +0100 Subject: [PATCH] [SYSTEMDS-3902] Sparse data transfer: Python --> Java This commit implements optimized data transfer for Scipy sparse matrices from Python to the Java runtime. Key changes include the addition of `convertSciPyCSRToMB` and `convertSciPyCOOToMB` in the Java utility layer to directly handle compressed sparse row and coordinate formats. On the Python side, the `SystemDSContext` now supports a `sparse_data_transfer` flag and a new `from_py` method to unify data ingestion. These updates allow sparse data to be transferred without being converted to dense arrays, improving efficiency. Additionally, several data conversion methods were refactored for better maintenance. --- .../runtime/util/Py4jConverterUtils.java | 42 ++ .../systemds/context/systemds_context.py | 79 ++- src/main/python/systemds/utils/converters.py | 327 ++++++----- .../tests/matrix/test_block_converter.py | 145 ++++- .../frame/array/Py4jConverterUtilsTest.java | 240 --------- .../utils/Py4jConverterUtilsTest.java | 510 ++++++++++++++++++ 6 files changed, 952 insertions(+), 391 deletions(-) delete mode 100644 src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java create mode 100644 src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java diff --git a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java index 7faee722d04..f705a3c9c00 100644 --- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java +++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java @@ -23,6 +23,7 @@ import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; +import org.apache.log4j.Logger; import org.apache.sysds.common.Types; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.frame.data.columns.Array; @@ -35,6 +36,7 @@ * Utils for converting python data to java. */ public class Py4jConverterUtils { + private static final Logger LOG = Logger.getLogger(Py4jConverterUtils.class); public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen, int clen) { return convertPy4JArrayToMB(data, rlen, clen, false, Types.ValueType.FP64); } @@ -63,6 +65,45 @@ public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] co return mb; } + public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) { + LOG.debug("Converting compressed sparse row matrix to MatrixBlock"); + MatrixBlock mb = new MatrixBlock(rlen, clen, true); + mb.allocateSparseRowsBlock(false); + ByteBuffer dataBuf = ByteBuffer.wrap(data); + dataBuf.order(ByteOrder.nativeOrder()); + ByteBuffer indicesBuf = ByteBuffer.wrap(indices); + indicesBuf.order(ByteOrder.nativeOrder()); + ByteBuffer indptrBuf = ByteBuffer.wrap(indptr); + indptrBuf.order(ByteOrder.nativeOrder()); + + // Read indptr array to get row boundaries + int[] rowPtrs = new int[rlen + 1]; + for(int i = 0; i <= rlen; i++) { + rowPtrs[i] = indptrBuf.getInt(); + } + + // Iterate through each row + for(int row = 0; row < rlen; row++) { + int startIdx = rowPtrs[row]; + int endIdx = rowPtrs[row + 1]; + + // Set buffer positions to the start of this row + dataBuf.position(startIdx * Double.BYTES); + indicesBuf.position(startIdx * Integer.BYTES); + + // Process all non-zeros in this row sequentially + for(int idx = startIdx; idx < endIdx; idx++) { + double val = dataBuf.getDouble(); + int colIndex = indicesBuf.getInt(); + mb.set(row, colIndex, val); + } + } + + mb.recomputeNonZeros(); + mb.examSparsity(); + return mb; + } + public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) { MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse); ret.allocateBlock(); @@ -208,6 +249,7 @@ private static void readBufferIntoArray(ByteBuffer buffer, Array array, Types public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) { byte[] ret = null; if(mb.isInSparseFormat()) { + LOG.debug("Converting sparse matrix to dense"); mb.sparseToDense(); } diff --git a/src/main/python/systemds/context/systemds_context.py b/src/main/python/systemds/context/systemds_context.py index 99a6cba57b8..7268e5b86aa 100644 --- a/src/main/python/systemds/context/systemds_context.py +++ b/src/main/python/systemds/context/systemds_context.py @@ -29,17 +29,19 @@ import sys import struct import traceback +import warnings from contextlib import contextmanager from glob import glob from queue import Queue from subprocess import PIPE, Popen from threading import Thread -from time import sleep, time +from time import sleep from typing import Dict, Iterable, Sequence, Tuple, Union from concurrent.futures import ThreadPoolExecutor import numpy as np import pandas as pd +import scipy.sparse as sp from py4j.java_gateway import GatewayParameters, JavaGateway, Py4JNetworkError from systemds.operator import ( Frame, @@ -77,6 +79,7 @@ class SystemDSContext(object): _FIFO_JAVA2PY_PIPES = [] _data_transfer_mode = 0 _multi_pipe_enabled = False + _sparse_data_transfer = True _logging_initialized = False _executor_pool = ThreadPoolExecutor(max_workers=os.cpu_count() * 2 or 4) @@ -89,6 +92,7 @@ def __init__( py4j_logging_level: int = 50, data_transfer_mode: int = 1, multi_pipe_enabled: bool = False, + sparse_data_transfer: bool = True, ): """Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled Any new instance of this SystemDS Context, would start a separate new JVM. @@ -103,14 +107,26 @@ def __init__( The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL. :param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this, it can be verbose if not set high. - :param data_transfer_mode: default 0, + :param data_transfer_mode: default 0, 0 for py4j, 1 for using pipes (on unix systems) + :param multi_pipe_enabled: default False, if True, use multiple pipes for data transfer + only used if data_transfer_mode is 1. + .. experimental:: This parameter is experimental and may be removed in a future version. + :param sparse_data_transfer: default True, if True, use optimized sparse matrix transfer, + if False, convert sparse matrices to dense arrays before transfer """ + if multi_pipe_enabled: + warnings.warn( + "The 'multi_pipe_enabled' parameter is experimental and may be removed in a future version.", + DeprecationWarning, + stacklevel=2, + ) self.__setup_logging(logging_level, py4j_logging_level) self.__start(port, capture_stdout) self.capture_stats(capture_statistics) self._log.debug("Started JVM and SystemDS python context manager") self.__setup_data_transfer(data_transfer_mode, multi_pipe_enabled) + self._sparse_data_transfer = sparse_data_transfer def __setup_data_transfer(self, data_transfer_mode=0, multi_pipe_enabled=False): self._data_transfer_mode = data_transfer_mode @@ -769,21 +785,65 @@ def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> Scalar: # therefore the output type is assign. return Scalar(self, v, assign=True) + def from_py( + self, + src: Union[np.ndarray, sp.spmatrix, pd.DataFrame, pd.Series], + *args: Sequence[VALID_INPUT_TYPES], + **kwargs: Dict[str, VALID_INPUT_TYPES], + ) -> Union[Matrix, Frame]: + """Generate DAGNode representing data given by a python object, which will be sent to SystemDS on need. + :param src: the python object + :param args: unnamed parameters + :param kwargs: named parameters + :return: A Matrix or Frame Node + """ + + def get_params(src, args, kwargs): + unnamed_params = ["'./tmp/{file_name}'"] + if len(src.shape) == 2: + named_params = {"rows": src.shape[0], "cols": src.shape[1]} + elif len(src.shape) == 1: + named_params = {"rows": src.shape[0], "cols": 1} + else: + # TODO Support tensors. + raise ValueError("Only two dimensional arrays supported") + unnamed_params.extend(args) + named_params.update(kwargs) + return unnamed_params, named_params + + if isinstance(src, np.ndarray): + unnamed_params, named_params = get_params(src, args, kwargs) + return Matrix(self, "read", unnamed_params, named_params, local_data=src) + elif isinstance(src, sp.spmatrix): + unnamed_params, named_params = get_params(src, args, kwargs) + return Matrix(self, "read", unnamed_params, named_params, local_data=src) + elif isinstance(src, pd.DataFrame): + unnamed_params, named_params = get_params(src, args, kwargs) + named_params["data_type"] = '"frame"' + return Frame(self, "read", unnamed_params, named_params, local_data=src) + elif isinstance(src, pd.Series): + unnamed_params, named_params = get_params(src, args, kwargs) + named_params["data_type"] = '"frame"' + return Frame(self, "read", unnamed_params, named_params, local_data=src) + else: + raise ValueError(f"Unsupported data type: {type(src)}") + def from_numpy( self, - mat: np.array, + mat: Union[np.ndarray, sp.spmatrix], *args: Sequence[VALID_INPUT_TYPES], **kwargs: Dict[str, VALID_INPUT_TYPES], ) -> Matrix: - """Generate DAGNode representing matrix with data given by a numpy array, which will be sent to SystemDS - on need. + """Generate DAGNode representing matrix with data given by a numpy array or scipy sparse matrix, + which will be sent to SystemDS on need. - :param mat: the numpy array + :param mat: the numpy array or scipy sparse matrix :param args: unnamed parameters :param kwargs: named parameters :return: A Matrix + Note: This method is deprecated. Use from_py instead. """ - + self._log.warning(f"Deprecated method from_numpy. Use from_py instead.") unnamed_params = ["'./tmp/{file_name}'"] if len(mat.shape) == 2: @@ -811,7 +871,9 @@ def from_pandas( :param args: unnamed parameters :param kwargs: named parameters :return: A Frame + Note: This method is deprecated. Use from_py instead. """ + self._log.warning(f"Deprecated method from_pandas. Use from_py instead.") unnamed_params = ["'./tmp/{file_name}'"] if len(df.shape) == 2: @@ -824,9 +886,6 @@ def from_pandas( unnamed_params.extend(args) named_params["data_type"] = '"frame"' - - self._pd_dataframe = df - named_params.update(kwargs) return Frame(self, "read", unnamed_params, named_params, local_data=df) diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 5f4619a8bbc..7cabf2aabdf 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -21,11 +21,13 @@ import struct from time import time +from typing import Union import numpy as np import pandas as pd import concurrent.futures from py4j.java_gateway import JavaClass, JavaGateway, JavaObject, JVMView import os +import scipy.sparse as sp # Constants _HANDSHAKE_OFFSET = 1000 @@ -86,129 +88,6 @@ def _pipe_receive_bytes(pipe, view, offset, end, batch_size_bytes, logger): offset += actual_size -def _pipe_receive_strings( - pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None -): - """ - Reads UTF-8 encoded strings from the pipe in batches. - Format: 0: - buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] - - # Read more data - t0 = time() - chunk = os.read(fd, batch_size) - t_io += time() - t0 - if not chunk: - raise IOError("Pipe read returned empty data unexpectedly") - - # Append new data to buffer - chunk_len = len(chunk) - if buf_remaining + chunk_len > len(buf): - # Grow buffer if needed - new_buf = bytearray(len(buf) * 2) - new_buf[:buf_remaining] = buf[:buf_remaining] - buf = new_buf - - buf[buf_remaining : buf_remaining + chunk_len] = chunk - buf_remaining += chunk_len - buf_pos = 0 - - # Read length prefix (little-endian int32) - # Note: length can be -1 (0xFFFFFFFF) to indicate null value - length = struct.unpack( - " 0: - buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] - buf_pos = 0 - - # Read more data until we have enough - bytes_needed = length - buf_remaining - while bytes_needed > 0: - t0 = time() - chunk = os.read(fd, min(batch_size, bytes_needed)) - t_io += time() - t0 - if not chunk: - raise IOError("Pipe read returned empty data unexpectedly") - - chunk_len = len(chunk) - if buf_remaining + chunk_len > len(buf): - # Grow buffer if needed - new_buf = bytearray(len(buf) * 2) - new_buf[:buf_remaining] = buf[:buf_remaining] - buf = new_buf - - buf[buf_remaining : buf_remaining + chunk_len] = chunk - buf_remaining += chunk_len - bytes_needed -= chunk_len - - # Decode the string - t0 = time() - if length == 0: - decoded_str = "" - else: - decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8") - t_decode += time() - t0 - - strings.append(decoded_str) - buf_pos += length - buf_remaining -= length - i += 1 - header_received = False - if buf_remaining == _STRING_LENGTH_PREFIX_SIZE: - # There is still data in the buffer, probably the handshake header - received = struct.unpack( - " _STRING_LENGTH_PREFIX_SIZE: - raise ValueError( - "Unexpected number of bytes in buffer: {}".format(buf_remaining) - ) - - t_total = time() - t_total_start - return (strings, t_total, t_decode, t_io, num_strings, header_received) - - def _get_numpy_value_type(jvm, dtype): """Maps numpy dtype to SystemDS ValueType.""" if dtype is np.dtype(np.uint8): @@ -280,37 +159,43 @@ def _pipe_write_task(_pipe_id, _pipe, memview, start, end): return fut_java.result() # Java returns MatrixBlock -def numpy_to_matrix_block(sds, np_arr: np.array): - """Converts a given numpy array, to internal matrix block representation. +def numpy_to_matrix_block(sds, arr: Union[np.ndarray, sp.spmatrix]): + """Converts a given numpy array or scipy sparse matrix to internal matrix block representation. :param sds: The current systemds context. - :param np_arr: the numpy array to convert to matrixblock. + :param arr: the numpy array or scipy sparse matrix to convert to matrixblock. """ - assert np_arr.ndim <= 2, "np_arr invalid, because it has more than 2 dimensions" - rows = np_arr.shape[0] - cols = np_arr.shape[1] if np_arr.ndim == 2 else 1 + assert arr.ndim <= 2, "np_arr invalid, because it has more than 2 dimensions" + rows = arr.shape[0] + cols = arr.shape[1] if arr.ndim == 2 else 1 if rows > 2147483647: raise ValueError("Matrix rows exceed maximum value (2147483647)") # If not numpy array then convert to numpy array - if not isinstance(np_arr, np.ndarray): - np_arr = np.asarray(np_arr, dtype=np.float64) + if isinstance(arr, sp.spmatrix): + if sds._sparse_data_transfer: + return scipy_sparse_matrix_to_matrix_block(sds, arr) + else: + # Convert sparse matrix to dense array + arr = arr.toarray() + if not isinstance(arr, np.ndarray): + arr = np.asarray(arr, dtype=np.float64) jvm: JVMView = sds.java_gateway.jvm ep = sds.java_gateway.entry_point # Flatten and set value type - if np_arr.dtype is np.dtype(np.uint8): - arr = np_arr.ravel() - elif np_arr.dtype is np.dtype(np.int32): - arr = np_arr.ravel() - elif np_arr.dtype is np.dtype(np.float32): - arr = np_arr.ravel() + if arr.dtype is np.dtype(np.uint8): + arr = arr.ravel() + elif arr.dtype is np.dtype(np.int32): + arr = arr.ravel() + elif arr.dtype is np.dtype(np.float32): + arr = arr.ravel() else: - arr = np_arr.ravel().astype(np.float64) + arr = arr.ravel().astype(np.float64) - value_type = _get_numpy_value_type(jvm, np_arr.dtype) + value_type = _get_numpy_value_type(jvm, arr.dtype) if sds._data_transfer_mode == 1: mv = memoryview(arr).cast("B") @@ -334,7 +219,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array): ) else: return _transfer_matrix_block_multi_pipe( - sds, mv, arr, np_arr, total_bytes, rows, cols, value_type, ep, jvm + sds, mv, arr, arr, total_bytes, rows, cols, value_type, ep, jvm ) else: # Prepare byte buffer and send data to java via Py4J @@ -343,6 +228,45 @@ def numpy_to_matrix_block(sds, np_arr: np.array): return j_class.convertPy4JArrayToMB(buf, rows, cols, value_type) +def scipy_sparse_matrix_to_matrix_block(sds, arr: sp.spmatrix): + """Converts a given scipy sparse matrix to an internal matrix block representation. + + :param sds: The current systemds context. + :param arr: The scipy sparse matrix to convert to matrixblock. + """ + jvm: JVMView = sds.java_gateway.jvm + + if sds._data_transfer_mode == 1: + # single pipe implementation + pass + else: + # py4j implementation + j_class: JavaClass = jvm.org.apache.sysds.runtime.util.Py4jConverterUtils + if isinstance(arr, sp.csr_matrix): + data = arr.data.tobytes() + indices = arr.indices.tobytes() + indptr = arr.indptr.tobytes() + nnz = arr.nnz + rows = arr.shape[0] + cols = arr.shape[1] + # convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) + return j_class.convertSciPyCSRToMB(data, indices, indptr, rows, cols, nnz) + elif isinstance(arr, sp.coo_matrix): + data = arr.data.tobytes() + row = arr.row.tobytes() + col = arr.col.tobytes() + nnz = arr.nnz + rows = arr.shape[0] + cols = arr.shape[1] + # convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, int rlen, int clen, int nnz) + return j_class.convertSciPyCOOToMB(data, row, col, rows, cols, nnz) + else: + sds._log.info( + f"Converting unsupported sparse matrix type: {type(arr).__name__} to CSR for efficient transfer." + ) + return scipy_sparse_matrix_to_matrix_block(sds, arr.tocsr()) + + def matrix_block_to_numpy(sds, mb: JavaObject): """Converts a MatrixBlock object in the JVM to a numpy array. @@ -761,6 +685,129 @@ def _pipe_transfer_strings(pipe, pd_series, batch_size=_DEFAULT_BATCH_SIZE_BYTES return (t_total, t_encoding, t_packing, t_io, num_strings) +def _pipe_receive_strings( + pipe, num_strings, batch_size=_DEFAULT_BATCH_SIZE_BYTES, pipe_id=0, logger=None +): + """ + Reads UTF-8 encoded strings from the pipe in batches. + Format: 0: + buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] + + # Read more data + t0 = time() + chunk = os.read(fd, batch_size) + t_io += time() - t0 + if not chunk: + raise IOError("Pipe read returned empty data unexpectedly") + + # Append new data to buffer + chunk_len = len(chunk) + if buf_remaining + chunk_len > len(buf): + # Grow buffer if needed + new_buf = bytearray(len(buf) * 2) + new_buf[:buf_remaining] = buf[:buf_remaining] + buf = new_buf + + buf[buf_remaining : buf_remaining + chunk_len] = chunk + buf_remaining += chunk_len + buf_pos = 0 + + # Read length prefix (little-endian int32) + # Note: length can be -1 (0xFFFFFFFF) to indicate null value + length = struct.unpack( + " 0: + buf[:buf_remaining] = buf[buf_pos : buf_pos + buf_remaining] + buf_pos = 0 + + # Read more data until we have enough + bytes_needed = length - buf_remaining + while bytes_needed > 0: + t0 = time() + chunk = os.read(fd, min(batch_size, bytes_needed)) + t_io += time() - t0 + if not chunk: + raise IOError("Pipe read returned empty data unexpectedly") + + chunk_len = len(chunk) + if buf_remaining + chunk_len > len(buf): + # Grow buffer if needed + new_buf = bytearray(len(buf) * 2) + new_buf[:buf_remaining] = buf[:buf_remaining] + buf = new_buf + + buf[buf_remaining : buf_remaining + chunk_len] = chunk + buf_remaining += chunk_len + bytes_needed -= chunk_len + + # Decode the string + t0 = time() + if length == 0: + decoded_str = "" + else: + decoded_str = buf[buf_pos : buf_pos + length].decode("utf-8") + t_decode += time() - t0 + + strings.append(decoded_str) + buf_pos += length + buf_remaining -= length + i += 1 + header_received = False + if buf_remaining == _STRING_LENGTH_PREFIX_SIZE: + # There is still data in the buffer, probably the handshake header + received = struct.unpack( + " _STRING_LENGTH_PREFIX_SIZE: + raise ValueError( + "Unexpected number of bytes in buffer: {}".format(buf_remaining) + ) + + t_total = time() - t_total_start + return (strings, t_total, t_decode, t_io, num_strings, header_received) + + def _get_elem_size_for_type(d_type): """Returns the element size in bytes for a given SystemDS type.""" return { diff --git a/src/main/python/tests/matrix/test_block_converter.py b/src/main/python/tests/matrix/test_block_converter.py index ef4b28b1bc4..afa9ec9963d 100644 --- a/src/main/python/tests/matrix/test_block_converter.py +++ b/src/main/python/tests/matrix/test_block_converter.py @@ -26,6 +26,7 @@ from py4j.java_gateway import JVMView from systemds.context import SystemDSContext from systemds.utils.converters import matrix_block_to_numpy, numpy_to_matrix_block +import scipy.sparse as sp class Test_MatrixBlockConverter(unittest.TestCase): @@ -35,7 +36,9 @@ class Test_MatrixBlockConverter(unittest.TestCase): @classmethod def setUpClass(cls): - cls.sds = SystemDSContext(capture_stdout=True, logging_level=50) + cls.sds = SystemDSContext( + capture_stdout=True, logging_level=50, data_transfer_mode=0 + ) @classmethod def tearDownClass(cls): @@ -70,10 +73,150 @@ def test_random_nxk(self): array = np.array([rng.standard_normal(n) for x in range(k)]) self.convert_back_and_forth(array) + def test_random_sparse_csr_nxn(self): + n = 10 + array = sp.rand(n, n, density=0.1, format="csr") + self.convert_back_and_forth(array) + + def test_sparse_csr_rectangular(self): + """Test CSR conversion with rectangular matrices""" + array = sp.rand(5, 10, density=0.2, format="csr") + self.convert_back_and_forth(array) + + def test_sparse_csr_known_values(self): + """Test CSR conversion with a known sparse matrix""" + # Create a known CSR matrix + data = np.array([1.0, 2.0, 3.0, 4.0]) + row = np.array([0, 0, 1, 2]) + col = np.array([0, 2, 1, 2]) + array = sp.csr_matrix((data, (row, col)), shape=(3, 3)) + self.convert_back_and_forth(array) + + def test_empty_dense_0x0(self): + """Test conversion of empty 0x0 dense matrix""" + array = np.array([]).reshape(0, 0) + self.convert_back_and_forth(array) + + def test_empty_dense_0xn(self): + """Test conversion of empty matrix with zero rows""" + array = np.array([]).reshape(0, 5) + self.convert_back_and_forth(array) + + def test_empty_dense_nx0(self): + """Test conversion of empty matrix with zero columns""" + array = np.array([]).reshape(5, 0) + self.convert_back_and_forth(array) + + def test_sparse_csr_empty_rows(self): + """Test CSR conversion with rows that have no non-zero entries""" + # 4x3 matrix: row 0 has 2 entries, row 1 is empty, row 2 has 1 entry, row 3 is empty + data = np.array([1.0, 2.0, 3.0]) + row = np.array([0, 0, 2]) + col = np.array([0, 2, 1]) + array = sp.csr_matrix((data, (row, col)), shape=(4, 3)) + self.convert_back_and_forth(array) + + def test_sparse_csr_first_and_last_row_empty(self): + """Test CSR with first and last rows empty""" + data = np.array([1.0, 2.0]) + row = np.array([1, 1]) + col = np.array([0, 1]) + array = sp.csr_matrix((data, (row, col)), shape=(3, 2)) + self.convert_back_and_forth(array) + + def test_sparse_csr_all_zeros(self): + """Test CSR with no non-zero entries (empty structure)""" + array = sp.csr_matrix((3, 4)) + self.convert_back_and_forth(array) + + def test_sparse_coo_empty_rows(self): + """Test COO conversion with empty rows""" + data = np.array([1.0, 2.0]) + row = np.array([0, 2]) + col = np.array([1, 1]) + array = sp.coo_matrix((data, (row, col)), shape=(4, 2)) + self.convert_back_and_forth(array) + + def test_dense_all_zeros(self): + """Test dense matrix with all zeros""" + array = np.zeros((4, 3)) + self.convert_back_and_forth(array) + + def test_dense_single_row(self): + """Test dense matrix with single row (1xn)""" + array = np.array([[1.0, 2.0, 3.0]]) + self.convert_back_and_forth(array) + + def test_dense_single_column(self): + """Test dense matrix with single column (nx1)""" + array = np.array([[1.0], [2.0], [3.0]]) + self.convert_back_and_forth(array) + + def test_random_sparse_coo_nxn(self): + n = 10 + array = sp.rand(n, n, density=0.1, format="coo") + self.convert_back_and_forth(array) + + def test_sparse_csr_empty_0x0(self): + """Test empty 0x0 CSR matrix""" + array = sp.csr_matrix((0, 0)) + self.convert_back_and_forth(array) + + def test_sparse_csr_empty_0xn(self): + """Test CSR with zero rows""" + array = sp.csr_matrix((0, 4)) + self.convert_back_and_forth(array) + + def test_sparse_csr_empty_nx0(self): + """Test CSR with zero columns""" + array = sp.csr_matrix((4, 0)) + self.convert_back_and_forth(array) + + def test_sparse_csr_single_element(self): + """Test 1x1 CSR with one non-zero""" + array = sp.csr_matrix(([3.14], ([0], [0])), shape=(1, 1)) + self.convert_back_and_forth(array) + + def test_sparse_csr_single_row(self): + """Test 1xn CSR (single row)""" + array = sp.csr_matrix(([1.0, 2.0], ([0, 0], [0, 2])), shape=(1, 4)) + self.convert_back_and_forth(array) + + def test_sparse_csr_single_column(self): + """Test nx1 CSR (single column)""" + array = sp.csr_matrix(([1.0, 2.0], ([0, 2], [0, 0])), shape=(4, 1)) + self.convert_back_and_forth(array) + + def test_sparse_csr_empty_columns(self): + """Test CSR where some columns have no non-zero entries""" + # 3x4 matrix: only columns 0 and 2 have entries + data = np.array([1.0, 2.0, 3.0]) + row = np.array([0, 1, 2]) + col = np.array([0, 2, 2]) + array = sp.csr_matrix((data, (row, col)), shape=(3, 4)) + self.convert_back_and_forth(array) + + def test_sparse_coo_single_element(self): + """Test COO with single non-zero""" + array = sp.coo_matrix(([1.0], ([2], [3])), shape=(4, 5)) + self.convert_back_and_forth(array) + + def test_sparse_coo_all_zeros(self): + """Test COO with no non-zero entries""" + array = sp.coo_matrix((2, 3)) + self.convert_back_and_forth(array) + + def test_sparse_csc_rectangular(self): + """Test CSC conversion (fallback path: converted to dense in converter)""" + array = sp.csc_matrix(([1.0, 2.0, 3.0], ([0, 1, 2], [0, 0, 1])), shape=(3, 2)) + self.convert_back_and_forth(array) + def convert_back_and_forth(self, array): matrix_block = numpy_to_matrix_block(self.sds, array) # use the ability to call functions on matrix_block. returned = matrix_block_to_numpy(self.sds, matrix_block) + if isinstance(array, sp.spmatrix): + array = array.toarray() self.assertTrue(np.allclose(array, returned)) diff --git a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java deleted file mode 100644 index 466c3337d83..00000000000 --- a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.test.component.frame.array; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.charset.StandardCharsets; - -import org.apache.sysds.common.Types; -import org.apache.sysds.common.Types.ValueType; -import org.apache.sysds.runtime.util.Py4jConverterUtils; -import org.apache.sysds.runtime.frame.data.columns.Array; -import org.junit.Test; - -public class Py4jConverterUtilsTest { - - @Test - public void testConvertUINT8() { - int numElements = 4; - byte[] data = {1, 2, 3, 4}; - Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.UINT8); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1, result.get(0)); - assertEquals(2, result.get(1)); - assertEquals(3, result.get(2)); - assertEquals(4, result.get(3)); - } - - @Test - public void testConvertINT32() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putInt(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT32); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1, result.get(0)); - assertEquals(2, result.get(1)); - assertEquals(3, result.get(2)); - assertEquals(4, result.get(3)); - } - - @Test - public void testConvertINT64() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putLong((long) i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT64); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1L, result.get(0)); - assertEquals(2L, result.get(1)); - assertEquals(3L, result.get(2)); - assertEquals(4L, result.get(3)); - } - - - @Test - public void testConvertHASH32() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putInt(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH32); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals("1", result.get(0)); - assertEquals("2", result.get(1)); - assertEquals("3", result.get(2)); - assertEquals("4", result.get(3)); - } - - @Test - public void testConvertHASH64() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(int i = 1; i <= numElements; i++) { - buffer.putLong((long) i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH64); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals("1", result.get(0)); - assertEquals("2", result.get(1)); - assertEquals("3", result.get(2)); - assertEquals("4", result.get(3)); - } - - @Test - public void testConvertFP32() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(float i = 1.1f; i <= numElements + 1; i += 1.0) { - buffer.putFloat(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP32); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1.1f, result.get(0)); - assertEquals(2.1f, result.get(1)); - assertEquals(3.1f, result.get(2)); - assertEquals(4.1f, result.get(3)); - } - - @Test - public void testConvertFP64() { - int numElements = 4; - ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * numElements); - buffer.order(ByteOrder.nativeOrder()); - for(double i = 1.1; i <= numElements + 1; i += 1.0) { - buffer.putDouble(i); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP64); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(1.1, result.get(0)); - assertEquals(2.1, result.get(1)); - assertEquals(3.1, result.get(2)); - assertEquals(4.1, result.get(3)); - } - - @Test - public void testConvertBoolean() { - int numElements = 4; - byte[] data = {1, 0, 1, 0}; - Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.BOOLEAN); - assertNotNull(result); - assertEquals(4, result.size()); - assertEquals(true, result.get(0)); - assertEquals(false, result.get(1)); - assertEquals(true, result.get(2)); - assertEquals(false, result.get(3)); - } - - @Test - public void testConvertString() { - int numElements = 2; - String[] strings = {"hello", "world"}; - ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() + 4 + strings[1].length()); - buffer.order(ByteOrder.LITTLE_ENDIAN); - for(String s : strings) { - buffer.putInt(s.length()); - buffer.put(s.getBytes(StandardCharsets.UTF_8)); - } - Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.STRING); - assertNotNull(result); - assertEquals(2, result.size()); - assertEquals("hello", result.get(0)); - assertEquals("world", result.get(1)); - } - - @Test - public void testConvertChar() { - char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd'}; - ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES * c.length); - buffer.order(ByteOrder.LITTLE_ENDIAN); - for(char s : c) { - buffer.putChar(s); - } - Array result = Py4jConverterUtils.convert(buffer.array(), c.length, Types.ValueType.CHARACTER); - assertNotNull(result); - assertEquals(c.length, result.size()); - - for(int i = 0; i < c.length; i++) { - assertEquals(c[i], result.get(i)); - } - } - - @Test - public void testConvertRow() { - int numElements = 4; - byte[] data = {1, 2, 3, 4}; - Object[] row = Py4jConverterUtils.convertRow(data, numElements, Types.ValueType.UINT8); - assertNotNull(row); - assertEquals(4, row.length); - assertEquals(1, row[0]); - assertEquals(2, row[1]); - assertEquals(3, row[2]); - assertEquals(4, row[3]); - } - - @Test - public void testConvertFused() { - int numElements = 1; - byte[] data = {1, 2, 3, 4}; - Types.ValueType[] valueTypes = {ValueType.UINT8, ValueType.UINT8, ValueType.UINT8, ValueType.UINT8}; - Array[] arrays = Py4jConverterUtils.convertFused(data, numElements, valueTypes); - assertNotNull(arrays); - assertEquals(4, arrays.length); - for(int i = 0; i < 4; i++) { - assertEquals(1 + i, arrays[i].get(0)); - } - } - - @Test(expected = Exception.class) - public void nullData() { - Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN); - } - - @Test(expected = Exception.class) - public void nullValueType() { - Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null); - } - - @Test(expected = Exception.class) - public void unknownValueType() { - Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, ValueType.UNKNOWN); - } -} diff --git a/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java new file mode 100644 index 00000000000..2399f695a7d --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/utils/Py4jConverterUtilsTest.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; + +import org.apache.sysds.common.Types; +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.Py4jConverterUtils; +import org.junit.Test; + +public class Py4jConverterUtilsTest { + + @Test + public void testConvertUINT8() { + int numElements = 4; + byte[] data = {1, 2, 3, 4}; + Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.UINT8); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1, result.get(0)); + assertEquals(2, result.get(1)); + assertEquals(3, result.get(2)); + assertEquals(4, result.get(3)); + } + + @Test + public void testConvertINT32() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putInt(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT32); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1, result.get(0)); + assertEquals(2, result.get(1)); + assertEquals(3, result.get(2)); + assertEquals(4, result.get(3)); + } + + @Test + public void testConvertINT64() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putLong((long) i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.INT64); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1L, result.get(0)); + assertEquals(2L, result.get(1)); + assertEquals(3L, result.get(2)); + assertEquals(4L, result.get(3)); + } + + + @Test + public void testConvertHASH32() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putInt(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH32); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals("1", result.get(0)); + assertEquals("2", result.get(1)); + assertEquals("3", result.get(2)); + assertEquals("4", result.get(3)); + } + + @Test + public void testConvertHASH64() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(int i = 1; i <= numElements; i++) { + buffer.putLong((long) i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.HASH64); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals("1", result.get(0)); + assertEquals("2", result.get(1)); + assertEquals("3", result.get(2)); + assertEquals("4", result.get(3)); + } + + @Test + public void testConvertFP32() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(float i = 1.1f; i <= numElements + 1; i += 1.0) { + buffer.putFloat(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP32); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1.1f, result.get(0)); + assertEquals(2.1f, result.get(1)); + assertEquals(3.1f, result.get(2)); + assertEquals(4.1f, result.get(3)); + } + + @Test + public void testConvertFP64() { + int numElements = 4; + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * numElements); + buffer.order(ByteOrder.nativeOrder()); + for(double i = 1.1; i <= numElements + 1; i += 1.0) { + buffer.putDouble(i); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.FP64); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(1.1, result.get(0)); + assertEquals(2.1, result.get(1)); + assertEquals(3.1, result.get(2)); + assertEquals(4.1, result.get(3)); + } + + @Test + public void testConvertBoolean() { + int numElements = 4; + byte[] data = {1, 0, 1, 0}; + Array result = Py4jConverterUtils.convert(data, numElements, Types.ValueType.BOOLEAN); + assertNotNull(result); + assertEquals(4, result.size()); + assertEquals(true, result.get(0)); + assertEquals(false, result.get(1)); + assertEquals(true, result.get(2)); + assertEquals(false, result.get(3)); + } + + @Test + public void testConvertString() { + int numElements = 2; + String[] strings = {"hello", "world"}; + ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() + 4 + strings[1].length()); + buffer.order(ByteOrder.LITTLE_ENDIAN); + for(String s : strings) { + buffer.putInt(s.length()); + buffer.put(s.getBytes(StandardCharsets.UTF_8)); + } + Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.STRING); + assertNotNull(result); + assertEquals(2, result.size()); + assertEquals("hello", result.get(0)); + assertEquals("world", result.get(1)); + } + + @Test + public void testConvertChar() { + char[] c = {'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd'}; + ByteBuffer buffer = ByteBuffer.allocate(Character.BYTES * c.length); + buffer.order(ByteOrder.LITTLE_ENDIAN); + for(char s : c) { + buffer.putChar(s); + } + Array result = Py4jConverterUtils.convert(buffer.array(), c.length, Types.ValueType.CHARACTER); + assertNotNull(result); + assertEquals(c.length, result.size()); + + for(int i = 0; i < c.length; i++) { + assertEquals(c[i], result.get(i)); + } + } + + @Test + public void testConvertRow() { + int numElements = 4; + byte[] data = {1, 2, 3, 4}; + Object[] row = Py4jConverterUtils.convertRow(data, numElements, Types.ValueType.UINT8); + assertNotNull(row); + assertEquals(4, row.length); + assertEquals(1, row[0]); + assertEquals(2, row[1]); + assertEquals(3, row[2]); + assertEquals(4, row[3]); + } + + @Test + public void testConvertFused() { + int numElements = 1; + byte[] data = {1, 2, 3, 4}; + Types.ValueType[] valueTypes = {ValueType.UINT8, ValueType.UINT8, ValueType.UINT8, ValueType.UINT8}; + Array[] arrays = Py4jConverterUtils.convertFused(data, numElements, valueTypes); + assertNotNull(arrays); + assertEquals(4, arrays.length); + for(int i = 0; i < 4; i++) { + assertEquals(1 + i, arrays[i].get(0)); + } + } + + @Test(expected = Exception.class) + public void nullData() { + Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN); + } + + @Test(expected = Exception.class) + public void nullValueType() { + Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, null); + } + + @Test(expected = Exception.class) + public void unknownValueType() { + Py4jConverterUtils.convert(new byte[] {1, 2, 3}, 14, ValueType.UNKNOWN); + } + + @Test + public void testConvertPy4JArrayToMBFP64() { + int rlen = 2; + int clen = 3; + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6}; + for(double val : values) { + buffer.putDouble(val); + } + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(1.1, mb.get(0, 0), 0.0001); + assertEquals(2.2, mb.get(0, 1), 0.0001); + assertEquals(3.3, mb.get(0, 2), 0.0001); + assertEquals(4.4, mb.get(1, 0), 0.0001); + assertEquals(5.5, mb.get(1, 1), 0.0001); + assertEquals(6.6, mb.get(1, 2), 0.0001); + } + + @Test + public void testConvertPy4JArrayToMBUINT8() { + int rlen = 2; + int clen = 2; + byte[] data = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, ValueType.UINT8); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(1.0, mb.get(0, 0), 0.0001); + assertEquals(2.0, mb.get(0, 1), 0.0001); + assertEquals(3.0, mb.get(1, 0), 0.0001); + assertEquals(4.0, mb.get(1, 1), 0.0001); + } + + @Test + public void testConvertPy4JArrayToMBINT32() { + int rlen = 2; + int clen = 2; + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + int[] values = {10, 20, 30, 40}; + for(int val : values) { + buffer.putInt(val); + } + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen, ValueType.INT32); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(10.0, mb.get(0, 0), 0.0001); + assertEquals(20.0, mb.get(0, 1), 0.0001); + assertEquals(30.0, mb.get(1, 0), 0.0001); + assertEquals(40.0, mb.get(1, 1), 0.0001); + } + + @Test + public void testConvertPy4JArrayToMBFP32() { + int rlen = 2; + int clen = 2; + ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + float[] values = {1.5f, 2.5f, 3.5f, 4.5f}; + for(float val : values) { + buffer.putFloat(val); + } + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen, ValueType.FP32); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertEquals(1.5f, mb.get(0, 0), 0.0001); + assertEquals(2.5f, mb.get(0, 1), 0.0001); + assertEquals(3.5f, mb.get(1, 0), 0.0001); + assertEquals(4.5f, mb.get(1, 1), 0.0001); + } + + @Test(expected = DMLRuntimeException.class) + public void testConvertPy4JArrayToMBSparseNotSupported() { + int rlen = 2; + int clen = 2; + byte[] data = {1, 2, 3, 4}; + Py4jConverterUtils.convertPy4JArrayToMB(data, rlen, clen, true, ValueType.UINT8); + } + + @Test + public void testConvertSciPyCOOToMB() { + int rlen = 10; + int clen = 10; + int nnz = 3; + // Create COO format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0 + ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz); + dataBuf.order(ByteOrder.nativeOrder()); + dataBuf.putDouble(1.0); + dataBuf.putDouble(2.0); + dataBuf.putDouble(3.0); + + ByteBuffer rowBuf = ByteBuffer.allocate(Integer.BYTES * nnz); + rowBuf.order(ByteOrder.nativeOrder()); + rowBuf.putInt(0); + rowBuf.putInt(1); + rowBuf.putInt(2); + + ByteBuffer colBuf = ByteBuffer.allocate(Integer.BYTES * nnz); + colBuf.order(ByteOrder.nativeOrder()); + colBuf.putInt(0); + colBuf.putInt(2); + colBuf.putInt(1); + + MatrixBlock mb = Py4jConverterUtils.convertSciPyCOOToMB( + dataBuf.array(), rowBuf.array(), colBuf.array(), rlen, clen, nnz); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(mb.isInSparseFormat()); + assertEquals(1.0, mb.get(0, 0), 0.0001); + assertEquals(2.0, mb.get(1, 2), 0.0001); + assertEquals(3.0, mb.get(2, 1), 0.0001); + assertEquals(0.0, mb.get(0, 1), 0.0001); + assertEquals(0.0, mb.get(1, 0), 0.0001); + } + + @Test + public void testConvertSciPyCSRToMB() { + int rlen = 10; + int clen = 10; + int nnz = 3; + // Create CSR format: values at (0,0)=1.0, (1,2)=2.0, (2,1)=3.0 + ByteBuffer dataBuf = ByteBuffer.allocate(Double.BYTES * nnz); + dataBuf.order(ByteOrder.nativeOrder()); + dataBuf.putDouble(1.0); + dataBuf.putDouble(2.0); + dataBuf.putDouble(3.0); + + ByteBuffer indicesBuf = ByteBuffer.allocate(Integer.BYTES * nnz); + indicesBuf.order(ByteOrder.nativeOrder()); + indicesBuf.putInt(0); // column for row 0 + indicesBuf.putInt(2); // column for row 1 + indicesBuf.putInt(1); // column for row 2 + + ByteBuffer indptrBuf = ByteBuffer.allocate(Integer.BYTES * (rlen + 1)); + indptrBuf.order(ByteOrder.nativeOrder()); + indptrBuf.putInt(0); // row 0 starts at index 0 + indptrBuf.putInt(1); // row 1 starts at index 1 + indptrBuf.putInt(2); // row 2 starts at index 2 + indptrBuf.putInt(3); // end marker + + MatrixBlock mb = Py4jConverterUtils.convertSciPyCSRToMB( + dataBuf.array(), indicesBuf.array(), indptrBuf.array(), rlen, clen, nnz); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(mb.isInSparseFormat()); + assertEquals(1.0, mb.get(0, 0), 0.0001); + assertEquals(2.0, mb.get(1, 2), 0.0001); + assertEquals(3.0, mb.get(2, 1), 0.0001); + assertEquals(0.0, mb.get(0, 1), 0.0001); + assertEquals(0.0, mb.get(1, 0), 0.0001); + } + + @Test + public void testAllocateDenseOrSparseDense() { + int rlen = 5; + int clen = 5; + MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(!mb.isInSparseFormat()); + } + + @Test + public void testAllocateDenseOrSparseSparse() { + int rlen = 5; + int clen = 5; + MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, true); + assertNotNull(mb); + assertEquals(rlen, mb.getNumRows()); + assertEquals(clen, mb.getNumColumns()); + assertTrue(mb.isInSparseFormat()); + } + + @Test + public void testAllocateDenseOrSparseLong() { + long rlen = 10L; + long clen = 10L; + MatrixBlock mb = Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false); + assertNotNull(mb); + assertEquals((int) rlen, mb.getNumRows()); + assertEquals((int) clen, mb.getNumColumns()); + } + + @Test(expected = DMLRuntimeException.class) + public void testAllocateDenseOrSparseLongTooLarge() { + long rlen = Integer.MAX_VALUE + 1L; + long clen = 10L; + Py4jConverterUtils.allocateDenseOrSparse(rlen, clen, false); + } + + @Test + public void testConvertMBtoPy4JDenseArr() { + int rlen = 2; + int clen = 2; + MatrixBlock mb = new MatrixBlock(rlen, clen, false); + mb.allocateBlock(); + mb.set(0, 0, 1.0); + mb.set(0, 1, 2.0); + mb.set(1, 0, 3.0); + mb.set(1, 1, 4.0); + + byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb); + assertNotNull(result); + assertEquals(Double.BYTES * rlen * clen, result.length); + + ByteBuffer buffer = ByteBuffer.wrap(result); + buffer.order(ByteOrder.nativeOrder()); + assertEquals(1.0, buffer.getDouble(), 0.0001); + assertEquals(2.0, buffer.getDouble(), 0.0001); + assertEquals(3.0, buffer.getDouble(), 0.0001); + assertEquals(4.0, buffer.getDouble(), 0.0001); + } + + @Test + public void testConvertMBtoPy4JDenseArrRoundTrip() { + int rlen = 2; + int clen = 3; + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * rlen * clen); + buffer.order(ByteOrder.nativeOrder()); + double[] values = {1.1, 2.2, 3.3, 4.4, 5.5, 6.6}; + for(double val : values) { + buffer.putDouble(val); + } + + MatrixBlock mb = Py4jConverterUtils.convertPy4JArrayToMB(buffer.array(), rlen, clen); + byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb); + + ByteBuffer resultBuffer = ByteBuffer.wrap(result); + resultBuffer.order(ByteOrder.nativeOrder()); + for(double expected : values) { + assertEquals(expected, resultBuffer.getDouble(), 0.0001); + } + } + + @Test + public void testConvertMBtoPy4JDenseArrSparseToDense() { + new Py4jConverterUtils(); + int rlen = 3; + int clen = 3; + MatrixBlock mb = new MatrixBlock(rlen, clen, true); + mb.allocateSparseRowsBlock(false); + mb.set(0, 0, 1.0); + mb.set(2, 2, 2.0); + + byte[] result = Py4jConverterUtils.convertMBtoPy4JDenseArr(mb); + assertNotNull(result); + assertEquals(Double.BYTES * rlen * clen, result.length); + + ByteBuffer buffer = ByteBuffer.wrap(result); + buffer.order(ByteOrder.nativeOrder()); + assertEquals(1.0, buffer.getDouble(), 0.0001); + // Skip to position (2,2) = index 8 + for(int i = 1; i < 8; i++) { + assertEquals(0.0, buffer.getDouble(), 0.0001); + } + assertEquals(2.0, buffer.getDouble(), 0.0001); + } +}