@@ -23,6 +23,7 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

import org.apache.log4j.Logger;
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.frame.data.columns.Array;
@@ -35,6 +36,7 @@
* Utils for converting python data to java.
*/
public class Py4jConverterUtils {
private static final Logger LOG = Logger.getLogger(Py4jConverterUtils.class);
public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen, int clen) {
return convertPy4JArrayToMB(data, rlen, clen, false, Types.ValueType.FP64);
}
@@ -63,6 +65,45 @@ public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] co
return mb;
}

public static MatrixBlock convertSciPyCSRToMB(byte[] data, byte[] indices, byte[] indptr, int rlen, int clen, int nnz) {
LOG.debug("Converting compressed sparse row matrix to MatrixBlock");
MatrixBlock mb = new MatrixBlock(rlen, clen, true);
mb.allocateSparseRowsBlock(false);
ByteBuffer dataBuf = ByteBuffer.wrap(data);
dataBuf.order(ByteOrder.nativeOrder());
ByteBuffer indicesBuf = ByteBuffer.wrap(indices);
indicesBuf.order(ByteOrder.nativeOrder());
ByteBuffer indptrBuf = ByteBuffer.wrap(indptr);
indptrBuf.order(ByteOrder.nativeOrder());

// Read indptr array to get row boundaries
int[] rowPtrs = new int[rlen + 1];
for(int i = 0; i <= rlen; i++) {
rowPtrs[i] = indptrBuf.getInt();
}

// Iterate through each row
for(int row = 0; row < rlen; row++) {
int startIdx = rowPtrs[row];
int endIdx = rowPtrs[row + 1];

// Set buffer positions to the start of this row
dataBuf.position(startIdx * Double.BYTES);
indicesBuf.position(startIdx * Integer.BYTES);

// Process all non-zeros in this row sequentially
for(int idx = startIdx; idx < endIdx; idx++) {
double val = dataBuf.getDouble();
int colIndex = indicesBuf.getInt();
mb.set(row, colIndex, val);
}
}

mb.recomputeNonZeros();
mb.examSparsity();
return mb;
}

public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
ret.allocateBlock();
@@ -208,6 +249,7 @@ private static void readBufferIntoArray(ByteBuffer buffer, Array<?> array, Types
public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
byte[] ret = null;
if(mb.isInSparseFormat()) {
LOG.debug("Converting sparse matrix to dense");
mb.sparseToDense();
}

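To make the CSR layout concrete, here is a minimal Python sketch of the three buffers the new convertSciPyCSRToMB method consumes, assuming the sender serializes the SciPy CSR component arrays with NumPy's tobytes() in native byte order (variable names are illustrative):

```python
import numpy as np
import scipy.sparse as sp

# Small CSR matrix: row r's non-zeros occupy positions
# indptr[r] .. indptr[r+1]-1 in both the data and indices arrays.
m = sp.csr_matrix(np.array([[0.0, 1.5, 0.0],
                            [2.0, 0.0, 3.0]]))

data_bytes = m.data.astype(np.float64).tobytes()      # nnz doubles
indices_bytes = m.indices.astype(np.int32).tobytes()  # nnz column indices
indptr_bytes = m.indptr.astype(np.int32).tobytes()    # rlen + 1 row offsets

# These buffers map one-to-one onto the data/indices/indptr parameters
# that the Java loop above reads via getDouble()/getInt().
```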
79 changes: 69 additions & 10 deletions src/main/python/systemds/context/systemds_context.py
@@ -29,17 +29,19 @@
import sys
import struct
import traceback
import warnings
from contextlib import contextmanager
from glob import glob
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
from time import sleep, time
from time import sleep
from typing import Dict, Iterable, Sequence, Tuple, Union
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import scipy.sparse as sp
from py4j.java_gateway import GatewayParameters, JavaGateway, Py4JNetworkError
from systemds.operator import (
Frame,
@@ -77,6 +79,7 @@ class SystemDSContext(object):
_FIFO_JAVA2PY_PIPES = []
_data_transfer_mode = 0
_multi_pipe_enabled = False
_sparse_data_transfer = True
_logging_initialized = False
_executor_pool = ThreadPoolExecutor(max_workers=(os.cpu_count() or 2) * 2)

@@ -89,6 +92,7 @@ def __init__(
py4j_logging_level: int = 50,
data_transfer_mode: int = 1,
multi_pipe_enabled: bool = False,
sparse_data_transfer: bool = True,
):
"""Starts a new instance of SystemDSContext, in which the connection to a JVM systemds instance is handled
Any new instance of this SystemDS Context, would start a separate new JVM.
@@ -103,14 +107,26 @@
The logging levels are as follows: 10 DEBUG, 20 INFO, 30 WARNING, 40 ERROR, 50 CRITICAL.
:param py4j_logging_level: The logging level for Py4j to use, since all communication to the JVM is done through this,
it can be verbose if not set high.
:param data_transfer_mode: default 0,
:param data_transfer_mode: default 1; 0 transfers data via Py4J, 1 via pipes (on Unix systems)
:param multi_pipe_enabled: default False; if True, use multiple pipes for data transfer.
Only used if data_transfer_mode is 1.
.. experimental:: This parameter is experimental and may be removed in a future version.
:param sparse_data_transfer: default True; if True, use the optimized sparse matrix transfer;
if False, convert sparse matrices to dense arrays before transfer
"""

if multi_pipe_enabled:
warnings.warn(
"The 'multi_pipe_enabled' parameter is experimental and may be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
self.__setup_logging(logging_level, py4j_logging_level)
self.__start(port, capture_stdout)
self.capture_stats(capture_statistics)
self._log.debug("Started JVM and SystemDS python context manager")
self.__setup_data_transfer(data_transfer_mode, multi_pipe_enabled)
self._sparse_data_transfer = sparse_data_transfer
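For orientation, a minimal usage sketch of the new constructor flags (values are examples only; pipe-based transfer requires a Unix system):

```python
from systemds.context import SystemDSContext

# Pipe-based transfer with the optimized sparse path enabled.
with SystemDSContext(data_transfer_mode=1, sparse_data_transfer=True) as sds:
    ...  # build and compute operations as usual
```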

def __setup_data_transfer(self, data_transfer_mode=0, multi_pipe_enabled=False):
self._data_transfer_mode = data_transfer_mode
@@ -769,21 +785,65 @@ def scalar(self, v: Dict[str, VALID_INPUT_TYPES]) -> Scalar:
# therefore the output type is assign.
return Scalar(self, v, assign=True)

def from_py(
self,
src: Union[np.ndarray, sp.spmatrix, pd.DataFrame, pd.Series],
*args: Sequence[VALID_INPUT_TYPES],
**kwargs: Dict[str, VALID_INPUT_TYPES],
) -> Union[Matrix, Frame]:
"""Generate DAGNode representing data given by a python object, which will be sent to SystemDS on need.
:param src: the python object
:param args: unnamed parameters
:param kwargs: named parameters
:return: A Matrix or Frame Node
"""

def get_params(src, args, kwargs):
unnamed_params = ["'./tmp/{file_name}'"]
if len(src.shape) == 2:
named_params = {"rows": src.shape[0], "cols": src.shape[1]}
elif len(src.shape) == 1:
named_params = {"rows": src.shape[0], "cols": 1}
else:
# TODO Support tensors.
raise ValueError("Only two dimensional arrays supported")
unnamed_params.extend(args)
named_params.update(kwargs)
return unnamed_params, named_params

if isinstance(src, (np.ndarray, sp.spmatrix)):
unnamed_params, named_params = get_params(src, args, kwargs)
return Matrix(self, "read", unnamed_params, named_params, local_data=src)
elif isinstance(src, (pd.DataFrame, pd.Series)):
unnamed_params, named_params = get_params(src, args, kwargs)
named_params["data_type"] = '"frame"'
return Frame(self, "read", unnamed_params, named_params, local_data=src)
else:
raise ValueError(f"Unsupported data type: {type(src)}")

def from_numpy(
self,
mat: np.array,
mat: Union[np.ndarray, sp.spmatrix],
*args: Sequence[VALID_INPUT_TYPES],
**kwargs: Dict[str, VALID_INPUT_TYPES],
) -> Matrix:
"""Generate DAGNode representing matrix with data given by a numpy array, which will be sent to SystemDS
on need.
"""Generate DAGNode representing matrix with data given by a numpy array or scipy sparse matrix,
which will be sent to SystemDS on need.

:param mat: the numpy array
:param mat: the numpy array or scipy sparse matrix
:param args: unnamed parameters
:param kwargs: named parameters
:return: A Matrix
Note: This method is deprecated. Use from_py instead.
"""

self._log.warning(f"Deprecated method from_numpy. Use from_py instead.")
unnamed_params = ["'./tmp/{file_name}'"]

if len(mat.shape) == 2:
@@ -811,7 +871,9 @@ def from_pandas(
:param args: unnamed parameters
:param kwargs: named parameters
:return: A Frame
Note: This method is deprecated. Use from_py instead.
"""
self._log.warning(f"Deprecated method from_pandas. Use from_py instead.")
unnamed_params = ["'./tmp/{file_name}'"]

if len(df.shape) == 2:
@@ -824,9 +886,6 @@

unnamed_params.extend(args)
named_params["data_type"] = '"frame"'

self._pd_dataframe = df

named_params.update(kwargs)
return Frame(self, "read", unnamed_params, named_params, local_data=df)
