Execute shell commands from within Prefect flows.

Getting started

Install prefect-shell

The following command will install a version of prefect-shell compatible with your installed version of prefect. If you don’t already have prefect installed, it will install the newest version of prefect as well.

pip install "prefect[shell]"

Upgrade to the latest versions of prefect and prefect-shell:

pip install -U "prefect[shell]"

Register newly installed block types

Register the block types in the prefect-shell module to make them available for use.

prefect block register -m prefect_shell

Examples

Integrate shell commands with Prefect flows

With prefect-shell, you can run shell commands and scripts from Prefect flows, which gives them Prefect’s observability and resiliency. It is a useful tool if you’re transitioning your orchestration from shell scripts to Prefect.

Let’s get the shell-abration started!

The Python code below has shell commands embedded in a Prefect flow:

from datetime import datetime

from prefect import flow
from prefect_shell import ShellOperation


@flow
def download_data():
    today = datetime.today().strftime("%Y%m%d")

    # for short-running operations, you can use the `run` method,
    # which automatically manages the context
    ShellOperation(
        commands=[
            "mkdir -p data",
            "mkdir -p data/${today}"
        ],
        env={"today": today}
    ).run()

    # for long-running operations, you can use a context manager
    with ShellOperation(
        commands=[
            "curl -O https://masie_web.apps.nsidc.org/pub/DATASETS/NOAA/G02135/north/daily/data/N_seaice_extent_daily_v3.0.csv",
        ],
        working_dir=f"data/{today}",
    ) as download_csv_operation:

        # trigger runs the process in the background
        download_csv_process = download_csv_operation.trigger()

        # then do other things here in the meantime, like download another file
        ...

        # when you're ready, wait for the process to finish
        download_csv_process.wait_for_completion()

        # if you'd like to get the output lines, you can use the `fetch_result` method
        output_lines = download_csv_process.fetch_result()


if __name__ == "__main__":
    download_data()

Running this script results in output like this:

14:48:16.550 | INFO    | prefect.engine - Created flow run 'tentacled-chachalaca' for flow 'download-data'
14:48:17.977 | INFO    | Flow run 'tentacled-chachalaca' - PID 19360 triggered with 2 commands running inside the '.' directory.
14:48:17.987 | INFO    | Flow run 'tentacled-chachalaca' - PID 19360 completed with return code 0.
14:48:17.994 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 triggered with 1 commands running inside the PosixPath('data/20230201') directory.
14:48:18.009 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dl
14:48:18.010 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
oad  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
14:48:18.840 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
 11 1630k   11  192k    0     0   229k      0  0:00:07 --:--:--  0:00:07  231k
14:48:19.839 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
 83 1630k   83 1368k    0     0   745k      0  0:00:02  0:00:01  0:00:01  747k
14:48:19.993 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
100 1630k  100 1630k    0     0   819k      0  0
14:48:19.994 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
:00:01  0:00:01 --:--:--  821k
14:48:19.996 | INFO    | Flow run 'tentacled-chachalaca' - PID 19363 completed with return code 0.
14:48:19.998 | INFO    | Flow run 'tentacled-chachalaca' - Successfully closed all open processes.
14:48:20.203 | INFO    | Flow run 'tentacled-chachalaca' - Finished in state Completed()
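
Note that the `${today}` placeholder in the `mkdir` command is written in shell syntax, not as a Python f-string: the value is supplied through `env` and substituted by the shell when the command runs. The sketch below illustrates that mechanism with the standard library only. It is not how prefect-shell is implemented; the helper name `expand_in_shell` is hypothetical, and it assumes a POSIX shell is available.

```python
import os
import subprocess


def expand_in_shell(command: str, extra_env: dict[str, str]) -> str:
    """Run `command` in a shell and return its stripped standard output.

    Hypothetical helper for illustration only; it is not part of prefect-shell.
    """
    completed = subprocess.run(
        command,
        shell=True,  # the shell, not Python, expands ${...} placeholders
        env={**os.environ, **extra_env},  # add the extra variables to the current environment
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero return code
    )
    return completed.stdout.strip()


# Python passes the literal text "${today}"; the shell fills in the value.
print(expand_in_shell("echo data/${today}", {"today": "20230201"}))
```

Because the substitution happens in the shell, the same command string can be reused with different `env` values without being rebuilt in Python.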

Save shell commands in Prefect blocks

You can save commands within a ShellOperation block, then reuse them across multiple flows.

Save the block with desired commands:

from prefect_shell import ShellOperation


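# note: `-t 1` is a one-second timeout on macOS; on Linux, `-t` sets the
# packet TTL instead, so you may prefer a flag such as `-c 1` there
ping_op = ShellOperation(commands=["ping -t 1 prefect.io"])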
ping_op.save("block-name")

# Load the saved block:
ping_op = ShellOperation.load("block-name")

Resources

Refer to the prefect-shell SDK documentation linked in the sidebar to explore all the capabilities of the prefect-shell library.