from src.utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder import pandas as pd class idealDataBuilder(BaseIdealDataBuilder): def get_closingDF(self) -> pd.DataFrame: return self._get_data(self.Ts['tclose'], self.calcPhaseClose) def get_compressionDF(self) -> pd.DataFrame: return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow) def get_openingDF(self) -> pd.DataFrame: return self._get_data(self.getMarkOpen(), self.calcPhaseOpen) def get_oncomingDF(self) -> pd.DataFrame: return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement) def get_weldingDF(self) -> pd.DataFrame: data = [] X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']) data.append({"time":0, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) data.append({"time":self.welding_time, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) return pd.DataFrame(data) def get_ideal_timings(self) -> list[float]: data = self.Ts ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности return ideal_timings class PassportFormer(BasePointPassportFormer): def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, list]]: return_data = [self._build_passports_pocket(dataframe) for dataframe in data] self._mediator.notify(self, return_data) def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, list]: passports_pocket = [] events, point_quantity = self._filter_events(dataframe["time"], dataframe) system_settings = {key: value[0] for key, value in self._params[1].items()} for i in range(0, point_quantity): if not dataframe["time"].isna().all(): operator_settings = {} for key, value in self._params[0].items(): if len(value) > i: operator_settings[key] = value[i] else: operator_settings[key] = value[0] params_list = [operator_settings, system_settings] ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list) if i < point_quantity-1: cut_time = events[self._stages[0]][0][i+1] frame = dataframe[dataframe["time"] < cut_time] dataframe = dataframe[dataframe["time"] >= cut_time] else: frame = dataframe point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()} tesla_events = sum([operator_settings[key] for key in self._tesla_stages]) passports_pocket.append([frame, ideal_data, point_events, tesla_events]) return passports_pocket def update_settings(self, params: list[dict, dict]): self._params = params