import os import pandas as pd from utils.Package import Package,Package_group from utils.gtools import MySQLconnect from utils.countryOperator import OperateCountry from utils.logisticsBill import BillFactory, Billing import requests ods = MySQLconnect("ods") engine = ods.engine() cursor = ods.connect().cursor() def get_package_real_vol_by_api(packages_id): packages = Package_group() packages_str = "" index = 1 for package_id in packages_id: # 计算往年利润率 # sql = f"SELECT length,width,hight,weight FROM package_vol_info WHERE package = %s" # cursor.execute(sql, (package_id,)) # resp = cursor.fetchall() # if len(resp) == 0: # continue # else: # weight = resp[0][3] # package_length = resp[0][0] # package_width = resp[0][1] # package_hight = resp[0][2] # 拦截 url = f'https://cp.maso.hk/index.php?main=biphp&act=package_fund&key=W6BOYJ7BH27YCGRFCA0LWBVKMU1KRU5Q&package={package_id}' resp = requests.get(url).json() if resp['code'] == "0": weight = int(float(resp['data'][0]['weight'])*1000) package_length = resp['data'][0]['l'].replace(",","") if len(resp['data'][0]['l'])>0 else "0" package_width = resp['data'][0]['w'].replace(",","") if len(resp['data'][0]['w'])>0 else "0" package_hight = resp['data'][0]['h'].replace(",","") if len(resp['data'][0]['h'])>0 else "0" package_str = f"{weight/1000}|{package_length}*{package_width}*{package_hight}" packages_str += package_str + "," index += 1 package = Package(str(package_id),float(package_length),float(package_width),float(package_hight),weight) packages.add_package(package) return packages,packages_str def get_order_bill(opCountry,order_id,packages_id,postcode,convey,amount): print(order_id) beizhu = amount conveys = 1 if convey == "海运" else 0 try: packages,packages_str = get_package_real_vol_by_api(packages_id=packages_id) bill = Billing(str(order_id),opCountry,packages,postcode,company_name=None,head_type=conveys,beizhu=beizhu) print(bill) bill_data = bill.bill_dict() # ✅ 只调用一次 result = bill_data.get("预测尾端", 0) _type = bill_data.get("尾端渠道", "") head_amount = bill_data.get("预测头程CNY", 0) total_amount = bill_data.get("总金额USD", 0) volume_weight = bill_data.get("体积重", 0) per_head = bill_data.get("头程单价", 0) return volume_weight,_type,per_head,head_amount,result,total_amount,packages_str except ZeroDivisionError as e: print(e) return "没测量" ,0,0,0,0,0,"" excel_path = r'D:\test\logistics\拦截数据\订单数据.xlsx' country_list = ['United Kingdom','United States','Australia','Germany','Spain','France'] # country_list = ['United States'] for country in country_list: order_id = f""" SELECT # ol.order_date, CONCAT("[",GROUP_CONCAT(pr.包裹号),"]") AS package_group , ol.order_id , ol.postcode, ol.delivery_country, ol.convey, ol.order_price_dollar FROM parcel pr LEFT JOIN dwd.order_list ol ON ol.order_id = pr.订单号 WHERE # ol.order_date between "2024-10-01" and "2025-03-01" pr.生成时间 >= DATE_SUB(NOW(), INTERVAL 20 DAY) AND fund_status NOT REGEXP "等待" AND site_name REGEXP "litfad|kwoking|lakiq" # AND convey = "海运" # AND delivery_country = "United States" AND pr.订单号 = ol.order_id AND delivery_country regexp '{country}' AND NOT EXISTS ( SELECT 1 FROM `order_express` oe WHERE oe.包裹号 = pr.包裹号 AND oe.包裹状态 = "已作废") GROUP BY ol.order_id """ order_id_df = pd.read_sql(order_id,engine) if order_id_df.empty: print(f"{country}无订单") continue order_id_df.sort_values(by=['delivery_country'],inplace=True) countries = order_id_df['delivery_country'].unique() opCountry = OperateCountry(country) order_id_df_cal = order_id_df.copy() # 删除order_id_df_cal中已经存在的订单 if os.path.exists(excel_path): existing_df = pd.read_excel(excel_path) order_id_df_cal = order_id_df_cal[~(order_id_df_cal['order_id'].isin(existing_df['order_id']) & order_id_df_cal['package_group'].isin(existing_df['package_group']))] if order_id_df_cal.empty: continue order_id_df_cal[["体积重","_type","头程单价","头程(CNY)","尾端","总金额(USD)","实际体积"]] = order_id_df_cal.apply(lambda x: get_order_bill(opCountry, x['order_id'], eval(x['package_group']), x['postcode'], x['convey'], x['order_price_dollar']),axis=1,result_type='expand') order_id_df_cal = order_id_df_cal[~(order_id_df_cal['体积重']== "没测量")] order_id_df_cal['总金额(USD)'] = pd.to_numeric(order_id_df_cal['总金额(USD)'], errors='coerce') order_id_df_cal = order_id_df_cal.dropna(subset=['总金额(USD)']) order_id_df_cal = order_id_df_cal[order_id_df_cal['总金额(USD)'] <9999] order_id_df_cal['生成日期'] = pd.Timestamp.now().strftime('%Y/%m/%d') order_id_df_cal['order_id'] = order_id_df_cal['order_id'].astype(int) if order_id_df_cal.empty: print(f"{country}没有需要测量的订单。") continue if not os.path.exists(excel_path): order_id_df_cal.to_excel(excel_path, index=False, sheet_name='Sheet1') else: exist_df = pd.read_excel(excel_path) new_rows = order_id_df_cal[~(order_id_df_cal['order_id'].isin(exist_df['order_id']) & order_id_df_cal['package_group'].isin(exist_df['package_group']))] updated_df = pd.concat([exist_df, new_rows], ignore_index=True) # order_id去重 updated_df = updated_df.drop_duplicates(subset=['order_id'], keep='last') if not new_rows.empty: # 将更新后的 DataFrame 写回 Excel 文件 updated_df.to_excel(excel_path, index=False, sheet_name='Sheet1') print(f"已写入 {len(new_rows)} 条新数据。") order_id_df_cal = order_id_df_cal.drop(columns=['生成日期']) order_id_list = order_id_df_cal["order_id"].tolist() param = ",".join(f"'{order_id}'" for order_id in order_id_list) purchase_order_sql = f""" with t1 AS (SELECT LEFT ( ol.out_detials_outlink_id, 15 ) AS order_id, SUM( out_detials_qty * price )/ 7 AS instock_cost, NULL AS buy_cost FROM ods.outstock_list ol JOIN ods.instock_list il ON ol.store_in_id = il.id WHERE LEFT ( ol.out_detials_outlink_id, 15 ) IN ({param}) GROUP BY LEFT ( ol.out_detials_outlink_id, 15 ) UNION ALL SELECT LEFT ( order_product_id, 15 ) as order_id, NULL as instock_cost, SUM(buy_num * actual_price)/7 AS buy_cost FROM `warehouse_purchasing` WHERE LEFT ( order_product_id, 15 ) IN ({param}) AND buy_audit = "采购完成" group by LEFT ( order_product_id, 15 ) ) SELECT order_id, SUM(CASE WHEN instock_cost is null THEN buy_cost ELSE instock_cost END) AS pur_cost FROM t1 GROUP BY order_id """ purchase_order_df = pd.read_sql(purchase_order_sql, con=engine) purchase_order_df["order_id"] = purchase_order_df["order_id"].astype(str) order_id_df_cal["order_id"] = order_id_df_cal["order_id"].astype(str) order_id_df_cal = pd.merge(order_id_df_cal, purchase_order_df, on='order_id', how='left') def profit_cal(amount,cost): transaction_fees_rate = 0.031 cur_tran_rate = 0 store_cost = 2 profit = amount * (1 - transaction_fees_rate - cur_tran_rate) - store_cost - cost return round(profit, 2) order_id_df_cal["profit"] = order_id_df_cal.apply(lambda x: profit_cal(x["order_price_dollar"] , x["pur_cost"] + x ["总金额(USD)"]), axis=1) order_id_df_cal["profit_rate"] = round(order_id_df_cal["profit"] / order_id_df_cal["order_price_dollar"],2) order_id_df_cal["rejust"] = order_id_df_cal.apply(lambda x: 1 if x["profit_rate"] < 0 else 0, axis=1) order_id_df_cal[order_id_df_cal['rejust'] == 1] order_id_df_cal['拦截处理日期'] = pd.to_datetime(pd.Timestamp.now().date()) # order_id_df_cal = order_id_df_cal[order_id_df_cal['rejust'] == 1] # import pendulum # dt = pendulum.now() # # 将日期时间对象转换为字符串 # dt_str = dt.to_format('YYYY-MM-DD') # finename = dt_str + "拦截.xlsx" def update_excel_with_new_data(excel_path, new_data_df): try: new_data_df['order_id'] = new_data_df['order_id'].astype(str) if not os.path.exists(excel_path): # 如果文件不存在,创建一个新的 Excel 文件 new_data_df = new_data_df[new_data_df['pur_cost'].notna()] new_data_df.to_excel(excel_path, index=False, sheet_name='Sheet1') print(f"文件不存在,已创建新文件并写入 {len(new_data_df)} 条数据。") df['拦截处理日期'] = pd.to_datetime(df['拦截处理日期']).dt.strftime('%Y/%m/%d') else: existing_df = pd.read_excel(excel_path) existing_df['order_id'] = existing_df['order_id'].astype(str) new_rows = new_data_df[~(new_data_df['order_id'].isin(existing_df['order_id']) & new_data_df['package_group'].isin(existing_df['package_group']))] new_rows = new_rows[new_rows['pur_cost'].notna()] updated_df = pd.concat([existing_df, new_rows], ignore_index=True) # order_id去重 updated_df = updated_df.drop_duplicates(subset=['order_id'], keep='last') updated_df.to_excel(excel_path, index=False, sheet_name='Sheet1') df = pd.read_excel(excel_path) df = df.drop_duplicates(subset=['order_id'], keep='last') df['拦截处理日期'] = pd.to_datetime(df['拦截处理日期']) df.to_excel(excel_path, index=False, sheet_name='Sheet1') if not new_rows.empty: # 将更新后的 DataFrame 写回 Excel 文件 print(f"已写入 {len(new_rows)} 条新数据。") else: print("没有新数据需要添加。") except Exception as e: print(f"发生错误: {e}") update_excel_with_new_data(r'D:\test\logistics\拦截数据\拦截总表.xlsx',order_id_df_cal)