I am working on an ETL process that is part of a larger system.

This larger system includes a logging system using a REST API.

The ETL section is running under SSIS, developed in Visual Studio 2015 and deployed on SQL Server 2016.

The ETL is covered with integration tests including tests of logs being generated.

The REST API cannot be guaranteed to be running during these integration tests, and even if it is, the asynchronous nature makes testing log generation... awkward.

We could use a script component to handle the logging, but we have 30+ packages requiring logging (each a distinct operation, based on a data-point to be calculated from one database into the next, so that a team can work concurrently without having TFS merge butcher XML definitions as much as possible), so maintenance becomes a headache.

In order to get around this, I have written a custom component that will bundle up all the errors across a package execution (separated into Fatal, Error, Warning, Info, and Detail levels), add a source, and fire JSON off to the REST API. In the event that the REST API is not specified, the system will log locally instead (which for the integration tests, means that we have a synchronous and local log source to check).

  • The ComponentType is ComponentType.DestinationAdapter.
  • The component has two custom properties, one for the variable name of the logging url (defaults to $Project::LoggingURL), one for the source of the log (defaults to System::PackageName).
  • The component validates the custom properties to not be blank.
  • The component has a single connection, defaulting to a master database, used as a fallback.
  • The component validates that the connection is set.
  • The component has multiple (five) inputs and no outputs.
  • Each input is marked as having side-effects.
  • Each attached input is validated as having a single input column of type DT_WSTR.
  • Unattached inputs are fine (a package that cannot log any fatal errors will leave that input unattached).
  • If any Fatal or Error messages are detected, the component fails the package in the Post-Execute step (in order to detect as many issues as possible in a run, rather than only the first).
  • The build targets 32-bit, and .NET Framework 4.
  • On Post-Build, the dll is copied to the DTS/PipelineComponents folder, and the assembly is deregistered then reregistered in the GAC.
  • When executing a package through Visual Studio (right-click on the package, 'Execute Package'), the component behaves exactly as expected.
  • When the package is deployed to the local SQL Server 2016 instance on my machine and the integration tests are run, the validation claims that the outputs leading into my component are not used and should be removed, and the component does nothing (as if it was never there). There are no messages about the component whatsoever.

I would very much like to have the component run in SQL Server, otherwise it is completely useless.

This is the code (there is an associated UI, but the Design-Time behaviour is as expected):

[DtsPipelineComponent(
    DisplayName = "Custom Logging Component",
    ComponentType = ComponentType.DestinationAdapter,
    IconResource = "DestinationIcon",
    CurrentVersion = 1,
    UITypeName = "ETLCustomDataFlowComponents.CustomLoggingComponentUI,ETLCustomDataFlowComponents,Version=1.0.0.0,Culture=neutral,PublicKeyToken=051a7fa35dda5a9f"
)]
public class HermesCustomLoggingComponent : PipelineComponent
{
    public const string _SOURCE_PROPERTY = "Source Name";
    public const string _LOG_PROPERTY = "Log URL";
    public const string _MASTER_CONN_PROPERTY = "Master Connection";

    public override void ProvideComponentProperties()
    {
        base.ProvideComponentProperties();
        base.RemoveAllInputsOutputsAndCustomProperties();

        var loggingPath = ComponentMetaData.CustomPropertyCollection.New();
        loggingPath.Description = "The url to send json log messages to";
        loggingPath.Name = _LOG_PROPERTY;
        loggingPath.Value = string.Empty;
        loggingPath.ExpressionType = DTSCustomPropertyExpressionType.CPET_NOTIFY;

        var source = ComponentMetaData.CustomPropertyCollection.New();
        source.Description = "The source to which the log is to be attributed";
        source.Name = _SOURCE_PROPERTY;
        source.Value = string.Empty;

        var masterConn = ComponentMetaData.RuntimeConnectionCollection.New();
        masterConn.Name = _MASTER_CONN_PROPERTY;
        masterConn.Description = "The connection to log.Log as a backup when centralised logging fails";

        foreach (var level in new[] { "Fatal", "Error", "Warning", "Info", "Debug" })
        {
            var input = ComponentMetaData.InputCollection.New();
            input.Name = level;
            input.HasSideEffects = true;
        }
    }

    public override DTSValidationStatus Validate()
    {
        bool broken = false;
        bool cancel;

        foreach (IDTSInput100 input in ComponentMetaData.InputCollection)
        {
            if (input.IsAttached)
            {
                if (input.InputColumnCollection.Count != 1)
                {
                    ComponentMetaData.FireError(0, ComponentMetaData.Name, $"{input.Name} should have only a message input", "", 0, out cancel);
                    broken = true;
                }
                else
                {
                    if (input.InputColumnCollection[0].DataType != DataType.DT_WSTR)
                    {
                        ComponentMetaData.FireError(0, ComponentMetaData.Name, $"Input to {input.Name} is not of type DT_WSTR", "", 0, out cancel);
                        broken = true;
                    }
                }
            }
            else
            {
                input.InputColumnCollection.RemoveAll();
            }
        }

        if (ComponentMetaData.CustomPropertyCollection[_SOURCE_PROPERTY].Value == string.Empty)
        {
            ComponentMetaData.FireError(0, ComponentMetaData.Name, $"{_SOURCE_PROPERTY} parameter has not been set", "", 0, out cancel);
            broken = true;
        }

        if (ComponentMetaData.CustomPropertyCollection[_LOG_PROPERTY].Value == string.Empty)
        {
            ComponentMetaData.FireError(0, ComponentMetaData.Name, $"{_LOG_PROPERTY} parameter has not been set", "", 0, out cancel);
            broken = true;
        }

        if (ComponentMetaData.RuntimeConnectionCollection[_MASTER_CONN_PROPERTY].ConnectionManager == null)
        {
            ComponentMetaData.FireError(0, ComponentMetaData.Name, $"{_MASTER_CONN_PROPERTY} has not been set", "", 0, out cancel);
            broken = true;
        }

        if (broken)
        {
            return DTSValidationStatus.VS_ISBROKEN;
        }

        return base.Validate();
    }

    private readonly List<Dictionary<string, string>> _logMessages = new List<Dictionary<string, string>>();
    private readonly Dictionary<int, IDTSInput100> _inputs = new Dictionary<int, IDTSInput100>();
    private readonly Dictionary<string, int> _messageCounts = new Dictionary<string, int>();
    private string _source = string.Empty;
    private string _loggingPath = string.Empty;

    private SqlConnection sqlConnection;

    public override void AcquireConnections(object transaction)
    {
        if (ComponentMetaData.RuntimeConnectionCollection[_MASTER_CONN_PROPERTY].ConnectionManager != null)
        {
            ConnectionManager cm = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[_MASTER_CONN_PROPERTY].ConnectionManager);
            ConnectionManagerAdoNet cmAdoNet = cm.InnerObject as ConnectionManagerAdoNet;

            if (cmAdoNet == null) throw new Exception($"Connection Manager {cm.Name} is not ADO.NET");

            sqlConnection = cmAdoNet.AcquireConnection(transaction) as SqlConnection;

            if ((sqlConnection != null) && (sqlConnection.State != ConnectionState.Open)) sqlConnection.Open();
        }
    }

    public override void ReleaseConnections()
    {
        if ((sqlConnection != null) && (sqlConnection.State != ConnectionState.Closed)) sqlConnection.Close();
    }

    public override void PreExecute()
    {
        var sourceVar = ComponentMetaData.CustomPropertyCollection[_SOURCE_PROPERTY].Value;
        if (sourceVar != string.Empty)
        {
            IDTSVariables100 variables;
            VariableDispenser.LockForRead(sourceVar);
            VariableDispenser.GetVariables(out variables);
            _source = variables[sourceVar].Value.ToString();
        }
        var loggingVar = ComponentMetaData.CustomPropertyCollection[_LOG_PROPERTY].Value;
        if (loggingVar != string.Empty)
        {
            IDTSVariables100 variables;
            VariableDispenser.LockForRead(loggingVar);
            VariableDispenser.GetVariables(out variables);
            _loggingPath = variables[loggingVar].Value.ToString();
        }
        foreach (IDTSInput100 input in ComponentMetaData.InputCollection)
        {
            _inputs[input.ID] = input;
            _messageCounts[input.Name] = 0;
        }
    }

    public override void ProcessInput(int inputID, PipelineBuffer buffer)
    {
        while (buffer.NextRow())
        {
            string message = buffer[0].ToString();

            _messageCounts[_inputs[inputID].Name] += 1;

            _logMessages.Add(new Dictionary<string, string>
            {
                {"Level", _inputs[inputID].Name},
                {"InstanceId", Environment.MachineName},
                {"Source", _source},
                {"Message", message}
            });
        }
    }

    public override void PostExecute()
    {
        if (string.IsNullOrWhiteSpace(_loggingPath))
        {
            List<string> logMessagesList = new List<string>();

            foreach (var logMessage in _logMessages)
            {
                logMessagesList.Add(
                    $"('{logMessage["Level"].Substring(0, 1)}', '{logMessage["Source"]}', '{logMessage["Message"]}')");
            }

            if ((sqlConnection != null) && (sqlConnection.State == ConnectionState.Open))
            {
                var command = sqlConnection.CreateCommand();
                command.CommandText =
                    $"INSERT INTO log.Log ([Level], [Source], [Message]) VALUES {string.Join(", ", logMessagesList)}";
                command.ExecuteNonQuery();
            }

        }
        else
        {
            List<string> logMessagesList = new List<string>();

            foreach (var logMessage in _logMessages)
            {
                List<string> logJsonList = new List<string>();
                foreach (var logElement in logMessage)
                {
                    logJsonList.Add($"\"{logElement.Key}\":\"{logElement.Value}\"");
                }
                var logString = string.Join(", ", logJsonList);
                if (!logMessagesList.Contains(logString))
                {
                    logMessagesList.Add(logString);
                }
            }

            string logJson = "[{" + string.Join("}, {", logMessagesList) + "}]";
            var request = (HttpWebRequest)WebRequest.Create(_loggingPath + "api/log");
            request.Method = "POST";
            request.ContentType = "application/json";
            request.ContentLength = logJson.Length;
            using (var requestWriter = new StreamWriter(request.GetRequestStream(), System.Text.Encoding.ASCII))
            {
                requestWriter.Write(logJson);
            }
        }

        foreach (var level in new[] { "Fatal", "Error" })
        {
            if (_messageCounts[level] > 0)
            {
                bool cancel;
                ComponentMetaData.FireError(0, _source, "Package has logged an exception, and cannot continue", "",
                                        0,
                                        out cancel);
            }
        }
    }

    public override void PerformUpgrade(int pipelineVersion)
    {
        ComponentMetaData.Version = 1;
    }
}
0

There are 0 best solutions below