bbp_workflow.task

Collection of base tasks for the Workflow engine.

Classes

EnvCfg(*args, **kwargs)

Standard set of environment configuration parameters.

IPyParallel(*args, **kwargs)

Allocate ipyparallel cluster and run remote_script.

IPyParallelExclusive(*args, **kwargs)

IPyParallel variant that runs on exclusive nodes, with one engine (task) per cpus_per_task CPUs.

KgCfg(*args, **kwargs)

Knowledge Graph configuration parameters.

KgTask(*args, **kwargs)

Base class for Knowledge Graph enabled tasks.

LookupKgEntity(*args, **kwargs)

Base class for Knowledge Graph lookup tasks.

MakeRemoteFolder(*args, **kwargs)

Creates a folder on the host file system.

MakeVirtualEnv(*args, **kwargs)

Creates a python virtual environment on the host file system.

OutputCfg(*args, **kwargs)

Common configuration for tasks producing output at the specified file system location.

RemoteHostCfg(*args, **kwargs)

Add host parameter to your task using this mixin.

SbatchTask(*args, **kwargs)

Sbatch task.

SlurmCfg(*args, **kwargs)

Standard set of Slurm configuration parameters.

SrunTask(*args, **kwargs)

Srun task.

class bbp_workflow.task.EnvCfg(*args, **kwargs)

Bases: Config

Standard set of environment configuration parameters.

enable_internet = If True, enables Internet access by exporting https_proxy.(False)
env = Comma separated or multi-line extra environment variables to export.()
module_archive = Module archive to use. Default value can be placed in the `[DEFAULT]` section of the config file or, as usual, on the command line.(archive/2024-09)
module_path = Custom MODULEPATH to be exported before module load.()
modules = Environment modules to load.(None)
virtual_env = Full path to python virtual environment containing `bin/activate`. Default value should be placed in the `[DEFAULT]` section of the config file.()
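The `[DEFAULT]` mechanism mentioned for module_archive and virtual_env can be illustrated with Python's standard configparser, in which values from `[DEFAULT]` are visible from every other section. This is a minimal sketch, not the library's own loading code: it assumes the workflow config file follows configparser semantics, and the section name `SrunTask` and the venv path are placeholders.

```python
import configparser

# Hypothetical config file content. The section name and the venv path are
# placeholders; only the module archive value comes from the documentation.
CFG_TEXT = """
[DEFAULT]
module_archive = archive/2024-09
virtual_env = /path/to/venv

[SrunTask]
modules = py-bbp-workflow
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Reading a section also exposes every key defined under [DEFAULT].
section = parser["SrunTask"]
resolved = {key: section[key] for key in section}

for key in sorted(resolved):
    print(f"{key} = {resolved[key]}")
```

A value set in the task's own section, or on the command line, would normally take precedence over the `[DEFAULT]` entry.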
class bbp_workflow.task.IPyParallel(*args, **kwargs)

Bases: _JobPathMixin, Task

Allocate ipyparallel cluster and run remote_script.

Inherits the SlurmCfg and EnvCfg parameters, which can be used to customize this task.

args = Arguments that will be passed to the command. You can also define a custom `@property` named `args` to assemble the arguments dynamically.()
modules = Environment modules to load.(py-bbp-workflow)
ntasks = Number of ipyparallel engines to launch.(0)
remote_script()

Runs in the context of the ipyparallel cluster.

Use Client to talk to the cluster:

def remote_script(self):
    from ipyparallel import Client
    client = Client()  # connect to the running cluster
    lview = client.load_balanced_view()
    lview.block = True  # make map() wait for and return the results
    def f(_):
        return 'foo'
    result = lview.map(f, range(2))
    print(result)
class bbp_workflow.task.IPyParallelExclusive(*args, **kwargs)

Bases: IPyParallel

IPyParallel variant that runs on exclusive nodes, with one engine (task) per cpus_per_task CPUs.

cpus_per_task = Number of CPUs allocated per engine. Increase to 2 or 3 if your engines consume too much memory but you still want to use exclusive nodes.(1)
exclusive = Allocate nodes exclusively and do not share them with other jobs.(True)
mem = Real memory required per node.(0)
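The effect of cpus_per_task on an exclusive node comes down to simple arithmetic: fewer engines share the node, so each one gets a larger share of its memory. The sketch below assumes one engine per cpus_per_task CPUs and that the whole node memory is available to the job. The node figures (40 CPUs, 360 GB) are hypothetical and do not describe any particular cluster.

```python
def engines_per_node(node_cpus: int, cpus_per_task: int) -> int:
    """Number of engines that fit on one exclusive node."""
    return node_cpus // cpus_per_task


def mem_per_engine_gb(node_mem_gb: float, node_cpus: int, cpus_per_task: int) -> float:
    """Average memory share of one engine when the node is fully used."""
    return node_mem_gb / engines_per_node(node_cpus, cpus_per_task)


# Hypothetical node size, chosen only to make the arithmetic easy to follow.
NODE_CPUS = 40
NODE_MEM_GB = 360

for cpus_per_task in (1, 2, 3):
    engines = engines_per_node(NODE_CPUS, cpus_per_task)
    share = mem_per_engine_gb(NODE_MEM_GB, NODE_CPUS, cpus_per_task)
    print(f"cpus_per_task={cpus_per_task}: {engines} engines, {share:.1f} GB each")
```

Going from 1 to 2 CPUs per engine halves the number of engines on the node and doubles the memory available to each.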
class bbp_workflow.task.KgCfg(*args, **kwargs)

Bases: Config

Knowledge Graph configuration parameters.

kg_base = Full URL of the KG instance. By default, the production instance is used.(None)
kg_org = KG organization to use. By default, the `bbp` organization is used.(None)
kg_proj = KG project to use.(None)
class bbp_workflow.task.KgTask(*args, **kwargs)

Bases: KgCfg, Task

Base class for Knowledge Graph enabled tasks.

done(msg, entity)

Mark output as done and print entity URL.

from_file(*args, **kwargs)

Delegate to DataDownload from_file method using kg params from the task.

from_id(entity_cls, resource_id, on_no_result=None)

Delegate to entity from_id method using kg params from the task.

from_json_str(*args, **kwargs)

Delegate to DataDownload from_json_str method using kg params from the task.

Make link to nexus web from the resource_id.

name = Name of the knowledge graph entity that will be created by the task.(None)
output()

RunAnywayTarget.

publish(entity, **kwargs)

Delegate to entity publish method using kg params from the task.

class bbp_workflow.task.LookupKgEntity(*args, **kwargs)

Bases: KgCfg, ExternalTask

Base class for Knowledge Graph lookup tasks.

url = Existing knowledge graph entity URL.
class bbp_workflow.task.MakeRemoteFolder(*args, **kwargs)

Bases: Task

Creates a folder on the host file system.

output()

RemoteTarget(path=self.path).

path = Folder path.
class bbp_workflow.task.MakeVirtualEnv(*args, **kwargs)

Bases: Task

Creates a python virtual environment on the host file system.

Usage:

bbp-workflow launch --follow bbp_workflow.task MakeVirtualEnv \
    virtual-env=/gpfs/bbp.cscs.ch/home/${USER}/tmp/venv \
    modules=py-bbp-workflow \
    packages='bmtk'
install_luigi = Install luigi in the python virtual environment.(True)
module_archive = Module archive to use.(unstable)
modules = Environment modules to load.(python)
output()

RemoteTarget(self.host, self.virtual_env).

packages = Python packages to install.()
requirements = Requirements file containing the python packages to install.(None)
virtual_env = Full path to python virtual environment where `bin/activate` will be created.
class bbp_workflow.task.OutputCfg(*args, **kwargs)

Bases: Config

Common configuration for tasks producing output at the specified file system location.

path_prefix = Default value can be provided in the `[DEFAULT]` section of the config file.
class bbp_workflow.task.RemoteHostCfg(*args, **kwargs)

Bases: Config

Add host parameter to your task using this mixin.

host = Remote host which is accessed by the task.(bbpv1.epfl.ch)
class bbp_workflow.task.SbatchTask(*args, **kwargs)

Bases: _JobPathMixin, Task

Sbatch task.

Inherits the SlurmCfg, EnvCfg, and RemoteHostCfg parameters, which can be used to customize this task.

args = Arguments that will be passed to the command. You can also define a custom `@property` named `args` to assemble the arguments dynamically.()
command = The command `sbatch` will schedule for execution.
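A custom `args` property of the kind mentioned above might look like the following. The sketch uses a plain stand-in class so that it runs without bbp_workflow installed. `FakeTask`, `circuit`, and `seed` are hypothetical names; a real task would subclass SbatchTask and declare such values as task parameters.

```python
class FakeTask:
    """Stand-in for a task exposing `command` and `args` (hypothetical)."""

    command = "echo"

    def __init__(self, circuit: str, seed: int):
        self.circuit = circuit
        self.seed = seed

    @property
    def args(self) -> str:
        # Assemble the argument string from the task's own attributes
        # each time it is read, instead of storing a fixed value.
        return f"--circuit {self.circuit} --seed {self.seed}"


task = FakeTask("/path/to/circuit", 42)
command_line = f"{task.command} {task.args}"
print(command_line)
```

Because `args` is computed on access, it always reflects the current values of the attributes it is built from.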
class bbp_workflow.task.SlurmCfg(*args, **kwargs)

Bases: Config

Standard set of Slurm configuration parameters.

Slurm based tasks will extend this class, so they can be parametrized the same way.

account = Default value can be provided in the `DEFAULT` section of the accompanying cfg file.()
chdir = Have the remote processes do a chdir to path before beginning execution.(None)
constraint = Specify which type of nodes to use in allocation.(cpu)
cpus_per_task = Number of CPUs allocated per task.(0)
exclusive = Allocate nodes exclusively and do not share them with other jobs.(False)
job_name = Specify a name for the job.(None)
job_output = Job output.(None)
mem = Specify the real memory required per node.(None)
mem_per_cpu = Minimum memory required per allocated CPU.(None)
mpi = Identify the type of MPI to be used.(None)
nodes = Number of nodes to be allocated.(0)
ntasks = Number of tasks to launch.(0)
ntasks_per_node = Number of tasks to launch on each allocated node.(0)
partition = Default value can be provided in the `DEFAULT` section of the accompanying cfg file.(prod)
qos = Quality of service. Choices: {, bigjob, longjob, jenkins, normal}()
time = Set a limit on the total run time of the job allocation.(None)
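As a rough illustration of how these parameters relate to Slurm command-line options, the sketch below turns a dictionary of parameter values into option strings. It assumes, without confirmation from the source, that unset values (None, 0, empty string, False) are simply omitted. `slurm_options` is a hypothetical helper, not part of bbp_workflow, and the account name is a placeholder. The plain name mapping does not cover every parameter: job_output, for example, has no Slurm option of the same name.

```python
def slurm_options(params: dict) -> list:
    """Build Slurm-style long options from parameter values (illustrative only)."""
    options = []
    for name, value in params.items():
        flag = "--" + name.replace("_", "-")
        if value is True:
            # Boolean switches such as --exclusive take no value.
            options.append(flag)
        elif value not in (None, 0, "", False):
            options.append(f"{flag}={value}")
    return options


# Hypothetical parameter values; "proj0" is a placeholder account name.
params = {
    "account": "proj0",
    "partition": "prod",
    "constraint": "cpu",
    "nodes": 1,
    "ntasks": 0,
    "exclusive": True,
    "mem": None,
    "time": "1:00:00",
}

options = slurm_options(params)
print(" ".join(options))
```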
class bbp_workflow.task.SrunTask(*args, **kwargs)

Bases: Task

Srun task.

Inherits the SlurmCfg, EnvCfg, and RemoteHostCfg parameters, which can be used to customize this task.

args = Arguments that will be passed to the command. You can also define a custom `@property` named `args` to assemble the arguments dynamically.()
command = The command `srun` will schedule for execution.
return_result = If True, will capture the output of the task.(False)
run()

Runs self.command on self.host using a Slurm allocation.

If a task extending SrunTask implements self.remote_script, the source of that method is stored in a temporary file. The file is passed as the first argument to self.command, followed by self.args.
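The argument order described above (script file first, then the args) can be mimicked locally. This sketch is not the library's implementation: it starts the current Python interpreter directly instead of going through srun on a remote host, and it takes the script source as a string rather than extracting it from a method. `SCRIPT_SOURCE` and `run_with_script` are hypothetical names.

```python
import os
import subprocess
import sys
import tempfile

# Stand-in for the body of a remote_script method: echoes its arguments.
SCRIPT_SOURCE = "import sys\nprint(' '.join(sys.argv[1:]))\n"


def run_with_script(command: list, script_source: str, args: list) -> str:
    """Write the script to a temp file and run: command, script path, then args."""
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as tmp:
        tmp.write(script_source)
        script_path = tmp.name
    try:
        argv = [*command, script_path, *args]
        done = subprocess.run(argv, capture_output=True, text=True, check=True)
        return done.stdout.strip()
    finally:
        # Remove the temporary script once the command has finished.
        os.unlink(script_path)


output = run_with_script([sys.executable], SCRIPT_SOURCE, ["--seed", "42"])
print(output)
```

Here the command plays the role of an interpreter: it receives the temporary script as its first argument and hands the remaining arguments on to the script.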