28 KiB
28 KiB
取数据,SKU多次订单的情况下取最新的订单数据
In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect

# Pull the package measurements that need processing: single-SKU, single-quantity
# orders in the window, joined with express/package/order info.
# The query keeps every order per SKU; `rn` (从新到旧) ranks them newest-first so a
# later cell can keep only rn == 1.
# NOTE(review): in t2's WHERE clause `b.hight > 0` appears twice — redundant, but
# harmless; confirm one of them wasn't meant to be another column.
with MySQLconnect('ods') as db:
    sql = r"""
    # 限制范围是测量时间,取得SKU种类为1且数量为1的订单,且重复SKU只取最近的订单
    # 测量时间D +2 天进行汇总数据
    # 订单汇总产品数和取出
    WITH t1 AS (
        SELECT
            order_id,
            SKU,
            order_date,
            sum(CASE WHEN opl.order_product_id REGEXP "[0-9]{15}_[0-9]*$" THEN product_num END) AS product_num,
            DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
            count(DISTINCT opl.SKU) AS 产品种类
        FROM dws.order_product_list opl
        WHERE NOT EXISTS (
                SELECT 1
                FROM dws.log_order_reissue_detail AS r
                WHERE left(r.order_product_id,15) = opl.order_id
            )
            AND order_date >= "2025-10-01"
            AND order_date < "2025-11-01"
            AND SKU <> ""
        GROUP BY order_id
    )
    , t2 AS (
        SELECT
            a.`包裹测量时间`,
            t1.order_id,
            t1.SKU,
            t1.order_date,
            a.包裹号,
            a.快递公司,
            a.运输方式,
            a.`目的国`,
            d.postcode,
            CONCAT(
                '"', b.package, '": {',
                '"长": ', length, ', ',
                '"宽": ', width, ', ',
                '"高": ', hight, ', ',
                '"重量": ', weight,
                '}'
            ) AS package_json
        FROM t1
        LEFT JOIN order_express a ON t1.order_id = a.单号
        JOIN package_vol_info b ON a.`包裹号` = b.package
        JOIN order_list d ON a.`单号` = d.order_id
        WHERE a.`包裹状态` != '--'
            AND b.hight > 0
            AND b.length > 0
            AND b.width > 0
            AND b.hight > 0
            AND b.weight > 0
            AND t1.product_num = 1
            AND t1.产品种类=1
            -- AND a.`包裹测量时间` >= '2025-09-01'
            -- AND a.`包裹测量时间` < '2025-10-01'
    ),
    t3 AS (
        SELECT
            t2.*,
            SPU,
            sku.成本价 AS ERP采购价,
            CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,
            count(package_json) AS 包裹数,
            ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) as rn
        FROM t2
        LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU=sku.SKU
        left JOIN stg_bayshop_litfad_spu spu ON sku.产品PID=spu.产品PID
        GROUP BY order_id
    )
    SELECT
        包裹测量时间,
        order_id,
        SPU,
        SKU,
        DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
        包裹号,
        `快递公司`,
        `运输方式`,
        `目的国`,
        postcode,
        ERP采购价,
        实际包裹数据,
        包裹数,
        rn AS 从新到旧
    FROM t3
    """
    # db.con: raw DB-API connection exposed by MySQLconnect — TODO confirm attribute name.
    df = pd.read_sql(sql, db.con)
# Copy the result to the clipboard for manual inspection.
df.to_clipboard(index=False)
# Optional filter kept from an earlier iteration:
# df=df[df['实际包裹数量']==1]
取这些SPU下的所有SKU及其现在售价
In [ ]:
import json
from utils.gtools import MySQLconnect
import pandas as pd

# Keep only the newest order per SKU (rn == 1 from the previous query).
base_df = df[df['从新到旧']==1]

# Distinct numeric SPUs, normalized to strings for the IN (...) clause.
spu_list = (
    base_df['SPU']
    .apply(pd.to_numeric, errors='coerce')
    .dropna()
    .astype(int)
    .astype(str)
    .drop_duplicates()
    .tolist()
)

def chunk_list(lst, size):
    """Yield consecutive slices of *lst* of at most *size* elements."""
    for i in range(0, len(lst), size):
        yield lst[i:i+size]

result_list = []
with MySQLconnect('ods') as db:
    engine = db.engine()
    processed = 0
    for chunk in chunk_list(spu_list, 5000):
        # Quote each SPU so the IN (...) list is valid SQL.
        quoted_spus = ','.join([f"'{spu}'" for spu in chunk])
        sql = f"""
        SELECT 产品品类, 产品分类, SPU, SKU, sku.成本价,
               spvi.erp_package_vol AS ERP包裹数据, 物流分摊, 产品售价
        FROM stg_bayshop_litfad_spu spu
        LEFT JOIN stg_bayshop_litfad_sku sku ON sku.产品PID = spu.产品PID
        LEFT JOIN dwd.dim_erp_sku_package_vol_info spvi ON sku.SKU = spvi.erp_sku
        WHERE spu.SPU IN ({quoted_spus})
        """
        df_chunk = pd.read_sql(sql, engine)
        result_list.append(df_chunk)
        # BUGFIX: previously reported len(result_list) * 5000, overstating the
        # count on the final (possibly short) chunk.
        processed += len(chunk)
        print(f"已处理 {processed} 个SPU")

# Guard against an empty SPU list so pd.concat does not raise.
result = pd.concat(result_list, ignore_index=True) if result_list else pd.DataFrame()

# Attach the order/package rows to every SKU of the queried SPUs,
# then dedupe so each SKU appears once.
all_df = pd.merge(result, base_df, on=['SPU','SKU'], how='left')
all_df = all_df.drop_duplicates(subset=['SKU'])
all_df
In [ ]:
# Number of rows that fall in the furniture category.
furniture_mask = all_df['产品品类'].eq("66 - Furniture")
len(all_df.loc[furniture_mask])
给SPU下的SKU分层级打标签:按 SPU 分组;在组内按成本价升序排序;成本价相同的 SKU 属于同一个层次;层次号就是「第几种不同的成本价」,从小到大排序
In [ ]:
# Slice out the furniture category and enrich it with the ERP attribute set.
# BUGFIX: take an explicit .copy() — the original assigned a column on a slice
# of all_df (SettingWithCopyWarning, potentially lost writes).
furniture_df = all_df[all_df['产品品类'] == "66 - Furniture"].copy()
furniture_df['SKU'] = furniture_df['SKU'].astype(str)

sku_list = furniture_df['SKU'].tolist()
if sku_list:
    # Parameterized IN (...) list: one %s placeholder per SKU.
    with MySQLconnect('ods') as ods:
        placeholders = ','.join(['%s'] * len(sku_list))
        sql = f""" SELECT SKU,`标准/预设属性集` FROM erp_furniture_sku where SKU in ({placeholders})"""
        sku_df = pd.read_sql(sql, ods.engine(), params=tuple(sku_list))
    sku_df['SKU'] = sku_df['SKU'].astype(str)
    furniture_df = pd.merge(furniture_df, sku_df, on='SKU', how='left')
else:
    # BUGFIX: an empty SKU list previously produced invalid SQL ("IN ()").
    # Keep the downstream-required column present even when there is nothing to fetch.
    furniture_df['标准/预设属性集'] = pd.NA
# furniture_df
In [ ]:
# One row per SKU from here on.
cl_df = furniture_df.drop_duplicates(subset=['SKU'])

# Spec-attribute mapping sheet; keep only rows flagged valid (是否有效 == 1).
# NOTE(review): the path uses r'' with doubled backslashes, yielding literal
# double backslashes in the path — Windows tolerates it, but confirm intended.
property_df = pd.read_excel(r'D:\\test\\logistics\\test_excel\\furniture-规格属性映射.xlsx')
property_df = property_df[property_df['是否有效'] == 1]
# Only the keys of this dict are used below (membership test of valid attribute names).
property_dict = {row['规格属性映射']: row['是否有效'] for index, row in property_df.iterrows()}

def parse_attr_set(attr_set_str):
    """Parse an attribute-set string into a key-sorted {attr_name: value} dict.

    Only attribute names present in ``property_dict`` (the valid set) are kept.
    Malformed items are silently skipped.
    """
    mapping = {}
    for item in attr_set_str.split(";"):
        item = item.strip()
        if not item:
            continue
        try:
            # Example item: "231012:大小~23206184:140*80*75"
            left, value = item.split("~", 1)
            attr_name = left.split(":")[1]
            value = value.split(":")[1]
            if attr_name in property_dict.keys():  # valid attribute only
                mapping[attr_name] = value
        except Exception:
            continue
    # Key-sorted so equal attribute sets compare equal downstream.
    return dict(sorted(mapping.items()))

# 1. Parse each SKU's valid attribute set into a dict column.
cl_df['有效属性集'] = cl_df['标准/预设属性集'].apply(
    lambda x: parse_attr_set(x) if pd.notna(x) else {}
)
# 2. A hashable, order-independent signature of the attribute set.
cl_df['属性集签名'] = cl_df['有效属性集'].apply(
    lambda x: tuple(sorted(x.items()))
)

# Assign a group code per (SPU, attribute signature): SKUs of one SPU sharing a
# signature share "<SPU>_<nnn>"; numbering follows first appearance in row order.
cl_df['分组码'] = ''
for spu in cl_df['SPU'].unique():
    spu_indices = cl_df[cl_df['SPU'] == spu].index
    spu_data = cl_df.loc[spu_indices]  # NOTE(review): unused — kept as-is.
    signature_groups = {}
    group_num = 1
    for idx in spu_indices:
        signature = cl_df.loc[idx, '属性集签名']
        if signature not in signature_groups:
            signature_groups[signature] = f"{spu}_{group_num:03d}"
            group_num += 1
        cl_df.loc[idx, '分组码'] = signature_groups[signature]
cl_df
哪几个层级有实际数据,估算其他没有数据的层级的数据
In [ ]:
# 直接在原数据框添加组内编号 cl_df['组内编号'] = cl_df.groupby('SPU')['产品售价'].rank(method='dense').astype(int) cl_df
In [ ]:
def dict_to_json_str(data_dict):
    """Serialize *data_dict* to a JSON string (non-ASCII kept literal).

    Returns None for empty/NA input or when serialization fails.
    """
    if not data_dict or pd.isna(data_dict):
        return None
    try:
        encoded = json.dumps(data_dict, ensure_ascii=False)
    except (TypeError, ValueError):
        return None
    return encoded

def json_str_to_dict(json_str):
    """Parse a JSON string into a dict; {} for empty/NA/invalid input."""
    if json_str and not pd.isna(json_str):
        try:
            return json.loads(json_str)
        except (json.JSONDecodeError, TypeError):
            return {}
    return {}
In [ ]:
import re

def split_packages(package_dict_str):
    """Normalize raw package data into a canonical, volume-ordered form.

    Accepts a JSON string or a dict keyed by package name. For each box the
    numeric part of every field is extracted; length/width/height are reordered
    so 长 >= 宽 >= 高. Boxes are sorted by volume (descending) and re-keyed
    1, 2, 3, ... The result is serialized via dict_to_json_str.

    Returns {} for None/NA/numeric/unparseable input (note: a dict, while the
    success path returns a JSON string — preserved from the original contract).
    """
    # Reject inputs that cannot carry package data.
    if package_dict_str is None or pd.isna(package_dict_str):
        return {}
    if isinstance(package_dict_str, (int, float)):
        return {}

    # Strings are parsed as JSON; anything else is assumed dict-like already.
    if isinstance(package_dict_str, str):
        try:
            parsed = json.loads(package_dict_str)
        except json.JSONDecodeError:
            return {}
    else:
        parsed = package_dict_str

    boxes = []
    for raw_box in parsed.values():
        box = {}
        for field, raw_value in raw_box.items():
            try:
                # Pull the first numeric token out of values like "140cm".
                numbers = re.findall(r"[-+]?\d*\.\d+|\d+", str(raw_value))
                box[field] = float(numbers[0]) if numbers else raw_value
            except ValueError:
                box[field] = raw_value
        # Only boxes with all three dimensions take part in the result.
        # NOTE(review): reconstructed from a collapsed source line — confirm
        # boxes lacking a dimension were meant to be dropped, not zero-filled.
        if all(dim in box for dim in ('长', '宽', '高')):
            longest, middle, shortest = sorted(
                (box['长'], box['宽'], box['高']), reverse=True
            )
            box['长'], box['宽'], box['高'] = longest, middle, shortest
            box['体积'] = longest * middle * shortest
            boxes.append(box)

    # Largest box first.
    boxes.sort(key=lambda b: b.get('体积', 0), reverse=True)

    ordered = {}
    for rank, box in enumerate(boxes, 1):
        entry = {
            '长': box.get('长', 0),
            '宽': box.get('宽', 0),
            '高': box.get('高', 0),
        }
        if '重量' in box:
            entry['重量'] = box['重量']
        ordered[rank] = entry
    return dict_to_json_str(ordered)
In [ ]:
# 先把包裹数据拆成数据且排序 result_df = cl_df.copy() for index, row in result_df.iterrows(): if row['ERP包裹数据'] is not None: result_df.at[index, 'ERP包裹数据'] = split_packages(row['ERP包裹数据']) result_df.at[index, '实际包裹数据'] = split_packages(row['实际包裹数据']) print(result_df.at[index, '实际包裹数据']) else: result_df.at[index, 'ERP包裹数据'] = [] result_df.at[index, '实际包裹数据'] = []
In [ ]:
# Mark, per SPU, the row(s) taken at the SPU's most recent order time that also
# carry actual package data: is_first == 1, everything else 0.
result_df['订单时间'] = pd.to_datetime(result_df['订单时间'])
latest_per_spu = result_df.groupby('SPU')['订单时间'].transform('max')
is_latest_with_data = (
    (result_df['订单时间'] == latest_per_spu)
    & result_df['实际包裹数据'].notna()
)
result_df['is_first'] = is_latest_with_data.astype(int)
result_df
In [ ]:
# 根据SPU基准SKU计算系数 import numpy as np def cal_size_coefficients(erp_packages, actual_packages): """ 计算每个SPU的尺寸系数 Args: erp_packages: ERP包裹数据 actual_packages: 实际包裹数据 Returns: dict: 尺寸系数数据 """ # 检查输入是否有效 if not erp_packages or not actual_packages: return {} # 获取包裹数量 erp_count = len(erp_packages) actual_count = len(actual_packages) # 如果包裹数量不相等,取较小的数量 min_count = min(erp_count, actual_count) coefficients = {} for i in range(1, min_count + 1): erp_pkg = erp_packages.get(str(i), {}) actual_pkg = actual_packages.get(str(i), {}) # 检查必要的维度是否存在 if not all(k in erp_pkg for k in ['长', '宽', '高']) or not all(k in actual_pkg for k in ['长', '宽', '高']): continue pkg_coefficients = {} # 计算长宽高系数 for dimension in ['长', '宽', '高']: old_val = erp_pkg.get(dimension, 0) new_val = actual_pkg.get(dimension, 0) pkg_coefficients[f'{dimension}系数'] = cal_size(old_val, new_val) # 计算重量系数(如果存在) if '重量' in erp_pkg and '重量' in actual_pkg: old_weight = erp_pkg.get('重量', 0) new_weight = actual_pkg.get('重量', 0) pkg_coefficients['重量系数'] = cal_size(old_weight, new_weight) coefficients[i] = pkg_coefficients return dict_to_json_str(coefficients) def cal_size(old, new): """ 计算单个维度的系数 """ try: old = float(old) new = float(new) if old == 0: return None return (new - old) / old except (ValueError, TypeError): return None # 应用计算 # 首先筛选出is_first=1的行作为基准 first_rows = result_df[result_df['is_first'] == 1].copy() # 计算每个SPU的尺寸系数 for index, row in first_rows.iterrows(): erp_packages = json_str_to_dict(row['ERP包裹数据']) actual_packages = json_str_to_dict(row['实际包裹数据']) first_rows.at[index, '尺寸系数'] = cal_size_coefficients(erp_packages, actual_packages) print(first_rows.at[index, '尺寸系数']) # 创建SPU到尺寸系数的映射 spu_coefficient_map = first_rows.set_index('SPU')['尺寸系数'].to_dict() # 将尺寸系数应用到所有相同SPU的行 result_df['尺寸系数'] = result_df['SPU'].map(spu_coefficient_map) result_df
In [ ]:
""" 根据SPU尺寸系数,计算预测包裹数据, 1.如果分组码相同,筛选有实际数据的行,按分组码分组,在每个组内按订单时间降序排列,然后取每个组的第一条数据作为全组的估算包裹尺寸 2.如果分组码不同,则按照尺寸系数计算估算包裹尺寸 2.1.如果尺寸系数为{},则跳过 2.2.如果ERP包裹数>尺寸系数的数量,前面对应的ERP包裹根据尺寸系数计算得到新的ERP包裹尺寸((1+尺寸系数)*ERP包裹尺寸),多出来的包裹就取ERP包裹(也可以理解为尺寸系数都是0) 2.3.如果ERP包裹数<=尺寸系数的数量,则按ERP包裹数计算估算包裹尺寸 """ # 先筛选出order_id不为空的行,然后按分组码分组,在每个组内按订单时间降序排列,取每个组的第一条 latest_actual_data = (result_df [result_df['order_id'].notna() & result_df['实际包裹数据'].notna() & (result_df['实际包裹数据'] != '')] .sort_values(['分组码', '订单时间'], ascending=[True, False]) .groupby('分组码') .first() ['实际包裹数据']) # 映射回原DataFrame result_df['估算包裹尺寸'] = result_df['分组码'].map(latest_actual_data) print(f"order_id不为空的分组码数量: {len(latest_actual_data)}") print(f"映射后估算包裹尺寸不为空的行数: {result_df['估算包裹尺寸'].notna().sum()}")
In [ ]:
from math import ceil  # NOTE(review): unused — math.ceil is used instead.
import math

def cal_est_package(erp_packages, size_coefficient_str):
    """Estimate package dims from ERP dims and per-package size coefficients.

    Args:
        erp_packages: dict of ERP package dims keyed by "1", "2", ...
        size_coefficient_str: dict (despite the name) of per-package
            coefficients keyed by "1", "2", ... as produced by
            cal_size_coefficients after a JSON round-trip.

    Returns:
        JSON string of estimated packages keyed "1".."n", or None when either
        input is empty.

    NOTE(review): the two branches round differently — when there are MORE ERP
    packages than coefficients the scaled dims are left as floats, while the
    other branch applies math.ceil everywhere. Confirm whether the asymmetry
    is intended.
    """
    # Empty coefficients: nothing to scale by.
    if not size_coefficient_str or size_coefficient_str == '{}':
        return None
    if not erp_packages or not size_coefficient_str:
        return None
    erp_count = len(erp_packages)
    coefficient_count = len(size_coefficient_str)
    estimated_packages = {}
    # Case 2.2: more ERP packages than coefficients — scale the first
    # coefficient_count packages, pass the rest through unchanged.
    if erp_count > coefficient_count:
        for i in range(1, erp_count + 1):
            erp_pkg = erp_packages.get(str(i), {})
            if not erp_pkg:
                continue
            estimated_pkg = {}
            if i <= coefficient_count:
                coeff_pkg = size_coefficient_str.get(str(i), {})
                for dimension in ['长', '宽', '高']:
                    erp_value = erp_pkg.get(dimension, 0)
                    coeff = coeff_pkg.get(f'{dimension}系数', 0)
                    if coeff is not None and erp_value is not None:
                        estimated_pkg[dimension] = (1 + coeff) * erp_value
                    else:
                        # Coefficient unavailable (old value was 0) — keep ERP dim.
                        estimated_pkg[dimension] = erp_value
                # Weight scales the same way when present.
                if '重量' in erp_pkg:
                    erp_weight = erp_pkg.get('重量', 0)
                    weight_coeff = coeff_pkg.get('重量系数', 0)
                    if weight_coeff is not None and erp_weight is not None:
                        estimated_pkg['重量'] = (1 + weight_coeff) * erp_weight
                    else:
                        estimated_pkg['重量'] = erp_weight
            else:
                # Surplus packages: no coefficient — copy ERP dims verbatim.
                estimated_pkg = erp_pkg.copy()
            estimated_packages[str(i)] = estimated_pkg
    # Case 2.3: coefficients cover every ERP package — scale and round up.
    else:
        for i in range(1, erp_count + 1):
            erp_pkg = erp_packages.get(str(i), {})
            coeff_pkg = size_coefficient_str.get(str(i), {})
            if not erp_pkg:
                continue
            estimated_pkg = {}
            for dimension in ['长', '宽', '高']:
                erp_value = erp_pkg.get(dimension, 0)
                coeff = coeff_pkg.get(f'{dimension}系数', 0)
                if coeff is not None and erp_value is not None:
                    estimated_pkg[dimension] = math.ceil((1 + coeff) * erp_value)
                else:
                    estimated_pkg[dimension] = math.ceil(erp_value)
            if '重量' in erp_pkg:
                erp_weight = erp_pkg.get('重量', 0)
                weight_coeff = coeff_pkg.get('重量系数', 0)
                if weight_coeff is not None and erp_weight is not None:
                    estimated_pkg['重量'] = math.ceil((1 + weight_coeff) * erp_weight)
                else:
                    estimated_pkg['重量'] = math.ceil(erp_weight)
            estimated_packages[str(i)] = estimated_pkg
    return dict_to_json_str(estimated_packages)

# Pass 2: rows whose estimated size is still missing get a coefficient-based
# estimate from their ERP package data.
different_group_rows = result_df[
    result_df['估算包裹尺寸'].isna()
].copy()
print(f"找到 {len(different_group_rows)} 行需要计算估算包裹尺寸")

for index, row in different_group_rows.iterrows():
    # Skip rows lacking either ERP data or coefficients.
    if not row['ERP包裹数据'] or not row['尺寸系数']:
        continue
    erp_packages = json_str_to_dict(row['ERP包裹数据'])
    size_coefficients = json_str_to_dict(row['尺寸系数'])
    estimated_packages = cal_est_package(erp_packages, size_coefficients)
    different_group_rows.at[index, '估算包裹尺寸'] = estimated_packages
    print(f"处理第 {index} 行, SKU: {row['SKU']}, 估算结果: {estimated_packages}")

# Write the pass-2 estimates back, keeping pass-1 results untouched.
est_map = different_group_rows.set_index('SKU')['估算包裹尺寸'].to_dict()
result_df.loc[result_df['估算包裹尺寸'].isna(), '估算包裹尺寸'] = result_df.loc[result_df['估算包裹尺寸'].isna(), 'SKU'].map(est_map)
In [ ]:
# Sanity check: how many rows still lack an estimated package size after both
# fill passes; then display the frame.
still_missing = result_df['估算包裹尺寸'].isna()
different_group_rows = result_df.loc[still_missing].copy()
print(f"找到 {len(different_group_rows)} 行需要计算估算包裹尺寸")
result_df
In [ ]:
# Export the final estimates to Excel without the index column.
# (Dropped a pointless f-prefix — the literal contains no placeholders.)
result_df.to_excel("D:\\test\\logistics\\test_excel\\估算尺寸\\furniture.xlsx", index=False)