63 lines
3.0 KiB
Python
63 lines
3.0 KiB
Python
|
|
from src.utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
||
|
|
import pandas as pd
|
||
|
|
|
||
|
|
class idealDataBuilder(BaseIdealDataBuilder):
    """Produce the reference ("ideal") data frames and timings for each phase.

    Every phase getter delegates to the inherited ``_get_data`` helper,
    passing it a time value and the matching ``calcPhase*`` callable.
    """

    def get_closingDF(self) -> pd.DataFrame:
        """Return the ideal frame for the closing phase (``Ts['tclose']``)."""
        phase_time = self.Ts['tclose']
        return self._get_data(phase_time, self.calcPhaseClose)

    def get_compressionDF(self) -> pd.DataFrame:
        """Return the ideal frame for the compression phase (``Ts['tgrow']``)."""
        phase_time = self.Ts['tgrow']
        return self._get_data(phase_time, self.calcPhaseGrow)

    def get_openingDF(self) -> pd.DataFrame:
        """Return the ideal frame for the opening phase.

        Unlike the other phases, the time value comes from ``getMarkOpen()``
        rather than from ``Ts``.
        """
        phase_time = self.getMarkOpen()
        return self._get_data(phase_time, self.calcPhaseOpen)

    def get_oncomingDF(self) -> pd.DataFrame:
        """Return the ideal frame for the oncoming phase (``Ts['tmovement']``)."""
        phase_time = self.Ts['tmovement']
        return self._get_data(phase_time, self.calcPhaseMovement)

    def get_weldingDF(self) -> pd.DataFrame:
        """Return a two-row frame holding one constant state for the welding phase.

        The state is whatever ``calcPhaseGrow`` yields at ``Ts['tgrow']``; it is
        emitted unchanged at ``time == 0`` and at ``time == welding_time``.
        """
        x_fe, x_me, v_fe, v_me, force = self.calcPhaseGrow(self.Ts['tgrow'])
        held_state = {
            "Posicion FE": x_fe,
            "Posicion ME": x_me,
            "Rotor Speed FE": v_fe,
            "Rotor Speed ME": v_me,
            "Force": force,
        }
        # "time" goes first so the column order matches the per-row dicts
        # this frame has always been built from.
        rows = [{"time": moment, **held_state} for moment in (0, self.welding_time)]
        return pd.DataFrame(rows)

    def get_ideal_timings(self) -> list[float]:
        """Return ``[tclose, tgrow, welding_time, getMarkOpen()]`` in that order."""
        timings = self.Ts
        # TODO: add timings['tmovement']; the oncoming phase is currently not
        # taken into account in the performance figures.
        return [
            timings['tclose'],
            timings['tgrow'],
            self.welding_time,
            self.getMarkOpen(),
        ]
|
||
|
|
|
||
|
|
class PassportFormer(BasePointPassportFormer):
    """Split recorded traces into per-point passports and publish them.

    A "passport" here is a three-item list ``[frame, ideal_data, point_events]``;
    a "pocket" is the list of passports built from one input data frame.
    """

    def form_passports(self, data: list[pd.DataFrame]) -> list[list[list]]:
        """Build one passports pocket per data frame and notify the mediator.

        Args:
            data: Data frames to process; each must contain a ``"time"`` column.

        Returns:
            The list of pockets, in the same order as ``data``. The very same
            list object is handed to ``self._mediator.notify``.
        """
        return_data = [self._build_passports_pocket(dataframe) for dataframe in data]
        self._mediator.notify(self, return_data)
        # The signature has always declared a list result, but the method used
        # to fall off the end and return None. Returning the published data
        # honours the declared contract without affecting callers that ignore it.
        return return_data

    def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[list]:
        """Cut one data frame into per-point passports.

        Args:
            dataframe: Source frame with a ``"time"`` column.

        Returns:
            A list with one ``[frame, ideal_data, point_events]`` entry per
            point for which the remaining frame still has non-NaN time values.
        """
        passports_pocket = []
        events, point_quantity = self._filter_events(dataframe["time"], dataframe)
        # System settings are shared by all points: only the first value of
        # each entry in self._params[1] is used.
        system_settings = {key: value[0] for key, value in self._params[1].items()}

        for i in range(point_quantity):
            # Re-evaluated every iteration because `dataframe` shrinks below.
            if dataframe["time"].isna().all():
                continue

            # Per-point operator settings; fall back to the first value when
            # fewer values than points were supplied for a key.
            operator_settings = {
                key: value[i] if len(value) > i else value[0]
                for key, value in self._params[0].items()
            }
            ideal_data = self._build_ideal_data(
                idealDataBuilder=idealDataBuilder,
                params=[operator_settings, system_settings],
            )

            if i < point_quantity - 1:
                # Cut at entry i + 1 of the first stage's first event series
                # (presumably the start of the next point - confirm against
                # _filter_events).
                cut_time = events[self._stages[0]][0][i + 1]
                frame = dataframe[dataframe["time"] < cut_time]
                # Rows whose time is NaN satisfy neither comparison and are
                # therefore dropped from both parts.
                dataframe = dataframe[dataframe["time"] >= cut_time]
            else:
                # The last point takes whatever is left.
                frame = dataframe

            point_events = {
                key: [value[0][i], value[1][i]] for key, value in events.items()
            }
            passports_pocket.append([frame, ideal_data, point_events])

        return passports_pocket

    def update_settings(self, params: list[dict]) -> None:
        """Replace the stored settings.

        Args:
            params: Two dicts, ``[operator_settings, system_settings]``; each
                maps a key to an indexable sequence of values.
        """
        self._params = params
|