## Copyright (C) 2009 VZLU Prague, a.s., Czech Republic
##
## Author: Jaroslav Hajek
## Several improvements thanks to: Travis Collier
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; see the file COPYING.  If not, see
## <http://www.gnu.org/licenses/>.

## -*- texinfo -*-
## @deftypefn{Function File} [@var{o1}, @var{o2}, @dots{}] = parcellfun (@var{nproc}, @var{fun}, @var{a1}, @var{a2}, @dots{})
## @deftypefnx{Function File} parcellfun (nproc, fun, @dots{}, "UniformOutput", @var{val})
## @deftypefnx{Function File} parcellfun (nproc, fun, @dots{}, "ErrorHandler", @var{errfunc})
## Evaluates a function for multiple argument sets using multiple processes.
## @var{nproc} should specify the number of processes. A maximum recommended value is
## equal to the number of CPUs on your machine or one less.
## @var{fun} is a function handle pointing to the requested evaluating function.
## @var{a1}, @var{a2} etc. should be cell arrays of equal size.
## @var{o1}, @var{o2} etc. will be set to corresponding output arguments.
##
## The UniformOutput and ErrorHandler options are supported with meaning identical
## to @dfn{cellfun}.
## A VerboseLevel option controlling the level of output is supported.
## A value of 0 is quiet, 1 is normal, and 2 or more enables
## debugging output.
##
## NOTE: this function is implemented using "fork" and a number of pipes for IPC.
## Suitable for systems with an efficient "fork" implementation (such as GNU/Linux),
## on other systems (Windows) it should be used with caution.
## Also, if you use a multithreaded BLAS, it may be wise to turn off multi-threading
## when using this function.
##
## CAUTION: This function should be regarded as experimental. Although all subprocesses
## should be cleared in theory, there is always a danger of a subprocess hanging up,
## especially if unhandled errors occur. Under GNU and compatible systems, the following
## shell command may be used to display orphaned Octave processes:
## ps --ppid 1 | grep octave
##
## @end deftypefn

function varargout = parcellfun (nproc, fun, varargin)

  ## Implementation outline:
  ##   * the parent opens one command pipe and one result pipe per worker,
  ##     plus a single status pipe shared by all workers, then forks.
  ##   * a worker writes -iproc to the status pipe when it starts (and when
  ##     it dies from an unhandled error) and +iproc when a result is ready.
  ##   * the parent answers every status message with either a job index or
  ##     0 (terminate) on that worker's command pipe.

  if (nargin < 3 || ! isscalar (nproc) || nproc <= 0)
    print_usage ();
  endif

  if (ischar (fun))
    fun = str2func (fun);
  elseif (! isa (fun, "function_handle"))
    error ("parcellfun: fun must be either a function handle or name")
  endif

  uniform_output = true;
  error_handler = [];
  verbose_level = 1; # default to normal output level

  args = varargin;
  nargs = length (varargin);

  ## parse trailing name/value options.  Names are matched case-insensitively
  ## (as cellfun does); the ischar guard keeps a cell array of strings from
  ## being mistaken for an option name.
  while (nargs > 1)
    optname = args{nargs-1};
    if (! ischar (optname))
      break;
    endif
    if (strcmpi (optname, "UniformOutput"))
      uniform_output = args{nargs};
    elseif (strcmpi (optname, "ErrorHandler"))
      error_handler = args{nargs};
    elseif (strcmpi (optname, "VerboseLevel"))
      verbose_level = args{nargs};
    else
      break;
    endif
    nargs -= 2;
  endwhile

  args = args(1:nargs);
  if (length (args) == 0)
    print_usage ();
  elseif (! all (cellfun (@iscell, args)))
    ## detect this before forking; otherwise the failure only shows up as an
    ## "unhandled error" inside every subprocess.
    error ("parcellfun: all the arguments must be cell arrays");
  elseif (length (args) > 1 && ! size_equal (args{:}))
    error ("arguments size must match");
  endif

  ## nothing to evaluate: return empty outputs of the right shape without
  ## creating any pipe or process.
  if (isempty (args{1}))
    shape = size (args{1});
    varargout = cell (1, nargout);
    for i = 1:nargout
      if (uniform_output)
        varargout{i} = zeros (shape);
      else
        varargout{i} = cell (shape);
      endif
    endfor
    return;
  endif

  ## never start more processes than there are jobs.
  nproc = min (nproc, numel (args{1}));

  ## create communication pipes.
  cmdr = cmdw = resr = resw = zeros (nproc, 1);
  opened = []; # descriptors obtained so far, to be released on failure
  err = 0;
  for i = 1:nproc
    ## command pipes
    [cmdr(i), cmdw(i), err, msg] = pipe ();
    if (err)
      break;
    endif
    opened = [opened, cmdr(i), cmdw(i)];
    ## result pipes
    [resr(i), resw(i), err, msg] = pipe ();
    if (err)
      break;
    endif
    opened = [opened, resr(i), resw(i)];
  endfor
  if (! err)
    ## status pipe
    [statr, statw, err, msg] = pipe ();
  endif
  if (err)
    ## don't leak the pipes that were created before the failure.
    for fid = opened
      fclose (fid);
    endfor
    error ("failed to open pipe: %s", msg);
  endif

  iproc = 0; # the parent process
  nsuc = 0; # number of processes successfully forked.

  fflush (stdout); # prevent subprocesses from inheriting buffered output

  pids = zeros (nproc, 1);

  ## fork subprocesses
  for i = 1:nproc
    [pid, msg] = fork ();
    if (pid > 0)
      ## parent process. fork succeeded.
      nsuc += 1;
      pids(i) = pid;
      if (verbose_level > 1)
        fprintf (stderr,'parcellfun: child process %d created\n', pids(i));
        fflush (stderr);
      endif
    elseif (pid == 0)
      ## child process.
      iproc = i;
      break;
    elseif (pid < 0)
      ## parent process. fork failed.
      err = 1;
      break;
    endif
  endfor

  if (iproc)
    ## child process. close unnecessary pipe ends.
    fclose (statr);
    for i = 1:nproc
      ## we won't write commands and read results
      fclose (cmdw (i));
      fclose (resr (i));
      if (i != iproc)
        ## close also those pipes that don't belong to us.
        fclose (cmdr (i));
        fclose (resw (i));
      endif
    endfor
  else
    ## parent process. close unnecessary pipe ends.
    fclose (statw);
    for i = 1:nproc
      ## we won't read commands and write results
      fclose (cmdr (i));
      fclose (resw (i));
    endfor
    ## the fork loop stops at the first failure, so slots nsuc+1:nproc never
    ## got a process; release the pipe ends that were reserved for them.
    for i = nsuc+1:nproc
      fclose (cmdw (i));
      fclose (resr (i));
    endfor

    if (nsuc)
      ## we forked some processes. if this is less than we opted for, gripe
      ## but continue.
      if (nsuc < nproc)
        warning ("parcellfun: only %d out of %d processes forked", nsuc, nproc);
        nproc = nsuc;
      endif
    else
      ## this is bad.
      fclose (statr);
      error ("parcellfun: failed to fork processes");
    endif
  endif

  ## At this point, everything should be OK (?)

  if (iproc)
    ## the border patrol. we really don't want errors escape after the forks.
    unwind_protect
      try
        ## child process. indicate ready state.
        fwrite (statw, -iproc, "double");
        fflush (statw);

        do
          ## get command
          cmd = fread (cmdr(iproc), 1, "double");
          if (isempty (cmd))
            ## end of file: the parent closed the command pipe without
            ## sending the terminating 0.  Treat it as a termination
            ## request; an empty value would never satisfy the loop's
            ## exit test and the child would spin forever.
            cmd = 0;
          endif
          if (cmd)
            ## we've got a job to do. prepare argument and return lists.
            res = cell (1, nargout);
            argsc = cell (1, nargs);
            for i = 1:nargs
              argsc{i} = args{i}{cmd};
            endfor

            if (isempty (error_handler))
              ## unguarded evaluation.
              [res{:}] = fun (argsc{:});
            else
              ## guarded evaluation
              try
                [res{:}] = fun (argsc{:});
              catch
                errs.index = cmd;
                [errs.message, errs.identifier] = lasterr ();
                [res{:}] = error_handler (errs, argsc{:});
              end_try_catch
            endif

            ## indicate ready state.
            fwrite (statw, iproc, "double");
            fflush (statw);

            ## write the result.
            ## FIXME: this can fail.
            fsave (resw(iproc), res);
            fflush (resw(iproc));
          endif
        until (cmd == 0)

      catch
        ## just indicate the error. don't quit this function !!!!
        fputs (stderr, "\n");
        warning ("parcellfun: unhandled error in subprocess %d", iproc);

        ## send a termination notice.
        fwrite (statw, -iproc, "double");
        fflush (statw);
      end_try_catch

    unwind_protect_cleanup
      ## This is enclosed in another handler to prevent errors from escaping.
      ## If something goes wrong, we'll get a broken pipe signal, but anything
      ## is better than skipping the following __exit__.
      try
        fclose (statw);
        fclose (resw(iproc));
        fclose (cmdr(iproc));
      end_try_catch

      ## no more work for us. We call __exit__, which bypasses termination sequences.
      __exit__ ();
      ## we should never get here.
      exit ();
    end_unwind_protect

  else
    ## parent process.
    njobs = numel (args{1});
    res = cell (nargout, njobs);

    pjobs = 0;                      # number of jobs handed out so far
    pending = zeros (1, nproc);     # job currently assigned to each process
    running = true (1, nproc);      # process still listens on its command pipe

    unwind_protect
      while (pjobs < njobs || any (pending))
        ## wait for a process state.
        isubp = fread (statr, 1, "double");
        if (isempty (isubp))
          ## end of file: every subprocess has closed its end of the status
          ## pipe, so nobody is left to notify.  (feof only becomes true
          ## after a read has hit the end, hence the test on the result.)
          warning ("parcellfun: premature exit due to closed pipe");
          for k = find (running)
            fclose (cmdw(k));
          endfor
          running(:) = false;
          break;
        endif

        if (isubp > 0)
          ijob = pending(isubp);
          ## we have a result ready.
          res(:, ijob) = fload (resr(isubp));
          ## clear pending state
          pending(isubp) = 0;
        else
          isubp = -isubp;
          if (pending(isubp))
            ## premature exit means an unhandled error occurred in a subprocess.
            ## the process should have griped, we just try to exit gracefully.
            pending(isubp) = 0;
            ## the process is gone; it needs no terminating signal.
            running(isubp) = false;
            fclose (cmdw(isubp));
            ## no more jobs to start.
            njobs = pjobs;
            ## skip the rest; don't send commands to the process.
            continue;
          endif
        endif

        if (pjobs < njobs)
          pjobs += 1;
          ijob = pjobs;
          ## send the next job to the process.
          fwrite (cmdw(isubp), ijob, "double");
          fflush (cmdw(isubp));
          ## set pending state
          pending(isubp) = ijob;
        else
          ## send terminating signal
          fwrite (cmdw(isubp), 0, "double");
          fclose (cmdw(isubp));
          running(isubp) = false;
        endif

        if (verbose_level > 0)
          fprintf (stderr, "\rparcellfun: %d/%d jobs done", pjobs - sum (pending != 0), njobs);
          fflush (stderr);
        endif
      endwhile

      if (verbose_level > 0)
        fputs (stderr, "\n");
        fflush (stderr);
      endif

    unwind_protect_cleanup

      ## send termination signals to every process that has not been told to
      ## stop yet.  This covers processes with a pending job as well as idle
      ## ones that are still waiting for their first command; leaving the
      ## latter alone would make the waitpid calls below block forever.
      for isubp = find (running)
        ## send terminating signal
        fwrite (cmdw(isubp), 0, "double");
        fclose (cmdw(isubp));
      endfor

      ## close all pipe ends
      fclose (statr);
      for i = 1:nproc
        fclose (resr(i));
      endfor

      ## explicitly recognize all terminated processes.
      for i = 1:nproc
        if (verbose_level > 1)
          fprintf(stderr,'parcellfun: waiting for child process %d to close\n', pids(i));
          fflush (stderr);
        endif
        [pid, status] = waitpid (pids(i));
      endfor

    end_unwind_protect

    ## we're finished. transform the result.
    varargout = cell (1, nargout);
    shape = size (args{1});
    for i = 1:nargout
      varargout{i} = reshape (res(i,:), shape);
      if (uniform_output)
        varargout{i} = cell2mat (varargout{i});
      endif
    endfor

  endif

endfunction