#!/usr/bin/env bash # # bbq - Simple Bash message queues using Linux named pipes. # # bbq.sh provides a few convenient Bash functions for creating and working with # Linux named pipes (FIFO). Its primary use case is to set up multiple worker # processes getting work from a single job queue in your shell script. # # Please see the examples/basic script for an example usage of bbq.sh. # # Requirements: # - Linux, Bash, GNU coreutils, and flock from util-linux. # if [[ ${bbq_SOURCED:-} ]]; then return 0 fi bbq_error () { echo "$@" >&2; } bbq_debug () { if [[ ${bbq_DEBUG:-} ]]; then echo "$@" >&2; fi; } if [[ $BASH_SOURCE == "$0" ]]; then bbq_error "Please source bbq.sh instead of running it!" exit 1 fi # Set it to enable some debugging outputs. bbq_DEBUG= # The default number of worker processes to run by bbq-start bbq_WORKER_COUNT=4 # The named pipe to be used by default to serve as a message queue. bbq_FIFO= # Each "job" in bbq is a ${bbq_CHUNK_SIZE}-byte chunk of bash commands # written to $bbq_FIFO that is used as a message queue. # bbq_CHUNK_SIZE=1024 # bytes; on Linux the limit is 4k. # Create a named pipe (FIFO) and set it as the default FIFO ($bbq_FIFO). # # If an argument is provided it's taken to be the path of the FIFO to be # created; otherwise, a random file name will be chosen for the FIFO in the # current directory. # bbq-new () { # [queue] local queue=${1:-} [[ $queue ]] || queue=bbq-$$-$RANDOM [[ $queue == /* ]] || queue=$PWD/$queue mkfifo -m 0600 "$queue" || return $? bbq_FIFO=$queue } # Start worker processes in the background to process the queue and wait for # them to end. # # Arguments: # # queue - Optional. Path to the named pipe for the workers to read # messages/commands from. If ommitted then the queue is # taken to be $bbq_FIFO, and in which case, it will also be # deleted at the end. # # Options: # # -w COUNT - Number of worker processes to create that work on the queue. # Default is 4 workers. # bbq-start () { # [-w COUNT] [queue] (declare -A workers # pid -> exit code bbq_owns_the_pipe= trap ' kill ${!workers[*]} >/dev/null 2>&1 || true if [[ $bbq_owns_the_pipe ]]; then rm -f "$bbq_FIFO"; fi ' EXIT _bbq_start_workers "$@" ) & } # Enqueue arbitrary Bash commands as a fixed length string into the $bbq_FIFO. # # Arguments: # # command - Arbitrary Bash command to add to the queue. # # queue - Optional. If ommitted, $bbq_FIFO is assumed; otherwise, it # should be a path to a message queue created by bbq-new. # # This command MUST ONLY be run after bbq-start. # # NOTE: This command may block on writing to the pipe if the pipe buffer is full. # Therefore, it's recommended that you run it as the last part of your # script. Alternatively, you can also run it in the background (i.e., # with '&') directly or indirectly to avoid blocking the flow of your # script. # bbq () { # <command> [queue] local code=$1 queue=${2:-$bbq_FIFO} if (( ${#code} > bbq_CHUNK_SIZE )); then bbq_error "Encoded message exceeds allowed chunk size ($bbq_CHUNK_SIZE): $code" return 1 fi [[ -p $queue ]] || { bbq_error "'$queue' must be a named pipe!"; return 1; } # Accoridng to docs and google, on linux, read/write less than PIPE_BUF (4k # bytes) on a FIFO is atomic. So, we don't need to lock before writes. # # See also 'man fifo' and 'man 7 pipe'. # printf "%-${bbq_CHUNK_SIZE}s" "${code:0:$bbq_CHUNK_SIZE}" >"$queue" } # Internal implementation for bbq-start() # _bbq_start_workers () { # [-w COUNT] [queue] local option; OPTIND=1 while getopts ':w:' option "$@"; do case $option in w) bbq_WORKER_COUNT=$OPTARG ;; :) bbq_error "$FUNCNAME: Missing option argument for -$OPTARG"; return 1 ;; \?) bbq_error "$FUNCNAME: Unknown option: -$OPTARG"; return 1 ;; esac done shift $((OPTIND - 1)) bbq_WORKER_COUNT=${bbq_WORKER_COUNT:-4} printf "%d" "$bbq_WORKER_COUNT" >/dev/null 2>&1 \ && (( bbq_WORKER_COUNT > 0 )) || { bbq_error "Worker count should be > 0" return 1 } local queue=${1:-} if [[ $queue ]]; then [[ $queue == /* ]] || queue=$PWD/$queue bbq_FIFO=$queue else bbq_owns_the_pipe=1 fi [[ -p ${bbq_FIFO:?required} ]] || { bbq_error "$bbq_FIFO must be a named pipe!" return 1 } # Fork the workers to do work. # This needs to be done before we can enqueue anything without blocking. # local i for ((i=0; i < $bbq_WORKER_COUNT; i++)); do _bbq_worker & workers[$!]= done # Keep a write FD open to the FIFO to prevent the read ends from # getting EOFs, which happens when all write FDs are closed. # local write_fd; exec {write_fd}>"$bbq_FIFO" || return $? # Wait for the worker processes to exit local pid failed= for pid in ${!workers[*]}; do wait $pid && workers[$pid]=0 || { workers[$pid]=$? bbq_error "Worker $pid exited! (rc=${workers[$pid]})" failed=1 } done [[ ! $failed ]] } # Represents a background worker sub process that takes commands from FIFO. # _bbq_worker () { local read_fd exec {read_fd}<"${bbq_FIFO:?required}" || return $? local code _bbq_pop () { local rc flock $read_fd || return $? read -u $read_fd -rN "$bbq_CHUNK_SIZE" code || rc=$? flock -u $read_fd || return $? return $rc } while true; do code=; _bbq_pop || { bbq_debug "Worker $BASHPID: Failed dequeuing! code=$code" continue } eval "$code" # NOTE: # - If you 'set -e' then the worker could die due to a non-zero # exit from evaluating $code. # - If all your workers exited (i.e., no more FDs reading on the # pipe) then the enqueue operation will either block on opening the write end, # or you get a SIGPIPE when writing to the pipe. done bbq_debug "Worker $BASHPID quit" eval "exec $read_fd>&-" } bbq_SOURCED=1