WeldingSpotPerformance/src/controller/passport_former.py

243 lines
10 KiB
Python
Raw Normal View History

import traceback
import sys
from typing import Optional, Any
import numpy as np
import pandas as pd
from loguru import logger
from base.base import BasePointPassportFormer, BaseIdealDataBuilder, PointPassport, GraphicPassport, Settings
class PassportFormer(BasePointPassportFormer):
def form_passports(self, data: list[pd.DataFrame]) -> list[GraphicPassport]:
try:
return_data = [self._build_graphic_passport(dataframe) for dataframe in data]
except:
# TODO: обработка исключений!!!
tb = sys.exc_info()[2]
# TODO: Нормальные сообщения в лог!
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
logger.error(pymsg)
return_data = []
finally:
self._mediator.notify(self, return_data)
def update_settings(self, settings: Settings):
self._settings = settings
@staticmethod
def _find_indexes(signal: str,
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
signal: str,
times:pd.Series,
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
start_idx, finish_idx = self._find_indexes(signal, dataframe)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1]))
return start_list, end_list
def _filter_events(self,
times: pd.Series,
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list)
if point_quantity == 0:
#TODO: добавить обработку исключения
return []
for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe)
temp = min([len(start_list), len(end_list)])
if temp < point_quantity:
print ("cant find enough", stage)
start_list += [0]*(point_quantity - temp)
end_list += [1]*(point_quantity - temp)
events[stage] = [start_list, end_list]
return events, point_quantity
def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
point_settings: Settings = None) -> dict:
self.opt_algorithm = idealDataBuilder(point_settings)
stage_ideals = {
"Closing": self._opt_algorithm.get_closingDF(),
"Squeeze": self._opt_algorithm.get_compressionDF(),
"Welding": self._opt_algorithm.get_weldingDF(),
"Relief": self._opt_algorithm.get_openingDF(),
"Oncomming": self._opt_algorithm.get_oncomingDF(),
"Ideal cycle": self._opt_algorithm.get_cycle_time(),
"Ideal timings": self._opt_algorithm.get_ideal_timings()
}
return stage_ideals
def _generate_cache_key(self,
point_settings:Settings) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
"""
Преобразует point_settings в хешируемый ключ для кэша.
"""
# Преобразуем словари в отсортированные кортежи пар ключ-значение
operator_tuple = frozenset((key, value)
for key, value in point_settings.operator.items()
if str(key) in self._OptAlgorithm_operator_params)
system_tuple = frozenset((key, value)
for key, value in point_settings.system.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
def _build_graphic_passport(self, dataframe: pd.DataFrame) -> GraphicPassport:
if dataframe is not None:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
else:
events = None
key = list(self._settings.operator.keys())[0]
point_quantity = len(self._settings.operator[key])
graphic_passport = GraphicPassport()
graphic_passport.dataframe = dataframe
graphic_passport.points_pocket = []
system_settings = {key: value[0] for key, value in self._settings.system.items()}
graphic_passport.useful_data = self._form_graphic_useful_data(system_settings)
for i in range(point_quantity):
point_settings = Settings()
point_settings.operator = self._get_operator_settings_part(i)
point_settings.system = system_settings
point_passport = PointPassport()
point_passport.ideal_data = self._form_point_ideal_data(point_settings)
point_passport.useful_data = self._form_point_useful_data(point_settings.operator)
point_passport.timeframe, point_passport.events = self._form_point_events(events, i)
graphic_passport.points_pocket.append(point_passport)
return graphic_passport
def _form_graphic_useful_data(self, system_settings:dict) -> dict:
tesla_time = sum(self._settings.operator.get("Tesla summary time", []))
useful_data = {"tesla_time": tesla_time,
"range_ME": system_settings["Range ME, mm"],
"k_hardness": system_settings["k_hardness_1"]
}
return useful_data
def _form_point_useful_data(self, operator_settings:dict) -> dict:
useful_data = {"thickness": operator_settings["object_thickness"],
"L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]}
return useful_data
def _form_point_ideal_data(self, point_settings:Settings) -> dict:
cache_key = self._generate_cache_key(point_settings)
ideal_data = self._ideal_data_cashe.get(cache_key,
self._build_ideal_data(idealDataBuilder=IdealDataBuilder, params=point_settings))
self._ideal_data_cashe[cache_key] = ideal_data
return ideal_data
def _get_operator_settings_part(self, idx:int) -> dict:
operator_settings = {
key: (value[idx] if idx < len(value) else value[0])
for key, value in self._settings.operator.items()
}
return operator_settings
def _form_point_events(self, events:dict, idx) -> list:
timeframe, point_events = None, None
if events is not None:
idx_shift = idx+1 if events[self._stages[-1]][0][0] == 0 else idx
timeframe = [events[self._stages[0]][0][idx], events[self._stages[-1]][1][idx_shift]]
point_events = {key: [value[0][idx], value[1][idx]] for key, value in events.items()}
return timeframe, point_events
class IdealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
def get_compressionDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
def get_openingDF(self) -> pd.DataFrame:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
def get_oncomingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
def get_weldingDF(self) -> pd.DataFrame:
data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
data.append({
"time":0,
"Position FE":X1,
"Position ME":X2,
"Rotor Speed FE":V1,
"Rotor Speed ME":V2,
"Force":F
})
data.append({
"time":self.welding_time,
"Position FE":X1,
"Position ME":X2,
"Rotor Speed FE":V1,
"Rotor Speed ME":V2,
"Force":F
})
return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float]:
data = self.Ts
ideal_timings = [
data['tclose'],
data['tgrow'],
self.welding_time,
self.getMarkOpen(),
data['tmovement']
]
return ideal_timings
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)+1):
time = i/self.mul
X1, X2, V1, V2, F = func(time)
data.append({
"time":time,
"Position FE":X1*1000,
"Position ME":X2*1000,
"Rotor Speed FE":V1*1000,
"Rotor Speed ME":V2*1000,
"Force":F
})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({
"time":end_timestamp,
"Position FE":X1*1000,
"Position ME":X2*1000,
"Rotor Speed FE":V1*1000,
"Rotor Speed ME":V2*1000,
"Force":F
})
return pd.DataFrame(data)