Skip to content

Instantly share code, notes, and snippets.

@dionysio
Created November 5, 2019 10:32
Show Gist options
  • Save dionysio/85611d2f01a9da8c117fd794435b3f42 to your computer and use it in GitHub Desktop.
Save dionysio/85611d2f01a9da8c117fd794435b3f42 to your computer and use it in GitHub Desktop.
from datetime import datetime
from ftplib import FTP
from threading import Thread
from utils import parse_date, value_from_nested_dict
# Two-letter reference qualifier -> reference name.
# NOTE(review): presumably matched against the 'code' element of L11
# segments; nothing in this module reads the table yet -- confirm.
REFERENCE_CODES = {
    "CN": "pro_number",
    "LA": "tracking_number",
    "PO": "purchase_order_number",
    "BM": "bill_of_lading_number",
    "MB": "master_bill_of_lading_number",
    "QN": "stop_sequence_number",
    "VN": "vn",
}

# AT7 status code -> readable status name (used by multiply_edi).
# 'AF' and 'CP' intentionally share the same status name.
SHIPMENT_STATUS_CODES = {
    "AA": "pickup_appointment_date",
    "X3": "arrived_at_pickup_location",
    "AF": "completed_loading_at_pickup_location",
    "CP": "completed_loading_at_pickup_location",
    "X6": "en_route_to_delivery_location",
    "X1": "arrived_at_delivery_location",
    "D1": "completed_unloading_at_delivery_location",
}

# Two-letter time code -> time zone name.
# NOTE(review): presumably the values of AT7 'time_code' -- confirm.
TIME_CODES = {
    "AT": "alaska_time",
    "CT": "central_time",
    "ET": "eastern_time",
    "HT": "hawaii_aleutian_time",
    "LT": "local_time",
    "MT": "mountain_time",
    "PT": "pacific_time",
}

# Two-letter equipment code -> equipment type.
# NOTE(review): presumably the values of MS2 'equipment_description_code'
# -- confirm.
EQUIPMENT_DESCRIPTION_CODES = {
    "CN": "container",
    "CV": "closed_van",
    "FT": "flat_bed_trailer",
    "RF": "flat_car",
    "RR": "rail_car",
}

# Package 'weight_unit' code -> unit label (used by multiply_edi).
WEIGHT_UNITS = {
    "K": "KG",
    "L": "LB",
}
# Segment tag -> ordered element names.  EDI.parse zips each list against
# the elements that follow the tag, so position in the list is significant.
RESOLVER = {
    # Interchange Control Header
    "ISA": [
        "authorization_information_qualifier",
        "authorization_information",
        "security_information_qualifier",
        "security_information",
        "sender_id_qualifier",
        "sender_id",
        "receiver_id_qualifier",
        "receiver_id",
        "interchange_date",
        "interchange_time",
        "standards_identifier",
        "version_number",
        "control_number",
        "acknowledgment_requested",
        "usage_indicator",  # shipment - test = false if P
        "component_element_separator",
    ],
    # Functional Group Header
    "GS": [
        "functional_identifier_code",
        "application_sender_code",
        "application_receiver_code",
        "date",
        "time",
        "group_control_number",
        "responsible_agency_code",
        "version",
    ],
    # Transaction Set Header
    "ST": ["identifier_code", "control_number"],
    # Beginning Segment for Transportation Carrier Shipment Status Message
    "B10": [
        "identification",  # shipment - tracking_number
        "identification_number",
        "carrier_scac",  # shipment - carrier .lower()
    ],
    # Business Instructions and Reference Number (shipment - references)
    "L11": ["value", "code"],
    # Remarks
    "K1": ["message"],
    # Name (shipment sender / recipient)
    "N1": ["entity_code", "name", "qualifier", "code"],
    # Address Information (shipment sender / recipient)
    "N3": ["street1", "street2"],
    # Geographic Location (shipment sender / recipient)
    "N4": ["city", "state", "postal_code", "country"],
    # Shipment Status Details (shipment - checkpoints)
    "AT7": [
        "status_code",  # shipment - status & checkpoint status
        "appointment_reason_code",
        "appointment_status_code",
        "reason_code",
        "date",  # checkpoint - time
        "time",  # checkpoint - time
        "time_code",  # checkpoint - time
    ],
    # Shipment Weight, Packaging and Quantity Data (shipment - packages)
    "AT8": [
        "weight_qualifier",
        "weight_unit",
        "weight",
        "lading_quantity_1",
        "lading_quantity_2",
    ],
    # Equipment, Shipment, or Real Property Location (checkpoint - location)
    "MS1": ["city", "state", "country"],
    # Equipment or Container Owner and Type
    "MS2": [
        "standard_carrier_alpha_code",
        "equipment_number",
        "equipment_description_code",
    ],
    # Purchase Order Reference
    "PRF": ["po_number", "identification"],
    # Transaction Set Trailer
    "SE": ["included_segments", "control_number"],
    # Functional Group Trailer
    "GE": ["sets_included", "group_control_number"],
    # Interchange Control Trailer
    "IEA": ["included_functional_groups", "interchange_control_number"],
}
class EDI:
    """Parse a '~'-delimited, '*'-separated EDI document into a plain dict.

    The parsed result (see ``get_edi``) always contains three list keys:
    ``references`` (one dict per L11 segment), ``remarks`` (the first
    element of every K1 segment) and ``packages`` (one dict per AT8
    segment).  Segments listed in ``_SINGLE_SEGMENTS`` are stored under
    their own tag as ``{element_name: value}`` using the element names in
    ``RESOLVER``; if a tag occurs more than once the last occurrence wins.
    Every N1 segment opens a party stored under its first element, and the
    N3/N4 segments that follow are merged into that party.  Any other
    segment is ignored.
    """

    # Segments stored once, keyed by their tag.
    _SINGLE_SEGMENTS = frozenset([
        'ISA', 'GS', 'ST', 'B10', 'AT7', 'MS1', 'MS2', 'PRF', 'SE', 'GE',
        'IEA'])
    # Repeating segments -> result key of the list collecting them.
    _LIST_SEGMENTS = {'L11': 'references', 'AT8': 'packages'}

    def __init__(self, file_data):
        """Store ``file_data`` and parse it immediately.

        Args:
            file_data: the raw document, as text or as bytes (bytes are
                decoded as UTF-8 with undecodable bytes replaced, since
                ftplib hands binary downloads over as bytes).
        """
        self.obj = {
            'references': [],
            'remarks': [],
            'packages': [],
        }
        if isinstance(file_data, bytes):
            file_data = file_data.decode('utf-8', 'replace')
        self.file_data = file_data
        self.parse()

    def parse(self):
        """Split ``self.file_data`` into segments and fill ``self.obj``.

        Raises:
            ValueError: if a non-empty date element is not 6 or 8 digits
                long or is not a valid calendar date.
        """
        party = None  # key of the most recent N1 party, if any
        for raw_segment in self.file_data.split('~'):
            # Files often put a line break after every '~'; without this
            # the tag would be '\nGS' and the segment silently skipped.
            segment = raw_segment.strip('\r\n')
            if not segment:
                continue
            elements = segment.split('*')
            tag, values = elements[0], elements[1:]

            if tag == 'K1':
                if values:
                    self.obj['remarks'].append(values[0])
            elif tag in self._LIST_SEGMENTS:
                self.obj[self._LIST_SEGMENTS[tag]].append(
                    dict(zip(RESOLVER[tag], values)))
            elif tag in self._SINGLE_SEGMENTS:
                self.obj[tag] = self._normalise(
                    dict(zip(RESOLVER[tag], values)))
            elif tag == 'N1':
                if values:
                    party = values[0]
                    self.obj[party] = dict(zip(RESOLVER[tag], values))
            elif tag in ('N3', 'N4'):
                # An address with no preceding N1 has no party to attach to.
                if party is not None:
                    self.obj[party].update(dict(zip(RESOLVER[tag], values)))

    @classmethod
    def _normalise(cls, fields):
        """Rewrite date and time elements of ``fields`` in place; return it.

        Elements whose name contains 'date' become 'YYYY-MM-DD'; elements
        whose name contains 'time' (but not 'code') become 'HH:MM'.  Empty
        elements are left untouched.
        """
        for name, value in list(fields.items()):
            if not value:
                continue
            if 'date' in name:
                fields[name] = cls._format_date(value)
            if 'time' in name and 'code' not in name:
                fields[name] = cls._format_time(value)
        return fields

    @staticmethod
    def _format_date(value):
        """Convert a CCYYMMDD or YYMMDD string to 'YYYY-MM-DD'.

        The format is chosen by length rather than by trial: strptime
        accepts unpadded month/day numbers, so '%Y%m%d' happily parses the
        six-digit '191112' as 1911-01-02 instead of failing.
        """
        if len(value) == 8:
            date_format = '%Y%m%d'
        elif len(value) == 6:
            date_format = '%y%m%d'
        else:
            raise ValueError('unsupported date value: {!r}'.format(value))
        return datetime.strptime(value, date_format).strftime('%Y-%m-%d')

    @staticmethod
    def _format_time(value):
        """Convert an HHMM[SS...] string to 'HH:MM'.

        Anything after the minutes is dropped so the result keeps the
        'HH:MM' shape that multiply_edi extends with ':00'.
        """
        return value[:2] + ':' + value[2:4]

    def get_edi(self):
        """Return the parsed document as a dict (the live ``self.obj``)."""
        return self.obj
def retrieve_file(ftp_login, ftp_password, file_name, files_data_list):
    """Download one file from /Outbound/214/ and append it to a list.

    ``FTP.retrbinary`` invokes its callback once per received block, so the
    blocks are gathered locally and appended as a single item.  Appending
    the callback's blocks directly would split large files into several
    list entries and, when several downloads share one list, interleave
    blocks of different files.

    Args:
        ftp_login: FTP user name.
        ftp_password: FTP password.
        file_name: name of the file inside ``/Outbound/214/``.
        files_data_list: list that receives the file content as one text
            string (decoded as UTF-8, undecodable bytes replaced).

    Raises:
        ftplib.all_errors: connection, login or transfer failures
            propagate to the caller; nothing is appended in that case.
    """
    chunks = []
    ftp = FTP()
    try:
        ftp.connect('ftp.echo.com')
        ftp.login(ftp_login, ftp_password)
        ftp.retrbinary('RETR /Outbound/214/' + file_name, chunks.append)
    finally:
        # Always release the socket, even when the transfer fails.
        ftp.close()
    files_data_list.append(b''.join(chunks).decode('utf-8', 'replace'))
def write_dict_from_dict(from_dict, to_dict, resolver):
    """Copy selected values from ``from_dict`` into ``to_dict``.

    Args:
        from_dict: source mapping.
        to_dict: destination mapping, updated in place.
        resolver: ``{destination_key: source_key}``.  A source key given
            as a list is looked up with ``value_from_nested_dict``; any
            other source key is read with ``from_dict.get``.

    Only truthy values and the literal ``False`` are copied; ``None``,
    ``0``, empty strings and empty containers are skipped.
    """
    for target_key in resolver:
        source_key = resolver[target_key]
        value = (value_from_nested_dict(from_dict, source_key)
                 if isinstance(source_key, list)
                 else from_dict.get(source_key))
        if not value and value is not False:
            continue
        to_dict[target_key] = value
def multiply_edi(edis_list):
    """Turn parsed EDI messages, grouped by shipment, into shipment dicts.

    Args:
        edis_list: ``{shipment_id: {message_id: parsed_edi}}`` where every
            ``parsed_edi`` is a dict as returned by ``EDI.get_edi``.

    Returns:
        ``{"shipments": [...]}`` with one dict per shipment.  Each message
        contributes one checkpoint and refreshes the shipment-level fields
        it carries (a later message overrides an earlier one).
        Checkpoints that share a timestamp are collapsed to the last one.
    """
    shipment_resolver = {
        "packages": "packages",
        "sender": "SF",
        "recipient": "CN",
        "tracking_number": ["B10", "identification"],
        "carrier": ["B10", "carrier_scac"],
        "test": ["ISA", "usage_indicator"],
        "status": ["AT7", "status_code"],
    }
    checkpoint_resolver = {
        "status": ["AT7", "status_code"],
        "date": ["AT7", "date"],
        "timestamp": ["AT7", "time"],
        "city": ["MS1", "city"],
        "state": ["MS1", "state"],
    }
    shipments = []
    for shipment_checkpoints in edis_list.values():
        shipment_result = {}
        checkpoints = []
        for checkpoint_source in shipment_checkpoints.values():
            # Normalise the values of this message only, then merge them.
            # Re-normalising values kept from a previous message would
            # flip "test" to True and fail on the popped "weight_unit".
            shipment_result.update(
                _shipment_fields(checkpoint_source, shipment_resolver))
            checkpoints.append(
                _checkpoint_from_edi(checkpoint_source, checkpoint_resolver))
        shipment_result["checkpoints"] = _dedupe_checkpoints(checkpoints)
        shipments.append(shipment_result)
    return {"shipments": shipments}


def _shipment_fields(source, resolver):
    """Extract and normalise the shipment-level fields of one message."""
    fields = {}
    write_dict_from_dict(source, fields, resolver)
    if "carrier" in fields:
        fields["carrier"] = fields["carrier"].lower()
    if "test" in fields:
        # Usage indicator "P" marks a non-test shipment.
        fields["test"] = fields["test"] != "P"
    if "status" in fields:
        fields["status"] = SHIPMENT_STATUS_CODES.get(
            fields["status"], fields["status"])
    if "packages" in fields:
        packages = []
        for package in fields["packages"]:
            # Work on a copy so the caller's parsed data is not mutated.
            package = dict(package)
            unit = package.pop("weight_unit", None)
            if unit:
                # Unknown unit codes are passed through unchanged.
                fields["weight_unit"] = WEIGHT_UNITS.get(unit, unit)
            packages.append(package)
        fields["packages"] = packages
    return fields


def _checkpoint_from_edi(source, resolver):
    """Build one checkpoint dict (status, timestamp, location)."""
    checkpoint = {}
    write_dict_from_dict(source, checkpoint, resolver)
    if "status" in checkpoint:
        # Unknown status codes are passed through unchanged.
        checkpoint["status"] = SHIPMENT_STATUS_CODES.get(
            checkpoint["status"], checkpoint["status"])
    if "timestamp" in checkpoint and "date" in checkpoint:
        # NOTE(review): the "Z" suffix is appended regardless of the AT7
        # time_code -- confirm the times really are UTC.
        checkpoint["timestamp"] = "{}T{}:00Z".format(
            checkpoint.pop("date"), checkpoint.pop("timestamp"))
    # every shipment in dataset of 350 shipments has no country,
    # but everyone has US sender and recipient
    location = {"country": "US"}
    for key in ("city", "state"):
        if key in checkpoint:
            location[key] = checkpoint.pop(key)
    checkpoint["location"] = location
    return checkpoint


def _dedupe_checkpoints(checkpoints):
    """Collapse checkpoints sharing a timestamp, keeping the last one.

    Checkpoints without a timestamp cannot be compared and are all kept.
    """
    unique = []
    position = {}  # timestamp -> index in ``unique``
    for checkpoint in checkpoints:
        timestamp = checkpoint.get("timestamp")
        if timestamp is None:
            unique.append(checkpoint)
        elif timestamp in position:
            unique[position[timestamp]] = checkpoint
        else:
            position[timestamp] = len(unique)
            unique.append(checkpoint)
    return unique
def inbound(is_test, req, auth):
    """Fetch the status files of a date range over FTP and build shipments.

    Args:
        is_test: accepted for interface compatibility; not used here.
        req: mapping with ``start_date`` and ``end_date``; both are passed
            to ``parse_date`` and compared with each file's listing date.
        auth: mapping with ``ftp_login`` and ``ftp_password``.

    Returns:
        ``{'response': <multiply_edi result>, 'carrier_request': '',
        'carrier_response': ''}``.

    Raises:
        ftplib.all_errors: if listing the directory fails.
        KeyError: if a downloaded file has no B10 or ST segment.
    """
    # list the remote directory
    listing = []
    ftp = FTP()
    try:
        ftp.connect('ftp.echo.com')
        ftp.login(auth['ftp_login'], auth['ftp_password'])
        ftp.retrlines('LIST /Outbound/214', listing.append)
    finally:
        # Always release the socket, even when listing fails.
        ftp.close()

    # filter files by listing date
    start_date = parse_date(req['start_date'])
    end_date = parse_date(req['end_date'])
    files_to_parse = []
    for entry in listing:
        # NOTE(review): assumes a DOS-style listing,
        # "MM-DD-YY  HH:MMxM  <size or <DIR>>  name" -- confirm.
        # Splitting at most three times keeps names containing spaces.
        fields = entry.split(None, 3)
        if len(fields) < 4 or fields[2] == '<DIR>':
            continue
        try:
            f_date = datetime.strptime(fields[0], '%m-%d-%y').date()
        except ValueError:
            continue  # not a file entry
        if start_date <= f_date <= end_date:
            files_to_parse.append(fields[3])

    # download every file in its own thread, each into its own buffer so
    # data of different files can never be mixed together
    buffers = []
    threads = []
    for file_name in files_to_parse:
        parts = []
        buffers.append(parts)
        thread = Thread(
            target=retrieve_file,
            args=[auth['ftp_login'], auth['ftp_password'], file_name, parts])
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

    # parse shipment data
    shipments = {}
    for parts in buffers:
        if not parts:
            continue  # download failed or the file is empty
        if isinstance(parts[0], bytes):
            ship_data = b''.join(parts).decode('utf-8', 'replace')
        else:
            ship_data = ''.join(parts)
        edi_data = EDI(ship_data).get_edi()
        tn = edi_data['B10']['identification']
        vn = edi_data['ST']['control_number']
        shipments.setdefault(tn, {})[vn] = edi_data
    return {'response': multiply_edi(shipments),
            'carrier_request': '',
            'carrier_response': ''}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment