Demo code source : Tasks2.zip

Introduction

This is the 2nd part of my proposed series of articles on TPL. Last time I introduced Tasks, and covered this ground:

  • Thread Versus Tasks
  • Creating Tasks
  • Trigger Methods/Properties
  • Handling Exceptions
  • Cancelling Tasks
  • SynchronizationContext
This time we are going to be looking at a TPL concept called continuations, which let us run something after a Task has finished, typically something that does further work with the return value of that Task.

 

Article Series Roadmap

This is article 2 of a possible 6, which I hope people will like. Shown below is the rough outline of what I would like to cover.

  1. Starting Tasks / Trigger Operations / ExceptionHandling / Cancelling / UI Synchronization
  2. Continuations / Cancelling Chained Tasks (This article)
  3. Parallel For / Custom Partitioner / Aggregate Operations
  4. Parallel LINQ
  5. Pipelines
  6. Advanced Scenarios / v.Next For Tasks

Now I am aware that some folk will simply read this article and state that it is similar to what is currently available on MSDN, and I in part agree with that. However, there are several reasons I have chosen to still take on the task of writing up these articles, which are as follows:

  • It will only really be the first couple of articles which show similar ideas to MSDN. After that I feel the material I will get into will not be on MSDN, and will be the result of some TPL research on my part, which I will be outlining in the article(s), so you will benefit from my research which you can just read...Aye, nice
  • There will be screen shots of live output here, which is something MSDN does not have that much of, and which may help some readers to reinforce the article(s) text
  • There may be some readers out there that have never even heard of Task Parallel Library so would not come across it in MSDN, you know the old story, you have to know what you are looking for in the 1st place thing.
  • I enjoy threading articles, so like doing them, so I did them, will do them, have done them, and continue to do them

All that said, if, having read this article, people truly think this is too similar to MSDN (which I still hope it won't be), let me know that as well, and I will try and adjust the upcoming articles to make amends.

Table Of Contents 

Anyway what I am going to cover in this article is as follows:

Some More TPL Background

This section is something I should really have talked about in the 1st article, but I did not, so I am including it here instead. I hope to explain why the designers of TPL did things the way they did, and how that benefits us all.

The Default Task Scheduler

TPL relies on a scheduler to organise and run tasks. In .NET 4, the default Task scheduler (which you can swap out) is tightly integrated with the ThreadPool. As such, if you use the default Task scheduler, the worker threads that run the Tasks are managed by the ThreadPool, where generally there are at least as many worker threads as there are cores on the target PC. When there are more Tasks than there are worker threads, some Tasks must be queued until a ThreadPool worker thread becomes free to service them.
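To check the claim that the default scheduler hands Tasks to ThreadPool worker threads, here is a minimal sketch (the class and method names are hypothetical, chosen just for illustration). It starts a Task through Task.Factory, which, when not called from inside another Task, schedules onto TaskScheduler.Default, and asks whether the body ran on a ThreadPool thread. It also reads the pool's configured minimum worker thread count, which by default equals the number of processors.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class DefaultSchedulerProbe
{
    // Starts a Task via Task.Factory (TaskScheduler.Default when not called from
    // inside another Task) and reports whether its body ran on a ThreadPool thread.
    public static bool RanOnThreadPool()
    {
        Task<bool> probe = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread);
        return probe.Result; // Result blocks until the Task has finished
    }

    // The ThreadPool's configured minimum number of worker threads; by default this
    // is the number of processors, although SetMinThreads can change it.
    public static int MinWorkerThreads()
    {
        int workerThreads, completionPortThreads;
        ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
        return workerThreads;
    }
}
```

One caveat worth knowing: a Task created with TaskCreationOptions.LongRunning is a hint to the default scheduler to give it a dedicated thread instead of a pool thread.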

This is similar to the concept used by the existing ThreadPool.QueueUserWorkItem(..). In fact, you could think of the default Task scheduler as an improved ThreadPool, where the work items are simply Tasks. As the number of cores increases, the default scheduler is capable of better performance than the standard ThreadPool alone, for reasons we shall examine below.
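The difference between a raw ThreadPool work item and a Task is easiest to see side by side. This sketch (hypothetical class name, with a trivial 6 * 7 calculation standing in for real work) runs the same job both ways: QueueUserWorkItem gives back no handle and no result, so the caller has to bring its own signalling, whereas the Task is itself the handle and carries the return value.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class WorkItemVsTask
{
    // Classic ThreadPool work item: no handle and no return value, so we need our
    // own signalling (a ManualResetEventSlim) plus a captured variable for the result.
    public static int ViaQueueUserWorkItem()
    {
        int result = 0;
        using (var done = new ManualResetEventSlim(false))
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                result = 6 * 7;
                done.Set(); // tell the caller the work has finished
            });
            done.Wait();
        }
        return result;
    }

    // The same work as a Task: the Task<int> is the handle, and Result both waits
    // for completion and hands back the value.
    public static int ViaTask()
    {
        Task<int> task = Task.Factory.StartNew(() => 6 * 7);
        return task.Result;
    }
}
```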

The Standard ThreadPool

The standard ThreadPool is basically a single global First In First Out (FIFO) queue of work items, from which worker threads dequeue the work they are assigned.

This is OK until the number of cores increases, at which point it becomes a bottleneck, as the queue can only be accessed by one worker thread at a time. When there are only a few large, coarse-grained parallel work items to deal with, the synchronization cost of ensuring single access to this global queue is small. But when you have much finer-grained parallelism going on (as you would with Tasks), the synchronization cost of working with this single global queue starts to become a bottleneck.
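To make the contention point concrete, here is a toy model of a single global FIFO queue. It is only an illustration, assuming a plain lock around a Queue&lt;Action&gt;, and is not how the CLR actually implements its queue; the point is simply that every producer and every worker has to take the same lock.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Toy model of a single global FIFO work queue (NOT the real CLR implementation).
// Every Enqueue and every TryDequeue serialises on the same lock object.
public sealed class ToyGlobalQueue
{
    private readonly Queue<Action> _items = new Queue<Action>();
    private readonly object _sync = new object();

    public void Enqueue(Action work)
    {
        lock (_sync) { _items.Enqueue(work); }
    }

    public bool TryDequeue(out Action work)
    {
        lock (_sync)
        {
            if (_items.Count > 0) { work = _items.Dequeue(); return true; }
            work = null;
            return false;
        }
    }

    // Queues three work items, drains them on the calling thread, and returns
    // the order in which they ran (FIFO, so the order they were queued in).
    public static string Demo()
    {
        var queue = new ToyGlobalQueue();
        var log = new StringBuilder();
        queue.Enqueue(() => log.Append('a'));
        queue.Enqueue(() => log.Append('b'));
        queue.Enqueue(() => log.Append('c'));

        Action work;
        while (queue.TryDequeue(out work)) work();
        return log.ToString();
    }
}
```

With many worker threads calling TryDequeue in a loop, they all queue up behind that one lock, which is the cost that grows as tasks get smaller and cores get more numerous.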

Tasks were always designed to scale to the number of cores available, and I read somewhere that .NET is capable of running millions of Tasks efficiently. To handle that, a different approach from the centralized queue had to be taken. I will talk about this more decentralized approach to scheduling below.

Decentralized Local Queues

The .NET framework provides each worker thread from the ThreadPool with its own local task queue. The local queues distribute the load and alleviate much of the need to use the single global queue. You can see below that there are as many local queues as there are worker threads, as well as the single global queue, all of which operate concurrently.

The idea is that a worker thread takes work from its own local queue in last in first out (LIFO) order, where it might find work, or else it may have to go back to the single global queue (and incur a heavier synchronization cost for doing so).

There is one more trick that the TPL designers managed to get into play. If a worker thread's local queue is empty, it can go back to the global queue for more, but the TPL designers also allowed it to steal work from a neighbour's local queue, in FIFO order.
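The LIFO-for-the-owner, FIFO-for-thieves arrangement can be sketched with a toy double-ended queue. This is a simplified model under stated assumptions (a single lock and a LinkedList, with hypothetical names); the real CLR work-stealing queue avoids taking a lock on the owner's fast path. The usual rationale is that the owner's most recently pushed work is likely still warm in its cache, while a thief takes the oldest item from the opposite end, which keeps it out of the owner's way.

```csharp
using System.Collections.Generic;

// Toy model of a per-worker local queue (NOT the real CLR implementation).
public sealed class ToyLocalQueue<T>
{
    private readonly LinkedList<T> _items = new LinkedList<T>();
    private readonly object _sync = new object();

    // Owner thread: new work goes on the tail.
    public void LocalPush(T item)
    {
        lock (_sync) { _items.AddLast(item); }
    }

    // Owner thread: take the most recently pushed item (LIFO).
    public bool TryLocalPop(out T item)
    {
        lock (_sync)
        {
            if (_items.Count == 0) { item = default(T); return false; }
            item = _items.Last.Value;
            _items.RemoveLast();
            return true;
        }
    }

    // Another, idle worker: steal the oldest item from the head (FIFO).
    public bool TrySteal(out T item)
    {
        lock (_sync)
        {
            if (_items.Count == 0) { item = default(T); return false; }
            item = _items.First.Value;
            _items.RemoveFirst();
            return true;
        }
    }
}

public static class ToyLocalQueueDemo
{
    // The owner pushes 1, 2, 3; then the owner pops once and a thief steals once.
    // Returns { owner's item, thief's item }.
    public static int[] Run()
    {
        var queue = new ToyLocalQueue<int>();
        queue.LocalPush(1);
        queue.LocalPush(2);
        queue.LocalPush(3);

        int ownerItem, stolenItem;
        queue.TryLocalPop(out ownerItem); // newest item
        queue.TrySteal(out stolenItem);   // oldest item
        return new[] { ownerItem, stolenItem };
    }
}
```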

Former MVP and now Microsoft employee Daniel Moth has an excellent blog post, with some highly intuitive diagrams, that illustrates all this:

http://www.danielmoth.com/Blog/New-And-Improved-CLR-4-Thread-Pool-Engine.aspx

It is well worth reading that post.  

Anyway, I am sorry about that slight divergence, I just felt I needed to get that out there. OK, so now on to continuations.

 

Continuation, What's that

Simply put, continuations allow Tasks to be chained together. While this does not sound like much of a big deal by itself, what makes the continuation concept really shine is that you can have selective continuations. For example, you could have a continuation that only fires when a whole group of Tasks finish, one that fires when any one of many Tasks finishes, or one that only fires when a Task fails or is cancelled. By using this freedom that TPL offers us, we can achieve very fine-grained control over many aspects of our parallel code, rather than just writing one monolithic chunk of threaded code.

In this article I have purposely designed the Task chains to be quite small, but you really can make these chains as small or as large as you see fit.
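The later sections show continuations for "all", "any" and "faulted"; the cancelled case mentioned above looks like this. It is a minimal sketch with hypothetical names: the antecedent is given a token that is already cancelled, so it ends up Canceled without running, and only the continuation registered with TaskContinuationOptions.OnlyOnCanceled actually runs.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class SelectiveContinuationDemo
{
    // Returns the message produced by whichever selective continuation ran.
    public static string Run()
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel();

            // Starting with an already-cancelled token: the Task is Canceled
            // without its body ever executing.
            Task antecedent = Task.Factory.StartNew(() => { }, cts.Token);

            Task<string> onCanceled = antecedent.ContinueWith(
                ant => "OnlyOnCanceled ran", TaskContinuationOptions.OnlyOnCanceled);

            // This condition is not met, so TPL cancels this continuation
            // instead of running it.
            Task<string> onFaulted = antecedent.ContinueWith(
                ant => "OnlyOnFaulted ran", TaskContinuationOptions.OnlyOnFaulted);

            return onCanceled.Result;
        }
    }
}
```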

 

 

Simple continuation

Demo code project : SimpleContinuation

I have not really got too much to say about this small code snippet/demo other than that it is a continuation, and truth be known that is probably all I need to say, as that really is pretty much all there is to creating and using a continuation. Dead easy really.

// create the task
Task<List<int>> taskWithFactoryAndState =
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
        }
        return ints;
    }, 2000);


try
{
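    //setup a continuation for task
    Task continuation = taskWithFactoryAndState.ContinueWith((ant) =>
    {
        List<int> result = ant.Result;
        foreach (int resultValue in result)
        {
            Console.WriteLine("Task produced {0}", resultValue);
        }
    });

    //wait on the continuation, so that any exception from the task or the
    //continuation is observed here and ends up in the catch block below
    continuation.Wait();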
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'",
            ex.Message));
    }
}


Console.ReadLine();

And here is a small screen shot, not very exciting I know, it does get better though.
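Here is the same idea as a self-contained sketch that hands a value back (hypothetical names; summing the list is just a stand-in for real follow-on work). It also makes a small point explicit: the ant parameter the continuation receives is the antecedent Task object itself, which is why ant.Result gives us the first Task's List&lt;int&gt;.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SimpleContinuationDemo
{
    // Builds 0..count-1 in one Task, then sums it in a continuation. Returns the sum,
    // or -1 if the continuation was handed anything other than the original Task.
    public static int SumViaContinuation(int count)
    {
        Task<List<int>> antecedent = Task.Factory.StartNew(() =>
        {
            var ints = new List<int>();
            for (int i = 0; i < count; i++) ints.Add(i);
            return ints;
        });

        Task<int> continuation = antecedent.ContinueWith(ant =>
            ReferenceEquals(ant, antecedent) ? ant.Result.Sum() : -1);

        // Waiting on the continuation (rather than only the antecedent) also
        // surfaces any exception thrown inside the continuation itself.
        return continuation.Result;
    }
}
```

For example, SumViaContinuation(2000) gives 1999000, the sum of 0 to 1999.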

 

WPF Synchronization

Demo code project : WPFDispatcherSynchonizationContext

This is something that I covered in the previous article, in the UI Synchronization section and more specifically its WPF Synchronization part. I have not changed that code, but I have included it here again, because marshalling work back to the thread that owns a UI control is a very valid reason to use a TPL continuation. You should read the relevant sections of the 1st article for the background on the problem we are trying to solve, as this code snippet will not make much sense without it.

private void btnDoIt_Click(object sender, RoutedEventArgs e)
{
    //create CancellationTokenSource, so we can use the ContinueWith overload
    //that allows us to pass in a TaskScheduler (which we create below from
    //the current SynchronizationContext)
    CancellationTokenSource tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;

    Task taskWithFactoryAndState1 =
        Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        // This is not run on the UI thread.
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
        }
        return ints;
    }, 10000, token).ContinueWith(ant =>
    {
        //updates UI no problem as we are using correct SynchronizationContext
        lstBox.ItemsSource = ant.Result;
    }, token, TaskContinuationOptions.None,
        TaskScheduler.FromCurrentSynchronizationContext());
}

And here is a screen shot of the demo running

 

 

Continue "WhenAny"

Demo code project : ContinueWhenAny

One requirement when working with parallel programming might be to try and run a group of algorithms over a dataset, and use the one that performs best. These algorithms could literally be anything, a custom experimental search algorithm for example. I did not have time to write a whole slew of custom experimental search algorithms, so rather opted for something a little bit better known, "Sorting Algorithms". I am using examples from past C# competition winner http://www.codeproject.com/KB/recipes/SortVisualization.aspx by Kanasz Robert.

The basic idea here is that I want to wait only until the 1st algorithm (i.e. hopefully the fastest one) achieves its goal.

I have squirreled away the algorithms into a little helper DLL called "ContinueWhen.Common" in the attached VS2010 solution, but as far as getting the concept of only waiting for one of many running tasks for a continuation goes, this code should be easy enough to understand without needing to see the actual sorting algorithms.

static void Main(string[] args)
{
    //create a list of random numbers to sort
    Random rand = new Random();
    List<int> unsortedList = new List<int>();
    int numberOfItemsToSort = 5000;

    for (int i = 0; i < numberOfItemsToSort; i++)
    {
        unsortedList.Add(rand.Next(numberOfItemsToSort));
    }

    //create 3 tasks to run 3 different sorting algorithms
    Task<SortingTaskResult>[] tasks =
        new Task<SortingTaskResult>[3];

    //Bubble Sort Task
    tasks[0] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Bubble Sort");
    }, unsortedList);


    //Selection Sort Task
    tasks[1] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Selection Sort");
    }, unsortedList);



    //Counting Sort Task
    tasks[2] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.CountingSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Counting Sort");
    }, unsortedList);

    //Wait for any of them (assuming nothing goes wrong)
    Task.Factory.ContinueWhenAny(
        tasks,
        (Task<SortingTaskResult> antecedent) =>
        {
            Console.WriteLine(antecedent.Result.ToString());
        });


    Console.ReadLine();
}

The above code shows us creating 3 Tasks, one for each sorting algorithm, and then a single continuation that waits for any one of the Tasks (the 1st of the 3 Tasks to finish). Here is what is produced when the listing above is run.

It can be seen that "Selection Sort" won the race, even though it was not the 1st Task to be started. It won because it is a better algorithm than whichever other algorithm happened to have use of an available CPU core at that time. I only have 2 CPU cores on the laptop I wrote this test code on, so chances are that if I had 4 CPU cores, the 3rd algorithm may have ended up winning, as on paper it is the better algorithm.

The other interesting thing to note is that because we are waiting for only 1 of the Tasks in a group (array) to finish, we are ONLY able to use the Result from the single Task that we waited on, as shown in this screen shot.
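Because real sorting times depend on the machine, here is a deterministic sketch of the same ContinueWhenAny idea, with hypothetical names. Two of the three Tasks are held back by a ManualResetEventSlim, so the "fast" one is guaranteed to finish first, and the continuation only ever sees that single antecedent. Blocking pool threads like this is only acceptable in a demo.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Starts three Tasks, two of which wait on a gate, and returns the name
    // reported by the first Task to finish.
    public static string FirstFinisher()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            Task<string>[] tasks =
            {
                Task.Factory.StartNew(() => { gate.Wait(); return "slow A"; }),
                Task.Factory.StartNew(() => "fast"),
                Task.Factory.StartNew(() => { gate.Wait(); return "slow B"; })
            };

            // The continuation is handed only the single antecedent that won.
            Task<string> winner = Task.Factory.ContinueWhenAny(
                tasks, (Task<string> first) => first.Result);

            string name = winner.Result;
            gate.Set();          // release the held-back Tasks
            Task.WaitAll(tasks); // let them finish before the gate is disposed
            return name;
        }
    }
}
```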

 

Continue "WhenAll"

Demo code project : ContinueWhenAll

ContinueWhenAll is an interesting one, and I can think of a number of times when it would be very useful. For example, you may have split up your parallel work, but must wait for all the parts to finish before moving on to the next step. Or, to use the experimental algorithm idea again, we might be quite interested in seeing various characteristics of how our custom algorithms performed, so must wait for them all to complete before continuing.

I have chosen to use the sorting algorithms again, as it's a simple concept: we want to run various sorting algorithms over an unsorted list, and wait for all the different algorithms to complete before we can continue.

Here is the code that does that.

static void Main(string[] args)
{
    //create a list of random numbers to sort
    Random rand = new Random();
    List<int> unsortedList = new List<int>();
    int numberOfItemsToSort = 5000;

    for (int i = 0; i < numberOfItemsToSort; i++)
    {
        unsortedList.Add(rand.Next(numberOfItemsToSort));
    }

    //create 3 tasks to run 3 different sorting algorithms
    Task<SortingTaskResult>[] tasks =
        new Task<SortingTaskResult>[3];

    //Bubble Sort Task
    tasks[0] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.BubbleSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Bubble Sort");
    }, unsortedList);


    //Selection Sort Task
    tasks[1] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.SelectionSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Selection Sort");
    }, unsortedList);



    //Counting Sort Task
    tasks[2] = Task.Factory.StartNew((state) =>
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        List<int> source = (List<int>)state;
        List<int> localWorkList = new List<int>();

        //copy
        for (int i = 0; i < source.Count; i++)
        {
            localWorkList.Add(source[i]);
        }
        //run algorithm
        List<int> result = SortingAlgorithms.CountingSort(localWorkList);
        watch.Stop();
        return new SortingTaskResult(
            watch.ElapsedMilliseconds, result, "Counting Sort");
    }, unsortedList);

    //Wait for all of them (assuming nothing goes wrong)
    Task.Factory.ContinueWhenAll(
        tasks,
        (antecedents) =>
        {
            foreach (Task<SortingTaskResult> task in antecedents)
            {
                Console.WriteLine(task.Result.ToString());
            }
        });


    Console.ReadLine();

}

It can be seen that this time the continuation only kicked in when all 3 sorting Tasks had completed. Here are the results of running this snippet:

The other interesting thing to note is that because we are waiting for all Tasks in a group (array) to finish, we are able to use the Result from every Task we waited on, as shown in this screen shot.
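The "split the work, then wait for all the parts" case can be sketched without the sorting helpers. In this hypothetical example, 1..n is split into chunks, each chunk is summed by its own Task, and a single ContinueWhenAll continuation combines the partial sums, since only there are all the Results guaranteed to be available.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Splits 1..n into 'parts' chunks, sums each chunk in its own Task,
    // and uses ContinueWhenAll to add up the partial sums.
    public static long SumInParts(int n, int parts)
    {
        Task<long>[] tasks = new Task<long>[parts];
        int chunk = (n + parts - 1) / parts;

        for (int p = 0; p < parts; p++)
        {
            // Fresh locals per iteration, so each closure sees its own range.
            int start = p * chunk + 1;
            int end = Math.Min(n, start + chunk - 1);
            tasks[p] = Task.Factory.StartNew(() =>
            {
                long sum = 0;
                for (int i = start; i <= end; i++) sum += i;
                return sum;
            });
        }

        // Runs once every antecedent has completed; all Results are available here.
        Task<long> total = Task.Factory.ContinueWhenAll(tasks, (Task<long>[] done) =>
        {
            long sum = 0;
            foreach (Task<long> t in done) sum += t.Result;
            return sum;
        });

        return total.Result;
    }
}
```

For example, SumInParts(100, 4) gives 5050.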

 

Using a continuation for exception handling

Demo code project : UsingContinuationForExceptionHandling

When I discussed the different ways to handle Task Exceptions in the 1st article, I also mentioned that there was another technique which I did not show at the time. Well, now is the time to show that other way. It is pretty simple really, we simply use continuations. The idea is that we have one continuation that is run if the antecedent Task ran to completion, and another that is run if the antecedent Task was put into the Faulted state.

This is easily achieved using the TaskContinuationOptions that we can supply when we create a Task continuation. Here is some example code to illustrate what I mean:

// create the task
Task<List<int>> taskWithFactoryAndState =
    Task.Factory.StartNew<List<int>>((stateObj) =>
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < (int)stateObj; i++)
        {
            ints.Add(i);
            if (i > 100)
            {
                InvalidOperationException ex =
                    new InvalidOperationException("oh no its > 100");
                ex.Source = "taskWithFactoryAndState";
                throw ex;
            }
        }
        return ints;
    }, 2000);


//and setup a continuation for it only on when faulted
taskWithFactoryAndState.ContinueWith((ant) =>
    {
        AggregateException aggEx = ant.Exception;
        Console.WriteLine("OOOOPS : The Task exited with Exception(s)");

        foreach (Exception ex in aggEx.InnerExceptions)
        {
            Console.WriteLine(string.Format("Caught exception '{0}'",
                ex.Message));
        }
    }, TaskContinuationOptions.OnlyOnFaulted);


//and setup a continuation for it only on ran to completion
taskWithFactoryAndState.ContinueWith((ant) =>
{
    List<int> result = ant.Result;
    foreach (int resultValue in result)
    {
        Console.WriteLine("Task produced {0}", resultValue);
    }
}, TaskContinuationOptions.OnlyOnRanToCompletion);

Console.ReadLine();

And to show you what happens when we run this lot, here is a demo

See how only one of the continuations ran, which is the one that should run "OnlyOnFaulted".
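It is worth knowing what happens to the continuation that did not run: TPL does not just skip it, it moves it to the Canceled state. This sketch (hypothetical names) wires up the same OnlyOnFaulted / OnlyOnRanToCompletion pair against a Task that always throws, and reports the final status of each continuation. Waiting on the skipped one throws, so that wait is wrapped in a try/catch.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedContinuationDemo
{
    // Returns { status of OnlyOnFaulted continuation, status of OnlyOnRanToCompletion one }.
    public static TaskStatus[] Run()
    {
        Task<int> antecedent = Task.Factory.StartNew<int>(() =>
        {
            throw new InvalidOperationException("oh no");
        });

        Task onFault = antecedent.ContinueWith(
            ant => Console.WriteLine("Faulted: " + ant.Exception.InnerException.Message),
            TaskContinuationOptions.OnlyOnFaulted);

        Task onSuccess = antecedent.ContinueWith(
            ant => Console.WriteLine("Result: " + ant.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        onFault.Wait();
        try
        {
            onSuccess.Wait();
        }
        catch (AggregateException)
        {
            // Expected: the skipped continuation was cancelled (TaskCanceledException).
        }

        return new[] { onFault.Status, onSuccess.Status };
    }
}
```

So if you Wait() on all the continuations together, expect an AggregateException from the ones whose condition was not met.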

 

Using continuation as pipeline

Demo code project : UsingContinuationsAsPipelines

I did mention at the beginning of this article that you could use continuations to chain tasks together, and make the chains as simple or as complex as you choose. I have not gone nuts or anything, but I have come up with a small example, shown below, which is just slightly larger than the examples you have seen so far. What it does illustrate, though, is that you can quite easily continue a continuation.

static void Main(string[] args)
{
    // create the task
    Task<List<int>> taskWithFactoryAndState =
        Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                ints.Add(i);
            }
            return ints;
        }, 10);


    //and setup a continuation for it only on ran to completion, where this continuation
    //returns a result too, which will be used by yet another continuation
    taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
    {
        List<int> parentResult = ant.Result;
        List<int> result = new List<int>();
        foreach (int resultValue in parentResult)
        {

            Console.WriteLine("Parent Task produced {0}, which will be squared by continuation", 
                resultValue);
            result.Add(resultValue * resultValue);
        }
        return result;
    }, TaskContinuationOptions.OnlyOnRanToCompletion)
    //Another continuation
    .ContinueWith((ant) =>
        {
            List<int> parentResult = ant.Result;
            foreach (int resultValue in parentResult)
            {
                Console.WriteLine("Parent Continuation Task produced Square of {0}", 
                    resultValue);
            }
        }, TaskContinuationOptions.OnlyOnRanToCompletion);


    Console.ReadLine();

}

It's certainly not rocket science, this one. All I am doing is creating an initial Task that creates and returns a List<int> of numbers. This List<int> is then passed to a continuation, which prints each value produced by the original Task (the antecedent) and builds a new List<int> holding the square of each value. The result of this continuation is then fed to yet another continuation, which prints the results of the squaring continuation Task (the antecedent to this continuation).

Each of the continuations assumes an ideal world, and will only run if its antecedent Task runs to completion.

Here is a small demo of this one running
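The same three-stage chain can be written so that the last stage hands a value back instead of printing. In this sketch (hypothetical names), stage 1 produces 0..count-1, stage 2 squares each value, and stage 3 adds the squares up; each ContinueWith&lt;TNew&gt; returns a new Task that the next stage continues from. If any stage's OnlyOnRanToCompletion condition is not met, the later stages are cancelled and reading Result throws an AggregateException.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelineDemo
{
    // Produce -> square -> sum, each stage a continuation of the one before.
    public static int SumOfSquares(int count)
    {
        Task<int> pipeline = Task.Factory.StartNew(() =>
            {
                var ints = new List<int>();
                for (int i = 0; i < count; i++) ints.Add(i);
                return ints;
            })
            .ContinueWith(ant =>
            {
                var squares = new List<int>();
                foreach (int value in ant.Result) squares.Add(value * value);
                return squares;
            }, TaskContinuationOptions.OnlyOnRanToCompletion)
            .ContinueWith(ant =>
            {
                int total = 0;
                foreach (int square in ant.Result) total += square;
                return total;
            }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return pipeline.Result;
    }
}
```

For the 10 numbers used in the listing above, SumOfSquares(10) gives 285.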

 

Catching exception in continuation antecedent

Demo code project : CatchExceptionInAntecedent

So we have now seen several examples of using Tasks/continuations. We have seen that we can use continuations that run when things go to plan, and also continuations that run when the original Task fails to complete its job. Sometimes, though, we might simply want an unconditional continuation that always runs, whether or not the original Task completes successfully, and have the continuation itself decide what to do if something is not right about the status of the original Task.

Here is an example of how we would check for an Exception from the original Task inside the continuation, where we rethrow the original Exception that was provided by the original Task. Since in this example the continuation rethrows an Exception, we need to make sure the Exception it throws will be observed in some way (I talked about Exception observing last time, when I talked about trigger methods/properties such as Wait() / Result), and as such I Wait() on the continuation.

Here is the code:

try
{
    // create the task
    Task<List<int>> taskWithFactoryAndState =
        Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            Console.WriteLine("In TaskWithFactoryAndState");
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
                ints.Add(i);
                if (i == 5)
                    throw new InvalidOperationException("Don't like 5 its vulgar and dirty");
  
            }
            return ints;
        }, 100);


    //Setup a continuation with no TaskContinuationOptions, so it always runs
    //and decides for itself what to do about the antecedent's status
    Task<List<int>> continuation =
        taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
    {
        if (ant.Status == TaskStatus.Faulted)
            throw ant.Exception.InnerException;

        Console.WriteLine("In Continuation, no problems in Antecedent");

        List<int> parentResult = ant.Result;
        List<int> result = new List<int>();
        foreach (int resultValue in parentResult)
        {
            Console.WriteLine("Parent Task produced {0}, which will be squared by continuation",
                resultValue);
            result.Add(resultValue * resultValue);
        }
        return result;
    });


    //wait for the continuation to complete, so that the Exception it
    //rethrows is observed and caught below
    continuation.Wait();
}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
    }
}


Console.WriteLine("Finished");

And here is a small demo screen shot of this running. See how we caught the original Exception of the original Task, and successfully rethrew it (preserving its type and message, although rethrowing an existing exception object with throw resets its stack trace) up to the point where it got caught by the try/catch.
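To see exactly what survives the rethrow, here is a small sketch (hypothetical names). The antecedent throws a known exception object; an unconditional continuation rethrows ant.Exception.InnerException; and waiting on the continuation surfaces an AggregateException whose inner exception is that very same object, so its type and message are intact even though its stack trace now points at the continuation.

```csharp
using System;
using System.Threading.Tasks;

public static class RethrowDemo
{
    // True when the exception surfaced by waiting on the continuation is the
    // same object the antecedent originally threw.
    public static bool SameExceptionObject()
    {
        var original = new InvalidOperationException("Don't like 5");
        Task<int> antecedent = Task.Factory.StartNew<int>(() => { throw original; });

        // Unconditional continuation: always runs and inspects the antecedent itself.
        Task<int> continuation = antecedent.ContinueWith<int>(ant =>
        {
            if (ant.IsFaulted)
                throw ant.Exception.InnerException; // same object, new stack trace
            return ant.Result * 2;
        });

        try
        {
            continuation.Wait();
            return false;
        }
        catch (AggregateException aggEx)
        {
            return ReferenceEquals(aggEx.InnerException, original);
        }
    }
}
```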

 

 

Cancelling a continuation

Demo code project : CancellingContinuations

One of the most obvious things you may want to do with a continuation is cancel it, right? Well, luckily you have already seen all the tricks of the trade for doing this in the 1st article. Remember the CancellationTokenSource object we looked at last time?

There is not much more to it than that: we create a new CancellationTokenSource, and pass the CancellationToken from it to any TPL Tasks/continuations we want to be affected when the CancellationToken is cancelled.

The same rules you saw in the 1st article still apply. We must be good and ensure that any Task/continuation that expects, and makes use of, a CancellationToken throws an OperationCanceledException (for example via ThrowIfCancellationRequested()) when cancellation is requested (remember, this is vital to ensure the Task transitions to the correct state).

Anyway, I am probably talking too much when the code speaks for itself. The code itself is simple: we have an original Task that creates a List<int>, whose numbers are then used inside a continuation, where they are squared/printed and returned. However, 5 seconds after the original Task is created, the CancellationTokenSource whose token was passed to both the original Task and the continuation is cancelled.

CancellationTokenSource tokenSource
    = new CancellationTokenSource();

CancellationToken token = tokenSource.Token;


try
{
    // create the task
    Task<List<int>> taskWithFactoryAndState =
        Task.Factory.StartNew<List<int>>((stateObj) =>
        {
            Console.WriteLine("In TaskWithFactoryAndState");
            List<int> ints = new List<int>();
            for (int i = 0; i < (int)stateObj; i++)
            {
                tokenSource.Token.ThrowIfCancellationRequested();
                ints.Add(i);
                Console.WriteLine("taskWithFactoryAndState, creating Item: {0}", i);
                Thread.Sleep(1000); // simulate some work
            }
            return ints;
        }, 10000, tokenSource.Token);


    Thread.Sleep(5000); //wait 5 seconds then cancel the running Task

    tokenSource.Cancel();


    //Setup a continuation which will not run
    taskWithFactoryAndState.ContinueWith<List<int>>((ant) =>
    {
        Console.WriteLine("In Continuation");

        List<int> parentResult = ant.Result;
        List<int> result = new List<int>();
        foreach (int resultValue in parentResult)
        {

            Console.WriteLine("Parent Task produced {0}, which will be squared by continuation",
                resultValue);
            result.Add(resultValue * resultValue);
        }
        return result;
    }, tokenSource.Token);


    taskWithFactoryAndState.Wait();

}
catch (AggregateException aggEx)
{
    foreach (Exception ex in aggEx.InnerExceptions)
    {
        Console.WriteLine(string.Format("Caught exception '{0}'", ex.Message));
    }
}
finally
{
    tokenSource.Dispose();
}


Console.WriteLine("Finished");
Console.ReadLine();

 

Here is the results:

Notice how the continuation did not even kick in at all, and we only created 5 items; that is because the CancellationToken was cancelled. It's that easy to cancel a continuation, you have to love this TPL stuff man.
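In the listing above the token is already cancelled by the time the continuation is set up. This sketch (hypothetical names) shows the other ordering: the continuation is registered with the token first, then the token is cancelled while the antecedent is still running. Under the default options the continuation is cancelled straight away; TaskContinuationOptions.LazyCancellation (.NET 4.5 and later) exists to delay that until the antecedent completes. Here the antecedent deliberately does not observe the token, so it still runs to completion.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelContinuationDemo
{
    // Returns { antecedent status, continuation status } after the continuation's
    // token is cancelled while the antecedent is still running.
    public static TaskStatus[] Run()
    {
        using (var tokenSource = new CancellationTokenSource())
        using (var gate = new ManualResetEventSlim(false))
        {
            CancellationToken token = tokenSource.Token;

            // The antecedent ignores the token, so it will run to completion.
            Task<int> antecedent = Task.Factory.StartNew(() => { gate.Wait(); return 42; });

            Task<int> continuation = antecedent.ContinueWith(ant => ant.Result * 2, token);

            tokenSource.Cancel();
            try
            {
                continuation.Wait(5000); // bounded wait, just in case
            }
            catch (AggregateException)
            {
                // Expected: the continuation was cancelled (TaskCanceledException).
            }

            gate.Set(); // now let the antecedent finish
            antecedent.Wait();
            return new[] { antecedent.Status, continuation.Status };
        }
    }
}
```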

 

That's It For Now

I know this article did not have as much to say as the 1st. The thing is, continuations are surprisingly easy to get to grips with, so there was less to say. The next 2 articles are likely to be of about the same sort of size as this one, but the ones after that will have some more meat.

That is all I wanted to say in this article. I hope you liked it, and want more. If you did like this article, and would like more, could you spare some time to leave a comment and a vote? Many thanks.

Hopefully, see you at the next one, and the one after that, and the one after that, yes 6 in total, I better get busy.
