diff --git a/logisticsClass/logisticsTail_US.py b/logisticsClass/logisticsTail_US.py
index 382860a..88d5726 100644
--- a/logisticsClass/logisticsTail_US.py
+++ b/logisticsClass/logisticsTail_US.py
@@ -27,8 +27,7 @@ class WestLogistics_US(TailLogistics):
             cls._load_postcodes() # 第一次实例化时加载文件
             cls._is_loaded = True # 标记文件已加载
         return super().__new__(cls)
-    # Path_current_directory = Path(__file__).parent
-    # current_directory = os.path.dirname(__file__)
+
     @classmethod
     def _load_postcodes(cls):
         """加载邮编文件"""
diff --git a/utils/logistics_name_config.py b/utils/logistics_name_config.py
new file mode 100644
index 0000000..d90723d
--- /dev/null
+++ b/utils/logistics_name_config.py
@@ -0,0 +1,18 @@
+# Maps the carrier code stored in 快递公司 to the channel name reported by Billing.company_name.
+logistics_name = {
+    "空LAX-FEDEX-SAIR-H": "Fedex-HOME",
+    "空LAX-FEDEX-SAIR-G": "Fedex-GROUD",
+    "海MS-FEDEX-SAIR-H": "Fedex-HOME",
+    "海MS-FEDEX-SAIR-G": "Fedex-GROUD",
+    "海MS-METRO-SAIR": "Metro-SAIR",
+    "海NY-METRO-SAIR": "Metro-SAIR",  # not implemented yet
+    "海NY-XMILES-SAIR": "XMILES-SAIR",
+    "海MS-XMILES": "XMILES-SAIR",
+    "海NY-AMT-SAIR": "AM-美东",
+    "海MS-AMT-SAIR": "AM-美西",
+
+    "海MS-FEDEX02": "Fedex-彩虹小马",
+    "空LAX-FEDEX02": "Fedex-彩虹小马"
+}
+
+# 海NY-WWEX-SAIR and 海MS-WWEX-SAIR have no rate sheet and have been discontinued.
\ No newline at end of file
diff --git a/物流t投递审核.py b/物流t投递审核.py
new file mode 100644
index 0000000..c59f225
--- /dev/null
+++ b/物流t投递审核.py
@@ -0,0 +1,327 @@
+"""
+Audit the logistics channel actually used for recently shipped US orders.
+
+fetch_order_data only pulls the source data; it is one SQL statement and can be changed.
+cal_min_fee prices every order both as single-piece shipments (express) and as one
+multi-piece shipment (LTL, "卡派") and keeps the cheaper fee and channel.
+analyze_orders applies order-level business checks: mixed channel types in one order,
+LTL orders carrying several different tracking numbers, and multi-channel orders whose
+total weight is below 1000 KG (one LTL shipment can handle up to 1000 KG, so several
+channels should not appear).
+analyze_logistics audits the shipment itself: first whether the channel used matches the
+optimal channel, then whether the system estimate matches the optimal fee.
+"""
+from pathlib import Path
+
+import pandas as pd
+
+from utils.gtools import MySQLconnect
+from utils.logisticsBill import BillFactory, Billing
+from utils.countryOperator import OperateCountry
+from utils.Package import Package, Package_group
+from utils.logistics_name_config import logistics_name
+
+# Result columns added by cal_min_fee. They are created up front with object dtype so
+# that an empty input still gets every column and the text marker "不可派" can share a
+# column with numeric fees.
+_RESULT_COLUMNS = (
+    "快递尾端费用", "快递尾端渠道", "卡派尾端费用", "卡派尾端渠道",
+    "最优总物流费用", "最优渠道类型", "尾端货币",
+)
+
+# Express total recorded for an order when at least one of its packages is undeliverable.
+_UNDELIVERABLE_EXPRESS_FEE = 999999
+
+# 总估算 is divided by these values to express it in the tail-end currency.
+# NOTE(review): presumably rough CNY exchange rates -- confirm and keep them current.
+_ESTIMATE_DIVISORS = {
+    "USD": 7,
+    "GBP": 9,
+    "EUR": 8,
+    "AUD": 5,
+    "CAD": 5,
+    "JPY": 0.05,
+}
+
+
+def fetch_order_data():
+    """Fetch the raw package rows to audit from the ``ods`` database.
+
+    Returns:
+        pd.DataFrame: the rows returned by the query below.
+    """
+    with MySQLconnect('ods') as db:
+        sql = """
+            SELECT
+                ol.order_date,
+                ol.fund_status,
+                oe.`包裹状态`,
+                oe.包裹号 AS package,
+                oe.单号 AS order_id,
+                oe.运输方式,
+                oe.`目的国`,
+                oe.快递公司,
+                oe.`快递分区`,
+                oe.快递跟踪号,
+                ecm.类型 AS 渠道类型, -- 包裹类型
+                pvi.length AS 长,
+                pvi.width AS 宽,
+                pvi.hight AS 高,
+                pvi.weight AS 重量,
+                pfi.express_fee AS 基础估算,
+                pfi.express_additional_fee AS 偶发估算,
+                pfi.express_fee + pfi.express_additional_fee AS 总估算,
+                ol.postcode AS postcode
+            FROM
+                ods.order_express oe
+                LEFT JOIN ods.express_company ecm ON oe.快递公司 = ecm.快递公司
+                LEFT JOIN ods.package_vol_info pvi ON oe.包裹号 = pvi.package
+                LEFT JOIN ods.package_fee_info pfi ON oe.包裹号 = pfi.package
+                LEFT JOIN ods.order_list ol ON oe.单号 = ol.order_id
+            WHERE
+                oe.包裹状态 REGEXP '已经投递|发货仓出库'
+                AND oe.`快递公司` NOT REGEXP "--"
+                AND `卡板发货时间` REGEXP "--"
+                AND ol.fund_status NOT REGEXP '等待|全额退款'
+                AND ol.site_name REGEXP 'litfad|kwoking|lakiq'
+                AND oe.投递时间 >= DATE_SUB(NOW(), INTERVAL 20 DAY)
+                AND pvi.length>0 AND pvi.width >0 AND pvi.hight>0 AND pvi.weight>0
+                and oe.目的国 regexp 'United States'
+            """
+
+        return pd.read_sql(sql, db.engine())
+
+
+def cal_min_fee(raw_data: pd.DataFrame) -> pd.DataFrame:
+    """Price every order as express and as LTL and record the cheaper option.
+
+    Args:
+        raw_data: package rows as returned by fetch_order_data.
+
+    Returns:
+        A copy of raw_data with the per-package express fee and channel, the per-order
+        LTL fee (split evenly over the order's packages) and channel, the optimal total
+        fee, the optimal channel type ("快递" or "卡派") and the fee currency.
+    """
+    df = raw_data.copy()
+    for column in _RESULT_COLUMNS:
+        df[column] = pd.Series(index=df.index, dtype=object)
+
+    for _, group in df.groupby('order_id'):
+        op_country = OperateCountry(group['目的国'].iloc[0])
+        order_packages = Package_group()
+        express_fee = 0
+        express_deliverable = True
+        for index, row in group.iterrows():
+            # Single-piece (express) quote for this package on its own.
+            package = Package(row['package'], row['长'], row['宽'], row['高'], row['重量'])
+            single_package = Package_group()
+            single_package.add_package(package)
+            bill_express = Billing("1", op_country, single_package, row['postcode'],
+                                   company_name=None, head_type=1, beizhu="")
+            package_fee = bill_express.tail_amount[0]
+            if package_fee == 0 or package_fee >= 9999:
+                # A fee of 0 or of 9999 and above is treated as "not deliverable".
+                df.loc[index, "快递尾端费用"] = "不可派"
+                express_deliverable = False
+            else:
+                df.loc[index, "快递尾端费用"] = package_fee
+                df.loc[index, "快递尾端渠道"] = bill_express.company_name
+                express_fee += package_fee
+            order_packages.add_package(package)
+        if not express_deliverable:
+            # One undeliverable package rules express out for the whole order, wherever
+            # that package sits in the order.
+            express_fee = _UNDELIVERABLE_EXPRESS_FEE
+
+        # Multi-piece (LTL) quote for the whole order. The postcode is selected from the
+        # order table, so the first row stands for the order.
+        bill_ltl = Billing("1", op_country, order_packages, group['postcode'].iloc[0],
+                           company_name=None, head_type=1, beizhu="")
+        ltl_fee = bill_ltl.tail_amount[0]
+        min_fee = min(ltl_fee, express_fee)
+
+        order_rows = group.index
+        df.loc[order_rows, '卡派尾端费用'] = ltl_fee / len(group)
+        df.loc[order_rows, '卡派尾端渠道'] = bill_ltl.company_name
+        df.loc[order_rows, '最优总物流费用'] = min_fee
+        # Express wins a tie.
+        df.loc[order_rows, '最优渠道类型'] = "快递" if min_fee == express_fee else "卡派"
+        df.loc[order_rows, '尾端货币'] = bill_ltl.tail_amount[1]
+    return df
+
+
+# Order-level audit: guards against mixed-channel delivery, LTL orders with several
+# different tracking numbers, and multi-channel orders with a total weight below 1000 KG.
+def analyze_orders(raw_data: pd.DataFrame) -> pd.DataFrame:
+    """Aggregate the package rows per order and apply the order-level business rules.
+
+    Args:
+        raw_data: package rows as returned by fetch_order_data.
+
+    Returns:
+        One row per order with its totals, carrier list, a "包裹数据" dict describing
+        each package, and the rule outcome in "状态" / "备注". The column
+        "快递跟踪号数量" holds the order's distinct tracking numbers, not a count.
+    """
+    # 1. Pre-processing: replace missing values.
+    data = raw_data.fillna({
+        '渠道类型': '未知类型',
+        '基础估算': 0,
+        '偶发估算': 0,
+        '总估算': 0,
+        '重量': 0,
+        '长': 0,
+        '宽': 0,
+        '高': 0,
+        'postcode': '未知'
+    })
+
+    # 2. Aggregate per order.
+    def create_package_details(group):
+        """Build the package-detail dict for one order in the required format."""
+        return {
+            f"包裹{i}": {
+                "宽": f"{float(row['宽']):.2f}",
+                "长": f"{float(row['长']):.2f}",
+                "高": f"{float(row['高']):.2f}",
+                "重量": f"{float(row['重量']):.2f}"
+            }
+            for i, (_, row) in enumerate(group.iterrows(), 1)
+        }
+
+    grouped = data.groupby('order_id')
+    package_counts = grouped.size()
+    # Built by iterating the groups (same order as the aggregated index) instead of
+    # grouped.apply, so an empty input also works.
+    package_details = pd.Series(
+        [create_package_details(group) for _, group in grouped],
+        index=package_counts.index,
+        dtype=object,
+    )
+
+    aggregated = pd.DataFrame({
+        '包裹数量': package_counts,
+        '总重量': grouped['重量'].sum(),
+        '总基础估算': grouped['基础估算'].sum(),
+        '总附加估算': grouped['偶发估算'].sum(),
+        '总物流估算': grouped['总估算'].sum(),
+        '包裹数据': package_details,
+        '快递公司列表': grouped['快递公司'].unique(),
+        '渠道类型列表': grouped['渠道类型'].unique(),
+        '邮编列表': grouped['postcode'].first(),
+        '快递跟踪号数量': grouped['快递跟踪号'].unique()
+    }).reset_index()
+
+    # 3. Classify each order.
+    def determine_order_type(channel_types):
+        if len(channel_types) > 1:
+            return '混合'
+        if len(channel_types) == 1:
+            return channel_types[0]
+        return '未知类型'
+
+    aggregated['订单类型'] = [
+        determine_order_type(channel_types) for channel_types in aggregated['渠道类型列表']
+    ]
+    aggregated['渠道种类'] = [
+        '多渠道' if len(carriers) > 1 else '单渠道' for carriers in aggregated['快递公司列表']
+    ]
+
+    # 4. Business-rule checks.
+    def apply_business_rules(row):
+        status = '正常'
+        comments = []
+
+        if row['订单类型'] == '卡派':
+            # Compare the order's tracking numbers themselves; the package-detail dict
+            # only holds dimensions and weight.
+            if len(row['快递跟踪号数量']) > 1:
+                status = '异常'
+                comments.append('卡派订单包含多个不同快递单号')
+        elif row['订单类型'] == '混合':
+            status = '异常'
+            comments.append('出现混合类型订单,需要核查')
+
+        if row['渠道种类'] == '多渠道' and row['总重量'] < 1000:
+            comments.append(f'多渠道订单总重量{row["总重量"]:.2f}KG < 1000KG')
+
+        return {'状态': status, '建议操作': '下一步', '备注': ' | '.join(comments)}
+
+    rule_results = pd.DataFrame(
+        [apply_business_rules(row) for _, row in aggregated.iterrows()],
+        index=aggregated.index,
+        columns=['状态', '建议操作', '备注'],
+    )
+    aggregated = pd.concat([aggregated, rule_results], axis=1)
+
+    # 5. Select the output columns.
+    final_columns = [
+        'order_id', '订单类型', '渠道种类',
+        '包裹数量', '总重量',
+        '总基础估算', '总附加估算', '总物流估算',
+        '快递公司列表', '邮编列表',
+        '包裹数据', '状态', '备注', '快递跟踪号数量'
+    ]
+
+    return aggregated[final_columns]
+
+
+# Shipment-level fee audit.
+def analyze_logistics(df: pd.DataFrame) -> pd.DataFrame:
+    """Compare the channel and fee actually used with the optimum from cal_min_fee.
+
+    Args:
+        df: package rows as returned by fetch_order_data.
+
+    Returns:
+        The cal_min_fee result plus "最优渠道", "渠道一致", "费用一致" (True/False, or a
+        message when the comparison is not possible) and "费用差(当地货币)" (optimal fee
+        minus converted estimate; empty when the comparison is not possible).
+    """
+    # 1. Optimal channel and fee.
+    df = cal_min_fee(df)
+    df['最优渠道'] = df['快递尾端渠道'].where(df['最优渠道类型'] == "快递", df['卡派尾端渠道'])
+    df['渠道一致'] = df['最优渠道'].eq(df['快递公司'].map(logistics_name))
+
+    # 2. Fee comparison in the tail-end currency.
+    estimate = pd.to_numeric(df['总估算'], errors='coerce')
+    optimal_fee = pd.to_numeric(df['最优总物流费用'], errors='coerce')
+    divisor = df['尾端货币'].map(_ESTIMATE_DIVISORS)
+    difference = optimal_fee - estimate / divisor
+
+    no_estimate = estimate.isna() | estimate.eq(0)
+    no_optimal_fee = optimal_fee.isna() | optimal_fee.eq(0)
+    unknown_currency = divisor.isna()
+
+    # Rows that cannot be compared get a message instead of a boolean; the masks are
+    # applied from lowest to highest priority.
+    fee_match = (difference.abs() < 1).astype(object)
+    fee_match = fee_match.mask(unknown_currency, "未知尾端货币")
+    fee_match = fee_match.mask(no_optimal_fee, "暂无最优费用")
+    fee_match = fee_match.mask(no_estimate, "暂无系统估算值")
+    df['费用一致'] = fee_match
+    df['费用差(当地货币)'] = difference.where(
+        ~(no_estimate | no_optimal_fee | unknown_currency)
+    )
+    return df
+
+
+def main(output_dir=r'D:\test\logistics\拦截数据'):
+    """Run both audits and write one Excel workbook per audit into output_dir."""
+    output_dir = Path(output_dir)
+    # Fetch the data.
+    raw_data = fetch_order_data()
+    print('已获取数据')
+    # Order-level audit.
+    order_result = analyze_orders(raw_data)
+    print('已完成订单层面审核')
+    order_result.to_excel(output_dir / 'order_analysis.xlsx', index=False)
+    # Optimal channel and fee.
+    raw_data = analyze_logistics(raw_data)
+    print('已完成物流费用层面审核')
+    raw_data.to_excel(output_dir / 'logistics_analysis.xlsx', index=False)
+
+
+if __name__ == '__main__':
+    main()