from __future__ import annotations

import os
from typing import Optional, Union, Any
from cachetools import LRUCache

import pandas as pd
from PyQt5.QtCore import QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget
from OptAlgorithm import OptAlgorithm
import numpy as np
import pyqtgraph as pg


class BaseMediator:
    """Hub object that holds the application components and links them to itself.

    On construction every component is stored on the mediator and receives a
    back-reference through its ``mediator`` attribute.
    """

    def __init__(self,
                 monitor: BaseDirectoryMonitor,
                 converter: BaseDataConverter,
                 passportFormer: BasePointPassportFormer,
                 plot: BasePlotWidget,
                 controller: BaseController):
        # (private attribute name, component) pairs, processed in order so
        # that each component is stored and then immediately wired back.
        wiring = (
            ("_monitor", monitor),
            ("_converter", converter),
            ("_passportFormer", passportFormer),
            ("_plot", plot),
            ("_controller", controller),
        )
        for attribute, component in wiring:
            setattr(self, attribute, component)
            component.mediator = self

    def notify(self,
               source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
               data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
        """Stub: this base class does nothing with ``source`` and ``data``."""
        ...

    def update_settings (self, data: list[dict]):
        """Stub: this base class does nothing with ``data``."""
        ...


class BaseDirectoryMonitor:
    """Base state holder for a component that watches a directory on a timer."""

    # NOTE: created once at class-definition time, so this single QTimer
    # object is shared by every instance that does not rebind the attribute.
    update_timer = QTimer()

    def __init__(self, mediator: Optional[BaseMediator] = None):
        super().__init__()
        self._directory_path = None
        self._update_time = None
        self.isActive = False
        self._files = None
        self._mediator = mediator

    @property
    def directory_path(self) -> str:
        """Watched directory path (``None`` until it is assigned)."""
        return self._directory_path

    @property
    def update_time(self) -> int:
        """Value passed to the timer in :meth:`start` (``None`` until assigned)."""
        return self._update_time

    @property
    def files(self) -> list[str]:
        """Directory listing captured by :meth:`_init_state` (``None`` before that)."""
        return self._files

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this monitor is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def _init_state(self):
        """Snapshot the current contents of the watched directory."""
        self._files = os.listdir(self._directory_path)

    def start(self):
        """Mark the monitor active and start the timer with ``_update_time``."""
        self.isActive = True
        self.update_timer.start(int(self._update_time))

    def stop(self):
        """Mark the monitor inactive and stop the timer."""
        self.isActive = False
        self.update_timer.stop()

    def update_settings(self, data: list[dict]) -> None:
        """Stub: this base class does nothing with ``data``."""
        ...

    def update_plots(self) -> None:
        """Stub: no behavior in this base class."""
        ...

    def force_all_dir(self):
        """Stub: no behavior in this base class."""
        ...
class BaseDataConverter:
    """Base class for the data-conversion component attached to a mediator."""

    def __init__(self, mediator: Optional[BaseMediator] = None):
        self._mediator = mediator

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this converter is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def convert_data(self, files: list[str]) -> None:
        """Stub: this base class performs no conversion."""
        ...


class BasePlotWidget:
    """Shared plotting configuration and helpers for plot-building components.

    Holds the per-stage colours, the channel/signal layout description and a
    set of helpers for navigator plots and data shifting.
    """

    def __init__(self, mediator: Optional[BaseMediator] = None):
        super().__init__()
        self._mediator = mediator

        # Four-component colour per stage name.
        self._stage_colors = {
            "Closing": [220, 20, 60, 100],     # Crimson
            "Squeeze": [30, 144, 255, 100],    # Dodger Blue
            "Welding": [128, 128, 128, 100],   # Gray
            "Relief": [34, 139, 34, 100],      # Forest Green
            "Oncomming": [255, 165, 0, 100],   # Orange
        }

        # Channel title -> settings flags, measured signals and ideal signals.
        self._plt_channels = {
            "Electrode Force, N & Welding Current, kA": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                    "performance": True,
                    "ideals": False,
                    "mirror ME": False,
                    "workpiece": False,
                    "force compensation FE": False,
                    "force accuracy": True,
                },
                "Real_signals": [
                    {
                        "name": "Electrode Force, N ME",
                        "pen": 'r',
                    },
                    {
                        "name": "Electrode Force, N FE",
                        "pen": 'w',
                    },
                    {
                        "name": "Welding Current ME",
                        "pen": "y",
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Force",
                        "pen": {'color': 'g', 'width': 3},
                    },
                ],
            },
            "Electrode Position, mm": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                    "performance": False,
                    "ideals": False,
                    "mirror ME": True,
                    "workpiece": True,
                    "force compensation FE": True,
                    "force accuracy": False,
                },
                "Real_signals": [
                    {
                        "name": "Rotor Position, mm ME",
                        "pen": {'color': 'r', 'width': 2},
                    },
                    {
                        "name": "Rotor Position, mm FE",
                        "pen": {'color': 'w', 'width': 2},
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Position ME",
                        "pen": {'color': 'g', 'width': 4},
                    },
                    {
                        "name": "Position FE",
                        "pen": {'color': 'b', 'width': 4},
                    },
                ],
            },
            "Electrode Speed, mm/s": {
                "Settings": {
                    "zoom": False,
                    "stages": True,
                    "performance": False,
                    "ideals": True,
                    "mirror ME": False,
                    "workpiece": False,
                    "force compensation FE": False,
                    "force accuracy": False,
                },
                "Real_signals": [
                    {
                        "name": "Rotor Speed, mm/s ME",
                        "pen": 'r',
                        "zoom": False,
                    },
                    {
                        "name": "Rotor Speed, mm/s FE",
                        "pen": 'w',
                        "zoom": False,
                    },
                ],
                "Ideal_signals": [
                    {
                        "name": "Rotor Speed ME",
                        "pen": {'color': 'g', 'width': 3},
                        "zoom": False,
                    },
                    {
                        "name": "Rotor Speed FE",
                        "pen": {'color': 'b', 'width': 3},
                        "zoom": False,
                    },
                ],
            },
        }

    def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
        """Apply the label style sheet to ``object``."""
        object.setStyleSheet(
            """QLabel {
                color: #ffffff;
                font-size: 26px;
                font-weight: bold;
                font-family: "Segoe UI", sans-serif;
            }""")

    def _downsample_data(self, x, y, max_points=5000):
        """Thin ``x`` and ``y`` so that at most ``max_points`` samples remain.

        Used to keep the navigator plot responsive. Sequences that already
        fit within ``max_points`` are returned unchanged.

        Args:
            x: Sliceable sequence of abscissa values.
            y: Sliceable sequence of ordinate values, sliced with the same step.
            max_points: Upper bound on the number of returned samples.

        Returns:
            The ``(x, y)`` pair, strided when thinning was necessary.
        """
        if len(x) <= max_points:
            return x, y
        # Ceiling division: a floored step could leave almost 2 * max_points
        # samples (e.g. 1999 samples with max_points=1000 gave a step of 1).
        step = -(-len(x) // max_points)
        return x[::step], y[::step]

    def _create_navigator(self,
                          time_region: tuple[float, float],
                          main_plot: pg.PlotItem) -> tuple[pg.PlotItem, pg.LinearRegionItem]:
        """Create a navigator plot showing all data of ``main_plot`` at reduced resolution.

        Every data item of ``main_plot`` is re-plotted (downsampled to at most
        1000 points) and a movable region is added whose changes update the
        X range of ``main_plot``.

        Args:
            time_region: Initial ``(start, end)`` of the selection region.
            main_plot: Plot whose curves are mirrored and whose X range follows
                the region.

        Returns:
            ``(navigator, region)``: the navigator plot item and its region item.
        """
        navigator = pg.PlotItem(title="Navigator")
        navigator.setFixedHeight(100)

        # Right-hand limit of the navigator: the largest final X value among
        # the mirrored curves (assumes each curve's X data is ascending —
        # TODO confirm).
        x_end = None
        for curve in main_plot.listDataItems():
            x, y = curve.getData()
            if x is None or len(x) == 0:
                # Nothing to mirror for a curve without data.
                continue
            curve_name = curve.opts.get("name", None)
            signal_pen = curve.opts.get("pen", None)
            x_small, y_small = self._downsample_data(x, y, max_points=1000)
            navigator.plot(x_small, y_small, pen=signal_pen, name=curve_name)
            x_end = x[-1] if x_end is None else max(x_end, x[-1])

        if x_end is None:
            # No curve supplied data: fall back to the requested region end
            # instead of failing on an undefined loop variable.
            x_end = time_region[1]

        ROI_region = pg.LinearRegionItem(values=time_region,
                                         movable=True,
                                         brush=pg.mkBrush(0, 0, 255, 100),
                                         pen=pg.mkPen(width=4))
        ROI_region.setBounds([0, x_end])
        navigator.addItem(ROI_region)
        navigator.getViewBox().setLimits(xMin=0, xMax=x_end)

        # Moving the navigator region updates the visible range of the main plot.
        ROI_region.sigRegionChanged.connect(
            lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))

        return navigator, ROI_region

    def _sync_main_plot_with_navigator(self,
                                       main_plot: pg.PlotItem,
                                       region: pg.LinearRegionItem):
        """Set the X range of ``main_plot`` to the span selected by ``region``."""
        x_min, x_max = region.getRegion()
        if main_plot is None:
            return
        main_plot.blockSignals(True)
        try:
            main_plot.setXRange(x_min, x_max, padding=0)
        finally:
            # Always re-enable signals, even if setting the range fails.
            main_plot.blockSignals(False)

    def _mirror_shift_data(self,
                           valid_str: str,
                           signals: list[dict],
                           dataframe: pd.DataFrame,
                           shift: float) -> pd.DataFrame:
        """Replace matching columns with ``shift - value``.

        A signal matches when ``valid_str`` occurs in its ``"name"`` and a
        column of that name exists in ``dataframe``.

        Args:
            valid_str: Substring a signal name must contain.
            signals: Signal descriptions; only the ``"name"`` key is read.
            dataframe: Frame modified in place.
            shift: Value the column is subtracted from.

        Returns:
            The same ``dataframe`` object.
        """
        columns = dataframe.columns
        for signal in signals:
            name = signal["name"]
            if valid_str in name and name in columns:
                dataframe[name] = shift - dataframe[name]
        return dataframe

    def _shift_data(self,
                    valid_str: str,
                    signals: list[dict],
                    dataframe: pd.DataFrame,
                    shift: float) -> pd.DataFrame:
        """Add ``shift`` to every matching column.

        A signal matches when ``valid_str`` occurs in its ``"name"`` and a
        column of that name exists in ``dataframe``.

        Args:
            valid_str: Substring a signal name must contain.
            signals: Signal descriptions; only the ``"name"`` key is read.
            dataframe: Frame modified in place.
            shift: Offset added to the column.

        Returns:
            The same ``dataframe`` object.
        """
        columns = dataframe.columns
        for signal in signals:
            name = signal["name"]
            if valid_str in name and name in columns:
                dataframe[name] = dataframe[name] + shift
        return dataframe

    def _sync_navigator_with_main(self,
                                  main_plot: pg.PlotItem,
                                  region: pg.LinearRegionItem):
        """Set the navigator ``region`` to the X range currently shown by ``main_plot``."""
        if region is None:
            return
        # viewRange() yields [[x_min, x_max], [y_min, y_max]]; only X is needed.
        x_min, x_max = main_plot.getViewBox().viewRange()[0]
        region.blockSignals(True)  # prevent recursion through sigRegionChanged
        try:
            region.setRegion([x_min, x_max])
        finally:
            region.blockSignals(False)

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this plot component is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    @property
    def opt(self) -> BaseIdealDataBuilder:
        """Ideal-data builder; only available after it has been assigned."""
        return self._opt

    @opt.setter
    def opt(self, opt: BaseIdealDataBuilder):
        self._opt = opt

    def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
        """Stub: this base class builds nothing."""
        ...


class BaseController(QObject):
    """Base controller; all operations are stubs in this class."""

    def send_widgets(self, widgets: list[QWidget]) -> None:
        """Stub: no behavior in this base class."""
        ...

    def update_settings(self, settings: list[dict]) -> None:
        """Stub: no behavior in this base class."""
        ...

    def raport_mode (self, filepath: str) -> None:
        """Stub: no behavior in this base class."""
        ...

    def seeking_mode(self) -> None:
        """Stub: no behavior in this base class."""
        ...
class BaseIdealDataBuilder(OptAlgorithm):
    """``OptAlgorithm`` extension that tabulates ideal trajectories as data frames."""

    def __init__(self, params: list[dict]):
        """Split ``params`` into operator and system settings and initialise the base.

        Args:
            params: Two-element sequence ``(operator_params, system_params)``.
        """
        operator_params, system_params = params
        # Sampling multiplier used by _get_data (samples per unit of time).
        self.mul = system_params['time_capture']
        self.welding_time = operator_params['time_wielding']
        super().__init__(operator_params, system_params)

    def _get_data(self, end_timestamp: float, func: function) -> pd.DataFrame:
        """Sample ``func`` on a regular grid and collect the results in a frame.

        ``func`` is called with ``t = i / self.mul`` for
        ``i = 0 .. int(end_timestamp * self.mul) - 1`` and must return five
        values ``(X1, X2, V1, V2, F)``. Positions and speeds are multiplied by
        1000; the force is stored as returned.

        Args:
            end_timestamp: End of the sampled interval (exclusive).
            func: Callable taking a time value and returning five numbers.

        Returns:
            Frame with the columns ``time``, ``Position FE``, ``Position ME``,
            ``Rotor Speed FE``, ``Rotor Speed ME`` and ``Force``. The columns
            are present even when no sample was produced.
        """
        columns = ["time", "Position FE", "Position ME",
                   "Rotor Speed FE", "Rotor Speed ME", "Force"]
        rows = []
        for i in range(int(end_timestamp * self.mul)):
            t = i / self.mul
            X1, X2, V1, V2, F = func(t)
            rows.append({"time": t,
                         "Position FE": X1 * 1000,
                         "Position ME": X2 * 1000,
                         "Rotor Speed FE": V1 * 1000,
                         "Rotor Speed ME": V2 * 1000,
                         "Force": F})
        # Explicit columns keep the schema stable for an empty sample set.
        return pd.DataFrame(rows, columns=columns)

    def get_closingDF(self) -> pd.DataFrame:
        """Stub: no behavior in this base class."""
        ...

    def get_compressionDF(self) -> pd.DataFrame:
        """Stub: no behavior in this base class."""
        ...

    def get_openingDF(self) -> pd.DataFrame:
        """Stub: no behavior in this base class."""
        ...

    def get_tmovementDF(self) -> pd.DataFrame:
        """Stub: no behavior in this base class."""
        ...

    def get_weldingDF(self) -> pd.DataFrame:
        """Stub: no behavior in this base class."""
        ...

    def get_oncomingDF(self) -> pd.DataFrame:
        """Stub: no behavior in this base class."""
        ...

    def get_ideal_timings(self) -> list[float]:
        """Stub: no behavior in this base class."""
        ...

    def get_cycle_time(self) -> float:
        """Return the sum of the values given by :meth:`get_ideal_timings`."""
        return sum(self.get_ideal_timings())


class BaseMainWindow(QWidget):
    """Base main window: applies the application style sheet and stores a controller."""

    def __init__(self, controller: Optional[BaseController] = None):
        super().__init__()
        self.set_style(self)
        self.resize(200, 200)
        self._controller = controller
        ...

    @property
    def controller(self) -> BaseController:
        """Controller attached to this window."""
        return self._controller

    @controller.setter
    def controller(self, controller: BaseController) -> None:
        self._controller = controller

    def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
        """Apply the application-wide style sheet to ``object``."""
        object.setStyleSheet("""
            QWidget {
                background-color: #0D1117;
                font-family: "Segoe UI", sans-serif;
                font-size: 14px;
            }
            QMessageBox {
                background-color: #161B22;
                font-family: "Segoe UI", sans-serif;
                font-size: 14px;
            }
            QPushButton {
                background-color: #FFCC00;
                color: #0D1117;
                padding: 12px 25px;
                border: 2px solid #E6B800;
                border-radius: 8px;
                font-family: "Segoe UI", sans-serif;
                font-size: 16px;
                font-weight: bold;
            }
            QPushButton:hover:!disabled {
                background-color: #FFD700;
            }
            QPushButton:disabled {
                background-color: #555555;
                color: #cccccc;
                border: none;
            }
            QLabel {
                color: #ffffff;
                font-size: 16px;
                font-weight: bold;
                font-family: "Segoe UI", sans-serif;
            }
        """)


class BasePointPassportFormer:
    """Base class with event-detection and ideal-data helpers for point passports."""

    def __init__(self, mediator: Optional[BaseMediator] = None):
        self._mediator = mediator
        # Stage whose detected events define the number of points.
        self._clear_stage = "Welding"
        self._stages = [
            "Closing",
            "Squeeze",
            "Welding",
            "Relief",
            "Oncomming"
        ]
        self._tesla_stages = [
            "Tesla squeeze",
            "Tesla closing",
            "Tesla welding",
            "Tesla oncomming_relief"
        ]
        self._params = []
        self._ideal_data_cashe = LRUCache(maxsize=1000)
        # Setting names that take part in the cache key (see _generate_cache_key).
        self._OptAlgorithm_operator_params = [
            "dist_open_start_1",
            "dist_open_start_2",
            "dist_open_after_1",
            "dist_open_after_2",
            "dist_open_end_1",
            "dist_open_end_2",
            "dist_close_end_1",
            "dist_close_end_2",
            "time_command",
            "time_robot_movement",
            "object_thickness",
            "force_target",
            "force_capture",
            "time_wielding"]
        self._OptAlgorithm_system_params = [
            "a_max_1",
            "v_max_1",
            "a_max_2",
            "v_max_2",
            "mass_1",
            "mass_2",
            "k_hardness_1",
            "k_hardness_2",
            "torque_max_1",
            "torque_max_2",
            "transmission_ratio_1",
            "transmission_ratio_2",
            "position_start_1",
            "position_start_2",
            "k_prop",
            "time_capture"]

    def _find_indexes(self,
                      signal: str,
                      dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
        """Locate the steps of ``dataframe[signal]``.

        Returns:
            ``(start_idx, finish_idx)``: positions where the difference between
            consecutive samples equals ``1`` and ``-1`` respectively.
        """
        stage_diff = np.diff(dataframe[signal])
        start_idx = np.where(stage_diff == 1)
        finish_idx = np.where(stage_diff == -1)
        return start_idx[0], finish_idx[0]

    def _find_events(self,
                     signal: str,
                     times: pd.Series,
                     dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
        """Return the start and end times of every active interval of ``signal``.

        Args:
            signal: Column of ``dataframe`` to analyse.
            times: Time values, indexed by position like ``dataframe``.
            dataframe: Frame containing the ``signal`` column.

        Returns:
            ``(start_list, end_list)`` of times taken from ``times``.
        """
        start_idx, finish_idx = self._find_indexes(signal, dataframe)

        # The signal was already active when the record began if the first
        # falling step has no rising step before it (including the case where
        # there is no rising step at all): start that interval at position 0.
        if len(finish_idx) > 0 and (len(start_idx) == 0 or start_idx[0] > finish_idx[0]):
            start_idx = np.insert(start_idx, 0, 0)

        start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
        end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []

        # An interval still open at the end of the record ends at the last time.
        if len(start_list) - len(end_list) == 1:
            end_list.append(float(times.iloc[-1]))

        return start_list, end_list

    def _filter_events(self,
                       times: pd.Series,
                       dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
        """Collect the events of every stage, sized by the reference stage.

        The number of points is the number of intervals found for
        ``self._clear_stage``. Stages with fewer complete intervals are padded
        with ``0`` (starts) and ``1`` (ends).

        Returns:
            ``(events, point_quantity)`` where ``events`` maps a stage name to
            ``[start_list, end_list]``. When the reference stage is not among
            ``self._stages`` or has no intervals, ``({}, 0)`` is returned.
        """
        events = {}
        point_quantity = 0
        if self._clear_stage not in self._stages:
            return events, point_quantity

        start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
        point_quantity = len(start_list)
        if point_quantity == 0:
            # TODO: add exception handling.
            # Keep the declared 2-tuple shape so that callers unpacking the
            # result do not fail on the "no points" path.
            return events, point_quantity

        for stage in self._stages:
            start_list, end_list = self._find_events(stage, times, dataframe)
            found = min(len(start_list), len(end_list))
            if found < point_quantity:
                print ("cant find enough", stage)
                missing = point_quantity - found
                start_list += [0] * missing
                end_list += [1] * missing
            events[stage] = [start_list, end_list]
        return events, point_quantity

    def _build_ideal_data(self,
                          idealDataBuilder: Optional[type[BaseIdealDataBuilder]] = None,
                          params: list[dict] = None) -> dict:
        """Instantiate the ideal-data builder and gather its per-stage results.

        The created builder is stored through the ``opt`` property.

        Args:
            idealDataBuilder: Builder class (or factory) called with ``params``.
            params: Argument forwarded to ``idealDataBuilder``.

        Returns:
            Mapping with one entry per stage plus ``"Ideal cycle"`` and
            ``"Ideal timings"``.

        Raises:
            TypeError: If ``idealDataBuilder`` is ``None``.
        """
        if idealDataBuilder is None:
            raise TypeError("idealDataBuilder must be a callable, not None")
        self.opt = idealDataBuilder(params)
        stage_ideals = {
            "Closing": self._opt.get_closingDF(),
            "Squeeze": self._opt.get_compressionDF(),
            "Welding": self._opt.get_weldingDF(),
            "Relief": self._opt.get_openingDF(),
            "Oncomming": self._opt.get_oncomingDF(),
            "Ideal cycle": self._opt.get_cycle_time(),
            "Ideal timings": self._opt.get_ideal_timings()
        }
        return stage_ideals

    def form_passports(self) -> list[list[pd.DataFrame, dict, list]]:
        """Stub: no behavior in this base class."""
        ...

    def update_settings(self, params: list) -> None:
        """Stub: no behavior in this base class."""
        ...

    def _generate_cache_key(self,
                            params_list: list[dict[str, Any]]
                            ) -> tuple[frozenset[tuple[str, Any]], frozenset[tuple[str, Any]]]:
        """Turn ``params_list`` into a hashable cache key.

        Args:
            params_list: Two-element sequence
                ``(operator_settings, system_settings)``.

        Returns:
            Pair of frozensets holding the ``(key, value)`` items whose key is
            listed in ``_OptAlgorithm_operator_params`` and
            ``_OptAlgorithm_system_params`` respectively. The values must be
            hashable.
        """
        operator_settings, system_settings = params_list

        # Frozensets make the key independent of the dictionaries' item order.
        operator_tuple = frozenset(
            (key, value) for key, value in operator_settings.items()
            if str(key) in self._OptAlgorithm_operator_params)
        system_tuple = frozenset(
            (key, value) for key, value in system_settings.items()
            if str(key) in self._OptAlgorithm_system_params)
        return (operator_tuple, system_tuple)

    @property
    def opt(self) -> BaseIdealDataBuilder:
        """Ideal-data builder; only available after it has been assigned."""
        return self._opt

    @opt.setter
    def opt(self, opt: BaseIdealDataBuilder):
        self._opt = opt

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this passport former is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator