from __future__ import annotations

import os
from typing import Optional, Union, Any

from cachetools import LRUCache
import numpy as np
import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget
from PyQt5.QtOpenGL import QGLWidget

from OptAlgorithm import OptAlgorithm


class BaseMediator:
    """Hub that holds the application's collaborators.

    The constructor stores each collaborator and assigns itself to that
    collaborator's ``mediator`` attribute, so every component can reach the
    others through this object.
    """

    def __init__(self,
                 monitor: BaseDirectoryMonitor,
                 converter: BaseDataConverter,
                 passportFormer: BasePointPassportFormer,
                 plot: BasePlotWidget,
                 controller: BaseController):
        self._monitor = monitor
        self._monitor.mediator = self

        self._converter = converter
        self._converter.mediator = self

        self._passportFormer = passportFormer
        self._passportFormer.mediator = self

        self._plot = plot
        self._plot.mediator = self

        self._controller = controller
        self._controller.mediator = self

    def notify(self,
               source: Union[BaseDirectoryMonitor, BaseDataConverter,
                             BasePointPassportFormer, BasePlotWidget],
               data: Union[list[str], list[pd.DataFrame], list[list],
                           list[QWidget]]):
        """Receive ``data`` produced by ``source``.

        Stub in this base class; subclasses are expected to override it.
        """
        ...

    def push_settings(self, data: list[dict]):
        """Receive a settings payload.

        Stub in this base class; subclasses are expected to override it.
        """
        ...


class BaseDirectoryMonitor:
    """Base state and timer control for a directory watcher.

    Keeps the watched directory path, the timer interval, the last known
    file listing and a reference to the mediator.
    """

    # NOTE(review): declared on the class, so a single QTimer is created when
    # the class body executes and is shared by all instances and subclasses.
    update_timer = QTimer()

    def __init__(self, mediator: Optional[BaseMediator] = None):
        super().__init__()
        self._directory_path: Optional[str] = None
        self._update_time: Optional[int] = None
        # Initialised here so the ``files`` property is usable before
        # ``_init_state`` has run (it used to raise AttributeError).
        self._files: list[str] = []
        self._mediator = mediator

    @property
    def directory_path(self) -> Optional[str]:
        """Watched directory path, or ``None`` until one is configured."""
        return self._directory_path

    @property
    def update_time(self) -> Optional[int]:
        """Interval passed to the timer in ``start``; ``None`` until set."""
        return self._update_time

    @property
    def files(self) -> list[str]:
        """File names captured by the last ``_init_state`` call."""
        return self._files

    @property
    def mediator(self) -> Optional[BaseMediator]:
        """Mediator this monitor is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def _init_state(self):
        """Snapshot the current contents of the watched directory."""
        self._files = os.listdir(self._directory_path)

    def start(self):
        """Start the shared timer with ``update_time`` as its interval.

        Raises:
            TypeError: if ``update_time`` is still ``None``.
        """
        self.update_timer.start(int(self._update_time))

    def stop(self):
        """Stop the shared timer."""
        self.update_timer.stop()

    def update_settings(self, data: list[dict]) -> None:
        """Apply a settings payload. Stub; subclasses are expected to override."""
        ...

    def force_all_dir(self):
        """Stub; subclasses are expected to override."""
        ...
class BaseDataConverter:
    """Base for converters; holds only the mediator reference."""

    def __init__(self, mediator: Optional[BaseMediator] = None):
        self._mediator = mediator

    @property
    def mediator(self) -> Optional[BaseMediator]:
        """Mediator this converter is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def convert_data(self, files: list[str]) -> None:
        """Convert the given files. Stub; subclasses are expected to override."""
        ...


class BasePlotWidget:
    """Base for plot builders: stage colours and per-channel plot layout."""

    def __init__(self, mediator: Optional[BaseMediator] = None):
        super().__init__()
        self._mediator = mediator
        # Initialised so the ``opt`` property does not raise AttributeError
        # before a builder has been assigned.
        self._opt: Optional[BaseIdealDataBuilder] = None

        # Four-component colour per stage (presumably RGBA -- confirm).
        self._stage_colors = {
            "Closing": [220, 20, 60, 100],     # Crimson
            "Squeeze": [30, 144, 255, 100],    # Dodger Blue
            "Welding": [128, 128, 128, 100],   # Gray
            "Relief": [34, 139, 34, 100],      # Forest Green
            "Oncomming": [255, 165, 0, 100],   # Orange
        }

        # One entry per plot: display settings plus the measured ("Real")
        # and computed ("Ideal") signals drawn on it with their pens.
        self._plt_channels = {
            "Electrode Force, N & Welding Current, kA": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                    "performance": True,
                    "ideals": True,
                    "mirror ME": False,
                    "workpiece": False,
                    "force compensation FE": False,
                },
                "Real_signals": [
                    {
                        "name": "Electrode Force, N ME",
                        "pen": 'r',
                    },
                    {
                        "name": "Electrode Force, N FE",
                        "pen": 'w',
                    },
                    {
                        "name": "Welding Current ME",
                        "pen": "y",
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Force",
                        "pen": {'color': 'g', 'width': 3},
                    },
                ],
            },
            "Electrode Position, mm": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                    "performance": False,
                    "ideals": True,
                    "mirror ME": True,
                    "workpiece": True,
                    "force compensation FE": True,
                },
                "Real_signals": [
                    {
                        "name": "Rotor Position, mm ME",
                        "pen": {'color': 'r', 'width': 2},
                    },
                    {
                        "name": "Rotor Position, mm FE",
                        "pen": {'color': 'w', 'width': 2},
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Position ME",
                        "pen": {'color': 'g', 'width': 4},
                    },
                    {
                        "name": "Position FE",
                        "pen": {'color': 'b', 'width': 4},
                    },
                ],
            },
            "Electrode Speed, mm/s": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                    "performance": False,
                    "ideals": True,
                    "mirror ME": False,
                    "workpiece": False,
                    "force compensation FE": False,
                },
                "Real_signals": [
                    {
                        "name": "Rotor Speed, mm/s ME",
                        "pen": 'r',
                        "zoom": False,
                    },
                    {
                        "name": "Rotor Speed, mm/s FE",
                        "pen": 'w',
                        "zoom": False,
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Rotor Speed ME",
                        "pen": {'color': 'g', 'width': 3},
                        "zoom": False,
                    },
                    {
                        "name": "Rotor Speed FE",
                        "pen": {'color': 'b', 'width': 3},
                        "zoom": False,
                    },
                ],
            },
        }

    # NOTE: the parameter name shadows the builtin ``object``; it is kept
    # because callers may pass it by keyword.
    def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
        """Apply the plot-label stylesheet to ``object``."""
        object.setStyleSheet(
            """QLabel {
                    color: #ffffff;
                    font-size: 26px;
                    font-weight: bold;
                    font-family: "Segoe UI", sans-serif;
                }""")

    @property
    def mediator(self) -> Optional[BaseMediator]:
        """Mediator this widget is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    @property
    def opt(self) -> Optional[BaseIdealDataBuilder]:
        """Ideal-data builder in use, or ``None`` if none was assigned."""
        return self._opt

    @opt.setter
    def opt(self, opt: BaseIdealDataBuilder):
        self._opt = opt

    def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
        """Build widgets from ``data``. Stub; subclasses are expected to override."""
        ...


class BaseController(QObject):
    """Controller interface; every method is a stub in this base class."""

    def send_widgets(self, widgets: list[QWidget]) -> None:
        ...

    def push_settings(self, settings: list[dict]) -> None:
        ...

    def open_custom_file(self, filepath: str) -> None:
        ...


class BaseIdealDataBuilder(OptAlgorithm):
    """Samples ``OptAlgorithm`` trajectories into DataFrames."""

    def __init__(self, params: list[dict]):
        """Unpack ``params`` as ``(operator_params, system_params)``.

        ``mul`` (samples per unit of time) is read from
        ``system_params['time_capture']`` and ``welding_time`` from
        ``operator_params['time_wielding']`` before the parent is initialised.
        """
        operator_params, system_params = params
        self.mul = system_params['time_capture']
        self.welding_time = operator_params['time_wielding']
        super().__init__(operator_params, system_params)

    # NOTE(review): ``function`` is not a defined name; the annotation is
    # harmless only because annotations are lazy in this module
    # (``from __future__ import annotations``). ``Callable`` would be the
    # proper type once it is imported.
    def _get_data(self, end_timestamp: float, func: function) -> pd.DataFrame:
        """Sample ``func`` on a regular grid and tabulate the result.

        ``func`` is called with ``i / self.mul`` for every integer ``i`` in
        ``[0, int(end_timestamp * self.mul))`` and must return the 5-tuple
        ``(X1, X2, V1, V2, F)``.

        Returns:
            DataFrame with columns ``time``, ``Position FE``, ``Position ME``,
            ``Rotor Speed FE``, ``Rotor Speed ME`` and ``Force``. Positions
            and speeds are multiplied by 1000 (presumably m -> mm; confirm).
        """
        rows = []
        sample_count = int(end_timestamp * self.mul)
        for step in range(sample_count):
            t = step / self.mul
            X1, X2, V1, V2, F = func(t)
            rows.append({
                "time": t,
                "Position FE": X1 * 1000,
                "Position ME": X2 * 1000,
                "Rotor Speed FE": V1 * 1000,
                "Rotor Speed ME": V2 * 1000,
                "Force": F,
            })
        return pd.DataFrame(rows)

    # The getters below are stubs; subclasses are expected to override them.
    def get_closingDF(self) -> pd.DataFrame:
        ...

    def get_compressionDF(self) -> pd.DataFrame:
        ...

    def get_openingDF(self) -> pd.DataFrame:
        ...

    def get_tmovementDF(self) -> pd.DataFrame:
        ...

    def get_weldingDF(self) -> pd.DataFrame:
        ...

    def get_oncomingDF(self) -> pd.DataFrame:
        ...

    def get_ideal_timings(self) -> list[float]:
        ...

    def get_cycle_time(self) -> float:
        """Return the sum of the values from ``get_ideal_timings``."""
        return sum(self.get_ideal_timings())


class BaseMainWindow(QWidget):
    """Top-level window base: holds the controller and the app stylesheet."""

    def __init__(self, controller: Optional[BaseController] = None):
        super().__init__()
        self._controller = controller

    @property
    def controller(self) -> Optional[BaseController]:
        """Controller this window talks to."""
        return self._controller

    @controller.setter
    def controller(self, controller: BaseController) -> None:
        self._controller = controller

    # NOTE: the parameter name shadows the builtin ``object``; it is kept
    # because callers may pass it by keyword.
    def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
        """Apply the application stylesheet to ``object``."""
        object.setStyleSheet("""
            QWidget {
                background-color: #0D1117;
                font-family: "Segoe UI", sans-serif;
                font-size: 14px;
            }
            QMessageBox {
                background-color: #161B22;
                font-family: "Segoe UI", sans-serif;
                font-size: 14px;
            }
            QPushButton {
                background-color: #FFCC00;
                color: #0D1117;
                padding: 12px 25px;
                border: 2px solid #E6B800;
                border-radius: 8px;
                font-family: "Segoe UI", sans-serif;
                font-size: 16px;
                font-weight: bold;
            }
            QPushButton:hover:!disabled {
                background-color: #FFD700;
            }
            QPushButton:disabled {
                background-color: #555555;
                color: #cccccc;
                border: none;
            }
            QLabel {
                color: #ffffff;
                font-size: 16px;
                font-weight: bold;
                font-family: "Segoe UI", sans-serif;
            }
        """)


class BasePointPassportFormer:
    """Base for passport formers: stage-event extraction and ideal data."""

    def __init__(self, mediator: Optional[BaseMediator] = None):
        self._mediator = mediator
        # Initialised so the ``opt`` property does not raise AttributeError
        # before ``_build_ideal_data`` has run.
        self._opt: Optional[BaseIdealDataBuilder] = None

        # Stage whose events define how many points a recording contains
        # (see ``_filter_events``).
        self._clear_stage = "Welding"
        self._stages = [
            "Closing",
            "Squeeze",
            "Welding",
            "Relief",
            "Oncomming",
        ]
        self._tesla_stages = [
            "Tesla squeeze",
            "Tesla closing",
            "Tesla welding",
            "Tesla oncomming_relief",
        ]
        self._params = []
        # Attribute name (including its spelling) is kept for compatibility.
        self._ideal_data_cashe = LRUCache(maxsize=1000)

        # Setting names that participate in the cache key
        # (see ``_generate_cache_key``).
        self._OptAlgorithm_operator_params = [
            "dist_open_start_1",
            "dist_open_start_2",
            "dist_open_after_1",
            "dist_open_after_2",
            "dist_open_end_1",
            "dist_open_end_2",
            "dist_close_end_1",
            "dist_close_end_2",
            "time_command",
            "time_robot_movement",
            "object_thickness",
            "force_target",
            "force_capture",
            "time_wielding",
        ]
        self._OptAlgorithm_system_params = [
            "a_max_1",
            "v_max_1",
            "a_max_2",
            "v_max_2",
            "mass_1",
            "mass_2",
            "k_hardness_1",
            "k_hardness_2",
            "torque_max_1",
            "torque_max_2",
            "transmission_ratio_1",
            "transmission_ratio_2",
            "position_start_1",
            "position_start_2",
            "k_prop",
            "time_capture",
        ]

    def _find_indexes(self,
                      signal: str,
                      dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
        """Locate +1 and -1 steps in column ``signal``.

        Returns:
            ``(start_idx, finish_idx)``: positions ``i`` where
            ``dataframe[signal][i + 1] - dataframe[signal][i]`` equals ``1``
            and ``-1`` respectively.
        """
        stage_diff = np.diff(dataframe[signal])
        start_idx = np.where(stage_diff == 1)
        finish_idx = np.where(stage_diff == -1)
        return start_idx[0], finish_idx[0]

    def _find_events(self,
                     signal: str,
                     times: pd.Series,
                     dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
        """Translate the steps of ``signal`` into start and end timestamps.

        Returns:
            ``(start_list, end_list)`` of values taken from ``times``.
        """
        start_idx, finish_idx = self._find_indexes(signal, dataframe)

        # The first -1 step precedes the first +1 step: treat sample 0 as
        # the start of that first interval.
        if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
            start_idx = np.insert(start_idx, 0, 0)

        start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
        end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []

        # Exactly one start without a matching end: close the interval at
        # the last timestamp.
        if len(start_list) - len(end_list) == 1:
            end_list.append(float(times.iloc[-1]))

        return start_list, end_list

    def _filter_events(self,
                       times: pd.Series,
                       dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
        """Collect start/end timestamps for every stage.

        The number of points is the number of starts found for
        ``self._clear_stage``. Stages with fewer complete events than that
        are padded with ``0`` starts and ``1`` ends.

        Returns:
            ``(events, point_quantity)`` where ``events[stage]`` is
            ``[start_list, end_list]``. When the clear stage yields no
            starts, ``({}, 0)`` is returned. The previous bare ``[]`` could
            not be unpacked into two names.
        """
        events = {}
        point_quantity = 0

        if self._clear_stage in self._stages:
            start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
            point_quantity = len(start_list)
            if point_quantity == 0:
                # TODO: add proper error handling for recordings without points
                return {}, 0

        for stage in self._stages:
            start_list, end_list = self._find_events(stage, times, dataframe)
            temp = min([len(start_list), len(end_list)])
            if temp < point_quantity:
                print ("cant find enough", stage)
                start_list += [0]*(point_quantity - temp)
                end_list += [1]*(point_quantity - temp)
            events[stage] = [start_list, end_list]

        return events, point_quantity

    def _build_ideal_data(self,
                          idealDataBuilder: Optional[type[BaseIdealDataBuilder]] = None,
                          params: Optional[list[dict]] = None) -> dict:
        """Instantiate ``idealDataBuilder(params)`` and gather its outputs.

        The created builder is stored on ``self.opt``.

        Returns:
            Dict with one DataFrame per stage name plus ``"Ideal cycle"``
            (``get_cycle_time()``) and ``"Ideal timings"``
            (``get_ideal_timings()``).

        Raises:
            TypeError: if ``idealDataBuilder`` is left as ``None``, because
                ``None`` is not callable.
        """
        self.opt = idealDataBuilder(params)
        stage_ideals = {
            "Closing": self._opt.get_closingDF(),
            "Squeeze": self._opt.get_compressionDF(),
            "Welding": self._opt.get_weldingDF(),
            "Relief": self._opt.get_openingDF(),
            "Oncomming": self._opt.get_oncomingDF(),
            "Ideal cycle": self._opt.get_cycle_time(),
            "Ideal timings": self._opt.get_ideal_timings(),
        }
        return stage_ideals

    def form_passports(self) -> list[list[Any]]:
        """Stub; subclasses are expected to override.

        NOTE(review): the original annotation suggests each entry is
        ``[DataFrame, dict, list]`` -- confirm against implementations.
        """
        ...

    def update_settings(self, params: list) -> None:
        """Apply new settings. Stub; subclasses are expected to override."""
        ...

    def _generate_cache_key(
            self,
            params_list: list[dict[str, Any]],
    ) -> tuple[frozenset[tuple[str, Any]], frozenset[tuple[str, Any]]]:
        """Turn ``params_list`` into a hashable cache key.

        ``params_list`` is unpacked as ``(operator_settings, system_settings)``.
        From each dict only the items whose key (as ``str``) is listed in
        ``_OptAlgorithm_operator_params`` / ``_OptAlgorithm_system_params``
        are kept.

        Returns:
            A pair of frozensets of ``(key, value)`` items, so the key does
            not depend on dict ordering. The values must be hashable.
        """
        operator_settings, system_settings = params_list

        # Freeze only the relevant (key, value) pairs of each dict.
        operator_items = frozenset(
            (key, value)
            for key, value in operator_settings.items()
            if str(key) in self._OptAlgorithm_operator_params
        )
        system_items = frozenset(
            (key, value)
            for key, value in system_settings.items()
            if str(key) in self._OptAlgorithm_system_params
        )
        return (operator_items, system_items)

    @property
    def opt(self) -> Optional[BaseIdealDataBuilder]:
        """Ideal-data builder in use, or ``None`` if none was assigned."""
        return self._opt

    @opt.setter
    def opt(self, opt: BaseIdealDataBuilder):
        self._opt = opt

    @property
    def mediator(self) -> Optional[BaseMediator]:
        """Mediator this former is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator