In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect

# Read the package information to be costed: single-SKU, single-item US orders,
# delivered, with measured package dimensions, measured between 2025-03-01 and 2025-06-01.
with MySQLconnect('ods') as db:
    sql = r"""
    WITH t1 AS (
        SELECT
            order_id,
            SKU,
            order_date,
            SUM(CASE WHEN opl.order_product_id LIKE '%\_%' ESCAPE '\\'
                      AND opl.order_product_id NOT LIKE '%\_%\_%' ESCAPE '\\'
                 THEN product_num END) AS product_num,
            DATE_FORMAT(order_date, "%Y-%m-%d") AS 订单时间,
            COUNT(opl.SKU) AS 产品种类
        FROM dws.fact_order_product_list opl
        WHERE NOT EXISTS (
                SELECT 1
                FROM dws.log_order_reissue_detail AS r
                WHERE r.order_product_id = opl.order_product_id
            )
            AND order_date >= "20250101"
            AND order_date < "20250601"
            AND SKU <> ""
        GROUP BY order_id
    ),
    t2 AS (
        SELECT
            a.`包裹测量时间`,
            t1.order_id,
            t1.SKU,
            t1.order_date,
            a.包裹号,
            a.快递公司,
            a.运输方式,
            a.`目的国`,
            d.postcode,
            CONCAT(
                '"', b.package, '": {',
                '"长": ', length, ', ',
                '"宽": ', width, ', ',
                '"高": ', hight, ', ',
                '"重量": ', weight,
                '}'
            ) AS package_json
        FROM t1
        LEFT JOIN order_express a ON t1.order_id = a.单号
        JOIN package_vol_info b ON a.`包裹号` = b.package
        JOIN order_list d ON a.`单号` = d.order_id
        WHERE a.`包裹状态` IN ('客户签收', '已经投递')
            AND b.hight > 0
            AND b.length > 0
            AND b.width > 0
            AND b.weight > 0
            AND a.`目的国` = "United States"
            AND t1.product_num = 1
            AND t1.产品种类 = 1
            AND a.`包裹测量时间` >= '2025-03-01'
            AND a.`包裹测量时间` < '2025-06-01'
    ),
    t3 AS (
        SELECT
            t2.*,
            sku.成本价 AS ERP采购价,
            ess.包裹数据 AS ERP包裹数据,
            CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,
            ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) AS rn
        FROM t2
        LEFT JOIN ads.new_erp_sku_size ess ON t2.SKU = ess.SKU
        LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU = sku.SKU
        WHERE ess.`包裹数据` <> ''
        GROUP BY order_id
    )
    SELECT
        包裹测量时间,
        order_id,
        SKU,
        DATE_FORMAT(order_date, "%Y-%M-%D") AS 订单时间,
        包裹号,
        `快递公司`,
        `运输方式`,
        `目的国`,
        postcode,
        ERP采购价,
        ERP包裹数据,
        实际包裹数据
    FROM t3
    WHERE rn = 1
    """
    df = pd.read_sql(sql, db.con)
    print(df)
    df.to_clipboard(index=False)
C:\Users\Admin\AppData\Local\Temp\ipykernel_18728\4048319598.py:101: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy. df=pd.read_sql(sql,db.con)
包裹测量时间 order_id SKU 订单时间 \
0 2025-03-01 14:06:35 250225025414549 22696 2025-February-25th
1 2025-04-02 15:44:20 250330050647870 220934 2025-March-30th
2 2025-03-03 16:31:27 250225062602893 221468 2025-February-25th
3 2025-03-15 10:37:19 250311002220317 222890 2025-March-11th
4 2025-04-03 18:16:25 250331190637251 224072 2025-March-31st
... ... ... ... ...
13489 2025-05-04 16:39:49 250429213610207 2207148642 2025-April-29th
13490 2025-05-04 13:03:06 250501025810935 2207161446 2025-May-1st
13491 2025-05-05 17:41:38 250430063602142 2207176415 2025-April-30th
13492 2025-05-02 13:33:34 250430035236822 2207192083 2025-April-30th
13493 2025-05-08 20:15:50 250430104809255 2207202010 2025-April-30th
包裹号 快递公司 运输方式 目的国 postcode ERP采购价 \
0 991344640 海MS-FEDEX 海运 United States 55414 50.0
1 991399418 海MS-FEDEX 海运 United States 37014 45.0
2 991348761 海MS-FEDEX02 海运 United States 11803 65.0
3 991369401 海MS-FEDEX02 海运 United States 54568-9248 35.0
4 991401728 海MS-FEDEX 海运 United States 34112 180.0
... ... ... ... ... ... ...
13489 991447512 海MS-FEDEX-SAIR-H 海运 United States 32168 NaN
13490 991446594 海MS-FEDEX-SAIR-H 海运 United States 92647 NaN
13491 991449178 海MS-FEDEX-SAIR-H 海运 United States 90019 518.0
13492 991444756 海MS-FEDEX-SAIR-H 海运 United States 78572 228.0
13493 991454248 海MS-FEDEX-SAIR-H 海运 United States 89129 NaN
ERP包裹数据 \
0 {"包裹1": {"宽": "21.0", "长": "21.0", "高": "21.0"...
1 {"包裹1": {"宽": "41.0", "长": "41.0", "高": "21.0"...
2 {"包裹1": {"宽": "32.0", "长": "33.0", "高": "28.0"...
3 {"包裹1": {"宽": "28.0", "长": "28.0", "高": "13.0"...
4 {"包裹1": {"宽": "25.0", "长": "75.0", "高": "28.0"...
... ...
13489 {"包裹1": {"宽": "70.0", "长": "190.0", "高": "20.0...
13490 {"包裹1": {"宽": "48.0", "长": "54.0", "高": "14.0"...
13491 {"包裹1": {"宽": "30.0", "长": "193.0", "高": "13.0...
13492 {"包裹1": {"宽": "51.0", "长": "53.0", "高": "33.0"...
13493 {"包裹1": {"宽": "70.0", "长": "130.0", "高": "45.0...
实际包裹数据
0 {"991344640": {"长": 45.00, "宽": 45.00, "高": 17...
1 {"991399418": {"长": 41.00, "宽": 41.00, "高": 21...
2 {"991348761": {"长": 31.00, "宽": 31.00, "高": 24...
3 {"991369401": {"长": 28.00, "宽": 28.00, "高": 12...
4 {"991401728": {"长": 75.00, "宽": 25.00, "高": 16...
... ...
13489 {"991447512": {"长": 208.00, "宽": 54.00, "高": 1...
13490 {"991446594": {"长": 53.00, "宽": 48.00, "高": 20...
13491 {"991449178": {"长": 193.00, "宽": 30.00, "高": 1...
13492 {"991444756": {"长": 53.00, "宽": 51.00, "高": 33...
13493 {"991454248": {"长": 141.00, "宽": 76.00, "高": 2...
[13494 rows x 12 columns]
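The UserWarning above comes from handing pandas the raw DBAPI connection (`db.con`). Later cells call `db.engine()` on the same helper; assuming that returns a SQLAlchemy engine, the same query can be run without the warning. A minimal sketch under that assumption (the `engine()` usage is borrowed from the batched-query cells below, not from this cell):

# Sketch: pass the SQLAlchemy engine instead of the raw DBAPI connection,
# assuming MySQLconnect.engine() returns one (as used in later cells).
with MySQLconnect('ods') as db:
    df = pd.read_sql(sql, con=db.engine())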
In [ ]:
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group
import pandas as pd

# United States
from utils.logisticsBill import Billing
import requests


def get_package(package_id):
    """
    Query the package-fund API for a package ID and return its
    (length, width, height, weight), with weight multiplied by 1000.
    The volume-weight divisor used elsewhere is 6000.
    """
    url = f'https://cp.maso.hk/index.php?main=biphp&act=package_fund&key=JJ57S744ZJR26ORGRMMSJ8V4D4UVF5AU&package={package_id}'
    resp = requests.get(url).json()
    if resp['code'] == "0":
        weight = int(float(resp['data'][0]['weight']) * 1000)
        package_length = resp['data'][0]['l'].replace(",", "") if len(resp['data'][0]['l']) > 0 else "0"
        package_width = resp['data'][0]['w'].replace(",", "") if len(resp['data'][0]['w']) > 0 else "0"
        package_hight = resp['data'][0]['h'].replace(",", "") if len(resp['data'][0]['h']) > 0 else "0"
        return float(package_length), float(package_width), float(package_hight), int(weight)


# df1 = pd.read_excel(r"F:\DOCUMENTS\WXWork\1688854527635889\Cache\File\2025-05\新建Microsoft Excel 工作表 (2)(1).xlsx", sheet_name="包裹成本信息")
# df = df1[(df1['快递公司'] == "海MS-大健-METRO") | (df1['快递公司'] == "海MS-大健-CEVA") | (df1['快递公司'] == "空LAX-大健-CEVA")]
# df = df1

df_grouped = df.groupby('order_id')

# Classify each row by carrier (used later via row['类型']):
# for index, row in df.iterrows():
#     if "CEVA" in row['快递公司']:
#         df.loc[index, '类型'] = "卡派ceva"
#     elif "GIGA" in row['快递公司']:
#         df.loc[index, '类型'] = "卡派giga"
#     elif "METRO" in row['快递公司']:
#         df.loc[index, '类型'] = "卡派metro"
#     elif any(x in row['快递公司'] for x in ["FEDEX-SAIR-G", "FEDEX-SAIR-H", "FEDEX02"]):
#         df.loc[index, '类型'] = "快递"
#     else:
#         df.loc[index, '类型'] = "其他"

for order_num, group in df_grouped:
    if pd.isna(order_num) or str(order_num).lower() == "nan" or order_num == 0 or order_num == "手工包裹":
        continue
    order_num = str(int(order_num))
    # opCountry = OperateCountry(group['目的国'].iloc[0])
    opCountry = OperateCountry('US')
    postcode = group['postcode'].iloc[0]
    if pd.isna(postcode) or str(postcode).lower() == "nan":
        continue
    # one Package_group per truck-delivery carrier, plus the package numbers in each
    packages1 = Package_group()
    packages2 = Package_group()
    packages3 = Package_group()
    packages_id1 = []
    packages_id2 = []
    packages_id3 = []
    # sea freight gets head_type 1, everything else 0
    if "海" in group['快递公司'].iloc[0]:
        head_type = 1
    else:
        head_type = 0
    for index, row in group.iterrows():
        print(f"Processing package {row['包裹号']}")
        df.at[index, 'order_id'] = str(int(row['order_id']))
        # print(f"Processing order {df.loc[index, 'order_id']}")
        if any(pd.isna([row['长'], row['宽'], row['高'], row['重量']])):
            # fill missing dimensions/weight from the package-fund API
            row['长'], row['宽'], row['高'], row['重量'] = get_package(row['包裹号'])
        # note: rows typed 快递 are assumed to match one of these FEDEX patterns
        if "FEDEX-SAIR-G" in row['快递公司']:
            company_name = "Fedex-GROUD"
        elif "FEDEX-SAIR-H" in row['快递公司']:
            company_name = "Fedex-HOME"
        elif "FEDEX02" in row['快递公司']:
            company_name = "Fedex-彩虹小马"
        package = Package(row['包裹号'], row['长'], row['宽'], row['高'], row['重量'])
        if row['类型'] == "快递":
            # parcel carriers are billed per package
            packages = Package_group()
            packages.add_package(package)
            bill = Billing(str(index), opCountry, packages, postcode, company_name=company_name, head_type=head_type, beizhu='1')
            head_price = bill.head_amount[0]
            tail_price = bill.tail_amount[0]
            df.loc[index, '头程'] = head_price
            df.loc[index, '尾端'] = tail_price
        elif row['类型'] == "卡派ceva":
            packages1.add_package(package)
            packages_id1.append(row['包裹号'])
        elif row['类型'] == "卡派giga":
            packages2.add_package(package)
            packages_id2.append(row['包裹号'])
        elif row['类型'] == "卡派metro":
            packages3.add_package(package)
            packages_id3.append(row['包裹号'])
        else:
            continue
    # truck-delivery carriers are billed once per order, across all their packages
    if len(packages1) > 0:
        bill1 = Billing(str(index), opCountry, packages1, postcode, company_name="大健-CEVA", head_type=head_type, beizhu='1')
        for package_id in packages_id1:
            df.loc[df['包裹号'] == package_id, '头程'] = bill1.head_amount[0]
            df.loc[df['包裹号'] == package_id, '尾端'] = bill1.tail_amount[0]
    if len(packages2) > 0:
        bill2 = Billing(str(index), opCountry, packages2, postcode, company_name="大健-GIGA", head_type=head_type, beizhu='1')
        for package_id in packages_id2:
            df.loc[df['包裹号'] == package_id, '头程'] = bill2.head_amount[0]
            df.loc[df['包裹号'] == package_id, '尾端'] = bill2.tail_amount[0]
    if len(packages3) > 0:
        bill3 = Billing(str(index), opCountry, packages3, postcode, company_name="大健-Metro", head_type=head_type, beizhu='1')
        for package_id in packages_id3:
            df.loc[df['包裹号'] == package_id, '头程'] = bill3.head_amount[0]
            df.loc[df['包裹号'] == package_id, '尾端'] = bill3.tail_amount[0]

df.to_clipboard(index=False)
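The `get_package` call above has no timeout or error handling, and it can be hit many times inside the loop when dimensions are missing. A minimal sketch of a retry wrapper under the same endpoint and response shape; the retry count, delay, and wrapper name are hypothetical choices, not part of the notebook:

# Sketch: retry wrapper around get_package (hypothetical helper, not in the original)
import time

def get_package_with_retry(package_id, retries=3, delay=2):
    for attempt in range(retries):
        try:
            return get_package(package_id)  # same parsing as the function above
        except (requests.RequestException, ValueError, KeyError, IndexError) as e:
            print(f"get_package({package_id}) failed ({e}), retry {attempt + 1}/{retries}")
            time.sleep(delay)
    return None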
In [ ]:
df.to_clipboard(index=False)
In [ ]:
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group

# United States
from utils.logisticsBill import Billing

df_grouped = df.groupby('订单号')

calculated_results = []
for order_num, group in df_grouped:
    # opCountry = OperateCountry(group['目的国'].iloc[0])
    opCountry = OperateCountry('US')
    postcode = '33900'
    packages = Package_group()
    packages_dict = {}
    volume_weight = 0
    weight = 0
    for index, row in group.iterrows():
        package = Package(row['包裹号'], row['长'], row['宽'], row['高'], row['重量'])
        packages.add_package(package)
        packages_dict[row['包裹号']] = {
            "长": row['长'],
            "宽": row['宽'],
            "高": row['高'],
            "重量": row['重量']
        }
        # weight += row['重量'] / 1000
        # volume_weight += package.get_volume_weight(6000)
        # postcode = row['postcode']
        # head_type = 1 if row['运输方式'] == '海运' else 0
    # try:
    #     bill = Billing(str(index), opCountry, packages, postcode, company_name=None, head_type=head_type, beizhu='1')
    #     head_price = bill.head_amount[0]
    #     tail_price = bill.tail_amount[0]
    #     tail_currency = bill.tail_amount[1]
    # except:
    #     head_price = '出错'
    #     tail_price = '出错'
    #     tail_currency = '出错'
    result = {
        '订单号': order_num,
        '目的国': group['目的国'].iloc[0],       # same for all rows in the group
        '快递公司': group['快递公司'].iloc[0],   # same for all rows in the group
        '运输方式': group['运输方式'].iloc[0],   # same for all rows in the group
        'postcode': group['postcode'].iloc[0],  # same for all rows in the group
        '销售额USD': group['销售额'].iloc[0],
        '真实尾端CNY': group['真实尾端CNY'].sum(),
        '真实头程CNY': group['真实头程CNY'].sum(),
        # '头程预测': head_price,
        # '尾端预测': tail_price,
        # '尾端货币': tail_currency,
        '产品品类': group['产品品类'].iloc[0],   # same for all rows in the group
        '产品分类': group['产品分类'].iloc[0],   # same for all rows in the group
        'SKU': group['SKU'].iloc[0],            # same for all rows in the group
        '订单日期': group['订单日期'].iloc[0],   # same for all rows in the group
        'packages_dict': packages_dict,
        '包裹数据': group['包裹数据'].iloc[0],
    }
    calculated_results.append(result)
    print(packages_dict)

calculated_df = pd.DataFrame(calculated_results)
# Rename 订单号 to order_id so calculated_df can be merged on later
calculated_df.rename(columns={'订单号': 'order_id'}, inplace=True)
In [ ]:
len(df)
In [ ]:
from utils.gtools import MySQLconnect

ods = MySQLconnect("ods")
engine = ods.engine()
cursor = ods.connect().cursor()

batch_size = 50000  # query order_ids in batches so the IN (...) list does not make the SQL statement too long
order_id_list = df["order_id"].tolist()

# collect the per-batch query results
result_dfs1 = []
result_dfs2 = []

for i in range(0, len(order_id_list), batch_size):
    batch_order_ids = order_id_list[i:i + batch_size]  # order_ids for the current batch
    param = ",".join(f"'{order_id}'" for order_id in batch_order_ids)
    purchase_order_sql = f"""
    WITH t1 AS (
        SELECT
            LEFT(ol.out_detials_outlink_id, 15) AS order_id,
            SUM(out_detials_qty * price) AS instock_cost,
            NULL AS buy_cost
        FROM ods.outstock_list ol
        JOIN ods.instock_list il ON ol.store_in_id = il.id
        WHERE LEFT(ol.out_detials_outlink_id, 15) IN ({param})
        GROUP BY LEFT(ol.out_detials_outlink_id, 15)
        UNION ALL
        SELECT
            LEFT(order_product_id, 15) AS order_id,
            NULL AS instock_cost,
            SUM(buy_num * actual_price) AS buy_cost
        FROM warehouse_purchasing
        WHERE LEFT(order_product_id, 15) IN ({param})
            AND buy_audit = "采购完成"
        GROUP BY LEFT(order_product_id, 15)
    )
    SELECT
        order_id,
        SUM(CASE WHEN instock_cost IS NULL THEN buy_cost ELSE instock_cost END) AS 采购成本
    FROM t1
    GROUP BY order_id
    """
    # sql_biaozhun = f"""
    # SELECT
    #     order_id,
    #     sum(s1.`成本价` * product_num) AS "入库采购价"
    # FROM dws.order_product_list opl
    # LEFT JOIN ods.stg_bayshop_litfad_sku s1 ON s1.sku = opl.sku
    # LEFT JOIN dwd.dim_erp_sku_package_vol_info des ON des.erp_sku = opl.sku
    # WHERE order_id IN ({param})
    #     AND order_product_id REGEXP '^[0-9]{{15}}_[1-9][0-9]*$'
    # GROUP BY order_id
    # """
    batch_df1 = pd.read_sql(purchase_order_sql, con=engine)  # run the query
    result_dfs1.append(batch_df1)                            # store this batch's result
    # batch_df2 = pd.read_sql(sql_biaozhun, con=engine)
    # result_dfs2.append(batch_df2)
    print(f"Finished querying {i + batch_size} order_ids")

# concatenate all batch results
purchase_order_df1 = pd.concat(result_dfs1, ignore_index=True)
# purchase_order_df2 = pd.concat(result_dfs2, ignore_index=True)

purchase_order_df1["order_id"] = purchase_order_df1["order_id"].astype(str)
# purchase_order_df2["order_id"] = purchase_order_df2["order_id"].astype(str)
# purchase_order_df = pd.merge(purchase_order_df1, purchase_order_df2, on='order_id', how='left')

# make the key dtypes match before merging
df["order_id"] = df["order_id"].astype(str)

# merge the purchase cost back onto the package dataframe
order_id_df_cal = pd.merge(df, purchase_order_df1, on='order_id', how='left')
# order_id_df_cal.drop_duplicates(subset=["order_id"], inplace=True)

# copy to clipboard
order_id_df_cal.to_clipboard(index=False)
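Several later cells repeat this pattern of chunking an ID list and splicing it into an IN (...) clause. A minimal sketch of a reusable helper under the same assumptions (pre-quoted IDs interpolated into the SQL, a callable that runs one batch); the helper name, its signature, and `purchase_order_sql_template` are hypothetical, not part of the notebook's utils:

def read_sql_in_batches(run_batch, ids, chunk=50000):
    """run_batch takes a comma-joined, quoted ID list and returns a DataFrame."""
    frames = []
    for start in range(0, len(ids), chunk):
        param = ",".join(f"'{x}'" for x in ids[start:start + chunk])
        frames.append(run_batch(param))
        print(f"{min(start + chunk, len(ids))} / {len(ids)} ids queried")
    return pd.concat(frames, ignore_index=True)

# hypothetical usage with the purchase-cost query above:
# purchase_order_df1 = read_sql_in_batches(
#     lambda param: pd.read_sql(purchase_order_sql_template.format(param=param), con=engine),
#     order_id_list,
# )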
In [ ]:
print(len(df),len(purchase_order_df1),len(order_id_df_cal))
Use the system pricing model and the actual measured volume to compute what the selling price and order price should have been
In [ ]:
import json
import pandas as pd

order_id_df_cal = pd.read_excel(
    r"F:\DOCUMENTS\WXWork\1688854527635889\Cache\File\2025-05\新建Microsoft Excel 工作表 (2)(1).xlsx",
    sheet_name="包裹成本信息",
)
df = order_id_df_cal.groupby('order_id')
for order_num, group in df:
    packages_dict = {}
    for index, row in group.iterrows():
        # skip packages with missing or zero dimensions/weight
        if (row["长"] == 0 or row["宽"] == 0 or row["高"] == 0 or row["重量"] == 0
                or pd.isnull(row["长"]) or pd.isnull(row["宽"]) or pd.isnull(row["高"]) or pd.isnull(row["重量"])):
            continue
        package_dict = {
            "长": row['长'],
            "宽": row['宽'],
            "高": row['高'],
            "重量": row['重量'],
        }
        packages_dict[row['包裹号']] = package_dict
    print(packages_dict)
    order_id_df_cal.loc[order_id_df_cal['order_id'] == order_num, 'packages_dict'] = (
        json.dumps(packages_dict) if len(packages_dict) > 0 else None
    )
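The loop above re-filters `order_id_df_cal` on every order when writing `packages_dict` back, which is quadratic in the number of rows. A minimal sketch (not from the notebook) that builds the column in one pass; the helper name is hypothetical, the column names match the sheet above:

def group_packages(group):
    # keep only packages with complete, non-zero dimensions and weight
    valid = group.dropna(subset=["长", "宽", "高", "重量"])
    valid = valid[(valid["长"] > 0) & (valid["宽"] > 0) & (valid["高"] > 0) & (valid["重量"] > 0)]
    d = {row['包裹号']: {"长": row['长'], "宽": row['宽'], "高": row['高'], "重量": row['重量']}
         for _, row in valid.iterrows()}
    return json.dumps(d, ensure_ascii=False) if d else None

packages_json = order_id_df_cal.groupby('order_id').apply(group_packages)
order_id_df_cal['packages_dict'] = order_id_df_cal['order_id'].map(packages_json)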
In [ ]:
# Use the system pricing model and actual volume to compute the expected selling price and order price
from sell.sell_price import call_sell_and_order_price
import re
import json

order_id_df_cal['预测售价'] = 0
for index, row in order_id_df_cal.iterrows():
    # skip rows that already have a predicted selling price
    if row['预测售价'] > 0:
        continue
    print(row['packages_dict'])
    if not isinstance(row['packages_dict'], str):
        print(f"Skipping row {index}: packages_dict is not a string: {row['packages_dict']}")
        continue
    # parse first, then validate (the original checked package_dict before it was assigned)
    package_dict = json.loads(row['packages_dict'])
    if not isinstance(package_dict, dict) or not package_dict:
        continue
    # biaozhun_dict = row['包裹数据']
    # if isinstance(biaozhun_dict, str):
    #     biaozhun_dict = eval(biaozhun_dict)
    # if biaozhun_dict is None:
    #     continue
    # for key, value in biaozhun_dict.items():
    #     for k, v in value.items():
    #         if isinstance(v, str):
    #             v = v.replace(',', '')  # strip thousands separators
    #             numbers = re.findall(r"\d+\.?\d*", v)  # extract the numeric part
    #             if numbers:
    #                 biaozhun_dict[key][k] = float(numbers[0])
    price = row['总订单采购成本']
    # price1 = row['入库采购价']
    sell_price, order_price, order_type = call_sell_and_order_price(price, package_dict)
    # sell_price1, order_price1, order_type1 = call_sell_and_order_price(price1, biaozhun_dict)
    order_id_df_cal.loc[order_id_df_cal['order_id'] == row['order_id'], '预测售价'] = sell_price
    order_id_df_cal.loc[order_id_df_cal['order_id'] == row['order_id'], '预测订单价'] = order_price
    order_id_df_cal.loc[order_id_df_cal['order_id'] == row['order_id'], '预测订单类型'] = order_type
    # order_id_df_cal.loc[index, 'ERP售价'] = sell_price1
    # order_id_df_cal.loc[index, 'ERP订单价'] = order_price1
    # order_id_df_cal.loc[index, 'ERP订单类型'] = order_type1
    print(f"Order {index + 1} ({row['order_id']}): predicted sell price {sell_price}, predicted order price {order_price}, order type {order_type}")

order_id_df_cal.to_clipboard(index=False)
In [ ]:
order_id_df_cal.to_clipboard(index=False)
In [ ]:
import re

for index, row in order_id_df_cal.iterrows():
    package_dict = row['packages_dict']
    if not isinstance(package_dict, dict):
        continue  # skip invalid rows
    price = row['总订单采购成本']
    try:
        sell_price, order_price, order_type = call_sell_and_order_price(price, package_dict)
    except Exception as e:
        print(f"Order {row['order_id']} raised an error: {e}")
        continue
    order_id_df_cal.loc[index, '预测售价'] = sell_price
    order_id_df_cal.loc[index, '预测订单价'] = order_price
    order_id_df_cal.loc[index, '预测订单类型'] = order_type
    print(f"Order {index + 1} ({row['order_id']}): predicted sell price {sell_price}, predicted order price {order_price}, order type {order_type}")
In [ ]:
order_id_df_cal.to_clipboard(index=False)
Compute the logistics cost and purchase cost for each single-line-item order SKU using its standard (ERP) package volume
In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group
import json

# United States
from utils.logisticsBill import Billing
import re


def get_package_info_by_sql(order_df):
    packages = Package_group()
    for index, row in order_df.iterrows():
        order_id = row['订单号']
        if row['包裹数据'] is None:
            return None, ""
        row['包裹数据'] = json.loads(row['包裹数据'])
        item_list = []
        for package in row['包裹数据'].values():
            item = {}
            for key, value in package.items():
                try:
                    # extract the numeric part with a regex
                    number_str = re.findall(r"[-+]?\d*\.\d+|\d+", str(value))
                    if number_str:
                        item[key] = float(number_str[0])  # take the first match and cast to float
                    else:
                        item[key] = value  # no numeric part, keep the original value
                except ValueError:
                    item[key] = value  # keep the original value if it cannot be converted
            item_list.append(item)
        for item in item_list:
            if item['长'] == 0 or item['宽'] == 0 or item['高'] == 0 or item['重量'] == 0:
                return None, ""
            package = Package(row['SKU'], item['长'], item['宽'], item['高'], item['重量'])
            packages.add_package(package)
    return order_id, packages


with MySQLconnect("ods") as db:
    engine = db.engine()
    cursor = db.connect().cursor()
    sql = """
    WITH t1 AS (
        SELECT
            opl.order_id,
            -- opl.cate_3,
            `产品品类`,
            `产品分类`,
            # opl.SKU,
            SUM(opl.as_value_amount) AS 销售额,
            COUNT(`产品分类`) AS 条目数,
            SUM(sku.`成本价` * product_num) AS "入库采购价"
        FROM dws.fact_order_product_list opl
        LEFT JOIN stg_bayshop_litfad_sku sku ON opl.SKU = sku.SKU
        LEFT JOIN stg_bayshop_litfad_spu spu ON sku.`产品PID` = spu.`产品PID`
        LEFT JOIN order_express oe ON opl.order_id = oe.单号
        WHERE opl.order_date >= '20240901'
            AND opl.order_date < '20250101'
            AND opl.site_name REGEXP 'Litfad'
            AND opl.SKU IS NOT NULL
            AND EXISTS (
                SELECT 1
                FROM order_express oe_sub
                WHERE oe_sub.单号 = opl.order_id
                    AND oe_sub.包裹状态 REGEXP '签收|投递'
            )
        GROUP BY opl.order_id
    )
    SELECT
        a.目的国,
        a.单号 AS `订单号`,
        a.运输方式,
        d.order_price_dollar AS '销售额',
        d.postcode,
        t1.产品品类,
        t1.产品分类,
        t1.入库采购价
    FROM order_express a
    JOIN order_list d ON a.`单号` = d.order_id
    JOIN t1 ON a.单号 = t1.order_id
    WHERE 目的国 IN ('United Kingdom', 'United States', 'Australia', 'Germany', 'Spain', 'France')
        AND t1.条目数 = 1
        AND t1.产品分类 IS NOT NULL
    ORDER BY d.order_id
    """
    biaozhun_df = pd.read_sql(sql, engine)
    print(f"Fetched order base info {biaozhun_df.shape}")

    batch_size = 10000
    order_id_list = biaozhun_df["订单号"].tolist()
    result_dfs = []
    for i in range(0, len(order_id_list), batch_size):
        batch_order_ids = order_id_list[i:i + batch_size]  # order_ids for the current batch
        param = ",".join(f"'{order_id}'" for order_id in batch_order_ids)
        packages_sql = f"""
        SELECT
            CAST(opl.order_id AS CHAR) AS 订单号,
            opl.SKU,
            包裹数据
        FROM dws.order_product_list opl
        LEFT JOIN ads.new_erp_sku_size spi ON opl.SKU = spi.SKU
        WHERE order_id IN ({param})
            AND opl.order_product_id REGEXP '[0-9]{{15}}_[0-9]*$'
        """
        batch_df = pd.read_sql(packages_sql, engine)
        result_df = batch_df.groupby("订单号").apply(
            lambda x: pd.Series(get_package_info_by_sql(x), index=["订单号", "Packages"])
        ).reset_index(drop=True)
        result_dfs.append(result_df)
        print(f"Done {i} / {len(order_id_list)}")

    packages_df = pd.concat(result_dfs, ignore_index=True)
    # # drop orders without usable packages
    # packages_df = packages_df[packages_df['Packages'].notnull()]

    # make the key dtypes match before merging
    packages_df["订单号"] = packages_df["订单号"].astype(str)
    biaozhun_df["订单号"] = biaozhun_df["订单号"].astype(str)

    # merge the standard-volume package groups onto the order info
    biaozhun_df = pd.merge(biaozhun_df, packages_df, on='订单号', how='left')
    biaozhun_df.drop_duplicates(subset=["订单号"], inplace=True)
    print(biaozhun_df.head())

    for index, row in biaozhun_df.iterrows():
        order_id = row['订单号']
        packages = row['Packages']
        opCountry = OperateCountry('US')
        postcode = '33900'
        head_type = 1 if row['运输方式'] == '海运' else 0
        if packages is None:
            continue
        try:
            bill = Billing(str(index), opCountry, packages, postcode, company_name=None, head_type=head_type, beizhu='1')
            head_price = bill.head_amount[0]
            tail_price = bill.tail_amount[0]
            tail_currency = bill.tail_amount[1]
            print(f"{order_id} head: {head_price} {bill.head_amount[1]} tail: {tail_price} {tail_currency}")
        except:
            head_price = '出错'
            tail_price = '出错'
            tail_currency = '出错'
        biaozhun_df.loc[index, '标准头程CNY'] = head_price
        biaozhun_df.loc[index, '标准尾程'] = tail_price
        biaozhun_df.loc[index, '尾程币种'] = tail_currency

    biaozhun_df.to_clipboard(index=False)
In [ ]:
biaozhun_df.to_clipboard(index=False)
In [ ]:
from utils.gtools import MySQLconnect
import pandas as pd

df = pd.read_excel(
    r"F:\DOCUMENTS\WXWork\1688854527635889\Cache\File\2025-03\导出订单维护任务数据2025-3-20.xlsx",
    sheet_name="导出订单维护任务数据2025-3-20",
)
# df['订单数量'] = df['订单数量'].astype(str)
# df = df[df['订单数量'].str.len() > 0]
# df['订单数量'] = df['订单数量'].astype(float)
# df = df[df['订单数量'] > 0]
df = df[['erp sku', '订单号']]
In [ ]:
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group

# United States
from utils.logisticsBill import Billing

df = df[df['erp sku'] != 0]
# cast 订单号 to string
df['订单号'] = df['订单号'].astype(str)
# drop rows whose 订单号 is empty
df = df[df['订单号'].str.len() > 0]

sku_df = pd.DataFrame()
with MySQLconnect("ods") as db:
    engine = db.engine()
    conn = db.connect()
    cursor = conn.cursor()
    df_group = df.groupby('erp sku')
    for sku, group in df_group:
        order_list = group["订单号"].tolist()
        param = ",".join(f"'{order_id}'" for order_id in order_list)
        sql = f"""
        SELECT order_id
        FROM dws.fact_order_product_list
        WHERE SKU = {sku}
            AND order_id IN ({param})
            AND site_type IN ("独立站")
        GROUP BY order_id
        """
        # find the orders containing this SKU, then work out each order's profit by order_id
        order_id_df = pd.read_sql(sql, con=engine)
        if order_id_df.empty:
            continue
        order_id_df['sku'] = sku
        sku_df = pd.concat([sku_df, order_id_df], axis=0, ignore_index=True)
        print(f"sku: {sku}, order count: {len(order_id_df)}")

print(sku_df)
In [ ]:
ods = MySQLconnect("ods")
engine = ods.engine()
cursor = ods.connect().cursor()

sql = """
SELECT
    opl.order_id,
    `产品品类`,
    `产品分类`,
    SUM(opl.as_value_amount) AS 销售额,
    COUNT(`产品分类`) AS 条目数
FROM dws.fact_order_product_list opl
LEFT JOIN stg_bayshop_litfad_sku sku ON opl.SKU = sku.SKU
LEFT JOIN stg_bayshop_litfad_spu spu ON sku.`产品PID` = spu.`产品PID`
WHERE opl.order_date >= '20240901'
    AND opl.order_date < '20250101'
    AND opl.site_name REGEXP 'Litfad'
GROUP BY opl.order_id
"""
sku_df = pd.read_sql(sql, con=engine)
In [ ]:
# Look up purchase costs
from utils.gtools import MySQLconnect
import pandas as pd

ods = MySQLconnect("ods")
engine = ods.engine()
cursor = ods.connect().cursor()

order_id_list = sku_df["order_id"].tolist()
param = ",".join(f"'{order_id}'" for order_id in order_id_list)

# purchase-cost query
purchase_order_sql = f"""
WITH t1 AS (
    SELECT
        LEFT(ol.out_detials_outlink_id, 15) AS order_id,
        SUM(out_detials_qty * price) / 7 AS instock_cost,
        NULL AS buy_cost
    FROM ods.outstock_list ol
    JOIN ods.instock_list il ON ol.store_in_id = il.id
    WHERE LEFT(ol.out_detials_outlink_id, 15) IN ({param})
    GROUP BY LEFT(ol.out_detials_outlink_id, 15)
    UNION ALL
    SELECT
        LEFT(order_product_id, 15) AS order_id,
        NULL AS instock_cost,
        SUM(buy_num * actual_price) / 7 AS buy_cost
    FROM `warehouse_purchasing`
    WHERE LEFT(order_product_id, 15) IN ({param})
        AND buy_audit = "采购完成"
    GROUP BY LEFT(order_product_id, 15)
)
SELECT
    order_id,
    SUM(CASE WHEN instock_cost IS NULL THEN buy_cost ELSE instock_cost END) AS pur_cost
FROM t1
GROUP BY order_id
"""
purchase_order_df = pd.read_sql(purchase_order_sql, con=engine)

# make the key dtypes match before merging
purchase_order_df["order_id"] = purchase_order_df["order_id"].astype(str)
sku_df["order_id"] = sku_df["order_id"].astype(str)

# merge
sku_df = pd.merge(sku_df, purchase_order_df, on='order_id', how='left')
In [ ]:
# sku_df = sku_df[['order_id', 'sku', 'pur_cost_y']]
sku_df.columns = ['order_id', 'sku', '采购价']
sku_df = sku_df[sku_df['采购价'] > 0]
print(sku_df.head())
In [ ]:
# Fetch the package data
package_sql = f"""
SELECT
    a.目的国,
    a.单号 AS `订单号`,
    a.包裹号,
    a.快递公司,
    a.运输方式,
    d.order_price_dollar AS `销售额`,
    d.postcode,
    b.length AS `长`,
    b.width AS `宽`,
    b.hight AS `高`,
    b.weight AS `重量`
FROM order_express a
JOIN package_vol_info b ON a.`包裹号` = b.package
JOIN order_list d ON a.`单号` = d.order_id
WHERE a.`包裹状态` IN ('客户签收', '已经投递')
    AND b.hight > 0
    AND b.length > 0
    AND b.width > 0
    AND b.weight > 0
    AND a.单号 IN ({param})
ORDER BY order_id
"""
package_df = pd.read_sql(package_sql, con=engine)
print("Fetched package data")

i = 0
package_grouped = package_df.groupby('订单号')
calculated_results = []
for order_id, group2 in package_grouped:
    print(f"Calculating the bill for order {order_id} (order #{i})")
    i += 1
    opCountry = OperateCountry(group2['目的国'].iloc[0])
    packages = Package_group()
    for index, row in group2.iterrows():
        package = Package(row['包裹号'], row['长'], row['宽'], row['高'], row['重量'])
        packages.add_package(package)
        postcode = row['postcode']
        head_type = 1 if row['运输方式'] == '海运' else 0
    try:
        bill = Billing(str(index), opCountry, packages, postcode, company_name=None, head_type=head_type, beizhu='1')
        head_price = bill.head_amount[0]
        tail_price = bill.tail_amount[0]
        tail_currency = bill.tail_amount[1]
        print(f"{bill}")
    except:
        head_price = '出错'
        tail_price = '出错'
        tail_currency = '出错'
    result = {
        '订单号': order_id,
        '目的国': group2['目的国'].iloc[0],       # same for all rows in the group
        '快递公司': group2['快递公司'].iloc[0],   # same for all rows in the group
        '运输方式': group2['运输方式'].iloc[0],   # same for all rows in the group
        'postcode': group2['postcode'].iloc[0],  # same for all rows in the group
        '销售额USD': group2['销售额'].iloc[0],
        '头程预测': head_price,
        '尾端预测': tail_price,
        '尾端货币': tail_currency,
    }
    calculated_results.append(result)

calculated_df = pd.DataFrame(calculated_results)
calculated_df.rename(columns={'订单号': 'order_id'}, inplace=True)

sku_df["order_id"] = sku_df["order_id"].astype(str)
calculated_df["order_id"] = calculated_df["order_id"].astype(str)
# merge
order_id_df_cal = pd.merge(calculated_df, sku_df, on='order_id', how='left')
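One editorial note on the error handling above: writing the string '出错' into 头程预测/尾端预测 forces those columns to object dtype, so they cannot be summed or averaged directly downstream. A sketch of an alternative around the same Billing call, using a missing value plus an explicit failure flag (the flag name is hypothetical, not from the notebook):

try:
    bill = Billing(str(index), opCountry, packages, postcode, company_name=None, head_type=head_type, beizhu='1')
    head_price, tail_price = bill.head_amount[0], bill.tail_amount[0]
    tail_currency = bill.tail_amount[1]
    billing_failed = False
except Exception as e:
    head_price = tail_price = pd.NA  # keep the prediction columns numeric
    tail_currency = None
    billing_failed = True
    print(f"Billing failed for {order_id}: {e}")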
In [ ]:
# order_id_df_cal = pd.merge(calculated_df, sku_df, on='order_id', how='left')
order_id_df_cal.to_clipboard(index=False)
In [ ]:
from logisticsClass.logisticsTail_US import MetroLogistics_US
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group
import pandas as pd
from utils.gtools import MySQLconnect

# United States
from utils.logisticsBill import Billing

ods = MySQLconnect('ods')
cursor = ods.connect().cursor()

sql = f"""
SELECT
    oe.`包裹号`,
    oe.`单号`,
    oe.`关联提单号`,
    oe.`包裹状态`,
    oe.`目的国`,
    oe.`快递公司`,
    ol.postcode,
    pvi.length,
    pvi.width,
    pvi.hight,
    pvi.weight
FROM `order_express` oe
LEFT JOIN package_vol_info pvi ON oe.`包裹号` = pvi.package
LEFT JOIN order_list ol ON oe.`单号` = ol.order_id
WHERE 关联提单号 IN ('BMOU4594999', 'CAIU9043860', 'GVCU5328406', 'TCNU7259447')
    AND `包裹状态` NOT REGEXP '已作废'
"""
df = pd.read_sql(sql, ods.connect())
print("Query finished")

calculated_results = []
# package_list = ",".join(str(item) for item in df['ORDER#'].tolist())
# print(package_list)
# query = f"SELECT package,length,width,hight,weight FROM `package_vol_info` WHERE `package` IN ({package_list})"
# result = cursor.execute(query)
# packages_info = cursor.fetchall()
# print(packages_info)
# merge df with the query result
# new_df = pd.DataFrame(packages_info, columns=['package', 'length', 'width', 'hight', 'weight'])
# df = df.merge(new_df, left_on='ORDER#', right_on='package')
# df = df.drop(columns='package')

df_grouped = df.groupby('单号')
MetroLogistics_US.refresh()

for order_num, group in df_grouped:
    # opCountry = OperateCountry(group['目的国'].iloc[0])
    opCountry = OperateCountry('US')
    postcode = str(group['postcode'].iloc[0])
    packages = Package_group()
    packages_dict = {}
    # volume_weight = 0
    # weight = 0
    for index, row in group.iterrows():
        length = float(row['length'])
        width = float(row['width'])
        hight = float(row['hight'])
        weight = float(row['weight'])
        package = Package(row['包裹号'], length, width, hight, weight)
        packages.add_package(package)
        # packages_dict[row['包裹号']] = {
        #     "长": row['长'],
        #     "宽": row['宽'],
        #     "高": row['高'],
        #     "重量": row['重量']
        # }
        # weight += row['重量'] / 1000
        # volume_weight += package.get_volume_weight(6000)
        # postcode = row['postcode']
        # head_type = 1 if row['运输方式'] == '海运' else 0
    # quote the same package group against all three truck-delivery carriers
    try:
        bill1 = Billing(str(index), opCountry, packages, postcode, company_name='大健-Metro', head_type=1, beizhu='1')
        tail_price1 = bill1.tail_amount[0]
    except:
        tail_price1 = '出错'
    try:
        bill2 = Billing(str(index), opCountry, packages, postcode, company_name='大健-CEVA', head_type=1, beizhu='1')
        tail_price2 = bill2.tail_amount[0]
    except:
        tail_price2 = '出错'
    try:
        bill3 = Billing(str(index), opCountry, packages, postcode, company_name='大健-GIGA', head_type=1, beizhu='1')
        tail_price3 = bill3.tail_amount[0]
    except:
        tail_price3 = '出错'
    df.loc[df['单号'] == order_num, '大健-METRO'] = tail_price1
    df.loc[df['单号'] == order_num, '大健-CEVA'] = tail_price2
    df.loc[df['单号'] == order_num, '大健-GIGA'] = tail_price3
    print(order_num)
    # print(result)
    # calculated_results.append(result)

# print(packages_dict)
# calculated_df = pd.DataFrame(calculated_results)
# rename 订单号 to order_id in calculated_df
# calculated_df.rename(columns={'订单号': 'order_id'}, inplace=True)

print(df)
df.to_clipboard(index=False)
In [ ]:
from utils.countryOperator import OperateCountry
from utils.logisticsBill import BillFactory
from utils.Package import Package, Package_group
import pandas as pd
from utils.gtools import MySQLconnect

# United States
from utils.logisticsBill import Billing

ods = MySQLconnect('ods')
cursor = ods.connect().cursor()

df = pd.read_excel(
    r'F:\DOCUMENTS\WXWork\1688854527635889\Cache\File\2025-04\Litfad 4-10-25 with data analysis-2(1).xlsx',
    sheet_name='CA- 96004885',
)
package_list = ",".join(str(item) for item in df['ORDER#'].tolist())
print(package_list)

query = f"SELECT package, length, width, hight, weight FROM `package_vol_info` WHERE `package` IN ({package_list})"
result = cursor.execute(query)
packages_info = cursor.fetchall()

package_df = pd.DataFrame(packages_info, columns=['package', 'length', 'width', 'hight', 'weight'])
package_df.to_clipboard(index=False)
In [ ]:
import pandas as pd
from utils.gtools import MySQLconnect

ods = MySQLconnect('ods')
engine = ods.engine()
cursor = ods.connect().cursor()

df = pd.read_excel(r'D:\test\logistics\售价_2024-01-01.xlsx')

batch_size = 50000  # query SKUs in batches so the IN (...) list does not make the SQL statement too long
sku_list = df["SKU"].tolist()
result_dfs1 = []
for i in range(0, len(sku_list), batch_size):
    batch_skus = sku_list[i:i + batch_size]  # SKUs for the current batch
    param = ",".join(f"'{sku}'" for sku in batch_skus)
    sku_sql = f"""
    SELECT
        SKU,
        产品售价,
        `产品品类`,
        `产品分类`
    FROM `stg_bayshop_litfad_sku` sku
    LEFT JOIN stg_bayshop_litfad_spu spu ON sku.`产品PID` = spu.`产品PID`
    WHERE SKU IN ({param})
    """
    batch_df1 = pd.read_sql(sku_sql, con=engine)  # run the query
    result_dfs1.append(batch_df1)
    print(f"Done {i} / {len(sku_list)}")

sku_df = pd.concat(result_dfs1)                        # concatenate the batch results
merge_df = pd.merge(df, sku_df, on="SKU", how="left")  # merge onto the price sheet
merge_df.to_excel(r'D:\test\logistics\售价_sku_1-9月.xlsx', index=False)  # save the result
Decide whether to use the measured dimensions or the theoretical (estimated) dimensions
In [ ]:
import pandas as pd

all_df = pd.read_excel('单包裹SKU售价分析1.xlsx', sheet_name="Sheet1")
all_df = all_df[(all_df['是否有过修改记录'] == "否") & (all_df['使用尺寸售价'] != "ERP售价")]
all_df['SPU最大涨幅'] = all_df.groupby('SPU')['售价涨跌幅'].transform(max)
all_df['SPU最小涨幅'] = all_df.groupby('SPU')['售价涨跌幅'].transform(min)
filtered_df = all_df[(all_df['SPU最大涨幅'] <= 0.5) & (all_df['SPU最小涨幅'] >= -0.5)]

for index, row in filtered_df.iterrows():
    if row['使用尺寸售价'] == "实际体积售价":
        length = str(row['长'])
        width = str(row['宽'])
        height = str(row['高'])
        weight = str(row['重量'])
    else:
        length = str(row['理论长'])
        width = str(row['理论宽'])
        height = str(row['理论高'])
        weight = str(row['理论重量'])
    filtered_df.loc[index, '尺寸重量'] = weight + "|" + length + "*" + width + "*" + height + "*1,"
    print(filtered_df.loc[index, '尺寸重量'])

spu_list = filtered_df['SPU'].unique()
filtered_df = filtered_df[['SKU', '成本价', '尺寸重量']]
filtered_df
C:\Users\Admin\AppData\Local\Temp\ipykernel_8364\2603640831.py:4: FutureWarning: The provided callable <built-in function max> is currently using SeriesGroupBy.max. In a future version of pandas, the provided callable will be used directly. To keep current behavior pass the string "max" instead.
all_df['SPU最大涨幅']=all_df.groupby('SPU')['售价涨跌幅'].transform(max)
C:\Users\Admin\AppData\Local\Temp\ipykernel_8364\2603640831.py:5: FutureWarning: The provided callable <built-in function min> is currently using SeriesGroupBy.min. In a future version of pandas, the provided callable will be used directly. To keep current behavior pass the string "min" instead.
all_df['SPU最小涨幅']=all_df.groupby('SPU')['售价涨跌幅'].transform(min)
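As the FutureWarning above suggests, passing the aggregation by name keeps the current behavior under future pandas versions. A minimal sketch of the two affected lines:

all_df['SPU最大涨幅'] = all_df.groupby('SPU')['售价涨跌幅'].transform('max')
all_df['SPU最小涨幅'] = all_df.groupby('SPU')['售价涨跌幅'].transform('min')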
10{"stdout":"[{\"variableName\": \"ID_TO_MEANING\", \"type\": \"dictionary\", \"supportedEngines\": [\"pandas\"], \"isLocalVariable\": true, \"rawType\": \"builtins.dict\"}, {\"variableName\": \"NULL\", \"type\": \"unknown\", \"supportedEngines\": [\"pandas\"], \"isLocalVariable\": true, \"rawType\": \"_pydevd_bundle.pydevd_constants.Null\"}]\n","stderr":"","mime":[]}
10{"stdout":"[{\"variableName\": \"ID_TO_MEANING\", \"type\": \"dictionary\", \"supportedEngines\": [\"pandas\"], \"isLocalVariable\": true, \"rawType\": \"builtins.dict\"}, {\"variableName\": \"NULL\", \"type\": \"unknown\", \"supportedEngines\": [\"pandas\"], \"isLocalVariable\": true, \"rawType\": \"_pydevd_bundle.pydevd_constants.Null\"}]\n","stderr":"","mime":[]}
10{"stdout":"[{\"variableName\": \"ID_TO_MEANING\", \"type\": \"dictionary\", \"supportedEngines\": [\"pandas\"], \"isLocalVariable\": true, \"rawType\": \"builtins.dict\"}, {\"variableName\": \"NULL\", \"type\": \"unknown\", \"supportedEngines\": [\"pandas\"], \"isLocalVariable\": true, \"rawType\": \"_pydevd_bundle.pydevd_constants.Null\"}]\n","stderr":"","mime":[]}