import sys sys.path.append("D:\workspace\logistics") from dataclasses import dataclass from typing import List, Literal import pandas as pd from utils.countryOperator import OperateCountry from utils.Package import Package, Package_group @dataclass class BillItem: """账单项""" amount: float # 金额 amount_usd: float # 金额(美元) item_type: Literal['头程','尾程'] # 项目类型 头程或尾程 item_detail: str # 项目明细(如提单费,尾端基础运费,尾端附加费等) currency: str # 货币单位(如 USD, EUR 等) # 应该有个模式,计算实际账单金额,或者得出最小费用金额,这个由工厂来控制,写两个账单类 class Billing: """账单""" def __init__(self, bill_name="wxx", operator: OperateCountry = None, packages: Package_group=None,postcode=None, company_name=None,head_type = 1,beizhu = None): self.operator = operator self.name = bill_name self.packages = packages self.postcode = postcode self.country = operator.country_name self.company_name = company_name self.head_type = head_type self.beizhu = beizhu self.items: List[BillItem] = [] # 存储账单项 self.volume_weight = 0 self.head_per = 0 self.logistic_type = None self.add_items_from_operator() def add_item(self, item: BillItem): """添加账单项""" self.items.append(item) def add_items_from_operator(self): """从Operator获取费用并添加账单项""" if self.packages is not None or self.postcode is not None: self.operator.set_packages_and_postcode(self.packages, self.postcode) # 设置包裹信息 company_name = self.company_name if self.company_name is not None else self.operator.get_min_company() # 取快递类型的最小公司 logistic_type = self.logistic_type if self.logistic_type is not None else self.operator.get_logistic_type(company_name) self.company_name = company_name self.logistic_type = logistic_type # 获取头程费用 head_detail = self.operator.get_ocean_fee() if self.head_type == 1 else self.operator.get_air_fee() head_fee = head_detail['head_amount'] self.head_per = head_detail['head_per'] self.volume_weight = head_detail['volume_weight'] # 获取尾程费用 detail_amount = self.operator.get_detail_amount(company_name) # 获取货币单位 tail_currency = self.operator.get_tail_currency(company_name) # 添加账单项 self.add_item(BillItem(amount = head_fee,amount_usd = self.operator.convert_to_usd(head_fee, 'CNY'), item_type = "头程", item_detail = "head_amount",currency = 'CNY')) for item_detail,amount in detail_amount.items(): if amount == 0 and item_detail != "tail_amount": continue self.add_item(BillItem(amount = round(amount,2),amount_usd = self.operator.convert_to_usd(round(amount,2), tail_currency), item_type = "尾程",item_detail=item_detail, currency=tail_currency)) def get_logistic_type(self): """获取渠道类型:快递/卡派""" return self.logistic_type def get_items(self): return self.items @property def tail_amount(self): for item in self.items: if item.item_type == "尾程" and item.item_detail == "tail_amount": tailfee = item.amount tailcurrency = item.currency break return tailfee,tailcurrency @property def head_amount(self): for item in self.items: if item.item_type == "头程" and item.item_detail == "head_amount": headfee = item.amount headcurrency = item.currency break return headfee,headcurrency @property def total_amount_usd(self): for item in self.items: if item.item_type == "头程" and item.item_detail == "head_amount": headfee = item.amount_usd if item.item_type == "尾程" and item.item_detail == "tail_amount": tailfee = item.amount_usd return headfee + tailfee def bill_dict(self): """返回账单字典""" result = {} result['ID'] = self.name result['体积重'] = self.volume_weight result['头程单价'] = self.head_per result['预测头程CNY'] = self.head_amount[0] result['预测尾端'] = self.tail_amount[0] for item in self.items: if item.item_type == "头程" or item.item_detail == 'tail_amount': continue result[item.item_detail] = item.amount result['总金额USD'] = self.total_amount_usd result['货币单位'] = self.tail_amount[1] result['尾端渠道'] = self.company_name result['备注'] = self.beizhu return result # 附加费明细 def get_other_fee(self): """返回账单附加费明细""" detail_items = {} for item in self.items: if item.item_type != "头程" and item.item_detail != "tail_amount" and item.item_detail != "base": detail_items[item.item_detail] = item.amount return detail_items def __repr__(self): result = "账单名称: " + str(self.name) + "\n" result += "账单项:" result += " ".join(f"{item.item_detail}: {item.amount} {item.currency}" for item in self.items) result += " 总金额: " + str(self.total_amount_usd) + " USD" return result class BillFactory: """账单工厂""" def __init__(self): self.bills = [] def create_bill(self, bill_name,operator: OperateCountry,packages = None,postcode = None,company_name = None,head_type=1,beizhu = None): bill = Billing(bill_name,operator,packages,postcode,company_name,head_type,beizhu) self.add_bill(bill) return bill def add_bill(self, bill): self.bills.append(bill) def get_bills(self): """获取全部账单列表""" return self.bills def bills_to_df(self): """将账单列表导存为df""" # 先定义数据结构 bills_dict = [] for bill in self.bills: bills_dict.append(bill.bill_dict()) df = pd.DataFrame(bills_dict) # 重排列顺序 columns_to_move = ['预测尾端', '总金额USD', '货币单位', '尾端渠道', '备注'] reordered_columns = [col for col in df.columns if col not in columns_to_move] + columns_to_move df = df[reordered_columns] return df def __repr__(self): result = "\n".join(str(bill) for bill in self.bills) return f"账单列表:\n{result}" if __name__ == '__main__': # 测试 package = Package("wxx",63,59,48,8000) packages= Package_group([package]) postcode = "PA2 9BF" billFactory = BillFactory() opCountry= OperateCountry('UK') company_name = "智谷" bill = billFactory.create_bill("wxx",opCountry,packages,postcode,company_name,1,"无") print(billFactory)