woensdag 9 mei 2012

File Splitter

For one of the interfaces I was working on, we receive a flat file that contains information for multiple identifiers. We needed to split this file so that each resulting file contains only one identifier. To do this, we create a new disassembler pipeline component. We will add two properties to the class. The first is StartIndex, an integer that defines the position of the identifier within each line, for example an identifier code. The second is Length, an integer that indicates how long the identifier is. We also add a Queue to the class, which will hold the new messages.
/// <summary>
/// Disassembler pipeline component that splits one incoming message
/// into several outgoing messages.
/// </summary>
public class FileSplitter :
    IBaseComponent, IDisassemblerComponent, IComponentUI, IPersistPropertyBag
{
    /// <summary>
    /// Backing field for <see cref="StartIndex"/>.
    /// </summary>
    private int _startIndex;

    /// <summary>
    /// Backing field for <see cref="Length"/>.
    /// </summary>
    private int _length;

    /// <summary>
    /// Queue of the messages produced by disassembling; they wait here
    /// until they are handed out one by one.
    /// </summary>
    private System.Collections.Queue qOutputMsgs = new System.Collections.Queue();

    /// <summary>
    /// Namespace of the BizTalk system properties, used when promoting
    /// properties on the outgoing messages.
    /// </summary>
    private string systemPropertiesNamespace =
        "http://schemas.microsoft.com/BizTalk/2003/system-properties";

    /// <summary>
    /// Gets or sets the position at which the identifier starts.
    /// </summary>
    public int StartIndex
    {
        get { return _startIndex; }
        set { _startIndex = value; }
    }

    /// <summary>
    /// Gets or sets the number of characters the identifier occupies.
    /// </summary>
    public int Length
    {
        get { return _length; }
        set { _length = value; }
    }
}



Now we will create the Disassemble method, which is called in the disassemble stage of the pipeline. Here we read the message line by line and check whether the identifier in each line is the same as the identifier in the previous line. If it is the same, we add the line to the current message; if not, we create a new message.
  
/// <summary>
/// Splits the incoming message into one outgoing message per run of
/// consecutive lines that share the same identifier. The identifier of a
/// line is the substring of <c>Length</c> characters that starts at
/// <c>StartIndex</c>. The resulting messages are queued and handed out
/// by <c>GetNext</c>.
/// </summary>
/// <param name="pContext">Pipeline context.</param>
/// <param name="pInMsg">Input message.</param>
/// <exception cref="ApplicationException">
/// The original stream could not be obtained, a line is too short to
/// contain the identifier, or an outgoing message could not be created.
/// The underlying exception is available as the inner exception.
/// </exception>
public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
{
    // The namespace in which the new messages should be
    string namespaceURI =
        "http://www.company.com/BizTalk/Application/v1";

    // The root element for the new messages
    string rootElement = "File_Incoming";

    // Collects the lines of the message that is currently being built
    StringBuilder messageString = new StringBuilder();

    // Stream that will hold the original message's data
    Stream originalMessageStream;

    // Get the original file name. Read returns null when the property is
    // not present in the context, so guard against that instead of
    // failing with a NullReferenceException.
    object receivedFileName = pInMsg.Context.Read("ReceivedFileName",
        "http://schemas.microsoft.com/BizTalk/2003/file-properties");
    string srcFileName = receivedFileName == null
        ? string.Empty
        : receivedFileName.ToString().Replace(".txt", "");

    // Counter to make the outgoing filename unique
    int count = 0;

    try
    {
        // Fetch original message's data
        originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception for diagnosis
        throw new ApplicationException(
            "Error in reading original message: " + ex.Message, ex);
    }

    try
    {
        // NOTE(review): the reader is deliberately not wrapped in a
        // using block; as before, it (and the underlying stream) is only
        // closed after a successful run. Confirm whether the host still
        // needs the inbound stream when disassembling fails.
        StreamReader sr = new StreamReader(originalMessageStream);

        // Identifier of the previous line; null means that no line has
        // been read yet. An empty string is a valid identifier (for
        // example when Length is 0), so it cannot serve as the marker.
        string curIdentifier = null;

        // 1-based number of the current line, used in error messages
        int lineNumber = 0;

        // Loop on ReadLine instead of Peek: Peek may return -1 before the
        // end of the data (for example on streams that do not support
        // seeking), which would silently truncate the message.
        string line;
        while ((line = sr.ReadLine()) != null)
        {
            lineNumber++;

            // Fail with a descriptive message when the line cannot
            // contain the identifier
            if (_startIndex < 0 || _length < 0
                || line.Length - _startIndex < _length)
            {
                throw new InvalidDataException(String.Format(
                    "Line {0} has {1} characters and cannot contain an "
                    + "identifier at index {2} with length {3}.",
                    lineNumber, line.Length, _startIndex, _length));
            }

            // Get the identifier in this line
            string identifier = line.Substring(_startIndex, _length);

            // A different identifier starts a new message
            if (!identifier.Equals(curIdentifier))
            {
                // Nothing to flush before the very first line
                if (curIdentifier != null)
                {
                    // Queue the message collected so far
                    CreateOutgoingMessage(pContext,
                        messageString.ToString(), namespaceURI,
                        rootElement,
                        String.Format("{0}_{1}", srcFileName, count));

                    // Clear the buffer for the next message
                    messageString.Length = 0;

                    // Next message will be in a unique file
                    count++;
                }

                // From now on we want to compare to this identifier
                curIdentifier = identifier;
            }

            // Add the line to the current message
            messageString.Append(line).Append(Environment.NewLine);
        }

        // Queue the last message, if at least one line was read
        if (curIdentifier != null)
        {
            CreateOutgoingMessage(pContext, messageString.ToString(),
                namespaceURI, rootElement,
                String.Format("{0}_{1}", srcFileName, count));
        }

        // Close the StreamReader
        sr.Close();
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception for diagnosis
        throw new ApplicationException("Error in writing outgoing "
            + "messages: " + ex.Message, ex);
    }
}

Now we will implement the GetNext method. This method is used in the pipeline to pass the messages to the next stage.
/// <summary>
/// Hands the next queued output message to the following pipeline stage.
/// </summary>
/// <param name="pContext">Pipeline context (not used here).</param>
/// <returns>
/// The oldest message still in the queue, or <c>null</c> when the queue
/// is empty.
/// </returns>
public IBaseMessage GetNext(IPipelineContext pContext)
{
    // Nothing left to hand out
    if (qOutputMsgs.Count <= 0)
    {
        return null;
    }

    // Take the oldest queued message
    object nextMessage = qOutputMsgs.Dequeue();
    return (IBaseMessage)nextMessage;
}

Finally we have to create the method that puts the messages in the queue.
/// <summary>
/// Builds a new message from the given text and places it in the queue
/// of outgoing messages.
/// </summary>
/// <param name="pContext">Pipeline context.</param>
/// <param name="messageString">The string with the new (debatched)
/// message.</param>
/// <param name="namespaceURI">The namespace we want to use for the
/// message.</param>
/// <param name="rootElement">The root element for the message.</param>
/// <param name="sourceFileName">The file name we want to use for the new
/// message.</param>
/// <exception cref="ApplicationException">
/// The message could not be created or queued. The underlying exception
/// is available as the inner exception.
/// </exception>
private void CreateOutgoingMessage(IPipelineContext pContext,
    String messageString, string namespaceURI, string rootElement,
    string sourceFileName)
{
    // The message to be put in the queue for further processing in
    // the pipeline
    IBaseMessage outMsg;

    try
    {
        // Create outgoing message
        outMsg = pContext.GetMessageFactory().CreateMessage();

        // Add the body part
        outMsg.AddPart("Body",
            pContext.GetMessageFactory().CreateMessagePart(), true);

        // Promote the message type as "<namespace>#<root element>",
        // with a possible "ns0:" prefix removed from the root element
        outMsg.Context.Promote("MessageType",
            systemPropertiesNamespace, namespaceURI + "#"
            + rootElement.Replace("ns0:", ""));

        // Set the filename to be used, this can be used in BizTalk
        // by using the %SourceFileName% identifier
        outMsg.Context.Promote("ReceivedFileName",
            "http://schemas.microsoft.com/BizTalk/2003/file-properties",
            sourceFileName);

        // Get the outgoing message as bytes.
        // NOTE(review): ASCII encoding replaces every non-ASCII
        // character with '?'; confirm that the input is plain ASCII.
        byte[] bufferOutgoingMessage =
            Encoding.ASCII.GetBytes(messageString);

        // Set the data of the outgoing message
        outMsg.BodyPart.Data = new MemoryStream(bufferOutgoingMessage);

        // Place the message in the queue
        qOutputMsgs.Enqueue(outMsg);
    }
    catch (Exception ex)
    {
        // Keep the original exception as inner exception for diagnosis
        throw new ApplicationException("Error in queueing outgoing "
            + "messages: " + ex.Message, ex);
    }
}
And finally, the whole class.
using System;
using System.IO;
using System.Text;
using System.ComponentModel;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;

namespace Company.BizTalk.Other.PipelineComponents
{
    /// <summary>
    /// Disassembler pipeline component that splits an incoming flat file
    /// into one outgoing message per run of consecutive lines that share
    /// the same identifier. The identifier of a line is the substring of
    /// <see cref="Length"/> characters starting at <see cref="StartIndex"/>.
    /// </summary>
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
    [System.Runtime.InteropServices.Guid("F0DF352C-657B-42AD-A26D-730E8031CD42")]
    public class FileSplitter : IBaseComponent,
        IDisassemblerComponent,
        IComponentUI,
        IPersistPropertyBag
    {
        /// <summary>
        /// Namespace of the file adapter properties (ReceivedFileName).
        /// </summary>
        private const string FilePropertiesNamespace =
            "http://schemas.microsoft.com/BizTalk/2003/file-properties";

        /// <summary>
        /// Used to hold disassembled messages until GetNext hands them out.
        /// </summary>
        private readonly System.Collections.Generic.Queue<IBaseMessage> qOutputMsgs =
            new System.Collections.Generic.Queue<IBaseMessage>();

        /// <summary>
        /// Namespace used to set the promoted properties.
        /// </summary>
        private readonly string systemPropertiesNamespace =
            @"http://schemas.microsoft.com/BizTalk/2003/system-properties";

        /// <summary>
        /// Backing field for <see cref="StartIndex"/>.
        /// </summary>
        private int _startIndex;

        /// <summary>
        /// Backing field for <see cref="Length"/>.
        /// </summary>
        private int _length;

        /// <summary>
        /// Default constructor
        /// </summary>
        public FileSplitter()
        {
        }

        /// <summary>
        /// The zero-based start index of the identifier within each line.
        /// </summary>
        public int StartIndex
        {
            get { return _startIndex; }
            set { _startIndex = value; }
        }

        /// <summary>
        /// The length of the identifier, in characters.
        /// </summary>
        public int Length
        {
            get { return _length; }
            set { _length = value; }
        }

        /// <summary>
        /// Description of the pipeline component
        /// </summary>
        public string Description
        {
            get
            {
                return "Component to batch one flat file message into multiple "
                    + "messages";
            }
        }

        /// <summary>
        /// Name of the pipeline component
        /// </summary>
        public string Name
        {
            get { return "FileSplitter"; }
        }

        /// <summary>
        /// Version of the pipeline component
        /// </summary>
        public string Version
        {
            get { return "1.0.0.0"; }
        }

        /// <summary>
        /// Validates the component configuration. This component performs
        /// no validation and always returns <c>null</c>.
        /// </summary>
        /// <param name="projectSystem">Not used.</param>
        /// <returns>Always <c>null</c>.</returns>
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }

        /// <summary>
        /// Icon of the pipeline component; no icon is provided, so a zero
        /// handle is returned.
        /// </summary>
        public System.IntPtr Icon
        {
            get { return System.IntPtr.Zero; }
        }

        /// <summary>
        /// Returns the class GUID of this component.
        /// </summary>
        /// <param name="classID">Receives the class GUID.</param>
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("F0DF352C-657B-42AD-A26D-730E8031CD42");
        }

        /// <summary>
        /// InitNew; nothing to initialize.
        /// </summary>
        public void InitNew()
        {
        }

        /// <summary>
        /// Loads StartIndex and Length from the property bag.
        /// </summary>
        /// <param name="propertyBag">The property bag to read from.</param>
        /// <param name="errorLog">Not used.</param>
        /// <exception cref="ApplicationException">
        /// Reading a property failed with an unexpected exception.
        /// </exception>
        public void Load(IPropertyBag propertyBag, int errorLog)
        {
            this._startIndex =
                ReadInt32Property(propertyBag, "StartIndex", this._startIndex);
            this._length =
                ReadInt32Property(propertyBag, "Length", this._length);
        }

        /// <summary>
        /// Writes StartIndex and Length to the property bag.
        /// </summary>
        /// <param name="propertyBag">The property bag to write to.</param>
        /// <param name="clearDirty">Not used.</param>
        /// <param name="saveAllProperties">Not used.</param>
        public void Save(IPropertyBag propertyBag, bool clearDirty,
            bool saveAllProperties)
        {
            object val = (object)StartIndex;
            propertyBag.Write("StartIndex", ref val);

            val = (object)Length;
            propertyBag.Write("Length", ref val);
        }

        /// <summary>
        /// Splits the incoming message into one outgoing message per run of
        /// consecutive lines that share the same identifier and queues the
        /// resulting messages for <see cref="GetNext"/>.
        /// </summary>
        /// <param name="pContext">Pipeline context.</param>
        /// <param name="pInMsg">Input message.</param>
        /// <exception cref="ApplicationException">
        /// The original stream could not be obtained, a line is too short to
        /// contain the identifier, or an outgoing message could not be
        /// created. The underlying exception is available as the inner
        /// exception.
        /// </exception>
        public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            // The namespace in which the new messages should be
            string namespaceURI =
                "http://www.company.com/BizTalk/Application/v1";

            // The root element for the new messages
            string rootElement = "File_Incoming";

            // Collects the lines of the message that is currently being built
            StringBuilder messageString = new StringBuilder();

            // Stream that will hold the original message's data
            Stream originalMessageStream;

            // Get the original file name. Read returns null when the property
            // is not present in the context, so guard against that instead of
            // failing with a NullReferenceException.
            object receivedFileName = pInMsg.Context.Read("ReceivedFileName",
                FilePropertiesNamespace);
            string srcFileName = receivedFileName == null
                ? string.Empty
                : receivedFileName.ToString().Replace(".txt", "");

            // Counter to make the outgoing filename unique
            int count = 0;

            try
            {
                // Fetch original message's data
                originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
            }
            catch (Exception ex)
            {
                // Keep the original exception as inner exception
                throw new ApplicationException(
                    "Error in reading original message: " + ex.Message, ex);
            }

            try
            {
                // NOTE(review): the reader is deliberately not wrapped in a
                // using block; as before, it (and the underlying stream) is
                // only closed after a successful run. Confirm whether the
                // host still needs the inbound stream when this method fails.
                StreamReader sr = new StreamReader(originalMessageStream);

                // Identifier of the previous line; null means that no line
                // has been read yet. An empty string is a valid identifier
                // (for example when Length is 0), so it cannot be the marker.
                string curIdentifier = null;

                // 1-based number of the current line, used in error messages
                int lineNumber = 0;

                // Loop on ReadLine instead of Peek: Peek may return -1 before
                // the end of the data (for example on streams that do not
                // support seeking), which would silently truncate the message.
                string line;
                while ((line = sr.ReadLine()) != null)
                {
                    lineNumber++;

                    // Fail with a descriptive message when the line cannot
                    // contain the identifier
                    if (_startIndex < 0 || _length < 0
                        || line.Length - _startIndex < _length)
                    {
                        throw new InvalidDataException(String.Format(
                            "Line {0} has {1} characters and cannot contain "
                            + "an identifier at index {2} with length {3}.",
                            lineNumber, line.Length, _startIndex, _length));
                    }

                    // Get the identifier in this line
                    string identifier = line.Substring(_startIndex, _length);

                    // A different identifier starts a new message
                    if (!identifier.Equals(curIdentifier))
                    {
                        // Nothing to flush before the very first line
                        if (curIdentifier != null)
                        {
                            // Queue the message collected so far
                            CreateOutgoingMessage(pContext,
                                messageString.ToString(), namespaceURI,
                                rootElement,
                                String.Format("{0}_{1}", srcFileName, count));

                            // Clear the buffer for the next message
                            messageString.Length = 0;

                            // Next message will be in a unique file
                            count++;
                        }

                        // From now on we want to compare to this identifier
                        curIdentifier = identifier;
                    }

                    // Add the line to the current message
                    messageString.Append(line).Append(Environment.NewLine);
                }

                // Queue the last message, if at least one line was read
                if (curIdentifier != null)
                {
                    CreateOutgoingMessage(pContext, messageString.ToString(),
                        namespaceURI, rootElement,
                        String.Format("{0}_{1}", srcFileName, count));
                }

                // Close the StreamReader
                sr.Close();
            }
            catch (Exception ex)
            {
                // Keep the original exception as inner exception
                throw new ApplicationException("Error in writing outgoing "
                    + "messages: " + ex.Message, ex);
            }
        }

        /// <summary>
        /// Used to pass output messages to next stage.
        /// </summary>
        /// <param name="pContext">Pipeline context (not used here).</param>
        /// <returns>
        /// The oldest message still in the queue, or <c>null</c> when the
        /// queue is empty.
        /// </returns>
        public IBaseMessage GetNext(IPipelineContext pContext)
        {
            // Check if there are any messages in the queue
            if (qOutputMsgs.Count > 0)
            {
                // Get the next message
                return qOutputMsgs.Dequeue();
            }

            return null;
        }

        /// <summary>
        /// Reads an integer property from the property bag.
        /// </summary>
        /// <param name="propertyBag">The property bag to read from.</param>
        /// <param name="propertyName">Name of the property to read.</param>
        /// <param name="currentValue">Value to keep when the bag yields no
        /// value for the property.</param>
        /// <returns>
        /// The value from the bag; 0 when reading throws an
        /// ArgumentException or NullReferenceException; otherwise
        /// <paramref name="currentValue"/> when the bag returns null.
        /// </returns>
        private static int ReadInt32Property(IPropertyBag propertyBag,
            string propertyName, int currentValue)
        {
            object val = null;
            try
            {
                propertyBag.Read(propertyName, out val, 0);
            }
            catch (ArgumentException)
            {
                // Presumably thrown when the property is not in the bag;
                // fall back to 0
                val = 0;
            }
            catch (NullReferenceException)
            {
                val = 0;
            }
            catch (Exception ex)
            {
                throw new ApplicationException(ex.Message, ex);
            }

            return val != null ? (int)val : currentValue;
        }

        /// <summary>
        /// Builds a new message from the given text and places it in the
        /// queue of outgoing messages.
        /// </summary>
        /// <param name="pContext">Pipeline context.</param>
        /// <param name="messageString">The string with the new (debatched)
        ///  message.</param>
        /// <param name="namespaceURI">The namespace we want to use for the
        /// message.</param>
        /// <param name="rootElement">The root element for the message.</param>
        /// <param name="sourceFileName">The file name we want to use for the
        /// new message.</param>
        /// <exception cref="ApplicationException">
        /// The message could not be created or queued. The underlying
        /// exception is available as the inner exception.
        /// </exception>
        private void CreateOutgoingMessage(IPipelineContext pContext,
            String messageString, string namespaceURI, string rootElement,
            string sourceFileName)
        {
            // The message to be put in the queue for further processing in
            // the pipeline
            IBaseMessage outMsg;

            try
            {
                // Create outgoing message
                outMsg = pContext.GetMessageFactory().CreateMessage();

                // Add the body part
                outMsg.AddPart("Body",
                    pContext.GetMessageFactory().CreateMessagePart(), true);

                // Promote the message type as "<namespace>#<root element>",
                // with a possible "ns0:" prefix removed from the root element
                outMsg.Context.Promote("MessageType",
                    systemPropertiesNamespace, namespaceURI + "#"
                    + rootElement.Replace("ns0:", ""));

                // Set the filename to be used, this can be used in BizTalk
                // by using the %SourceFileName% identifier
                outMsg.Context.Promote("ReceivedFileName",
                    FilePropertiesNamespace,
                    sourceFileName);

                // Get the outgoing message as bytes.
                // NOTE(review): ASCII encoding replaces every non-ASCII
                // character with '?'; confirm that the input is plain ASCII.
                byte[] bufferOutgoingMessage =
                    Encoding.ASCII.GetBytes(messageString);

                // Set the data of the outgoing message
                outMsg.BodyPart.Data = new MemoryStream(bufferOutgoingMessage);

                // Place the message in the queue
                qOutputMsgs.Enqueue(outMsg);
            }
            catch (Exception ex)
            {
                // Keep the original exception as inner exception
                throw new ApplicationException("Error in queueing outgoing "
                    + "messages: " + ex.Message, ex);
            }
        }
    }
}

Geen opmerkingen:

Een reactie posten