logistics/售价模型计算.ipynb

14 KiB
Raw Blame History

取ERP采购价+ERP尺寸+实际尺寸,需要国家+条目+邮编+order_id

In [4]:
import pandas as pd
from utils.gtools import MySQLconnect

# Load the package records that still need a selling price computed.
with MySQLconnect('ads') as db:
    # NOTE(review): `sql` is assembled here but never executed by this cell;
    # the DataFrame below is read from the `order_complet4` table instead.
    # Presumably that table was materialised from this query -- confirm.
    #
    # What the query selects (per the SQL text itself):
    #   t1 - one row per order_id from dws.fact_order_product_list inside the
    #        order_date window, excluding rows present in log_order_reissue_detail.
    #        `产品种类` is count(SKU), i.e. a row count per order, not a DISTINCT
    #        count -- TODO confirm that is what is intended.
    #   t2 - delivered packages with positive measured length/width/height/weight
    #        for orders where product_num = 1 and 产品种类 = 1, each rendered as a
    #        JSON fragment `"<package>": {...}`.
    #   t3 - joins the ERP package data and cost price, concatenates the
    #        fragments per order, and ranks orders per SKU by measurement time.
    #   final SELECT - keeps only the most recently measured order per SKU.
    #
    # Fixes relative to the previous version of the query text:
    #   * final SELECT used DATE_FORMAT(order_date, "%Y-%M-%D"); in MySQL %M is
    #     the month name and %D the day with an English suffix, so the column
    #     came out like "2025-May-1st". Now "%Y-%m-%d", matching t1.
    #   * the duplicated `b.hight > 0` predicate in t2 was removed.
    sql = r"""  
       # 限制范围是测量时间取得SKU种类为1且数量为1的订单且重复SKU只取最近的订单

WITH
t1 AS (
SELECT
order_id,
SKU,
order_date,
sum(CASE WHEN opl.order_product_id LIKE '%\_%' ESCAPE '\\' 
         AND opl.order_product_id NOT LIKE '%\_%\_%' ESCAPE '\\' THEN product_num END) AS product_num,
DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
count(opl.SKU) AS 产品种类
FROM
dws.fact_order_product_list opl
WHERE
  NOT EXISTS (
    SELECT 1 
    FROM dws.log_order_reissue_detail AS r 
    WHERE r.order_product_id = opl.order_product_id
  )
AND order_date >= "20250501"
AND order_date < "20250612"
AND SKU <> ""
GROUP BY order_id
)
,
t2 AS (
SELECT			
            a.`包裹测量时间`,
						t1.order_id,
						t1.SKU,
						t1.order_date,
            a.包裹号,
            a.快递公司,
            a.运输方式,
						a.`目的国`,
            d.postcode,
            CONCAT(
            '"', b.package, '": {',
            '"长": ', length, ', ',
            '"宽": ', width, ', ',
            '"高": ', hight, ', ',
            '"重量": ', weight, '}'
        ) AS package_json
        FROM
				t1
            LEFT JOIN order_express a ON t1.order_id = a.单号
            JOIN package_vol_info b ON a.`包裹号` = b.package
            JOIN order_list d ON a.`单号` = d.order_id 
        WHERE
            a.`包裹状态` IN ( '客户签收', '已经投递') 
            AND b.hight > 0 
            AND b.length > 0 
            AND b.width > 0 
            AND b.weight > 0
--             AND a.`目的国` = "United States"
						AND t1.product_num = 1
						AND t1.产品种类=1
						AND a.`包裹测量时间` >= '2025-05-01'
						AND a.`包裹测量时间` < '2025-06-12'
),
t3 AS (
SELECT
t2.*,
sku.成本价 AS ERP采购价,
ess.erp_package_vol AS ERP包裹数据,
CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,
ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) as rn
FROM
t2
LEFT JOIN dwd.dim_erp_sku_package_vol_info ess ON t2.SKU=ess.erp_sku
LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU=sku.SKU
WHERE
ess.`erp_package_vol`<>"{}" AND ess.`erp_package_vol`<>""
GROUP BY order_id
)
SELECT
包裹测量时间,
order_id,
SKU,
DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
包裹号,
`快递公司`,
`运输方式`,
`目的国`,
postcode,
ERP采购价,
ERP包裹数据,
实际包裹数据
FROM
t3
WHERE
rn=1



    """
    # Fetch up to 100 rows that have a purchase amount but no price computed
    # from the measured dimensions yet. `db.con` is handed to pandas as-is;
    # pandas warns when it is a raw DBAPI connection rather than a SQLAlchemy
    # connectable.
    df=pd.read_sql("SELECT * FROM `order_complet4` WHERE buy_amount is not null and `实际尺寸售价` IS NULL limit 100",db.con)
C:\Users\joker\AppData\Local\Temp\ipykernel_34576\54471085.py:105: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.
  df=pd.read_sql("SELECT * FROM `order_complet4` WHERE buy_amount is not null and `实际尺寸售价` IS NULL limit 100",db.con)

取实际采购价:当前已有 ERP采购价 + ERP尺寸 + 实际尺寸;输入 df['order_id'],输出 df['采购成本']

In [ ]:
from utils.gtools import MySQLconnect

# Look up the actual purchase cost of every order in `df` and attach it as
# the `采购成本` column, querying the `ods` database in batches.
ods = MySQLconnect("ods")
engine = ods.engine()

# Maximum number of order_ids placed in one IN (...) list, so a single SQL
# statement does not grow without bound.
batch_size = 50000
order_id_list = df["order_id"].drop_duplicates().tolist()  # every distinct order_id
total_ids = len(order_id_list)
# df['postcode'] = "38016"  # manual override, left disabled

# One result DataFrame per batch.
result_dfs1 = []
for i in range(0, total_ids, batch_size):
    batch_order_ids = order_id_list[i:i + batch_size]  # order_ids of this batch
    # NOTE(review): the ids are interpolated into the SQL text as quoted
    # literals. They come from `df`, not from user input; an id containing a
    # quote would still break the statement.
    param = ",".join(f"'{order_id}'" for order_id in batch_order_ids)

    # t1 stacks two cost sources keyed by the first 15 characters of the id:
    # stock-out quantity * stock-in price, and completed purchases
    # (buy_num * actual_price). The outer SELECT sums, per order, instock_cost
    # where present and buy_cost otherwise.
    # NOTE(review): an order appearing in BOTH sources yields two t1 rows, so
    # the outer SUM adds the two costs together -- confirm this is intended.
    purchase_order_sql = f"""
    WITH t1 AS (
        SELECT LEFT(ol.out_detials_outlink_id, 15) AS order_id,
               SUM(out_detials_qty * price) AS instock_cost,
               NULL AS buy_cost
        FROM ods.outstock_list ol
        JOIN ods.instock_list il ON ol.store_in_id = il.id 
        WHERE LEFT(ol.out_detials_outlink_id, 15) IN ({param})
        GROUP BY LEFT(ol.out_detials_outlink_id, 15)
        
        UNION ALL
        
        SELECT LEFT(order_product_id, 15) AS order_id, 
               NULL AS instock_cost,
               SUM(buy_num * actual_price) AS buy_cost
        FROM warehouse_purchasing
        WHERE LEFT(order_product_id, 15) IN ({param}) 
              AND buy_audit = "采购完成"
        GROUP BY LEFT(order_product_id, 15)
    )
    SELECT order_id,
           SUM(CASE 
               WHEN instock_cost IS NULL THEN buy_cost
               ELSE instock_cost 
           END) AS 采购成本
    FROM t1 
    GROUP BY order_id
    """

    batch_df1 = pd.read_sql(purchase_order_sql, con=engine)  # run the query
    result_dfs1.append(batch_df1)
    # Report the number of ids actually processed; the last batch is usually
    # shorter than batch_size.
    print(f"已完成 {min(i + batch_size, total_ids)} 个 order_id 的查询")

# Combine the batches. pd.concat raises ValueError on an empty list, so an
# empty `df` falls back to an empty frame with the expected columns.
if result_dfs1:
    purchase_order_df1 = pd.concat(result_dfs1, ignore_index=True)
else:
    purchase_order_df1 = pd.DataFrame(columns=["order_id", "采购成本"])
purchase_order_df1["order_id"] = purchase_order_df1["order_id"].astype(str)

# Both join keys must be strings for the merge to match.
df["order_id"] = df["order_id"].astype(str)

# Left join keeps every row of df; orders without a cost get NaN.
df = pd.merge(df, purchase_order_df1, on='order_id', how='left')
# Copy the result to the clipboard.
df.to_clipboard(index=False)

计算标准网站售价,输入尺寸,输出售价和订单物流费

In [3]:
def call_sell_price(price, package_dict,head_type="海运"):
    """Compute the selling price and order logistics fee for one item.

    Args:
        price: Purchase price passed through to ``call_sell_and_order_price``.
        package_dict: JSON text describing the packages; it is decoded with
            ``json.loads`` before being passed on.
        head_type: First-leg transport type passed through unchanged.

    Returns:
        ``(sell_price, order_price, order_type)`` as returned by
        ``call_sell_and_order_price``. ``("", "", "")`` is returned instead
        when decoding or pricing raises, or when the selling price equals 0.
    """
    import json
    from sell.sell_price import call_sell_and_order_price

    no_result = ("", "", "")
    try:
        parsed_packages = json.loads(package_dict)
        total_price, order_price, order_type = call_sell_and_order_price(
            price, parsed_packages, head_type
        )
    except Exception as e:
        # Best effort: report the failure and let the caller carry on.
        print(f" 报错: {e}")
        return no_result

    # A selling price of 0 is treated the same as a failure.
    return no_result if total_price == 0 else (total_price, order_price, order_type)
# Compute the current selling price for every row of df and write the three
# results back into the 网站售价 / 订单物流费 / 尾端类型 columns.
for idx, record in df.iterrows():
    purchase_price = record['buy_amount']
    priced = call_sell_price(purchase_price, record['package_json'],"海运")
    print(priced)
    site_price, order_freight, tail_kind = priced
    df.loc[idx,'网站售价'] = site_price
    df.loc[idx,'订单物流费'] = order_freight
    df.loc[idx,'尾端类型'] = tail_kind
    print(f"SKU: {record['sku']} 网站售价: {site_price}  订单物流费: {order_freight} 尾端类型: {tail_kind}")
# Copy the enriched table to the clipboard without the index column.
df.to_clipboard(index=False)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[3], line 16
     14     return (sell_price, order_price, order_type)
     15 # 计算当前售价
---> 16 for index,row in df.iterrows():
     17     price = row['buy_amount']
     18     # package_dict = json.loads(row['erp_package_vol'])

NameError: name 'df' is not defined
In [3]:
# Copy the current DataFrame to the system clipboard using pandas' default
# options (unlike the other cells, index=False is not passed here).
df.to_clipboard()

计算实际渠道物流费用

In [ ]:
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group
import pandas as pd
import json
import re
# 美国 
from utils.logisticsBill import Billing
import requests

# Compiled once instead of on every row. The pattern matches the first decimal
# (optionally signed) or, failing that, the first run of digits.
_NUMBER_RE = re.compile(r"[-+]?\d*\.\d+|\d+")


def extract_number(value):
    """Return the first number found in ``str(value)`` as a float, or 0.0 if none."""
    match = _NUMBER_RE.search(str(value))
    return float(match.group()) if match else 0.0


# For every row, rebuild the measured packages from the `实际包裹数据` JSON,
# bill them through `Billing`, and store the first-leg amount in `头程CNY`.
for index, row in df.iterrows():
    opCountry = OperateCountry('US')
    postcode = row['postcode']
    # Rows without a usable postcode cannot be billed.
    if pd.isna(postcode) or str(postcode).lower() == "nan":
        continue
    try:
        package_dict = json.loads(row['实际包裹数据'])
    except Exception as e:
        print(f"行 {index} 解析失败: {e}")
        print(row['实际包裹数据'])
        continue

    packages = Package_group()
    added = 0  # number of packages actually placed in the group
    for key, package in package_dict.items():
        length = extract_number(package['长'])
        width = extract_number(package['宽'])
        height = extract_number(package['高'])
        weight = extract_number(package['重量'])

        # Skip packages with a missing or zero measurement.
        if length == 0 or width == 0 or height == 0 or weight == 0:
            continue
        packages.add_package(Package(key, length, width, height, weight))
        added += 1
    # The previous guard was `packages is None`, which can never be true for
    # a freshly constructed Package_group; count the additions instead so
    # rows with no valid package are really skipped.
    if added == 0:
        print(f"行 {index} 无有效包裹,跳过")
        continue

    # str() keeps the substring tests from raising TypeError on NaN cells.
    transport = str(row['运输方式'])
    carrier = str(row['快递公司'])
    head_type = 1 if "海运" in transport else 0

    # Carrier-to-channel mapping, currently disabled; the channel is
    # hard-coded to "Fedex-GROUD" in the Billing call below.
    # if "FEDEX-SAIR-G" in row['快递公司']:
    #     company_name = "Fedex-GROUD"
    # elif "FEDEX-SAIR-H" in row['快递公司']:
    #     company_name = "Fedex-HOME"
    # elif "FEDEX02" in row['快递公司']:
    #     company_name = "Fedex-彩虹小马"
    # elif "大包" in row['快递公司'] or row['快递公司'] == '海MS-FEDEX':
    #     company_name = "Fedex-金宏亚"
    # elif "GIGA" in row['快递公司']:
    #     company_name = "大健-GIGA"
    # elif "CEVA" in row['快递公司']:
    #     company_name = "大健-CEVA"
    # elif "USPS" in row['快递公司']:
    #     company_name = "Fedex-GROUD"
    # else:
    #     company_name = "大健-Metro"

    bill = Billing(str(index),opCountry,packages,postcode,company_name="Fedex-GROUD",head_type=head_type,beizhu='1')
    head_price = bill.head_amount[0]
    tail_price = bill.tail_amount[0]
    if "USPS" in carrier:
        tail_price = tail_price/2
    # NOTE(review): tail_price is computed (and halved for USPS) but never
    # written back to df -- only the first-leg amount is stored. Confirm
    # whether a last-leg column should be filled here as well.
    df.loc[index,'头程CNY'] = head_price
    # df.loc[index,'最优渠道'] = bill.company_name
    print(f"行 {index} 处理完成")

df.to_clipboard(index=False)
In [ ]:
from utils.gtools import MySQLconnect
import pandas as pd
# Read the table currently on the clipboard and write it to the `table_name`
# table of the `logistics` database, replacing the table if it already exists.
df = pd.read_clipboard()
logistics_db = MySQLconnect('logistics')
logistics_engine = logistics_db.engine()
df.to_sql('table_name', con=logistics_engine, if_exists='replace', index=False)