dev: добавлена обработка исключений в passport_former

This commit is contained in:
Andrew 2025-02-05 20:15:53 +03:00
parent c4bc353032
commit 731824918a

View File

@ -1,26 +1,22 @@
import traceback
import sys
from typing import Optional, Any from typing import Optional, Any
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from loguru import logger from loguru import logger
from base.base import BasePointPassportFormer, BaseIdealDataBuilder, PointPassport, GraphicPassport, Settings, UsefulGraphData from base.base import BasePointPassportFormer, BaseIdealDataBuilder, PointPassport, GraphicPassport, Settings, UsefulGraphData
class PassportFormer(BasePointPassportFormer): class PassportFormer(BasePointPassportFormer):
def form_passports(self, data: list[pd.DataFrame]) -> None: def form_passports(self, data: list[pd.DataFrame]) -> None:
"""
Формирует паспорта для каждого DataFrame из списка.
В случае ошибки логируется сообщение и возвращается пустой список.
"""
try: try:
return_data = [self._build_from_df_only(dataframe) for dataframe in data] return_data = [self._build_from_df_only(dataframe) for dataframe in data]
except: except Exception as e:
# TODO: обработка исключений!!! logger.error(f"form_passports - Непредвиденная ошибка при формировании паспортов: {e}")
tb = sys.exc_info()[2]
# TODO: Нормальные сообщения в лог!
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
logger.error(pymsg)
return_data = [] return_data = []
finally: finally:
self._mediator.notify(self, return_data) self._mediator.notify(self, return_data)
@ -29,26 +25,26 @@ class PassportFormer(BasePointPassportFormer):
self._settings = settings self._settings = settings
def form_customer_passport(self, data: list[pd.DataFrame, dict]) -> None: def form_customer_passport(self, data: list[pd.DataFrame, dict]) -> None:
"""
Формирует паспорт заказчика на основе DataFrame и словаря событий.
В случае ошибки логируется сообщение и возвращается пустой список.
"""
try: try:
dataframe, events = data dataframe, events = data
point_quantity = len(events["Welding"][0]) point_quantity = len(events["Welding"][0])
data = self._form_graphic_passport(dataframe, events, point_quantity) data_passport = self._form_graphic_passport(dataframe, events, point_quantity)
return_data = [data] return_data = [data_passport]
except: except Exception as e:
#TODO: Добавить обработку исключений logger.error(f"form_customer_passport - Непредвиденная ошибка при формировании паспорта заказчика: {e}")
tb = sys.exc_info()[2]
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
logger.error(pymsg)
return_data = [] return_data = []
finally: finally:
self._mediator.notify(self, return_data) self._mediator.notify(self, return_data)
@staticmethod @staticmethod
def _find_indexes(signal: str, def _find_indexes(signal: str, dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]: """
Находит индексы начала и конца этапа для указанного сигнала.
"""
stage_diff = np.diff(dataframe[signal]) stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1) start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1) finish_idx = np.where(stage_diff == -1)
@ -56,8 +52,11 @@ class PassportFormer(BasePointPassportFormer):
@staticmethod @staticmethod
def _find_events(signal: str, def _find_events(signal: str,
times:pd.Series, times: pd.Series,
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]: dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
"""
Формирует списки времен начала и окончания событий для указанного сигнала.
"""
start_idx, finish_idx = PassportFormer._find_indexes(signal, dataframe) start_idx, finish_idx = PassportFormer._find_indexes(signal, dataframe)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]: if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
@ -66,203 +65,303 @@ class PassportFormer(BasePointPassportFormer):
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else [] end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1: if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1])) end_list.append(float(times.iloc[-1]))
return start_list, end_list return start_list, end_list
def _generate_events(self, def _generate_events(self,
times: pd.Series, times: pd.Series,
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]: dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
"""
Генерирует словарь событий для каждого этапа и определяет количество точек.
Если для основного этапа (_clear_stage) не найдено ни одного события,
логируется ошибка и возвращается пустой список.
"""
events = {} events = {}
point_quantity = 0 point_quantity = 0
if self._clear_stage in self._stages: if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe) start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list) point_quantity = len(start_list)
if point_quantity == 0: if point_quantity == 0:
#TODO: добавить обработку исключения logger.error("_generate_events - Не найдены события для этапа '{}'.", self._clear_stage)
return [] return {}, 0 # Возвращаем пустой словарь и 0 точек
for stage in self._stages: for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe) s_list, e_list = self._find_events(stage, times, dataframe)
temp = min([len(start_list), len(end_list)]) temp = min(len(s_list), len(e_list))
if temp < point_quantity: if temp < point_quantity:
print ("cant find enough", stage) logger.warning(f"_generate_events - Недостаточное количество событий для этапа '{stage}'. "
start_list += [0]*(point_quantity - temp) f"Ожидается {point_quantity}, получено {temp}. Заполнение нулями/единицами.")
end_list += [1]*(point_quantity - temp) s_list += [0] * (point_quantity - temp)
e_list += [1] * (point_quantity - temp)
events[stage] = [start_list, end_list] events[stage] = [s_list, e_list]
return events, point_quantity return events, point_quantity
def _build_ideal_data(self, def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None, idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
point_settings: Settings = None) -> dict: point_settings: Settings = None) -> dict:
self.opt_algorithm = idealDataBuilder(point_settings) """
stage_ideals = { Строит идеальные данные с использованием переданного билдера.
"Closing": self._opt_algorithm.get_closingDF(), """
"Squeeze": self._opt_algorithm.get_compressionDF(), try:
"Welding": self._opt_algorithm.get_weldingDF(), self.opt_algorithm = idealDataBuilder(point_settings)
"Relief": self._opt_algorithm.get_openingDF(), stage_ideals = {
"Oncomming": self._opt_algorithm.get_oncomingDF(), "Closing": self._opt_algorithm.get_closingDF(),
"Ideal cycle": self._opt_algorithm.get_cycle_time(), "Squeeze": self._opt_algorithm.get_compressionDF(),
"Ideal timings": self._opt_algorithm.get_ideal_timings() "Welding": self._opt_algorithm.get_weldingDF(),
} "Relief": self._opt_algorithm.get_openingDF(),
return stage_ideals "Oncomming": self._opt_algorithm.get_oncomingDF(),
"Ideal cycle": self._opt_algorithm.get_cycle_time(),
"Ideal timings": self._opt_algorithm.get_ideal_timings()
}
return stage_ideals
except Exception as e:
logger.error(f"_build_ideal_data - Ошибка при построении идеальных данных: {e}")
return {}
def _generate_cache_key(self, def _generate_cache_key(self,
point_settings:Settings) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]: point_settings: Settings) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
""" """
Преобразует point_settings в хешируемый ключ для кэша. Преобразует point_settings в хешируемый ключ для кэша.
""" """
# Преобразуем словари в отсортированные кортежи пар ключ-значение try:
operator_tuple = frozenset((key, value) operator_tuple = frozenset(
for key, value in point_settings.operator.items() (key, value)
if str(key) in self._OptAlgorithm_operator_params) for key, value in point_settings.operator.items()
if str(key) in self._OptAlgorithm_operator_params
system_tuple = frozenset((key, value)
for key, value in point_settings.system.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
def _build_from_df_only(self, dataframe: pd.DataFrame) -> GraphicPassport:
if (dataframe is not None and
set(self._stages).issubset(set(dataframe.columns.tolist()))):
events, point_quantity = self._generate_events(dataframe["time"], dataframe)
point_quantity = len(events["Welding"])
pass
if point_quantity == 0:
return []
else:
events = None
key = list(self._settings.operator.keys())[0]
point_quantity = len(self._settings.operator[key])
passport = self._form_graphic_passport(dataframe, events, point_quantity)
return passport
def _form_graphic_passport(self,
dataframe:pd.DataFrame,
events:dict,
point_quantity:int) -> GraphicPassport:
system_settings = {key: value[0] for key, value in self._settings.system.items()}
graphic_passport = GraphicPassport(dataframe, [], self._form_graphic_useful_data(system_settings))
for i in range(point_quantity):
point_settings = Settings(self._get_operator_settings_part(i), system_settings)
timeframe, po_events = self._form_point_events(events, i)
ideal_data = self._form_point_ideal_data(point_settings)
useful_data = self._form_point_useful_data(point_settings.operator)
point_passport = PointPassport(timeframe, po_events, ideal_data, useful_data)
graphic_passport.points_pocket.append(point_passport)
return graphic_passport
def _form_graphic_useful_data(self, system_settings:dict) -> dict:
tesla_time = sum(self._settings.operator.get("Tesla summary time", []))
useful_data = UsefulGraphData(
tesla_time,
system_settings["Range ME, mm"],
system_settings["k_hardness_1"]
) )
return useful_data system_tuple = frozenset(
(key, value)
for key, value in point_settings.system.items()
if str(key) in self._OptAlgorithm_system_params
)
return (operator_tuple, system_tuple)
except Exception as e:
logger.error(f"_generate_cache_key - Ошибка при генерации ключа кэша: {e}")
return ((), ())
def _form_point_useful_data(self, operator_settings:dict) -> dict: def _build_from_df_only(self, dataframe: pd.DataFrame) -> GraphicPassport:
useful_data = {"thickness": operator_settings["object_thickness"], """
Строит GraphicPassport на основе одного DataFrame.
Если DataFrame содержит необходимые столбцы, события генерируются;
иначе используется альтернативное определение point_quantity.
Если количество точек равно нулю, логируется ошибка и возвращается None.
"""
try:
if (dataframe is not None and
set(self._stages).issubset(set(dataframe.columns.tolist()))):
events, point_quantity = self._generate_events(dataframe["time"], dataframe)
point_quantity = len(events["Welding"][0])
if point_quantity == 0:
logger.error("_build_from_df_only - Не найдено ни одного события в DataFrame.")
return None
else:
events = None
key = list(self._settings.operator.keys())[0]
point_quantity = len(self._settings.operator[key])
passport = self._form_graphic_passport(dataframe, events, point_quantity)
return passport
except Exception as e:
logger.error(f"_build_from_df_only - Непредвиденная ошибка: {e}")
return None
def _form_graphic_passport(self,
dataframe: pd.DataFrame,
events: dict,
point_quantity: int) -> GraphicPassport:
"""
Формирует графический паспорт с полезными данными и списком PointPassport.
"""
try:
system_settings = {key: value[0] for key, value in self._settings.system.items()}
graphic_passport = GraphicPassport(dataframe, [], self._form_graphic_useful_data(system_settings))
for i in range(point_quantity):
point_settings = Settings(self._get_operator_settings_part(i), system_settings)
timeframe, po_events = self._form_point_events(events, i)
ideal_data = self._form_point_ideal_data(point_settings)
useful_data = self._form_point_useful_data(point_settings.operator)
point_passport = PointPassport(timeframe, po_events, ideal_data, useful_data)
graphic_passport.points_pocket.append(point_passport)
return graphic_passport
except Exception as e:
logger.error(f"_form_graphic_passport - Ошибка при формировании графического паспорта: {e}")
return None
def _form_graphic_useful_data(self, system_settings: dict) -> dict:
"""
Формирует словарь полезных данных для графика.
"""
try:
tesla_time = sum(self._settings.operator.get("Tesla summary time", []))
useful_data = UsefulGraphData(
tesla_time,
system_settings["Range ME, mm"],
system_settings["k_hardness_1"]
)
return useful_data
except Exception as e:
logger.error(f"_form_graphic_useful_data - Ошибка при формировании полезных данных: {e}")
return {}
def _form_point_useful_data(self, operator_settings: dict) -> dict:
"""
Формирует полезные данные для отдельной точки.
"""
try:
useful_data = {
"thickness": operator_settings["object_thickness"],
"L2": operator_settings["distance_l_2"], "L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]} "force": operator_settings["force_target"]
return useful_data }
return useful_data
except Exception as e:
logger.error(f"_form_point_useful_data - Ошибка при формировании полезных данных точки: {e}")
return {}
def _form_point_ideal_data(self, point_settings:Settings) -> dict: def _form_point_ideal_data(self, point_settings: Settings) -> dict:
cache_key = self._generate_cache_key(point_settings) """
ideal_data = self._ideal_data_cashe.get(cache_key, Формирует идеальные данные для точки с использованием кэша.
self._build_ideal_data(idealDataBuilder=IdealDataBuilder, point_settings=point_settings)) """
self._ideal_data_cashe[cache_key] = ideal_data try:
return ideal_data cache_key = self._generate_cache_key(point_settings)
ideal_data = self._ideal_data_cashe.get(
cache_key,
self._build_ideal_data(idealDataBuilder=IdealDataBuilder, point_settings=point_settings)
)
self._ideal_data_cashe[cache_key] = ideal_data
return ideal_data
except Exception as e:
logger.error(f"_form_point_ideal_data - Ошибка при формировании идеальных данных точки: {e}")
return {}
def _get_operator_settings_part(self, idx:int) -> dict: def _get_operator_settings_part(self, idx: int) -> dict:
operator_settings = { """
Извлекает часть настроек оператора для конкретного индекса.
"""
try:
operator_settings = {
key: (value[idx] if idx < len(value) else value[0]) key: (value[idx] if idx < len(value) else value[0])
for key, value in self._settings.operator.items() for key, value in self._settings.operator.items()
} }
return operator_settings return operator_settings
except Exception as e:
logger.error(f"_get_operator_settings_part - Ошибка при получении настроек оператора для индекса {idx}: {e}")
return {}
def _form_point_events(self, events:dict, idx:int) -> list[list, dict]: def _form_point_events(self, events: dict, idx: int) -> list[list, dict]:
timeframe, point_events = None, None """
if events is not None: Формирует временной интервал и события для отдельной точки.
idx_shift = idx+1 if events[self._stages[-1]][0][0] == 0 else idx Если событий нет, возвращает (None, None).
timeframe = [events[self._stages[0]][0][idx], events[self._stages[-1]][1][idx_shift]] """
point_events = {key: [value[0][idx], value[1][idx]] for key, value in events.items()} try:
return timeframe, point_events timeframe, point_events = None, None
if events is not None:
idx_shift = idx + 1 if events[self._stages[-1]][0][0] == 0 else idx
timeframe = [events[self._stages[0]][0][idx], events[self._stages[-1]][1][idx_shift]]
point_events = {key: [value[0][idx], value[1][idx]] for key, value in events.items()}
return timeframe, point_events
except Exception as e:
logger.error(f"_form_point_events - Ошибка при формировании событий для точки {idx}: {e}")
return None, None
class IdealDataBuilder(BaseIdealDataBuilder): class IdealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame: def get_closingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose) try:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
except Exception as e:
logger.error(f"get_closingDF - Ошибка при получении данных для этапа закрытия: {e}")
return pd.DataFrame()
def get_compressionDF(self) -> pd.DataFrame: def get_compressionDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow) try:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
except Exception as e:
logger.error(f"get_compressionDF - Ошибка при получении данных для этапа сжатия: {e}")
return pd.DataFrame()
def get_openingDF(self) -> pd.DataFrame: def get_openingDF(self) -> pd.DataFrame:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen) try:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
except Exception as e:
logger.error(f"get_openingDF - Ошибка при получении данных для этапа открытия: {e}")
return pd.DataFrame()
def get_oncomingDF(self) -> pd.DataFrame: def get_oncomingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement) try:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
except Exception as e:
logger.error(f"get_oncomingDF - Ошибка при получении данных для этапа движения: {e}")
return pd.DataFrame()
def get_weldingDF(self) -> pd.DataFrame: def get_weldingDF(self) -> pd.DataFrame:
data = [] try:
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001) data = []
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000 X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'] - 0.0001)
data.append({ X1, X2, V1, V2 = X1 * 1000, X2 * 1000, V1 * 1000, V2 * 1000
"time":0,
"Position FE":X1,
"Position ME":X2,
"Rotor Speed FE":V1,
"Rotor Speed ME":V2,
"Force":F
})
data.append({
"time":self.welding_time,
"Position FE":X1,
"Position ME":X2,
"Rotor Speed FE":V1,
"Rotor Speed ME":V2,
"Force":F
})
return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float]:
data = self.Ts
ideal_timings = [
data['tclose'],
data['tgrow'],
self.welding_time,
self.getMarkOpen(),
data['tmovement']
]
return ideal_timings
def _get_data(self, end_timestamp:float, func) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)+1):
time = i/self.mul
X1, X2, V1, V2, F = func(time)
data.append({ data.append({
"time":time, "time": 0,
"Position FE":X1*1000, "Position FE": X1,
"Position ME":X2*1000, "Position ME": X2,
"Rotor Speed FE":V1*1000, "Rotor Speed FE": V1,
"Rotor Speed ME":V2*1000, "Rotor Speed ME": V2,
"Force":F "Force": F
})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({
"time":end_timestamp,
"Position FE":X1*1000,
"Position ME":X2*1000,
"Rotor Speed FE":V1*1000,
"Rotor Speed ME":V2*1000,
"Force":F
}) })
return pd.DataFrame(data) data.append({
"time": self.welding_time,
"Position FE": X1,
"Position ME": X2,
"Rotor Speed FE": V1,
"Rotor Speed ME": V2,
"Force": F
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"get_weldingDF - Ошибка при получении данных для этапа сварки: {e}")
return pd.DataFrame()
def get_ideal_timings(self) -> list[float]:
try:
data = self.Ts
ideal_timings = [
data['tclose'],
data['tgrow'],
self.welding_time,
self.getMarkOpen(),
data['tmovement']
]
return ideal_timings
except Exception as e:
logger.error(f"get_ideal_timings - Ошибка при получении идеальных временных интервалов: {e}")
return []
def _get_data(self, end_timestamp: float, func) -> pd.DataFrame:
"""
Получает данные до указанного времени с шагом, определяемым mul.
"""
try:
data = []
for i in range(0, int(end_timestamp * self.mul) + 1):
time = i / self.mul
X1, X2, V1, V2, F = func(time)
data.append({
"time": time,
"Position FE": X1 * 1000,
"Position ME": X2 * 1000,
"Rotor Speed FE": V1 * 1000,
"Rotor Speed ME": V2 * 1000,
"Force": F
})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({
"time": end_timestamp,
"Position FE": X1 * 1000,
"Position ME": X2 * 1000,
"Rotor Speed FE": V1 * 1000,
"Rotor Speed ME": V2 * 1000,
"Force": F
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"_get_data - Ошибка при получении данных с end_timestamp={end_timestamp}: {e}")
return pd.DataFrame()