254 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			254 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
 | 
						|
import os
 | 
						|
import pandas as pd
 | 
						|
from utils.Package import Package,Package_group
 | 
						|
from utils.gtools import MySQLconnect
 | 
						|
from utils.countryOperator import OperateCountry
 | 
						|
from utils.logisticsBill import BillFactory, Billing
 | 
						|
import requests
 | 
						|
 | 
						|
ods = MySQLconnect("ods")
 | 
						|
engine = ods.engine()
 | 
						|
cursor = ods.connect().cursor()
 | 
						|
 | 
						|
def get_package_real_vol_by_api(packages_id):
 | 
						|
    packages = Package_group()
 | 
						|
    packages_str = ""
 | 
						|
    index = 1
 | 
						|
    for package_id in packages_id:
 | 
						|
        # 计算往年利润率
 | 
						|
        # sql = f"SELECT length,width,hight,weight FROM package_vol_info WHERE package = %s"
 | 
						|
        # cursor.execute(sql, (package_id,))
 | 
						|
        # resp = cursor.fetchall()
 | 
						|
        # if len(resp) == 0:
 | 
						|
        #     continue
 | 
						|
        # else:
 | 
						|
        #     weight = resp[0][3]
 | 
						|
        #     package_length = resp[0][0]
 | 
						|
        #     package_width = resp[0][1]
 | 
						|
        #     package_hight = resp[0][2]
 | 
						|
        # 拦截
 | 
						|
        url = f'https://cp.maso.hk/index.php?main=biphp&act=package_fund&key=W6BOYJ7BH27YCGRFCA0LWBVKMU1KRU5Q&package={package_id}'
 | 
						|
        resp = requests.get(url).json()
 | 
						|
        if resp['code'] == "0":
 | 
						|
            weight = int(float(resp['data'][0]['weight'])*1000)
 | 
						|
            package_length = resp['data'][0]['l'].replace(",","") if len(resp['data'][0]['l'])>0 else "0"
 | 
						|
            package_width = resp['data'][0]['w'].replace(",","") if len(resp['data'][0]['w'])>0 else "0"
 | 
						|
            package_hight = resp['data'][0]['h'].replace(",","") if len(resp['data'][0]['h'])>0 else "0"
 | 
						|
            package_str = f"{weight/1000}|{package_length}*{package_width}*{package_hight}"
 | 
						|
            packages_str += package_str + ","
 | 
						|
            index += 1
 | 
						|
            package = Package(str(package_id),float(package_length),float(package_width),float(package_hight),weight)
 | 
						|
            packages.add_package(package)
 | 
						|
    return packages,packages_str
 | 
						|
 | 
						|
def get_order_bill(opCountry,order_id,packages_id,postcode,convey,amount):
 | 
						|
    print(order_id)
 | 
						|
    beizhu = amount
 | 
						|
    conveys = 1 if convey == "海运" else 0
 | 
						|
 | 
						|
    try:
 | 
						|
        
 | 
						|
        packages,packages_str = get_package_real_vol_by_api(packages_id=packages_id)
 | 
						|
        bill = Billing(str(order_id),opCountry,packages,postcode,company_name=None,head_type=conveys,beizhu=beizhu)
 | 
						|
        print(bill)
 | 
						|
        bill_data = bill.bill_dict()  # ✅ 只调用一次
 | 
						|
 | 
						|
        result = bill_data.get("预测尾端", 0)
 | 
						|
        _type = bill_data.get("尾端渠道", "")
 | 
						|
        head_amount = bill_data.get("预测头程CNY", 0)
 | 
						|
        total_amount = bill_data.get("总金额USD", 0)
 | 
						|
        volume_weight = bill_data.get("体积重", 0)
 | 
						|
        per_head = bill_data.get("头程单价", 0)
 | 
						|
        return volume_weight,_type,per_head,head_amount,result,total_amount,packages_str
 | 
						|
    except ZeroDivisionError as e:
 | 
						|
        print(e)
 | 
						|
        return "没测量" ,0,0,0,0,0,""
 | 
						|
 | 
						|
excel_path = r'D:\test\logistics\拦截数据\订单数据.xlsx'
 | 
						|
 | 
						|
country_list = ['United Kingdom','United States','Australia','Germany','Spain','France']
 | 
						|
# country_list = ['United States']
 | 
						|
for country in country_list:
 | 
						|
    order_id = f"""
 | 
						|
    SELECT
 | 
						|
# ol.order_date,
 | 
						|
        CONCAT("[",GROUP_CONCAT(pr.包裹号),"]") AS package_group ,
 | 
						|
        ol.order_id ,
 | 
						|
        ol.postcode,
 | 
						|
        ol.delivery_country,
 | 
						|
        ol.convey,
 | 
						|
        ol.order_price_dollar
 | 
						|
        
 | 
						|
    FROM
 | 
						|
        parcel pr
 | 
						|
        LEFT JOIN dwd.order_list ol ON ol.order_id = pr.订单号 
 | 
						|
    WHERE
 | 
						|
        # ol.order_date between "2024-10-01" and "2025-03-01"
 | 
						|
        pr.生成时间 >= DATE_SUB(NOW(), INTERVAL 20 DAY) 
 | 
						|
        AND  fund_status NOT REGEXP "等待" 
 | 
						|
        AND site_name REGEXP "litfad|kwoking|lakiq" 
 | 
						|
        # AND convey = "海运" 
 | 
						|
        # AND delivery_country = "United States" 
 | 
						|
        AND pr.订单号 = ol.order_id 
 | 
						|
        AND delivery_country regexp '{country}'
 | 
						|
        AND NOT EXISTS (
 | 
						|
        SELECT
 | 
						|
            1 
 | 
						|
        FROM
 | 
						|
            `order_express` oe 
 | 
						|
        WHERE
 | 
						|
        oe.包裹号 = pr.包裹号 
 | 
						|
        AND oe.包裹状态 = "已作废")
 | 
						|
        GROUP BY ol.order_id
 | 
						|
    """
 | 
						|
    order_id_df = pd.read_sql(order_id,engine)
 | 
						|
    if order_id_df.empty:
 | 
						|
        print(f"{country}无订单")
 | 
						|
        continue
 | 
						|
    order_id_df.sort_values(by=['delivery_country'],inplace=True)
 | 
						|
    countries = order_id_df['delivery_country'].unique()
 | 
						|
    opCountry = OperateCountry(country)
 | 
						|
    order_id_df_cal = order_id_df.copy()
 | 
						|
    # 删除order_id_df_cal中已经存在的订单
 | 
						|
    if os.path.exists(excel_path):
 | 
						|
        existing_df = pd.read_excel(excel_path)
 | 
						|
        order_id_df_cal = order_id_df_cal[~(order_id_df_cal['order_id'].isin(existing_df['order_id']) & 
 | 
						|
                                        order_id_df_cal['package_group'].isin(existing_df['package_group']))]
 | 
						|
    if order_id_df_cal.empty:
 | 
						|
        continue
 | 
						|
    order_id_df_cal[["体积重","_type","头程单价","头程(CNY)","尾端","总金额(USD)","实际体积"]] = order_id_df_cal.apply(lambda x: get_order_bill(opCountry,
 | 
						|
                                                                                                                        x['order_id'],
 | 
						|
                                                                                                                    eval(x['package_group']),
 | 
						|
                                                                                                                    x['postcode'],    
 | 
						|
                                                                                                                    x['convey'],
 | 
						|
                                                                                                                    x['order_price_dollar']),axis=1,result_type='expand')
 | 
						|
    order_id_df_cal = order_id_df_cal[~(order_id_df_cal['体积重']== "没测量")]
 | 
						|
    order_id_df_cal['总金额(USD)'] = pd.to_numeric(order_id_df_cal['总金额(USD)'], errors='coerce')
 | 
						|
    order_id_df_cal = order_id_df_cal.dropna(subset=['总金额(USD)'])
 | 
						|
    order_id_df_cal = order_id_df_cal[order_id_df_cal['总金额(USD)'] <9999]
 | 
						|
    order_id_df_cal['生成日期'] = pd.Timestamp.now().strftime('%Y/%m/%d')
 | 
						|
    order_id_df_cal['order_id'] = order_id_df_cal['order_id'].astype(int)
 | 
						|
    if order_id_df_cal.empty:
 | 
						|
        print(f"{country}没有需要测量的订单。")
 | 
						|
        continue
 | 
						|
    if not os.path.exists(excel_path):
 | 
						|
        order_id_df_cal.to_excel(excel_path, index=False, sheet_name='Sheet1')
 | 
						|
    else:
 | 
						|
        exist_df = pd.read_excel(excel_path)
 | 
						|
        new_rows = order_id_df_cal[~(order_id_df_cal['order_id'].isin(exist_df['order_id']) & 
 | 
						|
                                order_id_df_cal['package_group'].isin(exist_df['package_group']))]
 | 
						|
        updated_df = pd.concat([exist_df, new_rows], ignore_index=True)
 | 
						|
        # order_id去重
 | 
						|
        updated_df = updated_df.drop_duplicates(subset=['order_id'], keep='last')
 | 
						|
        if not new_rows.empty:
 | 
						|
            # 将更新后的 DataFrame 写回 Excel 文件
 | 
						|
            updated_df.to_excel(excel_path, index=False, sheet_name='Sheet1')
 | 
						|
            print(f"已写入 {len(new_rows)} 条新数据。")
 | 
						|
 | 
						|
    order_id_df_cal = order_id_df_cal.drop(columns=['生成日期'])
 | 
						|
    order_id_list = order_id_df_cal["order_id"].tolist()
 | 
						|
    param = ",".join(f"'{order_id}'" for order_id in order_id_list)
 | 
						|
    purchase_order_sql = f"""
 | 
						|
    with t1 AS (SELECT LEFT
 | 
						|
        ( ol.out_detials_outlink_id, 15 ) AS order_id,
 | 
						|
        SUM( out_detials_qty * price )/ 7 AS instock_cost,
 | 
						|
				NULL AS buy_cost
 | 
						|
    FROM
 | 
						|
        ods.outstock_list ol
 | 
						|
        JOIN ods.instock_list il ON ol.store_in_id = il.id 
 | 
						|
    WHERE
 | 
						|
        LEFT ( ol.out_detials_outlink_id, 15 ) IN ({param}) 
 | 
						|
    GROUP BY
 | 
						|
        LEFT ( ol.out_detials_outlink_id, 15 )
 | 
						|
UNION ALL
 | 
						|
 SELECT
 | 
						|
        LEFT ( order_product_id, 15 ) as order_id, 
 | 
						|
				NULL as instock_cost,
 | 
						|
				SUM(buy_num * actual_price)/7 AS buy_cost
 | 
						|
    FROM
 | 
						|
        `warehouse_purchasing`
 | 
						|
    WHERE
 | 
						|
        LEFT ( order_product_id, 15 ) IN ({param}) 
 | 
						|
        AND buy_audit = "采购完成"
 | 
						|
    group by LEFT ( order_product_id, 15 )
 | 
						|
		)
 | 
						|
 | 
						|
    SELECT
 | 
						|
    order_id,
 | 
						|
    SUM(CASE 
 | 
						|
    WHEN instock_cost is null THEN
 | 
						|
        buy_cost
 | 
						|
    ELSE
 | 
						|
        instock_cost END) AS pur_cost
 | 
						|
    FROM
 | 
						|
    t1 
 | 
						|
    GROUP BY order_id
 | 
						|
 | 
						|
    """
 | 
						|
    purchase_order_df = pd.read_sql(purchase_order_sql, con=engine)
 | 
						|
    purchase_order_df["order_id"] = purchase_order_df["order_id"].astype(str)
 | 
						|
    order_id_df_cal["order_id"] = order_id_df_cal["order_id"].astype(str)
 | 
						|
    order_id_df_cal = pd.merge(order_id_df_cal, purchase_order_df, on='order_id', how='left')
 | 
						|
 | 
						|
    def profit_cal(amount,cost):
 | 
						|
        transaction_fees_rate = 0.031
 | 
						|
        cur_tran_rate = 0
 | 
						|
        store_cost =  2
 | 
						|
 | 
						|
 | 
						|
 | 
						|
        
 | 
						|
        profit = amount * (1 - transaction_fees_rate - cur_tran_rate) - store_cost - cost
 | 
						|
        return round(profit, 2)
 | 
						|
 | 
						|
 | 
						|
    order_id_df_cal["profit"] =  order_id_df_cal.apply(lambda x: profit_cal(x["order_price_dollar"] , x["pur_cost"] + x ["总金额(USD)"]), axis=1)
 | 
						|
    order_id_df_cal["profit_rate"] =  round(order_id_df_cal["profit"]  / order_id_df_cal["order_price_dollar"],2)
 | 
						|
    order_id_df_cal["rejust"] = order_id_df_cal.apply(lambda x: 1 if x["profit_rate"] < 0 else 0, axis=1) 
 | 
						|
    order_id_df_cal[order_id_df_cal['rejust'] == 1]
 | 
						|
    order_id_df_cal['拦截处理日期'] = pd.to_datetime(pd.Timestamp.now().date())
 | 
						|
 | 
						|
    
 | 
						|
    # order_id_df_cal = order_id_df_cal[order_id_df_cal['rejust'] == 1]
 | 
						|
    # import pendulum
 | 
						|
    # dt = pendulum.now()
 | 
						|
 | 
						|
    # # 将日期时间对象转换为字符串
 | 
						|
    # dt_str = dt.to_format('YYYY-MM-DD')
 | 
						|
    # finename = dt_str + "拦截.xlsx"
 | 
						|
 | 
						|
    def update_excel_with_new_data(excel_path, new_data_df):
 | 
						|
        try:
 | 
						|
            new_data_df['order_id'] = new_data_df['order_id'].astype(str)
 | 
						|
            if not os.path.exists(excel_path):
 | 
						|
                # 如果文件不存在,创建一个新的 Excel 文件
 | 
						|
                new_data_df = new_data_df[new_data_df['pur_cost'].notna()] 
 | 
						|
                new_data_df.to_excel(excel_path, index=False, sheet_name='Sheet1')
 | 
						|
                print(f"文件不存在,已创建新文件并写入 {len(new_data_df)} 条数据。")
 | 
						|
                df['拦截处理日期'] = pd.to_datetime(df['拦截处理日期']).dt.strftime('%Y/%m/%d')
 | 
						|
            else:
 | 
						|
                existing_df = pd.read_excel(excel_path)
 | 
						|
                existing_df['order_id'] = existing_df['order_id'].astype(str)
 | 
						|
                new_rows = new_data_df[~(new_data_df['order_id'].isin(existing_df['order_id']) & 
 | 
						|
                                        new_data_df['package_group'].isin(existing_df['package_group']))]
 | 
						|
                new_rows = new_rows[new_rows['pur_cost'].notna()]
 | 
						|
 | 
						|
                updated_df = pd.concat([existing_df, new_rows], ignore_index=True)
 | 
						|
                # order_id去重
 | 
						|
                updated_df = updated_df.drop_duplicates(subset=['order_id'], keep='last')
 | 
						|
                updated_df.to_excel(excel_path, index=False, sheet_name='Sheet1')
 | 
						|
                df = pd.read_excel(excel_path)
 | 
						|
                df = df.drop_duplicates(subset=['order_id'], keep='last')
 | 
						|
                df['拦截处理日期'] = pd.to_datetime(df['拦截处理日期'])
 | 
						|
                df.to_excel(excel_path, index=False, sheet_name='Sheet1')
 | 
						|
                
 | 
						|
                if not new_rows.empty:
 | 
						|
                    # 将更新后的 DataFrame 写回 Excel 文件
 | 
						|
                    print(f"已写入 {len(new_rows)} 条新数据。")
 | 
						|
                else:
 | 
						|
                    print("没有新数据需要添加。")
 | 
						|
        except Exception as e:
 | 
						|
            print(f"发生错误: {e}")
 | 
						|
    update_excel_with_new_data(r'D:\test\logistics\拦截数据\拦截总表.xlsx',order_id_df_cal)
 |