Problem with multithreading in powershell

567 Views Asked by At

[edit 10/29/18] - It appears (based on logging) that the parallel.foreach function is not waiting for the return value information

Below is the code I'm using to have a multithreaded powershell function. I'm leveraging the add-type function in powershell to use c# in my powershell code with Parallel.ForEach. The problem I'm encountering is that sometimes this function does not list all of our servers in a dag, and other times it does. (it should always return 1 value for each server).

I have verified that all servers are being fed into the Parallel.ForEach by writing them to the console before passing them into the Parallel.ForEach function and I've verified they are not coming back as an ae.InnerExceptions as well when there are missing servers, but I don't know how else to troubleshoot it. I cannot find any consistency or pattern as to why this is happening and am looking for ideas of how to fix that or troubleshoot this?

Write-host "2. Get-MailboxDatabaseCopyStatus health (copyqueue < 6 and replayqueue < 100 for Ap's 1, 2 and 3) for " $dagName -ForegroundColor Yellow -BackgroundColor Black;
Write-Host "";

$queueLengths = 
@'

    using System;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.Linq;
    using System.Management.Automation;
    using System.Management.Automation.Runspaces;
    using System.Security;
    using System.Text;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    using System.Threading;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Security.Principal;    


    namespace MultiThreading
    {
        public class queues
        {
            public string uri = "http://xxxx/powershell";


            public AuthenticationMechanism auth = AuthenticationMechanism.Kerberos;
            //public AuthenticationMechanism auth = AuthenticationMechanism.Negotiate;

            public List<string> get(string dag)
            {
                var response = new ConcurrentBag<List<string>>();
                var exceptions = new ConcurrentQueue<Exception>();

                string[] serversUnsorted = getDagMembers(dag);
                var servers = from s in serversUnsorted orderby s select s;

                try
                {
                    Parallel.ForEach(servers, server =>
                    {
                        response.Add(runPowerShellScriptGetDbCopyStatusHealth(server));
                    });
                }
                catch (AggregateException ae)
                {
                    foreach (var aex in ae.InnerExceptions)
                    {
                        exceptions.Enqueue(aex);
                    }
                }

                List<string> returnValues = new List<string>();

                foreach (var item in response)
                {
                    for (int i = 0; i < item.Count; i++)
                    {
                        returnValues.Add(item[i].ToString());
                    }
                }
                returnValues.Sort();

                return returnValues;
            }

            private List<string> runPowerShellScriptGetDbCopyStatusHealth(object server)
            {
                Collection<PSObject> psobjs = new Collection<PSObject>();
                List<string> returnValue = new List<string>();
                string serverName = server.ToString();


                WSManConnectionInfo wmc = new WSManConnectionInfo(new Uri(uri));

                wmc.AuthenticationMechanism = auth;
                wmc.ShellUri = "http://schemas.microsoft.com/powershell/Microsoft.Exchange";

                using (Runspace runspace = RunspaceFactory.CreateRunspace(wmc))
                {
                    PowerShell powershell = PowerShell.Create();

                    if (runspace.RunspaceStateInfo.State == RunspaceState.Opened)
                    {
                        // do nothing
                    }
                    else
                    {
                        runspace.Open();
                        powershell.Runspace = runspace;
                    }

                    try
                    {
                        PSCommand command = new PSCommand();                    
                        command.AddScript("Get-MailboxDatabase -Server " + serverName);// + " | Where {$_.Recovery -eq $False}  | Sort Name | Get-MailboxDatabaseCopyStatus");
                        powershell.Commands = command;
                        psobjs = powershell.Invoke();                    

                        if (powershell.HadErrors == true)
                        {
                            returnValue.Add(serverName +  " failed - " + powershell.Streams.Error[0].ToString());
                        }

                        List<string> nonRecoveryDbs = parseDatabaseResults(psobjs);

                        returnValue = getCopyQueueLength(serverName, nonRecoveryDbs);
                    }
                    catch (Exception ex)
                    {
                        string fail = ex.Message;
                    }
                }

                return returnValue;
            }

            private List<string> parseDatabaseResults(Collection<PSObject> objs)
            {
                List<string> returnValue = new List<string>();

                foreach (PSObject obj in objs)
                {
                    if (obj != null)
                    {
                        object o = obj.Members["Recovery"].Value;

                        string asdf = o.ToString();
                        if (o.ToString().ToLower() == "false")
                        {
                            returnValue.Add(obj.Members["Identity"].Value.ToString());
                        }
                    }
                }

                return returnValue;
            }

            private List<string> getCopyQueueLength(string server, List<string> databases)
            {
                Collection<PSObject> psobjs = new Collection<PSObject>();
                List<string> returnValue = new List<string>();


                WSManConnectionInfo wmc = new WSManConnectionInfo(new Uri(uri));

                wmc.AuthenticationMechanism = auth;
                wmc.ShellUri = "http://schemas.microsoft.com/powershell/Microsoft.Exchange";

                using (Runspace runspace = RunspaceFactory.CreateRunspace(wmc))
                {
                    PowerShell powershell = PowerShell.Create();

                    if (runspace.RunspaceStateInfo.State == RunspaceState.Opened)
                    {
                        // do nothing
                    }
                    else
                    {
                        runspace.Open();
                        powershell.Runspace = runspace;
                    }

                    foreach (string database in databases)
                    {
                        try
                        {
                            PSCommand command = new PSCommand();
                            command.AddScript("Get-MailboxDatabaseCopyStatus -verbose -Identity " + database);
                            powershell.Commands = command;
                            psobjs = powershell.Invoke();

                            if (powershell.HadErrors == true)
                            {
                                returnValue.Add(server + " - " + database + " failed - " + powershell.Streams.Error[0].ToString());
                            }

                            foreach (PSObject psobj in psobjs)
                            {
                                if (psobj != null)
                                {
                                    object ap = psobj.Members["ActivationPreference"].Value;
                                    object queueLength = psobj.Members["CopyQueueLength"].Value;
                                    object replayQueueLength = psobj.Members["ReplayQueueLength"].Value;

                                    if (ap.ToString() == "4")
                                    {
                                        if (Convert.ToInt32(queueLength) > 5)
                                        {
                                            returnValue.Add(server + " - " + database + " - queue count: " + queueLength);
                                        }
                                    }
                                    else
                                    {
                                        if ((Convert.ToInt32(queueLength) > 5) || (Convert.ToInt32(replayQueueLength) > 100))
                                        {
                                            returnValue.Add(server + " - " + database + " - queue count: " + queueLength + " - replay queue count: " + replayQueueLength);
                                        }
                                    }                                   
                                }
                            }
                        }
                        catch (Exception ex)
                        {
                            string fail = ex.Message;
                            if (fail.Contains("xmlsoap"))
                            {
                                returnValue.Add(server + " - xmlsoap failure");
                            }
                            else
                            {
                                returnValue.Add(server + " - " + fail);
                            }
                        }
                    }
                }

                if (returnValue.Count == 0)
                {
                    returnValue.Add(server + ", queue count less than 6 and replay count less than 100");
                }

                return returnValue;
            }

            private string[] getDagMembers(string dagName)
            {
                Collection<PSObject> psobjs = new Collection<PSObject>();
                string result = "";
                string[] servers = null;


                WSManConnectionInfo wmc = new WSManConnectionInfo(new Uri(uri));
                wmc.AuthenticationMechanism = auth;

                wmc.ShellUri = "http://schemas.microsoft.com/powershell/Microsoft.Exchange";               

                using (Runspace runspace = RunspaceFactory.CreateRunspace(wmc))
                {
                    PowerShell powershell = PowerShell.Create();

                    if (runspace.RunspaceStateInfo.State == RunspaceState.Opened)
                    {
                        // do nothing
                    }
                    else
                    {
                        runspace.Open();
                        powershell.Runspace = runspace;
                    }

                    try
                    {
                        PSCommand command = new PSCommand();
                        command.AddScript("Get-DatabaseAvailabilityGroup -Identity " + dagName);
                        powershell.Commands = command;
                        psobjs = powershell.Invoke();

                        if (powershell.HadErrors == true)
                        {
                            result = "Failed - " + powershell.Streams.Error[0].ToString();
                            result = result.Replace("\"", "*");
                        }

                        PSPropertyInfo serversTemp = null;
                        foreach (PSObject psobj in psobjs)
                        {
                            serversTemp = psobj.Properties["servers"];
                        }

                        string s_servers = serversTemp.Value.ToString();
                        servers = s_servers.Split(' ');

                    }
                    catch (Exception ex)
                    {
                        string fail = ex.Message;
                    }
                }            

                return servers;
            }
        }
    }
'@

try
{
    $queueLengthCheck = New-Object MultiThreading.queues;
}
catch
{
    Add-Type -TypeDefinition $queueLengths -ReferencedAssemblies System.Collections, System.ComponentModel, System.Data, System.Drawing, System.Linq, System.Management.Automation, System.Security, System.Threading.Tasks, System.Windows.Forms, System.Threading, System.Collections.Concurrent, System.Security.Principal
    $queueLengthCheck = New-Object MultiThreading.queues;
}

$queues = $queueLengthCheck.get($dagName);
Write-Host "Queues server count -" $queues.Count;
foreach($queue in $queues)
{
    if ($queue -match "queue count less than 6 and replay count less than 100")
    {
        Write-Host "     " $queue -ForegroundColor Green;
    }
    else
    {
        Write-Host "     " $queue -ForegroundColor Red;
    }

}
0

There are 0 best solutions below