Welcome, guest | Sign In | My Account | Store | Cart

A parser I designed to work with HIPAA EDI files. It reads in files and spits out the individual segments without terminators.

Requires Python 2.3 or greater. (Use can probably use Python 2.2 with from __future__ import generators at the top...)

Python, 149 lines
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import array
import string
import sys

try:
    # If available use the psyco optimizing routines.  This will speed
    # up execution by 2x.
    import psyco.classes
    base_class = psyco.classes.psyobj
except ImportError:
    base_class = object

alphanums = string.letters + string.digits

class BadFile(Exception):
    """Raised when file corruption is detected."""

class Parser(base_class):
    """Parse out segments from the X12 raw data files.

    Raises the BadFile exception when data corruption is detected.

    Attributes:
        delimiters
            A string where
            [0] == segment separator
            [1] == element separator
            [2] == sub-element separator
            [3] == repetition separator (if ISA version >= 00405
    """
    def __init__(self, filename=None):
        self.delimiters = ''
        if filename:
            self.open_file(filename)

    def __iter__(self):
        """Return the iterator for use in a for loop"""
        return self

    def open_file(self, filename):
        self.fp = open(filename, 'r')
        self.in_isa = False

    def next(self):
        """return the next segment from the file or raise StopIteration

        Here we'll return the next segment, this will be a 'bare' segment
        without the segment terminator.

        We're using the array module.  Written in C this should be very
        efficient at adding and converting to a string.
        """
        seg = array.array('c')
        if not self.in_isa:
            #We're at the begining of a file or interchange so we need
            #to handle it specially.  We read in the first 105 bytes,
            #ignoring new lines.  After that we read in the segment
            #terminator.
            while len(seg) != 106:
                i = self.fp.read(1)
                if i == '\0': continue
                if i == '':
                    if len(seg) == 0:
                        # We have reached the end of the file normally.
                        raise StopIteration
                    else:
                        # We have reached the end of the file, this is an error
                        # since we are in the middle of an ISA loop.
                        raise BadFile('Unexpected EOF found')
                if len(seg) < 105:
                    # While we're still gathering the 'main' portion of the
                    # ISA, we ignore NULLs and newlines.
                    if i != '\n':
                        # We're still in the 'middle' of the ISA, we won't
                        # accept NULLs or line feeds.
                        try:
                            seg.append(i)
                        except TypeError:
                            # This should never occur in a valid file.
                            print 'Type error on appending "%s"' % i
                else:
                    # We're at the end of the ISA, we'll accept *any*
                    # character except the NULL as the segment terminator for
                    # now.  We'll check for validity next.
                    if i == '\n':
                        # Since we're breaking some lines at position
                        # 80 on a given line, we need to also check the
                        # first character after the line break to make
                        # sure that the newline is supposed to be the
                        # terminator.  If it is, we just backup to
                        # reset the file pointer and move on.
                        pos = self.fp.tell()
                        next_char = self.fp.read(1)
                        if next_char != 'G':
                            i = next_char
                        else:
                            self.fp.seek(pos)
                    try:
                        seg.append(i)
                    except TypeError:
                        print 'Type error on appending "%s"' % i

            self.version = seg[84:89].tostring()
            self.delimiters = seg[105] + seg[3] + seg[104]
            if self.version >= '00405':
                self.delimiters = seg[105] + seg[3] + seg[104] + seg[83]

            # Verify that the delimiters are valid.
            for delim in self.delimiters:
                if delim in alphanums:
                    raise BadFile('"%s" is not a valid delimiter' % delim)

            # Set the flag to process everything else as normal segments.
            self.in_isa = True

            # Pop off the segment terminator.
            seg.pop()
            return seg.tostring()
        else:
            #We're somewhere in the body of the X12 message.  We just
            #read until we find the segment terminator and return the
            #segment.  (We still ignore line feeds unless the line feed
            #is the segment terminator.
            if self.delimiters[0] == '\n':
                return self.fp.readline()[:-1]
            else:
                fp_read = self.fp.read
                while 1:
                    i = fp_read(1)
                    if i == '\0': continue
                    if i == self.delimiters[0]:
                        # End of segment found, exit the loop and return the
                        # segment.
                        segment = seg.tostring()
                        if segment.startswith('IEA'):
                            self.in_isa = False
                        return segment
                    elif i != '\n':
                        try:
                            seg.append(i)
                        except TypeError:
                            raise BadFile('Corrupt characters found in data or unexpected EOF')

if __name__ == '__main__':
    # Sample usage
    message = Parser('edifile.txt')
    for segment in message:
        elements = segment.split(message.delimiters[1])
        # Dispatch based on elements[0]...

This parser is currently in use to work with 150-200MB of EDI transactions a day and has proven very reliable. Some considerations I had that not everyone may be worried about:

-Some, but not all, of my data comes from our mainframe and is stored 80 bytes to a line. The line with the final IEA of each message is \0 padded out to 80 bytes. I had to account for extraneous \0's and possible linefeeds. However, since the linefeed is a valid segment terminator, I couldn't just ignore them.

-We bundle multiple interchanges within the same physical file. Not all of our trading partners use the same delimiter set so i had to be able to adjust at any point to a new delimiter set.

-At the very high level, I assume that the interchanges aren't complete garbage. (i.e.: All interchanges are wrapped in a valid ISA - IEA envelope.) No sort of syntax or structural checking is done.

I've used this class for all sorts of reports and tools and have found that the dispatcher idiom in the example at the bottom seems to work the best. YMMV.

Cheers!

2 comments

Francis Gulotta 11 years, 8 months ago  # | flag

This works wonderfully, you just saved me a few hours of work! Thanks!

Francis Gulotta 11 years, 8 months ago  # | flag

I did find a bug when parsing edi documents that use newlines as a segment delimiter. You never actually mark if you're done with the ISA segment and get stuck in a loop returning empty lines. While readline might be faster you miss a bunch of important logic. I solved this by taking out lines 124-126.

Additionally if on windows with newlines delimiters file.tell() will return invalid values. Changing the open mode on line 41 to 'rb' solves a problem where your peak ahead to check if the newline is a delimiter fails.