19 KiB
19 KiB
取数据
In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect

# Load the measured parcels to analyse.
# The query keeps orders that are not referenced by the reissue log, whose
# summed product quantity is 1 and that contain exactly one distinct SKU,
# then joins the measured parcel dimensions and the SKU/SPU master data.
# `rn` (exported as 从新到旧) numbers the rows of each SKU from the newest
# measurement to the oldest.
with MySQLconnect('ods') as db:
    sql = r"""
    # 限制范围是测量时间,取得SKU种类为1且数量为1的订单,且重复SKU只取最近的订单
    # 测量时间D +2 天进行汇总数据
    # 订单汇总产品数和取出
    # 测量时间D +2 天进行汇总数据
    # 订单汇总产品数和取出
    WITH t1 AS (
        SELECT
            order_id,
            SKU,
            order_date,
            sum(CASE WHEN opl.order_product_id REGEXP "[0-9]{15}_[0-9]*$" THEN product_num END) AS product_num,
            DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
            count(DISTINCT opl.SKU) AS 产品种类
        FROM dws.order_product_list opl
        WHERE NOT EXISTS (
            SELECT 1
            FROM dws.log_order_reissue_detail AS r
            WHERE left(r.order_product_id,15) = opl.order_id
        )
        AND order_date >= "2025-05-01"
        AND order_date < "2025-09-18"
        AND SKU <> ""
        GROUP BY order_id
    )
    ,
    t2 AS (
        SELECT
            a.`包裹测量时间`,
            t1.order_id,
            t1.SKU,
            t1.order_date,
            a.包裹号,
            a.快递公司,
            a.运输方式,
            a.`目的国`,
            d.postcode,
            CONCAT( '"', b.package, '": {', '"长": ', length, ', ', '"宽": ', width, ', ', '"高": ', hight, ', ', '"重量": ', weight, '}' ) AS package_json
        FROM t1
        LEFT JOIN order_express a ON t1.order_id = a.单号
        JOIN package_vol_info b ON a.`包裹号` = b.package
        JOIN order_list d ON a.`单号` = d.order_id
        WHERE a.`包裹状态` != '--'
        AND b.hight > 0
        AND b.length > 0
        AND b.width > 0
        AND b.hight > 0
        AND b.weight > 0
        AND t1.product_num = 1
        AND t1.产品种类=1
        AND a.`包裹测量时间` >= '2025-06-01'
        AND a.`包裹测量时间` < '2025-09-16'
    ),
    t3 AS (
        SELECT
            t2.*,
            SPU,
            sku.成本价 AS ERP采购价,
            CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,
            count(package_json) AS 包裹数,
            ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) as rn
        FROM t2
        LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU=sku.SKU
        left JOIN stg_bayshop_litfad_spu spu ON sku.产品PID=spu.产品PID
        GROUP BY order_id
    )
    SELECT
        包裹测量时间,
        order_id,
        SPU,
        SKU,
        DATE_FORMAT(order_date,"%Y-%m-%d") AS 订单时间,
        包裹号,
        `快递公司`,
        `运输方式`,
        `目的国`,
        postcode,
        ERP采购价,
        实际包裹数据,
        包裹数,
        rn AS 从新到旧
    FROM t3
    """
    # The final DATE_FORMAT uses %d (zero-padded day of month). The previous
    # %D is MySQL's "day with English suffix" specifier and produced values
    # such as "2025-05-1st" in the 订单时间 column.
    df = pd.read_sql(sql, db.con)

print(df)
# Put the result on the system clipboard for pasting into a spreadsheet.
df.to_clipboard(index=False)
df
拆开实际包裹数据,并标记为1
In [ ]:
# Unpack the measured-parcel JSON (column 实际包裹数据) into numeric columns
# on a copy of df. Only rows whose JSON holds exactly one parcel are used;
# those rows are flagged with is_first = 1 and receive the three dimensions
# sorted so that 长 >= 宽 >= 高, plus the weight.
import re
import json

base_df = df.copy()
for idx, rec in base_df.iterrows():
    raw_packages = rec['实际包裹数据']
    if not (isinstance(raw_packages, str) and raw_packages):
        print(f"第{idx}行包裹数据为空或非字符串,跳过")
        continue

    try:
        packages = json.loads(raw_packages)
    except json.JSONDecodeError as e:
        print(f"解析失败:第{idx}行,错误信息:{e}")
        continue

    n_packages = len(packages)
    print(f"第{idx}行,包裹数量:{n_packages}")
    if n_packages != 1:
        print(f"第{idx}行,包裹数量不为1,跳过")
        continue

    try:
        for pkg in packages.values():
            parsed = {}
            for field, raw_value in pkg.items():
                try:
                    # Take the first number found in the textual form of the
                    # value; keep the raw value when it contains no number.
                    matches = re.findall(r"[-+]?\d*\.\d+|\d+", str(raw_value))
                    parsed[field] = float(matches[0]) if matches else raw_value
                except ValueError:
                    # Conversion failed: keep the raw value.
                    parsed[field] = raw_value
    except AttributeError:
        # Raised when the decoded JSON (or the parcel entry) is not a mapping.
        print(f"解析失败:第{idx}行,错误信息:包裹数据为空")
        continue

    dims = [parsed['长'], parsed['宽'], parsed['高']]
    weight = parsed['重量']
    # Ascending sort: smallest side becomes the height, largest the length.
    height, width, length = sorted(dims)

    base_df.loc[idx, 'is_first'] = 1
    base_df.loc[idx, '长'] = length
    base_df.loc[idx, '宽'] = width
    base_df.loc[idx, '高'] = height
    base_df.loc[idx, '重量'] = weight
    print(f"{rec['SKU']}尺寸为:{width},h:{height},d:{length},w:{weight}")
base_df
取SPU下所有SKU
In [ ]:
# Fetch every SKU (with cost, logistics share and current selling price) that
# belongs to the SPUs present in base_df, then attach the measured data.
import json
from utils.gtools import MySQLconnect
import pandas as pd

# base_df is deliberately NOT re-created from df here: the parsing cell above
# added is_first / 长 / 宽 / 高 / 重量 to it, and the coefficient step filters
# all_df on is_first. Re-copying df would silently drop those columns.

# Distinct SPU ids as strings; values that are not numeric are discarded.
spu_list = (
    pd.to_numeric(base_df['SPU'], errors='coerce')
    .dropna()
    .astype(int)
    .astype(str)
    .drop_duplicates()
    .tolist()
)


def chunk_list(lst, size):
    """Yield consecutive slices of ``lst`` holding at most ``size`` items."""
    for i in range(0, len(lst), size):
        yield lst[i:i + size]


result_list = []
processed = 0
with MySQLconnect('ods') as db:
    enginal = db.engine()
    for chunk in chunk_list(spu_list, 100):
        # Quote each id so the IN (...) list is valid SQL; the ids were cast
        # to int above, so they contain digits only.
        quoted_spus = ','.join([f"'{spu}'" for spu in chunk])
        sql = f"""
        SELECT
            产品品类,
            产品分类,
            SPU,
            SKU,
            sku.成本价,
            物流分摊,
            产品售价
        from stg_bayshop_litfad_spu spu
        LEFT JOIN stg_bayshop_litfad_sku sku ON sku.产品PID = spu.产品PID
        WHERE spu.SPU IN ({quoted_spus})
        """
        df_chunk = pd.read_sql(sql, enginal)
        result_list.append(df_chunk)
        # Report the real number of SPUs handled (the last chunk may be short).
        processed += len(chunk)
        print(f"已处理 {processed} 个SPU")

if not result_list:
    # pd.concat([]) would raise an opaque "No objects to concatenate".
    raise ValueError("base_df contains no numeric SPU; nothing to query")

result = pd.concat(result_list, ignore_index=True)

# Left merge: keep every SKU of the SPUs; measured columns stay empty for
# SKUs without a measured parcel.
all_df = pd.merge(result, base_df, on=['SPU', 'SKU'], how='left')
all_df
所有的SKU 分类和汇总 -> 层级一样的SKU
In [ ]:
# Tier assignment per SPU:
#   - rows are grouped by SPU;
#   - inside a group, cost prices are ranked in ascending order;
#   - SKUs with the same cost price share one tier;
#   - the tier number is therefore "the n-th distinct cost price" (dense rank).
cost_by_spu = all_df.groupby('SPU')['成本价']
dense_rank = cost_by_spu.rank(method='dense')
all_df['层次'] = dense_rank.astype(int)
all_df
哪几个层级有实际数据,估算其他没有数据的层级的数据
In [ ]:
In [ ]:
# Unpack the ERP parcel JSON (column ERP包裹数据) of all_df into numeric
# columns. Only rows whose JSON holds exactly one parcel are used; they get
# the parcel count, the dimensions sorted so that ERP长 >= ERP宽 >= ERP高,
# and the ERP weight.
# NOTE(review): no query visible in this notebook selects a column named
# 'ERP包裹数据' - confirm where all_df obtains it before running this cell.
import re
import json

for idx, rec in all_df.iterrows():
    raw_packages = rec['ERP包裹数据']
    if not (isinstance(raw_packages, str) and raw_packages):
        print(f"第{idx}行包裹数据为空或非字符串,跳过")
        continue

    try:
        packages = json.loads(raw_packages)
    except json.JSONDecodeError as e:
        print(f"解析失败:第{idx}行,错误信息:{e}")
        continue

    count = len(packages)
    print(f"第{idx}行,包裹数量:{count}")
    if count != 1:
        print(f"第{idx}行,包裹数量不为1,跳过")
        continue

    try:
        for pkg in packages.values():
            parsed = {}
            for field, raw_value in pkg.items():
                try:
                    # First number found in the textual form of the value;
                    # values without any number are kept unchanged.
                    matches = re.findall(r"[-+]?\d*\.\d+|\d+", str(raw_value))
                    if matches:
                        parsed[field] = float(matches[0])
                    else:
                        parsed[field] = raw_value
                except ValueError:
                    # Conversion failed: keep the raw value.
                    parsed[field] = raw_value
    except AttributeError:
        # Raised when the decoded JSON (or the parcel entry) is not a mapping.
        print(f"解析失败:第{idx}行,错误信息:包裹数据为空")
        continue

    dims = [parsed['长'], parsed['宽'], parsed['高']]
    weight = parsed['重量']
    # Ascending sort: smallest side becomes the height, largest the length.
    height, width, length = sorted(dims)

    all_df.loc[idx, 'ERP包裹数'] = count
    all_df.loc[idx, 'ERP长'] = length
    all_df.loc[idx, 'ERP宽'] = width
    all_df.loc[idx, 'ERP高'] = height
    all_df.loc[idx, 'ERP重量'] = weight
    print(f"{rec['SKU']}尺寸为:{width},h:{height},d:{length},w:{weight}")
计算每个SPU的长宽高重量系数
In [ ]:
# Compute, for every baseline row (is_first == 1), how far the measured
# dimensions/weight deviate from the ERP ones, and attach those coefficients
# to every row of the same SPU.
def cal_size(old, new):
    """Return the relative change ``(new - old) / old``.

    Both arguments are converted with ``float``. Returns ``None`` when
    ``old`` is zero (to avoid dividing by zero) or when either value cannot
    be converted to a float.
    """
    try:
        old = float(old)
        new = float(new)
    except (ValueError, TypeError):
        return None
    if old == 0:
        return None
    return (new - old) / old


# coefficient column -> (ERP column, measured column)
coef_sources = {
    '长系数': ('ERP长', '长'),
    '宽系数': ('ERP宽', '宽'),
    '高系数': ('ERP高', '高'),
    '重量系数': ('ERP重量', '重量'),
}

# .copy() gives an independent frame, so adding the coefficient columns does
# not write into a slice of all_df (no SettingWithCopyWarning).
test_df = all_df[all_df['is_first'] == 1].copy()

# Build each coefficient column in one go; this also creates the columns
# when test_df is empty, so the column selection below cannot raise KeyError.
for coef_col, (erp_col, actual_col) in coef_sources.items():
    test_df[coef_col] = [
        cal_size(erp_value, actual_value)
        for erp_value, actual_value in zip(test_df[erp_col], test_df[actual_col])
    ]

for index, row in test_df.iterrows():
    print(
        f"{row['SPU']} 的系数为 "
        f"{row['长系数']}, "
        f"{row['宽系数']}, "
        f"{row['高系数']}, "
        f"{row['重量系数']}"
    )

# Attach the baseline coefficients of each SPU to all of its rows.
# NOTE(review): when an SPU has more than one baseline row, this merge
# repeats every row of that SPU once per baseline row - confirm whether a
# single coefficient set per SPU is intended.
all_df = pd.merge(
    all_df,
    test_df[['SPU', '长系数', '宽系数', '高系数', '重量系数']],
    on='SPU',
    how='left',
)
all_df.to_excel('单包裹SKU售价分析.xlsx', index=False)
计算每个sku的理论尺寸
In [ ]:
import pandas as pd

# Reload the workbook written by the coefficient step.
all_df = pd.read_excel('单包裹SKU售价分析.xlsx')

# Predicted value = (1 + coefficient) * ERP value, rounded to two decimals.
# Each tuple is (output column, coefficient column, ERP column).
for target_col, coef_col, erp_col in (
    ('理论长', '长系数', 'ERP长'),
    ('理论宽', '宽系数', 'ERP宽'),
    ('理论高', '高系数', 'ERP高'),
    ('理论重量', '重量系数', 'ERP重量'),
):
    scaled = (1 + all_df[coef_col]) * all_df[erp_col]
    all_df[target_col] = scaled.round(2)

# Overwrite the same workbook with the added columns.
all_df.to_excel('单包裹SKU售价分析.xlsx', index=False)
计算三种尺寸下的售价,计算预测后的尺寸下,一票一件订单的售价和订单价格
In [ ]:
# Price every row three times - with the measured size, the predicted
# (theoretical) size and the ERP size - as a single-parcel order, and store
# the selling price and order logistics cost of each variant.
from sell.sell_price import call_sell_and_order_price

# Column names feeding each variant, in call order: measured, theoretical, ERP.
DIMENSION_COLUMNS = (
    ('长', '宽', '高', '重量'),
    ('理论长', '理论宽', '理论高', '理论重量'),
    ('ERP长', 'ERP宽', 'ERP高', 'ERP重量'),
)


def _first_price(sell_price):
    """Return ``sell_price`` when it is an int/float, else its first element."""
    if isinstance(sell_price, (int, float)):
        return sell_price
    return sell_price[0]


for index, row in all_df.iterrows():
    price = row['成本价']
    try:
        # Build all three parcel dicts before the first pricing call.
        packages = [
            {'包裹1': {'长': row[len_col], '宽': row[wid_col], '高': row[hgt_col], '重量': row[wgt_col]}}
            for len_col, wid_col, hgt_col, wgt_col in DIMENSION_COLUMNS
        ]
        quotes = []
        for package in packages:
            sell, order, _order_type = call_sell_and_order_price(price, package, head_type="海运")
            quotes.append((sell, order))
    except Exception as e:
        # Any failure skips the row entirely; nothing is written for it.
        print(f"SKU: {row['SKU']} 报错: {e}")
        continue

    (actual_sell, actual_order), (theory_sell, theory_order), (erp_sell, erp_order) = quotes

    all_df.loc[index, 'ERP售价'] = _first_price(erp_sell)
    all_df.loc[index, '实际体积售价'] = _first_price(actual_sell)
    all_df.loc[index, '理论体积售价'] = _first_price(theory_sell)
    all_df.loc[index, 'ERP订单物流'] = erp_order
    all_df.loc[index, '实际体积订单物流'] = actual_order
    all_df.loc[index, '理论体积订单物流'] = theory_order
    print(f"SPU: {row['SPU']}, SKU {row['SKU']} ,网站售价: {row['产品售价']}, ERP售价: {erp_sell}, 实际体积售价: {actual_sell}, 理论体积售价: {theory_sell},")
写成可以上传批量修改尺寸的格式
In [ ]:
# Build the bulk-upload sheet (SKU, cost price, "weight|L*W*H*1," string).

# Keep rows never modified before and whose chosen price is not the ERP one.
# .copy() makes an independent frame, so adding columns below does not write
# into a slice of the previous all_df (no SettingWithCopyWarning).
all_df = all_df[
    (all_df['是否有过修改记录'] == "否") & (all_df['使用尺寸售价'] != "ERP售价")
].copy()

# Largest and smallest price change inside each SPU. String aliases are used
# because passing the builtins max/min to transform is deprecated in pandas.
change_by_spu = all_df.groupby('SPU')['售价涨跌幅']
all_df['SPU最大涨幅'] = change_by_spu.transform('max')
all_df['SPU最小涨幅'] = change_by_spu.transform('min')

# Keep only SPUs whose every SKU moves by at most 50% in either direction.
filtered_df = all_df[
    (all_df['SPU最大涨幅'] <= 0.5) & (all_df['SPU最小涨幅'] >= -0.5)
].copy()


def _size_weight_spec(row):
    """Return the ``weight|length*width*height*1,`` string for one row.

    Uses the measured columns when the row's 使用尺寸售价 is "实际体积售价",
    otherwise the theoretical (理论) columns.
    """
    if row['使用尺寸售价'] == "实际体积售价":
        length, width, height, weight = row['长'], row['宽'], row['高'], row['重量']
    else:
        length, width, height, weight = (
            row['理论长'], row['理论宽'], row['理论高'], row['理论重量']
        )
    return f"{weight}|{length}*{width}*{height}*1,"


# Assigning the whole column at once also creates it when filtered_df is
# empty, so the column selection below cannot raise KeyError.
filtered_df['尺寸重量'] = [_size_weight_spec(row) for _, row in filtered_df.iterrows()]

spu_list = filtered_df['SPU'].unique()
filtered_df = filtered_df[['SKU', '成本价', '尺寸重量']]
filtered_df