{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "取ERP采购价+ERP尺寸+实际尺寸,需要国家+条目+邮编+order_id\n" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "C:\\Users\\joker\\AppData\\Local\\Temp\\ipykernel_34576\\54471085.py:105: UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.\n", " df=pd.read_sql(\"SELECT * FROM `order_complet4` WHERE buy_amount is not null and `实际尺寸售价` IS NULL limit 100\",db.con)\n" ] } ], "source": [ "import pandas as pd\n", "from utils.gtools import MySQLconnect\n", "\n", "# 读取需要计算的包裹信息\n", "with MySQLconnect('ads') as db:\n", " sql = r\"\"\" \n", " # 限制范围是测量时间,取得SKU种类为1且数量为1的订单,且重复SKU只取最近的订单\n", "\n", "WITH\n", "t1 AS (\n", "SELECT\n", "order_id,\n", "SKU,\n", "order_date,\n", "sum(CASE WHEN opl.order_product_id LIKE '%\\_%' ESCAPE '\\\\' \n", " AND opl.order_product_id NOT LIKE '%\\_%\\_%' ESCAPE '\\\\' THEN product_num END) AS product_num,\n", "DATE_FORMAT(order_date,\"%Y-%m-%d\") AS 订单时间,\n", "count(opl.SKU) AS 产品种类\n", "FROM\n", "dws.fact_order_product_list opl\n", "WHERE\n", " NOT EXISTS (\n", " SELECT 1 \n", " FROM dws.log_order_reissue_detail AS r \n", " WHERE r.order_product_id = opl.order_product_id\n", " )\n", "AND order_date >= \"20250501\"\n", "AND order_date < \"20250612\"\n", "AND SKU <> \"\"\n", "GROUP BY order_id\n", ")\n", ",\n", "t2 AS (\n", "SELECT\t\t\t\n", " a.`包裹测量时间`,\n", "\t\t\t\t\t\tt1.order_id,\n", "\t\t\t\t\t\tt1.SKU,\n", "\t\t\t\t\t\tt1.order_date,\n", " a.包裹号,\n", " a.快递公司,\n", " a.运输方式,\n", "\t\t\t\t\t\ta.`目的国`,\n", " d.postcode,\n", " CONCAT(\n", " '\"', b.package, '\": {',\n", " '\"长\": ', length, ', ',\n", " '\"宽\": ', width, ', ',\n", " '\"高\": ', hight, ', ',\n", " '\"重量\": ', weight, '}'\n", " ) AS package_json\n", " FROM\n", "\t\t\t\tt1\n", " LEFT JOIN order_express a ON t1.order_id = a.单号\n", " JOIN package_vol_info b ON a.`包裹号` = b.package\n", " JOIN order_list d ON a.`单号` = d.order_id \n", " WHERE\n", " a.`包裹状态` IN ( '客户签收', '已经投递') \n", " AND b.hight > 0 \n", " AND b.length > 0 \n", " AND b.width > 0 \n", " AND b.hight > 0 \n", " AND b.weight > 0\n", "-- AND a.`目的国` = \"United States\"\n", "\t\t\t\t\t\tAND t1.product_num = 1\n", "\t\t\t\t\t\tAND t1.产品种类=1\n", "\t\t\t\t\t\tAND a.`包裹测量时间` >= '2025-05-01'\n", "\t\t\t\t\t\tAND a.`包裹测量时间` < '2025-06-12'\n", "),\n", "t3 AS (\n", "SELECT\n", "t2.*,\n", "sku.成本价 AS ERP采购价,\n", "ess.erp_package_vol AS ERP包裹数据,\n", "CONCAT('{', GROUP_CONCAT(package_json SEPARATOR ','), '}') AS 实际包裹数据,\n", "ROW_NUMBER() OVER (PARTITION BY SKU ORDER BY 包裹测量时间 DESC) as rn\n", "FROM\n", "t2\n", "LEFT JOIN dwd.dim_erp_sku_package_vol_info ess ON t2.SKU=ess.erp_sku\n", "LEFT JOIN stg_bayshop_litfad_sku sku ON t2.SKU=sku.SKU\n", "WHERE\n", "ess.`erp_package_vol`<>\"{}\" AND ess.`erp_package_vol`<>\"\"\n", "GROUP BY order_id\n", ")\n", "SELECT\n", "包裹测量时间,\n", "order_id,\n", "SKU,\n", "DATE_FORMAT(order_date,\"%Y-%M-%D\") AS 订单时间,\n", "包裹号,\n", "`快递公司`,\n", "`运输方式`,\n", "`目的国`,\n", "postcode,\n", "ERP采购价,\n", "ERP包裹数据,\n", "实际包裹数据\n", "FROM\n", "t3\n", "WHERE\n", "rn=1\n", "\n", "\n", "\n", " \"\"\"\n", " df=pd.read_sql(\"SELECT * FROM `order_complet4` WHERE buy_amount is not null and `实际尺寸售价` IS NULL limit 100\",db.con)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "取实际采购价(当前已有ERP采购价+ERP尺寸+实际尺寸),输入:df['order_id'],输出:df['采购成本']" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "from utils.gtools import MySQLconnect\n", "\n", "ods = MySQLconnect(\"ods\")\n", "engine = ods.engine()\n", "cursor = ods.connect().cursor()\n", "\n", "batch_size = 50000 # 每次查询 500 个 order_id,避免 SQL 语句过长\n", "order_id_list = df[\"order_id\"].drop_duplicates().tolist() # 取出所有 order_id\n", "# df['postcode'] = \"38016\"\n", "# 存储分批查询的结果\n", "result_dfs1 = []\n", "result_dfs2 = []\n", "for i in range(0, len(order_id_list), batch_size):\n", " batch_order_ids = order_id_list[i:i + batch_size] # 取当前批次的 order_id\n", " param = \",\".join(f\"'{order_id}'\" for order_id in batch_order_ids)\n", "\n", " purchase_order_sql = f\"\"\"\n", " WITH t1 AS (\n", " SELECT LEFT(ol.out_detials_outlink_id, 15) AS order_id,\n", " SUM(out_detials_qty * price) AS instock_cost,\n", " NULL AS buy_cost\n", " FROM ods.outstock_list ol\n", " JOIN ods.instock_list il ON ol.store_in_id = il.id \n", " WHERE LEFT(ol.out_detials_outlink_id, 15) IN ({param})\n", " GROUP BY LEFT(ol.out_detials_outlink_id, 15)\n", " \n", " UNION ALL\n", " \n", " SELECT LEFT(order_product_id, 15) AS order_id, \n", " NULL AS instock_cost,\n", " SUM(buy_num * actual_price) AS buy_cost\n", " FROM warehouse_purchasing\n", " WHERE LEFT(order_product_id, 15) IN ({param}) \n", " AND buy_audit = \"采购完成\"\n", " GROUP BY LEFT(order_product_id, 15)\n", " )\n", " SELECT order_id,\n", " SUM(CASE \n", " WHEN instock_cost IS NULL THEN buy_cost\n", " ELSE instock_cost \n", " END) AS 采购成本\n", " FROM t1 \n", " GROUP BY order_id\n", " \"\"\"\n", " \n", "\n", " batch_df1 = pd.read_sql(purchase_order_sql, con=engine) # 运行 SQL 查询\n", " result_dfs1.append(batch_df1) # 存入结果列表\n", " print(f\"已完成 {i + batch_size} 个 order_id 的查询\")\n", "\n", "# 合并所有查询结果\n", "purchase_order_df1 = pd.concat(result_dfs1, ignore_index=True)\n", "purchase_order_df1[\"order_id\"] = purchase_order_df1[\"order_id\"].astype(str)\n", "\n", "\n", "# 转换数据类型,确保匹配\n", "df[\"order_id\"] = df[\"order_id\"].astype(str)\n", "\n", "# 进行合并\n", "df = pd.merge(df, purchase_order_df1, on='order_id', how='left')\n", "# 复制到剪贴板\n", "df.to_clipboard(index=False)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "计算标准网站售价,输入尺寸,输出售价和订单物流费" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "ename": "NameError", "evalue": "name 'df' is not defined", "output_type": "error", "traceback": [ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[1;31mNameError\u001b[0m Traceback (most recent call last)", "Cell \u001b[1;32mIn[3], line 16\u001b[0m\n\u001b[0;32m 14\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m (sell_price, order_price, order_type)\n\u001b[0;32m 15\u001b[0m \u001b[38;5;66;03m# 计算当前售价\u001b[39;00m\n\u001b[1;32m---> 16\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m index,row \u001b[38;5;129;01min\u001b[39;00m df\u001b[38;5;241m.\u001b[39miterrows():\n\u001b[0;32m 17\u001b[0m price \u001b[38;5;241m=\u001b[39m row[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mbuy_amount\u001b[39m\u001b[38;5;124m'\u001b[39m]\n\u001b[0;32m 18\u001b[0m \u001b[38;5;66;03m# package_dict = json.loads(row['erp_package_vol'])\u001b[39;00m\n", "\u001b[1;31mNameError\u001b[0m: name 'df' is not defined" ] } ], "source": [ "def call_sell_price(price, package_dict,head_type=\"海运\"):\n", " import json\n", " from sell.sell_price import call_sell_and_order_price\n", " try:\n", " package_dict = json.loads(package_dict)\n", " all_sell_price, order_price, order_type = 
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Compute the standard website selling price: input the package dimensions, output the selling price and the order logistics fee.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "ename": "NameError",
     "evalue": "name 'df' is not defined",
     "output_type": "error",
     "traceback": [
      "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[1;31mNameError\u001b[0m                                 Traceback (most recent call last)",
      "Cell \u001b[1;32mIn[3], line 16\u001b[0m\n\u001b[0;32m     14\u001b[0m     \u001b[38;5;28;01mreturn\u001b[39;00m (sell_price, order_price, order_type)\n\u001b[0;32m     15\u001b[0m \u001b[38;5;66;03m# 计算当前售价\u001b[39;00m\n\u001b[1;32m---> 16\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m index,row \u001b[38;5;129;01min\u001b[39;00m df\u001b[38;5;241m.\u001b[39miterrows():\n\u001b[0;32m     17\u001b[0m     price \u001b[38;5;241m=\u001b[39m row[\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mbuy_amount\u001b[39m\u001b[38;5;124m'\u001b[39m]\n\u001b[0;32m     18\u001b[0m     \u001b[38;5;66;03m# package_dict = json.loads(row['erp_package_vol'])\u001b[39;00m\n",
      "\u001b[1;31mNameError\u001b[0m: name 'df' is not defined"
     ]
    }
   ],
   "source": [
    "def call_sell_price(price, package_dict, head_type=\"海运\"):\n",
    "    import json\n",
    "    from sell.sell_price import call_sell_and_order_price\n",
    "    try:\n",
    "        package_dict = json.loads(package_dict)\n",
    "        all_sell_price, order_price, order_type = call_sell_and_order_price(price, package_dict, head_type)\n",
    "    except Exception as e:\n",
    "        print(f\"pricing failed: {e}\")\n",
    "        return (\"\", \"\", \"\")\n",
    "    if all_sell_price == 0:\n",
    "        return (\"\", \"\", \"\")\n",
    "    sell_price = all_sell_price\n",
    "    # logis_price = all_sell_price[1]\n",
    "    return (sell_price, order_price, order_type)\n",
    "\n",
    "# Compute the current selling price for each row\n",
    "for index, row in df.iterrows():\n",
    "    price = row['buy_amount']\n",
    "    # package_dict = json.loads(row['erp_package_vol'])\n",
    "    sell_price = call_sell_price(price, row['package_json'], \"海运\")\n",
    "    print(sell_price)\n",
    "    df.loc[index, '网站售价'] = sell_price[0]\n",
    "    df.loc[index, '订单物流费'] = sell_price[1]\n",
    "    df.loc[index, '尾端类型'] = sell_price[2]\n",
    "    print(f\"SKU: {row['sku']} 网站售价: {sell_price[0]} 订单物流费: {sell_price[1]} 尾端类型: {sell_price[2]}\")\n",
    "df.to_clipboard(index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.to_clipboard()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Compute the actual channel logistics cost.\n"
   ]
  },
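  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before billing, each row's `实际包裹数据` (assembled by the CONCAT/GROUP_CONCAT in the first query) has to be parsed back into numeric per-package dimensions. A minimal, self-contained sketch of that cleaning step is shown here; the field names 长/宽/高/重量 come from the query above, while the package key and the values in the sample string are invented for illustration.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch only: parse one 实际包裹数据 value into numeric dimensions.\n",
    "# The sample string below is made up; real values come from the query above.\n",
    "import json\n",
    "import re\n",
    "\n",
    "def extract_number(value):\n",
    "    # extract the first numeric token from a string such as \"18.5kg\"\n",
    "    match = re.search(r\"[-+]?\\d*\\.\\d+|\\d+\", str(value))\n",
    "    return float(match.group()) if match else 0.0\n",
    "\n",
    "sample = '{\"PKG001\": {\"长\": 120, \"宽\": \"60\", \"高\": 45, \"重量\": \"18.5kg\"}}'\n",
    "packages = {\n",
    "    key: {field: extract_number(raw) for field, raw in dims.items()}\n",
    "    for key, dims in json.loads(sample).items()\n",
    "}\n",
    "print(packages)  # {'PKG001': {'长': 120.0, '宽': 60.0, '高': 45.0, '重量': 18.5}}\n"
   ]
  },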
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from utils.countryOperator import OperateCountry\n",
    "from utils.logisticsBill import BillFactory, Billing\n",
    "from utils.Package import Package, Package_group\n",
    "import pandas as pd\n",
    "import json\n",
    "import re\n",
    "import requests\n",
    "\n",
    "# United States\n",
    "opCountry = OperateCountry('US')\n",
    "\n",
    "def extract_number(value):\n",
    "    # extract the first numeric token from a string\n",
    "    match = re.search(r\"[-+]?\\d*\\.\\d+|\\d+\", str(value))\n",
    "    return float(match.group()) if match else 0.0\n",
    "\n",
    "for index, row in df.iterrows():\n",
    "    postcode = row['postcode']\n",
    "    if pd.isna(postcode) or str(postcode).lower() == \"nan\":\n",
    "        continue\n",
    "    try:\n",
    "        package_dict = json.loads(row['实际包裹数据'])\n",
    "    except Exception as e:\n",
    "        print(f\"row {index}: failed to parse 实际包裹数据: {e}\")\n",
    "        print(row['实际包裹数据'])\n",
    "        continue\n",
    "    packages = Package_group()\n",
    "    valid_packages = 0\n",
    "    for key, package in package_dict.items():\n",
    "        package['长'] = extract_number(package['长'])\n",
    "        package['宽'] = extract_number(package['宽'])\n",
    "        package['高'] = extract_number(package['高'])\n",
    "        package['重量'] = extract_number(package['重量'])\n",
    "\n",
    "        if package['长'] == 0 or package['宽'] == 0 or package['高'] == 0 or package['重量'] == 0:\n",
    "            continue\n",
    "        packages.add_package(Package(key, package['长'], package['宽'], package['高'], package['重量']))\n",
    "        valid_packages += 1\n",
    "    if valid_packages == 0:\n",
    "        # nothing usable to bill for this row\n",
    "        continue\n",
    "    # head_type: 1 when the first leg is sea freight (\"海运\"), otherwise 0\n",
    "    if \"海运\" in row['运输方式']:\n",
    "        head_type = 1\n",
    "    else:\n",
    "        head_type = 0\n",
    "\n",
    "    # if \"FEDEX-SAIR-G\" in row['快递公司']:\n",
    "    #     company_name = \"Fedex-GROUD\"\n",
    "    # elif \"FEDEX-SAIR-H\" in row['快递公司']:\n",
    "    #     company_name = \"Fedex-HOME\"\n",
    "    # elif \"FEDEX02\" in row['快递公司']:\n",
    "    #     company_name = \"Fedex-彩虹小马\"\n",
    "    # elif \"大包\" in row['快递公司'] or row['快递公司'] == '海MS-FEDEX':\n",
    "    #     company_name = \"Fedex-金宏亚\"\n",
    "    # elif \"GIGA\" in row['快递公司']:\n",
    "    #     company_name = \"大健-GIGA\"\n",
    "    # elif \"CEVA\" in row['快递公司']:\n",
    "    #     company_name = \"大健-CEVA\"\n",
    "    # elif \"USPS\" in row['快递公司']:\n",
    "    #     company_name = \"Fedex-GROUD\"\n",
    "    # else:\n",
    "    #     company_name = \"大健-Metro\"\n",
    "\n",
    "    bill = Billing(str(index), opCountry, packages, postcode, company_name=\"Fedex-GROUD\", head_type=head_type, beizhu='1')\n",
    "    head_price = bill.head_amount[0]\n",
    "    tail_price = bill.tail_amount[0]\n",
    "    if \"USPS\" in row['快递公司']:\n",
    "        tail_price = tail_price / 2\n",
    "    # NOTE: tail_price is computed (and halved for USPS) but is not written back to df here\n",
    "    df.loc[index, '头程CNY'] = head_price\n",
    "    # df.loc[index, '最优渠道'] = bill.company_name\n",
    "    print(f\"row {index} done\")\n",
    "\n",
    "df.to_clipboard(index=False)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from utils.gtools import MySQLconnect\n",
    "import pandas as pd\n",
    "\n",
    "df = pd.read_clipboard()\n",
    "log = MySQLconnect('logistics')\n",
    "# write the enriched rows back to the logistics database ('table_name' is a placeholder)\n",
    "df.to_sql('table_name', con=log.engine(), if_exists='replace', index=False)\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "base",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}