# logistics/物流投递审核.py
"""
fetch_order_data函数只是获取源数据,是一个sql语句可以更改
cal_min_fee 函数是分别以一票一件和一票多件计算出最小的费用和渠道
analyze_orders 订单层面的业务逻辑判断防止出现混合渠道投递卡派订单包含多个不同快递追踪单号多渠道订单总重量小于1000KG(因为1000KG以内一个卡派可以搞定不应该出现多渠道)
analyze_logistics 真正的物流投递层面去分析,先判断投递渠道和最优渠道是否一致,再判断偶发估算费用和最优渠道费用是否一致
"""
import pandas as pd
from utils.gtools import MySQLconnect
from utils.logisticsBill import BillFactory, Billing
from utils.countryOperator import OperateCountry
from utils.Package import Package, Package_group
from utils.logistics_name_config import logistics_name
from datetime import date
# Currency conversion: convert non-RMB amounts to RMB using rough fixed rates
def convert_currency(amount, current_currency):
    """
    Convert an amount in the given currency to RMB.
    Returns a message string when the amount is missing (None/0) or when the
    placeholder value >= 9999 means no channel is available.
    """
    if amount is None or amount == 0:
        return "金额为空"
    if amount >= 9999:
        return "无可用渠道"
    if current_currency == "USD":
        amount = amount * 7
    elif current_currency == "GBP":
        amount = amount * 9
    elif current_currency == "EUR":
        amount = amount * 8
    elif current_currency == "AUD":
        amount = amount * 5
    elif current_currency == "CAD":
        amount = amount * 5
    elif current_currency == "JPY":
        amount = amount * 0.05
    return amount
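# Illustrative values (derived from the fixed rates above):
#   convert_currency(100, "USD")   -> 700
#   convert_currency(0, "EUR")     -> "金额为空"   (missing amount)
#   convert_currency(10000, "GBP") -> "无可用渠道" (no available channel)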
# Fetch source data
def fetch_order_data():
    """Fetch the raw order data from the database (the SQL can be adjusted as needed)."""
with MySQLconnect('ods') as db:
sql = """
SELECT
DATE_FORMAT(ol.order_date, '%%Y-%%m-%%d') AS order_date,
DATE_FORMAT(oe.投递时间, '%%Y-%%m-%%d') AS 投递时间,
ol.fund_status,
oe.`包裹状态`,
oe.包裹号 AS package,
oe.单号 AS order_id,
oe.运输方式,
oe.`目的国`,
ol.postcode AS postcode,
oe.`快递分区`,
oe.快递跟踪号,
            ecm.类型 AS 渠道类型, -- package channel type
pvi.length AS 长,
pvi.width AS 宽,
pvi.hight AS 高,
pvi.weight AS 重量,
pfi.express_fee AS 基础估算,
pfi.express_additional_fee AS 偶发估算,
pfi.express_fee + pfi.express_additional_fee AS 包裹总估算,
oe.快递公司 AS 投递渠道
FROM
ods.order_express oe
LEFT JOIN ods.express_company ecm ON oe.快递公司 = ecm.快递公司
LEFT JOIN ods.package_vol_info pvi ON oe.包裹号 = pvi.package
LEFT JOIN ods.package_fee_info pfi ON oe.包裹号 = pfi.package
LEFT JOIN ods.order_list ol ON oe.单号 = ol.order_id
WHERE
oe.包裹状态 not REGEXP '已作废|--|客户签收'
# AND oe.`快递公司` NOT REGEXP "--"
AND `卡板发货时间` REGEXP "--"
AND ol.fund_status NOT REGEXP '等待|全额退款'
AND ol.site_name REGEXP 'litfad|kwoking|lakiq'
AND oe.投递时间 >= DATE_SUB(NOW(), INTERVAL 3 DAY)
AND pvi.length>0 AND pvi.width >0 AND pvi.hight>0 AND pvi.weight>0
and oe.目的国 regexp 'United States|Australia|United Kingdom|Germany|France|Spain|Italy|Netherlands|Belgium'
order by ol.order_id,ol.order_date
"""
return pd.read_sql(sql, db.engine())
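# The query returns one row per package, carrying the dimensions (长/宽/高/重量),
# the fee estimates (基础估算/偶发估算/包裹总估算) and the delivery channel (投递渠道)
# used by the analysis steps below.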
def cal_min_fee(raw_data: pd.DataFrame):
"""
处理物流费用数据并实现业务逻辑判断
1.用
"""
df = raw_data.copy()
# 包裹层面审核
for order_id, group in df.groupby('order_id'):
package_group = Package_group()
opCountry = OperateCountry(group['目的国'].iloc[0])
express_fee = 0
express_type=''
        for index, row in group.iterrows():
            # One shipment per package: quote this package on its own
            packages = Package_group()
            package = Package(row['package'], row['长'], row['宽'], row['高'], row['重量'])
            packages.add_package(package)
            bill_express = Billing("1", opCountry, packages, row['postcode'], company_name=None, head_type=1, beizhu="")
            if bill_express.tail_amount[0] == 0 or bill_express.tail_amount[0] >= 9999:
                df.loc[index, "单票最小费用"] = ""
                df.loc[index, "单票渠道"] = ""
                express_fee = 999999
                express_type = '不可派'
            else:
                df.loc[index, "单票最小费用"] = bill_express.tail_amount[0]
                df.loc[index, "单票渠道"] = bill_express.company_name
                express_fee += bill_express.tail_amount[0]
                express_type = bill_express.logistic_type
                if bill_express.logistic_type == '卡派':
                    express_type = '卡派单包裹'
            # Accumulate packages for the one-shipment-multi-package quote below
            package_group.add_package(package)
        # One shipment with multiple packages: quote the whole order as a single group
        if len(package_group) > 1:
            bill_ltl = Billing("1", opCountry, package_group, row['postcode'], company_name=None, head_type=1, beizhu="")
            if bill_ltl.tail_amount[0] == 0 or bill_ltl.tail_amount[0] >= 9999:
                # Multi-package quote unavailable: exclude it from the comparison
                df.loc[df['order_id'] == order_id, '多票最小费用'] = ""
                df.loc[df['order_id'] == order_id, '多票渠道'] = "不可派"
                ltl_fee = float('inf')
            else:
                df.loc[df['order_id'] == order_id, '多票最小费用'] = bill_ltl.tail_amount[0] / len(package_group)
                df.loc[df['order_id'] == order_id, '多票渠道'] = bill_ltl.company_name
                ltl_fee = bill_ltl.tail_amount[0]
            min_fee = min(ltl_fee, express_fee)
            df.loc[df['order_id'] == order_id, '最优总费用'] = min_fee
            df.loc[df['order_id'] == order_id, '最优渠道类型'] = bill_ltl.logistic_type if min_fee == ltl_fee else express_type
        else:
            min_fee = express_fee
            df.loc[df['order_id'] == order_id, '最优总费用'] = min_fee
            df.loc[df['order_id'] == order_id, '最优渠道类型'] = express_type
        df.loc[df['order_id'] == order_id, '尾端货币'] = bill_express.tail_amount[1]
return df
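# Columns added by cal_min_fee (per the assignments above): 单票最小费用, 单票渠道,
# 多票最小费用, 多票渠道, 最优总费用, 最优渠道类型, 尾端货币.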
# Order-level review: guard against mixed-channel delivery, LTL (卡派) orders carrying
# multiple distinct tracking numbers, and multi-channel orders under 1000 KG in total.
def analyze_orders(raw_data: pd.DataFrame):
    """
    Aggregate the data per order and apply the order-level business checks.
    Returns the aggregated orders and the analysis results; package details
    are emitted in the required dict format.
    """
    data = raw_data.copy()
    # 1. Preprocessing: fill missing values
    data.fillna({
        '渠道类型': '未知类型',
        '基础估算': 0,
        '偶发估算': 0,
        '包裹总估算': 0,
        '重量': 0,
        '长': 0,
        '宽': 0,
        '高': 0,
        'postcode': '未知'
    }, inplace=True)
    # 2. Aggregate the data per order
    def create_package_details(group):
        """Build the package-details dict in the required format."""
        details = {}
        for i, (_, row) in enumerate(group.iterrows(), 1):
            details[f"包裹{i}"] = {
                "长": f"{float(row['长']):.2f}",
                "宽": f"{float(row['宽']):.2f}",
                "高": f"{float(row['高']):.2f}",
                "重量": f"{float(row['重量']):.2f}"
            }
        return details
grouped = data.groupby('order_id')
aggregated = pd.DataFrame({
'订单时间': grouped['order_date'].first(),
'最晚投递时间': grouped['投递时间'].max(),
'包裹数量': grouped.size(),
'总重量': grouped['重量'].sum(),
'订单总估算': grouped['包裹总估算'].sum(),
        '包裹数据': grouped.apply(create_package_details),  # package details via the helper above
'投递渠道列表': grouped['投递渠道'].unique(),
'渠道类型列表': grouped['渠道类型'].unique(),
'快递跟踪号': grouped['快递跟踪号'].unique(),
'最优渠道推荐':grouped['最优渠道'].first(),
'最优渠道类型':grouped['最优渠道类型'].first(),
'最优总费用':grouped['最优总费用'].first(),
'费用差(RMB)':grouped['费用差(RMB)'].first(),
}).reset_index()
    # 3. Business-logic classification
def determine_order_type(row):
if len(row['渠道类型列表']) > 1:
return '混合'
elif len(row['渠道类型列表']) == 1:
return row['渠道类型列表'][0]
else:
return '未知类型'
def determine_channel_type(row):
if len(row['投递渠道列表']) > 1:
return '多渠道'
else:
return '单渠道'
aggregated['订单类型'] = aggregated.apply(determine_order_type, axis=1)
aggregated['渠道种类'] = aggregated.apply(determine_channel_type, axis=1)
    # 4. Business-rule checks per order
def apply_business_rules(row):
actions = []
status = '正常'
comments = []
if row['订单类型'] == '卡派' and len(row['快递跟踪号']) > 1:
# tracking_nos = [list(p.values())[0] for p in row['包裹数据'].values()]
# if len(set(tracking_nos)) > 1:
# status = '异常'
status = '异常'
comments.append('卡派订单包含多个不同快递单号')
elif row['订单类型'] == '混合':
status = '异常'
comments.append('出现混合渠道类型订单,需要核查')
if row['渠道种类'] == '多渠道':
if row['总重量'] < 1000:
comments.append(f'多渠道订单总重量{row["总重量"]:.2f}KG < 1000KG')
return pd.Series({
'状态': status,
'建议操作': '; '.join(actions) if actions else '下一步',
'备注': ' | '.join(comments) if comments else ''
})
rule_results = aggregated.apply(apply_business_rules, axis=1)
aggregated = pd.concat([aggregated, rule_results], axis=1)
aggregated['测算日期'] = date.today().strftime("%Y-%m-%d")
    # 5. Assemble the final output columns
    final_columns = [
        'order_id', '订单时间', '最晚投递时间', '订单类型', '渠道种类', '快递跟踪号',
        '包裹数量', '总重量', '订单总估算', '投递渠道列表', '包裹数据',
        '状态', '备注', '最优渠道推荐', '最优总费用', '费用差(RMB)', '测算日期'
    ]
return aggregated[final_columns]
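# Per apply_business_rules above: an order is flagged 异常 when it is a 卡派 order with
# more than one tracking number or when it mixes channel types; a multi-channel order
# under 1000 KG only receives a remark in 备注.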
# Delivery-level review of the logistics fees
def analyze_logistics(df: pd.DataFrame):
    """
    1. Check whether the actual delivery channel matches the optimal channel computed by cal_min_fee.
    2. When the channels match, check whether the fees also match.
    """
    # 1. Compute the optimal channel and fee
    df = cal_min_fee(df)
    # Channel consistency check
    df['测算日期'] = date.today().strftime("%Y-%m-%d")
df['最优渠道'] = df.apply(lambda row: row['单票渠道'] if row['最优渠道类型'] == "快递" or row['最优渠道类型'] == "卡派单包裹" else row['多票渠道'], axis=1)
df['渠道一致'] = df.apply(lambda row: row['最优渠道'] == logistics_name.get(row['投递渠道']), axis=1)
    # 2. Fee consistency check
    def all_estimate(row):
        """Optimal total fee converted to RMB; returns a message string on error."""
        if row['最优总费用'] >= 9999:
            return "费用有误"
        return convert_currency(row['最优总费用'], row['尾端货币'])
    df['订单总估算'] = df.groupby('order_id')['包裹总估算'].transform('sum')
    df['费用一致'] = df.apply(lambda row: False if isinstance(all_estimate(row), str) else abs(all_estimate(row) - row['订单总估算']) < 1, axis=1)
    df['费用差(RMB)'] = df.apply(lambda row: "费用有误" if isinstance(all_estimate(row), str) else round(all_estimate(row) - row['订单总估算'], 2), axis=1)
    # Channels match: only the fee needs checking, no redirection; 0 = undetermined, needs manual confirmation
    df['是否改投'] = df.apply(lambda row: "不改投" if row['渠道一致'] == True else 0, axis=1)
    df['异常情况'] = None
    # Arrange the output columns
    final_columns = ['order_date', '投递时间', 'fund_status', '包裹状态', '运输方式', '快递跟踪号', '目的国', 'postcode', '快递分区', 'order_id', 'package', '长', '宽', '高', '重量',
                     '基础估算', '偶发估算', '包裹总估算', '订单总估算', '本地估算RMB', '渠道类型', '投递渠道', '单票最小费用', '单票渠道', '多票最小费用', '多票渠道', '最优总费用',
                     '最优渠道', '最优渠道类型', '尾端货币', '渠道一致', '费用一致', '费用差(RMB)', '测算日期', '是否改投', '异常情况']
return df[final_columns]
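# Note: 费用一致 above treats fees as matching when the RMB-converted optimal fee and
# 订单总估算 differ by less than 1 (RMB); 费用差(RMB) records the signed difference.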
# Local fee calculation for the channel actually used by the system
def local_fee_cal(df: pd.DataFrame):
df_grouped= df.groupby('快递跟踪号')
for order_num, group in df_grouped:
postcode = group['postcode'].iloc[0]
if pd.isna(postcode) or str(postcode).lower() == "nan":
continue
packages= Package_group() # Metro-SAIR
company_name = logistics_name.get(group['投递渠道'].iloc[0])
opCountry = OperateCountry(group['目的国'].iloc[0])
        total_weight = 0  # used to apportion the fee by volumetric weight
        for index, row in group.iterrows():
            if row['长'] == 0 or row['宽'] == 0 or row['高'] == 0 or row['重量'] == 0:
                continue
            total_weight += row['长'] * row['宽'] * row['高'] / 6000
            package = Package(row['package'], row['长'], row['宽'], row['高'], row['重量'])
            packages.add_package(package)
        try:
            bill = Billing(str(index), opCountry, packages, postcode, company_name=company_name, head_type=1, beizhu='1')
            for index, row in group.iterrows():
                propertion = bill.bill_dict()["体积重"] / total_weight
                tail_fee = bill.tail_amount[0] * propertion
                # convert to RMB
                tail_fee = convert_currency(tail_fee, bill.tail_amount[1])
                df.loc[df['package'] == row['package'], '本地估算RMB'] = round(tail_fee, 2) if tail_fee < 9999 else "暂无配置"
        except Exception:
            df.loc[df['快递跟踪号'] == order_num, '本地估算RMB'] = "暂无配置"
            continue
        print(bill)
return df
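# Volumetric weight above is 长*宽*高/6000 (cm -> kg); the charged 体积重 used in the
# apportioning ratio comes from Billing.bill_dict().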
# Merge the new results with the existing sheet and write back to Excel
def append_result(new_data, excel_path, only_columns):
    try:
        df_existing = pd.read_excel(excel_path, dtype={'order_id': str})
    except FileNotFoundError:
        # No existing file: just write the new data
        new_data.to_excel(excel_path, index=False)
        return
    # Identify the manually maintained columns in the existing sheet
    special_cols = [col for col in ['是否改投', '异常情况', '是否处理'] if col in df_existing.columns]
    # Concatenate old and new so they can be filtered below
    df_all = pd.concat([df_existing, new_data], ignore_index=True)
    # Keys present in both the existing sheet and the new data
    duplicated_keys = set(df_existing[only_columns]) & set(new_data[only_columns])
    # 1. Duplicated keys: keep the old sheet's special columns + the new data's other columns
    if duplicated_keys:
        duplicated_keys = list(duplicated_keys)
        # Special columns from the existing sheet
        old_part = df_existing[df_existing[only_columns].isin(duplicated_keys)][[only_columns] + special_cols]
        # Everything except the special columns from the new data
        new_part = new_data[new_data[only_columns].isin(duplicated_keys)]
        new_part_no_special = new_part.drop(columns=special_cols, errors='ignore')
        # Merge the two halves on the key column
        merged_part = new_part_no_special.merge(old_part, on=only_columns, how='left')
    else:
        merged_part = pd.DataFrame(columns=df_all.columns)  # empty
    # 2. Keys only in the new data: keep the full new rows
    unique_new_part = new_data[~new_data[only_columns].isin(duplicated_keys)]
    # 3. Existing data + processed new data
    final_result = pd.concat([df_existing, merged_part, unique_new_part], ignore_index=True)
    # Deduplicate on only_columns, keeping the last occurrence
    final_result = final_result.drop_duplicates(subset=[only_columns], keep='last')
    # Write back
    final_result.to_excel(excel_path, index=False)
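# Usage sketch (hypothetical path): append_result(df, r'D:\test\out.xlsx', 'package')
# keeps any manually edited 是否改投/异常情况/是否处理 values for rows whose key already
# exists in the sheet and appends brand-new rows unchanged.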
def main():
    # Save the previous day's redirect decisions to a separate Excel file:
    # 1. Read logistics_analysis.xlsx and filter the rows whose 是否改投 column is set to 1
    # 2. Append the filtered rows to the redirect-history workbook (改投记录表)
    df_new = pd.read_excel(r'D:\test\logistics\拦截数据\logistics_analysis.xlsx')
    df_new = df_new[df_new['是否改投'] == 1]
    df_new = df_new[['目的国', '运输方式', 'order_id', 'package', '基础估算', '偶发估算', '包裹总估算',
                     '渠道类型', '最优渠道类型', '投递渠道', '最优渠道', '尾端货币', '订单总估算', '最优总费用', '费用差(RMB)', '测算日期', '是否改投', '异常情况']]
    target_file1 = r'D:\test\logistics\拦截数据\改投记录表.xlsx'
    append_result(df_new, target_file1, 'package')
    print("Previous day's redirect data saved")
    # Fetch source data
    raw_data = fetch_order_data()
    print('Source data fetched')
    # Locally price the channel that was actually used for delivery
    raw_data = local_fee_cal(raw_data)
    # Compute the optimal channel and fee
    raw_data = analyze_logistics(raw_data)
    target_file2 = r'D:\test\logistics\拦截数据\logistics_analysis.xlsx'
    append_result(raw_data, target_file2, 'package')
    print('Logistics-fee-level review finished')
    # Order-level review
    order_result = analyze_orders(raw_data)
    target_file3 = r'D:\test\logistics\拦截数据\order_analysis.xlsx'
    append_result(order_result, target_file3, 'order_id')
    print('Order-level review finished')
if __name__ == '__main__':
main()