from __future__ import annotations import os from typing import Optional, Union import pandas as pd from PyQt5.QtCore import QThread, QObject, QTimer from PyQt5.QtWidgets import QWidget from src.OptAlgorithm import OptAlgorithm import pandas as pd class BaseMediator: def __init__(self, monitor: BaseDirectoryMonitor, converter: BaseDataConverter, plot: BasePlotWidget, controller: BaseController): self._monitor = monitor self._monitor.mediator = self self._converter = converter self._converter.mediator = self self._plot = plot self._plot.mediator = self self._controller = controller def notify(self, source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget], data: Union[list[str], list[pd.DataFrame], list[QWidget]]): ... class BaseDirectoryMonitor: update_timer = QTimer() def __init__(self, directory_path: str, update_time: int, mediator: Optional[BaseMediator] = None): super().__init__() self._directory_path = directory_path self._update_time = update_time self._mediator = mediator self._files: list[str] = [] self._init_state() @property def directory_path(self) -> str: return self._directory_path @property def update_time(self) -> int: return self._update_time @property def files(self) -> list[str]: return self._files @property def mediator(self) -> BaseMediator: return self._mediator @mediator.setter def mediator(self, mediator: BaseMediator) -> None: self._mediator = mediator def _init_state(self): files = os.listdir(self._directory_path) self._files = files def start(self): self.update_timer.start(self._update_time) class BaseDataConverter: def __init__(self, mediator: Optional[BaseMediator] = None): self._mediator = mediator @property def mediator(self) -> BaseMediator: return self._mediator @mediator.setter def mediator(self, mediator: BaseMediator) -> None: self._mediator = mediator def convert_data(self, files: list[str]) -> None: ... 
class BasePlotWidget:
    """Base class holding the stage and channel configuration used for plotting.

    The constructor prepares four tables:

    * ``_stages`` - stage names in the order they are listed here;
    * ``_stage_colors`` - one 4-integer list per stage
      (NOTE(review): presumably RGBA - confirm against the drawing code);
    * ``_stage_ideals`` - per-stage callables taken from the ideal data
      builder, each returning a ``pd.DataFrame``;
    * ``_plt_channels`` - per-plot settings plus the real and ideal signal
      descriptors (column name and pen) shown on that plot.
    """

    def __init__(self,
                 mediator: Optional[BaseMediator] = None,
                 idealDataBuilder: Optional[BaseIdealDataBuilder] = None):
        """Store the collaborators and build the configuration tables.

        Args:
            mediator: Mediator to report to; may be attached later through
                the ``mediator`` property.
            idealDataBuilder: Source of the per-stage ideal data frames.
                When omitted, ``_stage_ideals`` is left empty instead of
                failing on attribute access on ``None``.
        """
        super().__init__()
        self._mediator = mediator
        self._opt = idealDataBuilder

        self._stages = [
            "Relief",
            "Closing",
            "Squeeze",
            "Welding",
        ]

        self._stage_colors = {
            "Closing": [208, 28, 31, 100],
            "Squeeze": [45, 51, 89, 150],
            "Welding": [247, 183, 24, 100],
            "Relief": [0, 134, 88, 100],
        }

        # The builder is optional (default None), so only bind its methods
        # when it is actually present.
        if idealDataBuilder is None:
            self._stage_ideals = {}
        else:
            self._stage_ideals = {
                "Closing": idealDataBuilder.get_closingDF,
                "Squeeze": idealDataBuilder.get_compressionDF,
                "Welding": idealDataBuilder.get_weldingDF,
                "Relief": idealDataBuilder.get_openingDF,
            }

        self._plt_channels = {
            "Electrode Force, N & Welding Current, kA": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                },
                "Real_signals": [
                    {
                        "name": "Electrode Force, N ME",
                        "pen": 'r',
                    },
                    {
                        "name": "Electrode Force, N FE",
                        "pen": 'w',
                    },
                    {
                        "name": "Welding Current ME",
                        "pen": "y",
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Force",
                        "pen": {'color': 'g', 'width': 3},
                    },
                ],
            },
            "Electrode Force, N": {
                "Settings": {
                    "zoom": True,
                    "stages": False,
                },
                "Real_signals": [
                    {
                        "name": "Electrode Force, N ME",
                        "pen": 'r',
                    },
                    {
                        "name": "Electrode Force, N FE",
                        "pen": 'w',
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Force",
                        "pen": {'color': 'r', 'width': 3},
                    },
                ],
            },
            "Electrode Speed, mm/s": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                },
                "Real_signals": [
                    {
                        "name": "Rotor Speed, mm/s ME",
                        "pen": 'r',
                        "zoom": False,
                    },
                    {
                        "name": "Rotor Speed, mm/s FE",
                        "pen": 'w',
                        "zoom": False,
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Rotor Speed ME",
                        "pen": {'color': 'y', 'width': 3},
                        "zoom": False,
                    },
                    {
                        "name": "Rotor Speed FE",
                        "pen": {'color': 'g', 'width': 3},
                        "zoom": False,
                    },
                ],
            },
        }

    @property
    def mediator(self) -> Optional[BaseMediator]:
        """Mediator attached to this widget, or ``None`` if none is set."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
        """Build widgets from ``data``; the base implementation does nothing."""
        ...


class BaseController(QObject):
    """Base ``QObject`` that receives the widgets to display."""

    def send_widgets(self, widgets: list[QWidget]) -> None:
        """Accept ``widgets``; the base implementation does nothing."""
        ...
# Factor applied to every position and speed value written to an ideal frame.
# NOTE(review): presumably converts m -> mm and m/s -> mm/s - confirm against
# the units returned by OptAlgorithm.
_IDEAL_SCALE = 1000


def _ideal_row(time: float, state) -> dict:
    """Turn one ``(X1, X2, V1, V2, F)`` state tuple into a data-frame record.

    Positions and speeds are multiplied by ``_IDEAL_SCALE``; the force is
    stored unchanged.
    """
    X1, X2, V1, V2, F = state
    return {
        "time": time,
        "Posicion FE": X1 * _IDEAL_SCALE,
        "Posicion ME": X2 * _IDEAL_SCALE,
        "Rotor Speed FE": V1 * _IDEAL_SCALE,
        "Rotor Speed ME": V2 * _IDEAL_SCALE,
        "Force": F,
    }


# FIXME: the welding frame spans only 1 second unless the caller passes the
# real welding duration to get_weldingDF.
class BaseIdealDataBuilder(OptAlgorithm):
    """Samples the phase calculators of ``OptAlgorithm`` into data frames.

    Every frame has the columns ``time``, ``Posicion FE``, ``Posicion ME``,
    ``Rotor Speed FE``, ``Rotor Speed ME`` and ``Force``.
    """

    def _get_data(self, end_timestamp: float, func: function) -> pd.DataFrame:
        """Sample ``func`` on ``[0, end_timestamp)`` with ``self.mul`` points per time unit.

        Args:
            end_timestamp: Upper bound of the sampled interval (exclusive).
            func: Callable taking a time value and returning the 5-tuple
                ``(X1, X2, V1, V2, F)``.

        Returns:
            One row per sample; an empty frame when
            ``int(end_timestamp * self.mul)`` is not positive.
        """
        sample_count = int(end_timestamp * self.mul)
        rows = []
        for step in range(sample_count):
            moment = step / self.mul
            rows.append(_ideal_row(moment, func(moment)))
        return pd.DataFrame(rows)

    def get_closingDF(self) -> pd.DataFrame:
        """Frame of ``calcPhaseClose`` sampled up to ``Ts['tclose']``."""
        return self._get_data(self.Ts['tclose'], self.calcPhaseClose)

    def get_compressionDF(self) -> pd.DataFrame:
        """Frame of ``calcPhaseGrow`` sampled up to ``Ts['tgrow']``."""
        return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)

    def get_openingDF(self) -> pd.DataFrame:
        """Frame of ``calcPhaseOpen`` sampled up to ``getMarkOpen()``."""
        return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)

    def get_tmovementDF(self) -> pd.DataFrame:
        """Frame of ``calcPhaseMovement`` sampled up to ``Ts['tmovement']``."""
        return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)

    def get_weldingDF(self, duration: float = 1) -> pd.DataFrame:
        """Two-row frame holding the state at ``Ts['tgrow']`` constant over time.

        The state is evaluated once with ``calcPhaseGrow(Ts['tgrow'])`` and
        repeated at ``time = 0`` and ``time = duration``.  Positions and
        speeds use the same scaling as every other frame of this class, so
        the shared column names carry consistent values.

        Args:
            duration: Time stamp of the second row; defaults to 1.
        """
        held_state = self.calcPhaseGrow(self.Ts['tgrow'])
        return pd.DataFrame([
            _ideal_row(0, held_state),
            _ideal_row(duration, held_state),
        ])

    def get_ideal_timings(self) -> list[float]:
        """Return ``[tclose, tgrow, getMarkOpen(), tmovement]`` in that order."""
        timings = self.Ts
        return [
            timings['tclose'],
            timings['tgrow'],
            self.getMarkOpen(),
            timings["tmovement"],
        ]