logistics/dataaccess/base_dao.py

66 lines
1.8 KiB
Python

"""基础数据访问类"""
import pandas as pd
from typing import Any, Dict, List, Optional
from utils.gtools import MySQLconnect
class BaseDAO:
    """Base data access object bound to a single database table.

    Full-table reads made through ``get_all`` are memoised in a class-level
    dictionary keyed by ``"<dbname>:<table_name>"``. That dictionary is
    defined on ``BaseDAO`` itself, so it is shared by every instance (and by
    any subclass that does not define its own ``_cache``).
    """

    # Shared cache of full-table reads, keyed by "<dbname>:<table_name>".
    _cache: Dict[str, pd.DataFrame] = {}
    # Switch consulted by get_all(); toggled via enable_cache()/disable_cache().
    _cache_enabled: bool = True

    def __init__(self, table_name: str):
        """Remember the table this DAO reads from.

        Args:
            table_name: Table name interpolated verbatim into generated SQL.
                It must come from trusted code, never from user input.
        """
        self.table_name = table_name

    def _get_connection(self, dbname: Optional[str] = None) -> "MySQLconnect":
        """Create a ``MySQLconnect`` for ``dbname``.

        The query methods use the returned object as a context manager and
        read its ``con`` attribute.
        """
        return MySQLconnect(dbname)

    def get_all(self, dbname: Optional[str] = None) -> pd.DataFrame:
        """Return every row of the table, served from the cache when possible.

        Args:
            dbname: Database name forwarded to ``_get_connection``; it is also
                part of the cache key.

        Returns:
            A DataFrame the caller owns. The cache keeps a separate copy, so
            mutating the result cannot corrupt later reads.
        """
        cache_key = f"{dbname}:{self.table_name}"
        if self._cache_enabled and cache_key in self._cache:
            # Hand out a copy: the cached frame is shared process-wide.
            return self._cache[cache_key].copy()

        with self._get_connection(dbname) as conn:
            df = pd.read_sql(f"SELECT * FROM {self.table_name}", conn.con)

        if self._cache_enabled:
            # Store a private copy so the frame returned below stays independent.
            self._cache[cache_key] = df.copy()
        return df

    def _build_condition_query(self, conditions: Dict[str, Any]) -> tuple:
        """Build the parameterised SELECT used by ``get_by_condition``.

        Returns:
            ``(query, params)`` where ``query`` uses ``%s`` placeholders and
            ``params`` is a tuple of the values to bind, in placeholder order.
        """
        clauses = []
        params = []
        for column, value in (conditions or {}).items():
            if value is None:
                # "col = NULL" never matches in SQL; NULL needs IS NULL.
                clauses.append(f"{column} IS NULL")
            else:
                clauses.append(f"{column} = %s")
                # Bind str(value): the same text the query used to embed as a
                # quoted literal, but escaped by the driver instead of by hand.
                params.append(str(value))

        query = f"SELECT * FROM {self.table_name}"
        if clauses:
            query += " WHERE " + " AND ".join(clauses)
        # A tuple (not a list) is read as one parameter set by both raw DBAPI
        # cursors and SQLAlchemy's exec_driver_sql.
        return query, tuple(params)

    def get_by_condition(
        self, conditions: Dict[str, Any], dbname: Optional[str] = None
    ) -> pd.DataFrame:
        """Return the rows whose columns equal the given values.

        Args:
            conditions: Mapping of column name to required value; entries are
                combined with ``AND``. A ``None`` value matches SQL ``NULL``.
                An empty mapping selects every row. Column names are placed in
                the SQL text verbatim and must come from trusted code; values
                are bound as query parameters.
            dbname: Database name forwarded to ``_get_connection``.

        Returns:
            The matching rows. Results are never cached.
        """
        query, params = self._build_condition_query(conditions)
        with self._get_connection(dbname) as conn:
            # Pass None when there is nothing to bind so the driver does not
            # run %-formatting over the query text.
            df = pd.read_sql(query, conn.con, params=params or None)
        return df

    def execute_query(
        self, query: str, dbname: Optional[str] = None, params: Optional[Any] = None
    ) -> pd.DataFrame:
        """Run a caller-supplied query and return the result.

        Args:
            query: SQL text. It is executed as given, so never build it by
                concatenating untrusted input; use placeholders and ``params``.
            dbname: Database name forwarded to ``_get_connection``.
            params: Optional parameters forwarded to ``pandas.read_sql`` for
                the placeholders in ``query``.

        Returns:
            The query result. Results are never cached.
        """
        with self._get_connection(dbname) as conn:
            df = pd.read_sql(query, conn.con, params=params)
        return df

    @classmethod
    def clear_cache(cls):
        """Drop every cached table."""
        cls._cache.clear()

    @classmethod
    def disable_cache(cls):
        """Turn caching off; existing cache entries are kept but not consulted."""
        cls._cache_enabled = False

    @classmethod
    def enable_cache(cls):
        """Turn caching on."""
        cls._cache_enabled = True