""" fetch_order_data函数只是获取源数据,是一个sql语句,可以更改 cal_min_fee 函数是分别以一票一件和一票多件计算出最小的费用和渠道 analyze_orders 订单层面的业务逻辑判断,防止出现混合渠道投递,卡派订单包含多个不同快递追踪单号,多渠道订单总重量小于1000KG(因为1000KG以内一个卡派可以搞定,不应该出现多渠道) analyze_logistics 真正的物流投递层面去分析,先判断投递渠道和最优渠道是否一致,再判断偶发估算费用和最优渠道费用是否一致 """ import pandas as pd from utils.gtools import MySQLconnect from utils.logisticsBill import BillFactory, Billing from utils.countryOperator import OperateCountry from utils.Package import Package, Package_group from utils.logistics_name_config import logistics_name # 获取数据 def fetch_order_data(): """从数据库获取原始订单数据""" with MySQLconnect('ods') as db: sql = """ SELECT ol.order_date, ol.fund_status, oe.`包裹状态`, oe.包裹号 AS package, oe.单号 AS order_id, oe.运输方式, oe.`目的国`, ol.postcode AS postcode, oe.`快递分区`, oe.快递跟踪号, ecm.类型 AS 渠道类型, -- 包裹类型 pvi.length AS 长, pvi.width AS 宽, pvi.hight AS 高, pvi.weight AS 重量, pfi.express_fee AS 基础估算, pfi.express_additional_fee AS 偶发估算, pfi.express_fee + pfi.express_additional_fee AS 总估算, oe.快递公司 FROM ods.order_express oe LEFT JOIN ods.express_company ecm ON oe.快递公司 = ecm.快递公司 LEFT JOIN ods.package_vol_info pvi ON oe.包裹号 = pvi.package LEFT JOIN ods.package_fee_info pfi ON oe.包裹号 = pfi.package LEFT JOIN ods.order_list ol ON oe.单号 = ol.order_id WHERE oe.包裹状态 REGEXP '已经投递|发货仓出库' AND oe.`快递公司` NOT REGEXP "--" AND `卡板发货时间` REGEXP "--" AND ol.fund_status NOT REGEXP '等待|全额退款' AND ol.site_name REGEXP 'litfad|kwoking|lakiq' AND oe.投递时间 >= DATE_SUB(NOW(), INTERVAL 3 DAY) AND pvi.length>0 AND pvi.width >0 AND pvi.hight>0 AND pvi.weight>0 and oe.目的国 regexp 'United States' """ return pd.read_sql(sql, db.engine()) def cal_min_fee(raw_data: pd.DataFrame): """ 处理物流费用数据并实现业务逻辑判断 1.用 """ df = raw_data.copy() # 包裹层面审核 for order_id, group in df.groupby('order_id'): package_group = Package_group() opCountry = OperateCountry(group['目的国'].iloc[0]) express_fee = 0 for index, row in group.iterrows(): # 计算一票一件 packages=Package_group() package = Package(row['package'], row['长'], row['宽'], row['高'], row['重量']) packages.add_package(package) bill_express = Billing("1",opCountry,packages,row['postcode'],company_name=None,head_type=1,beizhu="") if bill_express.tail_amount[0] == 0 or bill_express.tail_amount[0] >=9999: df.loc[index,"快递尾端费用"] = "不可派" express_fee = 999999 else: df.loc[index,"快递尾端费用"] = bill_express.tail_amount[0] df.loc[index,"快递尾端渠道"] = bill_express.company_name express_fee += bill_express.tail_amount[0] # 计算一票多件 package_group.add_package(package) # 计算一票多件 if len(package_group) > 1: bill_ltl = Billing("1",opCountry,package_group,row['postcode'],company_name=None,head_type=1,beizhu="") df.loc[df['order_id']==order_id,'卡派尾端费用'] = bill_ltl.tail_amount[0]/len(package_group) df.loc[df['order_id']==order_id,'卡派尾端渠道'] = bill_ltl.company_name min_fee = min(bill_ltl.tail_amount[0],express_fee) else: min_fee = express_fee if min_fee == express_fee: df.loc[df['order_id']==order_id,'最优总物流费用'] = min_fee df.loc[df['order_id']==order_id,'最优渠道类型'] = "快递" else: df.loc[df['order_id']==order_id,'最优总物流费用'] = min_fee df.loc[df['order_id']==order_id,'最优渠道类型'] = "卡派" df.loc[df['order_id']==order_id,'尾端货币'] = bill_ltl.tail_amount[1] return df # 订单层面审核,防止出现混合渠道投递,卡派订单包含多个不同快递单号,多渠道订单总重量小于1000KG def analyze_orders(raw_data: pd.DataFrame): """ 处理订单数据并实现业务逻辑判断 返回聚合后的订单数据和分析结果,包裹信息按指定字典格式输出 """ data = raw_data.copy() # 1. 预处理 - 处理空值 data.fillna({ '渠道类型': '未知类型', '基础估算': 0, '偶发估算': 0, '总估算': 0, '重量': 0, '长': 0, '宽': 0, '高': 0, 'postcode': '未知' }, inplace=True) # 2. 按订单聚合数据 def create_package_details(group): """创建包裹详情字典,严格按照要求的格式""" details = {} for i, (_, row) in enumerate(group.iterrows(), 1): details[f"包裹{i}"] = { "宽": f"{float(row['宽']):.2f}", "长": f"{float(row['长']):.2f}", "高": f"{float(row['高']):.2f}", "重量": f"{float(row['重量']):.2f}" } return details grouped = data.groupby('order_id') aggregated = pd.DataFrame({ '包裹数量': grouped.size(), '总重量': grouped['重量'].sum(), '总基础估算': grouped['基础估算'].sum(), '总附加估算': grouped['偶发估算'].sum(), '总物流估算': grouped['总估算'].sum(), '包裹数据': grouped.apply(create_package_details), # 使用新函数 '快递公司列表': grouped['快递公司'].unique(), '渠道类型列表': grouped['渠道类型'].unique(), '邮编列表': grouped['postcode'].first(), '快递跟踪号': grouped['快递跟踪号'].unique() }).reset_index() # 3. 实现业务逻辑判断(保持不变) def determine_order_type(row): if len(row['渠道类型列表']) > 1: return '混合' elif len(row['渠道类型列表']) == 1: return row['渠道类型列表'][0] else: return '未知类型' def determine_channel_type(row): if len(row['快递公司列表']) > 1: return '多渠道' else: return '单渠道' aggregated['订单类型'] = aggregated.apply(determine_order_type, axis=1) aggregated['渠道种类'] = aggregated.apply(determine_channel_type, axis=1) # 4. 实现业务规则检查(保持不变) def apply_business_rules(row): actions = [] status = '正常' comments = [] if row['订单类型'] == '卡派' and len(row['快递跟踪号']) > 1: # tracking_nos = [list(p.values())[0] for p in row['包裹数据'].values()] # if len(set(tracking_nos)) > 1: # status = '异常' status = '异常' comments.append('卡派订单包含多个不同快递单号') elif row['订单类型'] == '混合': status = '异常' comments.append('出现混合渠道类型订单,需要核查') if row['渠道种类'] == '多渠道': if row['总重量'] < 1000: comments.append(f'多渠道订单总重量{row["总重量"]:.2f}KG < 1000KG') return pd.Series({ '状态': status, '建议操作': '; '.join(actions) if actions else '下一步', '备注': ' | '.join(comments) if comments else '' }) rule_results = aggregated.apply(apply_business_rules, axis=1) aggregated = pd.concat([aggregated, rule_results], axis=1) # 5. 整理最终输出列 final_columns = [ 'order_id', '订单类型', '渠道种类', '包裹数量', '总重量', '总基础估算', '总附加估算', '总物流估算', '快递公司列表', '邮编列表', '包裹数据' ,'状态', '备注','快递跟踪号'# 使用新列名 ] return aggregated[final_columns] # 物流费用层面审核 def analyze_logistics(df: pd.DataFrame): """ 1.判断实际投递物流渠道和cal_min_fee计算的最优物流渠道是否一致 2.物流渠道一致的情况下,判断费用是否一样 """ # 1. 计算最优渠道和费用 df= cal_min_fee(df) # 判断渠道是否一致 df['最优渠道'] = df.apply(lambda row: row['快递尾端渠道'] if row['最优渠道类型'] == "快递" else row['卡派尾端渠道'], axis=1) df['渠道一致'] = df.apply(lambda row: row['最优渠道'] == logistics_name.get(row['快递公司']), axis=1) # 2. 计算费用是否一致 def all_estimate(row): if row['总估算'] is None or row['总估算'] ==0: return "暂无系统估算值" if row['最优总物流费用'] is None or row['最优总物流费用'] ==0: return "暂无最优费用" if row['尾端货币'] == "USD": all_estimate= row['总估算']/7 elif row['尾端货币'] == "GBP": all_estimate = row['总估算']/9 elif row['尾端货币'] == "EUR": all_estimate = row['总估算']/8 elif row['尾端货币'] == "AUD": all_estimate = row['总估算']/5 elif row['尾端货币'] == "CAD": all_estimate = row['总估算']/5 elif row['尾端货币'] == "JPY": all_estimate = row['总估算']/0.05 return all_estimate df['费用一致'] = df.apply(lambda row: False if isinstance(all_estimate(row), str) else abs(all_estimate(row) - row['最优总物流费用']) < 1,axis=1) df['费用差(当地货币)'] = df.apply(lambda row: "费用有误" if isinstance(all_estimate(row), str) else row['最优总物流费用'] - all_estimate(row),axis=1) return df def main(): # 获取数据 raw_data = fetch_order_data() print('已获取数据') # 订单层面审核 order_result = analyze_orders(raw_data) print('已完成订单层面审核') order_result.to_excel(r'D:\test\logistics\拦截数据\order_analysis.xlsx', index=False) # 计算最优渠道和费用 raw_data = analyze_logistics(raw_data) print('已完成物流费用层面审核') raw_data.to_excel(r'D:\test\logistics\拦截数据\logistics_analysis.xlsx', index=False) if __name__ == '__main__': main()