Apr 052013
 

Download as ebook

In part 3 we found out that executing Process instances in Parallel.ForEach could starve ThreadPool and cause a deadlock. In this part, we’ll attempt at solving the problem.


What we’d like to have is a function or class that executes and reads the outputs of an external process such that it could be used concurrently, in Parallel.ForEach or with Tasks without ill-effects or without the user taking special precautions to avoid these problems.

A Solution

A naïve solution is to make sure there is at least one thread available in the pool when blocking on the child process and its output. In other words, the number of threads in the pool must be larger than the MaxDegreeOfParallelism of the ForEach loop. This will only work when there are no other users of the ThreadPool, then we can control these two numbers to guarantee this inequality. Even then, we might potentially need a large number of threads in the pool. But the problem is inescapable as we can’t control the complete process at all times. To make matters worse, the API for changing the number of workers in the pool are non-atomic and will always have race conditions when changing these process-wide settings.

Broadly speaking, there are three solutions. The first is to replace Parallel.ForEach with something else. The second is to replace the Process class with our own, such that we have a more flexible design that avoid the wasting a thread to wait on the callback events. The third is to replace the ThreadPool. The last can be done by simply having a private pool for waiting on the callback events. That is, unless we can find a way to make the native Process work with Parallel.ForEach without worrying about this issue.

Of course the best solution would be for the ThreadPool to be smarter and increase the number of threads available. This is the case when we set our wait functions to wait indefinitely. But that takes way too long (in my case about 12 seconds) before the ThreadPool realizes that no progress is being made, and it still has some tasks schedules for execution. It (correctly) assumes that there might be some interdependency between the running-but-not-progressing threads and those tasks waiting for execution.

The third solution is overly complicated and I’d find very little reason to defend it. Thread pools are rather complicated beasts and unless we can reuse one, it’d be an overkill to develop one. The first two solution look promising, so let’s try them out.

Replacing Parallel.ForEach

Clearly the above workarounds aren’t bulletproof. We can easily end up in situations where we timeout, and so it’d be a Red Queen’s Race. A better solution is to avoid the root of the problem, namely, to avoid wasting ThreadPool threads to wait on the processes. This can be done if we could make the wait more efficient, by combining the waits of multiple processes together. In that case, it wouldn’t matter if we used a single ThreadPool thread, or a dedicated one.

To that end, we need to do two things. First, we need to convert the single function into an object that we can move around, because we’ll need to reference its locals directly. Second, we need to separate the wait from all the other setup code.

Here is a wrapper that is disposable, and wraps cleanly around Process.

        public class ProcessExecutor : IDisposable
        {
            public ProcessExecutor(string name, string path)
            {
                _name = name;
                _path = path;
            }

            public void Dispose()
            {
                Close();
            }

            public string Name { get { return _name; } }
            public string StdOut { get { return _stdOut.ToString(); } }
            public string StdErr { get { return _stdErr.ToString(); } }

            // Returns the internal process. Used for getting exit code and other advanced usage.
            // May be proxied by getters. But for now let's trust the consumer.
            public Process Processs { get { return _process; } }

            public bool Run(string args)
            {
                // Make sure we are don't have any old baggage.
                Close();

                // Fresh start.
                _stdOut.Clear();
                _stdErr.Clear();
                _stdOutEvent = new ManualResetEvent(false);
                _stdErrEvent = new ManualResetEvent(false);

                _process = new Process();
                _process.StartInfo = new ProcessStartInfo(_path)
                {
                    Arguments = args,
                    UseShellExecute = false,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    ErrorDialog = false,
                    CreateNoWindow = true,
                    WorkingDirectory = Path.GetDirectoryName(_path)
                };

                _process.OutputDataReceived += (sender, e) =>
                {
                    _stdOut.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        var evt = _stdOutEvent;
                        if (evt != null)
                        {
                            lock (evt)
                            {
                                evt.Set();
                            }
                        }
                    }
                };
                _process.ErrorDataReceived += (sender, e) =>
                {
                    _stdErr.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        var evt = _stdErrEvent;
                        lock (evt)
                        {
                            evt.Set();
                        }
                    }
                };

                _sw = Stopwatch.StartNew();
                _process.Start();
                _process.BeginOutputReadLine();
                _process.BeginErrorReadLine();
                _process.Refresh();
                return true;
            }

            public void Cancel()
            {
                var proc = _process;
                _process = null;
                if (proc != null)
                {
                    // Invalidate cached data to requery.
                    proc.Refresh();

                    // Cancel all pending IO ops.
                    proc.CancelErrorRead();
                    proc.CancelOutputRead();

                    Kill();
                }

                var outEvent = _stdOutEvent;
                _stdOutEvent = null;
                if (outEvent != null)
                {
                    lock (outEvent)
                    {
                        outEvent.Close();
                        outEvent.Dispose();
                    }
                }

                var errEvent = _stdErrEvent;
                _stdErrEvent = null;
                if (errEvent != null)
                {
                    lock (errEvent)
                    {
                        errEvent.Close();
                        errEvent.Dispose();
                    }
                }
            }

            public void Wait()
            {
                Wait(-1);
            }

            public bool Wait(int timeoutMs)
            {
                try
                {
                    if (timeoutMs < 0)
                    {
                        // Wait for process and all I/O to finish.
                        _process.WaitForExit();
                        return true;
                    }

                    // Timed waiting. We need to wait for I/O ourselves.
                    if (!_process.WaitForExit(timeoutMs))
                    {
                        Kill();
                    }

                    // Wait for the I/O to finish.
                    var waitMs = (int)(timeoutMs - _sw.ElapsedMilliseconds);
                    waitMs = Math.Max(waitMs, 10);
                    _stdOutEvent.WaitOne(waitMs);

                    waitMs = (int)(timeoutMs - _sw.ElapsedMilliseconds);
                    waitMs = Math.Max(waitMs, 10);
                    return _stdErrEvent.WaitOne(waitMs);
                }
                finally
                {
                    // Cleanup.
                    Cancel();
                }
            }

            private void Close()
            {
                Cancel();
                var proc = _process;
                _process = null;
                if (proc != null)
                {
                    // Dispose in all cases.
                    proc.Close();
                    proc.Dispose();
                }
            }

            private void Kill()
            {
                try
                {
                    // We need to do this in case of a non-UI proc
                    // or one to be forced to cancel.
                    var proc = _process;
                    if (proc != null && !proc.HasExited)
                    {
                        proc.Kill();
                    }
                }
                catch
                {
                    // Kill will throw when/if the process has already exited.
                }
            }

            private readonly string _name;
            private readonly string _path;
            private readonly StringBuilder _stdOut = new StringBuilder(4 * 1024);
            private readonly StringBuilder _stdErr = new StringBuilder(4 * 1024);

            private ManualResetEvent _stdOutEvent;
            private ManualResetEvent _stdErrEvent;
            private Process _process;
            private Stopwatch _sw;
        }

Converting this to use Tasks is left as an exercise to the reader.

Now, we can use this in a different way.

        public static void Run(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            var cts = new CancellationTokenSource();
            var allProcesses = Task.Factory.StartNew(() =>
            {
                var tasks = pathArgs.Select(pair => Task.Factory.StartNew(() =>
                {
                    string name = Path.GetFileNameWithoutExtension(pair.Key);
                    var exec = new ProcessExecutor(name, pair.Key);
                    exec.Run(pair.Value);
                    cts.Token.Register(exec.Cancel);
                    return exec;
                })).ToArray();

                // Wait for individual tasks to finish.
                foreach (var task in tasks)
                {
                    if (task != null)
                    {
                        task.Result.Wait(timeout);
                        task.Result.Cancel();
                        task.Result.Dispose();
                        task.Dispose();
                    }
                }
            });

            // Cancel, if we timed out.
            allProcesses.Wait(timeout);
            cts.Cancel();
        }

This time, we fire each process executor in a separate thread (also a worker on the ThreadPool,) but we wait in a single thread. The worker threads will run, and they will have their async reads queued, which will run once the exec.Run() functions return, and free that thread, at which point the async reads will execute.

Notice that we are creating the new tasks in a worker thread. This is so that we can limit the wait on all of them (although we’d need to have a timeout that depends on their number. But for illustration purpose the above code is sufficient. Now we have the luxury of waiting on all of them in a single Wait call and we can also cancel all of them using the CancellationTokenSource.

The above code is typically finishing in about 5-6 seconds (and as low as 4006ms) for 500 child processes on 12 cores to echo a short string and exit.

One thing to keep in mind when using this code is that if we use ProcessExecutor in another class, as a member, that class should also separate and expose the Run and Wait functions separately. That is, if it simply calls Run followed by Wait in the same function, then they will both execute on the same thread, which will result in the same problem if a number of them get executed on the ThreadPool, as we did. So the abstraction will leak again!

In addition, this puts the onus in the hands of the consumer. Our ProcessExecutor class is not bulletproof on its own.

In the next and final part we’ll go back to the Process class to try and avoid the deadlock issue without assuming a specific usage pattern.

Download as ebook

  One Response to “Async I/O and ThreadPool Deadlock (Part 4)”