Dec 22, 2014

Downturn

Tianhe-2 by Jack Dongarra

Tianhe-2


The semi-annual Top500 list shows a rather worrying trend for the 4th consecutive time in its last incarnation of Nov 2014. The list just hit its “lowest turnover rate in two decades.” The combined performance of the Top500 systems went from 274 Pflops to 309 Pflops in six months. The annual performance growth sits at ~23% at the moment, down from historic 90% per annum (measured between 1994-2008).

What this means is that there is practically no change in the Top500 most powerful computers in the world, and the trend is picking up speed in the wrong direction.
There are certainly cycles as technology and economies go through booms and busts, yet it seems there has never been this long a slowdown since 1993 when the list was first published.
There is only one new entry in the top-10 (in last place) with 3.58 Pflops from the US. To see the slope of the slowdown, here is a graph from the presentation.

There is reason to think the trend will reverse, eventually, but the best estimates point to the 2016-2018 period. That the trend can be traced to mid-2008 hints at the economic downturn as a cause. However, there is also reason to think competition has cooled off, or possibly that technology is the bottleneck. This is supported by the fact that a significant application area is government/military/classified, which isn’t nearly as sensitive to economic downturns as the scientific establishment. Technology is in the middle of a boom in terms of co-processors and embedded processors as a new class of computers, so it’s hard to chalk this up to a technology bottleneck. If competition is to blame, it’s a mystery to me why this should be the case now, especially since the US and China are more overtly in competition than ever before, at 46% and 12% of entries respectively. Russia, with 2% of the Top500 entries, is at one of its lowest points in its relationship with Europe and the West since the Cold War, and is implicated in a hot war.

Supercomputing Performance Development - Copyright Top500.org

Supercomputing Performance Development and Stagnation

It should be mentioned that the US and Asia have each lost a few percentage points in terms of entries since last June (Japan, the only exception, gained 2 entries) and Europe gained a few, surpassing China in raw power after being overtaken two years ago. Perhaps the budgets and plans that reflect the political climate haven’t yet materialized in terms of supercomputing power. It will be interesting to see how this plays out, as it’s a disturbing trend, one that has implications for technology, scientific research, and a long history of healthy (and mischievous) competition between nations to simulate weather and destruction, both natural and man-made.

The presentation slides show the slowdown with all the glory and color of graphs and numbers.

Efficiency

The only relatively good news is that the second most efficient system went from 3418 Mflops/Watt to a new record of 4272 Mflops/Watt since last June, improving on the top contender at 3459 Mflops/Watt by 23.5%.
In fact, there are five updated entries in the top-10 most efficient systems, four of which are new. Personally I find this exciting and encouraging, but without more raw power many applications can’t be improved further.

The Green500 list, which is similar to Top500 except that it tracks the world’s most power-efficient systems, as opposed to the most compute-capable ones, has more good news. The latest edition, which was published after Top500’s latest, lists two machines that are more power-efficient than the LX that holds the top entry in the Top500’s power-efficiency list. At 5271 Mflops/Watt, the L-CSC at the GSI Helmholtz Center in Germany improves on the LX by yet another 23.5%.
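As a quick check of the two improvements quoted above, here is the arithmetic using only the Mflops/Watt figures given in the text. The second ratio comes out slightly under the quoted 23.5%, which is within rounding of the published numbers.

```latex
\begin{align*}
\frac{4272}{3459} &\approx 1.235 && \text{(about 23.5\% over the previous top contender)} \\
\frac{5271}{4272} &\approx 1.234 && \text{(about 23.4\% for the L-CSC over the LX)}
\end{align*}
```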

L-CSC at GSI, Copyright Thomas Ernsting from heise.de

L-CSC at GSI

To put this in perspective, the most efficient consumer GPUs hit 23 Gflops/Watt (for AMD) and 28 Gflops/Watt (for Nvidia). However, these are the numbers for single-precision (32-bit) floating-point ops, not the “full-precision” required by Linpack, which requires 64 bits or more. Double-precision GPU performance is 1/4th of single-precision, at best. Typically it’s 1/8th or less. The most efficient double-precision GPU reaches 5.5 Gflops/Watt for AMD and 7.2 Gflops/Watt for Nvidia, and these aren’t the most efficient single-precision GPUs (GPUs are differentiated for different markets, so they don’t compete with themselves). This means that even the most efficient consumer GPUs hardly make the cut on their own, before counting any overhead or even a motherboard and CPU. The L-CSC uses Intel Ivy-Bridge CPUs and AMD FirePro Workstation GPUs to achieve the efficiency record.

Exa-FLOPS

The above record efficiency, if scaled to 1 Exaflops, would require a mere 190 MW of power. While this is still significant, it is the closest we’ve come to the DARPA target of 67 MW (by 2020).
Whether the architecture of the L-CSC scales to Exaflops or not is a different matter altogether, but the energy efficiency question, which hitherto has been the most formidable obstacle to reaching Exa-scale performance, seems to be well within reach. Still, 67 MW is a rather optimistic target as it would require a 2.8x efficiency improvement over the current numbers. Nonetheless, in 2007, when plans for Exa-scale supercomputing were laid out, the technology of the day would have required 3000 MW when scaled to Exaflops. There has been, in effect, a net 15x efficiency improvement in the past 7 years (no doubt in major part due to co-processor technology).
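The figures in this section follow from straightforward scaling of the L-CSC efficiency number. A worked version, assuming perfectly linear scaling (which a real machine would not achieve):

```latex
\begin{align*}
P_{1\,\text{Exaflops}} &= \frac{10^{18}\ \text{flops}}{5271 \times 10^{6}\ \text{flops/W}}
  \approx 1.9 \times 10^{8}\ \text{W} = 190\ \text{MW} \\
\frac{190\ \text{MW}}{67\ \text{MW}} &\approx 2.8
  && \text{(improvement still needed for the DARPA target)} \\
\frac{3000\ \text{MW}}{190\ \text{MW}} &\approx 15.8
  && \text{(improvement since 2007, roughly the 15x quoted)}
\end{align*}
```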

On a related note, at least the US seems to have plans to push the envelope towards Exa-scale computing, according to a very recent announcement. The US government plans “to spend $325m on two new supercomputers, and a further $100m on technology development, to put the USA back on the road to Exascale computing.” The $100m is especially exciting news as it’s not going to a vendor for building or upgrading a supercomputer; rather, it’s allocated for technology. Beyond that, this should give a decent push for the healthy competitive spirit to start rolling again.

How to Use a Million Cores?

There has been significant research and interest in parallel algorithms and libraries in the past decade, in major part precisely to address the issue of scalability. Most implementations of algorithms do not scale to tens or hundreds of cores (let alone thousands or millions), even if in theory the algorithm itself is reasonably easy to parallelize. The world’s fastest single machine (by a margin of 2x from the next competitor), the Chinese Tianhe-2, a.k.a. Milkyway-2, has 3.12 million cores to play with.

The main issue has to do with communication. But when parallelizing algorithms, the major problem is even closer to raw computing: overheads. It seems that the biggest bottleneck to parallelizing efficiently, even on the same socket, is the overhead of partitioning, scatter, and gather, the last step of which is typically the killer. (Interesting presentation on scalability with HPX and more published papers here.) I’ve been following some of the libraries and compilers in the C++ world, and HPX as well as TBB seem to be doing a very decent and promising job. HPX is especially promising and is well worth a look, as it’s C++ standards conformant and even has a good chance of getting some of its functionality into the standard by 2017 (the next planned C++ standard voting meeting). In addition, it supports distributing work across compute nodes. Like TBB, it’s OSS.

But the shorter answer is that these machines are designed for specific applications and typically have the software available, so there is a very good idea as to what hardware characteristics will deliver the best performance, both computational and power consumption. In addition, they run multiple parallel versions, or scenarios, concurrently that are independent of one another, which reduces scalability issues dramatically. This is actually a good thing for simulations as some, if not most, scenarios are discarded anyway, and the sooner one discovers their unfitness the better. I do not have the reference at hand at the moment, unfortunately, but I believe the record for scaling on the most cores was reached sometime back (circa 2013) with 1 million cores utilized towards solving a single problem, which is impressive by any measure.

Often the supercomputer is shared between users. The US Titan, the second fastest machine, has thousands of users and applications running on it. To that end the Lustre filesystem (based on ZFS) was practically created for it. With 40 PB storage and 1.4 TB/s throughput _on disk_, it’s not exactly a standard-issue I/O system. (This presentation on Lustre shows the performance achieved on Titan.) This means that Linpack numbers should be taken with a large grain of salt when comparing these behemoths.

Why Supercompute?

I’m perhaps as cynical as anyone about the utility of these beasts of a machine and I’ve pointed out the military use of these machines, which is unfortunate. However, I’d much rather have the testing of nuclear weapons (and other WMDs) done in virtual simulations on machines that push the state of the art and most likely trickle the technology down to civilian and commercial use, than have it done by actually blowing up parts of our planet. Indeed, it was precisely the ban on nuclear arms testing that first pushed the tests literally underground (the French and the US are the best known examples of covertly resuming tests underground and in the oceans), before ultimately going fully into simulation. As such, I’m not at all torn on my position when it comes to the use of supercomputers for military purposes, considering the aggressive nature of homo sapiens (irony noted in the lack of wisdom when playing with WMDs) and the fact that there are beneficial side-effects to this alternative.

Now, if only we could run conflicts through simulations to avoid the shedding of blood, much like how territorial animals display their prowess by war screams and showing their fangs to avoid physical conflict, and walk away when the winner is obvious to both, I think the world would be a vastly better place. Alas, something tells me we do like getting physical for its own sake, often when there is absolutely nothing of significance to gain, and much too much to lose. Nobody said pride was a virtue without a cost.

Apr 08, 2013

In part 4 we replaced Parallel.ForEach with Task. This allowed us to run the processes on the ThreadPool threads, but wait only in the main thread. So the worker threads were never blocked on waits. They executed the child process, then processed the output callbacks and exited promptly. All the while, the main thread waited for everything to finish completely. To do this, we needed to separate the Run logic from the Wait. In addition, we had to keep the Process instances alive so that the callbacks would work and we could detect the end correctly.

This worked well for us. Except for the nagging problem that we can’t use Parallel.ForEach. Or rather, if we did, even accidentally, we’d deadlock as we did before. Also, our wrapper isn’t readily reusable in other classes without explicitly separating the Run call from the Wait on separate threads as we did. Clearly this isn’t a bulletproof solution.

It might be tempting to think that the Process class is a thin wrapper around the OS process, but in fact it’s rather complex. It does a lot for us, and we shouldn’t throw it away. It’d be great if we could avoid the problem with ThreadPool altogether. Remember the reason we’re using it was because the synchronous version deadlocked as well.

What if we could improve the synchronous version?

Synchronous I/O Take 2

Recall that the deadlock happened because to read until the end of the output stream, we need to wait for the process to exit. The process, in its turn, wouldn’t exit until it has written all its output, which we’re reading. Since we’re reading only one stream at a time (either StandardOutput or StandardError), if the buffer of the one we’re not reading gets full, we deadlock.

The solution would be to read a little from each. This would work, except for the little problem that if we read from a stream that doesn’t have data, we’d block until it gets data. This is exactly like the situation we were trying to avoid. A deadlock will happen when the other stream’s buffer is full, so the child process will block on writing to it, while we are waiting to read from the other stream that has no data yet.
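The one-stream-at-a-time pattern that leads to this deadlock can be sketched as follows. This is a minimal illustration and not the code from the earlier parts; the class and method names are hypothetical. It works for children that write little to stderr, and hangs once the child fills the stderr pipe buffer before closing stdout.

```csharp
using System.Diagnostics;

// Hypothetical helper (not from the original posts): reads one redirected
// stream to the end before touching the other.
public static class SequentialReadSketch
{
    public static string ReadStdOutThenStdErr(string path, string args)
    {
        var startInfo = new ProcessStartInfo(path, args)
        {
            UseShellExecute = false,
            RedirectStandardOutput = true,
            RedirectStandardError = true,
            CreateNoWindow = true
        };

        using (var process = Process.Start(startInfo))
        {
            // Blocks until the child closes stdout, which usually means
            // until it exits. If the child fills the stderr pipe buffer in
            // the meantime, it blocks on its own write and never exits,
            // so this call never returns.
            string stdOut = process.StandardOutput.ReadToEnd();

            // Only reached once stdout has been closed.
            string stdErr = process.StandardError.ReadToEnd();

            process.WaitForExit();
            return stdOut + stdErr;
        }
    }
}
```

Swapping the order of the two reads only moves the problem to the other stream, which is why reading "a little from each" looks tempting at first.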

Peek to the Rescue?

The Process class exposes both the StandardOutput and StandardError streams, which have a Peek() function that returns the next character without removing it from the buffer. It returns -1 when we have no data to read, so we postpone reading until Peek() returns > -1.

Alas, this won’t work. As many have pointed out, StreamReader.Peek can block! This is ironic, considering that one would typically use it to poll the stream.

It seems we have no more hope in the synchronous world. We have no getters to query the available data, as in NetworkStream.DataAvailable, and Length will throw an exception as it needs a seekable stream (which we don’t have). So we’re back to the asynchronous world.

The Solution: No Solution!

I was almost sure I had found an answer with direct access to the StandardOutput and StandardError streams. After all, these are just wrappers around the Win32 pipes. The async I/O in .Net is really a wrapper around the native Windows async infrastructure. So, in theory, we don’t need all the layers that Process and other classes add on top of the raw interfaces. Asynchronous I/O in Windows works by passing a (typically manual-reset) event object and a callback function. All we really need is for the event to get triggered. Lo and behold, we also get a wait-handle in the IAsyncResult that BeginRead of Stream returns. So we could wait on it directly, as these are triggered by the FileSystem drivers, after issuing async reads like this:

  var outAsyncRes = process.StandardOutput.BaseStream.BeginRead(outBuffer, 0, BUFFER_LEN, null, null);
  var errAsyncRes = process.StandardError.BaseStream.BeginRead(errBuffer, 0, BUFFER_LEN, null, null);
  var events = new[] { outAsyncRes.AsyncWaitHandle, errAsyncRes.AsyncWaitHandle };
  WaitHandle.WaitAny(events, waitTimeMs);

Except, this wouldn’t work. There are two reasons why; the blame for one goes to the OS and for the other to .Net.

Async Event Not Triggered by OS

The first issue is that Windows doesn’t always signal this event. You read that right. In fact, a comment in the FileStream code reads:

                // Consider uncommenting this someday soon - the EventHandle 
                // in the Overlapped struct is really useless half of the
                // time today since the OS doesn't signal it. [...]

True, the state of the event object is not changed if the operation finishes before the function returns.

.Net Callback Interop via ThreadPool

Because the event isn’t signaled in all cases, .Net needs to manually signal this event object when the Overlapped I/O callback is invoked. You’d think that would save the day. Alas, the Overlapped I/O callback doesn’t call into managed code. The CLR handles such callbacks in a special way. In fact, it knows about File I/O, and the .Net wrappers aren’t written in pure P/Invoke, but rather with support from the CLR as well as standard P/Invoke.

Because the system can’t invoke managed callbacks, the solution is for the CLR to do it itself. Of course this needs to be done in a responsive fashion, and without blocking all of the CLR for each callback invocation. What better solution than to queue a task on the ThreadPool that invokes the .Net callback? This callback will signal the event object we got in the IAsyncResult and, if set, it’ll call the delegate that we could pass to the BeginRead call.

So we’re back 180 degrees to ThreadPool and the original dilemma.

Conclusion

The task at hand seemed simple enough. Indeed, even after 5 posts, I still feel the frustration of not finding a generic solution that is both scalable and abstract. We’ve tried simple synchronous I/O, then switched to async, only to find that our waits can’t be satisfied because they are serviced by the worker threads that we are using to wait for the reads to complete. This took us to a polling strategy, that, once again, failed because the classes we are working with do not allow us to poll without blocking. The OS could have saved the day but because we’re in the belly of .Net, we have to play with its rules and that meant the OS callbacks had to be serviced on the same worker threads we are blocking. The ThreadPool is another stubbornly designed process-wide object that doesn’t allow us to gracefully and, more importantly, thread-safely, query and modify.

This means that we either need to explicitly use Tasks and the ProcessExecutor class we designed in the previous post or we need to roll our own internal thread pool to service the waits.
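To make the second option concrete, here is a minimal sketch of servicing a blocking wait off the ThreadPool. It is a simplification and not the author's design: it uses one dedicated thread per wait rather than a real pool, and the names `DedicatedWaitSketch` and `RunOnDedicatedThread` are hypothetical. The point is only that the blocked thread is not a pool worker, so the pool remains free to run the I/O callbacks.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs a blocking call on its own thread and exposes
// the outcome as a Task, so no ThreadPool worker is tied up by the wait.
public static class DedicatedWaitSketch
{
    public static Task<T> RunOnDedicatedThread<T>(Func<T> blockingWork)
    {
        var completion = new TaskCompletionSource<T>();

        var thread = new Thread(() =>
        {
            try
            {
                completion.SetResult(blockingWork());
            }
            catch (Exception ex)
            {
                // Surface the failure through the returned task.
                completion.SetException(ex);
            }
        });

        // Background threads do not keep the process alive on exit.
        thread.IsBackground = true;
        thread.Start();
        return completion.Task;
    }
}
```

A call such as `DedicatedWaitSketch.RunOnDedicatedThread(() => executor.Wait(timeoutMs))` would then block a private thread instead of a pool worker, where `executor` stands for a ProcessExecutor instance from the previous post. A thread per wait does not scale to hundreds of processes, which is why a small private pool would be the more realistic version.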

It is very easy to overlook a cyclic dependency such as this, especially in the wake of abstraction and separation of responsibilities. The complex nature of things was the perfect setup to overlook the subtle assumptions and expectations of each part of the code: the process spawning, the I/O readers, Parallel.ForEach and ultimately, the generic and omnipresent, ThreadPool.

The only hope for solving similar problems is to first find them. By incorrectly assuming (as I did) that any thread-safe function can be wrapped in Parallel.ForEach, and patting oneself for the marvels and simplicity of modern programming languages and for being proud of our silently-failing achievement, we only miss the opportunity to do so. By testing our code and verifying our assumptions, with skepticism and cynicism, rather than confidence and pride, do we stand a chance at finding out the sad truth, at least on the (hopefully) rare cases that we fail. Or abstraction fails, at any rate.

I can only wonder with a grin about other similar cases in the wild and how they are running slower than their brain-dead, one-at-a-time versions.

Apr 05, 2013

In part 3 we found out that executing Process instances in Parallel.ForEach could starve the ThreadPool and cause a deadlock. In this part, we’ll attempt to solve the problem.

What we’d like to have is a function or class that executes and reads the outputs of an external process such that it could be used concurrently, in Parallel.ForEach or with Tasks without ill-effects or without the user taking special precautions to avoid these problems.

A Solution

A naïve solution is to make sure there is at least one thread available in the pool when blocking on the child process and its output. In other words, the number of threads in the pool must be larger than the MaxDegreeOfParallelism of the ForEach loop. This will only work when there are no other users of the ThreadPool, since only then can we control these two numbers to guarantee this inequality. Even then, we might potentially need a large number of threads in the pool. But the problem is inescapable as we can’t control the complete process at all times. To make matters worse, the API for changing the number of workers in the pool is non-atomic and will always have race conditions when changing these process-wide settings.
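A minimal sketch of this naïve approach, assuming we are the only user of the pool, might look like the following. The helper name `EnsureSpareWorker` is hypothetical. It raises the pool's minimum worker count, which is the number of threads the pool creates on demand without throttling, and the comment marks the read-modify-write race mentioned above.

```csharp
using System;
using System.Threading;

// Hypothetical helper illustrating the naive fix and its race condition.
public static class PoolSizingSketch
{
    // Tries to make the pool's minimum worker count exceed the loop's
    // degree of parallelism. Returns false if the pool rejects the change.
    public static bool EnsureSpareWorker(int maxDegreeOfParallelism)
    {
        int minWorkers, minIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);

        int wanted = maxDegreeOfParallelism + 1;
        if (minWorkers >= wanted)
        {
            return true;
        }

        // Another thread may change these process-wide settings between
        // the Get above and the Set below; nothing makes the pair atomic.
        return ThreadPool.SetMinThreads(wanted, minIo);
    }
}
```

Calling `PoolSizingSketch.EnsureSpareWorker(n)` before a `Parallel.ForEach` limited to `n` via `ParallelOptions.MaxDegreeOfParallelism` keeps one worker free for the callbacks only as long as nothing else queues work on the pool.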

Broadly speaking, there are three solutions. The first is to replace Parallel.ForEach with something else. The second is to replace the Process class with our own, such that we have a more flexible design that avoids wasting a thread to wait on the callback events. The third is to replace the ThreadPool. The last can be done by simply having a private pool for waiting on the callback events. That is, unless we can find a way to make the native Process work with Parallel.ForEach without worrying about this issue.

Of course the best solution would be for the ThreadPool to be smarter and increase the number of threads available. This is what happens when we set our wait functions to wait indefinitely. But it takes way too long (in my case about 12 seconds) before the ThreadPool realizes that no progress is being made while it still has some tasks scheduled for execution. It (correctly) assumes that there might be some interdependency between the running-but-not-progressing threads and those tasks waiting for execution.

The third solution is overly complicated and I’d find very little reason to defend it. Thread pools are rather complicated beasts and unless we can reuse one, it’d be overkill to develop one. The first two solutions look promising, so let’s try them out.

Replacing Parallel.ForEach

Clearly the above workarounds aren’t bulletproof. We can easily end up in situations where we timeout, and so it’d be a Red Queen’s Race. A better solution is to avoid the root of the problem, namely, to avoid wasting ThreadPool threads to wait on the processes. This can be done if we could make the wait more efficient, by combining the waits of multiple processes together. In that case, it wouldn’t matter if we used a single ThreadPool thread, or a dedicated one.

To that end, we need to do two things. First, we need to convert the single function into an object that we can move around, because we’ll need to reference its locals directly. Second, we need to separate the wait from all the other setup code.

Here is a wrapper that is disposable, and wraps cleanly around Process.

        public class ProcessExecutor : IDisposable
        {
            public ProcessExecutor(string name, string path)
            {
                _name = name;
                _path = path;
            }

            public void Dispose()
            {
                Close();
            }

            public string Name { get { return _name; } }
            public string StdOut { get { return _stdOut.ToString(); } }
            public string StdErr { get { return _stdErr.ToString(); } }

            // Returns the internal process. Used for getting exit code and other advanced usage.
            // May be proxied by getters. But for now let's trust the consumer.
            public Process Process { get { return _process; } }

            public bool Run(string args)
            {
                // Make sure we don't have any old baggage.
                Close();

                // Fresh start.
                _stdOut.Clear();
                _stdErr.Clear();
                _stdOutEvent = new ManualResetEvent(false);
                _stdErrEvent = new ManualResetEvent(false);

                _process = new Process();
                _process.StartInfo = new ProcessStartInfo(_path)
                {
                    Arguments = args,
                    UseShellExecute = false,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    ErrorDialog = false,
                    CreateNoWindow = true,
                    WorkingDirectory = Path.GetDirectoryName(_path)
                };

                _process.OutputDataReceived += (sender, e) =>
                {
                    _stdOut.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        var evt = _stdOutEvent;
                        if (evt != null)
                        {
                            lock (evt)
                            {
                                evt.Set();
                            }
                        }
                    }
                };
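                _process.ErrorDataReceived += (sender, e) =>
                {
                    _stdErr.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        // Cancel() may have cleared the event concurrently,
                        // so check for null as the stdout handler does.
                        var evt = _stdErrEvent;
                        if (evt != null)
                        {
                            lock (evt)
                            {
                                evt.Set();
                            }
                        }
                    }
                };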

                _sw = Stopwatch.StartNew();
                _process.Start();
                _process.BeginOutputReadLine();
                _process.BeginErrorReadLine();
                _process.Refresh();
                return true;
            }

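            public void Cancel()
            {
                var proc = _process;
                _process = null;
                if (proc != null)
                {
                    // Invalidate cached data to requery.
                    proc.Refresh();

                    // Cancel all pending IO ops.
                    proc.CancelErrorRead();
                    proc.CancelOutputRead();

                    // Kill() reads _process, which was just set to null,
                    // so stop the captured instance directly.
                    try
                    {
                        if (!proc.HasExited)
                        {
                            proc.Kill();
                        }
                    }
                    catch
                    {
                        // The process may have exited in the meantime.
                    }
                }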

                var outEvent = _stdOutEvent;
                _stdOutEvent = null;
                if (outEvent != null)
                {
                    lock (outEvent)
                    {
                        outEvent.Close();
                        outEvent.Dispose();
                    }
                }

                var errEvent = _stdErrEvent;
                _stdErrEvent = null;
                if (errEvent != null)
                {
                    lock (errEvent)
                    {
                        errEvent.Close();
                        errEvent.Dispose();
                    }
                }
            }

            public void Wait()
            {
                Wait(-1);
            }

            public bool Wait(int timeoutMs)
            {
                try
                {
                    if (timeoutMs < 0)
                    {
                        // Wait for process and all I/O to finish.
                        _process.WaitForExit();
                        return true;
                    }

                    // Timed waiting. We need to wait for I/O ourselves.
                    if (!_process.WaitForExit(timeoutMs))
                    {
                        Kill();
                    }

                    // Wait for the I/O to finish.
                    var waitMs = (int)(timeoutMs - _sw.ElapsedMilliseconds);
                    waitMs = Math.Max(waitMs, 10);
                    _stdOutEvent.WaitOne(waitMs);

                    waitMs = (int)(timeoutMs - _sw.ElapsedMilliseconds);
                    waitMs = Math.Max(waitMs, 10);
                    return _stdErrEvent.WaitOne(waitMs);
                }
                finally
                {
                    // Cleanup.
                    Cancel();
                }
            }

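            private void Close()
            {
                // Capture the instance first: Cancel() sets _process to null,
                // which would otherwise leave nothing to dispose.
                var proc = _process;
                Cancel();
                _process = null;
                if (proc != null)
                {
                    // Dispose in all cases.
                    proc.Close();
                    proc.Dispose();
                }
            }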

            private void Kill()
            {
                try
                {
                    // We need to do this in case of a non-UI proc
                    // or one to be forced to cancel.
                    var proc = _process;
                    if (proc != null && !proc.HasExited)
                    {
                        proc.Kill();
                    }
                }
                catch
                {
                    // Kill will throw when/if the process has already exited.
                }
            }

            private readonly string _name;
            private readonly string _path;
            private readonly StringBuilder _stdOut = new StringBuilder(4 * 1024);
            private readonly StringBuilder _stdErr = new StringBuilder(4 * 1024);

            private ManualResetEvent _stdOutEvent;
            private ManualResetEvent _stdErrEvent;
            private Process _process;
            private Stopwatch _sw;
        }

Converting this to use Tasks is left as an exercise to the reader.

Now, we can use this in a different way.

        public static void Run(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            var cts = new CancellationTokenSource();
            var allProcesses = Task.Factory.StartNew(() =>
            {
                var tasks = pathArgs.Select(pair => Task.Factory.StartNew(() =>
                {
                    string name = Path.GetFileNameWithoutExtension(pair.Key);
                    var exec = new ProcessExecutor(name, pair.Key);
                    exec.Run(pair.Value);
                    cts.Token.Register(exec.Cancel);
                    return exec;
                })).ToArray();

                // Wait for individual tasks to finish.
                foreach (var task in tasks)
                {
                    if (task != null)
                    {
                        task.Result.Wait(timeout);
                        task.Result.Cancel();
                        task.Result.Dispose();
                        task.Dispose();
                    }
                }
            });

            // Cancel, if we timed out.
            allProcesses.Wait(timeout);
            cts.Cancel();
        }

This time, we fire each process executor in a separate thread (also a worker on the ThreadPool,) but we wait in a single thread. The worker threads will run, and they will have their async reads queued, which will run once the exec.Run() functions return, and free that thread, at which point the async reads will execute.

Notice that we are creating the new tasks in a worker thread. This is so that we can limit the wait on all of them (although we’d need to have a timeout that depends on their number). But for illustration purposes the above code is sufficient. Now we have the luxury of waiting on all of them in a single Wait call and we can also cancel all of them using the CancellationTokenSource.

The above code is typically finishing in about 5-6 seconds (and as low as 4006ms) for 500 child processes on 12 cores to echo a short string and exit.

One thing to keep in mind when using this code is that if we use ProcessExecutor in another class, as a member, that class should also separate and expose the Run and Wait functions separately. That is, if it simply calls Run followed by Wait in the same function, then they will both execute on the same thread, which will result in the same problem if a number of them get executed on the ThreadPool, as we did. So the abstraction will leak again!

In addition, this puts the onus in the hands of the consumer. Our ProcessExecutor class is not bulletproof on its own.

In the next and final part we’ll go back to the Process class to try and avoid the deadlock issue without assuming a specific usage pattern.

Apr 04, 2013

In part 2 we discovered that by executing Process instances in Parallel.ForEach we are getting reduced performance and our waits are timing out. In this part we’ll dive deep into the problem and find out what is going on.

Deadlock

Clearly the problem has to do with the Process class and/or with spawning processes in general. Looking more carefully, we notice that besides the spawned process we also have two asynchronous reads. We have intentionally requested these to be executed on separate threads. But that shouldn’t be a problem, unless we have misused the async API.

It is reasonable to suspect that the approach used to do the async reading is at fault. This is where I ventured to look at the Process class code. After all, it’s a wrapper on the Win32 API, and it might make assumptions that I was ignoring or contradicting. Regrettably, that didn’t help me figure out what was going on, except for prompting me to write the said previous post.

Looking at the BeginOutputReadLine() function, we see it creating an AsyncStreamReader, which is internal to the .Net Framework and then calls BeginReadLine(), which presumably is where the async action happens.

        [System.Runtime.InteropServices.ComVisible(false)] 
        public void BeginOutputReadLine() {

            if(outputStreamReadMode == StreamReadMode.undefined) { 
                outputStreamReadMode = StreamReadMode.asyncMode;
            } 
            else if (outputStreamReadMode != StreamReadMode.asyncMode) {
                throw new InvalidOperationException(SR.GetString(SR.CantMixSyncAsyncOperation));
            }

            if (pendingOutputRead)
                throw new InvalidOperationException(SR.GetString(SR.PendingAsyncOperation)); 

            pendingOutputRead = true;
            // We can't detect if there's a pending sychronous read, tream also doesn't. 
            if (output == null) {
                if (standardOutput == null) {
                    throw new InvalidOperationException(SR.GetString(SR.CantGetStandardOut));
                } 

                Stream s = standardOutput.BaseStream; 
                output = new AsyncStreamReader(this, s, new UserCallBack(this.OutputReadNotifyUser), standardOutput.CurrentEncoding); 
            }
            output.BeginReadLine(); 
        }

Within the AsyncStreamReader.BeginReadLine() we see the familiar asynchronous read on Stream using Stream.BeginRead().

        // User calls BeginRead to start the asynchronous read
        internal void BeginReadLine() { 
            if( cancelOperation) {
                cancelOperation = false; 
            } 

            if( sb == null ) { 
                sb = new StringBuilder(DefaultBufferSize);
                stream.BeginRead(byteBuffer, 0 , byteBuffer.Length,  new AsyncCallback(ReadBuffer), null);
            }
            else { 
                FlushMessageQueue();
            } 
        }

Unfortunately, I had incorrectly assumed that this async wait was executed on one of the I/O Completion Port threads of the ThreadPool. It seems that this is not the case, as ThreadPool.GetAvailableThreads() always returned the same number for completionPortThreads (incidentally, the workerThreads value didn’t change much either, but I didn’t notice that at first).
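For reference, here is a minimal sketch of how those counters can be sampled. The PoolStats class and its Snapshot method are names made up for this illustration; only the three ThreadPool calls are real API. Logging such a snapshot before and during the Parallel.ForEach run is one way to see which of the two pools is actually being consumed.

```csharp
using System;
using System.Threading;

public static class PoolStats
{
    // Formats the pool's current limits and usage as a single line.
    public static string Snapshot()
    {
        int minWorker, minIo, maxWorker, maxIo, freeWorker, freeIo;
        ThreadPool.GetMinThreads(out minWorker, out minIo);
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        ThreadPool.GetAvailableThreads(out freeWorker, out freeIo);

        // "Busy" is the difference between the maximum and what is still available.
        return string.Format(
            "worker: min={0} max={1} busy={2}; iocp: min={3} max={4} busy={5}",
            minWorker, maxWorker, maxWorker - freeWorker,
            minIo, maxIo, maxIo - freeIo);
    }
}
```

Calling Console.WriteLine(PoolStats.Snapshot()) from inside the loop body gives a rough picture; the numbers are only a point-in-time sample and can change between the three calls.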

A breakthrough came when I started changing the maximum parallelism (i.e. maximum thread count) of Parallel.ForEach.

        public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout, int maxThreads)
        {
            Parallel.ForEach(pathArgs, new ParallelOptions { MaxDegreeOfParallelism = maxThreads },
                             arg => Task.Factory.StartNew(() => DoJob(arg.Value.Split(' '))).Wait() );
        }

I thought I should increase the maximum number of threads to resolve the issue. Indeed, for certain values of MaxDegreeOfParallelism, I could never reproduce the problem (all processes finished very swiftly, with no timeouts). For everything else, the problem was reproducible most of the time: nine times out of ten I’d get timeouts. However, and to my surprise, the problem went away when I reduced MaxDegreeOfParallelism!

The magic number was 12. Yes, the number of cores at my disposal on my dev machine. If we limit the number of concurrent ForEach executions to fewer than 12, everything finishes swiftly; otherwise, we get timeouts and ExecAll() takes a long time to finish. In fact, with maxThreads=11, 500 process executions finish in under 8500ms, which is very commendable. With maxThreads=12, however, each batch of 12 processes waits until it times out, so finishing all 500 would take several minutes.

With this information, I tried increasing the ThreadPool thread limit using ThreadPool.SetMaxThreads(). But it turns out the defaults, as reported by ThreadPool.GetMaxThreads(), are already 1023 worker threads and 1000 I/O Completion Port threads, so the maximum was not what was holding us back. I had been assuming that if the available thread count was lower than required, the ThreadPool would simply create new threads until it reached the configured maximum.

[Figure: ThreadPool deadlock. Diagram showing the deadlock (created by www.gliffy.com)]

Putting It All Together

The assumption that Parallel.ForEach executes its body on the ThreadPool, and that said body can be treated as a black box, is clearly flawed. In our case the body initiates asynchronous I/O, which needs its own threads. Apparently, these come not from the I/O thread pool but from the worker thread pool. In addition, the number of threads in this pool is initially set to the number of cores available on the target machine. Even worse, the pool will resist creating new threads until absolutely necessary. Unfortunately, in our case that is too late, as our waits are timing out. What I left until this point (both for dramatic effect and to leave the solution for you, the reader, to find) is that the timeouts were happening on the StandardOutput and StandardError streams. That is, even though the child processes had exited a long time ago, we were still waiting to read their output.

Let me spell it out, in case it’s not obvious: each call to spawn and wait for a child process is executed on a ThreadPool worker thread, and uses it exclusively until the waits return. The async stream reads on StandardOutput and StandardError need to run on some thread. Since they are apparently queued to run on a ThreadPool thread, they will starve if we use all of the available threads in the pool to wait for them to finish. The read waits therefore time out (because we have a deadlock).
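The same starvation can be sketched without spawning any processes. In the illustration below (StarvationDemo and CountTimedOutWaits are hypothetical names, and the inner work item is only a stand-in for the async stream read), every outer work item occupies a pool thread while it waits for an inner work item that is queued to the very same pool. The exact count depends on the machine and on how quickly the pool decides to add threads, so no particular result is promised, but with more outer items than cores most waits would be expected to time out.

```csharp
using System;
using System.Threading;

public static class StarvationDemo
{
    // Queues outerCount work items; each one queues an inner item to the same
    // pool and then blocks waiting for it. Returns how many waits timed out.
    public static int CountTimedOutWaits(int outerCount, int waitMs)
    {
        int timedOut = 0;
        using (var allDone = new CountdownEvent(outerCount))
        {
            for (int i = 0; i < outerCount; i++)
            {
                ThreadPool.QueueUserWorkItem(outerState =>
                {
                    // Stand-in for the async stream read: it needs a pool thread too.
                    // Deliberately not disposed, because the inner item may call Set()
                    // after the wait below has already timed out.
                    var innerDone = new ManualResetEventSlim(false);
                    ThreadPool.QueueUserWorkItem(innerState => innerDone.Set());

                    // Stand-in for the blocking wait on the read.
                    if (!innerDone.Wait(waitMs))
                    {
                        Interlocked.Increment(ref timedOut);
                    }
                    allDone.Signal();
                });
            }
            allDone.Wait();
        }
        return timedOut;
    }
}
```

For example, StarvationDemo.CountTimedOutWaits(4 * Environment.ProcessorCount, 200) queues four batches' worth of blocking items; the inner items sit in the queue behind the remaining outer items and only run once the waiting threads give up.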

This is a case of Leaky Abstraction, as our black box of an “execute on ThreadPool” abstraction failed miserably when the executed code itself depended on the ThreadPool. Specifically, when we had used all available threads in the pool, we left none for the code that depends on the ThreadPool to use. We shot ourselves in the proverbial foot. Our abstraction failed.

In the next part we’ll attempt to solve the problem.

Apr 032013
 

In part 1 we discovered a deadlock in the synchronous approach to reading the output of Process and we solved it using asynchronous reads. Today we’ll parallelize the execution in an attempt to maximize efficiency and concurrency.

Parallel Execution

Now, let’s complicate our lives with some concurrency, shall we?

If we are to spawn many processes, we could (and should) utilize all the cores at our disposal. Thanks to Parallel.For and Parallel.ForEach this task is made much simpler than otherwise.

        public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            Parallel.ForEach(pathArgs, arg => ExecWithAsyncTasks(arg.Key, arg.Value, timeout));
        }

Things couldn’t be any simpler! We pass a list of executable paths and their arguments as KeyValuePair and a timeout in milliseconds. Except, this won’t work… at least not always.

First, let’s discuss how it will not work, then let’s understand the why before we attempt to fix it.

When Abstraction Backfires

The above code works like a charm in many cases. When it doesn’t, a number of waits time out. This is unacceptable, as we wouldn’t know whether we got all the output or only part of it unless we get a clean exit with no timeouts. I first noticed this issue in a completely different way. I was looking at Process Explorer (if you’re not using it, start now and I promise not to tell anyone) to see how amazingly fast things were with that single ForEach line. I was expecting to see a dozen or so (on a 12-core machine) child processes spawning and vanishing in quick succession. Instead, and to my chagrin, most of the time I saw just one child! One!

And after many trials and head-scratching and reading, it became clear that the waits were timing out, even though clearly the children had finished and exited. Indeed, because typically a process would run in much less time than the timeout, it was now slower with the parallelized code than with the sequential version. This wasn’t obvious at first, and reasonably I suspected some children were taking too long, or they had too much to write to the output pipes that could be deadlocking (which wasn’t unfamiliar to me).

Testbed

To troubleshoot something as complex as this, one should start with a clean test case with a minimum number of variables. This calls for a dummy child that would do exactly as I told it, so that I could simulate different scenarios. One such scenario would be not to spawn any children at all, and just test the Parallel.ForEach with some in-proc task (i.e. just a local function that does similar work to that of a child).

using System;
using System.Threading;

namespace Child
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length < 2 || args.Length % 2 != 0)
            {
                Console.WriteLine("Usage: [echo|fill|sleep|return] ");
                return;
            }

            DoJob(args);
        }

        private static void DoJob(string[] args)
        {
            for (int argIdx = 0; argIdx < args.Length; argIdx += 2)
            {
                switch (args[argIdx].ToLowerInvariant())
                {
                    case "echo":
                        Console.WriteLine(args[argIdx + 1]);
                        break;

                    case "fill":
                        var rd = new Random();
                        int bytes = int.Parse(args[argIdx + 1]);
                        while (bytes-- > 0)
                        {
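                            // Write one random lowercase letter per requested byte.
                            // Random.Next excludes its upper bound and returns an int, hence the +1 and the cast.
                            Console.Write((char)rd.Next('a', 'z' + 1));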
                        }
                        break;

                    case "sleep":
                        Thread.Sleep(int.Parse(args[argIdx + 1]));
                        break;

                    case "return":
                        Environment.ExitCode = int.Parse(args[argIdx + 1]);
                        break;

                    default:
                        Console.WriteLine("Unknown command [" + args[argIdx] + "]. Skipping.");
                        break;
                }
            }
        }
    }
}

Now we can give the child process commands to change its behavior, from dumping data to its output to sleeping to returning immediately.

Once the problem is reproduced, we can narrow it down to pinpoint the source. Running the exact same command in the same process (i.e. without spawning another process) results in no problems at all. Calling DoJob 500 times directly in Parallel.ForEach finishes in under 500ms (often under 450ms). So we can be sure Parallel.ForEach is working fine.

        public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            Parallel.ForEach(pathArgs, arg => Task.Factory.StartNew(() => DoJob(arg.Value.Split(' '))).Wait() );
        }

Even executing each job as a new task (within the Parallel.ForEach) doesn’t result in any noticeable difference in time. The reason for this good performance is probably that the ThreadPool scheduler fetches the task and executes it immediately when we call Wait(). That is, because both the Task.Factory.StartNew() call and the DoJob() call are ultimately executed on the ThreadPool, and because Task is designed specifically to utilize it, when we call Wait() on the task, it knows that it should schedule the next job in the queue, which in this case is the very task we are waiting on! Since the caller of Wait() happens to be running on the ThreadPool, it simply executes the task itself instead of scheduling it on a different thread and blocking. Dumping Thread.CurrentThread.ManagedThreadId before the Task.Factory.StartNew() call and from within DoJob shows that both indeed execute on the same thread. The overhead of creating and scheduling a Task is negligible, so we don’t see much of a change in time over 500 executions.
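The thread-id check described above can be sketched roughly as follows. InlineProbe and CountInlined are names invented for this illustration, and a trivial lambda stands in for DoJob. Inlining is a scheduler optimization rather than a guarantee, so the count may come out lower than the number of iterations.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InlineProbe
{
    // Returns how many of the inner tasks ran on the same thread that waited on them.
    public static int CountInlined(int count)
    {
        int inlined = 0;
        Parallel.For(0, count, i =>
        {
            int outerId = Thread.CurrentThread.ManagedThreadId;
            int innerId = -1;

            // Start a task and immediately wait on it, as ExecAll does with DoJob.
            Task.Factory.StartNew(() => { innerId = Thread.CurrentThread.ManagedThreadId; }).Wait();

            if (innerId == outerId)
            {
                Interlocked.Increment(ref inlined);
            }
        });
        return inlined;
    }
}
```

Comparing InlineProbe.CountInlined(500) against 500 gives a feel for how often Wait() ends up running the task on the waiting thread itself.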

All this is great and comforting, but still doesn’t help us resolve the problem at hand: why aren’t our processes spawned and executed at the highest possible efficiency? And why are they timing out?

In the next part we’ll dive deep into the problem and find out what is going on.

Apr 022013
 

I’ve mentioned in a past post that it was conceived while reading the source code for the System.Diagnostics.Process class. This post is about the reason that pushed me to read the source code in an attempt to fix the issue. It turned out that this was yet another case of Leaky Abstraction, which is a special interest of mine.

As it turned out, this post ended up being way too long (even for me). I don’t like installments, but I felt that it was worth trying, as the size was prohibitive for single-post consumption. As such, I’ve split it up into 5 parts, so that each part would be around 1000 words or less. I’ll post one part a day.

To give you an idea of the scope and subject of what’s to come, here is a quick overview. In part 1 I’ll lay out the problem. We are trying to spawn processes, read their output and kill them if they take too long. Our first attempt is to use simple synchronous I/O to read the output, and we discover a deadlock. We solve the deadlock using asynchronous I/O. In part 2 we parallelize the code and discover reduced performance and yet another deadlock. We create a testbed and set about investigating the problem in depth. In part 3 we will find the root cause and discuss the mechanics (how and why) of hitting such a problem. In part 4 we’ll discuss solutions to the problem and develop a generic solution (with code) to fix it. Finally, in part 5 we see whether or not a generic solution could work before we summarize and conclude.

Let’s begin at the very beginning. Suppose you want to execute some program (call it child), get all its output (and error) and, if it doesn’t exit within some time limit, kill it. Notice that there is no interaction and no input. This is how tests are executed in Phalanger using a test runner.

Synchronous I/O

The Process class conveniently exposes the underlying pipes to the child process through the stream instances StandardOutput and StandardError. And, like many, we too might be tempted to simply call StandardOutput.ReadToEnd() and StandardError.ReadToEnd(). That would work, until it doesn’t. As Raymond Chen noted, it’ll work as long as the data fits into the internal pipe buffer. The problem with this approach is that we are asking to read until we reach the end of the data, which is only certain to happen when the child process we spawned exits. However, when the buffer of the pipe to which the child writes its output is full, the child has to wait until there is free space in the buffer to write to. But, you say, what if we always read and empty the buffer?

Good idea, except we need to do that for both StandardOutput and StandardError at the same time. In the StandardOutput.ReadToEnd() call we read every byte coming into the buffer until the child process exits. While we keep the StandardOutput buffer drained (so that the child process can’t possibly be blocked on it), if the child fills the StandardError buffer, which we aren’t reading yet, we will deadlock. The child won’t exit until it has fully written to the StandardError buffer (which is full because no one is reading it); meanwhile, we are waiting for the process to exit so we can be sure we have read to the end of StandardOutput before we return (and start reading StandardError). The same problem exists for StandardOutput if we read StandardError first, hence the need to drain both pipe buffers as they are fed, not one after the other.
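The blocking writer at the heart of this deadlock can be observed in isolation. The following is only a sketch under stated assumptions: an in-process anonymous pipe stands in for the child’s redirected output, a task plays the child, and PipeBackpressureDemo with its method and parameter names is made up for the illustration. Task.Factory.StartNew needs Framework 4.0 or newer. As long as the payload is larger than the pipe buffer (whose size depends on the operating system), the write should not complete until someone reads.

```csharp
using System;
using System.IO.Pipes;
using System.Threading.Tasks;

public static class PipeBackpressureDemo
{
    // Writes payloadBytes into an anonymous pipe while nobody reads, then drains it.
    // Returns true if the writer was still blocked after graceMs without a reader.
    public static bool WriterBlocksUntilDrained(int payloadBytes, int graceMs)
    {
        using (var writeEnd = new AnonymousPipeServerStream(PipeDirection.Out))
        using (var readEnd = new AnonymousPipeClientStream(PipeDirection.In, writeEnd.ClientSafePipeHandle))
        {
            var payload = new byte[payloadBytes];

            // Plays the role of the child writing to its redirected output.
            Task writer = Task.Factory.StartNew(() => writeEnd.Write(payload, 0, payload.Length));

            // Nobody is reading yet, so a payload larger than the pipe buffer cannot finish.
            bool blocked = !writer.Wait(graceMs);

            // Plays the role of the parent draining the pipe.
            var buffer = new byte[64 * 1024];
            int total = 0;
            while (total < payloadBytes)
            {
                int read = readEnd.Read(buffer, 0, buffer.Length);
                if (read == 0) break; // write end was closed
                total += read;
            }

            writer.Wait(); // completes once the data has been drained
            return blocked;
        }
    }
}
```

With two such pipes and a parent that reads them one after the other, the writer of the second pipe stays blocked for as long as the parent is busy with the first, which is the situation described above.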

Async Reading

The obvious (and only practical) solution is to read both pipes at the same time using separate threads. To that end, there are two main approaches: the pre-4.0 approach (async events) and the 4.5-and-up approach (tasks).

Async Reading with Events

The code is reasonably straightforward, as it uses .NET events. We have two manual-reset events and two delegates that get called asynchronously whenever a line is read from each pipe. We get null data when we hit the end of file (i.e. when the process exits) for each of the two pipes.

        public static string ExecWithAsyncEvents(string path, string args, int timeoutMs)
        {
            using (var outputWaitHandle = new ManualResetEvent(false))
            {
                using (var errorWaitHandle = new ManualResetEvent(false))
                {
                    using (var process = new Process())
                    {
                        process.StartInfo = new ProcessStartInfo(path);
                        process.StartInfo.Arguments = args;
                        process.StartInfo.UseShellExecute = false;
                        process.StartInfo.RedirectStandardOutput = true;
                        process.StartInfo.RedirectStandardError = true;
                        process.StartInfo.ErrorDialog = false;
                        process.StartInfo.CreateNoWindow = true;

                        var sb = new StringBuilder(1024);
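                        process.OutputDataReceived += (sender, e) =>
                        {
                            if (e.Data == null)
                            {
                                // Null data marks the end of the stream.
                                outputWaitHandle.Set();
                            }
                            else
                            {
                                // Both handlers share sb and may run concurrently.
                                lock (sb) { sb.AppendLine(e.Data); }
                            }
                        };
                        process.ErrorDataReceived += (sender, e) =>
                        {
                            if (e.Data == null)
                            {
                                // Null data marks the end of the stream.
                                errorWaitHandle.Set();
                            }
                            else
                            {
                                // Both handlers share sb and may run concurrently.
                                lock (sb) { sb.AppendLine(e.Data); }
                            }
                        };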

                        process.Start();
                        process.BeginOutputReadLine();
                        process.BeginErrorReadLine();

                        process.WaitForExit(timeoutMs);
                        outputWaitHandle.WaitOne(timeoutMs);
                        errorWaitHandle.WaitOne(timeoutMs);

                        process.CancelErrorRead();
                        process.CancelOutputRead();

                        return sb.ToString();
                    }
                }
            }
        }

We can certainly improve on the above code (for example, we should make the total wait limit <= timeoutMs), but you get the point with this sample. Also, there is no error handling, and the child process is not killed when it times out and doesn’t exit.
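One way to keep the total wait within timeoutMs is to give all three waits a single shared budget. The helper below is a minimal sketch; WaitBudget and RemainingMs are hypothetical names, it assumes a non-negative timeout (the infinite timeout value of -1 is not handled), and the usage in the trailing comment is an untested outline of how it might slot into ExecWithAsyncEvents.

```csharp
using System;

public static class WaitBudget
{
    // Milliseconds left of timeoutMs after elapsedMs have passed, never negative.
    public static int RemainingMs(long elapsedMs, int timeoutMs)
    {
        long left = timeoutMs - elapsedMs;
        return left > 0 ? (int)left : 0;
    }
}

// Hypothetical use, with the clock started just before process.Start():
//   var clock = System.Diagnostics.Stopwatch.StartNew();
//   bool exited = process.WaitForExit(WaitBudget.RemainingMs(clock.ElapsedMilliseconds, timeoutMs));
//   outputWaitHandle.WaitOne(WaitBudget.RemainingMs(clock.ElapsedMilliseconds, timeoutMs));
//   errorWaitHandle.WaitOne(WaitBudget.RemainingMs(clock.ElapsedMilliseconds, timeoutMs));
//   if (!exited) { process.Kill(); }
```

Each wait then gets only what is left of the original timeout, and a child that has not exited by the time the budget runs out can be killed.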

Async Reading with Tasks

A much simpler and cleaner approach is to use the new System.Threading.Tasks namespace/framework to do all the heavy lifting for us. As you can see, the code has been cut by half and it’s much more readable, but we need Framework 4.5 or newer for this to work (my target is 4.0, but I gave it a spin for comparison purposes). The results are the same.

        public static string ExecWithAsyncTasks(string path, string args, int timeout)
        {
            using (var process = new Process())
            {
                process.StartInfo = new ProcessStartInfo(path);
                process.StartInfo.Arguments = args;
                process.StartInfo.UseShellExecute = false;
                process.StartInfo.RedirectStandardOutput = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.ErrorDialog = false;
                process.StartInfo.CreateNoWindow = true;

                var sb = new StringBuilder(1024);

                process.Start();
                var stdOutTask = process.StandardOutput.ReadToEndAsync();
                var stdErrTask = process.StandardError.ReadToEndAsync();

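                process.WaitForExit(timeout);

                // Wait(timeout) returns false on timeout; only read Result from a completed task.
                if (stdOutTask.Wait(timeout))
                {
                    sb.Append(stdOutTask.Result);
                }
                if (stdErrTask.Wait(timeout))
                {
                    sb.Append(stdErrTask.Result);
                }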

                return sb.ToString();
            }
        }

Again, a healthy dose of error handling is in order, but it is left out for illustration purposes. A point worth mentioning is that we can’t assume we have read the streams by the time the child exits. There is a race condition, and we still need to wait for the I/O operations to finish before we can read the results.

In the next part we’ll parallelize the execution in an attempt to maximize efficiency and concurrency.

QR Code Business Card