Federated Environment

The Python API of SystemDS supports federated execution. To enable this, each federated environment must have a running federated worker.

Start Federated Worker

To start a federated worker, you first have to set up your environment variables. A simple guide to doing this is in the SystemDS repository.

If that is set up correctly, start a worker with the following command. Here, 8001 is the port used by the worker.

systemds WORKER 8001
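Before running the examples in this section, it can help to confirm that the worker is reachable. The following sketch is not part of the SystemDS API: the helper `worker_is_listening` is hypothetical and only tries to open a TCP connection to the port passed to `systemds WORKER`. An open port does not prove that the listening process is actually a SystemDS worker.

```python
# Python
import socket


def worker_is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port.

    Hypothetical helper: it only checks that the port is open, not that
    the process behind it is a SystemDS federated worker.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable
        return False


# Check the worker started with `systemds WORKER 8001`
print(worker_is_listening("localhost", 8001))
```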

Simple Aggregation Example

In this example, we use a single federated worker and compute the sum of its data.

First, we need to create some data for our federated worker to use. In this example, we simply use NumPy to create a test.csv file.

Currently, the federated worker also requires a metadata file. It must be located next to test.csv and be named test.csv.mtd. To create both the data and the metadata, execute the following:

# Python
import os

import numpy as np

if not os.path.isdir("temp"):
    os.mkdir("temp")
a = np.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
np.savetxt("temp/test.csv", a, delimiter=",")
with open("temp/test.csv.mtd", "w") as mtd:
    mtd.write('{ "format":"csv", "header":false, "rows":3, "cols":3 }')
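When reusing this setup for other arrays, the row and column counts in the metadata can be derived from the array shape instead of being typed by hand, so the two files cannot drift out of sync. A minimal sketch with a hypothetical helper `save_csv_with_mtd`, assuming the four metadata keys used above are sufficient, as they are in this example:

```python
# Python
import json
import os

import numpy as np


def save_csv_with_mtd(path: str, data: np.ndarray) -> None:
    """Write `data` as a headerless CSV file plus a matching `<path>.mtd` file.

    Hypothetical helper; the metadata keys mirror the example above.
    """
    if data.ndim != 2:
        raise ValueError("expected a 2-D array")
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    np.savetxt(path, data, delimiter=",")
    # rows and cols come from the array itself
    meta = {"format": "csv", "header": False,
            "rows": int(data.shape[0]), "cols": int(data.shape[1])}
    with open(path + ".mtd", "w") as mtd:
        json.dump(meta, mtd)


# Same 3 x 3 matrix as above
save_csv_with_mtd("temp/test.csv", np.arange(1, 10).reshape(3, 3))
```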

Once the data exists, the federated worker can execute federated instructions on it. Computing the aggregated sum with federated instructions in Python SystemDS is done as follows:

# Python
from systemds.context import SystemDSContext

# Create a federated matrix.
# Indicate the dimensions of the data:
# the first list in the tuple is the top-left coordinate,
# and the second is the bottom-right coordinate (exclusive).
# Both are ordered as [row, col].
dims = ([0, 0], [3, 3])

# Specify the worker address and the file path on the worker:
address = "localhost:8001/temp/test.csv"

with SystemDSContext() as sds:
    fed_a = sds.federated([address], [dims])
    # Sum the federated matrix and call compute to execute
    print(fed_a.sum().compute())
    # Result should be 45.
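The `dims` ranges describe which block of the global matrix a worker holds. Judging from the examples in this section (a 3 x 3 file maps to `([0, 0], [3, 3])`, and the three stacked partitions in the next example differ only in their first coordinate), each range reads as `([row_start, col_start], [row_end, col_end])` with exclusive end coordinates. The following NumPy sketch, using a hypothetical helper `block_of`, illustrates that reading and gives a local reference value for the federated sum:

```python
# Python
from typing import List, Tuple

import numpy as np


def block_of(global_matrix: np.ndarray,
             fed_range: Tuple[List[int], List[int]]) -> np.ndarray:
    """Return the block of `global_matrix` covered by one federated range.

    Assumes ([row_start, col_start], [row_end, col_end]) with exclusive
    end coordinates, like Python slice bounds.
    """
    (r0, c0), (r1, c1) = fed_range
    return global_matrix[r0:r1, c0:c1]


a = np.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
# Local reference for the federated sum
print(block_of(a, ([0, 0], [3, 3])).sum())  # → 45
```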

Multiple Federated Environments

In this example we multiply matrices that are located in different federated environments.

Using the data created in the previous example, we can simulate multiple federated workers by starting several of them on different ports. Open three terminals and start one federated worker in each:

systemds WORKER 8001
systemds WORKER 8002
systemds WORKER 8003
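Instead of opening three terminals, the workers can also be started from a single Python process. This is a sketch, not a SystemDS utility: the helpers `worker_commands` and `start_workers` are hypothetical, and it assumes the `systemds` launcher is on your PATH because the environment variables from the setup guide are configured.

```python
# Python
import subprocess
from typing import List


def worker_commands(ports: List[int]) -> List[List[str]]:
    """Build one `systemds WORKER <port>` command per port."""
    return [["systemds", "WORKER", str(port)] for port in ports]


def start_workers(ports: List[int]) -> List[subprocess.Popen]:
    """Start one federated worker per port as a background process.

    Assumes the `systemds` launcher is on PATH.
    """
    return [subprocess.Popen(cmd) for cmd in worker_commands(ports)]


# Usage (not executed here):
#   procs = start_workers([8001, 8002, 8003])
#   ... run the example ...
#   for p in procs:
#       p.terminate()
```

Running the workers as child processes ties their lifetime to your script, which is convenient for experiments; separate terminals make each worker's log output easier to follow.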

Once all three workers are up and running, we can use all of them in the following example:

# Python
import numpy as np
from systemds.context import SystemDSContext

addr1 = "localhost:8001/temp/test.csv"
addr2 = "localhost:8002/temp/test.csv"
addr3 = "localhost:8003/temp/test.csv"

# Create a federated matrix using three federated environments.
# Note that the three federated partitions are stacked on top of each other,
# forming a 9 x 3 matrix.

with SystemDSContext() as sds:
    # federated data on three locations
    fed = sds.federated([addr1, addr2, addr3], [
        ([0, 0], [3, 3]),
        ([3, 0], [6, 3]),
        ([6, 0], [9, 3])])
    # local 3 x 9 matrix to multiply with
    loc = sds.from_numpy(np.array([
        [1, 2, 3, 4, 5, 6, 7, 8, 9],
        [1, 2, 3, 4, 5, 6, 7, 8, 9],
        [1, 2, 3, 4, 5, 6, 7, 8, 9]
    ]))
    # Multiply local and federated
    ret = loc @ fed
    # execute the lazy script and print
    print(ret.compute())

The printed result should look like this:

[[198. 243. 288.]
 [198. 243. 288.]
 [198. 243. 288.]]
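To check this result without any workers, and to avoid writing ranges by hand when there are more partitions, the following NumPy sketch builds the row-stacked ranges with a hypothetical helper `stacked_row_ranges` and reproduces the product locally. It assumes the `([row_start, col_start], [row_end, col_end])` range layout used in the example. NumPy keeps integer types here, so the local product prints without the trailing dots of the SystemDS floating-point output.

```python
# Python
from typing import List, Tuple

import numpy as np


def stacked_row_ranges(row_counts: List[int],
                       ncols: int) -> List[Tuple[List[int], List[int]]]:
    """Ranges for partitions stacked vertically, each spanning all columns."""
    ranges, start = [], 0
    for rows in row_counts:
        ranges.append(([start, 0], [start + rows, ncols]))
        start += rows
    return ranges


a = np.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
ranges = stacked_row_ranges([3, 3, 3], 3)
print(ranges)  # → [([0, 0], [3, 3]), ([3, 0], [6, 3]), ([6, 0], [9, 3])]

# Local reference: stack the three copies and multiply as in the example
fed_equivalent = np.vstack([a, a, a])     # shape (9, 3)
loc = np.tile(np.arange(1, 10), (3, 1))   # three rows of 1..9, shape (3, 9)
print(loc @ fed_equivalent)
```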

Note

If it does not work, double-check that the CSV file and its .mtd metadata file exist and that the SystemDS environment is set up correctly.
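Part of this check can be automated. The sketch below uses a hypothetical helper `check_csv_and_mtd` that verifies both files exist and that the `rows` and `cols` entries in the metadata match the CSV contents; it cannot verify the SystemDS environment itself.

```python
# Python
import json
import os
from typing import List

import numpy as np


def check_csv_and_mtd(csv_path: str) -> List[str]:
    """Return a list of problems found for a CSV file and its .mtd file."""
    mtd_path = csv_path + ".mtd"
    if not os.path.isfile(csv_path):
        return [f"missing data file: {csv_path}"]
    if not os.path.isfile(mtd_path):
        return [f"missing metadata file: {mtd_path}"]
    with open(mtd_path) as f:
        meta = json.load(f)  # raises ValueError if the file is not valid JSON
    # Skip the first line if the metadata says the CSV has a header row
    skip = 1 if meta.get("header") else 0
    data = np.loadtxt(csv_path, delimiter=",", skiprows=skip, ndmin=2)
    problems = []
    for key, actual in (("rows", data.shape[0]), ("cols", data.shape[1])):
        if meta.get(key) != actual:
            problems.append(
                f"{key} in metadata ({meta.get(key)}) != {key} in data ({actual})")
    return problems


print(check_csv_and_mtd("temp/test.csv") or "data and metadata look consistent")
```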

Multi-tenant Federated Learning

SystemDS supports Multi-tenant Federated Learning, meaning that multiple coordinators learn on shared federated workers. From another perspective, the federated worker allows multiple coordinators to perform model training simultaneously using the data from the respective federated site. This approach enables the worker to operate in a server-like mode, providing multiple tenants with the ability to learn on the federated data at the same time. Tenant isolation ensures that tenant-specific intermediate results are only accessible by the respective tenant.
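The idea of tenant isolation can be illustrated with a toy model in which the worker keeps intermediate results in a separate namespace per tenant. This is a conceptual sketch only; the class `TenantScopedStore` and its methods are hypothetical and do not describe how the SystemDS worker is implemented.

```python
# Python
from typing import Any, Dict, Hashable


class TenantScopedStore:
    """Toy model: intermediate results kept in a separate namespace per tenant."""

    def __init__(self) -> None:
        self._by_tenant: Dict[Hashable, Dict[str, Any]] = {}

    def put(self, tenant: Hashable, name: str, value: Any) -> None:
        self._by_tenant.setdefault(tenant, {})[name] = value

    def get(self, tenant: Hashable, name: str) -> Any:
        # Only the caller's own namespace is consulted, so another tenant's
        # variable with the same name is never visible; raises KeyError otherwise.
        return self._by_tenant.get(tenant, {})[name]


store = TenantScopedStore()
store.put("coordinator-A", "gradient", [0.1, 0.2])
store.put("coordinator-B", "gradient", [0.9, 0.8])
print(store.get("coordinator-A", "gradient"))  # → [0.1, 0.2]
```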

Limitations

Since the coordinators are differentiated by their IP address in combination with their process ID, the worker is not able to isolate coordinators which share the same IP address and the same process ID. This occurs, for example, when two coordinators are running behind a proxy (same IP address), where both coordinators coincidentally have the same process ID.

A second limitation arises in networks that use the Dynamic Host Configuration Protocol (DHCP). Since the federated worker identifies the coordinator based on its IP address, the worker does not re-identify the coordinator after its IP address has changed, e.g., when DHCP assigns it a new address.
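Both limitations follow from using the pair (IP address, process ID) as the coordinator's identity. The sketch below models that key with a hypothetical `CoordinatorKey` type; it is an illustration of the behavior described above, not the worker's actual data structure. The addresses come from the 192.0.2.0/24 documentation range.

```python
# Python
from typing import NamedTuple


class CoordinatorKey(NamedTuple):
    ip: str
    pid: int


# Limitation 1: two coordinators behind the same proxy whose processes
# happen to share a PID produce identical keys and cannot be told apart.
behind_proxy_a = CoordinatorKey("192.0.2.10", 31337)
behind_proxy_b = CoordinatorKey("192.0.2.10", 31337)
print(behind_proxy_a == behind_proxy_b)  # → True: treated as one tenant

# Limitation 2: the same coordinator process receives a new address from
# DHCP, so its key changes and state stored under the old key is not found.
before_change = CoordinatorKey("192.0.2.20", 4711)
after_change = CoordinatorKey("192.0.2.57", 4711)
print(before_change == after_change)  # → False: treated as a new tenant
```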