物流投递审核
This commit is contained in:
parent
b6ee600e78
commit
0e49591945
|
|
@ -27,8 +27,7 @@ class WestLogistics_US(TailLogistics):
|
|||
cls._load_postcodes() # 第一次实例化时加载文件
|
||||
cls._is_loaded = True # 标记文件已加载
|
||||
return super().__new__(cls)
|
||||
# Path_current_directory = Path(__file__).parent
|
||||
# current_directory = os.path.dirname(__file__)
|
||||
|
||||
@classmethod
|
||||
def _load_postcodes(cls):
|
||||
"""加载邮编文件"""
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
|
||||
logistics_name = {
|
||||
"空LAX-FEDEX-SAIR-H": "Fedex-HOME",
|
||||
"空LAX-FEDEX-SAIR-G": "Fedex-GROUD",
|
||||
"海MS-FEDEX-SAIR-H": "Fedex-HOME",
|
||||
"海MS-FEDEX-SAIR-G": "Fedex-GROUD",
|
||||
"海MS-METRO-SAIR": "Metro-SAIR",
|
||||
"海NY-METRO-SAIR": "Metro-SAIR", # 暂时没写
|
||||
"海NY-XMILES-SAIR": "XMILES-SAIR",
|
||||
"海MS-XMILES": "XMILES-SAIR",
|
||||
"海NY-AMT-SAIR": "AM-美东",
|
||||
"海MS-AMT-SAIR": "AM-美西",
|
||||
|
||||
"海MS-FEDEX02":"Fedex-彩虹小马",
|
||||
"空LAX-FEDEX02":"Fedex-彩虹小马"
|
||||
}
|
||||
|
||||
#海NY-WWEX-SAIR,海MS-WWEX-SAIR没报价表,也停用了
|
||||
|
|
@ -0,0 +1,257 @@
|
|||
"""
|
||||
fetch_order_data函数只是获取源数据,是一个sql语句,可以更改
|
||||
cal_min_fee 函数是分别以一票一件和一票多件计算出最小的费用和渠道
|
||||
analyze_orders 订单层面的业务逻辑判断,防止出现混合渠道投递,卡派订单包含多个不同快递追踪单号,多渠道订单总重量小于1000KG(因为1000KG以内一个卡派可以搞定,不应该出现多渠道)
|
||||
analyze_logistics 真正的物流投递层面去分析,先判断投递渠道和最优渠道是否一致,再判断偶发估算费用和最优渠道费用是否一致
|
||||
"""
|
||||
import pandas as pd
|
||||
from utils.gtools import MySQLconnect
|
||||
from utils.logisticsBill import BillFactory, Billing
|
||||
from utils.countryOperator import OperateCountry
|
||||
from utils.Package import Package, Package_group
|
||||
from utils.logistics_name_config import logistics_name
|
||||
# 获取数据
|
||||
def fetch_order_data():
|
||||
"""从数据库获取原始订单数据"""
|
||||
with MySQLconnect('ods') as db:
|
||||
sql = """
|
||||
SELECT
|
||||
ol.order_date,
|
||||
ol.fund_status,
|
||||
oe.`包裹状态`,
|
||||
oe.包裹号 AS package,
|
||||
oe.单号 AS order_id,
|
||||
oe.运输方式,
|
||||
oe.`目的国`,
|
||||
oe.快递公司,
|
||||
oe.`快递分区`,
|
||||
oe.快递跟踪号,
|
||||
ecm.类型 AS 渠道类型, -- 包裹类型
|
||||
pvi.length AS 长,
|
||||
pvi.width AS 宽,
|
||||
pvi.hight AS 高,
|
||||
pvi.weight AS 重量,
|
||||
pfi.express_fee AS 基础估算,
|
||||
pfi.express_additional_fee AS 偶发估算,
|
||||
pfi.express_fee + pfi.express_additional_fee AS 总估算,
|
||||
ol.postcode AS postcode
|
||||
FROM
|
||||
ods.order_express oe
|
||||
LEFT JOIN ods.express_company ecm ON oe.快递公司 = ecm.快递公司
|
||||
LEFT JOIN ods.package_vol_info pvi ON oe.包裹号 = pvi.package
|
||||
LEFT JOIN ods.package_fee_info pfi ON oe.包裹号 = pfi.package
|
||||
LEFT JOIN ods.order_list ol ON oe.单号 = ol.order_id
|
||||
WHERE
|
||||
oe.包裹状态 REGEXP '已经投递|发货仓出库'
|
||||
AND oe.`快递公司` NOT REGEXP "--"
|
||||
AND `卡板发货时间` REGEXP "--"
|
||||
AND ol.fund_status NOT REGEXP '等待|全额退款'
|
||||
AND ol.site_name REGEXP 'litfad|kwoking|lakiq'
|
||||
AND oe.投递时间 >= DATE_SUB(NOW(), INTERVAL 20 DAY)
|
||||
AND pvi.length>0 AND pvi.width >0 AND pvi.hight>0 AND pvi.weight>0
|
||||
and oe.目的国 regexp 'United States'
|
||||
"""
|
||||
|
||||
return pd.read_sql(sql, db.engine())
|
||||
|
||||
|
||||
def cal_min_fee(raw_data: pd.DataFrame):
|
||||
"""
|
||||
处理物流费用数据并实现业务逻辑判断
|
||||
"""
|
||||
df = raw_data.copy()
|
||||
# 包裹层面审核
|
||||
for order_id, group in df.groupby('order_id'):
|
||||
package_group = Package_group()
|
||||
opCountry = OperateCountry(group['目的国'].iloc[0])
|
||||
express_fee = 0
|
||||
for index, row in group.iterrows():
|
||||
# 计算一票一件
|
||||
packages=Package_group()
|
||||
package = Package(row['package'], row['长'], row['宽'], row['高'], row['重量'])
|
||||
packages.add_package(package)
|
||||
bill_express = Billing("1",opCountry,packages,row['postcode'],company_name=None,head_type=1,beizhu="")
|
||||
if bill_express.tail_amount[0] == 0 or bill_express.tail_amount[0] >=9999:
|
||||
df.loc[index,"快递尾端费用"] = "不可派"
|
||||
express_fee = 999999
|
||||
else:
|
||||
df.loc[index,"快递尾端费用"] = bill_express.tail_amount[0]
|
||||
df.loc[index,"快递尾端渠道"] = bill_express.company_name
|
||||
express_fee += bill_express.tail_amount[0]
|
||||
# 计算一票多件
|
||||
package_group.add_package(package)
|
||||
# 计算一票多件
|
||||
bill_ltl = Billing("1",opCountry,package_group,row['postcode'],company_name=None,head_type=1,beizhu="")
|
||||
df.loc[df['order_id']==order_id,'卡派尾端费用'] = bill_ltl.tail_amount[0]/len(package_group)
|
||||
df.loc[df['order_id']==order_id,'卡派尾端渠道'] = bill_ltl.company_name
|
||||
min_fee = min(bill_ltl.tail_amount[0],express_fee)
|
||||
if min_fee == express_fee:
|
||||
df.loc[df['order_id']==order_id,'最优总物流费用'] = min_fee
|
||||
df.loc[df['order_id']==order_id,'最优渠道类型'] = "快递"
|
||||
else:
|
||||
df.loc[df['order_id']==order_id,'最优总物流费用'] = min_fee
|
||||
df.loc[df['order_id']==order_id,'最优渠道类型'] = "卡派"
|
||||
df.loc[df['order_id']==order_id,'尾端货币'] = bill_ltl.tail_amount[1]
|
||||
return df
|
||||
|
||||
# 订单层面审核,防止出现混合渠道投递,卡派订单包含多个不同快递单号,多渠道订单总重量小于1000KG
|
||||
def analyze_orders(raw_data: pd.DataFrame):
|
||||
"""
|
||||
处理订单数据并实现业务逻辑判断
|
||||
返回聚合后的订单数据和分析结果,包裹信息按指定字典格式输出
|
||||
|
||||
"""
|
||||
data = raw_data.copy()
|
||||
|
||||
# 1. 预处理 - 处理空值
|
||||
data.fillna({
|
||||
'渠道类型': '未知类型',
|
||||
'基础估算': 0,
|
||||
'偶发估算': 0,
|
||||
'总估算': 0,
|
||||
'重量': 0,
|
||||
'长': 0,
|
||||
'宽': 0,
|
||||
'高': 0,
|
||||
'postcode': '未知'
|
||||
}, inplace=True)
|
||||
|
||||
# 2. 按订单聚合数据
|
||||
def create_package_details(group):
|
||||
"""创建包裹详情字典,严格按照要求的格式"""
|
||||
details = {}
|
||||
for i, (_, row) in enumerate(group.iterrows(), 1):
|
||||
details[f"包裹{i}"] = {
|
||||
"宽": f"{float(row['宽']):.2f}",
|
||||
"长": f"{float(row['长']):.2f}",
|
||||
"高": f"{float(row['高']):.2f}",
|
||||
"重量": f"{float(row['重量']):.2f}"
|
||||
}
|
||||
return details
|
||||
|
||||
grouped = data.groupby('order_id')
|
||||
|
||||
aggregated = pd.DataFrame({
|
||||
'包裹数量': grouped.size(),
|
||||
'总重量': grouped['重量'].sum(),
|
||||
'总基础估算': grouped['基础估算'].sum(),
|
||||
'总附加估算': grouped['偶发估算'].sum(),
|
||||
'总物流估算': grouped['总估算'].sum(),
|
||||
'包裹数据': grouped.apply(create_package_details), # 使用新函数
|
||||
'快递公司列表': grouped['快递公司'].unique(),
|
||||
'渠道类型列表': grouped['渠道类型'].unique(),
|
||||
'邮编列表': grouped['postcode'].first(),
|
||||
'快递跟踪号数量': grouped['快递跟踪号'].unique()
|
||||
}).reset_index()
|
||||
|
||||
# 3. 实现业务逻辑判断(保持不变)
|
||||
def determine_order_type(row):
|
||||
if len(row['渠道类型列表']) > 1:
|
||||
return '混合'
|
||||
elif len(row['渠道类型列表']) == 1:
|
||||
return row['渠道类型列表'][0]
|
||||
else:
|
||||
return '未知类型'
|
||||
|
||||
def determine_channel_type(row):
|
||||
if len(row['快递公司列表']) > 1:
|
||||
return '多渠道'
|
||||
else:
|
||||
return '单渠道'
|
||||
|
||||
aggregated['订单类型'] = aggregated.apply(determine_order_type, axis=1)
|
||||
aggregated['渠道种类'] = aggregated.apply(determine_channel_type, axis=1)
|
||||
|
||||
# 4. 实现业务规则检查(保持不变)
|
||||
def apply_business_rules(row):
|
||||
actions = []
|
||||
status = '正常'
|
||||
comments = []
|
||||
|
||||
if row['订单类型'] == '卡派':
|
||||
tracking_nos = [list(p.values())[0] for p in row['包裹数据'].values()]
|
||||
if len(set(tracking_nos)) > 1:
|
||||
status = '异常'
|
||||
if len(row['快递跟踪号数量']) > 1 :
|
||||
comments.append('卡派订单包含多个不同快递单号')
|
||||
elif row['订单类型'] == '混合':
|
||||
status = '异常'
|
||||
comments.append('出现混合类型订单,需要核查')
|
||||
|
||||
if row['渠道种类'] == '多渠道':
|
||||
if row['总重量'] < 1000:
|
||||
comments.append(f'多渠道订单总重量{row["总重量"]:.2f}KG < 1000KG')
|
||||
|
||||
return pd.Series({
|
||||
'状态': status,
|
||||
'建议操作': '; '.join(actions) if actions else '下一步',
|
||||
'备注': ' | '.join(comments) if comments else ''
|
||||
})
|
||||
|
||||
rule_results = aggregated.apply(apply_business_rules, axis=1)
|
||||
aggregated = pd.concat([aggregated, rule_results], axis=1)
|
||||
|
||||
# 5. 整理最终输出列
|
||||
final_columns = [
|
||||
'order_id', '订单类型', '渠道种类',
|
||||
'包裹数量', '总重量',
|
||||
'总基础估算', '总附加估算', '总物流估算',
|
||||
'快递公司列表', '邮编列表',
|
||||
'包裹数据' ,'状态', '备注','快递跟踪号数量'# 使用新列名
|
||||
]
|
||||
|
||||
return aggregated[final_columns]
|
||||
|
||||
# 物流费用层面审核
|
||||
def analyze_logistics(df: pd.DataFrame):
|
||||
"""
|
||||
1.判断实际投递物流渠道和cal_min_fee计算的最优物流渠道是否一致
|
||||
2.物流渠道一致的情况下,判断费用是否一样
|
||||
"""
|
||||
# 1. 计算最优渠道和费用
|
||||
df= cal_min_fee(df)
|
||||
# 判断渠道是否一致
|
||||
df['最优渠道'] = df.apply(lambda row: row['快递尾端渠道'] if row['最优渠道类型'] == "快递" else row['卡派尾端渠道'], axis=1)
|
||||
df['渠道一致'] = df.apply(lambda row: row['最优渠道'] == logistics_name.get(row['快递公司']), axis=1)
|
||||
# 2. 计算费用是否一致
|
||||
def all_estimate(row):
|
||||
if row['总估算'] is None or row['总估算'] ==0:
|
||||
return "暂无系统估算值"
|
||||
if row['最优总物流费用'] is None or row['最优总物流费用'] ==0:
|
||||
return "暂无最优费用"
|
||||
if row['尾端货币'] == "USD":
|
||||
all_estimate= row['总估算']/7
|
||||
elif row['尾端货币'] == "GBP":
|
||||
all_estimate = row['总估算']/9
|
||||
elif row['尾端货币'] == "EUR":
|
||||
all_estimate = row['总估算']/8
|
||||
elif row['尾端货币'] == "AUD":
|
||||
all_estimate = row['总估算']/5
|
||||
elif row['尾端货币'] == "CAD":
|
||||
all_estimate = row['总估算']/5
|
||||
elif row['尾端货币'] == "JPY":
|
||||
all_estimate = row['总估算']/0.05
|
||||
return all_estimate
|
||||
|
||||
|
||||
df['费用一致'] = df.apply(lambda row: abs(all_estimate(row) - row['最优总物流费用'])<1, axis=1)
|
||||
df['费用差(当地货币)'] = df.apply(lambda row: row['最优总物流费用']-all_estimate(row), axis=1)
|
||||
return df
|
||||
|
||||
def main():
|
||||
# 获取数据
|
||||
raw_data = fetch_order_data()
|
||||
print('已获取数据')
|
||||
# 订单层面审核
|
||||
order_result = analyze_orders(raw_data)
|
||||
print('已完成订单层面审核')
|
||||
order_result.to_excel(r'D:\test\logistics\拦截数据\order_analysis.xlsx', index=False)
|
||||
# 计算最优渠道和费用
|
||||
raw_data = analyze_logistics(raw_data)
|
||||
print('已完成物流费用层面审核')
|
||||
raw_data.to_excel(r'D:\test\logistics\拦截数据\logistics_analysis.xlsx', index=False)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
main()
|
||||
|
||||
Loading…
Reference in New Issue