Python end-to-end tutorial
The goal of this tutorial is to showcase different features of the SystemDS framework that can be accessed with the Python API. For this, we use the Adult dataset and predict whether the income of a person exceeds $50K/yr based on census data. The Adult dataset contains attributes like age, workclass, education, marital-status, occupation, race, […] and the labels >50K or <=50K. Most of these features are categorical string values, but the dataset also includes continuous features. The tutorial is split into two levels with an increasing amount of detail regarding the features provided by SystemDS: the first level shows the built-in preprocessing capabilities of SystemDS, and the second level shows how to integrate custom-built networks or algorithms into a Python program.
Prerequisite: a working SystemDS setup and the SystemDS Python package (pip install systemds).
Level 1
This example shows how one can work with the SystemDS framework. More precisely, we will make use of the built-in DataManager, the Multinomial Logistic Regression function, and the Confusion Matrix function. The dataset used in this tutorial is a preprocessed version of the “UCI Adult Data Set”. If you want to skip the explanation, the full script is available at the end of this level.
We will train a Multinomial Logistic Regression model on the training dataset and subsequently use the test dataset to assess how well our model can predict if the income is above or below $50K/yr based on the features.
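As a brief, framework-independent refresher: multinomial logistic regression models the probability of class k for a feature vector x with one weight vector beta_k per class,

P(y = k | x) = exp(x^T beta_k) / sum_j exp(x^T beta_j)

For our binary labels (<=50K vs. >50K) this reduces to ordinary logistic regression; the multiLogReg call below estimates exactly these coefficients, which is why its result is named betas.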
Step 1: Load and prepare data
First, we get our training and testing data from the built-in DataManager. Since the multiLogReg function requires the labels (Y) to be > 0, we recode the income label into positive integer codes via transform_encode, which ensures that the smallest label is >= 1. Additionally, we only take a fraction of the training and test set into account to speed up the execution.
from systemds.context import SystemDSContext
from systemds.examples.tutorials.adult import DataManager
from systemds.operator.algorithm import multiLogReg
from systemds.operator.algorithm import multiLogRegPredict
from systemds.operator.algorithm import confusionMatrix
with SystemDSContext() as sds:
    d = DataManager()

    # Limit the sample size.
    train_count = 15000
    test_count = 5000

    # Get train and test datasets.
    X_frame, Y_frame, Xt_frame, Yt_frame = d.get_preprocessed_dataset(sds)

    # Transformation specifications.
    jspec_data = d.get_jspec(sds)
    jspec_labels = sds.scalar(f'"{ {"recode": ["income"]} }"')

    # Transform frames to matrices.
    X, M1 = X_frame.transform_encode(spec=jspec_data)
    Xt = Xt_frame.transform_apply(spec=jspec_data, meta=M1)
    Y, M2 = Y_frame.transform_encode(spec=jspec_labels)
    Yt = Yt_frame.transform_apply(spec=jspec_labels, meta=M2)

    # Subsample to make training faster.
    X = X[0:train_count]
    Y = Y[0:train_count]
    Xt = Xt[0:test_count]
    Yt = Yt[0:test_count]
Here, the DataManager contains the code for downloading the dataset and setting it up either as Pandas DataFrames or as internal SystemDS frames. For the best performance, and to avoid transferring data from Pandas to SystemDS, it is recommended to read the data directly from disk into SystemDS.
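A minimal sketch of that direct-from-disk route is shown below. The path is a placeholder, and the keyword arguments mirror the parameters of the underlying DML read call; SystemDS additionally expects a metadata (.mtd) file describing the frame next to the data:

# Hypothetical: read a CSV file directly into a SystemDS frame,
# assuming a matching .mtd metadata file exists next to it.
adult_frame = sds.read("path/to/adult.csv", data_type="frame", format="csv")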
Step 2: Training
Now that we have prepared the data, we can use the multiLogReg function: first we train the model on the training data, then we make predictions on the test data and assess the performance of the model.
betas = multiLogReg(X, Y, verbose=False)
Note that nothing has been calculated yet: SystemDS evaluates lazily, and the computation is only executed once compute() is called, e.g. betas_res = betas.compute().
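A minimal illustration of this lazy behavior, using only basic context operations (the values are arbitrary):

# Building the expression does not execute anything yet.
m = sds.full((2, 2), 1.0)
expr = m + m
# Only compute() compiles and runs the underlying DML, returning a numpy array.
result = expr.compute()  # [[2. 2.] [2. 2.]]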
We can now use the trained model to make predictions on the test data.
[_, y_pred, acc] = multiLogRegPredict(Xt, betas, Y=Yt)
The multiLogRegPredict function has three return values:
- m, a matrix with the mean probability of correctly classifying each label. We do not use it further in this example.
- y_pred, the predictions made using the model.
- acc, the accuracy achieved by the model (inspected in the snippet below).
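Since these return values are lazy operation nodes as well, inspecting them requires a compute() call; for example:

# acc is still a lazy operation node; compute() triggers execution and
# returns the accuracy as a concrete value.
acc_res = acc.compute()
print("Test accuracy: " + str(acc_res))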
Step 3: Confusion Matrix
A confusion matrix is a useful tool for analyzing the performance of the model and for gaining a better understanding of which classes the model has difficulties separating. The confusionMatrix function takes the predicted labels and the true labels. It returns the confusion matrix of the predictions as absolute counts and, as a second output, the confusion matrix averaged within each true class.
confusion_matrix_abs, _ = confusionMatrix(y_pred, Yt).compute()
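Because compute() returns the result as a numpy array, we can inspect it directly; as a small sanity check (not part of the original script), the overall accuracy can be re-derived from the matrix:

import numpy as np

# Each cell counts test samples for one combination of predicted and true label.
print(confusion_matrix_abs)
# The diagonal holds the correctly classified samples, so accuracy equals the
# diagonal sum over the total sum.
print(np.diag(confusion_matrix_abs).sum() / confusion_matrix_abs.sum())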
Full Script
In the full script, some steps are combined to shorten the overall script.
import logging

from systemds.context import SystemDSContext
from systemds.examples.tutorials.adult import DataManager
from systemds.operator.algorithm import multiLogReg
from systemds.operator.algorithm import multiLogRegPredict
from systemds.operator.algorithm import confusionMatrix

with SystemDSContext() as sds:
    d = DataManager()

    # Limit the sample size.
    train_count = 15000
    test_count = 5000

    # Get train and test datasets.
    X_frame, Y_frame, Xt_frame, Yt_frame = d.get_preprocessed_dataset(sds)

    # Transformation specifications.
    jspec_data = d.get_jspec(sds)
    jspec_labels = sds.scalar(f'"{ {"recode": ["income"]} }"')

    # Transform frames to matrices.
    X, M1 = X_frame.transform_encode(spec=jspec_data)
    Xt = Xt_frame.transform_apply(spec=jspec_data, meta=M1)
    Y, M2 = Y_frame.transform_encode(spec=jspec_labels)
    Yt = Yt_frame.transform_apply(spec=jspec_labels, meta=M2)

    # Subsample to make training faster.
    X = X[0:train_count]
    Y = Y[0:train_count]
    Xt = Xt[0:test_count]
    Yt = Yt[0:test_count]

    # Train model.
    betas = multiLogReg(X, Y, verbose=False)

    # Apply model.
    [_, y_pred, acc] = multiLogRegPredict(Xt, betas, Y=Yt)

    # Confusion matrix.
    confusion_matrix_abs, _ = confusionMatrix(y_pred, Yt).compute()

    logging.info("Confusion Matrix")
    logging.info(confusion_matrix_abs)
Level 2
In this level we show how to integrate a custom-built algorithm using the Python API. For this, we introduce another DML file, which can be used to train a basic feed-forward network.
Step 1: Obtain data
The data setup is almost identical to Level 1, Step 1; the only difference is that at this point we prepare only the training data. The test frames are transformed later, after the model has been stored and read back.
from systemds.context import SystemDSContext
from systemds.examples.tutorials.adult import DataManager

with SystemDSContext() as sds:
    d = DataManager()

    # Limit the sample size.
    train_count = 15000
    test_count = 5000

    # Get train and test datasets.
    X_frame, Y_frame, Xt_frame, Yt_frame = d.get_preprocessed_dataset(sds)

    # Transformation specifications.
    jspec_data = d.get_jspec(sds)
    jspec_labels = sds.scalar(f'"{ {"recode": ["income"]} }"')

    # Transform the training frames to matrices.
    X, M1 = X_frame.transform_encode(spec=jspec_data)
    Y, M2 = Y_frame.transform_encode(spec=jspec_labels)

    # Subsample to make training faster.
    X = X[0:train_count]
    Y = Y[0:train_count]
Step 2: Load the algorithm
We use a neural network with 2 hidden layers, each consisting of 200 neurons. First, we need to source the DML file for neural networks. This file includes all the necessary functions for training, evaluating, and storing the model. The object returned by the source call is then used for calling those functions. The path of the file is shown in the snippet below.
# Load custom neural network
neural_net_src_path = "tests/examples/tutorials/neural_net_source.dml"
FFN_package = sds.source(neural_net_src_path, "fnn")
Step 3: Training the neural network
Training a neural network in SystemDS using the train function is straightforward. The first two arguments are the training features and the target values we want to fit our model on. Then we need to set the hyperparameters of the model. We choose to train for 1 epoch with a batch size of 16 and a learning rate of 0.01, which are common parameters for neural networks. The seed argument ensures that running the code again yields the same results.
epochs = 1
batch_size = 16
learning_rate = 0.01
seed = 42
network = FFN_package.train(X, Y, epochs, batch_size, learning_rate, seed)
Step 4: Saving the model
For later use, we can save the trained model. We only need to specify the file paths. This call stores the weights and biases of our model; similarly, the transformation metadata needed to encode new input data for the model is saved. Since SystemDS evaluates lazily, the compute() at the end of this combined write is also what actually triggers the training from Step 3.
# Write metadata and trained network to disk.
sds.combine(
    network.write('tests/examples/docs_test/end_to_end/network'),
    M1.write('tests/examples/docs_test/end_to_end/encode_X'),
    M2.write('tests/examples/docs_test/end_to_end/encode_Y')
).compute()
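The write calls persist each object together with SystemDS metadata, so the objects can later be read back with sds.read without extra arguments. A minimal sketch of such a round trip for a single matrix (the path is a placeholder):

# Illustrative write/read round trip; the path is a placeholder.
m = sds.full((3, 3), 1.0)
m.write('some/temp/path/matrix').compute()
m_r = sds.read('some/temp/path/matrix')
print(m_r.compute())  # a 3x3 matrix of ones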
Step 5: Predict on unseen data
Once the model and the metadata are saved, it is simple to apply them to unseen data:
# Read metadata and trained network and do prediction.
M1_r = sds.read('tests/examples/docs_test/end_to_end/encode_X')
M2_r = sds.read('tests/examples/docs_test/end_to_end/encode_Y')
network_r = sds.read('tests/examples/docs_test/end_to_end/network')
Xt = Xt_frame.transform_apply(spec=jspec_data, meta=M1_r)
Yt = Yt_frame.transform_apply(spec=jspec_labels, meta=M2_r)
Xt = Xt[0:test_count]
Yt = Yt[0:test_count]
FFN_package_2 = sds.source(neural_net_src_path, "fnn")
probs = FFN_package_2.predict(Xt, network_r)
accuracy = FFN_package_2.eval(probs, Yt).compute()
Full Script NN
The complete script can be seen here:
import logging

from systemds.context import SystemDSContext
from systemds.examples.tutorials.adult import DataManager

with SystemDSContext() as sds:
    d = DataManager()

    # Limit the sample size.
    train_count = 15000
    test_count = 5000

    # Get train and test datasets.
    X_frame, Y_frame, Xt_frame, Yt_frame = d.get_preprocessed_dataset(sds)

    # Transformation specifications.
    jspec_data = d.get_jspec(sds)
    jspec_labels = sds.scalar(f'"{ {"recode": ["income"]} }"')

    # Transform the training frames to matrices.
    X, M1 = X_frame.transform_encode(spec=jspec_data)
    Y, M2 = Y_frame.transform_encode(spec=jspec_labels)

    # Subsample to make training faster.
    X = X[0:train_count]
    Y = Y[0:train_count]

    # Load custom neural network.
    neural_net_src_path = "tests/examples/tutorials/neural_net_source.dml"
    FFN_package = sds.source(neural_net_src_path, "fnn")

    # Hyperparameters.
    epochs = 1
    batch_size = 16
    learning_rate = 0.01
    seed = 42

    # Train the network.
    network = FFN_package.train(X, Y, epochs, batch_size, learning_rate, seed)

    # Write metadata and trained network to disk.
    sds.combine(
        network.write('tests/examples/docs_test/end_to_end/network'),
        M1.write('tests/examples/docs_test/end_to_end/encode_X'),
        M2.write('tests/examples/docs_test/end_to_end/encode_Y')
    ).compute()

    # Read metadata and trained network and do prediction.
    M1_r = sds.read('tests/examples/docs_test/end_to_end/encode_X')
    M2_r = sds.read('tests/examples/docs_test/end_to_end/encode_Y')
    network_r = sds.read('tests/examples/docs_test/end_to_end/network')

    Xt = Xt_frame.transform_apply(spec=jspec_data, meta=M1_r)
    Yt = Yt_frame.transform_apply(spec=jspec_labels, meta=M2_r)
    Xt = Xt[0:test_count]
    Yt = Yt[0:test_count]

    FFN_package_2 = sds.source(neural_net_src_path, "fnn")
    probs = FFN_package_2.predict(Xt, network_r)
    accuracy = FFN_package_2.eval(probs, Yt).compute()

    logging.info("accuracy: " + str(accuracy))