logistics/订单拦截总数据.py

254 lines
11 KiB
Python

import os
import pandas as pd
from utils.Package import Package,Package_group
from utils.gtools import MySQLconnect
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory, Billing
import requests
ods = MySQLconnect("ods")
engine = ods.engine()
cursor = ods.connect().cursor()
def get_package_real_vol_by_api(packages_id):
packages = Package_group()
packages_str = ""
index = 1
for package_id in packages_id:
# 计算往年利润率
# sql = f"SELECT length,width,hight,weight FROM package_vol_info WHERE package = %s"
# cursor.execute(sql, (package_id,))
# resp = cursor.fetchall()
# if len(resp) == 0:
# continue
# else:
# weight = resp[0][3]
# package_length = resp[0][0]
# package_width = resp[0][1]
# package_hight = resp[0][2]
# 拦截
url = f'https://cp.maso.hk/index.php?main=biphp&act=package_fund&key=W6BOYJ7BH27YCGRFCA0LWBVKMU1KRU5Q&package={package_id}'
resp = requests.get(url).json()
if resp['code'] == "0":
weight = int(float(resp['data'][0]['weight'])*1000)
package_length = resp['data'][0]['l'].replace(",","") if len(resp['data'][0]['l'])>0 else "0"
package_width = resp['data'][0]['w'].replace(",","") if len(resp['data'][0]['w'])>0 else "0"
package_hight = resp['data'][0]['h'].replace(",","") if len(resp['data'][0]['h'])>0 else "0"
package_str = f"{weight/1000}|{package_length}*{package_width}*{package_hight}"
packages_str += package_str + ","
index += 1
package = Package(str(package_id),float(package_length),float(package_width),float(package_hight),weight)
packages.add_package(package)
return packages,packages_str
def get_order_bill(opCountry,order_id,packages_id,postcode,convey,amount):
print(order_id)
beizhu = amount
conveys = 1 if convey == "海运" else 0
try:
packages,packages_str = get_package_real_vol_by_api(packages_id=packages_id)
bill = Billing(str(order_id),opCountry,packages,postcode,company_name=None,head_type=conveys,beizhu=beizhu)
print(bill)
bill_data = bill.bill_dict() # ✅ 只调用一次
result = bill_data.get("预测尾端", 0)
_type = bill_data.get("尾端渠道", "")
head_amount = bill_data.get("预测头程CNY", 0)
total_amount = bill_data.get("总金额USD", 0)
volume_weight = bill_data.get("体积重", 0)
per_head = bill_data.get("头程单价", 0)
return volume_weight,_type,per_head,head_amount,result,total_amount,packages_str
except ZeroDivisionError as e:
print(e)
return "没测量" ,0,0,0,0,0,""
excel_path = r'D:\test\logistics\拦截数据\订单数据.xlsx'
country_list = ['United Kingdom','United States','Australia','Germany','Spain','France']
# country_list = ['United States']
for country in country_list:
order_id = f"""
SELECT
# ol.order_date,
CONCAT("[",GROUP_CONCAT(pr.包裹号),"]") AS package_group ,
ol.order_id ,
ol.postcode,
ol.delivery_country,
ol.convey,
ol.order_price_dollar
FROM
parcel pr
LEFT JOIN dwd.order_list ol ON ol.order_id = pr.订单号
WHERE
# ol.order_date between "2024-10-01" and "2025-03-01"
pr.生成时间 >= DATE_SUB(NOW(), INTERVAL 20 DAY)
AND fund_status NOT REGEXP "等待"
AND site_name REGEXP "litfad|kwoking|lakiq"
# AND convey = "海运"
# AND delivery_country = "United States"
AND pr.订单号 = ol.order_id
AND delivery_country regexp '{country}'
AND NOT EXISTS (
SELECT
1
FROM
`order_express` oe
WHERE
oe.包裹号 = pr.包裹号
AND oe.包裹状态 = "已作废")
GROUP BY ol.order_id
"""
order_id_df = pd.read_sql(order_id,engine)
if order_id_df.empty:
print(f"{country}无订单")
continue
order_id_df.sort_values(by=['delivery_country'],inplace=True)
countries = order_id_df['delivery_country'].unique()
opCountry = OperateCountry(country)
order_id_df_cal = order_id_df.copy()
# 删除order_id_df_cal中已经存在的订单
if os.path.exists(excel_path):
existing_df = pd.read_excel(excel_path)
order_id_df_cal = order_id_df_cal[~(order_id_df_cal['order_id'].isin(existing_df['order_id']) &
order_id_df_cal['package_group'].isin(existing_df['package_group']))]
if order_id_df_cal.empty:
continue
order_id_df_cal[["体积重","_type","头程单价","头程(CNY)","尾端","总金额(USD)","实际体积"]] = order_id_df_cal.apply(lambda x: get_order_bill(opCountry,
x['order_id'],
eval(x['package_group']),
x['postcode'],
x['convey'],
x['order_price_dollar']),axis=1,result_type='expand')
order_id_df_cal = order_id_df_cal[~(order_id_df_cal['体积重']== "没测量")]
order_id_df_cal['总金额(USD)'] = pd.to_numeric(order_id_df_cal['总金额(USD)'], errors='coerce')
order_id_df_cal = order_id_df_cal.dropna(subset=['总金额(USD)'])
order_id_df_cal = order_id_df_cal[order_id_df_cal['总金额(USD)'] <9999]
order_id_df_cal['生成日期'] = pd.Timestamp.now().strftime('%Y/%m/%d')
order_id_df_cal['order_id'] = order_id_df_cal['order_id'].astype(int)
if order_id_df_cal.empty:
print(f"{country}没有需要测量的订单。")
continue
if not os.path.exists(excel_path):
order_id_df_cal.to_excel(excel_path, index=False, sheet_name='Sheet1')
else:
exist_df = pd.read_excel(excel_path)
new_rows = order_id_df_cal[~(order_id_df_cal['order_id'].isin(exist_df['order_id']) &
order_id_df_cal['package_group'].isin(exist_df['package_group']))]
updated_df = pd.concat([exist_df, new_rows], ignore_index=True)
# order_id去重
updated_df = updated_df.drop_duplicates(subset=['order_id'], keep='last')
if not new_rows.empty:
# 将更新后的 DataFrame 写回 Excel 文件
updated_df.to_excel(excel_path, index=False, sheet_name='Sheet1')
print(f"已写入 {len(new_rows)} 条新数据。")
order_id_df_cal = order_id_df_cal.drop(columns=['生成日期'])
order_id_list = order_id_df_cal["order_id"].tolist()
param = ",".join(f"'{order_id}'" for order_id in order_id_list)
purchase_order_sql = f"""
with t1 AS (SELECT LEFT
( ol.out_detials_outlink_id, 15 ) AS order_id,
SUM( out_detials_qty * price )/ 7 AS instock_cost,
NULL AS buy_cost
FROM
ods.outstock_list ol
JOIN ods.instock_list il ON ol.store_in_id = il.id
WHERE
LEFT ( ol.out_detials_outlink_id, 15 ) IN ({param})
GROUP BY
LEFT ( ol.out_detials_outlink_id, 15 )
UNION ALL
SELECT
LEFT ( order_product_id, 15 ) as order_id,
NULL as instock_cost,
SUM(buy_num * actual_price)/7 AS buy_cost
FROM
`warehouse_purchasing`
WHERE
LEFT ( order_product_id, 15 ) IN ({param})
AND buy_audit = "采购完成"
group by LEFT ( order_product_id, 15 )
)
SELECT
order_id,
SUM(CASE
WHEN instock_cost is null THEN
buy_cost
ELSE
instock_cost END) AS pur_cost
FROM
t1
GROUP BY order_id
"""
purchase_order_df = pd.read_sql(purchase_order_sql, con=engine)
purchase_order_df["order_id"] = purchase_order_df["order_id"].astype(str)
order_id_df_cal["order_id"] = order_id_df_cal["order_id"].astype(str)
order_id_df_cal = pd.merge(order_id_df_cal, purchase_order_df, on='order_id', how='left')
def profit_cal(amount,cost):
transaction_fees_rate = 0.031
cur_tran_rate = 0
store_cost = 2
profit = amount * (1 - transaction_fees_rate - cur_tran_rate) - store_cost - cost
return round(profit, 2)
order_id_df_cal["profit"] = order_id_df_cal.apply(lambda x: profit_cal(x["order_price_dollar"] , x["pur_cost"] + x ["总金额(USD)"]), axis=1)
order_id_df_cal["profit_rate"] = round(order_id_df_cal["profit"] / order_id_df_cal["order_price_dollar"],2)
order_id_df_cal["rejust"] = order_id_df_cal.apply(lambda x: 1 if x["profit_rate"] < 0 else 0, axis=1)
order_id_df_cal[order_id_df_cal['rejust'] == 1]
order_id_df_cal['拦截处理日期'] = pd.to_datetime(pd.Timestamp.now().date())
# order_id_df_cal = order_id_df_cal[order_id_df_cal['rejust'] == 1]
# import pendulum
# dt = pendulum.now()
# # 将日期时间对象转换为字符串
# dt_str = dt.to_format('YYYY-MM-DD')
# finename = dt_str + "拦截.xlsx"
def update_excel_with_new_data(excel_path, new_data_df):
try:
new_data_df['order_id'] = new_data_df['order_id'].astype(str)
if not os.path.exists(excel_path):
# 如果文件不存在,创建一个新的 Excel 文件
new_data_df = new_data_df[new_data_df['pur_cost'].notna()]
new_data_df.to_excel(excel_path, index=False, sheet_name='Sheet1')
print(f"文件不存在,已创建新文件并写入 {len(new_data_df)} 条数据。")
df['拦截处理日期'] = pd.to_datetime(df['拦截处理日期']).dt.strftime('%Y/%m/%d')
else:
existing_df = pd.read_excel(excel_path)
existing_df['order_id'] = existing_df['order_id'].astype(str)
new_rows = new_data_df[~(new_data_df['order_id'].isin(existing_df['order_id']) &
new_data_df['package_group'].isin(existing_df['package_group']))]
new_rows = new_rows[new_rows['pur_cost'].notna()]
updated_df = pd.concat([existing_df, new_rows], ignore_index=True)
# order_id去重
updated_df = updated_df.drop_duplicates(subset=['order_id'], keep='last')
updated_df.to_excel(excel_path, index=False, sheet_name='Sheet1')
df = pd.read_excel(excel_path)
df = df.drop_duplicates(subset=['order_id'], keep='last')
df['拦截处理日期'] = pd.to_datetime(df['拦截处理日期'])
df.to_excel(excel_path, index=False, sheet_name='Sheet1')
if not new_rows.empty:
# 将更新后的 DataFrame 写回 Excel 文件
print(f"已写入 {len(new_rows)} 条新数据。")
else:
print("没有新数据需要添加。")
except Exception as e:
print(f"发生错误: {e}")
update_excel_with_new_data(r'D:\test\logistics\拦截数据\拦截总表.xlsx',order_id_df_cal)