Parallel Programming Easier than ever using .NET Framework 4

The .NET Framework 4 makes parallel loops and regions much easier to write than in older versions of .NET, where you had to manage the threads yourself with System.Threading. The new Parallel class provides library-based data-parallel operations such as for loops, foreach loops, and invoking a set of actions.

The class lives in the new System.Threading.Tasks namespace. Its methods are:

Parallel.For()

Executes a for loop in which the iterations may run in parallel.  This method is useful when you want to call the same method for every value of a loop counter.

Usage Sample:

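static void Main(string[] args)
{
    int LoopCounter = 50;

    Parallel.For(0, LoopCounter, i =>
    {
        DoSomething(i);
    }
    );

    // Wait for a key press only after every iteration has finished;
    // a ReadLine inside the loop body would block each iteration on input
    Console.ReadLine();
}
public static void DoSomething(int item) { Console.WriteLine("DoSomething " + item.ToString() + " : " + DateTime.Now); }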
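
One thing the sample above does not show is how to get results back out of the loop. The iterations are not guaranteed to run in order, so a simple approach is to let each iteration write only to its own slot of an array. The following is a minimal sketch of that idea; the class and method names (ParallelForSketch, SquareAll) are made up for illustration and are not part of the framework.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelForSketch
{
    // Squares 0..count-1 in parallel. Each iteration writes only to its own
    // array slot, so no locking is needed even though the order is undefined.
    public static int[] SquareAll(int count)
    {
        int[] results = new int[count];
        Parallel.For(0, count, i =>
        {
            results[i] = i * i;
        });
        return results;
    }

    public static void Main()
    {
        // Parallel.For returns only after all iterations are done,
        // so the array is fully populated here.
        foreach (int square in SquareAll(10))
        {
            Console.WriteLine(square);
        }
    }
}
```

Because Parallel.For blocks until every iteration has completed, the caller can read the array right after the call without any extra synchronization.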

Parallel.ForEach()

Executes a foreach operation on an IEnumerable<TSource> in which the iterations may run in parallel.  This method is useful when you want to call the same method for every item of a collection that implements IEnumerable<TSource>, such as Dictionary<>, List<>, Queue<>, Stack<> or the other collections in the System.Collections.Generic namespace.

Usage Sample:

static void Main(string[] args)
{
    List<int> myList = new List<int>();
    for (int i = 0; i < 500; i++) { myList.Add(i); }

    Parallel.ForEach(myList, item =>
    {
        DoSomething(item);
    }
    );

    // Wait for a key press only after every item has been processed
    Console.ReadLine();
}

public static void DoSomething(int item) { Console.WriteLine("DoSomething " + item.ToString() + " : " + DateTime.Now); }
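
When the loop body needs to update something shared, such as a running total, the update has to be thread-safe because several iterations may touch it at once. Here is a small sketch, assuming a Dictionary<string, int> as the source (each item is then a KeyValuePair) and using Interlocked.Add for the shared total. The names ParallelForEachSketch and SumValues are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachSketch
{
    // Adds up all dictionary values in parallel. Interlocked.Add makes the
    // update of the shared total atomic, so no iteration loses an update.
    public static long SumValues(Dictionary<string, int> source)
    {
        long total = 0;
        Parallel.ForEach(source, pair =>
        {
            Interlocked.Add(ref total, pair.Value);
        });
        return total;
    }

    public static void Main()
    {
        Dictionary<string, int> stock = new Dictionary<string, int>();
        stock.Add("apples", 3);
        stock.Add("pears", 4);
        stock.Add("plums", 5);
        Console.WriteLine("Total items: " + SumValues(stock));
    }
}
```

A plain `total += pair.Value` would compile too, but it is not atomic and can silently drop updates when iterations overlap.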

Parallel.Invoke()

Executes each of the provided actions in parallel.  This method is useful if you want to invoke different methods in one go.

Usage Sample:

static void Main(string[] args)
{
    Parallel.Invoke(DoSomething1, DoSomething2, DoSomething3);
    Console.ReadLine();
}
public static void DoSomething1() { Console.WriteLine("DoSomething 1 : " + DateTime.Now); }
public static void DoSomething2() { Console.WriteLine("DoSomething 2 : " + DateTime.Now); }
public static void DoSomething3() { Console.WriteLine("DoSomething 3 : " + DateTime.Now); }
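
Parallel.Invoke accepts any Action, so lambdas work just as well as named methods. The sketch below assumes three independent pieces of work that each fill one slot of an array; the class and method names are invented for the example.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelInvokeSketch
{
    // Runs three independent actions and returns once all of them are done.
    // Each lambda writes to a different array slot, so they do not interfere.
    public static string[] RunAll()
    {
        string[] results = new string[3];
        Parallel.Invoke(
            () => { results[0] = "first"; },
            () => { results[1] = "second"; },
            () => { results[2] = "third"; });
        return results;
    }

    public static void Main()
    {
        foreach (string result in RunAll())
        {
            Console.WriteLine(result);
        }
    }
}
```

Like the loop methods, Parallel.Invoke does not return until every action has finished, which is why the ReadLine in the sample above can safely come right after it.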

Now let's try it in real life. Below is an application that zips a folder 100 times over; the folder consists of 2 files with 9 MB of data. The first test uses a normal loop in a linear fashion and the second test uses parallel execution.

static void Main(string[] args)
{
    switch (args[0])
    {
        case "1":
            DateTime dDateStart = DateTime.Now;
            Console.WriteLine("Non Threaded - Start : " + dDateStart);
            DoLinear();
            DateTime dDateEnd = DateTime.Now;
            Console.WriteLine("Non Threaded - End : " + dDateEnd);
            System.TimeSpan TimeRun = dDateEnd - dDateStart;
            // TotalMilliseconds is the whole duration; Milliseconds is only the 0-999 component
            Console.WriteLine("Time Spent in ms : " + TimeRun.TotalMilliseconds);
            break;
        case "2":
            DateTime dDateStart2 = DateTime.Now;
            Console.WriteLine("Parallel - Start : " + dDateStart2);
            DoParallel();
            DateTime dDateEnd2 = DateTime.Now;
            Console.WriteLine("Parallel - End : " + dDateEnd2);
            System.TimeSpan TimeRun2 = dDateEnd2 - dDateStart2;
            Console.WriteLine("Time Spent in ms : " + TimeRun2.TotalMilliseconds);
            break;
        default:
            break;
    }
}

private static void DoLinear()
{
    int LoopCounter = 100;

    for (int x = 0; x < LoopCounter; x++)
    {
        PerformZip(x);
    }
}

private static void DoParallel()
{
    int LoopCounter = 100;

    Parallel.For(0, LoopCounter, i =>
    {
        PerformZip(i);
    }
    );
}
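private static void PerformZip(int item)
{
    // Process lives in the System.Diagnostics namespace
    Process myProcess = new Process();
    myProcess.StartInfo.FileName = "rar.exe";
    myProcess.StartInfo.Arguments = @"a -m1 -r ""C:\Test\FolderForCompression-" + item + @".rar"" ""C:\Test\FolderForCompression""";
    myProcess.StartInfo.UseShellExecute = false;
    myProcess.StartInfo.RedirectStandardOutput = true;
    myProcess.Start();

    // Drain the redirected output, then block until rar.exe exits, so the
    // timings cover the compression itself and not just the process launch
    myProcess.StandardOutput.ReadToEnd();
    myProcess.WaitForExit();
}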
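
If you want to repeat this kind of comparison yourself, two things may help: Stopwatch gives a more precise elapsed time than subtracting DateTime values, and ParallelOptions.MaxDegreeOfParallelism lets you cap how many iterations run at once. The sketch below is only an illustration under those assumptions; it uses Thread.Sleep as a stand-in for the real work, and the names TimingSketch, MeasureMilliseconds and RunCapped are hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class TimingSketch
{
    // Runs the given work once and returns the elapsed wall-clock time in ms.
    public static long MeasureMilliseconds(Action work)
    {
        Stopwatch watch = Stopwatch.StartNew();
        work();
        watch.Stop();
        return watch.ElapsedMilliseconds;
    }

    // Runs body(i) for i in [0, count) with at most maxParallel
    // iterations executing at the same time.
    public static void RunCapped(int count, int maxParallel, Action<int> body)
    {
        ParallelOptions options = new ParallelOptions();
        options.MaxDegreeOfParallelism = maxParallel;
        Parallel.For(0, count, options, body);
    }

    public static void Main()
    {
        // Thread.Sleep(50) stands in for one unit of real work.
        long linear = MeasureMilliseconds(() =>
        {
            for (int i = 0; i < 8; i++) { Thread.Sleep(50); }
        });
        long parallel = MeasureMilliseconds(() =>
        {
            RunCapped(8, 4, i => Thread.Sleep(50));
        });
        Console.WriteLine("Linear   : " + linear + " ms");
        Console.WriteLine("Parallel : " + parallel + " ms");
    }
}
```

Capping the degree of parallelism can be useful when each iteration launches an external program, as PerformZip does, because the disk rather than the CPU may become the bottleneck.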

Now here are my results

The time saved is not that dramatic, only a 47.38% improvement, but this is just a sample and it is an improvement nonetheless. Now imagine a distributed application in which workstations do the work and each work thread is invoked from a central server. There it can have a dramatic impact (this is where I applied it in a real project), because the commands coming from the central server are no longer issued linearly.

To dig deeper, I checked the processor usage during the test.

This is the Non-Parallel usage

This is the Parallel usage

Notice that we used up to 100% of all 8 cores in the machine, compared to 12% before. You will also see that multiple rar programs are now running in the background, compared to one.

I think that with the new generation of multi-core, multi-threaded processors we need to write more threaded code to make use of them; otherwise, what are they there for?

Execute Stored Procedures in Parallel

After my earlier post everything seemed to be working fine, but I needed more performance out of it, and my last resort was to execute stored procedures in parallel so that I could run multiple instances in one run.  To achieve that I needed to create a CLR stored procedure so that I could run the EXECUTE commands in threads.  So what do you need to achieve that?

  1. You need SQL Server 2008 or later with CLR enabled
  2. You also need to set the database TRUSTWORTHY flag to ON
  3. To do both, use these commands:

    sp_configure 'clr enabled', 1
    GO
    RECONFIGURE
    GO
    
    ALTER DATABASE [YourDatabase] SET TRUSTWORTHY ON
    GO
  4. You also need an Account to establish new connections based on Integrated Security
  5. You need Visual Studio to develop the CLR stored procedure, with .NET Framework 2.0 or later installed

So what exactly is a CLR stored procedure? According to Microsoft:

CLR Stored procedures are routines that cannot be used in scalar expressions. Unlike scalar functions, they can return tabular results and messages to the client, invoke data definition language (DDL) and data manipulation language (DML) statements, and return output parameters.
In SQL Server, you can develop a database object inside an instance of SQL Server that is programmed in an assembly created in the Microsoft .NET Framework common language runtime (CLR). Database objects that can leverage the rich programming model provided by the CLR include triggers, stored procedures, functions, aggregate functions, and types.

Creating a CLR stored procedure in SQL Server involves the following steps:

  • Define the stored procedure as a static method of a class in a language supported by the .NET Framework. For more information about how to program CLR stored procedures, see CLR Stored Procedures. Then, compile the class to build an assembly in the .NET Framework by using the appropriate language compiler.
  • Register the assembly in SQL Server by using the CREATE ASSEMBLY statement. For more information about how to work with assemblies in SQL Server, see Assemblies.
  • Create the stored procedure that references the registered assembly by using the CREATE PROCEDURE statement.

Having said that, let's start creating our CLR stored procedure. Fire up Visual Studio and choose a Database/SQL Server project.

New Database Project

Once it is created it will ask you for the database; choose the database you want to run the CLR stored procedure from.  Then the coding begins.

First you need to add a new item, which is a stored procedure.

Stored Procedure Item

As you might have noticed, you can also add an Aggregate, a User-Defined Function, a Trigger or a User-Defined Type.  Once you have added the stored procedure, the fun begins.  Here is what I have done.

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using System.Threading;
using Microsoft.SqlServer.Server;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using SampleConsole;

namespace Parallel_Execution
{
public partial class StoredProcedures
{
[Microsoft.SqlServer.Server.SqlProcedure]
public static SqlInt32 spExecuteParallel(string DB, int MaxDOP, string TSQL, int msDelay, int Retries)
{
    // Initialize Variables
    SqlConnection oConn = new SqlConnection();
    SqlCommand oCmd = new SqlCommand();
    List<string> oErrorString = new List<string>();
    object oLocker = new object();
    string sServer = null;

    List<Thread> oThread = new List<Thread>();
    StringCollection sStopped = new StringCollection();

    // Get Server Instance Name
    oConn = new SqlConnection("context connection = true;");
    oConn.Open();

    oCmd = oConn.CreateCommand();
    oCmd.CommandText = "SELECT @@SERVERNAME";
    sServer = oCmd.ExecuteScalar().ToString();

    oCmd.Dispose();
    oConn.Close();
    oConn.Dispose();

    // Execute Threads
    int iCurrentThread = 0;
    while (iCurrentThread < MaxDOP)
    {
        ExecuteSQL Executer = new ExecuteSQL(sServer, DB, TSQL.Replace("?", DB.ToString().Trim()), Retries, ref oErrorString, ref oLocker);

        Thread oItem = new Thread(Executer.Process);
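        // Give every thread a unique name so that RunningThreads can report each stop separately
        oItem.Name = "ExecuteSQL " + DB.ToString().Trim() + " #" + (iCurrentThread + 1);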
        oItem.Start();
        oThread.Add(oItem);

        SqlContext.Pipe.Send(DateTime.Now.ToLongTimeString() + " : Start : " + oItem.Name.Replace("ExecuteSQL ", ""));
        Thread.Sleep(msDelay);

        while (RunningThreads(ref oThread, ref sStopped) >= MaxDOP)
        {
            Thread.Sleep(1000);
        }
        iCurrentThread++;
    }

    // Wait for all Threads to Stop
    while (RunningThreads(ref oThread, ref sStopped) > 0)
    {
        Thread.Sleep(1000);
    }
    SqlContext.Pipe.Send("All Thread have Stopped with " + oErrorString.Count.ToString() + " Error/s ");

    if (oErrorString.Count > 0)
    {
        foreach (string sIndividualErrors in oErrorString)
        {
            SqlContext.Pipe.Send(sIndividualErrors.ToString());
        }

        throw new Exception("Error Occurred.");
    }

    return 0 - oErrorString.Count;
}

public static int RunningThreads(ref List<Thread> oThread, ref StringCollection oStops)
{
    int iRunningCount = 0;

    foreach (Thread oIndividualThread in oThread)
    {
        if (oIndividualThread.IsAlive)
        {
            iRunningCount += 1;
        }
        else if (!oStops.Contains(oIndividualThread.Name))
        {
            oStops.Add(oIndividualThread.Name);
            SqlContext.Pipe.Send(DateTime.Now.ToLongTimeString() + " : Stop  : " + oIndividualThread.Name.Replace("ExecuteSQL ", ""));
        }
    }
    return iRunningCount;
}
}
}
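
The thread handling above boils down to a simple pattern: start a thread, count how many are still alive, and sleep while that count is at the limit. Here is a stripped-down sketch of the same pattern outside SQL Server, so it can be tried in a console application. It is a variation rather than a copy of the procedure: it checks the limit before starting each thread, it uses Join for the final wait, and the names ThrottleSketch, CountAlive and RunThrottled are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThrottleSketch
{
    // Counts the threads in the list that have started and not yet finished.
    public static int CountAlive(List<Thread> threads)
    {
        int alive = 0;
        foreach (Thread thread in threads)
        {
            if (thread.IsAlive) { alive++; }
        }
        return alive;
    }

    // Runs `jobs` copies of `work`, never more than `limit` at the same time.
    public static void RunThrottled(int jobs, int limit, ThreadStart work)
    {
        List<Thread> threads = new List<Thread>();
        for (int i = 0; i < jobs; i++)
        {
            // Poll until a slot is free before starting the next thread.
            while (CountAlive(threads) >= limit)
            {
                Thread.Sleep(10);
            }
            Thread worker = new Thread(work);
            worker.Name = "Job " + i;
            worker.Start();
            threads.Add(worker);
        }
        // Wait for the remaining threads to finish.
        foreach (Thread thread in threads)
        {
            thread.Join();
        }
    }

    public static void Main()
    {
        RunThrottled(6, 2, () =>
        {
            Console.WriteLine(Thread.CurrentThread.Name + " running");
            Thread.Sleep(100);
        });
        Console.WriteLine("All jobs finished");
    }
}
```

Polling with Sleep is easy to follow and works on .NET Framework 2.0, at the cost of reacting a little late when a thread finishes.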

You might have noticed that I use a class called ExecuteSQL. That class is the one that actually performs the execute; the code above just takes care of the threads.  Here is the code for ExecuteSQL:

using System.Data.SqlClient;
using System.Threading;
using System.Data;
using System;
using System.Collections.Generic;
namespace Parallel_Execution
{
class ExecuteSQL
{
private List<string> oExecuteErrors;
private object oExecuteLocker;
private string sExecuteServer;
private string sExecuteDB;
private string sExecuteTSQL;
private int iExecuteRetries;

public ExecuteSQL(string sServer, string sDB, string sTSQL, int iRetries, ref List<string> oErrors, ref object oLocker)
{
    this.sExecuteServer = sServer;
    this.sExecuteDB = sDB;
    this.sExecuteTSQL = sTSQL;
    this.iExecuteRetries = iRetries;
    this.oExecuteErrors = oErrors;
    this.oExecuteLocker = oLocker;
}

public void Process()
{
    int iTries = 1;
    SqlConnection oConn = new SqlConnection();

Retry:
    oConn = new SqlConnection("Data Source=" + sExecuteServer + ";Initial Catalog=" + sExecuteDB + ";Integrated Security=SSPI;");
    try
    {
        oConn.Open();

        if (oConn.State == ConnectionState.Open)
        {
            SqlCommand oCmd = oConn.CreateCommand();
            oCmd.CommandText = sExecuteTSQL;
            oCmd.CommandTimeout = 0;
            oCmd.ExecuteNonQuery();

            oCmd.Dispose();
            oConn.Close();
            oConn.Dispose();
        }
        else
        {
            throw new Exception("SQL Server not Found or Unable to Connect to SQL Server");
        }
    }
    catch (Exception ex)
    {
        if (oConn.State != ConnectionState.Closed) oConn.Close();
        oConn.Dispose();

        if (iTries <= iExecuteRetries)
        {
            Thread.Sleep(5000);
            iTries += 1;
            goto Retry;
        }
        else
        {
            lock (oExecuteLocker)
            {
                char cSpace = char.Parse(" ");
                oExecuteErrors.Add(this.sExecuteDB.PadRight(16, cSpace) + " : " + ex.Message);
            }
        }
    }
}
}
}
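
The retry logic in Process() uses a label and goto. If you prefer to avoid goto, the same behaviour (one initial attempt plus up to N retries, with a pause in between) can be sketched as a loop. This is only an illustration, not the code that was deployed; the helper name TryWithRetries and its parameters are hypothetical.

```csharp
using System;
using System.Threading;

public static class RetrySketch
{
    // Runs the action; if it throws, waits delayMs and tries again, up to
    // `retries` additional attempts. Returns null on success, or the message
    // of the last exception once all attempts have failed.
    public static string TryWithRetries(Action action, int retries, int delayMs)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                action();
                return null;
            }
            catch (Exception ex)
            {
                if (attempt >= retries)
                {
                    return ex.Message;
                }
                Thread.Sleep(delayMs);
            }
        }
    }

    public static void Main()
    {
        int calls = 0;
        // Fails on the first two calls, then succeeds on the third.
        string error = TryWithRetries(() =>
        {
            calls++;
            if (calls < 3) { throw new InvalidOperationException("not yet"); }
        }, 3, 100);
        Console.WriteLine("Calls: " + calls + ", error: " + (error ?? "none"));
    }
}
```

In the stored procedure the returned message would be added to the shared error list inside the lock, as the original code does.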

Once that is all done you can deploy your CLR stored procedure. Once deployed, it will be available in your chosen database, in the same place where you find your other stored procedures.

The deployed CLR Stored Procedure

Once it is there you can use it to execute stored procedures in parallel. In this case I created a table so that I could insert sample data with a date.  The procedure takes the following parameters:

  • DB – Your database
  • MaxDOP – The number of threads you want to use
  • TSQL – The T-SQL you want to execute (every ? in it is replaced with the DB value)
  • msDelay – Delay in milliseconds before starting the next thread
  • Retries – Retry count if you encounter errors
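
One detail of the TSQL parameter that is easy to miss: before each thread runs the command, spExecuteParallel calls TSQL.Replace("?", DB.ToString().Trim()), so every question mark in the text is swapped for the database name. The sketch below isolates that substitution so you can see its effect. The helper name BuildCommandText and the procedure name SomeProcedure are invented for the example.

```csharp
using System;

public static class PlaceholderSketch
{
    // Mirrors the substitution done in spExecuteParallel: every "?" in the
    // command text is replaced with the trimmed database name.
    public static string BuildCommandText(string tsql, string db)
    {
        return tsql.Replace("?", db.Trim());
    }

    public static void Main()
    {
        // Prints: EXEC [SampleDB].dbo.SomeProcedure
        Console.WriteLine(BuildCommandText("EXEC [?].dbo.SomeProcedure", " SampleDB "));
    }
}
```

Note that the replacement is purely textual, so a question mark inside a string literal in your T-SQL would be replaced as well.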

Here is a sample of how I use it:

USE [SampleDB]
GO

DECLARE    @return_value int

EXEC    @return_value = [dbo].[spExecuteParallel]
 @DB = N'SampleDB',
 @MaxDOP = 8,
 @TSQL = N'Insert into TestTable ([Message], LogDate) values (''Test'', GetDate())',
 @msDelay = 0,
 @Retries = 1

SELECT    'Return Value' = @return_value
GO

After you run this, check your messages and the table for the results!

Here are my results

Results

Table Results
