logistics/售价模型计算.ipynb

16 KiB
Raw Blame History

取ERP采购价+ERP尺寸+实际尺寸,需要国家+条目+邮编+order_id

In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect

# Load the parcel records whose prices need to be calculated.
with MySQLconnect('ods') as db:
    # NOTE(review): `sql` is assigned here but never executed in this cell; the
    # only live read_sql call below queries `order_complet4` instead, and the
    # call that would run `sql` is commented out.
    #
    # What the query text does, CTE by CTE:
    #   t1 - one row per order_id from dws.fact_order_product_list with
    #        order_date in [2025-10-01, 2025-11-01), skipping products that
    #        appear in dws.log_order_reissue_detail and rows with an empty SKU.
    #   t2 - joins t1 to order_express / package_vol_info / order_list, keeps
    #        delivered or signed-for parcels with positive dimensions and
    #        weight, product_num = 1 and exactly one product line, measured in
    #        [2025-05-01, 2025-06-12), and builds one JSON fragment per parcel.
    #   t3 - adds the ERP cost and ERP package data, concatenates the parcel
    #        fragments per order, and ranks rows per SKU by measurement time
    #        (newest first).
    #   final SELECT - keeps only the newest row per SKU (rn = 1).
    #
    # NOTE(review): the order_date window (Oct 2025) and the measurement window
    # (May-Jun 2025) do not overlap - confirm the intended dates before
    # re-enabling this query.
    # NOTE(review): `b.hight > 0` is tested twice in t2's WHERE clause.
    sql = r"""  
       # 限制范围是测量时间取得SKU种类为1且数量为1的订单且重复SKU只取最近的订单

WITH
t1 AS (
SELECT
order_id,
SKU,
order_date,
sum(CASE WHEN opl.order_product_id LIKE '%\_%' ESCAPE '\\' 
         AND opl.order_product_id NOT LIKE '%\_%\_%' ESCAPE '\\' THEN product_num END) AS product_num,
DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
count(opl.SKU) AS 产品种类
FROM
dws.fact_order_product_list opl
WHERE
  NOT EXISTS (
    SELECT 1 
    FROM dws.log_order_reissue_detail AS r 
    WHERE r.order_product_id = opl.order_product_id
  )
AND order_date >= "20251001"
AND order_date < "20251101"
AND SKU <> ""
GROUP BY order_id
)
,
t2 AS (
SELECT			
            a.`包裹测量时间`,
						t1.order_id,
						t1.SKU,
						t1.order_date,
            a.包裹号,
            a.快递公司,
            a.运输方式,
						a.`目的国`,
            d.postcode,
            CONCAT(
            '"', b.package, '": {',
            '"长": ', length, ', ',
            '"宽": ', width, ', ',
            '"高": ', hight, ', ',
            '"重量": ', weight, '}'
        ) AS package_json
        FROM
				t1
            LEFT JOIN order_express a ON t1.order_id = a.单号
            JOIN package_vol_info b ON a.`包裹号` = b.package
            JOIN order_list d ON a.`单号` = d.order_id 
        WHERE
            a.`包裹状态` IN ( '客户签收', '已经投递') 
            AND b.hight > 0 
            AND b.length > 0 
            AND b.width > 0 
            AND b.hight > 0 
            AND b.weight > 0
--             AND a.`目的国` = "United States"
						AND t1.product_num = 1
						AND t1.产品种类=1
						AND a.`包裹测量时间` >= '2025-05-01'
						AND a.`包裹测量时间` < '2025-06-12'
),
t3 AS (
SELECT
t2.*,
sku.成本价 AS ERP采购价,
ess.erp_package_vol AS ERP包裹数据,
CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,
ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) as rn
FROM
t2
LEFT JOIN dwd.dim_erp_sku_package_vol_info ess ON t2.SKU=ess.erp_sku
LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU=sku.SKU
WHERE
ess.`erp_package_vol`<>"{}" AND ess.`erp_package_vol`<>""
GROUP BY order_id
)
SELECT
包裹测量时间,
order_id,
SKU,
DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
包裹号,
`快递公司`,
`运输方式`,
`目的国`,
postcode,
ERP采购价,
ERP包裹数据,
实际包裹数据
FROM
t3
WHERE
rn=1



    """
    # Active query: at most 100 rows of `order_complet4` that have a buy_amount
    # but no `实际尺寸售价` value yet.
    df=pd.read_sql("SELECT * FROM `order_complet4` WHERE buy_amount is not null and `实际尺寸售价` IS NULL limit 100",db.con)
    # Disabled alternative that would run the CTE query defined above:
    # df = pd.read_sql(sql, db.con)
    # Drop rows whose package_json is missing.
    # NOTE(review): dropna raises KeyError if `order_complet4` has no
    # `package_json` column (its schema is not visible here); the final SELECT
    # of `sql` above does not expose that column either - confirm.
    df = df.dropna(subset=['package_json'])

取实际采购价:当前已有 ERP采购价 + ERP尺寸 + 实际尺寸;输入 df['order_id'],输出 df['采购成本']

In [ ]:
import pandas as pd
# Read worksheet "Sheet1" of the furniture workbook into `df`; the path is
# relative to the notebook's working directory.
df = pd.read_excel(
    r'test_excel/估算尺寸/furniture.xlsx',
    sheet_name='Sheet1',
)
# Optional order_id clean-up steps, currently disabled:
# df['order_id'].drop_duplicates(inplace=True)
# df['order_id'] = df['order_id'].astype(str)
# df['order_id'] = df['order_id'].str.replace(' ','')
df  # last expression of the cell, so the notebook displays the frame
In [ ]:
from utils.gtools import MySQLconnect

# Look up the actual purchase cost (采购成本) of every order_id in `df`, in
# batches, then left-merge it back onto `df` and copy the result to the clipboard.
ods = MySQLconnect("ods")
engine = ods.engine()
# NOTE(review): this cursor is not used anywhere in the visible notebook; it is
# kept only so that code relying on the name `cursor` keeps working. Consider
# removing it so no idle connection is left open.
cursor = ods.connect().cursor()

# Maximum number of order_id values per query, so the IN (...) list stays bounded.
batch_size = 50000

# Distinct order ids to look up. Missing ids are dropped so that a literal
# 'nan' is never sent to the database.
order_id_list = df["order_id"].dropna().drop_duplicates().tolist()
total_ids = len(order_id_list)

# Per-batch query results.
result_dfs1 = []
for i in range(0, total_ids, batch_size):
    batch_order_ids = order_id_list[i:i + batch_size]  # ids of the current batch
    # NOTE(review): the IN list is built by string formatting. The ids come
    # from a local workbook, but a value containing a quote would break the
    # statement - switch to bound parameters if the source is ever untrusted.
    param = ",".join(f"'{order_id}'" for order_id in batch_order_ids)

    # t1 stacks two cost sources per order (the first 15 characters of the
    # link id are used as the order id): cost of goods taken out of stock and
    # cost of completed purchases. The outer query sums them per order.
    purchase_order_sql = f"""
    WITH t1 AS (
        SELECT LEFT(ol.out_detials_outlink_id, 15) AS order_id,
               SUM(out_detials_qty * price) AS instock_cost,
               NULL AS buy_cost
        FROM ods.outstock_list ol
        JOIN ods.instock_list il ON ol.store_in_id = il.id 
        WHERE LEFT(ol.out_detials_outlink_id, 15) IN ({param})
        GROUP BY LEFT(ol.out_detials_outlink_id, 15)
        
        UNION ALL
        
        SELECT LEFT(order_product_id, 15) AS order_id, 
               NULL AS instock_cost,
               SUM(buy_num * actual_price) AS buy_cost
        FROM warehouse_purchasing
        WHERE LEFT(order_product_id, 15) IN ({param}) 
              AND buy_audit = "采购完成"
        GROUP BY LEFT(order_product_id, 15)
    )
    SELECT order_id,
           SUM(CASE 
               WHEN instock_cost IS NULL THEN buy_cost
               ELSE instock_cost 
           END) AS 采购成本
    FROM t1 
    GROUP BY order_id
    """

    batch_df1 = pd.read_sql(purchase_order_sql, con=engine)  # run the query
    result_dfs1.append(batch_df1)
    # Report the number of ids actually processed; the last batch may be short.
    print(f"已完成 {min(i + batch_size, total_ids)} 个 order_id 的查询")

# Combine all batches. pd.concat raises on an empty list, so fall back to an
# empty frame with the expected columns when there was nothing to query.
if result_dfs1:
    purchase_order_df1 = pd.concat(result_dfs1, ignore_index=True)
else:
    purchase_order_df1 = pd.DataFrame(columns=["order_id", "采购成本"])
purchase_order_df1["order_id"] = purchase_order_df1["order_id"].astype(str)

# Make both merge keys strings so they compare equal.
df["order_id"] = df["order_id"].astype(str)

# Attach the purchase cost; orders without a match keep NaN.
df = pd.merge(df, purchase_order_df1, on='order_id', how='left')
# Copy the merged frame to the clipboard.
df.to_clipboard(index=False)

计算标准网站售价,输入尺寸,输出售价和订单物流费

In [ ]:
# 计算售价相关
import json
from sell.sell_price import call_sell_price_2025
from sell.sell_price import air_order_price,ocean_order_price
from utils.Package import Package, Package_group
import pandas as pd
import re

def safe_json_loads(data):
    """Parse package data into a container without raising.

    Args:
        data: A dict, a list, a JSON string, a missing value (None / NaN),
            or any other scalar read from a DataFrame cell.

    Returns:
        The dict (or non-empty list) carried by ``data``. An empty dict is
        returned for missing values, empty lists, unparsable text, and JSON
        that decodes to a scalar.
    """
    # Containers are checked before pd.isna(): on a list pd.isna() returns an
    # array, whose truth value is ambiguous and cannot be used in an `if`.
    if isinstance(data, dict):
        return data
    if isinstance(data, list):
        return data if data else {}
    if data is None or pd.isna(data):
        return {}

    # Strings are parsed as-is; other scalars (e.g. floats) via their text form.
    text = data if isinstance(data, str) else str(data)
    try:
        result = json.loads(text)
    except (TypeError, ValueError):  # json.JSONDecodeError is a ValueError
        return {}

    # Only containers are meaningful package data; scalars become {}.
    if isinstance(result, dict) or (isinstance(result, list) and result):
        return result
    return {}


# Compute the sell price from the ERP package data and from the estimated
# package size, and store both on the row.
# NOTE(review): only element 0 of call_sell_price_2025's return value is used.
# Lines disabled in the original cell labelled element 1 as 物流分摊费 and
# element 2 as a 海运 total - unverified here. The disabled per-order logistics
# fee calculation (ocean_order_price over a Package_group) is not run either.
for index, row in df.iterrows():
    package_dict = safe_json_loads(row['ERP包裹数据'])
    actual_package = safe_json_loads(row['估算包裹尺寸'])
    price = row['成本价']

    erp_sell_price = call_sell_price_2025(price, package_dict)
    actual_sell_price = call_sell_price_2025(price, actual_package)
    print(row["SKU"], erp_sell_price[0], actual_sell_price[0])

    df.loc[index, 'ERP售价'] = erp_sell_price[0]
    df.loc[index, '估算售价'] = actual_sell_price[0]
In [ ]:
# Write the frame back to the furniture workbook (absolute Windows path),
# without the index column.
df.to_excel(
    "D:\\test\\logistics\\test_excel\\估算尺寸\\furniture.xlsx",
    index=False,
)

计算实际渠道物流费用

In [ ]:
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group
import pandas as pd
import json
import re
# 美国 
from utils.logisticsBill import Billing
import requests

# Compiled once; matches the first number in a string (same pattern as before).
_NUMBER_RE = re.compile(r"[-+]?\d*\.\d+|\d+")


def extract_number(value):
    """Return the first number found in ``str(value)`` as a float, or 0.0."""
    match = _NUMBER_RE.search(str(value))
    return float(match.group()) if match else 0.0


# For every row, build the measured parcels, bill them through the fixed
# "Fedex-GROUD" channel for the US, and store the head-leg amount in 头程CNY.
for index, row in df.iterrows():
    opCountry = OperateCountry('US')
    postcode = row['postcode']
    if pd.isna(postcode) or str(postcode).lower() == "nan":
        continue

    # json.loads raises TypeError for non-text cells (e.g. NaN) and ValueError
    # (JSONDecodeError) for malformed text; both mean "skip this row".
    try:
        package_dict = json.loads(row['实际包裹数据'])
    except (TypeError, ValueError) as e:
        print(f"行 {index} 解析失败: {e}")
        print(row['实际包裹数据'])
        continue
    if not isinstance(package_dict, dict):
        print(f"行 {index} 解析失败: 包裹数据不是字典")
        print(row['实际包裹数据'])
        continue

    packages = Package_group()
    valid_package_count = 0
    for key, package in package_dict.items():
        if not isinstance(package, dict):
            continue
        # A missing key yields 0.0, so the parcel is skipped below.
        length = extract_number(package.get('长'))
        width = extract_number(package.get('宽'))
        height = extract_number(package.get('高'))
        weight = extract_number(package.get('重量'))

        if length == 0 or width == 0 or height == 0 or weight == 0:
            continue
        packages.add_package(Package(key, length, width, height, weight))
        valid_package_count += 1

    # The old `packages is None` test could never be true, so rows without a
    # single valid parcel were still billed; count the added parcels instead.
    if valid_package_count == 0:
        print(f"行 {index} 无有效包裹,已跳过")
        continue

    # str() keeps the substring tests from raising TypeError on NaN cells.
    head_type = 1 if "海运" in str(row['运输方式']) else 0

    # Disabled carrier -> channel mapping from the original cell, for reference:
    #   "FEDEX-SAIR-G" -> "Fedex-GROUD"      "FEDEX-SAIR-H" -> "Fedex-HOME"
    #   "FEDEX02" -> "Fedex-彩虹小马"         "大包" / '海MS-FEDEX' -> "Fedex-金宏亚"
    #   "GIGA" -> "大健-GIGA"                 "CEVA" -> "大健-CEVA"
    #   "USPS" -> "Fedex-GROUD"               otherwise -> "大健-Metro"
    bill = Billing(str(index), opCountry, packages, postcode, company_name="Fedex-GROUD", head_type=head_type, beizhu='1')
    head_price = bill.head_amount[0]
    # NOTE(review): tail_price is computed (and halved for USPS) but never
    # written to df - confirm whether a tail-leg column is missing.
    tail_price = bill.tail_amount[0]
    if "USPS" in str(row['快递公司']):
        tail_price = tail_price/2
    df.loc[index, '头程CNY'] = head_price
    # df.loc[index,'最优渠道'] = bill.company_name
    print(f"行 {index} 处理完成")

df.to_clipboard(index=False)
In [ ]:
from utils.gtools import MySQLconnect
import pandas as pd
# Rebuild the frame from the clipboard and store it in the `logistics`
# database as table 'table_name'. if_exists='replace' means an existing table
# of that name is dropped and recreated.
df = pd.read_clipboard()
log = MySQLconnect('logistics')
df.to_sql(
    'table_name',
    con=log.engine(),
    if_exists='replace',
    index=False,
)