This library covers the UNICORE REST API, making common tasks like file access, job submission and management, workflow submission and management more convenient, and integrating UNICORE features better with typical Python usage.
The full, up-to-date documentation of the REST API can be found here
In addition, this library contains code for using UFTP (UNICORE FTP) for filesystem mounts with FUSE, a UFTP driver for PyFilesystem, and a UNICORE implementation of a Dask Cluster.
Development of this library was funded in part by the Human Brain Project
See LICENSE file for licensing information
Install from PyPI with
pip install -U pyunicore
Additional extra packages may be required for your use case:
- Using the UFTP fuse driver requires "fusepy"
- Using UFTP with pyfilesystem requires "fs"
- Creating JWT tokens signed with keys requires the "cryptography" package
You can install (one or more) extras with pip:
pip install -U pyunicore[crypto,fs,fuse]
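To see which of the optional packages are already present before installing extras, a small standard-library check can help. This is a minimal sketch, not part of PyUNICORE: the helper name `available_extras` is hypothetical, and it only tests whether the modules `fuse` (installed by fusepy), `fs` and `cryptography` can be found.

```python
from importlib.util import find_spec

# Extra name (as used in "pyunicore[crypto,fs,fuse]") -> module it provides.
# "fuse" is the module name installed by the fusepy distribution.
OPTIONAL_MODULES = {
    "fuse": "fuse",            # UFTP FUSE driver
    "fs": "fs",                # UFTP driver for PyFilesystem
    "crypto": "cryptography",  # JWT tokens signed with keys
}


def available_extras():
    """Map each extra name to True if its module can be located."""
    return {
        extra: find_spec(module) is not None
        for extra, module in OPTIONAL_MODULES.items()
    }


if __name__ == "__main__":
    for extra, present in available_extras().items():
        print(f"{extra}: {'installed' if present else 'missing'}")
```

Any extra reported as missing can then be installed with the pip command above.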
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import json
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
# authenticate with username/password
credential = uc_credentials.UsernamePassword("demouser", "test123")
client = uc_client.Client(credential, base_url)
print(json.dumps(client.properties, indent = 2))
my_job = {'Executable': 'date'}
job = client.new_job(job_description=my_job, inputs=[])
print(json.dumps(job.properties, indent = 2))
job.poll() # wait for job to finish
work_dir = job.working_dir
print(json.dumps(work_dir.properties, indent = 2))
stdout = work_dir.stat("/stdout")
print(json.dumps(stdout.properties, indent = 2))
content = stdout.raw().read()
print(content)
registry_url = "https://localhost:8080/REGISTRY/rest/registries/default_registry"
# authenticate with username/password
credential = uc_credentials.UsernamePassword("demouser", "test123")
r = uc_client.Registry(credential, registry_url)
print(r.site_urls)
More examples for using PyUNICORE can be found in the "integration-tests" folder in the source code repository.
You can create a PyFilesystem FS object either directly in code, or implicitly via a URL.
The convenient way is via URL:
from fs import open_fs
fs_url = "uftp://demouser:test123@localhost:9000/rest/auth/TEST:/data"
uftp_fs = open_fs(fs_url)
The URL format is
uftp://[username]:[password]@[auth-server-url]:[base-directory]?[token=...][identity=...]
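To make the URL layout concrete, the following standard-library sketch splits such a URL into the parts named in the format. It is illustrative only and does not reproduce how the FS driver parses URLs; the function name `split_uftp_url` is hypothetical.

```python
from urllib.parse import parse_qs, urlsplit


def split_uftp_url(fs_url):
    """Break a uftp:// URL into the parts named in the URL format."""
    parts = urlsplit(fs_url)
    # The path holds "<auth-server-path>:<base-directory>"; split at the first colon.
    auth_path, _, base_dir = parts.path.partition(":")
    host = parts.hostname if parts.port is None else f"{parts.hostname}:{parts.port}"
    # Keep only the first value of each query parameter.
    query = {key: values[0] for key, values in parse_qs(parts.query).items()}
    return {
        "username": parts.username,
        "password": parts.password,
        "auth_server": f"{host}{auth_path}",
        "base_dir": base_dir,
        "token": query.get("token"),
        "identity": query.get("identity"),
    }


if __name__ == "__main__":
    print(split_uftp_url("uftp://demouser:test123@localhost:9000/rest/auth/TEST:/data"))
```

For the example URL, the auth server part is "localhost:9000/rest/auth/TEST" and the base directory is "/data".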
The FS driver supports three types of authentication:
- Username/Password - give username and password
- SSH Key - give username and the identity parameter, where identity is the filename of a private key. Specify the password if needed to load the private key
- Bearer token - give the token value via the token parameter
(note: the SSH key authentication using this library requires UFTP Auth server 2.7.0 or later)
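The three authentication variants differ only in which URL parts are filled in. The sketch below assembles a URL for each case using the standard library. The helper `build_uftp_url` is hypothetical and not part of PyUNICORE, and percent-encoding special characters in the username, password and parameters is an assumption about how the URL is parsed on the other side.

```python
from urllib.parse import quote, urlencode


def build_uftp_url(auth_server, base_dir, username=None, password=None,
                   identity=None, token=None):
    """Assemble a uftp:// URL; auth_server is "host:port/path" without a scheme."""
    userinfo = ""
    if username is not None:
        userinfo = quote(username, safe="")
        if password is not None:
            userinfo += ":" + quote(password, safe="")
        userinfo += "@"
    params = {}
    if token is not None:
        params["token"] = token
    if identity is not None:
        params["identity"] = identity
    query = "?" + urlencode(params) if params else ""
    return f"uftp://{userinfo}{auth_server}:{base_dir}{query}"


if __name__ == "__main__":
    server = "localhost:9000/rest/auth/TEST"
    # Username/password
    print(build_uftp_url(server, "/data", username="demouser", password="test123"))
    # SSH key: username plus the identity parameter (placeholder key filename)
    print(build_uftp_url(server, "/data", username="demouser", identity="id_key"))
    # Bearer token (placeholder value)
    print(build_uftp_url(server, "/data", token="abc"))
```

Each resulting string can be passed to open_fs in the same way as the URL in the example above.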
PyUNICORE contains a FUSE driver based on fusepy, allowing you to mount a remote filesystem via UFTP. Mounting is a two-step process:
- authenticate to an Auth server, giving you the UFTPD host/port and one-time password
- run the FUSE driver
The following code example gives you the basic idea:
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.uftp as uc_uftp
import pyunicore.uftpfuse as uc_fuse
_auth = "https://localhost:9000/rest/auth/TEST"
_base_dir = "/opt/shared-data"
_local_mount_dir = "/tmp/mount"
# authenticate
cred = uc_credentials.UsernamePassword("demouser", "test123")
uftp = uc_uftp.UFTP(uc_client.Transport(cred), _auth, _base_dir)
_host, _port, _password = uftp.authenticate()
# run the fuse driver
fuse = uc_fuse.FUSE(
uc_fuse.UFTPDriver(_host, _port, _password), _local_mount_dir, foreground=False, nothreads=True)
Opens a local server socket for clients to connect to, where traffic gets forwarded to a service on a HPC cluster login (or compute) node. This feature requires UNICORE 9.1.0 or later on the server side.
You can use this feature in two ways:
- in your own applications via the pyunicore.client.Job class.
- you can also open a tunnel from the command line using the 'pyunicore.forwarder' module
Here is an example for a command line tool invocation:
LOCAL_PORT=4322
JOB_URL=https://localhost:8080/DEMO-SITE/rest/core/jobs/some_job_id
REMOTE_PORT=8000
python3 -m pyunicore.forwarder --token <your_auth_token> \
    -L $LOCAL_PORT \
    "$JOB_URL/forward-port?port=$REMOTE_PORT"
Your application can now connect to "localhost:4322" but all traffic will be forwarded to port 8000 on the login node.
See
python3 -m pyunicore.forwarder --help
for all options.
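If the tunnel is started from a Python script rather than a shell, the same command line can be assembled as an argument list. This is a minimal sketch that uses only the options shown in the invocation above (`--token` and `-L`); the helper name `forwarder_command` is hypothetical.

```python
import shlex


def forwarder_command(job_url, local_port, remote_port, token):
    """Return the argv list for the pyunicore.forwarder command-line tool."""
    # The endpoint is the job URL plus the forward-port path and the remote port.
    endpoint = f"{job_url}/forward-port?port={remote_port}"
    return [
        "python3", "-m", "pyunicore.forwarder",
        "--token", token,
        "-L", str(local_port),
        endpoint,
    ]


argv = forwarder_command(
    "https://localhost:8080/DEMO-SITE/rest/core/jobs/some_job_id",
    local_port=4322,
    remote_port=8000,
    token="<your_auth_token>",
)

if __name__ == "__main__":
    # Print a shell-safe version of the command for inspection.
    print(shlex.join(argv))
```

The list can be handed to subprocess.Popen to keep the tunnel running in the background while the application connects to the local port.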
PyUNICORE provides an implementation of a Dask Cluster, allowing you to run the Dask client on your local host (or in a Jupyter notebook in the Cloud), and have the Dask scheduler and workers running remotely on the HPC site.
A basic usage example:
import pyunicore.client as uc_client
import pyunicore.credentials as uc_credentials
import pyunicore.dask as uc_dask
# Create a UNICORE client for accessing the HPC cluster
base_url = "https://localhost:8080/DEMO-SITE/rest/core"
credential = uc_credentials.UsernamePassword("demouser", "test123")
submitter = uc_client.Client(credential, base_url)
# Create the UNICORECluster instance
uc_cluster = uc_dask.UNICORECluster(
submitter,
queue = "batch",
project = "my-project",
debug=True)
# Start two workers
uc_cluster.scale(2, wait_for_startup=True)
# Create a Dask client connected to the UNICORECluster
from dask.distributed import Client
dask_client = Client(uc_cluster, timeout=120)
That's it! Now Dask will run its computations using the scheduler and workers started via UNICORE on the HPC site.
The pyunicore.helpers module provides a set of higher-level APIs:
- Connecting to
  - a Registry (pyunicore.helpers.connect_to_registry).
  - a site via a Registry URL (pyunicore.helpers.connect_to_site_from_registry).
  - a site via its core URL (pyunicore.helpers.connect_to_site).
- Defining descriptions as a dataclass and easily converting to a dict as required by pyunicore.client.Client.new_job via a to_dict() method:
  - pyunicore.helpers.jobs.Description for pyunicore.client.Client.new_job()
  - pyunicore.helpers.workflows.Description for pyunicore.client.WorkflowService.new_workflow()
- All possible job statuses that may be returned by the jobs API (pyunicore.helpers.JobStatus).
- Defining a workflow description
import json
import pyunicore.credentials as uc_credentials
import pyunicore.helpers as helpers
registry_url = "https://localhost:8080/REGISTRY/rest/registries/default_registry"
credentials = uc_credentials.UsernamePassword("demouser", "test123")
client = helpers.connection.connect_to_registry(
registry_url=registry_url,
credentials=credentials,
)
print(json.dumps(client.properties, indent=2))
import json
import pyunicore.credentials as uc_credentials
import pyunicore.helpers as helpers
registry_url = "https://localhost:8080/REGISTRY/rest/registries/default_registry"
site = "DEMO-SITE"
credentials = uc_credentials.UsernamePassword("demouser", "test123")
client = helpers.connection.connect_to_site_from_registry(
registry_url=registry_url,
site_name=site,
credentials=credentials,
)
print(json.dumps(client.properties, indent=2))
import json
import pyunicore.credentials as uc_credentials
import pyunicore.helpers as helpers
site_url = "https://localhost:8080/DEMO-SITE/rest/core"
credentials = uc_credentials.UsernamePassword("demouser", "test123")
client = helpers.connection.connect_to_site(
site_api_url=site_url,
credentials=credentials,
)
print(json.dumps(client.properties, indent=2))
from pyunicore import helpers
client = ...
resources = helpers.jobs.Resources(nodes=4)
job = helpers.jobs.Description(
executable="ls",
project="demoproject",
resources=resources
)
client.new_job(job.to_dict())
This works analogously for pyunicore.helpers.workflows.
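The dataclass-plus-to_dict() pattern used by the helpers can be illustrated with a small stand-in. The classes below are hypothetical and are not the PyUNICORE implementation. Only the 'Executable' key appears in the job example earlier in this document; 'Project', 'Resources' and 'Nodes' are assumed key names.

```python
from dataclasses import dataclass, field


@dataclass
class SketchResources:
    """Stand-in for a resources description (hypothetical class)."""
    nodes: int = 1

    def to_dict(self):
        return {"Nodes": self.nodes}  # assumed key name


@dataclass
class SketchDescription:
    """Stand-in for a job description (hypothetical class)."""
    executable: str
    project: str = ""
    resources: SketchResources = field(default_factory=SketchResources)

    def to_dict(self):
        # Convert the typed fields into the plain dict form used for submission.
        description = {
            "Executable": self.executable,
            "Resources": self.resources.to_dict(),  # assumed key name
        }
        if self.project:
            description["Project"] = self.project  # assumed key name
        return description


if __name__ == "__main__":
    job = SketchDescription("ls", project="demoproject",
                            resources=SketchResources(nodes=4))
    print(job.to_dict())
```

The benefit of this design is that field names and types are checked when the description is built, while the submission call still receives an ordinary dict.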
- Fork the repository
- Install the development dependencies
  pip install -r requirements-dev.txt
- Install pre-commit hooks
  pre-commit install