Fuzzball Documentation

Parallel Workflows with Job Arrays

Overview

Fuzzball supports parallel workloads through task arrays.

A parallel workload has some number of independent tasks that follow a similar pattern. For example, imagine you have 1000 data files, and processing each file through the same algorithm requires 1 core and takes about an hour. If you processed all of these files sequentially, the job would take around 42 days to complete. Now imagine you have a powerful workstation with 32 cores allowing you to process 32 input files simultaneously. As files finish processing, other files will take their place and begin processing. Using this strategy, you can decrease the time needed to process the entire data set to around 31 1/2 hours. If you had access to 1000 cores you could reduce the processing time to a single hour!
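
The estimates above are just total work divided by the number of workers. A quick shell sketch of the arithmetic, using the numbers from this example (when every task takes exactly an hour, the wall time rounds up to whole one-hour waves):

```shell
#!/bin/sh
files=1000        # independent tasks
cores=32          # parallel workers
hours_per_file=1  # approximate runtime per task
echo "sequential: $(( files * hours_per_file )) hours (~42 days)"
# ceil(files / cores) one-hour waves of tasks
echo "parallel:   $(( (files + cores - 1) / cores )) hours with ${cores} cores"
```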

Basic example

Here is a basic example Fuzzfile illustrating task arrays in Fuzzball.

version: v1
jobs:
  rollcall:
    image:
      uri: docker://wresch/lolcow
    script: |
      #!/bin/sh
      echo "cow #${FB_TASK_ID} reporting for duty" | cowsay
    resource:
      cpu:
        cores: 1
      memory:
        size: 1GB
    task-array:
      start: 1
      end: 6
      concurrency: 3

The task-array section specifies that multiple copies of your job should run in parallel. Each task has a task ID; the start and end fields set the range of IDs. Each task's environment contains a variable called FB_TASK_ID holding the task ID of the currently running task. The concurrency field sets how many tasks may run in parallel at once (assuming enough resources exist in the cluster).
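
Outside of Fuzzball you can simulate the array by exporting FB_TASK_ID yourself. This sketch (omitting cowsay) shows what each of the six tasks effectively runs; in a real workflow, Fuzzball sets the variable for you:

```shell
#!/bin/sh
# Fuzzball sets FB_TASK_ID per task; here we loop over the same range locally.
for FB_TASK_ID in 1 2 3 4 5 6 ; do
  export FB_TASK_ID
  sh -c 'echo "cow #${FB_TASK_ID} reporting for duty"'
done
```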

Screenshots from the Fuzzball web UI may give you a better idea of how this task array runs. Note that Fuzzball queues up 3 tasks in the array at a time, respecting the concurrency field.

[Screenshot: first three tasks in the array starting]

After the job runs, you can check the logs to see how the FB_TASK_ID variable changes the standard output of each task.

[Screenshot: herd of cows, task array output]

A Directory of Files

It is a very common pattern to have a collection of files and to need to do something to all of them. Consider this collection of files in a directory.

$ ls *.txt
file10.txt  file2.txt  file3.txt  file4.txt  file5.txt  file6.txt
file7.txt  file8.txt  file999.txt  file9.txt

$ cat file2.txt
521
560
604
645
417
844
244
1
522
650

Note that the filenames are numbered, but file number 1 is missing and the numbering jumps to 999 at the end. The commands in our job therefore cannot simply iterate over file numbers.

Here is a generic Fuzzfile that lets you iterate through a directory of files and perform some action on each of them in a parallel workflow. In this example we calculate the mean of the numbers in each file, but you can change the command to do whatever you want. To keep this workflow self-contained, the files are generated in the first (non-parallel) job. You can think of this as the data collection step.

version: v1
defaults:
  job:
    mounts:
      v1:
        location: /tmp
jobs:
  collect-data:
    mounts:
      v1:
        location: /tmp
    cwd: /tmp
    script: |
      #!/bin/sh
      ## Create some "measurements"
      for f in 2 3 4 5 6 7 8 9 10 999 ; do
        for i in $(seq 1 10) ; do
          echo $(( RANDOM % 1000 )) >> "file${f}.txt"
        done
      done
      echo "$ ls -lh"
      ls -lh
      echo "$ cat file2.txt"
      cat file2.txt
    image:
      uri: docker://alpine
    resource:
      cpu:
        cores: 1
      memory:
        size: 1GB
  process-files:
    depends-on:
      - name: collect-data
        status: finished
    mounts:
      v1:
        location: /tmp
    cwd: /tmp
    script: |
      #!/bin/sh
      file="$(ls *.txt | awk "NR==$FB_TASK_ID {print;exit}")"
      awk '{t+=$1} END {printf("%12s: mean=%.1f\n", FILENAME, t / NR)}' "$file"
    image:
      uri: docker://alpine
    resource:
      cpu:
        cores: 1
      memory:
        size: 1GB
    task-array:
      start: 1
      end: 10
      concurrency: 5
volumes:
  v1:
    reference: volume://user/ephemeral

The key to this strategy is this command that appears in the process-files job.

file="$(ls *.txt | awk "NR==$FB_TASK_ID {print;exit}")"

It selects the $FB_TASK_ID-th file from the list of files (every task sees the same sorted listing from ls, so each file is processed by exactly one task) and feeds it into the actual processing command:

awk '{t+=$1} END {printf("%12s: mean=%.1f\n", FILENAME, t / NR)}' "$file"
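
You can try the selection and processing logic in any shell without Fuzzball. In this stand-alone sketch the file names, contents, and the hard-coded FB_TASK_ID are made up for illustration and are not part of the workflow above:

```shell
#!/bin/sh
# Stand-alone demo of the selection logic; files and values are invented.
demo="$(mktemp -d)" && cd "$demo"
printf '1\n2\n3\n' > a.txt    # mean 2.0
printf '10\n20\n'  > b.txt    # mean 15.0
FB_TASK_ID=2                  # pretend this shell is task 2 of the array
file="$(ls *.txt | awk "NR==$FB_TASK_ID {print; exit}")"
echo "selected: $file"        # -> selected: b.txt (2nd file in the listing)
awk '{t+=$1} END {printf("%12s: mean=%.1f\n", FILENAME, t / NR)}' "$file"
```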

There are many ways of selecting the file to process in each task. If you use a container image that ships a modern bash shell, you could instead read the file list into an array (note the - 1: bash arrays are zero-indexed, while the task IDs in this array start at 1):

mapfile -t files < <(ls *.txt)
file="${files[$((FB_TASK_ID - 1))]}"
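
Another POSIX-tool option is sed, which prints the Nth line of its input and so behaves the same as the awk one-liner above. This sketch simulates the directory and task ID locally:

```shell
#!/bin/sh
# Equivalent selection with sed; directory and task ID simulated locally.
demo="$(mktemp -d)" && cd "$demo"
touch a.txt b.txt c.txt
FB_TASK_ID=3                  # pretend this shell is task 3 of the array
# sed -n suppresses default output; "Np" prints only line number N
file="$(ls *.txt | sed -n "${FB_TASK_ID}p")"
echo "$file"                  # -> c.txt
```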

This job produces logs like the following:

[Screenshot: logs from the process-files example job]