A parser I designed to work with HIPAA EDI files. It reads in a file and spits out the individual segments without their terminators.
Requires Python 2.3 or greater. (You can probably use Python 2.2 with from __future__ import generators at the top.)
import array
import string
import sys

try:
    # If available, use the psyco optimizing routines. This will speed
    # up execution by 2x.
    import psyco.classes
    base_class = psyco.classes.psyobj
except ImportError:
    base_class = object

alphanums = string.letters + string.digits

class BadFile(Exception):
    """Raised when file corruption is detected."""

class Parser(base_class):
    """Parse out segments from the X12 raw data files.

    Raises the BadFile exception when data corruption is detected.

    Attributes:
        delimiters
            A string where
            [0] == segment separator
            [1] == element separator
            [2] == sub-element separator
            [3] == repetition separator (if ISA version >= 00405)
    """
    def __init__(self, filename=None):
        self.delimiters = ''
        if filename:
            self.open_file(filename)

    def __iter__(self):
        """Return the iterator for use in a for loop"""
        return self

    def open_file(self, filename):
        self.fp = open(filename, 'r')
        self.in_isa = False

    def next(self):
        """Return the next segment from the file or raise StopIteration.

        Here we'll return the next segment; this will be a 'bare' segment
        without the segment terminator.

        We're using the array module. Written in C, this should be very
        efficient at appending and converting to a string.
        """
        seg = array.array('c')
        if not self.in_isa:
            # We're at the beginning of a file or interchange so we need
            # to handle it specially. We read in the first 105 bytes,
            # ignoring newlines. After that we read in the segment
            # terminator.
            while len(seg) != 106:
                i = self.fp.read(1)
                if i == '\0': continue
                if i == '':
                    if len(seg) == 0:
                        # We have reached the end of the file normally.
                        raise StopIteration
                    else:
                        # We have reached the end of the file; this is an error
                        # since we are in the middle of an ISA loop.
                        raise BadFile('Unexpected EOF found')
                if len(seg) < 105:
                    # While we're still gathering the 'main' portion of the
                    # ISA, we ignore NULLs and newlines.
                    if i != '\n':
                        # We're still in the 'middle' of the ISA; we won't
                        # accept NULLs or line feeds.
                        try:
                            seg.append(i)
                        except TypeError:
                            # This should never occur in a valid file.
                            print 'Type error on appending "%s"' % i
                else:
                    # We're at the end of the ISA; we'll accept *any*
                    # character except the NULL as the segment terminator for
                    # now. We'll check for validity next.
                    if i == '\n':
                        # Since we're breaking some lines at position
                        # 80 on a given line, we need to also check the
                        # first character after the line break to make
                        # sure that the newline is supposed to be the
                        # terminator. If it is, we just back up to
                        # reset the file pointer and move on.
                        pos = self.fp.tell()
                        next_char = self.fp.read(1)
                        if next_char != 'G':
                            i = next_char
                        else:
                            self.fp.seek(pos)
                    try:
                        seg.append(i)
                    except TypeError:
                        print 'Type error on appending "%s"' % i
            self.version = seg[84:89].tostring()
            self.delimiters = seg[105] + seg[3] + seg[104]
            if self.version >= '00405':
                self.delimiters = seg[105] + seg[3] + seg[104] + seg[83]
            # Verify that the delimiters are valid.
            for delim in self.delimiters:
                if delim in alphanums:
                    raise BadFile('"%s" is not a valid delimiter' % delim)
            # Set the flag to process everything else as normal segments.
            self.in_isa = True
            # Pop off the segment terminator.
            seg.pop()
            return seg.tostring()
        else:
            # We're somewhere in the body of the X12 message. We just
            # read until we find the segment terminator and return the
            # segment. (We still ignore line feeds unless the line feed
            # is the segment terminator.)
            if self.delimiters[0] == '\n':
                return self.fp.readline()[:-1]
            else:
                fp_read = self.fp.read
                while 1:
                    i = fp_read(1)
                    if i == '\0': continue
                    if i == self.delimiters[0]:
                        # End of segment found; exit the loop and return the
                        # segment.
                        segment = seg.tostring()
                        if segment.startswith('IEA'):
                            self.in_isa = False
                        return segment
                    elif i != '\n':
                        try:
                            seg.append(i)
                        except TypeError:
                            raise BadFile('Corrupt characters found in data or unexpected EOF')

if __name__ == '__main__':
    # Sample usage
    message = Parser('edifile.txt')
    for segment in message:
        elements = segment.split(message.delimiters[1])
        # Dispatch based on elements[0]...
This parser currently handles 150-200MB of EDI transactions a day and has proven very reliable. Some considerations I had that not everyone may need to worry about:
-Some, but not all, of my data comes from our mainframe and is stored 80 bytes to a line. The line with the final IEA of each message is \0 padded out to 80 bytes. I had to account for extraneous \0's and possible linefeeds. However, since the linefeed is a valid segment terminator, I couldn't just ignore them.
-We bundle multiple interchanges within the same physical file. Not all of our trading partners use the same delimiter set, so I had to be able to adjust at any point to a new delimiter set (see the short sketch just after this list).
-At a very high level, I assume that the interchanges aren't complete garbage (i.e., every interchange is wrapped in a valid ISA - IEA envelope). No syntax or structural checking is done.
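On the delimiter point above: since the delimiters attribute is refreshed every time a new ISA header is parsed, splitting done inside the loop should read the attribute on each pass rather than caching it once up front. A small sketch of what I mean (the filename here is just a placeholder):

bundle = Parser('bundled_interchanges.txt')   # hypothetical file with several ISA/IEA envelopes
for segment in bundle:
    # Re-read the delimiters every pass: they are reset each time a new
    # ISA header is parsed, so a later interchange may use a different set.
    elem_sep, sub_sep = bundle.delimiters[1], bundle.delimiters[2]
    elements = segment.split(elem_sep)
    if elements[0] == 'ISA':
        print 'New interchange, delimiters are now %r' % bundle.delimiters
    # Composite elements can be split further on the sub-element separator.
    composites = [element.split(sub_sep) for element in elements]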
I've used this class for all sorts of reports and tools and have found that the dispatcher idiom in the example at the bottom seems to work best. YMMV.
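For reference, that dispatcher idiom usually looks something like the following; the segment IDs and handler bodies here are only illustrative:

def handle_isa(elements):
    print 'Interchange from %s to %s' % (elements[6].strip(), elements[8].strip())

def handle_nm1(elements):
    print 'Name: %s' % ' '.join(elements[3:6])

def handle_other(elements):
    pass

handlers = {'ISA': handle_isa, 'NM1': handle_nm1}

message = Parser('edifile.txt')
for segment in message:
    elements = segment.split(message.delimiters[1])
    # Look up a handler by segment ID, falling back to a no-op.
    handlers.get(elements[0], handle_other)(elements)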
Cheers!
This works wonderfully; you just saved me a few hours of work! Thanks!
I did find a bug when parsing EDI documents that use a newline as the segment terminator. You never actually mark when you're done with an interchange, so the parser gets stuck in a loop returning empty lines. While readline() might be faster, it skips a bunch of important logic (the IEA check that resets in_isa). I solved this by taking out lines 124-126 (the readline() shortcut in next()).
Additionally, on Windows with a newline terminator, file.tell() will return invalid values, so the peek ahead that checks whether the newline is really the terminator fails. Changing the open mode on line 41 (in open_file()) to 'rb' solves that problem.
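For anyone hitting the same issues, here is a rough sketch of the two changes described above, written as a subclass of the recipe's Parser so the original stays intact. The class name is just for illustration, and this is untested against the original author's data:

import array

class NewlineSafeParser(Parser):
    """Parser with the two fixes from the comments applied."""

    def open_file(self, filename):
        # Binary mode keeps tell()/seek() consistent with read() on Windows,
        # so the peek ahead after a newline terminator behaves as intended.
        self.fp = open(filename, 'rb')
        self.in_isa = False

    def next(self):
        if not self.in_isa:
            # ISA handling is unchanged; reuse the original logic.
            return Parser.next(self)
        # No readline() shortcut here: every segment goes through the
        # character loop so the IEA check below can reset self.in_isa,
        # even when '\n' is the segment terminator.
        seg = array.array('c')
        fp_read = self.fp.read
        while 1:
            i = fp_read(1)
            if i == '\0': continue
            if i == self.delimiters[0]:
                segment = seg.tostring()
                if segment.startswith('IEA'):
                    self.in_isa = False
                return segment
            elif i != '\n':
                try:
                    seg.append(i)
                except TypeError:
                    raise BadFile('Corrupt characters found in data or unexpected EOF')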