Apr 08, 2013

In part 4 we replaced Parallel.ForEach with Task. This allowed us to run the processes on ThreadPool threads, but wait only in the main thread. So the worker threads were never blocked on waits: they executed the child process, processed the output callbacks and exited promptly, while the main thread waited for everything to finish completely. To do this, we needed to separate the Run logic from the Wait. In addition, we had to keep the Process instances alive so that the callbacks would work and we could detect the end correctly.

This worked well for us. Except for the nagging problem that we can’t use Parallel.ForEach. Or rather, if we did, even accidentally, we’d deadlock as we did before. Also, our wrapper isn’t readily reusable in other classes without explicitly separating the Run call from the Wait on separate threads as we did. Clearly this isn’t a bulletproof solution.

It might be tempting to think that the Process class is a thin wrapper around the OS process, but in fact it's rather complex. It does a lot for us, and we shouldn't throw it away. It'd be great if we could avoid the problem with the ThreadPool altogether. Remember, the reason we were using it in the first place is that the synchronous version deadlocked as well.

What if we could improve the synchronous version?

Synchronous I/O Take 2

Recall that the deadlock happened because, to read until the end of the output stream, we need to wait for the process to exit. The process, in its turn, wouldn't exit until it has written all its output, which we're reading. Since we're reading only one stream at a time (either StandardOutput or StandardError), if the buffer of the one we're not reading gets full, we deadlock.

The solution would be to read a little from each. This would work, except for the little problem that if we read from a stream that doesn't have data, we'd block until it gets some. This is exactly the situation we were trying to avoid: a deadlock will happen when one stream's buffer is full, so the child process blocks writing to it, while we are waiting to read from the other stream, which has no data yet.

Peek to the Rescue?

The Process class exposes both the StandardOutput and StandardError streams, whose readers have a Peek() function that returns the next character without removing it from the buffer. It returns -1 when there is no data to read, so we could postpone reading until Peek() returns > -1.
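
Such a polling loop might look like the following sketch (hypothetical; process is a started Process with both streams redirected):

        // Poll both pipes, reading only when Peek() claims data is available.
        var stdOut = new StringBuilder();
        var stdErr = new StringBuilder();
        while (!process.HasExited)
        {
            if (process.StandardOutput.Peek() > -1)
                stdOut.Append((char)process.StandardOutput.Read());
            if (process.StandardError.Peek() > -1)
                stdErr.Append((char)process.StandardError.Read());
        }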

Alas, this won't work. As many have pointed out, StreamReader.Peek can block! Which is ironic, considering that one would typically use it to poll the stream.

It seems we have no more hope in the synchronous world. We have no getter to query the available data, as in NetworkStream.DataAvailable, and Length will throw an exception, as it needs a seekable stream (which we don't have). So we're back to the asynchronous world.

The Solution: No Solution!

I was almost sure I had found an answer with direct access to the StandardOutput and StandardError streams. After all, these are just wrappers around Win32 pipes, and async I/O in .Net is really a wrapper around the native Windows async infrastructure. So, in theory, we don't need all the layers that Process and other classes add on top of the raw interfaces. Asynchronous I/O in Windows works by passing a (typically manual-reset) event object and a callback function. All we really need is for the event to get triggered. Lo and behold, we also get a wait-handle in the IAsyncResult that Stream's BeginRead returns. So we could wait on it directly, as these are triggered by the file-system drivers, after issuing async reads like this:

  const int BUFFER_LEN = 4 * 1024;     // Assumed sizes; any reasonable
  const int waitTimeMs = 100;          // buffer and timeout will do.
  var outBuffer = new byte[BUFFER_LEN];
  var errBuffer = new byte[BUFFER_LEN];
  var outAsyncRes = process.StandardOutput.BaseStream.BeginRead(outBuffer, 0, BUFFER_LEN, null, null);
  var errAsyncRes = process.StandardError.BaseStream.BeginRead(errBuffer, 0, BUFFER_LEN, null, null);
  var events = new[] { outAsyncRes.AsyncWaitHandle, errAsyncRes.AsyncWaitHandle };
  WaitHandle.WaitAny(events, waitTimeMs);

Except, this wouldn't work either. There are two reasons why, one to be blamed on the OS and one on .Net.

Async Event Not Triggered by OS

The first issue is that Windows doesn't always signal this event. You read that right. In fact, a comment in the FileStream code reads:

                // Consider uncommenting this someday soon - the EventHandle 
                // in the Overlapped struct is really useless half of the
                // time today since the OS doesn't signal it. [...]

True, the state of the event object is not changed if the operation finishes before the function returns.

.Net Callback Interop via ThreadPool

Because the event isn't signaled in all cases, .Net needs to signal this event object manually when the overlapped I/O callback is invoked. You'd think that would save the day. Alas, the overlapped I/O callback doesn't call into managed code. The CLR handles such callbacks in a special way. In fact, it knows about file I/O: the .Net wrappers aren't written in pure P/Invoke, but rather with support from the CLR in addition to standard P/Invoke.

Because the system can't invoke managed callbacks, the solution is for the CLR to do it itself. Of course, this needs to be done in a responsive fashion, and without blocking all of the CLR for each callback invocation. What better solution than to queue a task on the ThreadPool that invokes the .Net callback? This callback will signal the event object we got in the IAsyncResult and, if set, it'll call the delegate that we passed to the BeginRead call.
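
A small probe makes this visible (a sketch; assumes a started process with redirected output): the completion callback arrives on a ThreadPool worker thread.

        var buffer = new byte[4096];
        process.StandardOutput.BaseStream.BeginRead(buffer, 0, buffer.Length, ar =>
        {
            // Runs on a ThreadPool worker thread, so this prints True.
            Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread);
            process.StandardOutput.BaseStream.EndRead(ar);
        }, null);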

So we've come full circle, back to the ThreadPool and the original dilemma.

Conclusion

The task at hand seemed simple enough. Indeed, even after 5 posts, I still feel the frustration of not finding a generic solution that is both scalable and abstract. We tried simple synchronous I/O, then switched to async, only to find that our waits couldn't be satisfied, because they are serviced by the very worker threads we were blocking while waiting for the reads to complete. This took us to a polling strategy that, once again, failed because the classes we are working with do not allow us to poll without blocking. The OS could have saved the day, but since we're in the belly of .Net, we have to play by its rules, and that means the OS callbacks must be serviced on the same worker threads we were blocking. The ThreadPool is another stubbornly designed, process-wide object that we cannot gracefully and, more importantly, thread-safely query and modify.

This means that we either need to explicitly use Tasks and the ProcessExecutor class we designed in the previous post or we need to roll our own internal thread pool to service the waits.

It is very easy to overlook a cyclic dependency such as this, especially in the name of abstraction and separation of responsibilities. The complex nature of things was the perfect setup for overlooking the subtle assumptions and expectations of each part of the code: the process spawning, the I/O readers, Parallel.ForEach and, ultimately, the generic and omnipresent ThreadPool.

The only hope for solving similar problems is to first find them. By incorrectly assuming (as I did) that any thread-safe function can be wrapped in Parallel.ForEach, patting ourselves on the back for the marvels and simplicity of modern programming languages, and taking pride in our silently-failing achievement, we only miss the opportunity to do so. Only by testing our code and verifying our assumptions, with skepticism and cynicism rather than confidence and pride, do we stand a chance of finding out the sad truth, at least in the (hopefully) rare cases where we fail. Or where our abstraction fails, at any rate.

I can only wonder with a grin about other similar cases in the wild and how they are running slower than their brain-dead, one-at-a-time versions.

Apr 05, 2013

In part 3 we found out that executing Process instances in Parallel.ForEach could starve the ThreadPool and cause a deadlock. In this part, we'll attempt to solve the problem.

What we'd like to have is a function or class that executes and reads the outputs of an external process such that it can be used concurrently, in Parallel.ForEach or with Tasks, without ill effects and without the user taking special precautions to avoid these problems.

A Solution

A naïve solution is to make sure there is at least one thread available in the pool when blocking on the child process and its output. In other words, the number of threads in the pool must be larger than the MaxDegreeOfParallelism of the ForEach loop. This only works when there are no other users of the ThreadPool, in which case we can control these two numbers to guarantee the inequality. Even then, we might potentially need a large number of threads in the pool. And the problem is inescapable, as we can't control the complete process at all times. To make matters worse, the APIs for changing the number of workers in the pool are non-atomic and will always have race conditions when changing these process-wide settings.
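
For illustration, the fragile workaround would look something like this sketch (process-wide and racy; maxDegreeOfParallelism is whatever the ForEach loop uses):

        int workers, ioThreads;
        ThreadPool.GetMinThreads(out workers, out ioThreads);
        // Keep more threads alive than the loop will block on waits.
        ThreadPool.SetMinThreads(Math.Max(workers, maxDegreeOfParallelism + 1), ioThreads);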

Broadly speaking, there are three solutions. The first is to replace Parallel.ForEach with something else. The second is to replace the Process class with our own, with a more flexible design that avoids wasting a thread to wait on the callback events. The third is to replace the ThreadPool. The last can be done by simply having a private pool for waiting on the callback events. That is, unless we can find a way to make the native Process work with Parallel.ForEach without worrying about this issue.

Of course, the best solution would be for the ThreadPool to be smarter and increase the number of threads available. This does happen when we set our wait functions to wait indefinitely. But it takes far too long (in my case about 12 seconds) before the ThreadPool realizes that no progress is being made while it still has tasks scheduled for execution, and (correctly) assumes that there might be some interdependency between the running-but-not-progressing threads and the tasks waiting for execution.

The third solution is overly complicated, and I'd find very little reason to defend it. Thread pools are rather complicated beasts and, unless we can reuse one, it'd be overkill to develop our own. The first two solutions look promising, so let's try them out.

Replacing Parallel.ForEach

Clearly the above workarounds aren't bulletproof. We can easily end up in situations where we time out, and so it'd be a Red Queen's race. A better solution is to attack the root of the problem, namely, to stop wasting ThreadPool threads on waiting for the processes. This can be done if we make the wait more efficient by combining the waits of multiple processes. Then it wouldn't matter whether we used a single ThreadPool thread or a dedicated one.

To that end, we need to do two things. First, we need to convert the single function into an object that we can move around, because we’ll need to reference its locals directly. Second, we need to separate the wait from all the other setup code.

Here is a wrapper that is disposable, and wraps cleanly around Process.

        // Requires: System, System.Diagnostics, System.IO, System.Text, System.Threading.
        public class ProcessExecutor : IDisposable
        {
            public ProcessExecutor(string name, string path)
            {
                _name = name;
                _path = path;
            }

            public void Dispose()
            {
                Close();
            }

            public string Name { get { return _name; } }
            public string StdOut { get { return _stdOut.ToString(); } }
            public string StdErr { get { return _stdErr.ToString(); } }

            // Returns the internal process. Used for getting exit code and other advanced usage.
            // May be proxied by getters. But for now let's trust the consumer.
            public Process Process { get { return _process; } }

            public bool Run(string args)
            {
                // Make sure we don't have any old baggage.
                Close();

                // Fresh start.
                _stdOut.Clear();
                _stdErr.Clear();
                _stdOutEvent = new ManualResetEvent(false);
                _stdErrEvent = new ManualResetEvent(false);

                _process = new Process();
                _process.StartInfo = new ProcessStartInfo(_path)
                {
                    Arguments = args,
                    UseShellExecute = false,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    ErrorDialog = false,
                    CreateNoWindow = true,
                    WorkingDirectory = Path.GetDirectoryName(_path)
                };

                _process.OutputDataReceived += (sender, e) =>
                {
                    _stdOut.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        var evt = _stdOutEvent;
                        if (evt != null)
                        {
                            lock (evt)
                            {
                                evt.Set();
                            }
                        }
                    }
                };
                _process.ErrorDataReceived += (sender, e) =>
                {
                    _stdErr.AppendLine(e.Data);
                    if (e.Data == null)
                    {
                        // Mirror the stdout handler: guard against a null event.
                        var evt = _stdErrEvent;
                        if (evt != null)
                        {
                            lock (evt)
                            {
                                evt.Set();
                            }
                        }
                    }
                };

                _sw = Stopwatch.StartNew();
                _process.Start();
                _process.BeginOutputReadLine();
                _process.BeginErrorReadLine();
                _process.Refresh();
                return true;
            }

            public void Cancel()
            {
                var proc = _process;
                _process = null;
                if (proc != null)
                {
                    // Invalidate cached data to requery.
                    proc.Refresh();

                    // Cancel all pending IO ops.
                    proc.CancelErrorRead();
                    proc.CancelOutputRead();

                    Kill();
                }

                var outEvent = _stdOutEvent;
                _stdOutEvent = null;
                if (outEvent != null)
                {
                    lock (outEvent)
                    {
                        outEvent.Close();
                        outEvent.Dispose();
                    }
                }

                var errEvent = _stdErrEvent;
                _stdErrEvent = null;
                if (errEvent != null)
                {
                    lock (errEvent)
                    {
                        errEvent.Close();
                        errEvent.Dispose();
                    }
                }
            }

            public void Wait()
            {
                Wait(-1);
            }

            public bool Wait(int timeoutMs)
            {
                try
                {
                    if (timeoutMs < 0)
                    {
                        // Wait for process and all I/O to finish.
                        _process.WaitForExit();
                        return true;
                    }

                    // Timed waiting. We need to wait for I/O ourselves.
                    if (!_process.WaitForExit(timeoutMs))
                    {
                        Kill();
                    }

                    // Wait for the I/O to finish.
                    var waitMs = (int)(timeoutMs - _sw.ElapsedMilliseconds);
                    waitMs = Math.Max(waitMs, 10);
                    _stdOutEvent.WaitOne(waitMs);

                    waitMs = (int)(timeoutMs - _sw.ElapsedMilliseconds);
                    waitMs = Math.Max(waitMs, 10);
                    return _stdErrEvent.WaitOne(waitMs);
                }
                finally
                {
                    // Cleanup.
                    Cancel();
                }
            }

            private void Close()
            {
                Cancel();
                var proc = _process;
                _process = null;
                if (proc != null)
                {
                    // Dispose in all cases.
                    proc.Close();
                    proc.Dispose();
                }
            }

            private void Kill()
            {
                try
                {
                    // We need to do this in case of a non-UI proc
                    // or one to be forced to cancel.
                    var proc = _process;
                    if (proc != null && !proc.HasExited)
                    {
                        proc.Kill();
                    }
                }
                catch
                {
                    // Kill will throw when/if the process has already exited.
                }
            }

            private readonly string _name;
            private readonly string _path;
            private readonly StringBuilder _stdOut = new StringBuilder(4 * 1024);
            private readonly StringBuilder _stdErr = new StringBuilder(4 * 1024);

            private ManualResetEvent _stdOutEvent;
            private ManualResetEvent _stdErrEvent;
            private Process _process;
            private Stopwatch _sw;
        }

Converting this to use Tasks is left as an exercise to the reader.
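
(For the impatient, one possible direction for that exercise, as a sketch only: expose the exit as a Task via a TaskCompletionSource hooked to the Exited event. The RunAsync name and shape are mine, not part of the class above.)

            public Task<bool> RunAsync(string args)
            {
                var tcs = new TaskCompletionSource<bool>();
                Run(args);
                _process.EnableRaisingEvents = true;
                _process.Exited += (sender, e) => tcs.TrySetResult(true);
                if (_process.HasExited)
                    tcs.TrySetResult(true); // Guard against racing an early exit.
                return tcs.Task;
            }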

Now, we can use this in a different way.

        public static void Run(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            var cts = new CancellationTokenSource();
            var allProcesses = Task.Factory.StartNew(() =>
            {
                var tasks = pathArgs.Select(pair => Task.Factory.StartNew(() =>
                {
                    string name = Path.GetFileNameWithoutExtension(pair.Key);
                    var exec = new ProcessExecutor(name, pair.Key);
                    exec.Run(pair.Value);
                    cts.Token.Register(exec.Cancel);
                    return exec;
                })).ToArray();

                // Wait for individual tasks to finish.
                foreach (var task in tasks)
                {
                    if (task != null)
                    {
                        task.Result.Wait(timeout);
                        task.Result.Cancel();
                        task.Result.Dispose();
                        task.Dispose();
                    }
                }
            });

            // Cancel, if we timed out.
            allProcesses.Wait(timeout);
            cts.Cancel();
        }

This time, we fire each process executor on a separate thread (also a worker on the ThreadPool), but we wait in a single thread. The worker threads run, start the child processes and queue their async reads, then return from exec.Run() and free their threads, at which point the queued async reads can execute.

Notice that we are creating the new tasks in a worker thread. This is so that we can limit the wait on all of them (although we'd need a timeout that depends on their number; for illustration purposes the above code is sufficient). Now we have the luxury of waiting on all of them in a single Wait call, and we can also cancel all of them using the CancellationTokenSource.

The above code typically finishes in about 5-6 seconds (and as low as 4006ms) for 500 child processes on 12 cores, each echoing a short string and exiting.

One thing to keep in mind when using this code: if we use ProcessExecutor as a member of another class, that class should also expose Run and Wait as separate calls. That is, if it simply calls Run followed by Wait in the same function, both will execute on the same thread, and we get the same problem as before when a number of them execute on the ThreadPool. So the abstraction will leak again!

In addition, this puts the onus on the consumer. Our ProcessExecutor class is not bulletproof on its own.
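
In other words, this hypothetical consumer quietly reintroduces the very problem we set out to solve:

        // Anti-pattern: Run and Wait called back-to-back on one pool thread.
        using (var exec = new ProcessExecutor("child", path))
        {
            exec.Run(args);
            exec.Wait(timeout); // Blocks this worker; enough of these deadlock.
        }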

In the next and final part we’ll go back to the Process class to try and avoid the deadlock issue without assuming a specific usage pattern.

Apr 04, 2013

In part 2 we discovered that by executing Process instances in Parallel.ForEach we are getting reduced performance and our waits are timing out. In this part we’ll dive deep into the problem and find out what is going on.

Deadlock

Clearly the problem has to do with the Process class and/or with spawning processes in general. Looking more carefully, we notice that besides the spawned process we also have two asynchronous reads. We have intentionally requested these to be executed on separate threads. But that shouldn’t be a problem, unless we have misused the async API.

It is reasonable to suspect that the approach used to do the async reading is at fault. This is where I ventured to look at the Process class code. After all, it's a wrapper over the Win32 API, and it might make assumptions that I was ignoring or contradicting. Regrettably, that didn't help me figure out what was going on, other than prompting me to write the aforementioned post.

Looking at the BeginOutputReadLine() function, we see it creating an AsyncStreamReader, which is internal to the .Net Framework and then calls BeginReadLine(), which presumably is where the async action happens.

        [System.Runtime.InteropServices.ComVisible(false)] 
        public void BeginOutputReadLine() {

            if(outputStreamReadMode == StreamReadMode.undefined) { 
                outputStreamReadMode = StreamReadMode.asyncMode;
            } 
            else if (outputStreamReadMode != StreamReadMode.asyncMode) {
                throw new InvalidOperationException(SR.GetString(SR.CantMixSyncAsyncOperation));
            }

            if (pendingOutputRead)
                throw new InvalidOperationException(SR.GetString(SR.PendingAsyncOperation)); 

            pendingOutputRead = true;
            // We can't detect if there's a pending sychronous read, tream also doesn't. 
            if (output == null) {
                if (standardOutput == null) {
                    throw new InvalidOperationException(SR.GetString(SR.CantGetStandardOut));
                } 

                Stream s = standardOutput.BaseStream; 
                output = new AsyncStreamReader(this, s, new UserCallBack(this.OutputReadNotifyUser), standardOutput.CurrentEncoding); 
            }
            output.BeginReadLine(); 
        }

Within the AsyncStreamReader.BeginReadLine() we see the familiar asynchronous read on Stream using Stream.BeginRead().

        // User calls BeginRead to start the asynchronous read
        internal void BeginReadLine() { 
            if( cancelOperation) {
                cancelOperation = false; 
            } 

            if( sb == null ) { 
                sb = new StringBuilder(DefaultBufferSize);
                stream.BeginRead(byteBuffer, 0 , byteBuffer.Length,  new AsyncCallback(ReadBuffer), null);
            }
            else { 
                FlushMessageQueue();
            } 
        }

Unfortunately, I had incorrectly assumed that this async wait was executed on one of the I/O Completion Port threads of the ThreadPool. It seems this is not the case, as ThreadPool.GetAvailableThreads() always returned the same number for completionPortThreads (incidentally, the workerThreads value didn't change much either, but I didn't notice that at first).

A breakthrough came when I started changing the maximum parallelism (i.e. maximum thread count) of Parallel.ForEach.

        public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout, int maxThreads)
        {
            Parallel.ForEach(pathArgs, new ParallelOptions { MaxDegreeOfParallelism = maxThreads },
                             arg => Task.Factory.StartNew(() => DoJob(arg.Value.Split(' '))).Wait() );
        }

I thought I should increase the maximum number of threads to resolve the issue. Indeed, for certain values of MaxDegreeOfParallelism, I could never reproduce the problem (all processes finished very swiftly, and no timeouts). For everything else, the problem was reproducible most of the time; nine times out of ten I'd get timeouts. However, and to my surprise, the problem went away when I reduced MaxDegreeOfParallelism!

The magic number was 12. Yes, the number of cores at my disposal on my dev machine. If we limit the number of concurrent ForEach executions to fewer than 12, everything finishes swiftly; otherwise, we get timeouts and finishing ExecAll() takes a long time. In fact, with maxThreads=11, 500 process executions finish in under 8500ms, which is very commendable. However, with maxThreads=12, every batch of 12 processes waits until it times out, which takes several minutes to finish all 500.
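
In other words, with the numbers from my machine (a hypothetical invocation of the ExecAll above):

        // On 12 cores: maxThreads <= 11 finishes swiftly, >= 12 times out.
        ExecAll(pathArgs, timeout, Environment.ProcessorCount - 1); // Fast.
        ExecAll(pathArgs, timeout, Environment.ProcessorCount);     // Hangs.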

With this information, I tried increasing the ThreadPool thread limits using ThreadPool.SetMaxThreads(). But it turns out the defaults are 1023 worker threads and 1000 I/O Completion Port threads, as reported by ThreadPool.GetMaxThreads(). I had been assuming that if the available thread count was lower than required, the ThreadPool would simply create new threads until it reached the maximum configured value.
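
A quick way to watch these numbers while reproducing the problem (a sketch):

        int maxWorkers, maxIo, freeWorkers, freeIo;
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);         // 1023 / 1000.
        ThreadPool.GetAvailableThreads(out freeWorkers, out freeIo);
        Console.WriteLine("Workers: {0}/{1}, IOCP: {2}/{3}",
                          freeWorkers, maxWorkers, freeIo, maxIo);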

[Diagram showing the ThreadPool deadlock (created by www.gliffy.com)]

Putting It All Together

The assumption that Parallel.ForEach executes its body on the ThreadPool, treating said body as a black box, is clearly flawed. In our case the body initiates asynchronous I/O, which needs its own threads. Apparently, these do not come from the I/O thread pool but from the worker thread pool. In addition, the number of threads in this pool is initially set to the number of available cores on the target machine. Even worse, the pool resists creating new threads until absolutely necessary. Unfortunately, in our case that's too late, as our waits are timing out. What I left out until this point (both for dramatic effect and to leave the solution for you, the reader, to find) is that the timeouts were happening on the StandardOutput and StandardError streams. That is, even though the child processes had exited long ago, we were still waiting to read their output.

Let me spell it out, if it's not obvious: each call to spawn and wait for a child process is executed on a ThreadPool worker thread, and uses it exclusively until the waits return. The async stream reads on StandardOutput and StandardError need to run on some thread. Since they are apparently queued on the ThreadPool, they will starve if we use all of the available pool threads to wait on them to finish, thereby timing out on the read waits (because we have a deadlock).

This is a case of Leaky Abstraction: our black box of an "execute on the ThreadPool" failed miserably when the executed code itself depended on the ThreadPool. Specifically, once we had used all available threads in the pool, we left none for the code that depends on the pool to make progress. We shot ourselves in the proverbial foot. Our abstraction failed.

In the next part we’ll attempt to solve the problem.

Apr 03, 2013

In part 1 we discovered a deadlock in the synchronous approach to reading the output of Process and we solved it using asynchronous reads. Today we’ll parallelize the execution in an attempt to maximize efficiency and concurrency.

Parallel Execution

Now, let’s complicate our lives with some concurrency, shall we?

If we are to spawn many processes, we could (and should) utilize all the cores at our disposal. Thanks to Parallel.For and Parallel.ForEach this task is made much simpler than otherwise.

        public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            Parallel.ForEach(pathArgs, arg => ExecWithAsyncTasks(arg.Key, arg.Value, timeout));
        }

Things couldn’t be any simpler! We pass a list of executable paths and their arguments as KeyValuePair and a timeout in milliseconds. Except, this won’t work… at least not always.

First, let’s discuss how it will not work, then let’s understand the why before we attempt to fix it.

When Abstraction Backfires

The above code works like a charm in many cases. When it doesn't, a number of waits time out. This is unacceptable, as we wouldn't know whether we got all the output or only part of it, unless we get a clean exit with no timeouts. I first noticed this issue in a completely different way. I was looking at Process Explorer (if you're not using it, start now and I promise not to tell anyone) to see how amazingly fast things had become with that single ForEach line. I was expecting to see a dozen or so (on a 12-core machine) child processes spawning and vanishing in quick succession. Instead, and to my chagrin, I saw most of the time just one child! One!

After many trials, much head-scratching and reading, it became clear that the waits were timing out, even though the children had clearly finished and exited. Indeed, because a process typically runs in much less time than the timeout, the parallelized code was now slower than the sequential version. This wasn't obvious at first, and I reasonably suspected that some children were taking too long, or had too much to write to the output pipes, which could be deadlocking (something not unfamiliar to me).

Testbed

To troubleshoot something as complex as this, one should start with a clean test case with a minimum number of variables. This calls for a dummy child that would do exactly as I told it, so that I could simulate different scenarios. One such scenario would be to spawn no children at all, and just test Parallel.ForEach with some in-proc task (i.e. a local function that does work similar to that of a child).

using System;
using System.Threading;

namespace Child
{
    class Program
    {
        static void Main(string[] args)
        {
            if (args.Length < 2 || args.Length % 2 != 0)
            {
                Console.WriteLine("Usage: [echo|fill|sleep|return] ");
                return;
            }

            DoJob(args);
        }

        private static void DoJob(string[] args)
        {
            for (int argIdx = 0; argIdx < args.Length; argIdx += 2)
            {
                switch (args[argIdx].ToLowerInvariant())
                {
                    case "echo":
                        Console.WriteLine(args[argIdx + 1]);
                        break;

                    case "fill":
                        var rd = new Random();
                        int bytes = int.Parse(args[argIdx + 1]);
                        while (bytes-- > 0)
                        {
                            // Write random letters, as many as requested.
                            Console.Write((char)rd.Next('a', 'z'));
                        }
                        break;

                    case "sleep":
                        Thread.Sleep(int.Parse(args[argIdx + 1]));
                        break;

                    case "return":
                        Environment.ExitCode = int.Parse(args[argIdx + 1]);
                        break;

                    default:
                        Console.WriteLine("Unknown command [" + args[argIdx] + "]. Skipping.");
                        break;
                }
            }
        }
    }
}

Now we can give the child process commands to change its behavior, from dumping data to its output to sleeping to returning immediately.

Once the problem is reproduced, we can narrow it down to pinpoint the source. Running the exact same command in the same process (i.e. without spawning another process) results in no problems at all. Calling DoJob 500 times directly in Parallel.ForEach finishes in under 500ms (often under 450ms). So we can be sure Parallel.ForEach is working fine.

        public static void ExecAll(List<KeyValuePair<string, string>> pathArgs, int timeout)
        {
            Parallel.ForEach(pathArgs, arg => Task.Factory.StartNew(() => DoJob(arg.Value.Split(' '))).Wait() );
        }

Even executing as a new task (within the Parallel.ForEach) doesn't result in any noticeable difference in time. The reason for this good performance when running the jobs in new tasks is probably that the ThreadPool scheduler fetches and executes the task immediately when we call Wait(). That is, because both the Task.Factory.StartNew() call and the DoJob() call ultimately execute on the ThreadPool, and because Task is designed specifically to utilize it, when we call Wait() on the task the scheduler knows it should run the next job in the queue, which in this case is the job of the very task on which we called Wait! Since the caller of Wait() happens to be running on the ThreadPool, it simply executes the task inline instead of scheduling it on a different thread and blocking. Dumping the Thread.CurrentThread.ManagedThreadId from before the Task.Factory.StartNew() call and from within DoJob shows that indeed both execute on the same thread. The overhead of creating and scheduling a Task is negligible, so we don't see much change in time over 500 executions.
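
A minimal sketch of that thread-id experiment (names are mine):

        Parallel.ForEach(pathArgs, arg =>
        {
            Console.WriteLine("Caller: " + Thread.CurrentThread.ManagedThreadId);
            Task.Factory.StartNew(() =>
                // Prints the same id: Wait() inlined the task on the caller.
                Console.WriteLine("Task:   " + Thread.CurrentThread.ManagedThreadId)
            ).Wait();
        });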

All this is great and comforting, but still doesn’t help us resolve the problem at hand: why aren’t our processes spawned and executed at the highest possible efficiency? And why are they timing out?

In the next part we’ll dive deep into the problem and find out what is going on.

Apr 02, 2013

I’ve mentioned in a past post that it was conceived while reading the source code for the System.Diagnostics.Process class. This post is about the reason that pushed me to read the source code in an attempt to fix the issue. It turned out that this was yet another case of Leaky Abstraction, which is a special interest of mine.

As it turned out, this post ended up being way too long (even for me). I don't like installments, but I felt it was worth trying, as the size was prohibitive for single-post consumption. As such, I've split it up into 5 parts, so that each part is around 1,000 words or less. I'll post one part a day.

To give you an idea of the scope and subject of what's to come, here is a quick overview. In part 1 I'll lay out the problem: we are trying to spawn processes, read their output and kill them if they take too long. Our first attempt is to use simple synchronous I/O to read the output, and we discover a deadlock. We solve the deadlock using asynchronous I/O. In part 2 we parallelize the code and discover reduced performance and yet another deadlock. We create a testbed and set about investigating the problem in depth. In part 3 we find the root cause, and we discuss the mechanics (how and why) of hitting such a problem. In part 4 we discuss solutions to the problem and develop a generic solution (with code) to fix it. Finally, in part 5, we see whether or not a generic solution could work, before we summarize and conclude.

Let’s begin at the very beginning. Suppose you want to execute some program (call it child), get all its output (and error) and, if it doesn’t exit within some time limit, kill it. Notice that there is no interaction and no input. This is how tests are executed in Phalanger using a test runner.

Synchronous I/O

The Process class has conveniently exposed the underlying pipes to the child process via the stream instances StandardOutput and StandardError. And, like many, we too might be tempted to simply call StandardOutput.ReadToEnd() and StandardError.ReadToEnd(). That would work, until it doesn't. As Raymond Chen noted, it'll work as long as the data fits into the internal pipe buffer. The problem with this approach is that we are asking to read until we reach the end of the data, which will only happen with certainty when the child process we spawned exits. However, when the buffer of the pipe the child writes its output to is full, the child has to wait until there is free space in the buffer before it can write more. But, you say, what if we always read and empty the buffer?

Good idea. Except we need to do that for both StandardOutput and StandardError at the same time. In the StandardOutput.ReadToEnd() call, we read every byte coming into that buffer until the child process exits. But while we have drained the StandardOutput buffer (so the child can't possibly be blocked on that), if the child fills the StandardError buffer, which we aren't reading yet, we deadlock: the child won't exit until it fully writes to the StandardError buffer (which is full because no one is reading it), while we wait for the process to exit so we can be sure we read StandardOutput to the end before we return (and start reading StandardError). The same problem exists for StandardOutput if we read StandardError first, hence the need to drain both pipe buffers as they are fed, not one after the other.
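
For reference, here is a minimal sketch of that deadlock-prone synchronous version (assuming a process configured with both streams redirected):

        process.Start();
        // Drains stdout only; if the child fills stderr meanwhile, we hang.
        string stdOut = process.StandardOutput.ReadToEnd();
        string stdErr = process.StandardError.ReadToEnd();
        process.WaitForExit();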

Async Reading

The obvious (and only practical) solution is to read both pipes at the same time, using separate threads. To that end, there are mainly two approaches: the pre-4.0 approach (async events) and the 4.5-and-up approach (tasks).

Async Reading with Events

The code is reasonably straightforward, as it uses .Net events. We have two manual-reset events and two delegates that get called asynchronously when we read a line from each pipe. We get null data when we hit the end of the stream (i.e. when the process exits) on each of the two pipes.

        public static string ExecWithAsyncEvents(string path, string args, int timeoutMs)
        {
            using (var outputWaitHandle = new ManualResetEvent(false))
            {
                using (var errorWaitHandle = new ManualResetEvent(false))
                {
                    using (var process = new Process())
                    {
                        process.StartInfo = new ProcessStartInfo(path);
                        process.StartInfo.Arguments = args;
                        process.StartInfo.UseShellExecute = false;
                        process.StartInfo.RedirectStandardOutput = true;
                        process.StartInfo.RedirectStandardError = true;
                        process.StartInfo.ErrorDialog = false;
                        process.StartInfo.CreateNoWindow = true;

                        var sb = new StringBuilder(1024);
                        process.OutputDataReceived += (sender, e) =>
                        {
                            sb.AppendLine(e.Data);
                            if (e.Data == null)
                            {
                                outputWaitHandle.Set();
                            }
                        };
                        process.ErrorDataReceived += (sender, e) =>
                        {
                            sb.AppendLine(e.Data);
                            if (e.Data == null)
                            {
                                errorWaitHandle.Set();
                            }
                        };

                        process.Start();
                        process.BeginOutputReadLine();
                        process.BeginErrorReadLine();

                        process.WaitForExit(timeoutMs);
                        outputWaitHandle.WaitOne(timeoutMs);
                        errorWaitHandle.WaitOne(timeoutMs);

                        process.CancelErrorRead();
                        process.CancelOutputRead();

                        return sb.ToString();
                    }
                }
            }
        }

We certainly can improve on the above code (for example, we should make the total wait limit <= timeoutMs), but you get the point with this sample. Also, there is no error handling, nor any killing of the child process when it times out and doesn't exit.

Async Reading with Tasks

A much more simplified and sanitized approach is to use the new System.Threading.Tasks namespace/framework to do all the heavy lifting for us. As you can see, the code has been cut in half and is much more readable, but we need Framework 4.5 or newer for this to work (my target is 4.0, but for comparison purposes I gave it a spin). The results are the same.

        public static string ExecWithAsyncTasks(string path, string args, int timeout)
        {
            using (var process = new Process())
            {
                process.StartInfo = new ProcessStartInfo(path);
                process.StartInfo.Arguments = args;
                process.StartInfo.UseShellExecute = false;
                process.StartInfo.RedirectStandardOutput = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.ErrorDialog = false;
                process.StartInfo.CreateNoWindow = true;

                var sb = new StringBuilder(1024);

                process.Start();
                var stdOutTask = process.StandardOutput.ReadToEndAsync();
                var stdErrTask = process.StandardError.ReadToEndAsync();

                process.WaitForExit(timeout);
                stdOutTask.Wait(timeout);
                stdErrTask.Wait(timeout);

                // Collect the outputs; both tasks have completed by now.
                sb.Append(stdOutTask.Result);
                sb.Append(stdErrTask.Result);
                return sb.ToString();
            }
        }

Again, a healthy dose of error handling is in order, but for illustration purposes it is left out. A point worthy of mention is that we can't assume we have read the streams by the time the child exits. There is a race condition, and we still need to wait for the I/O operations to finish before we can read the results.

In the next part we’ll parallelize the execution in an attempt to maximize efficiency and concurrency.

Nov 16, 2011

Transactions are the seat-belts of SQL servers. They protect our data by promising "all or nothing". Data consistency on any level couldn't be guaranteed without them. In a sense, transactions work because they guarantee atomicity. They work because they allow you to abandon your changes and walk away at will, completely secure in the knowledge that not only are your changes thus far invisible to everyone else, but also that they didn't go anywhere near your valuable data.

Any reasonably-sized database code, therefore, is bound to have transactions peppered all about. Yet having a flawed implementation can be as bad as having none at all. Perhaps even worse, given the false confidence. To my shock and horror, we had just discovered such a situation in one of our products. The opportunity to get to the bottom of transactions was pretty much forced on me, but it was a welcome opportunity nonetheless.

Here I’ll limit this investigation to MS SQL (2005 and 2008 used for testing).

Here is what we’re trying to achieve:

  1. Understand how MS SQL transactions work.
  2. Transactions must support nesting, and a rollback should abort them all.
  3. Develop a template that uses transactions correctly with error handling.

Test Bed

Let’s start by a boilerplate database as a test-bed.

-- Create an uncreatively named database.
CREATE DATABASE DeeBee
GO
USE DeeBee
GO

-- Create a table to use for data modification.
CREATE TABLE Tee(
	-- We'll generate overflow exception at will using this field.
	Num TINYINT
)
GO

Now let’s see what happens if we do multiple inserts and one fails…

-- Sproc to do multiple inserts with failure.
CREATE PROCEDURE [Multi_Insert]
AS
	-- Normal case
	INSERT INTO Tee VALUES(1);
	-- Overflow case
	INSERT INTO Tee VALUES(2000);
	-- Normal case again
	INSERT INTO Tee VALUES(3);
GO

To execute the above sproc, run the following (repeat after each change to the sproc above):

-- Clean slate.
DELETE FROM Tee
GO

-- Execute complex statements.
EXEC Multi_Insert;
GO

-- Check the results
SELECT * FROM Tee
GO

Results (for the exec alone):

(1 row(s) affected)
Msg 220, Level 16, State 2, Procedure Multi_Insert, Line 8
Arithmetic overflow error for data type tinyint, value = 2000.
The statement has been terminated.

(1 row(s) affected)

Two things to observe here. First, unsurprisingly, the first insert went in as planned, as seen in the first "1 row(s) affected" message, followed by the overflow error. Second, and important here, the error didn't abort or change the flow of the sproc; rather, the third insert was executed as well.

Selecting everything in the Tee gives us:

1
3

Error Handling

In many cases we wouldn't want to resume our operations when one fails. A quick solution might look something like this:

-- Sproc to do multiple inserts with failure.
ALTER PROCEDURE [Multi_Insert]
AS
	-- Normal case
	INSERT INTO Tee VALUES(1);
	if @@ERROR <> 0
		RETURN

	-- Overflow case
	INSERT INTO Tee VALUES(2000);
	if @@ERROR <> 0
		RETURN

	-- Normal case again
	INSERT INTO Tee VALUES(3);
GO

This would work, but it only prevents further operations after the initial error; it doesn't undo previous changes. In addition, it's very tedious, especially with complex sprocs, and it looks ugly. Let's see what we can do with exception handling…

-- Sproc to do multiple inserts with failure.
ALTER PROCEDURE [Multi_Insert]
AS
BEGIN TRY
	-- Normal case
	INSERT INTO Tee VALUES(1);
	-- Overflow case
	INSERT INTO Tee VALUES(2000);
	-- Normal case again
	INSERT INTO Tee VALUES(3);
END TRY
BEGIN CATCH
	PRINT ERROR_MESSAGE();
END CATCH
GO

This is functionally equivalent to the previous version; however, here we have clean control flow and no longer need to taint our code with error checking. The PRINT statement isn't necessary; I just added it to show that the same error occurs. The result of this sproc is a single insert into Tee with the value 1.

Transactions

Let’s do the same, this time using transactions only.

-- Sproc to do multiple inserts with failure.
ALTER PROCEDURE [Multi_Insert]
AS
BEGIN
	BEGIN TRANSACTION
		-- Normal case
		INSERT INTO Tee VALUES(1);
		-- Overflow case
		INSERT INTO Tee VALUES(2000);
		-- Normal case again
		INSERT INTO Tee VALUES(3);
	COMMIT TRANSACTION
END
GO

The above sproc will result in the following, familiar, output:

(1 row(s) affected)
Msg 220, Level 16, State 2, Procedure Multi_Insert, Line 9
Arithmetic overflow error for data type tinyint, value = 2000.
The statement has been terminated.

(1 row(s) affected)

This is the exact same behavior as without the transaction! This means transactions don't automatically add protection, recovery and undo. Not only did we not revert the first insert after the error, we went on to execute the remainder of the transaction. The only way to fix this is to actually issue a ROLLBACK TRANSACTION statement. Of course, we must first detect that something went wrong. Since we already know we must do this manually, we should either check for errors or wrap the code in TRY/CATCH clauses.

-- Sproc to do multiple inserts with failure.
ALTER PROCEDURE [Multi_Insert]
AS
BEGIN TRY
	BEGIN TRANSACTION
		-- Normal case
		INSERT INTO Tee VALUES(1);
		-- Overflow case
		INSERT INTO Tee VALUES(2000);
		-- Normal case again
		INSERT INTO Tee VALUES(3);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	ROLLBACK TRANSACTION
	PRINT ERROR_MESSAGE();
END CATCH
GO

Results:

(1 row(s) affected)

(0 row(s) affected)
Arithmetic overflow error for data type tinyint, value = 2000.

Finally, modifications are rolled back and the table is back to a clean slate, as it was before executing the sproc.

Nested Transactions

Now that we have a working transaction with correct exception handling, let's try to develop a version that is symmetric and can be nested. We could use the above try/transaction/catch/rollback pattern, except it's not robust. To see why, let's take a slightly more complex case. Suppose you used this pattern in the sprocs that do modifications, and in one case you called one sproc from another.

-- Sproc that does complex operations.
CREATE PROCEDURE [SP_Complex]
AS
BEGIN TRY
	BEGIN TRANSACTION

		-- Normal case
		INSERT INTO Tee VALUES(0);

		-- Execute a bad sproc.
		EXEC Multi_Insert;

		-- Normal case again
		INSERT INTO Tee VALUES(5);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	ROLLBACK TRANSACTION
	PRINT ERROR_MESSAGE();
END CATCH
GO

The above sproc is written using the same pattern we used in Multi_Insert. We know the inner Multi_Insert will hit an exception, roll back its transaction and return. But what about the outer transaction in SP_Complex? Will the first insert be rolled back? Will the last insert get executed?

Results:

(1 row(s) affected)

(1 row(s) affected)

(0 row(s) affected)
Arithmetic overflow error for data type tinyint, value = 2000.
Msg 3903, Level 16, State 1, Procedure SP_Complex, Line 19
The ROLLBACK TRANSACTION request has no corresponding BEGIN TRANSACTION.
Transaction count after EXECUTE indicates that a COMMIT or ROLLBACK TRANSACTION statement is missing. Previous count = 1, current count = 0.

Things didn't go as planned. Checking for modifications in the Tee table shows that the transaction did work as expected and all changes were rolled back. We still got an error, though! To understand what's going on, let's see what the error was about.

First, notice that there were two inserts, one in the outer transaction and one in the inner, yet both were rolled back (the table should be empty). Then we print the overflow error, which indicates that the second insert in Multi_Insert threw. So far, so good. Then we hit an unanticipated error message. The clue to what happened is in "Transaction count after EXECUTE […]". It seems the SQL engine checks the transaction count after EXEC statements. Since we called ROLLBACK in Multi_Insert, it appears this rolled back not only the inner transaction, but also the outer one. This is confirmed by two facts. First, the table hasn't been changed and all our inserts were undone. Second, the inner ROLLBACK does match the BEGIN TRANSACTION in Multi_Insert, and so does the pair in SP_Complex, so the only way we could get a mismatch error is if something in Multi_Insert had an effect larger than its scope. (Also, what's with the "Transaction count" references?)

To see what actually happened, let’s trace the execution path:

-- Sproc to do multiple inserts with failure.
ALTER PROCEDURE [Multi_Insert]
AS
BEGIN TRY
	BEGIN TRANSACTION
		PRINT 'IN [Multi_Insert]. Transactions: ' + Convert(varchar, @@TRANCOUNT);
		-- Normal case
		INSERT INTO Tee VALUES(1);
		-- Overflow case
		INSERT INTO Tee VALUES(2000);
		-- Normal case again
		INSERT INTO Tee VALUES(3);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	PRINT 'ERR [Multi_Insert]: ' + ERROR_MESSAGE();
	ROLLBACK TRANSACTION
	PRINT 'Rolled back. Transactions: ' + Convert(varchar, @@TRANCOUNT);
END CATCH
GO

-- Sproc that does complex operations.
ALTER PROCEDURE [SP_Complex]
AS
BEGIN TRY
	BEGIN TRANSACTION
		PRINT 'IN [SP_Complex]. Transactions: ' + Convert(varchar,@@TRANCOUNT);

		-- Normal case
		INSERT INTO Tee VALUES(0);

		-- Execute a bad sproc.
		EXEC Multi_Insert;

		-- Normal case again
		INSERT INTO Tee VALUES(5);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	PRINT 'ERR [SP_Complex]: ' + ERROR_MESSAGE();
	ROLLBACK TRANSACTION
	PRINT 'Rolled back. Transactions: ' + Convert(varchar, @@TRANCOUNT);
END CATCH
GO

Results:

IN [SP_Complex]. Transactions: 1

(1 row(s) affected)
IN [Multi_Insert]. Transactions: 2

(1 row(s) affected)

(0 row(s) affected)
ERR [Multi_Insert]: Arithmetic overflow error for data type tinyint, value = 2000.
Rolled back. Transactions: 0
ERR [SP_Complex]: Transaction count after EXECUTE indicates that a COMMIT or ROLLBACK TRANSACTION statement is missing. Previous count = 1, current count = 0.
Msg 3903, Level 16, State 1, Procedure SP_Complex, Line 21
The ROLLBACK TRANSACTION request has no corresponding BEGIN TRANSACTION.
Rolled back. Transactions: 0

To get the transaction count we use @@TRANCOUNT. Looking at the trace output, we see the exact same flow as before, except now it's obvious that the transaction count is 2 within the inner transaction, yet after catching the overflow exception and rolling back, the transaction count has dropped to 0! This confirms it: ROLLBACK works not only on the current transaction, but escalates all the way to the top-most transaction. The reference to "Previous count = 1, current count = 0." is to the transaction count before the EXEC statement and after it. The ROLLBACK in Multi_Insert made both the COMMIT and ROLLBACK statements in SP_Complex obsolete, or rather invalid.
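
This escalation is easy to demonstrate in isolation (a minimal sketch, no sprocs involved):

BEGIN TRANSACTION
	BEGIN TRANSACTION
		PRINT @@TRANCOUNT;	-- Prints 2.
	ROLLBACK TRANSACTION	-- Rolls back BOTH levels, not just the inner one.
PRINT @@TRANCOUNT;			-- Prints 0; a COMMIT here would now fail.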

Correct Nesting

-- Sproc to do multiple inserts with failure.
ALTER PROCEDURE [Multi_Insert]
AS
BEGIN TRY
	BEGIN TRANSACTION
		PRINT 'IN [Multi_Insert]. Transactions: ' + Convert(varchar, @@TRANCOUNT);
		-- Normal case
		INSERT INTO Tee VALUES(1);
		-- Overflow case
		INSERT INTO Tee VALUES(2000);
		-- Normal case again
		INSERT INTO Tee VALUES(3);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	PRINT 'ERR [Multi_Insert]: ' + ERROR_MESSAGE();
	IF (@@TRANCOUNT > 0)
		ROLLBACK TRANSACTION
	PRINT 'Rolled back. Transactions: ' + Convert(varchar, @@TRANCOUNT);
END CATCH
GO

-- Sproc that does complex operations.
ALTER PROCEDURE [SP_Complex]
AS
BEGIN TRY
	BEGIN TRANSACTION
		PRINT 'IN [SP_Complex]. Transactions: ' + Convert(varchar, @@TRANCOUNT);

		-- Normal case
		INSERT INTO Tee VALUES(0);

		-- Execute a bad sproc.
		EXEC Multi_Insert;

		-- Normal case again
		INSERT INTO Tee VALUES(5);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	PRINT 'ERR [SP_Complex]: ' + ERROR_MESSAGE();
	IF (@@TRANCOUNT > 0)
		ROLLBACK TRANSACTION
	PRINT 'Rolled back. Transactions: ' + Convert(varchar, @@TRANCOUNT);
END CATCH
GO

By checking whether or not an inner transaction has already rolled back, we avoid any mismatches. This works because, with TRY/CATCH, errors always transfer control to the CATCH clause, preventing any statements after the error from executing. If we catch an exception yet @@TRANCOUNT is 0, then we must conclude that an inner sproc made more commits than the transactions it created, or that it rolled back the transaction(s). In both cases we must not ROLLBACK unless we have a valid transaction, and we must always ROLLBACK in a CATCH clause if we do have one. Notice that we don't check @@TRANCOUNT before COMMIT TRANSACTION. The reason is that the only way to reach the COMMIT is for all the previous statements to have executed successfully. True, if a mismatched COMMIT is issued somewhere, we'll get a mismatch error, but that's a programmer error and must not be suppressed. By suppressing that case, our code would not work as expected (in a transaction), because all subsequent statements would execute outside any transaction and therefore couldn't be rolled back. So we let that mismatch blow up.

The Template

So, now that we have a good understanding of how transactions and error handling work, we can finally create a template that we can reuse, safe in the knowledge that it's nestable and will behave as expected in all cases.

BEGIN TRY
	BEGIN TRANSACTION

	--
	-- YOUR CODE HERE
	--

	-- Don't check for mismatching. Such an error is fatal.
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	-- An inner sproc might have rolled-back already.
	IF (@@TRANCOUNT > 0)
		ROLLBACK TRANSACTION
END CATCH

Full test-bed for reference:

IF EXISTS(SELECT * FROM sys.sysdatabases where name='DeeBee')
DROP DATABASE DeeBee
GO

-- Create an uncreatively named database.
CREATE DATABASE DeeBee
GO
USE DeeBee
GO

-- Create a table to use for data modification.
CREATE TABLE Tee(
	-- We'll generate overflow exception at will using this field.
	Num TINYINT
)
GO


-- Sproc to do multiple inserts with failure.
CREATE PROCEDURE [Multi_Insert]
AS
BEGIN TRY
	BEGIN TRANSACTION
		PRINT 'IN [Multi_Insert]. Transactions: ' + Convert(varchar, @@TRANCOUNT);
		-- Normal case
		INSERT INTO Tee VALUES(1);
		-- Overflow case
		INSERT INTO Tee VALUES(2000);
		-- Normal case again
		INSERT INTO Tee VALUES(3);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	PRINT 'ERR [Multi_Insert]: ' + ERROR_MESSAGE();
	IF (@@TRANCOUNT > 0)
		ROLLBACK TRANSACTION
	PRINT 'Rolled back. Transactions: ' + Convert(varchar, @@TRANCOUNT);
END CATCH
GO

-- Sproc that does complex operations.
CREATE PROCEDURE [SP_Complex]
AS
BEGIN TRY
	BEGIN TRANSACTION
		PRINT 'IN [SP_Complex]. Transactions: ' + Convert(varchar, @@TRANCOUNT);

		-- Normal case
		INSERT INTO Tee VALUES(0);

		-- Execute a bad sproc.
		EXEC Multi_Insert;

		-- Normal case again
		INSERT INTO Tee VALUES(5);
	COMMIT TRANSACTION
END TRY
BEGIN CATCH
	PRINT 'ERR [SP_Complex]: ' + ERROR_MESSAGE();
	IF (@@TRANCOUNT > 0)
		ROLLBACK TRANSACTION
	PRINT 'Rolled back. Transactions: ' + Convert(varchar, @@TRANCOUNT);
END CATCH
GO

-- Clean slate.
DELETE FROM Tee
GO

-- Execute complex statements.
--EXEC Multi_Insert;
EXEC SP_Complex;
GO

-- Check the results
SELECT * FROM Tee
GO

Conclusion

Error handling is often tricky and full of gotcha’s. We should be careful not to make any unwarranted assumptions, especially when working cross-languages. The error handling, and more importantly transactions, in SQL don’t necessarily behave like your favorite programming language. By carefully reading documentation, experimenting and adding traces we can get insight and develop robust solutions.

However the above isn’t the last word on error handling or transactions. Your needs might be different and your use-cases might dictate other priorities. Perhaps my approach misses some subtle case that you can’t afford to ignore. Or may be this is good enough for a boilerplate template. Either way, comments, corrections and suggestions are more than welcome.

Sep 18, 2011
 

Summary (TL;DR)

This is an experimental mod of Sqlite with built-in online compression support. The design and implementation are discussed, limitations and benchmarks are provided, and the source code as well as a prebuilt DLL are included. Use the TOC to jump to the topic of interest.

Background

Both Sqlite and MySql support compressed (and encrypted) databases. Well, more or less. Sqlite’s support is limited to read-only databases that are compressed offline, while MySql’s support is limited to compressing strings (as far as I can tell.)

While working on WikiDesk, a Wikipedia browser project, I knew the database could easily grow to 100s of gigabytes. The database of choice here is Sqlite, because of its compactness and mobility. The English Wikipedia dump is already in the range of 100s to 1000s of gigs (depending on the dump type.) WikiDesk not only supports different Wikipedia languages, but also different projects, such as Wikinews, Wikibooks and Wiktionary, among many others, in all available languages, all in the same database. Theoretically, one can import all possible Wiki content into a single database.

The opportunity to compress this highly-redundant wiki-code mixed with Unicode text was pretty much obvious. So it was reasonable to assume others must have had a similar use-case and added compression support to Sqlite. My search yielded only the aforementioned cases.

A part of me was happy to have found no precedent project. I was more than happy to roll up my sleeves and get to hacking.

Design Survey

There are many ways to go about designing a compressed database file. My main purpose, however, was to have fully-transparent, online and realtime compression support. So the design must accommodate updates and deletions as well as any other modify operation supported by Sqlite.

An obvious approach is the one used by MySql, namely to compress the fields independently. This is simple and, relatively speaking, straightforward. However, it’d mean that LIKE couldn’t be used on compressed string fields. Collation, sorting and other features would be absent as well. In fact, the fields in question couldn’t be TEXT at all. In addition, one has to explicitly compress fields, remember which are compressed and remember to decompress them before use. Very limited, I thought, and probably not worth the effort. Another approach is to do this on a low level, such that it’d be transparent to the caller. Such an extension to Sqlite exists, but this won’t yield much gain on small fields. I suspect NTFS compression would give better results.

NTFS has built-in compression support. It was well worth the effort to test it. On an English SimpleWiki dump I could compress the database file down to about 57% of its original size (see benchmarks below.) Pretty decent. However, I couldn’t control it at all: I couldn’t set the chunk size, the compression level or anything, save for enabling and disabling it. In addition, the user could disable it and lose all the benefits. A similar result can be achieved using FuseCompress or compFUSEd (on Linux), albeit the user must install such a filesystem first. Database-level compression is more promising.

A major problem with database files, as far as online compression is concerned, is that the database logical-structure typically stores pointers to file offsets, such that there is a one-to-one mapping between the physical and logical-structures. This is reasonable as the database is really a large and complex datastructure on disk (as opposed to memory.) The btree or rtree nodes are typically page indexes, where all pages have a predefined, database-wide fixed size. Disrupting this structure would render the file corrupted. The purpose of the fixed-size pages is to simplify the allocation and management of space. This scheme is also used by memory and disk-managers alike.

If we compress a page in the database, the page would now contain two regions: data and free-space. To utilize the free-space, we could write a portion of the next page into the free-space, the remainder into the page after it, and so on for all pages. But then we’d have to keep track of each page’s fragments somehow. To avoid that, we could leave the free-space unused, but then we’d get no net saving of disk space, as the free-space would still be allocated on disk.

I could store the new indexes and offsets in some allocation table appended to the file. But I’d have to do a lot of data moving, reallocation, (de)fragmentation and whatnot just to keep track of the free ranges. Obviously this approach was pretty complicated and would take a much more involved design and coding effort. Also, Sqlite page-sizes are multiples of the disk sector size, for atomicity. I’d have to be thoroughly familiar with the Sqlite design and implementation to embark on such a largish project, if I wanted it finished and working.

The ‘be lazy’ motto seems to work well for programmers who are efficiency-oriented and hate repetitive and error-prone work. What would be the simplest approach that could work? Going back to NTFS, one could learn a lesson or two on transparent compression. The secret is that NTFS can simply allocate any free inode on the disk, write the compressed data to it and update the index table. Inodes are linked lists, so it is very easy to insert, remove and modify nodes in the chain. Files, on the other hand, are arrays of bytes abstracted over the disk structure. Moving bits around in an array is much more complicated and time-consuming than updating nodes in a linked-list.

What is needed is the advantage of a file-system applied on the level of files.

What if we could tell the file-system that these free-space regions of the file are really unused? NTFS supports sparse files in addition to compressed files. This could be used to our advantage. All we’d have to do is mark the free-space in each page as unused, and the file-system would make it available to other files on the disk, reducing the net disk space used by the database.

The Design

Sqlite supports pages of 512–65536 bytes. Since we can’t break a single page, and a page can be as large as 64 Kbytes, the smallest compression unit must be at least 64 Kbytes long. In addition, the compression-unit of NTFS compression seems to also be 64 Kbytes. This means that a sparse range must be at least as large as a compression-unit to be deallocated from disk and marked as free. This puts a clear limitation on the amount of saving we can achieve with this design: compression won’t save any disk space unless it reduces the size in multiples of 64 Kbytes. A multiple of 64 Kbytes is therefore used as the compression unit, internally called a chunk. Indeed, a chunk size of 64 Kbytes would be totally useless, as a chunk could never shrink by a complete unit and there could be no saving at all.

When data is written, it’s first written into a memory buffer. This buffer is used to track changes to the chunk and its offset in the file, and to compress the data. When the chunk needs flushing, the data is first compressed and the compressed data is written at the chunk’s offset. The remainder of the chunk is marked as a sparse region. NTFS deallocates any naturally-aligned compression units that are completely sparse. Partially-written units are physically allocated on disk, and 0-valued bytes are written to disk.
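To give an idea of the mechanics, here is a minimal sketch of how a flushed chunk’s free-space tail could be handed back to NTFS (a hypothetical helper, not the mod’s actual code): the file is flagged as sparse, and the range past the compressed data is then declared zero, letting NTFS deallocate any 64 Kbyte compression-units that fall entirely within it.

#include <windows.h>
#include <winioctl.h>

/* Sketch: deallocate the unused tail of a chunk after compression.
   hFile is the database file, opened with GENERIC_WRITE. */
static BOOL MarkChunkTailSparse(HANDLE hFile, LONGLONG chunkOffset,
                                LONGLONG compressedSize, LONGLONG chunkSize)
{
    FILE_ZERO_DATA_INFORMATION zeroData;
    DWORD returned;

    /* Flag the file as sparse (harmless if already set) so that zeroed
       ranges can be deallocated instead of backed by physical disk space. */
    if (!DeviceIoControl(hFile, FSCTL_SET_SPARSE,
                         NULL, 0, NULL, 0, &returned, NULL))
        return FALSE;

    /* Declare everything past the compressed data, up to the end of the
       chunk, as zeros. NTFS frees whole 64 Kbyte compression-units inside
       this range; partial units stay allocated and are zero-filled. */
    zeroData.FileOffset.QuadPart = chunkOffset + compressedSize;
    zeroData.BeyondFinalZero.QuadPart = chunkOffset + chunkSize;

    return DeviceIoControl(hFile, FSCTL_SET_ZERO_DATA,
                           &zeroData, sizeof(zeroData),
                           NULL, 0, &returned, NULL);
}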

When reading data, the complete chunk containing the requested byte-offset is read and decompressed, and the requested bytes are copied back to the caller from the buffered data. The sparse bytes are transparently read-in as 0-valued bytes. This is done by NTFS and relieves us from tracking sparse regions.

Initially, very fast compression libraries were used to avoid sacrificing too much performance. FastLz, Lz4 and MiniLzo were tested, but the results weren’t very promising, compression-wise. As such, the current build uses Zlib.

Implementation

The compression mod is written as a VFS Shim. This has the advantage of avoiding any modifications to the Sqlite code base.
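A VFS shim sits between Sqlite and the real, OS-level VFS: it forwards everything to the default VFS but intercepts the file operations it cares about. The sketch below (simplified, with hypothetical names; not the mod’s actual code) shows just the registration and the xOpen hook where the compressing read/write methods would be spliced in.

#include "sqlite3.h"

static sqlite3_vfs *pRootVfs;   /* the real VFS we delegate to */
static sqlite3_vfs compressVfs; /* our shim */

static int cvfsOpen(sqlite3_vfs *pVfs, const char *zName,
                    sqlite3_file *pFile, int flags, int *pOutFlags)
{
    int rc;
    (void)pVfs; /* we delegate to the root VFS */

    rc = pRootVfs->xOpen(pRootVfs, zName, pFile, flags, pOutFlags);
    /* For the main database file, pFile->pMethods would be replaced here
       with wrappers whose xRead/xWrite (de)compress whole chunks. */
    return rc;
}

int RegisterCompressVfs(void)
{
    pRootVfs = sqlite3_vfs_find(0);  /* 0 -> the default VFS */
    compressVfs = *pRootVfs;         /* inherit all methods */
    compressVfs.zName = "compress";
    compressVfs.xOpen = cvfsOpen;    /* intercept file opens */
    return sqlite3_vfs_register(&compressVfs, 1 /* make default */);
}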

Enabling compression must be done before opening any database files. A single function is defined as follows:

int sqlite3_compress(
    int trace,
    int compressionLevel
    );

trace can be a value between 0 and 7. When 0, tracing is disabled; larger values enable tracing of increasingly lower-level operations. Trace logs are written to stderr. Pass -1 for the default.

compressionLevel can be a value between 1 and 9, where 1 gives the fastest performance at the expense of compression ratio and 9 gives the best compression at the expense of performance. Pass -1 for the default, which is typically level-6.

To enable compression, this function is simply called before calling sqlite3_open. The compression level may be changed between runs; however, unless a chunk is modified, its data will not be recompressed at the new level.
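For instance, a caller would do something along these lines (a sketch; the prototype is repeated here since the function is an addition to the standard header):

#include "sqlite3.h"

/* Added by the compression mod; not part of the standard sqlite3.h. */
int sqlite3_compress(int trace, int compressionLevel);

int OpenCompressed(const char *zPath, sqlite3 **ppDb)
{
    /* Must run before any database is opened:
       -1, -1 selects the defaults (no tracing, zlib level-6). */
    sqlite3_compress(-1, -1);

    /* From here on, the main database is transparently compressed. */
    return sqlite3_open(zPath, ppDb);
}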

Only the main database is compressed. The journal or any other temporary files aren’t compressed.

Limitations

Besides the fact that the code is in an experimental state, there are some things unsupported, or even unsupportable, by this mod. First and foremost, only this mod can read compressed databases. The original Sqlite will declare compressed databases corrupted. However, this mod can and should detect uncompressed databases and silently disable compression (but use at your own risk.)

Since NTFS sparse file support is the key to achieving compression, the mod is literally useless on non-NTFS systems.

Sqlite is known to be quite resilient in the face of file corruption. This can no longer be supported at the same level as the official release. In addition, a corruption would destroy much more data than a single page. With the compression library and the new code also comes an increased risk of crashes or instability.

Among the untested and probably unsupported features of Sqlite are:

  • Online database backup.
  • Multiprocess read/write.
  • Vacuum.
  • Data recovery.
  • Shell and 3rd-party tools.

Performance-wise, there is virtually no caching implemented beyond the current chunk. This is bare-bones caching and there is a lot of room for performance improvements.

Benchmarks

An import of an English SimpleWiki dump was used as the benchmark. The main table holds an auto-increment index, a timestamp, the page title and the page contents (both Unicode).

256 Kbyte Chunks and Level-6 Compression (sizes in KBytes)

                   Original            Sqlite Compressed
NTFS Normal        204,438 (100%)      73,296 (35.85%)
NTFS Compressed    117,460 (57.45%)    57,431 (28.09%)

1024 Kbyte Chunks and Level-9 Compression (sizes in KBytes)

                   Original            Sqlite Compressed
NTFS Normal        204,438 (100%)      67,712 (33.12%)
NTFS Compressed    117,460 (57.45%)    66,220 (32.39%)

It’s quite obvious that the savings with the modified Sqlite are substantial compared to NTFS compression on the original file. Interestingly, NTFS compression, when applied on an already-compressed file, still yields gains. This is because of the inefficiencies of Zlib (deflate) compression (less so for level-9 than level-6) and because NTFS can deallocate at the level of clusters, which are 4096 bytes, as opposed to the sparse method’s compression-unit of 64 Kbytes. Since the free-regions are written as zero-bytes, and they aren’t deallocated unless a complete 64 Kbyte unit is zeroed out, it seems reasonable to assume NTFS compression is crunching these zero-padded regions and deallocating them, as its unit is only 4096 bytes.

It should also be noted that while statistically we should get better compression with larger chunk sizes and higher compression levels, this isn’t linear. In fact, increasing the chunk size may lead to reduced net gains in file size due to the 64 Kbyte compression-unit of NTFS. That is, if two chunks could each save a single unit (64 Kbytes,) doubling the chunk size (such that both would be compressed together as one chunk) might not save 128 Kbytes; in that case the savings would be reduced from two units to one, resulting in a file 64 Kbytes larger than with the original chunk-size. This heavily depends on both the data and the compression, of course.

Performance

A synthetic test was done using generated text from an alphabet of alpha-numeric characters plus symbols, in random lengths of <1MB. Zlib seems to perform slowly on this random data (although the number of possible codes is small.) A chunk size of 256 Kbytes and a compression-level of 6 were used. 50 random rows are generated and inserted with incremental Ids (a two-column table,) then the 50 rows are selected using the Ids and the texts compared to the originals. Next, new texts are generated with new lengths, this time <2MB, and the rows updated. Again, the 50 rows are selected by Id and compared to the updated originals. The resultant database file is 50,686 Kbytes.

The original Sqlite code ran the test in 13.3 seconds, while with default compression and no tracing (to avoid any overheads) the same test finished in 64.7 seconds (4.86x slower,) resulting in a 41,184 KByte file. Both tests ran on the same generated data. The file was on a RAMDisk to minimize disk overhead.

Considering that the data was random and synthetic, and that the insert/update rate was equal to the select rate, the results are reasonable. In practice, reads are typically more frequent than writes, and with proper caching the performance overhead should be reduced significantly.

Download

The code holds the same copyright claims as Sqlite, namely none. The code is experimental. Use it at your own risk.

Download the code and prebuilt DLL. This sqlite3.dll is built from the 3.7.7.1 amalgamation, created from the original sources by the original configure and make files with the default settings/flags, with the compression code added. It’s built using VS2010 SP1 and statically linked to the runtime libraries, so it has no dependencies.

Building

To build the code, first download a recent Sqlite version. The 3.7.7.1 amalgamation is perfect. The latest Zlib must also be downloaded and built.

Add the Zlib headers to the include path, copy the vfs_compress.c file next to the sqlite sources and build it. Next, build the sqlite3.c amalgamation (or the original sources) and link the binaries of sqlite3, vfs_compress and Zlib to create the executable.

Future Plans

A good percentage of the official Sqlite tests pass successfully, but the corruption and format-validating tests unsurprisingly fail. Increasing the supported cases is the prime goal at this point; getting the mod to a “stable with known limitations” status would be a major milestone. Improving performance is another goal that isn’t very difficult to attain. Having the ability to enable/disable compression on any database would also be beneficial, and would add more protection against misuse. It’d also be interesting to attempt supporting compression without NTFS sparse-file support. This, while much more complicated, would work on any system and not on NTFS alone.

As a bonus, it’s almost trivial to add encryption on top of the compression subsystem.

Any comments, ideas, feedback and/or constructive criticism are more than welcome.

Jul 02, 2011
 

A customer noticed a sudden and sharp increase in database disk consumption. The alarm that went off was the low disk-space monitor; apparently the database in question had left only a few spare GBs on disk. The concerned customer opened a ticket asking two questions: is the database growth normal and expected? And what’s the average physical storage requirement per row?

The answer to the first question had to do with their particular actions, which was case specific. However, to answer the second, one either has to keep track of each table’s schema, adding up the typical/maximum size of each field and calculating the indexes and their sizes, or one could simply do the math on a typical dataset using SQL code. Obviously the latter is the simpler and preferred approach.

Google returns quite a number of results (1, 2, 3, 4 and 5.) For MS SQL, it seems that virtually all rely on the sp_spaceused stored proc. SQL has an undocumented sproc, sp_msforeachtable, which runs over each table in the database and executes a sproc, passing each table’s name as a param. While it isn’t at all difficult to do this manually (looping over sys.Tables is hardly a feat,) calling this one-liner is still very convenient. So it’s no surprise that virtually all samples online do just that.

Here is an sproc that prints the total database size, reserved size, data size, index size and unused sizes. In addition, the sproc prints the same numbers for each table with the total number of rows in all tables at the end.

My prime interest wasn’t just to learn about the database size, which can be achieved using sp_spaceused without any params, nor to just learn about each table’s share, which can be done by passing the table name in question to sp_spaceused. My main purpose was to get a breakdown of the average row-size per table.

So, here is a similar script to do exactly that. The script first updates the page and row counts for the whole database (which may take a long time, so disable this on production databases.) In addition, it calculates the totals and averages of each data-point for all tables, and for each table it calculates the average data size (data + index) and wasted bytes (reserved + unused) per row. All the per-table information is printed in a single join statement to return one rowset with all the relevant data.

-- Copyright (c) 2011, Ashod Nakashian
-- All rights reserved.
-- 
-- Redistribution and use in source and binary forms, with or without modification,
-- are permitted provided that the following conditions are met:
-- 
-- o Redistributions of source code must retain the above copyright notice, 
-- this list of conditions and the following disclaimer.
-- o Redistributions in binary form must reproduce the above copyright notice, 
-- this list of conditions and the following disclaimer in the documentation and/or
-- other materials provided with the distribution.
-- o Neither the name of the author nor the names of its contributors may be used to endorse
-- or promote products derived from this software without specific prior written permission.
--
-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-- EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
-- OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 
-- SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
-- INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-- LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--
-- Show physical size statistics for each table in the database.
--
SET NOCOUNT ON

-- Update all page and count stats.
-- Comment for large tables on production!
DBCC UPDATEUSAGE(0) 

-- Total DB size.
EXEC sp_spaceused

-- Per-table statistics.
DECLARE @t TABLE
( 
    [name] NVARCHAR(128),
    [rows] BIGINT, 
    [reserved] VARCHAR(18), 
    [data] VARCHAR(18), 
    [index_size] VARCHAR(18),
    [unused] VARCHAR(18)
)

-- Collect per-table data in @t.
INSERT @t EXEC sp_msForEachTable 'EXEC sp_spaceused ''?'''

-- Calculate the averages and totals.
INSERT into @t
SELECT 'Average', AVG(rows),
    CONVERT(varchar(18), AVG(CAST(SUBSTRING([reserved], 0, LEN([reserved]) - 1) AS int))) + ' KB',
    CONVERT(varchar(18), AVG(CAST(SUBSTRING([data], 0, LEN([data]) - 1) AS int))) + ' KB',
    CONVERT(varchar(18), AVG(CAST(SUBSTRING([index_size], 0, LEN([index_size]) - 1) AS int))) + ' KB',
    CONVERT(varchar(18), AVG(CAST(SUBSTRING([unused], 0, LEN([unused]) - 1) AS int))) + ' KB'
FROM   @t
UNION ALL
SELECT 'Total', SUM(rows),
    CONVERT(varchar(18), SUM(CAST(SUBSTRING([reserved], 0, LEN([reserved]) - 1) AS int))) + ' KB',
    CONVERT(varchar(18), SUM(CAST(SUBSTRING([data], 0, LEN([data]) - 1) AS int))) + ' KB',
    CONVERT(varchar(18), SUM(CAST(SUBSTRING([index_size], 0, LEN([index_size]) - 1) AS int))) + ' KB',
    CONVERT(varchar(18), SUM(CAST(SUBSTRING([unused], 0, LEN([unused]) - 1) AS int))) + ' KB'
FROM   @t

-- Holds per-row average kbytes.
DECLARE @avg TABLE
( 
    [name] NVARCHAR(128),
    [data_per_row] VARCHAR(18),
    [waste_per_row] VARCHAR(18)
)

-- Calculate the per-row average data in kbytes.
insert into @avg
select t.name, 
    CONVERT(varchar(18),
        CONVERT(decimal(20, 2),
            (CAST(SUBSTRING(t.[data], 0, LEN(t.[data]) - 1) AS float) +
             CAST(SUBSTRING(t.[index_size], 0, LEN(t.[index_size]) - 1) AS float)) 
            / NULLIF([rows], 0))) + ' KB', 
    CONVERT(varchar(18),
        CONVERT(decimal(20, 2),
            (CAST(SUBSTRING(t.[reserved], 0, LEN(t.[reserved]) - 1) AS float) +
             CAST(SUBSTRING(t.[unused], 0, LEN(t.[unused]) - 1) AS float))
            / NULLIF([rows], 0))) + ' KB'
from @t t

-- Join the two tables using the table names.
select t.name, t.rows, t.reserved, t.data, t.index_size, t.unused, a.data_per_row, a.waste_per_row
from @t t, @avg a
where t.name = a.name

There is quite a bit of data conversion and casting here that isn’t necessarily very performant, but there isn’t much choice and optimizing further is probably unnecessary. And since there are so many different ways to get the same output, I’ll leave any variations up to the readers. Suggestions and improvements are more than welcome. Please use the comments to share your alternatives.

This may easily be wrapped in an sproc for convenience. I hope you find it useful and handy.

Apr 17, 2011
 

With the introduction of .Net and a new, modern framework library, developers were understandably very cheerful. A shiny new ecosystem with a mint library, designed without any backwards-compatibility curses or legacy support. Finally, a library to take us into the future without second-guessing. Well, those were the hopes and dreams of the often too-optimistic and naive.

However, if you’d grant me those simplistic hopes, you’d understand my extreme disappointment when the compiler barfed on my AddRange call on HttpWebRequest with a long parameter. Apparently, HttpWebRequest lacks a 64-bit AddRange member.

Surely this was a small mistake, a relic from the .Net 1.0 era. Nope, it’s in 2.0 as well. Right then, I should be using 3.5; what’s wrong with me using 2.0, such an outdated version? Wrong again, I am using 3.5. But I need to resume downloads on 7GB+ files!

To me, this is truly a shocking goof. After all, .Net is supposed to be all about the agility and modernity that is the web. Three major releases of the framework and no one put a high-priority tag on this missing member? Surely my panic was exaggerated. It must be. There is certainly some simple workaround that everyone is using that makes this issue really a low-priority one.

Right then, the HttpWebRequest is a WebRequest, and I really don’t need a specialized function to set an HTTP header. Let’s set the header directly:

            HttpWebRequest request = WebRequest.Create(Uri) as HttpWebRequest;

            request.Headers["Range"] = "bytes=0-100";

To which, .Net responded with the following System.ArgumentException:

This header must be modified using the appropriate property.

Frustration! Luckily, somebody ultimately took notice of this glaring omission and added the AddRange(long, long) overload to .Net 4.0.

So where does this leave us? Seems that I either have to move to .Net 4.0, write my own HttpWebRequest replacement or avoid large files altogether. Unless, that is, I find a hack.

Different solutions do exist to this problem on the web, but the most elegant one was this:

        /// <summary>
        /// Sets an inclusive range of bytes to download.
        /// </summary>
        /// <param name="request">The request to set the range to.</param>
        /// <param name="from">The first byte offset, -ve to omit.</param>
        /// <param name="to">The last byte offset, less-than from to omit.</param>
        private static void SetWebRequestRange(HttpWebRequest request, long from, long to)
        {
            string first = from >= 0 ? from.ToString() : string.Empty;
            string last = to >= from ? to.ToString() : string.Empty;

            string val = string.Format("bytes={0}-{1}", first, last);

            Type type = typeof(WebHeaderCollection);
            MethodInfo method = type.GetMethod("AddWithoutValidate", BindingFlags.Instance | BindingFlags.NonPublic);
            method.Invoke(request.Headers, new object[] { "Range", val });
        }

Since there were apparently copied pages with similar solutions, I’m a bit hesitant to give credit to any particular page or author for fear of crediting a plagiarizer. In return, I’ve improved the technique and put it into a flexible function. In addition, I’ve wrapped WebResponse in a reusable Stream class that plays better with non-network streams. In particular, my WebStream supports reading the Length and Position members and returns correct results. Here is the full source code:

// --------------------------------------------------------------------------------------
// <copyright file="WebStream.cs" company="Ashod Nakashian">
// Copyright (c) 2011, Ashod Nakashian
// All rights reserved.
// 
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
// 
// o Redistributions of source code must retain the above copyright notice, 
// this list of conditions and the following disclaimer.
// o Redistributions in binary form must reproduce the above copyright notice, 
// this list of conditions and the following disclaimer in the documentation and/or
// other materials provided with the distribution.
// o Neither the name of the author nor the names of its contributors may be used to endorse
// or promote products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
// OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT 
// SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// </copyright>
// <summary>
//   Wraps HttpWebRequest and WebResponse instances as Streams.
// </summary>
// --------------------------------------------------------------------------------------

namespace Web
{
    using System;
    using System.IO;
    using System.Net;
    using System.Reflection;

    /// <summary>
    /// HTTP Stream, wraps around HttpWebRequest.
    /// </summary>
    public class WebStream : Stream
	{
		public WebStream(string uri)
            : this(uri, 0)
		{
		}

        public WebStream(string uri, long position)
		{
			Uri = uri;
			position_ = position;
		}

        #region properties

        public string Uri { get; protected set; }
        public string UserAgent { get; set; }
        public string Referer { get; set; }

        #endregion // properties

        #region Overrides of Stream

        /// <summary>
        /// When overridden in a derived class, clears all buffers for this stream and causes any buffered data to be written to the underlying device.
        /// </summary>
        /// <filterpriority>2</filterpriority>
        public override void Flush()
        {
        }

        /// <summary>
        /// When overridden in a derived class, sets the position within the current stream.
        /// </summary>
        /// <returns>
        /// The new position within the current stream.
        /// </returns>
        /// <param name="offset">A byte offset relative to the <paramref name="origin"/> parameter.</param>
        /// <param name="origin">A value of type <see cref="T:System.IO.SeekOrigin"/> indicating the reference point used to obtain the new position.</param>
        /// <filterpriority>1</filterpriority>
        /// <exception cref="NotImplementedException"><c>NotImplementedException</c>.</exception>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// When overridden in a derived class, sets the length of the current stream.
        /// </summary>
        /// <param name="value">The desired length of the current stream in bytes.</param>
        /// <filterpriority>2</filterpriority>
        /// <exception cref="NotImplementedException"><c>NotImplementedException</c>.</exception>
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// When overridden in a derived class, reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
        /// </summary>
        /// <returns>
        /// The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
        /// </returns>
        /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between <paramref name="offset"/> and (<paramref name="offset"/> + <paramref name="count"/> - 1) replaced by the bytes read from the current source.</param>
        /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin storing the data read from the current stream.</param>
        /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
        /// <filterpriority>1</filterpriority>
        /// <exception cref="System.ArgumentException">The sum of offset and count is larger than the buffer length.</exception>
        /// <exception cref="System.ArgumentNullException">buffer is null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">offset or count is negative.</exception>
        /// <exception cref="System.NotSupportedException">The stream does not support reading.</exception>
        /// <exception cref="System.ObjectDisposedException">Methods were called after the stream was closed.</exception>
		public override int Read(byte[] buffer, int offset, int count)
		{
            if (stream_ == null)
            {
                Connect();
            }

            try
            {
                if (stream_ != null)
                {
                    int read = stream_.Read(buffer, offset, count);
                    position_ += read;
                    return read;
                }
            }
            catch (WebException)
            {
                Close();
            }
            catch (IOException)
            {
                Close();
            }

            return 0; // Per the Stream contract, 0 signals end-of-stream (not -1).
		}

        /// <summary>
        /// When overridden in a derived class, writes a sequence of bytes to the current stream and advances the current position within this stream by the number of bytes written.
        /// </summary>
        /// <param name="buffer">An array of bytes. This method copies <paramref name="count"/> bytes from <paramref name="buffer"/> to the current stream.</param>
        /// <param name="offset">The zero-based byte offset in <paramref name="buffer"/> at which to begin copying bytes to the current stream.</param>
        /// <param name="count">The number of bytes to be written to the current stream.</param>
        /// <filterpriority>1</filterpriority>
        /// <exception cref="NotImplementedException"><c>NotImplementedException</c>.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// When overridden in a derived class, gets a value indicating whether the current stream supports reading.
        /// Always returns true.
        /// </summary>
        /// <returns>
        /// true if the stream supports reading; otherwise, false.
        /// </returns>
        /// <filterpriority>1</filterpriority>
        public override bool CanRead
        {
            get { return true; }
        }

        /// <summary>
        /// When overridden in a derived class, gets a value indicating whether the current stream supports seeking.
        /// Always returns false.
        /// </summary>
        /// <returns>
        /// true if the stream supports seeking; otherwise, false.
        /// </returns>
        /// <filterpriority>1</filterpriority>
        public override bool CanSeek
        {
			get { return false; }
        }

        /// <summary>
        /// When overridden in a derived class, gets a value indicating whether the current stream supports writing.
        /// Always returns false.
        /// </summary>
        /// <returns>
        /// true if the stream supports writing; otherwise, false.
        /// </returns>
        /// <filterpriority>1</filterpriority>
        public override bool CanWrite
        {
			get { return false; }
        }

        /// <summary>
        /// When overridden in a derived class, gets the length in bytes of the stream.
        /// </summary>
        /// <returns>
        /// A long value representing the length of the stream in bytes.
        /// </returns>
        /// <exception cref="T:System.ObjectDisposedException">Methods were called after the stream was closed.</exception>
        /// <filterpriority>1</filterpriority>
        public override long Length
        {
            get { return webResponse_.ContentLength; }
        }

        /// <summary>
        /// When overridden in a derived class, gets or sets the position within the current stream.
        /// </summary>
        /// <returns>
        /// The current position within the stream.
        /// </returns>
        /// <filterpriority>1</filterpriority>
        /// <exception cref="NotSupportedException"><c>NotSupportedException</c>.</exception>
        public override long Position
        {
			get { return position_; }
			set { throw new NotSupportedException(); }
        }

        #endregion // Overrides of Stream

        #region operations

        /// <summary>
        /// Reads the full string data at the given URI.
        /// </summary>
        /// <returns>The full contents of the given URI.</returns>
        public static string ReadToEnd(string uri, string userAgent, string referer)
        {
            using (WebStream ws = new WebStream(uri, 0))
            {
                ws.UserAgent = userAgent;
                ws.Referer = referer;
                ws.Connect();

                using (StreamReader reader = new StreamReader(ws.stream_))
                {
                    return reader.ReadToEnd();
                }
            }
        }

        /// <summary>
        /// Writes the full data at the given URI to the given stream.
        /// </summary>
        /// <returns>The number of bytes written.</returns>
        public static long WriteToStream(string uri, string userAgent, string referer, Stream stream)
        {
            using (WebStream ws = new WebStream(uri, 0))
            {
                ws.UserAgent = userAgent;
                ws.Referer = referer;
                ws.Connect();

                long total = 0;
                byte[] buffer = new byte[64 * 1024];
                int read;
                while ((read = ws.stream_.Read(buffer, 0, buffer.Length)) > 0)
                {
                    stream.Write(buffer, 0, read);
                    total += read;
                }

                return total;
            }
        }

        #endregion // operations

        #region implementation

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (stream_ != null)
            {
                stream_.Dispose();
                stream_ = null;
            }
        }

        private void Connect()
        {
            Close();

            HttpWebRequest request = WebRequest.Create(Uri) as HttpWebRequest;
            if (request == null)
            {
                return;
            }

            request.UserAgent = UserAgent;
            request.Referer = Referer;
            if (position_ > 0)
            {
                SetWebRequestRange(request, position_, 0);
            }

            webResponse_ = request.GetResponse();
            stream_ = webResponse_.GetResponseStream();
        }

        /// <summary>
        /// Sets an inclusive range of bytes to download.
        /// </summary>
        /// <param name="request">The request to set the range to.</param>
        /// <param name="from">The first byte offset, -ve to omit.</param>
        /// <param name="to">The last byte offset, less-than from to omit.</param>
        private static void SetWebRequestRange(HttpWebRequest request, long from, long to)
        {
            string first = from >= 0 ? from.ToString() : string.Empty;
            string last = to >= from ? to.ToString() : string.Empty;

            string val = string.Format("bytes={0}-{1}", first, last);

            Type type = typeof(WebHeaderCollection);
            MethodInfo method = type.GetMethod("AddWithoutValidate", BindingFlags.Instance | BindingFlags.NonPublic);
            method.Invoke(request.Headers, new object[] { "Range", val });
        }

        #endregion // implementation

        #region representation

        private long position_;
        private WebResponse webResponse_;
        private Stream stream_;

        #endregion // representation
	}
}

I hope this saves someone some frustration, and perhaps even the time of writing this handy class. Enjoy.
