Search this site


Metadata

Articles

Projects

Presentations

Distributed xargs

I like xargs. However, xargs becomes less useful when you want to run in parallel many cpu-intensive tasks with more parallelism than you have cpus cores local to your machine.

Enter dxargs. For now, dxargs is a simple python script that will distribute tasks in a similar way to xargs but will distribute them to remote hosts over ssh. Basically, it's a threadpool of ssh sessions. An idle worker will ask for something to do, letting you get the maximum throughput possible; meaning your faster servers will be given more tasks to execute than slower ones simply because they complete them sooner.

As an example, let's run 'hostname' in parallel across a few machines for 100 total calls.

% seq 100 | ./dxargs.py -P0 -n1 --hosts "snack scorn" hostname | sort | uniq -c
    14 scorn.csh.rit.edu
    86 snack.home

# Now use per-input-set output collating:
% seq 100 | ./dxargs.py -P0 -n1 --hosts "snack scorn" --output_dir=/tmp/t 'uname -a'
% ls /tmp/t | tail -5
535.95.0.snack.1191918835
535.96.0.snack.1191918835
535.97.0.snack.1191918835
535.98.0.snack.1191918835
535.99.0.snack.1191918835
% cat /tmp/t/535.99.0.snack.1191918835
Linux snack.home 2.6.20-15-generic #2 SMP Sun Apr 15 06:17:24 UTC 2007 x86_64 GNU/Linux
Design requirements:
  • Argument input must work the same way as xargs (-n<num>, etc) and come from stdin
  • Don't violate POLA where unnecessary - same flags as xargs.
Basically, I want dxargs to be a drop in replacement for xargs with respect to compatibility. I may intentionally break compatibility later where it makes sense, however. Also, don't consider this first version POLA-compliant.

Neat features so far:

  • Uses OpenSSH Protocol 2's "Control" sockets (-M and -S flags) to keep the session handshaking down to once per host.
  • Each worker competes for work with the goal of having zero idle workers.
  • Collatable output to a specified directory by input set, pid, number, host, and time
  • '0' (aka -P0) for parallelism means parallelize to the same size as the host list
  • Ability to specify multiplicity by machine with notation like 'snack*4' to indicate snack can run 4 tasks in parallel
  • 'stdout' writing is wrapped with a mutex, so tasks can't interfere with output midline (I see this often with xargs)
Desired features (not yet implemented):
  • Retrying of input sets when workers malfunction
  • Good handling of ssh problems (worker connect timeouts, etc
  • More xargs and xapply behaviors

Download dxargs.py