logistics/spu实际体积更新.ipynb

28 KiB
Raw Permalink Blame History

取数据,SKU多次订单的情况下取最新的订单数据

In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect

# 读取需要计算的包裹信息
with MySQLconnect('ods') as db:
    sql = r"""  
    # 限制范围是测量时间取得SKU种类为1且数量为1的订单且重复SKU只取最近的订单
# 测量时间D +2 天进行汇总数据
# 订单汇总产品数和取出
WITH
t1 AS (
SELECT
order_id,
SKU,
order_date,
sum(CASE WHEN opl.order_product_id REGEXP "[0-9]{15}_[0-9]*$"
      THEN product_num END) AS product_num,
DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
count(DISTINCT opl.SKU) AS 产品种类
FROM
dws.order_product_list opl
WHERE
  NOT EXISTS (
    SELECT 1 
    FROM dws.log_order_reissue_detail AS r 
    WHERE left(r.order_product_id,15) = opl.order_id
    
  )
AND order_date >= "2025-10-01"
AND order_date < "2025-11-01"
AND SKU <> ""
GROUP BY order_id
)
,
t2 AS (
SELECT                        
            a.`包裹测量时间`,
            t1.order_id,
            t1.SKU,
            t1.order_date,
            a.包裹号,
            a.快递公司,
            a.运输方式,
            a.`目的国`,
            d.postcode,
            CONCAT(
            '"', b.package, '": {',
            '"长": ', length, ', ',
            '"宽": ', width, ', ',
            '"高": ', hight, ', ',
            '"重量": ', weight, '}'
        ) AS package_json
        FROM
                                t1
            LEFT JOIN order_express a ON t1.order_id = a.单号
            JOIN package_vol_info b ON a.`包裹号` = b.package
            JOIN order_list d ON a.`单号` = d.order_id 
        WHERE
            a.`包裹状态` != '--'
            AND b.hight > 0 
            AND b.length > 0 
            AND b.width > 0 
            AND b.hight > 0 
            AND b.weight > 0
            AND t1.product_num = 1
            AND t1.产品种类=1
--            AND a.`包裹测量时间` >= '2025-09-01'
--             AND a.`包裹测量时间` < '2025-10-01'
),
t3 AS (
SELECT
t2.*,
SPU,
sku.成本价 AS ERP采购价,
CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,
count(package_json) AS 包裹数,
ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) as rn
FROM
t2
LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU=sku.SKU
left JOIN stg_bayshop_litfad_spu spu ON sku.产品PID=spu.产品PID

GROUP BY order_id
)
SELECT
包裹测量时间,
order_id,
SPU,
SKU,
DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
包裹号,
`快递公司`,
`运输方式`,
`目的国`,
postcode,
ERP采购价,
实际包裹数据,
包裹数,
rn AS 从新到旧
FROM
t3

    """
    df=pd.read_sql(sql,db.con)
    df.to_clipboard(index=False)

# df=df[df['实际包裹数量']==1]

取这些SPU下的所有SKU及其现在售价

In [ ]:
# from sell.sell_price import call_sell_and_order_price
import json
from utils.gtools import MySQLconnect
import pandas as pd

base_df = df[df['从新到旧']==1]
spu_list = (
    base_df['SPU']
    .apply(pd.to_numeric, errors='coerce')
    .dropna()
    .astype(int)
    .astype(str)
    .drop_duplicates()         # 加这一行
    .tolist()
)

def chunk_list(lst, size):
    for i in range(0, len(lst), size):
        yield lst[i:i+size]

result_list = []
with MySQLconnect('ods') as db:
    enginal = db.engine()
    for chunk in chunk_list(spu_list, 5000):
        quoted_spus = ','.join([f"'{spu}'" for spu in chunk])  # 加引号防止 SQL 错误
        sql = f"""
        SELECT
            产品品类,
            产品分类,
            SPU,
            SKU,
            sku.成本价,
            spvi.erp_package_vol AS ERP包裹数据,
            物流分摊,
            产品售价
        from stg_bayshop_litfad_spu spu 
        LEFT JOIN stg_bayshop_litfad_sku sku ON sku.产品PID = spu.产品PID
        LEFT JOIN dwd.dim_erp_sku_package_vol_info spvi ON sku.SKU = spvi.erp_sku
        WHERE spu.SPU IN ({quoted_spus})
        """
        df_chunk = pd.read_sql(sql, enginal)
        result_list.append(df_chunk)
        print(f"已处理 {len(result_list) * 5000} 个SPU")

result = pd.concat(result_list, ignore_index=True)

# 合并df
all_df = pd.merge(result,base_df, on=['SPU','SKU'], how='left')
all_df = all_df.drop_duplicates(subset=['SKU'])
all_df
In [ ]:
len(all_df[all_df['产品品类'] == "66 - Furniture"])

给SPU下的SKU分层级打标签# 按 SPU 分组;在组内按 成本价升序 排序;成本价相同的 SKU 属于同一个层次;层次号就是「第几种不同的成本价」。从小到大排序

In [ ]:
# 取出产品品类为 66 - Furniture 的数据

furniture_df = all_df[all_df['产品品类'] == "66 - Furniture"]
# 根据品类拿到对应表的队友SKU的标准/预设属性集
furniture_df['SKU'] = furniture_df['SKU'].astype(str)
with MySQLconnect('ods') as ods:
    sku_list = furniture_df['SKU'].tolist()
    placeholders = ','.join(['%s'] * len(sku_list))
    sql = f"""
        SELECT SKU,`标准/预设属性集` FROM erp_furniture_sku where SKU in ({placeholders})"""
    sku_df = pd.read_sql(sql, ods.engine(), params=tuple(sku_list))
sku_df['SKU'] = sku_df['SKU'].astype(str)
# 合并数据
furniture_df = pd.merge(furniture_df, sku_df, on='SKU', how='left')

# 
furniture_df
In [ ]:
cl_df = furniture_df.drop_duplicates(subset=['SKU'])
property_df = pd.read_excel(r'D:\\test\\logistics\\test_excel\\furniture-规格属性映射.xlsx')
# 筛选是否有效列为1的属性集并以规格属性映射	是否有效做成数据
property_df = property_df[property_df['是否有效'] == 1]
property_dict = {row['规格属性映射']: row['是否有效'] for index, row in property_df.iterrows()}

# 属性统计
def parse_attr_set(attr_set_str):
    """解析属性集,返回排序后的 {属性值: 属性名} 的数据"""
    mapping = {}
    for item in attr_set_str.split(";"):
        item = item.strip()
        if not item:
            continue
        try:
            left, value = item.split("~", 1)   # "231012:大小~23206184:140*80*75"
            attr_name = left.split(":")[1]
            value = value.split(":")[1]
            if attr_name in property_dict.keys():
                # 有效属性集
                mapping[attr_name] = value
        except Exception:
            continue
    
    # 对数据按键进行排序,返回有序数据
    return dict(sorted(mapping.items()))

# 1. 为每个SKU解析有效属性集并创建新列
cl_df['有效属性集'] = cl_df['标准/预设属性集'].apply(
    lambda x: parse_attr_set(x) if pd.notna(x) else {}
)

# 2. 将有效属性集转换为可比较的格式(排序后的元组)
cl_df['属性集签名'] = cl_df['有效属性集'].apply(
    lambda x: tuple(sorted(x.items()))  # 转换为排序后的元组,便于比较
)

cl_df['分组码'] = ''

for spu in cl_df['SPU'].unique():
    spu_indices = cl_df[cl_df['SPU'] == spu].index
    spu_data = cl_df.loc[spu_indices]
    
    # 按属性集签名分组
    signature_groups = {}
    group_num = 1
    
    for idx in spu_indices:
        signature = cl_df.loc[idx, '属性集签名']
        
        if signature not in signature_groups:
            signature_groups[signature] = f"{spu}_{group_num:03d}"
            group_num += 1
        
        cl_df.loc[idx, '分组码'] = signature_groups[signature]
cl_df

哪几个层级有实际数据,估算其他没有数据的层级的数据

In [ ]:
# 直接在原数据框添加组内编号
cl_df['组内编号'] = cl_df.groupby('SPU')['产品售价'].rank(method='dense').astype(int)

cl_df
In [ ]:
def dict_to_json_str(data_dict):
    """将字典转换为JSON字符串"""
    if not data_dict or pd.isna(data_dict):
        return None
    try:
        return json.dumps(data_dict, ensure_ascii=False)
    except (TypeError, ValueError):
        return None

def json_str_to_dict(json_str):
    """将JSON字符串转换为字典"""
    if not json_str or pd.isna(json_str):
        return {}
    try:
        return json.loads(json_str)
    except (json.JSONDecodeError, TypeError):
        return {}
In [ ]:
import re
def split_packages(package_dict_str):
    """
    处理包裹数据,按体积排序并重新排列长宽高
    Args:
        package_dict_str: 包裹数据的JSON字符串或数据
        
    Returns:
        dict: 排序后的包裹数据键为1,2,3...(按体积降序)
    """
    # 处理空值和非字符串/数据类型
    if package_dict_str is None or pd.isna(package_dict_str):
        return {}
    
    if isinstance(package_dict_str, (int, float)):
        return {}
    # 如果输入是字符串,先转换为数据
    if isinstance(package_dict_str, str):
        try:
            package_dict = json.loads(package_dict_str)
        except json.JSONDecodeError:
            return {}
    else:
        package_dict = package_dict_str

    # 提取并处理每个包裹的数据
    packages = []
    for package_name, package_data in package_dict.items():
        item = {}
        for key, value in package_data.items():
            try:
                # 使用正则表达式提取数字部分
                number_str = re.findall(r"[-+]?\d*\.\d+|\d+", str(value))
                if number_str:
                    item[key] = float(number_str[0])
                else:
                    item[key] = value
            except ValueError:
                item[key] = value
        # 确保有长宽高数据
        if all(k in item for k in ['长', '宽', '高']):
            # 将长宽高按实际尺寸重新排序:最长边作为长,次长边作为宽,最短边作为高
            dimensions = [item['长'], item['宽'], item['高']]
            dimensions.sort(reverse=True)  # 降序排列
            # 重新赋值
            item['长'] = dimensions[0]  # 最长边
            item['宽'] = dimensions[1]  # 次长边
            item['高'] = dimensions[2]  # 最短边
            # 计算体积
            item['体积'] = dimensions[0] * dimensions[1] * dimensions[2]
            packages.append(item)
    # 按体积降序排序
    packages.sort(key=lambda x: x.get('体积', 0), reverse=True)
    # 构建结果数据
    result = {}
    for i, package in enumerate(packages, 1):
        # 只保留长宽高和重量信息
        result_package = {
            '长': package.get('长', 0),
            '宽': package.get('宽', 0),
            '高': package.get('高', 0)
        }
        # 如果有重量信息也保留
        if '重量' in package:
            result_package['重量'] = package['重量']
        result[i] = result_package
    return dict_to_json_str(result)
In [ ]:
# 先把包裹数据拆成数据且排序
result_df = cl_df.copy()
for index, row in result_df.iterrows():
    if row['ERP包裹数据'] is not None:
        result_df.at[index, 'ERP包裹数据'] = split_packages(row['ERP包裹数据'])
        result_df.at[index, '实际包裹数据'] = split_packages(row['实际包裹数据'])
        print(result_df.at[index, '实际包裹数据'])
    else:
        result_df.at[index, 'ERP包裹数据'] = []
        result_df.at[index, '实际包裹数据'] = []
In [ ]:
# 确保订单时间是datetime类型
result_df['订单时间'] = pd.to_datetime(result_df['订单时间'])

# 先找到每个SPU的最大订单时间
max_order_time = result_df.groupby('SPU')['订单时间'].transform('max')

# 然后设置is_first
result_df['is_first'] = 0
result_df.loc[result_df['实际包裹数据'].notna() & (result_df['订单时间'] == max_order_time), 'is_first'] = 1
result_df
In [ ]:
# 根据SPU基准SKU计算系数
import numpy as np

def cal_size_coefficients(erp_packages, actual_packages):
    """
    计算每个SPU的尺寸系数
    
    Args:
        erp_packages: ERP包裹数据
        actual_packages: 实际包裹数据
    Returns:
        dict: 尺寸系数数据
    """
    # 检查输入是否有效
    if not erp_packages or not actual_packages:
        return {}
    
    # 获取包裹数量
    erp_count = len(erp_packages)
    actual_count = len(actual_packages)
    
    # 如果包裹数量不相等,取较小的数量
    min_count = min(erp_count, actual_count)
    
    coefficients = {}
    
    for i in range(1, min_count + 1):
        erp_pkg = erp_packages.get(str(i), {})
        actual_pkg = actual_packages.get(str(i), {})
        
        # 检查必要的维度是否存在
        if not all(k in erp_pkg for k in ['长', '宽', '高']) or not all(k in actual_pkg for k in ['长', '宽', '高']):
            continue
            
        pkg_coefficients = {}
        
        # 计算长宽高系数
        for dimension in ['长', '宽', '高']:
            old_val = erp_pkg.get(dimension, 0)
            new_val = actual_pkg.get(dimension, 0)
            pkg_coefficients[f'{dimension}系数'] = cal_size(old_val, new_val)
        
        # 计算重量系数(如果存在)
        if '重量' in erp_pkg and '重量' in actual_pkg:
            old_weight = erp_pkg.get('重量', 0)
            new_weight = actual_pkg.get('重量', 0)
            pkg_coefficients['重量系数'] = cal_size(old_weight, new_weight)
        
        coefficients[i] = pkg_coefficients
    
    return dict_to_json_str(coefficients)

def cal_size(old, new):
    """
    计算单个维度的系数
    """
    try:
        old = float(old)
        new = float(new)
        if old == 0:
            return None
        return (new - old) / old
    except (ValueError, TypeError):
        return None

# 应用计算
# 首先筛选出is_first=1的行作为基准
first_rows = result_df[result_df['is_first'] == 1].copy()

# 计算每个SPU的尺寸系数
for index, row in first_rows.iterrows():
    erp_packages = json_str_to_dict(row['ERP包裹数据'])
    actual_packages = json_str_to_dict(row['实际包裹数据'])
    first_rows.at[index, '尺寸系数'] = cal_size_coefficients(erp_packages, actual_packages)
    print(first_rows.at[index, '尺寸系数'])

# 创建SPU到尺寸系数的映射
spu_coefficient_map = first_rows.set_index('SPU')['尺寸系数'].to_dict()

# 将尺寸系数应用到所有相同SPU的行
result_df['尺寸系数'] = result_df['SPU'].map(spu_coefficient_map)
result_df
In [ ]:
"""
根据SPU尺寸系数,计算预测包裹数据,
1.如果分组码相同,筛选有实际数据的行,按分组码分组,在每个组内按订单时间降序排列,然后取每个组的第一条数据作为全组的估算包裹尺寸
2.如果分组码不同,则按照尺寸系数计算估算包裹尺寸
 2.1.如果尺寸系数为{},则跳过
 2.2.如果ERP包裹数>尺寸系数的数量前面对应的ERP包裹根据尺寸系数计算得到新的ERP包裹尺寸1+尺寸系数)*ERP包裹尺寸多出来的包裹就取ERP包裹也可以理解为尺寸系数都是0
 2.3.如果ERP包裹数<=尺寸系数的数量则按ERP包裹数计算估算包裹尺寸
"""
# 先筛选出order_id不为空的行然后按分组码分组在每个组内按订单时间降序排列取每个组的第一条
latest_actual_data = (result_df
                     [result_df['order_id'].notna() & 
                      result_df['实际包裹数据'].notna() & 
                      (result_df['实际包裹数据'] != '')]
                     .sort_values(['分组码', '订单时间'], ascending=[True, False])
                     .groupby('分组码')
                     .first()
                     ['实际包裹数据'])

# 映射回原DataFrame
result_df['估算包裹尺寸'] = result_df['分组码'].map(latest_actual_data)

print(f"order_id不为空的分组码数量: {len(latest_actual_data)}")
print(f"映射后估算包裹尺寸不为空的行数: {result_df['估算包裹尺寸'].notna().sum()}")
In [ ]:
from math import ceil
import math


def cal_est_package(erp_packages, size_coefficient_str):
    """
    根据ERP包裹数据和尺寸系数计算估算包裹尺寸
    """
    # 如果尺寸系数为空返回None
    if not size_coefficient_str or size_coefficient_str == '{}':
        return None
    
    if not erp_packages or not size_coefficient_str:
        return None
    
    erp_count = len(erp_packages)
    coefficient_count = len(size_coefficient_str)
    
    estimated_packages = {}
    
    # 2.2 如果ERP包裹数 > 尺寸系数的数量
    if erp_count > coefficient_count:
        for i in range(1, erp_count + 1):
            erp_pkg = erp_packages.get(str(i), {})
            if not erp_pkg:
                continue
                
            estimated_pkg = {}
            
            # 前面对应的包裹使用尺寸系数计算
            if i <= coefficient_count:
                coeff_pkg = size_coefficient_str.get(str(i), {})
                for dimension in ['长', '宽', '高']:
                    erp_value = erp_pkg.get(dimension, 0)
                    coeff = coeff_pkg.get(f'{dimension}系数', 0)
                    if coeff is not None and erp_value is not None:
                        estimated_pkg[dimension] = (1 + coeff) * erp_value
                    else:
                        estimated_pkg[dimension] = erp_value
                
                # 处理重量
                if '重量' in erp_pkg:
                    erp_weight = erp_pkg.get('重量', 0)
                    weight_coeff = coeff_pkg.get('重量系数', 0)
                    if weight_coeff is not None and erp_weight is not None:
                        estimated_pkg['重量'] = (1 + weight_coeff) * erp_weight
                    else:
                        estimated_pkg['重量'] = erp_weight
            else:
                # 多出来的包裹直接取ERP包裹数据尺寸系数为0
                estimated_pkg = erp_pkg.copy()
            
            estimated_packages[str(i)] = estimated_pkg
    
    # 2.3 如果ERP包裹数 <= 尺寸系数的数量
    else:
        for i in range(1, erp_count + 1):
            erp_pkg = erp_packages.get(str(i), {})
            coeff_pkg = size_coefficient_str.get(str(i), {})
            
            if not erp_pkg:
                continue
                
            estimated_pkg = {}
            for dimension in ['长', '宽', '高']:
                erp_value = erp_pkg.get(dimension, 0)
                coeff = coeff_pkg.get(f'{dimension}系数', 0)
                if coeff is not None and erp_value is not None:
                    estimated_pkg[dimension] = math.ceil((1 + coeff) * erp_value)
                else:
                    estimated_pkg[dimension] = math.ceil(erp_value)
            
            # 处理重量
            if '重量' in erp_pkg:
                erp_weight = erp_pkg.get('重量', 0)
                weight_coeff = coeff_pkg.get('重量系数', 0)
                if weight_coeff is not None and erp_weight is not None:
                    estimated_pkg['重量'] = math.ceil((1 + weight_coeff) * erp_weight)
                else:
                    estimated_pkg['重量'] = math.ceil(erp_weight)
            
            estimated_packages[str(i)] = estimated_pkg
    
    return dict_to_json_str(estimated_packages)


# 第二步:对于估算包裹尺寸为空字典或空列表的行,使用尺寸系数计算
different_group_rows = result_df[
    result_df['估算包裹尺寸'].isna() 
].copy()
print(f"找到 {len(different_group_rows)} 行需要计算估算包裹尺寸")

# 对筛选出来的行进行计算
for index, row in different_group_rows.iterrows():
    if not row['ERP包裹数据'] or not row['尺寸系数']:
        continue
    erp_packages = json_str_to_dict(row['ERP包裹数据'])
    size_coefficients = json_str_to_dict(row['尺寸系数'])
    estimated_packages = cal_est_package(erp_packages, size_coefficients)
    different_group_rows.at[index, '估算包裹尺寸'] = estimated_packages
    print(f"处理第 {index} 行, SKU: {row['SKU']}, 估算结果: {estimated_packages}")

# 创建预估尺寸映射
est_map = different_group_rows.set_index('SKU')['估算包裹尺寸'].to_dict()

# 只更新那些原来为空的行的估算包裹尺寸,保留第一步的结果
result_df.loc[result_df['估算包裹尺寸'].isna(), '估算包裹尺寸'] = result_df.loc[result_df['估算包裹尺寸'].isna(), 'SKU'].map(est_map)
In [ ]:
different_group_rows = result_df[
    result_df['估算包裹尺寸'].isna() 
].copy()
print(f"找到 {len(different_group_rows)} 行需要计算估算包裹尺寸")
result_df
In [ ]:
result_df.to_excel(f"D:\\test\\logistics\\test_excel\\估算尺寸\\furniture.xlsx", index=False)