WeldingSpotPerformance/src/controller/passport_former.py

508 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Optional, Any, Tuple, List, Dict
import numpy as np
import pandas as pd
from loguru import logger
from base.base import (
BasePointPassportFormer,
BaseIdealDataBuilder,
PointPassport,
GraphicPassport,
Settings,
UsefulGraphData
)
class PassportFormer(BasePointPassportFormer):
"""
Класс для формирования паспортов (графических и точечных) по данным трассировки.
Основные возможности:
- Формирование паспортов для каждого DataFrame.
- Формирование паспорта заказчика на основе DataFrame и событий.
- Генерация событий по DataFrame.
- Построение идеальных данных для каждой точки.
- Формирование графического паспорта, включающего полезные данные и перечень паспортов точек.
Внимание: многие атрибуты (например, self._mediator, self._stages, self._clear_stage,
self._settings, self._ideal_data_cache, self._OptAlgorithm_operator_params,
self._OptAlgorithm_system_params) предполагается задавать извне (например, в базовом классе).
"""
def form_passports(self, data: List[pd.DataFrame]) -> None:
"""
Формирует паспорта для каждого DataFrame из списка.
В случае ошибки логируется сообщение, а медиатору отправляется пустой список.
:param data: Список DataFrame с данными трассировки.
"""
try:
passports = [self._build_from_df_only(df) for df in data]
except Exception as e:
logger.error(f"form_passports - Непредвиденная ошибка при формировании паспортов: {e}")
passports = []
finally:
self._mediator.notify(self, passports)
def update_settings(self, settings: Settings) -> None:
"""
Обновляет настройки формирования паспортов.
:param settings: Объект настроек.
"""
self._settings = settings
def form_customer_passport(self, data: Tuple[pd.DataFrame, Dict, Tuple]) -> None:
"""
Формирует паспорт заказчика на основе DataFrame и словаря событий.
Ожидается, что data содержит:
- DataFrame с данными,
- словарь событий,
- кортеж координат ME (например, (part_pos, open_pos)).
В случае ошибки логируется сообщение, а медиатору отправляется пустой список.
:param data: Кортеж из (DataFrame, события, ME_coords).
"""
try:
dataframe, events, ME_coords = data
point_quantity = len(events["Squeeze"][0])
self._modify_coord_settings(ME_coords)
customer_passport = self._form_graphic_passport(dataframe, events, point_quantity)
result = [customer_passport]
except Exception as e:
logger.error(f"form_customer_passport - Непредвиденная ошибка при формировании паспорта заказчика: {e}")
result = []
finally:
self._mediator.notify(self, result)
def _modify_coord_settings(self, ME_coords: Tuple[List[float], List[float]]) -> None:
"""
Модифицирует настройки координат на основе переданных данных ME.
:param ME_coords: Кортеж из двух списков: (part_pos, open_pos).
"""
part_pos, open_pos = ME_coords
self._settings.operator["distance_l_2"] = open_pos
l1 = self._settings.operator["distance_l_1"]
self._settings.operator["part_pos"] = [part_pos[i] + l1[i] for i in range(len(part_pos))]
@staticmethod
def _find_indexes(signal: str, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
"""
Находит индексы начала и окончания этапа для указанного сигнала.
:param signal: Имя столбца-сигнала.
:param df: DataFrame с данными.
:return: Кортеж из массивов индексов начала и окончания этапа.
"""
stage_diff = np.diff(df[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
@staticmethod
def _find_events(signal: str, times: pd.Series, df: pd.DataFrame) -> Tuple[List[float], List[float]]:
"""
Формирует списки времён начала и окончания событий для указанного сигнала.
Если у первого события не определено время начала, оно принимается за 0.
Если число стартов события больше числа финишей, последним финишем считается конец времён.
:param signal: Имя столбца-сигнала.
:param times: Series с временными метками.
:param df: DataFrame с данными.
:return: Кортеж из двух списков: (список времён начала, список времён окончания).
"""
start_idx, finish_idx = PassportFormer._find_indexes(signal, df)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1]))
return (start_list, end_list)
def _generate_events(self, times: pd.Series, df: pd.DataFrame) -> Tuple[Dict[str, List[List[float]]], int]:
"""
Генерирует словарь событий для каждого этапа, используя временные метки и данные.
Также определяет общее количество точек (на основе количества событий основного этапа).
Если для основного этапа (self._clear_stage) не найдено ни одного события, возвращается пустой словарь и 0 точек.
:param times: Серия временных меток.
:param df: DataFrame с данными.
:return: Кортеж (словарь событий, количество точек).
"""
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, df)
point_quantity = len(start_list)
if point_quantity == 0:
logger.error(f"_generate_events - Не найдены события для этапа '{self._clear_stage}'.")
return {}, 0
for stage in self._stages:
s_list, e_list = self._find_events(stage, times, df)
temp = min(len(s_list), len(e_list))
if temp < point_quantity:
logger.warning(f"_generate_events - Недостаточное количество событий для этапа '{stage}'. "
f"Ожидается {point_quantity}, получено {temp}. Заполнение нулями/единицами.")
s_list += [0] * (point_quantity - temp)
e_list += [1] * (point_quantity - temp)
events[stage] = [s_list, e_list]
return (events, point_quantity)
def _build_ideal_data(self, idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
point_settings: Optional[Settings] = None) -> Dict:
"""
Строит идеальные данные с использованием билдера.
:param idealDataBuilder: Класс-билдер для генерации идеальных данных.
:param point_settings: Настройки для точки.
:return: Словарь с идеальными данными для этапов.
"""
try:
self._opt_algorithm = idealDataBuilder(point_settings)
stage_ideals = {
"Closing": self.opt_algorithm.get_closingDF(),
"Squeeze": self.opt_algorithm.get_compressionDF(),
"Welding": self.opt_algorithm.get_weldingDF(),
"Relief": self.opt_algorithm.get_openingDF(),
"Oncomming": self.opt_algorithm.get_oncomingDF(),
"Ideal cycle": self.opt_algorithm.get_cycle_time(),
"Ideal timings": self.opt_algorithm.get_ideal_timings()
}
return stage_ideals
except Exception as e:
logger.error(f"_build_ideal_data - Ошибка при построении идеальных данных: {e}")
return {}
def _generate_cache_key(self, point_settings: Settings) -> Tuple[Tuple[Tuple[str, Any], ...], Tuple[Tuple[str, Any], ...]]:
"""
Преобразует настройки точки в хешируемый ключ для кэша.
Использует только те параметры оператора и системы, которые присутствуют в
соответствующих наборах (self._OptAlgorithm_operator_params и self._OptAlgorithm_system_params).
:param point_settings: Объект настроек для точки.
:return: Кортеж из двух frozenset с параметрами.
"""
try:
operator_tuple = frozenset(
(key, value)
for key, value in point_settings.operator.items()
if str(key) in self._OptAlgorithm_operator_params
)
system_tuple = frozenset(
(key, value)
for key, value in point_settings.system.items()
if str(key) in self._OptAlgorithm_system_params
)
return (operator_tuple, system_tuple)
except Exception as e:
logger.error(f"_generate_cache_key - Ошибка при генерации ключа кэша: {e}")
return ((), ())
def _build_from_df_only(self, df: pd.DataFrame) -> Optional[GraphicPassport]:
"""
Строит GraphicPassport на основе одного DataFrame.
Если DataFrame содержит необходимые столбцы (определяемые в self._stages),
генерируются события; иначе используется альтернативное определение количества точек.
Если количество точек равно нулю, логируется ошибка и возвращается None.
:param df: DataFrame с данными.
:return: Объект GraphicPassport или None в случае ошибки.
"""
try:
if df is not None and set(self._stages).issubset(set(df.columns.tolist())):
events, _ = self._generate_events(df["time"], df)
point_quantity = len(events["Welding"][0])
if point_quantity == 0:
logger.error("_build_from_df_only - Не найдено ни одного события в DataFrame.")
return None
else:
events = None
key = list(self._settings.operator.keys())[0]
point_quantity = len(self._settings.operator[key])
self._settings.operator["part_pos"] = self._settings.operator["distance_l_2"]
passport = self._form_graphic_passport(df, events, point_quantity)
return passport
except Exception as e:
logger.error(f"_build_from_df_only - Непредвиденная ошибка: {e}")
return None
def _form_graphic_passport(self, df: pd.DataFrame, events: Dict, point_quantity: int) -> Optional[GraphicPassport]:
"""
Формирует графический паспорт, включающий полезные данные и список PointPassport.
Для каждой из точек:
- Извлекаются настройки оператора для данной точки.
- Формируется временной интервал и события.
- Рассчитываются идеальные и полезные данные.
- Создаётся объект PointPassport, который добавляется в список.
:param df: DataFrame с данными.
:param events: Словарь событий, сгенерированных для этапов.
:param point_quantity: Количество точек.
:return: Объект GraphicPassport или None в случае ошибки.
"""
try:
system_settings = {key: value[0] for key, value in self._settings.system.items()}
graphic_passport = GraphicPassport(
df,
[],
self._form_graphic_useful_data(system_settings)
)
for i in range(point_quantity):
point_settings = Settings(self._get_operator_settings_part(i), system_settings)
timeframe, po_events = self._form_point_events(events, i)
if timeframe and po_events:
point_settings.operator["time_robot_movement"] = po_events["Oncomming"][1] - po_events["Oncomming"][0]
ideal_data = self._form_point_ideal_data(point_settings)
useful_data = self._form_point_useful_data(point_settings.operator)
point_passport = PointPassport(timeframe, po_events, ideal_data, useful_data)
graphic_passport.points_pocket.append(point_passport)
return graphic_passport
except Exception as e:
logger.error(f"_form_graphic_passport - Ошибка при формировании графического паспорта: {e}")
return None
def _form_graphic_useful_data(self, system_settings: Dict) -> UsefulGraphData:
"""
Формирует словарь полезных данных для графического паспорта.
:param system_settings: Словарь системных настроек.
:return: Объект UsefulGraphData.
"""
try:
tesla_time = sum(self._settings.operator.get("Tesla summary time", []))
useful_data = UsefulGraphData(
tesla_time,
system_settings["Range ME, mm"],
system_settings["k_hardness_1"]
)
return useful_data
except Exception as e:
logger.error(f"_form_graphic_useful_data - Ошибка при формировании полезных данных: {e}")
return UsefulGraphData()
def _form_point_useful_data(self, operator_settings: Dict) -> Dict:
"""
Формирует полезные данные для отдельной точки.
:param operator_settings: Словарь настроек оператора для точки.
:return: Словарь с полезными данными (толщина, позиция детали, сила) или пустой словарь.
"""
try:
useful_data = {
"thickness": operator_settings["object_thickness"],
"part_pos": operator_settings["part_pos"],
"force": operator_settings["force_target"]
}
return useful_data
except Exception as e:
logger.error(f"_form_point_useful_data - Ошибка при формировании полезных данных точки: {e}")
return {}
def _form_point_ideal_data(self, point_settings: Settings) -> Dict:
"""
Формирует идеальные данные для отдельной точки с использованием кэша.
Генерируется кэш-ключ из настроек точки, затем либо извлекаются ранее рассчитанные
данные, либо вычисляются новые с помощью билдера.
:param point_settings: Настройки точки.
:return: Словарь с идеальными данными или пустой словарь в случае ошибки.
"""
try:
cache_key = self._generate_cache_key(point_settings)
ideal_data = self._ideal_data_cache.get(
cache_key,
self._build_ideal_data(idealDataBuilder=IdealDataBuilder, point_settings=point_settings)
)
self._ideal_data_cache[cache_key] = ideal_data
return ideal_data
except Exception as e:
logger.error(f"_form_point_ideal_data - Ошибка при формировании идеальных данных точки: {e}")
return {}
def _get_operator_settings_part(self, idx: int) -> Dict:
"""
Извлекает часть настроек оператора для конкретного индекса.
Если индекс выходит за пределы списка, используется первый элемент.
:param idx: Индекс точки.
:return: Словарь настроек оператора для данной точки.
"""
try:
operator_settings = {
key: (value[idx] if idx < len(value) else value[0])
for key, value in self._settings.operator.items()
}
return operator_settings
except Exception as e:
logger.error(f"_get_operator_settings_part - Ошибка при получении настроек оператора для индекса {idx}: {e}")
return {}
def _form_point_events(self, events: Dict, idx: int) -> Tuple[Optional[List[float]], Optional[Dict[str, List[float]]]]:
"""
Формирует временной интервал и события для отдельной точки.
Если событий нет, возвращает (None, None).
:param events: Словарь с событиями для всех этапов.
:param idx: Индекс точки.
:return: Кортеж (timeframe, point_events) или (None, None) в случае ошибки.
"""
try:
timeframe, point_events = None, None
if events is not None:
# Если первое событие основного этапа начинается с 0, сдвигаем индекс
idx_shift = idx + 1 if events[self._stages[-1]][0][0] == 0 else idx
timeframe = [events[self._stages[0]][0][idx], events[self._stages[-1]][1][idx_shift]]
point_events = {key: [value[0][idx], value[1][idx]] for key, value in events.items()}
return timeframe, point_events
except Exception as e:
logger.error(f"_form_point_events - Ошибка при формировании событий для точки {idx}: {e}")
return None, None
class IdealDataBuilder(BaseIdealDataBuilder):
"""
Класс для построения идеальных данных по этапам.
Реализует методы получения DataFrame для различных этапов:
закрытия, сжатия, открытия, движения, сварки,
а также метод получения идеальных временных интервалов.
"""
def get_closingDF(self) -> pd.DataFrame:
"""
Получает DataFrame для этапа закрытия.
"""
try:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
except Exception as e:
logger.error(f"get_closingDF - Ошибка при получении данных для этапа закрытия: {e}")
return pd.DataFrame()
def get_compressionDF(self) -> pd.DataFrame:
"""
Получает DataFrame для этапа сжатия.
"""
try:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
except Exception as e:
logger.error(f"get_compressionDF - Ошибка при получении данных для этапа сжатия: {e}")
return pd.DataFrame()
def get_openingDF(self) -> pd.DataFrame:
"""
Получает DataFrame для этапа открытия.
"""
try:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
except Exception as e:
logger.error(f"get_openingDF - Ошибка при получении данных для этапа открытия: {e}")
return pd.DataFrame()
def get_oncomingDF(self) -> pd.DataFrame:
"""
Получает DataFrame для этапа движения.
"""
try:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
except Exception as e:
logger.error(f"get_oncomingDF - Ошибка при получении данных для этапа движения: {e}")
return pd.DataFrame()
def get_weldingDF(self) -> pd.DataFrame:
"""
Получает DataFrame для этапа сварки.
Используется функция calcPhaseGrow с небольшим сдвигом времени.
Значения масштабируются (умножаются на 1000) для перевода в нужные единицы.
"""
try:
data = []
# Используем небольшое смещение для расчёта сварки
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'] - 0.0001)
X1, X2, V1, V2 = X1 * 1000, X2 * 1000, V1 * 1000, V2 * 1000
points_num = 5
for i in range(points_num + 1):
data.append({
"time": self.welding_time * i / points_num,
"Position FE": X1,
"Position ME": X2,
"Rotor Speed FE": V1,
"Rotor Speed ME": V2,
"Force": F
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"get_weldingDF - Ошибка при получении данных для этапа сварки: {e}")
return pd.DataFrame()
def get_ideal_timings(self) -> List[float]:
"""
Получает список идеальных временных интервалов для этапов.
Список включает: tclose, tgrow, welding_time, getMarkOpen(), tmovement.
"""
try:
data = self.Ts
ideal_timings = [
data['tclose'],
data['tgrow'],
self.welding_time,
self.getMarkOpen(),
data['tmovement']
]
return ideal_timings
except Exception as e:
logger.error(f"get_ideal_timings - Ошибка при получении идеальных временных интервалов: {e}")
return []
def _get_data(self, end_timestamp: float, func) -> pd.DataFrame:
"""
Получает данные до указанного времени (end_timestamp) с шагом, определяемым параметром mul.
Для каждого шага рассчитываются значения с использованием переданной функции func.
В конце добавляется строка с точным значением end_timestamp.
:param end_timestamp: Время окончания получения данных.
:param func: Функция для расчёта значений в зависимости от времени.
:return: DataFrame с рассчитанными данными.
"""
try:
data = []
# Генерируем данные с шагом 1/mul
for i in range(0, int(end_timestamp * self.mul) + 1):
time_val = i / self.mul
X1, X2, V1, V2, F = func(time_val)
data.append({
"time": time_val,
"Position FE": X1 * 1000,
"Position ME": X2 * 1000,
"Rotor Speed FE": V1 * 1000,
"Rotor Speed ME": V2 * 1000,
"Force": F
})
# Добавляем финальную строку с end_timestamp
X1, X2, V1, V2, F = func(end_timestamp)
data.append({
"time": end_timestamp,
"Position FE": X1 * 1000,
"Position ME": X2 * 1000,
"Rotor Speed FE": V1 * 1000,
"Rotor Speed ME": V2 * 1000,
"Force": F
})
return pd.DataFrame(data)
except Exception as e:
logger.error(f"_get_data - Ошибка при получении данных с end_timestamp={end_timestamp}: {e}")
return pd.DataFrame()