130 lines
5.8 KiB
Python
130 lines
5.8 KiB
Python
import pandas as pd
|
|
import traceback
|
|
import sys
|
|
from loguru import logger
|
|
|
|
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
|
|
|
|
|
class idealDataBuilder(BaseIdealDataBuilder):
    """Stage-by-stage accessors for the ideal (reference) data of one point.

    Every getter delegates to members inherited from ``BaseIdealDataBuilder``
    (``_get_data``, the ``calcPhase*`` calculators, ``getMarkOpen``) and reads
    the timing table ``self.Ts`` and ``self.welding_time``, which are expected
    to be provided by the base class.
    """

    def get_closingDF(self) -> pd.DataFrame:
        """Return ``_get_data`` output for ``Ts['tclose']`` and ``calcPhaseClose``."""
        stage_time = self.Ts['tclose']
        return self._get_data(stage_time, self.calcPhaseClose)

    def get_compressionDF(self) -> pd.DataFrame:
        """Return ``_get_data`` output for ``Ts['tgrow']`` and ``calcPhaseGrow``."""
        stage_time = self.Ts['tgrow']
        return self._get_data(stage_time, self.calcPhaseGrow)

    def get_openingDF(self) -> pd.DataFrame:
        """Return ``_get_data`` output for ``getMarkOpen()`` and ``calcPhaseOpen``."""
        stage_time = self.getMarkOpen()
        return self._get_data(stage_time, self.calcPhaseOpen)

    def get_oncomingDF(self) -> pd.DataFrame:
        """Return ``_get_data`` output for ``Ts['tmovement']`` and ``calcPhaseMovement``."""
        stage_time = self.Ts['tmovement']
        return self._get_data(stage_time, self.calcPhaseMovement)

    def get_weldingDF(self) -> pd.DataFrame:
        """Return a two-row frame holding a constant state over the welding time.

        The state is sampled from ``calcPhaseGrow`` just before the end of the
        grow phase (``Ts['tgrow'] - 0.0001``). Positions and speeds are scaled
        by 1000 (presumably a unit conversion such as m -> mm; confirm), the
        force is kept as returned. The same state is emitted at ``time == 0``
        and at ``time == self.welding_time``.
        """
        pos_fe, pos_me, speed_fe, speed_me, force = self.calcPhaseGrow(self.Ts['tgrow'] - 0.0001)
        steady_state = {
            "Position FE": pos_fe * 1000,
            "Position ME": pos_me * 1000,
            "Rotor Speed FE": speed_fe * 1000,
            "Rotor Speed ME": speed_me * 1000,
            "Force": force,
        }
        rows = [
            {"time": 0, **steady_state},
            {"time": self.welding_time, **steady_state},
        ]
        return pd.DataFrame(rows)

    def get_ideal_timings(self) -> list[float]:
        """Return the stage timings as a list.

        Order: ``Ts['tclose']``, ``Ts['tgrow']``, ``welding_time``,
        ``getMarkOpen()``, ``Ts['tmovement']``.
        """
        timings = self.Ts
        return [
            timings['tclose'],
            timings['tgrow'],
            self.welding_time,
            self.getMarkOpen(),
            timings['tmovement'],
        ]
|
|
|
|
class PassportFormer(BasePointPassportFormer):
    """Form per-point passports from recorded data frames and current settings.

    The class relies on state and helpers that are not defined here and are
    expected to come from ``BasePointPassportFormer``: ``_mediator``,
    ``_stages``, ``_ideal_data_cashe``, ``_filter_events``,
    ``_generate_cache_key`` and ``_build_ideal_data``.

    ``self._params`` (see ``update_settings``) is used as a two-item sequence:
    ``_params[0]`` maps operator parameter names to per-point value lists and
    ``_params[1]`` maps system parameter names to lists whose first item is
    the effective value.
    """

    def form_passports(self, data: list[pd.DataFrame]) -> None:
        """Build a passport pocket for every frame and notify the mediator.

        Args:
            data: Recorded frames; each item is passed to
                ``_build_passports_pocket`` (``None`` items are allowed there).

        The result is not returned; it is delivered through
        ``self._mediator.notify(self, result)``. If building fails with an
        ``Exception``, the full traceback is logged and an empty list is
        delivered instead. The mediator is notified in every case.
        """
        # Initialised up front so the ``finally`` clause always has a value,
        # even when a non-``Exception`` (e.g. KeyboardInterrupt) propagates.
        return_data = []
        try:
            return_data = [self._build_passports_pocket(dataframe) for dataframe in data]
        except Exception as exc:
            # Log every frame of the traceback: the first frame alone only
            # points at this method, not at the place that actually failed.
            tbinfo = "".join(traceback.format_tb(exc.__traceback__))
            pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(exc)
            logger.error(pymsg)
        finally:
            self._mediator.notify(self, return_data)

    def _get_time_value(self, dataframe: pd.DataFrame):
        """Return the ``'time'`` column if present, otherwise the ``'Time'`` column."""
        return dataframe['time'] if 'time' in dataframe else dataframe['Time']

    @staticmethod
    def _settings_for_point(operator_params: dict, index: int) -> dict:
        """Pick the ``index``-th value of every operator parameter.

        A parameter whose value list is too short for ``index`` falls back to
        its first value.
        """
        return {
            name: (values[index] if index < len(values) else values[0])
            for name, values in operator_params.items()
        }

    def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
        """Build the passport pocket (one entry per point) for a single frame.

        Args:
            dataframe: Recorded data with a ``'time'``/``'Time'`` column, or
                ``None`` to build passports from the settings only.

        Returns:
            ``[]`` when ``dataframe`` is given but ``_filter_events`` reports
            zero points. Otherwise the tuple
            ``(dataframe, points_pocket, useful_data)`` where every item of
            ``points_pocket`` is
            ``[point_timeframe, ideal_data, point_events, useful_p_data]``
            (``point_timeframe`` and ``point_events`` are ``None`` when
            ``dataframe`` is ``None``).
        """
        if dataframe is not None:
            events, point_quantity = self._filter_events(self._get_time_value(dataframe), dataframe)
            if point_quantity == 0:
                return []
            # When the first start mark of the last stage is 0, the end mark
            # of that stage for point ``i`` is taken from index ``i + 1``.
            idx_shift = bool(events[self._stages[-1]][0][0] == 0)
        else:
            events = None
            idx_shift = False
            # Without recorded data the number of points is the length of the
            # first operator parameter's value list.
            first_key = list(self._params[0].keys())[0]
            point_quantity = len(self._params[0][first_key])

        operator_params = self._params[0]
        points_pocket = []
        system_settings = {key: value[0] for key, value in self._params[1].items()}
        tesla_time = sum(operator_params.get("Tesla summary time", []))
        useful_data = {
            "tesla_time": tesla_time,
            "range": system_settings["gun_range"],
            "k_hardness": system_settings["k_hardness_1"],
        }

        for i in range(point_quantity):
            operator_settings = self._settings_for_point(operator_params, i)

            position = operator_settings["object_position"]
            thickness = operator_settings["object_thickness"]
            P1 = position + 0.5 * thickness
            P2 = position - 0.5 * thickness

            next_i = i + 1
            if next_i < point_quantity:
                # The fallback check must use ``next_i`` (the index actually
                # read); checking ``i`` raised IndexError for lists holding
                # exactly ``i + 1`` values.
                next_operator_settings = self._settings_for_point(operator_params, next_i)

                next_P1 = next_operator_settings["object_position"] + 0.5 * next_operator_settings["object_thickness"]
                next_P2 = next_operator_settings["object_position"] - 0.5 * next_operator_settings["object_thickness"]

                displacement_me = next_P1 - P1
                displacement_fe = next_P2 - P2

                operator_settings["distance_h_end1"] -= displacement_fe
                operator_settings["distance_h_end2"] += displacement_me

            # NOTE(review): the source indentation was lost; this conversion is
            # assumed to apply to every point (not only when a next point
            # exists) - confirm against the original file.
            LFEAbs = operator_settings["distance_l_1"]
            LMEAbs = operator_settings["distance_l_2"]

            operator_settings["distance_l_1"] = P2 - LFEAbs
            operator_settings["distance_l_2"] = LMEAbs - P1

            params_list = [operator_settings, system_settings]
            cache_key = self._generate_cache_key(params_list)
            if cache_key in self._ideal_data_cashe:
                ideal_data = self._ideal_data_cashe[cache_key]
            else:
                ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
                self._ideal_data_cashe[cache_key] = ideal_data

            if events is not None:
                idx = i + 1 if idx_shift else i
                point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
                # NOTE(review): point_events always uses index ``i``, even when
                # the time frame end uses the shifted index - confirm intended.
                point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
            else:
                point_timeframe, point_events = None, None

            useful_p_data = {
                "thickness": thickness,
                "position": P2,
                "force": operator_settings["force_target"],
                "P1": P1,
                "P2": P2,
            }

            points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])

        return dataframe, points_pocket, useful_data

    def update_settings(self, params: list[dict]) -> None:
        """Store ``params`` (``[operator_settings, system_settings]``) for later passport builds."""
        self._params = params