API

utils

utils.create_project(root_dir='.', host='localhost', port=27017)

Creates a new project.

Parameters
  • name – the name of the project, which must be unique in the MongoDB used by the project.

  • root_dir – the root directory for the project. If the folder does not exist it will be created. Three additional subdirectories will be created: scripts, workspaces and logs.

  • host – the host for the MongoDB database.

  • port – the port for the MongoDB database.

Returns

Project

utils.drop_database(host='localhost', port=27017)

Project

class workflow_manager.Project(name, host='localhost', port=27017, user=None)
property active_process
property active_processes
check_active_process()
check_active_processes()
clear_db()
clear_system()
clear_workspaces()
delete_process(pid, delete_workspaces=True)
delete_script(label)
delete_workspace(id)
get_next_process(unused_cores=None)
get_process(pid)
get_process_status(arg=None, limit=40, script=None, sort=- 1)
get_script(script_id)
get_workspace(wksp_num)
import_script(path, script_id=None, rerun_processes=False)
kill(process_id)
kill_all()
list(arg=None, limit=40, script=None, sort=- 1)
list_processes(arg=None, limit=40, script=None, sort=- 1)
list_scripts()
list_workspaces()
new_script(label)
new_workspace(label=None)
process(pid)
process_completed(proc_id)
process_path(path, base_path=None)
processes(label=None, status=None, limit=0)
property processes_dir
property processes_pending
queue(process_id, propagate=False)
property root_dir
run_script(label, params)
script(script_id, can_create=False)
property scripts_dir
start_next_process()
property total_used_cores
trim_process_index()
trim_script_index()
trim_workspace_index()
workspace(wksp_num)
property workspaces_dir

Script

class workflow_manager.Script(project, label, create=False)
add_dependency(parent_script_label)
add_dependent(child_script_label)
add_source(path, update_mode='copy')
append_data(key, value)

Appends data to a list.

clear_sources()
property cores
property data
property directory
find_existing_process_by_parent(parent_id)
has_source(source)
load_as_module()
new_process(args)
property on_hold
property path
queue_dependent(dependent_script, parent_process)
queue_dependents(parent_process=None, status='pending')
queue_process(params={}, parent_id=None)
reset()
run(params={}, parent_id=None, status='pending')
set_cores_required(number_cores)

Sets the number of cores used by this script. Default is 1. This allows BPM to run multiple scripts/processes on multi-core machines.

set_data(key, value)

Sets the value of a key.

set_filename(filename)
set_hold_state(state)

Sets whether the script is allowed to run. If not it will stop the propagation of processes.

set_program(program)
set_script(script)
set_source(path, update_mode='copy')
set_source_path(path)
set_time_limit(timelimit)

Sets the time limit for which a process for this script can run for. Default is -1, which means it can run for an unlimited amount of time.

spawn(params)
update_from_source()

Process

class workflow_manager.Process(project, id)
append_data(key, value)

Appends data to a list.

check_status()
clear_metadata()
completed(status=True, message='', dt=None)
property cores

Returns the number or cores required by this script/process

property data
property duration
get_ancestor(gen=None, label=None, id=None)

This return an ancestor of a process that matches the given generation, label, or id.

ancestor = process.get_ancestor(gen=2) will return the ancestor two generations back, or None if none is found.

ancestor = process.get_ancestor(id=63) will return the ancestor with an id of 63, or None if none is found.

ancestor = process.get_ancestor(label=’fit’) will return the ancestor with the label “fit”, or None if none is found.

This function is a convinient wrapper to:

process.get_ancestor_by_generation(gen) process.get_ancestor_by_id(id) process.get_ancestor_by_label(label)

get_ancestor_by_generation(gen)

This return an ancestor of a process that matches the given generation.

ancestor = process.get_ancestor(gen=2) will return the ancestor two generations back, or None if none is found.

get_ancestor_by_id(id)

This return an ancestor of a process that matches the given id.

ancestor = process.get_ancestor(id=63) will return the ancestor with an id of 63, or None if none is found.

get_ancestor_by_label(label)

This return an ancestor of a process that matches the given label.

ancestor = process.get_ancestor(label=’fit’) will return the ancestor with the label “fit”, or None if none is found.

get_decendents(label=None)
get_list_of_logs()
get_workspace(label, create=False)

Returns a workspace with a given label that is associated with the process.

If the workspace does not exist, it will create and return a new workspace if create=True otherwise it will return None.

get_workspaces()
has_metadata()
kill()
property label

Returns the label of the script for this process

log(num_lines=None)
property message

Returns the message for this process

property metadata
open_latest_log()
open_logfile(logdatetime, mode='r')
property params
property parent

Returns the parent process for this process, i.e., the process that spawned this process.

This can strung together: parent_process = process.parent.parent.parent

property parent_process
property process_dir
progress(progress)
purge_logs(N=3)
queue(params=None, propagate=True)
queue_dependent(dependent_script)
reverse_file(myfile)

Iterates the lines of a file, but in reverse order

property root

Returns the root process for this sequence of processes, i.e., the original process that spawned this sequence process.

property script

Returns the script for this process

set_data(key, value)

Sets the value of a key.

set_metadata(key, value)
set_param(key, value)
start()
property started
property status

Returns the status this process

workspace(label, create=False)

Workspace

class workflow_manager.Workspace(project, workspace_id=None, label=None)

A workspace is where data is stored. These are typically stored in path_to_project_dir/workspaces.

Workspaces are typically created through processes which means they are associated with the process but you can create orphan workspaces. In other words, they aren’t associated with a process. An orphan workspace can be created through the project using the Project.new_workspace(label=None) command.

add_data(data)

Adds data as a dictionary.

Example:

workspace.add_data({‘elements’:[1,3,5]})

append_data(key, value)

Appends data to a list.

clear()
clear_directory_contents(directory_path, protected_dirs=[])
compare_versions(label=None, label2=None)
copy_directory_contents(source_dir, dest_dir)
copy_file(filepath)
copy_files(filepaths)

Copies files into the workspace.

create_version(label=None, overwrite=False)
property data

Returns a dictionary of the data associated with the workspace.

delete(filepath)
delete_folder()
extract_zipfile(filepath)

Extracts a zip file into the workspace.

list()
md5sum(filepath)
open_file(filename, mode='r')

Opens a file in the workspace and returns the file pointer object.

path(*args)
set_data(key, value)

Sets the value of a key.