Reference Guide for Python Users
Introduction
SystemML enables flexible, scalable machine learning. This flexibility is achieved through the specification of a highlevel declarative machine learning language that comes in two flavors, one with an Rlike syntax (DML) and one with a Pythonlike syntax (PyDML).
Algorithm scripts written in DML and PyDML can be run on Hadoop, on Spark, or in Standalone mode. No script modifications are required to change between modes. SystemML automatically performs advanced optimizations based on data and cluster characteristics, so much of the need to manually tweak algorithms is largely reduced or eliminated. To understand more about DML and PyDML, we recommend that you read Beginner’s Guide to DML and PyDML.
For convenience of Python users, SystemML exposes several languagelevel APIs that allow Python users to use SystemML and its algorithms without the need to know DML or PyDML. We explain these APIs in the below sections.
matrix class
The matrix class is an experimental feature that is often referred to as Python DSL. It allows the user to perform linear algebra operations in SystemML using a NumPylike interface. It implements basic matrix operators, matrix functions as well as converters to common Python types (for example: Numpy arrays, PySpark DataFrame and Pandas DataFrame).
The primary reason for supporting this API is to reduce the learning curve for an average Python user, who is more likely to know Numpy library, rather than the DML language.
Operators
The operators supported are:
 Arithmetic operators: +, , , /, //, %, * as well as dot (i.e. matrix multiplication)
 Indexing in the matrix
 Relational/Boolean operators: <, <=, >, >=, ==, !=, &, 
This class also supports several input/output formats such as NumPy arrays, Pandas DataFrame, SciPy sparse matrix and PySpark DataFrame.
Here is a small example that demonstrates the usage:
>>> import systemml as sml
>>> import numpy as np
>>> m1 = sml.matrix(np.ones((3,3)) + 2)
>>> m2 = sml.matrix(np.ones((3,3)) + 3)
>>> m2 = m1 * (m2 + m1)
>>> m4 = 1.0  m2
>>> m4.sum(axis=1).toNumPy()
array([[60.],
[60.],
[60.]])
Lazy evaluation
By default, the operations are evaluated lazily to avoid conversion overhead and also to maximize optimization scope.
To disable lazy evaluation, please us set_lazy
method:
>>> import systemml as sml
>>> import numpy as np
>>> m1 = sml.matrix(np.ones((3,3)) + 2)
Welcome to Apache SystemML!
>>> m2 = sml.matrix(np.ones((3,3)) + 3)
>>> np.add(m1, m2) + m1
# This matrix (mVar4) is backed by below given PyDML script (which is not yet evaluated). To fetch the data of this matrix, invoke toNumPy() or toDF() or toPandas() methods.
mVar2 = load(" ", format="csv")
mVar1 = load(" ", format="csv")
mVar3 = mVar1 + mVar2
mVar4 = mVar3 + mVar1
save(mVar4, " ")
>>> sml.set_lazy(False)
>>> m1 = sml.matrix(np.ones((3,3)) + 2)
>>> m2 = sml.matrix(np.ones((3,3)) + 3)
>>> np.add(m1, m2) + m1
# This matrix (mVar8) is backed by NumPy array. To fetch the NumPy array, invoke toNumPy() method.
Since matrix is backed by lazy evaluation and uses a recursive Depth First Search (DFS),
you may run into RuntimeError: maximum recursion depth exceeded
.
Please see below troubleshooting steps
Dealing with the loops
It is important to note that this API doesnot pushdown loop, which means the SystemML engine essentially gets an unrolled DML script. This can lead to two issues:

Since matrix is backed by lazy evaluation and uses a recursive Depth First Search (DFS), you may run into
RuntimeError: maximum recursion depth exceeded
. Please see below troubleshooting steps 
Significant parsing/compilation overhead of potentially large unrolled DML script.
The unrolling of the for loop can be demonstrated by the below example:
>>> import systemml as sml
>>> import numpy as np
>>> m1 = sml.matrix(np.ones((3,3)) + 2)
Welcome to Apache SystemML!
>>> m2 = sml.matrix(np.ones((3,3)) + 3)
>>> m3 = m1
>>> for i in range(5):
... m3 = m1 * m3 + m1
...
>>> m3
# This matrix (mVar12) is backed by below given PyDML script (which is not yet evaluated). To fetch the data of this matrix, invoke toNumPy() or toDF() or toPandas() methods.
mVar1 = load(" ", format="csv")
mVar3 = mVar1 * mVar1
mVar4 = mVar3 + mVar1
mVar5 = mVar1 * mVar4
mVar6 = mVar5 + mVar1
mVar7 = mVar1 * mVar6
mVar8 = mVar7 + mVar1
mVar9 = mVar1 * mVar8
mVar10 = mVar9 + mVar1
mVar11 = mVar1 * mVar10
mVar12 = mVar11 + mVar1
save(mVar12, " ")
We can reduce the impact of this unrolling by eagerly evaluating the variables inside the loop:
>>> import systemml as sml
>>> import numpy as np
>>> m1 = sml.matrix(np.ones((3,3)) + 2)
Welcome to Apache SystemML!
>>> m2 = sml.matrix(np.ones((3,3)) + 3)
>>> m3 = m1
>>> for i in range(5):
... m3 = m1 * m3 + m1
... sml.eval(m3)
Builtin functions
In addition to the above mentioned operators, following functions are supported.

transpose: Transposes the input matrix.

Aggregation functions: prod, sum, mean, var, sd, max, min, argmin, argmax, cumsum
Description  Parameters  

prod(self)  Return the product of all cells in matrix  self: input matrix object 
sum(self, axis=None)  Compute the sum along the specified axis  axis : int, optional 
mean(self, axis=None)  Compute the arithmetic mean along the specified axis  axis : int, optional 
var(self, axis=None)  Compute the variance along the specified axis. We assume that delta degree of freedom is 1 (unlike NumPy which assumes ddof=0).  axis : int, optional 
moment(self, moment=1, axis=None)  Calculates the nth moment about the mean  moment : int (can be 1, 2, 3 or 4), axis : int, optional 
sd(self, axis=None)  Compute the standard deviation along the specified axis  axis : int, optional 
max(self, other=None, axis=None)  Compute the maximum value along the specified axis  other: matrix or numpy array (& other supported types) or scalar, axis : int, optional 
min(self, other=None, axis=None)  Compute the minimum value along the specified axis  other: matrix or numpy array (& other supported types) or scalar, axis : int, optional 
argmin(self, axis=None)  Returns the indices of the minimum values along an axis.  axis : int, optional,(only axis=1, i.e. rowIndexMax is supported in this version) 
argmax(self, axis=None)  Returns the indices of the maximum values along an axis.  axis : int, optional (only axis=1, i.e. rowIndexMax is supported in this version) 
cumsum(self, axis=None)  Returns the indices of the maximum values along an axis.  axis : int, optional (only axis=0, i.e. cumsum along the rows is supported in this version) 
 Global statistical builtIn functions: exp, log, abs, sqrt, round, floor, ceil, sin, cos, tan, sinh, cosh, tanh, asin, acos, atan, sign, solve
Description  Parameters  

solve(A, b)  Computes the least squares solution for system of linear equations A %*% x = b  A, b: input matrices 
 Builtin sampling functions: normal, uniform, poisson
Description  Parameters  

normal(loc=0.0, scale=1.0, size=(1,1), sparsity=1.0)  Draw random samples from a normal (Gaussian) distribution.  loc: Mean (“centre”) of the distribution, scale: Standard deviation (spread or “width”) of the distribution, size: Output shape (only tuple of length 2, i.e. (m, n), supported), sparsity: Sparsity (between 0.0 and 1.0). 
uniform(low=0.0, high=1.0, size=(1,1), sparsity=1.0)  Draw samples from a uniform distribution.  low: Lower boundary of the output interval, high: Upper boundary of the output interval, size: Output shape (only tuple of length 2, i.e. (m, n), supported), sparsity: Sparsity (between 0.0 and 1.0). 
poisson(lam=1.0, size=(1,1), sparsity=1.0)  Draw samples from a Poisson distribution.  lam: Expectation of interval, should be > 0, size: Output shape (only tuple of length 2, i.e. (m, n), supported), sparsity: Sparsity (between 0.0 and 1.0). 
 Other builtin functions: hstack, vstack, trace
Description  Parameters  

hstack(self, other)  Stack matrices horizontally (column wise). Invokes cbind internally.  self: lhs matrix object, other: rhs matrix object 
vstack(self, other)  Stack matrices vertically (row wise). Invokes rbind internally.  self: lhs matrix object, other: rhs matrix object 
trace(self)  Return the sum of the cells of the main diagonal square matrix  self: input matrix 
Here is an example that uses the above functions and trains a simple linear regression model:
>>> import numpy as np
>>> from sklearn import datasets
>>> import systemml as sml
>>> # Load the diabetes dataset
>>> diabetes = datasets.load_diabetes()
>>> # Use only one feature
>>> diabetes_X = diabetes.data[:, np.newaxis, 2]
>>> # Split the data into training/testing sets
>>> X_train = diabetes_X[:20]
>>> X_test = diabetes_X[20:]
>>> # Split the targets into training/testing sets
>>> y_train = diabetes.target[:20]
>>> y_test = diabetes.target[20:]
>>> # Train Linear Regression model
>>> X = sml.matrix(X_train)
>>> y = sml.matrix(np.matrix(y_train).T)
>>> A = X.transpose().dot(X)
>>> b = X.transpose().dot(y)
>>> beta = sml.solve(A, b).toNumPy()
>>> y_predicted = X_test.dot(beta)
>>> print('Residual sum of squares: %.2f' % np.mean((y_predicted  y_test) ** 2))
Residual sum of squares: 25282.12
For all the above functions, we always return a two dimensional matrix, especially for aggregation functions with axis. For example: Assuming m1 is a matrix of (3, n), NumPy returns a 1d vector of dimension (3,) for operation m1.sum(axis=1) whereas SystemML returns a 2d matrix of dimension (3, 1).
Note: an evaluated matrix contains a data field computed by eval method as DataFrame or NumPy array.
Support for NumPy’s universal functions
The matrix class also supports most of NumPy’s universal functions (i.e. ufuncs):
pip install ignoreinstalled 'numpy>=1.13.0rc2'
This will enable NumPy’s functions to invoke matrix class:
import systemml as sml
import numpy as np
m1 = sml.matrix(np.ones((3,3)) + 2)
m2 = sml.matrix(np.ones((3,3)) + 3)
np.add(m1, m2)
The matrix class doesnot support following ufuncs:
 Complex number related ufunc (for example:
conj
)  Hyperbolic/inversehyperbolic functions (for example: sinh, arcsinh, cosh, …)
 Bitwise operators
 Xor operator
 Infinite/Nanchecking (for example: isreal, iscomplex, isfinite, isinf, isnan)
 Other ufuncs: copysign, nextafter, modf, frexp, trunc.
Design Decisions of matrix class (Developer documentation)

Until eval() method is invoked, we create an AST (not exposed to the user) that consist of unevaluated operations and data required by those operations. As an anology, a spark user can treat eval() method similar to calling RDD.persist() followed by RDD.count().

The AST consist of two kinds of nodes: either of type matrix or of type DMLOp. Both these classes expose _visit method, that helps in traversing the AST in DFS manner.

A matrix object can either be evaluated or not. If evaluated, the attribute ‘data’ is set to one of the supported types (for example: NumPy array or DataFrame). In this case, the attribute ‘op’ is set to None. If not evaluated, the attribute ‘op’ which refers to one of the intermediate node of AST and if of type DMLOp. In this case, the attribute ‘data’ is set to None.

DMLOp has an attribute ‘inputs’ which contains list of matrix objects or DMLOp.

To simplify the traversal, every matrix object is considered immutable and an matrix operations creates a new matrix object. As an example: m1 = sml.matrix(np.ones((3,3))) creates a matrix object backed by ‘data=(np.ones((3,3))’. m1 = m1 * 2 will create a new matrix object which is now backed by ‘op=DMLOp( …)’ whose input is earlier created matrix object.

Left indexing (implemented in __setitem__ method) is a special case, where Python expects the existing object to be mutated. To ensure the above property, we make deep copy of existing object and point any references to the leftindexed matrix to the newly created object. Then the leftindexed matrix is set to be backed by DMLOp consisting of following pydml: leftindexedmatrix = newdeepcopiedmatrix leftindexedmatrix[index] = value

Please use m.print_ast() and/or type m for debugging. Here is a sample session:
>>> npm = np.ones((3,3))
>>> m1 = sml.matrix(npm + 3)
>>> m2 = sml.matrix(npm + 5)
>>> m3 = m1 + m2
>>> m3
mVar2 = load(" ", format="csv")
mVar1 = load(" ", format="csv")
mVar3 = mVar1 + mVar2
save(mVar3, " ")
>>> m3.print_ast()
 [mVar3] (op).
 [mVar1] (data).
 [mVar2] (data).
MLContext API
The Spark MLContext API offers a programmatic interface for interacting with SystemML from Spark using languages such as Scala, Java, and Python. As a result, it offers a convenient way to interact with SystemML from the Spark Shell and from Notebooks such as Jupyter and Zeppelin.
Usage
The below example demonstrates how to invoke the algorithm scripts/algorithms/MultiLogReg.dml using Python MLContext API.
from sklearn import datasets, neighbors
from pyspark.sql import DataFrame, SQLContext
import systemml as sml
import pandas as pd
import os, imp
sqlCtx = SQLContext(sc)
digits = datasets.load_digits()
X_digits = digits.data
y_digits = digits.target + 1
n_samples = len(X_digits)
# Split the data into training/testing sets and convert to PySpark DataFrame
X_df = sqlCtx.createDataFrame(pd.DataFrame(X_digits[:.9 * n_samples]))
y_df = sqlCtx.createDataFrame(pd.DataFrame(y_digits[:.9 * n_samples]))
ml = sml.MLContext(sc)
# Get the path of MultiLogReg.dml
scriptPath = os.path.join(imp.find_module("systemml")[1], 'systemmljava', 'scripts', 'algorithms', 'MultiLogReg.dml')
script = sml.dml(scriptPath).input(X=X_df, Y_vec=y_df).output("B_out")
beta = ml.execute(script).get('B_out').toNumPy()
mllearn API
mllearn API is designed to be compatible with scikitlearn and MLLib. The classes that are part of mllearn API are LogisticRegression, LinearRegression, SVM, NaiveBayes and Caffe2DML.
The below code describes how to use mllearn API for training:
The below code describes how to use mllearn API for prediction:
Please note that when training using mllearn API (i.e. model.fit(X_df)
), SystemML
expects that labels have been converted to 1based value.
This avoids unnecessary decoding overhead for large dataset if the label columns has already been decoded.
For scikitlearn API, there is no such requirement.
The table below describes the parameter available for mllearn algorithms:
Parameters  Description of the Parameters  LogisticRegression  LinearRegression  SVM  NaiveBayes 

sparkSession  PySpark SparkSession  X  X  X  X 
penalty  Used to specify the norm used in the penalization (default: ‘l2’)  only ‘l2’ supported       
fit_intercept  Specifies whether to add intercept or not (default: True)  X  X  X   
normalize  This parameter is ignored when fit_intercept is set to False. (default: False)  X  X  X   
max_iter  Maximum number of iterations (default: 100)  X  X  X   
max_inner_iter  Maximum number of inner iterations, or 0 if no maximum limit provided (default: 0)  X       
tol  Tolerance used in the convergence criterion (default: 0.000001)  X  X  X   
C  1/regularization parameter (default: 1.0). To disable regularization, please use float(“inf”)  X  X  X   
solver  Algorithm to use in the optimization problem.  Only ‘newtoncg’ solver supported  Supports either ‘newtoncg’ or ‘directsolve’ (default: ‘newtoncg’). Depending on the size and the sparsity of the feature matrix, one or the other solver may be more efficient. ‘directsolve’ solver is more efficient when the number of features is relatively small (m < 1000) and input matrix X is either tall or fairly dense; otherwise ‘newtoncg’ solver is more efficient.     
is_multi_class  Specifies whether to use binaryclass or multiclass classifier (default: False)      X   
laplace  Laplace smoothing specified by the user to avoid creation of 0 probabilities (default: 1.0)        X 
In the below example, we invoke SystemML’s Logistic Regression algorithm on digits datasets.
# Scikitlearn way
from sklearn import datasets, neighbors
from systemml.mllearn import LogisticRegression
digits = datasets.load_digits()
X_digits = digits.data
y_digits = digits.target
n_samples = len(X_digits)
X_train = X_digits[:int(.9 * n_samples)]
y_train = y_digits[:int(.9 * n_samples)]
X_test = X_digits[int(.9 * n_samples):]
y_test = y_digits[int(.9 * n_samples):]
logistic = LogisticRegression(spark)
print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test))
Output:
LogisticRegression score: 0.927778
You can also save the trained model and load it later for prediction:
# Assuming logistic.fit(X_train, y_train) is already invoked
logistic.save('logistic_model')
new_logistic = LogisticRegression(spark)
new_logistic.load('logistic_model')
print('LogisticRegression score: %f' % new_logistic.score(X_test, y_test))
Passing PySpark DataFrame
To train the above algorithm on larger dataset, we can load the dataset into DataFrame and pass it to the fit
method:
from sklearn import datasets
from systemml.mllearn import LogisticRegression
import pandas as pd
from sklearn.metrics import accuracy_score
import systemml as sml
digits = datasets.load_digits()
X_digits = digits.data
y_digits = digits.target
n_samples = len(X_digits)
# Split the data into training/testing sets and convert to PySpark DataFrame
df_train = sml.convertToLabeledDF(sqlCtx, X_digits[:int(.9 * n_samples)], y_digits[:int(.9 * n_samples)])
X_test = spark.createDataFrame(pd.DataFrame(X_digits[int(.9 * n_samples):]))
logistic = LogisticRegression(spark)
logistic.fit(df_train)
y_predicted = logistic.predict(X_test)
y_predicted = y_predicted.select('prediction').toPandas().as_matrix().flatten()
y_test = y_digits[int(.9 * n_samples):]
print('LogisticRegression score: %f' % accuracy_score(y_test, y_predicted))
Output:
LogisticRegression score: 0.922222
MLPipeline interface
In the below example, we demonstrate how the same LogisticRegression
class can allow SystemML to fit seamlessly into
large data pipelines.
# MLPipeline way
from pyspark.ml import Pipeline
from systemml.mllearn import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 2.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 2.0),
(4, "b spark who", 1.0),
(5, "g d a y", 2.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 2.0),
(8, "e spark program", 1.0),
(9, "a e c l", 2.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 2.0)
], ["id", "text", "label"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20)
lr = LogisticRegression(sqlCtx)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
test = spark.createDataFrame([
(12, "spark i j k"),
(13, "l m n"),
(14, "mapreduce spark"),
(15, "apache hadoop")], ["id", "text"])
prediction = model.transform(test)
prediction.show()
Output:
++++++++
__INDEX id text words features probabilityprediction
++++++++
 1.0 12 spark i j k [spark, i, j, k](20,[5,6,7],[2.0,...[0.99999999999975... 1.0
 2.0 13 l m n [l, m, n](20,[8,9,10],[1.0...[1.37552128844736... 2.0
 3.0 14mapreduce spark[mapreduce, spark](20,[5,10],[1.0,1...[0.99860290938153... 1.0
 4.0 15 apache hadoop [apache, hadoop](20,[9,14],[1.0,1...[5.41688748236143... 2.0
++++++++
Troubleshooting Python APIs
Unable to load SystemML.jar into current pyspark session.
While using SystemML’s Python package through pyspark or notebook (SparkContext is not previously created in the session), the below method is not required. However, if the user wishes to use SystemML through sparksubmit and has not previously invoked
systemml.defmatrix.setSparkContext
(sc) Before using the matrix, the user needs to invoke this function if SparkContext is not previously created in the session.
 sc: SparkContext
 SparkContext
Example:
import systemml as sml
import numpy as np
sml.setSparkContext(sc)
m1 = sml.matrix(np.ones((3,3)) + 2)
m2 = sml.matrix(np.ones((3,3)) + 3)
m2 = m1 * (m2 + m1)
m4 = 1.0  m2
m4.sum(axis=1).toNumPy()
If SystemML was not installed via pip, you may have to download SystemML.jar and provide it to pyspark via driverclasspath
and jars
.
matrix API is running slow when set_lazy(False) or when eval() is called often.
This is a known issue. The matrix API is slow in this scenario due to slow Py4J conversion from Java MatrixObject or Java RDD to Python NumPy or DataFrame.
To resolve this for now, we recommend writing the matrix to FileSystemML and using load
function.
maximum recursion depth exceeded
SystemML matrix is backed by lazy evaluation and uses a recursive Depth First Search (DFS).
Python can throw RuntimeError: maximum recursion depth exceeded
when the recursion of DFS exceeds beyond the limit
set by Python. There are two ways to address it:

Increase the limit in Python:
import sys some_large_number = 2000 sys.setrecursionlimit(some_large_number)

Evaluate the intermeditate matrix to cutoff large recursion.