from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem import copy import pyqtgraph as pg import pandas as pd from typing import Optional, Any from utils.base.base import BasePlotWidget class ProcessStage(): mean_value:int start_index:int finish_index:int class PlotWidget(BasePlotWidget): def _create_curve_ideal(self, signal: dict[str, Any], ideal_data: pd.DataFrame, start_timestamp: float, finish_timestamp: float) -> Optional[pg.PlotDataItem]: """ Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки. """ if start_timestamp is not None and finish_timestamp is not None: return pg.PlotDataItem( x=start_timestamp + ideal_data["time"], y=ideal_data[signal["name"]], pen=signal["pen"] ) return None def _create_stage_region(self, stage: str, start_timestamp: float, finish_timestamp: float, transparency: int) -> Optional[pg.LinearRegionItem]: """ Создает регион для определённого этапа, если заданы временные рамки. """ if start_timestamp is not None and finish_timestamp is not None: region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False) color = self._stage_colors.get(stage, [100, 100, 100, 100]) region.setBrush(pg.mkBrush(color[:3] + [transparency])) return region return None @staticmethod def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]: plot_widget = pg.PlotWidget(title=title) # Оптимизация отображения графиков plot_widget.setDownsampling(auto=True, mode='peak') plot_widget.showGrid(x=True, y=True) plot_widget.setClipToView(True) legend = pg.LegendItem((80, 60), offset=(70, 20)) legend.setParentItem(plot_widget.graphicsItem()) return plot_widget, legend def _add_stage_regions(self, plot_widget: pg.PlotWidget, point_events: dict[str, list[float]], dataframe_headers: list[str], reg_items: dict, transparency: int = 75) -> None: """ Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма. """ stages = point_events.keys() if all(stage in dataframe_headers for stage in stages): for stage in stages: start_t, end_t = point_events[stage] region = self._create_stage_region(stage, start_t, end_t, transparency) if region is not None: region.setZValue(-20) plot_widget.addItem(region) reg_items["real"].setdefault(stage, []) reg_items["real"][stage].append(region) def _add_ideal_stage_regions(self, plot_widget: pg.PlotWidget, ideal_data: dict[str, Any], point_events: dict[str, list[float]], reg_items: dict, transparency: int = 125) -> None: """ Добавляет регионы для идеальных этапов. """ ideal_timings = ideal_data["Ideal timings"] stages = list(point_events.keys()) for i, stage in enumerate(stages): start_t = point_events[stage][0] end_t = start_t + ideal_timings[i] region = self._create_stage_region(stage, start_t, end_t, transparency) if region: region.setZValue(-10) plot_widget.addItem(region) reg_items["ideal"].setdefault(stage, []) reg_items["ideal"][stage].append(region) def _add_ideal_signals(self, plot_widget: pg.PlotWidget, ideal_data: dict[str, Any], point_events: dict[str, list[float]], ideal_signals: list[dict[str, Any]], curve_items: dict) -> None: """ Добавляет идеальные сигналы для каждого этапа. """ for stage in point_events.keys(): for signal in ideal_signals: curve = self._create_curve_ideal( signal, ideal_data[stage], point_events[stage][0], point_events[stage][1] ) if curve: curve.setZValue(10) plot_widget.addItem(curve) curve_items["ideal"].setdefault(signal["name"], {}) curve_items["ideal"][signal["name"]].setdefault(stage, []) curve_items["ideal"][signal["name"]][stage].append(curve) def _add_real_signals(self, plot_widget: pg.PlotWidget, dataframe: pd.DataFrame, real_signals: list[dict[str, Any]], legend: pg.LegendItem, curve_items: dict) -> None: """ Добавляет реальные сигналы из dataframe на виджет. """ dataframe_headers = dataframe.columns.tolist() for signal in real_signals: if signal["name"] in dataframe_headers: plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True) plot.setZValue(0) legend.addItem(plot, signal["name"]) curve_items["real"].setdefault(signal["name"], {}) curve_items["real"][signal["name"]] = plot def _add_performance_label(self, layout: QVBoxLayout, TWC_time: float, ideal_time: float, tesla_time: float) -> None: """ Добавляет QLabel с информацией о производительности. """ tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0 tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0 TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0 performance_label = QLabel( f"Сокращение длительности: фактическое = {tesla_TWC} %, " f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%" ) self.set_style(performance_label) layout.addWidget(performance_label) performance_label.update() def _build_widget(self, data: list[Any]) -> QWidget: """ Собирает графический виджет для одного набора данных. Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data]. """ widget = QWidget() layout = QVBoxLayout(widget) reg_items = {"real":{}, "ideal":{}} curve_items = {"real":{}, "ideal":{}} dataframe, points_pocket, useful_data = data tesla_time = useful_data["tesla_time"] range_ME = useful_data["range_ME"] k_hardness = useful_data["k_hardness"] dataframe_headers = dataframe.columns.tolist() for widget_num, (channel, description) in enumerate(self._plt_channels.items()): plot_widget, legend = self._init_plot_widget(title=channel) settings = description["Settings"] TWC_time = 0.0 ideal_time = 0.0 worst_perf = 2 # TODO: рассчитать корректный параметр range if settings["mirror ME"]: dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME) # Итерация по точкам for cur_point, point_data in enumerate(points_pocket): # point_data структура: [point_timeframe, ideal_data, point_events] point_timeframe, ideal_dat, point_events, useful_p_data = point_data ideal_data = copy.deepcopy(ideal_dat) # TODO: проверить корректность расчетов if settings["force compensation FE"]: force = useful_p_data["force"] F_comp = - force/k_hardness point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index dataframe.loc[point_idxs] = self._shift_data("FE", description["Real_signals"], dataframe.loc[point_idxs], F_comp) # Модифицируем данные для отображения гарфика if settings["ideals"] and settings["mirror ME"]: for stage in point_events.keys(): ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME) # Добавляем реальные стадии if settings["stages"]: self._add_stage_regions(plot_widget, point_events, dataframe_headers, reg_items, 75) # TODO: подобрать не вырвеглазные цвета, возможно ограничить зону if settings["workpiece"]: x1 = point_timeframe[0] dx = point_timeframe[1] - x1 y1 = useful_p_data["L2"]*1000 dy = useful_p_data["thickness"]*1000 rect_item = QGraphicsRectItem(x1, y1, dx, dy) rect_item.setZValue(-5) rect_item.setBrush(pg.mkBrush('grey')) rect_item.setPen(pg.mkPen('black', width=3)) plot_widget.addItem(rect_item) if settings["force accuracy"]: modifier = 0.05 x1 = point_events["Welding"][0] dx = point_events["Welding"][1] - x1 force = useful_p_data["force"] y1 = force*(1-modifier) dy = force*(2*modifier) rect_item = QGraphicsRectItem(x1, y1, dx, dy) rect_item.setZValue(-5) rect_item.setBrush(pg.mkBrush((0,255,0, 50))) rect_item.setPen(pg.mkPen('black', width=0)) plot_widget.addItem(rect_item) # Добавляем идеальные стадии и идеальные сигналы if settings["ideals"]: self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, reg_items, 100) self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"], curve_items) # Подсчёт производительности if settings["performance"]: is_last_point = (cur_point == len(points_pocket) - 1) if is_last_point: TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]]) ideal_delta = sum(ideal_data["Ideal timings"][0:3]) else: TWC_delta = point_timeframe[1] - point_timeframe[0] ideal_delta = ideal_data["Ideal cycle"] TWC_time += TWC_delta ideal_time += ideal_delta curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1 if curr_perf < worst_perf: worst_perf = curr_perf worst_timeframe = point_timeframe # Добавляем реальные сигналы self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend, curve_items) if widget_num == 0: main_plot = plot_widget else: # Связываем остальные графики с основным графиком plot_widget.setXLink(main_plot) # Если есть настройка производительности, добавляем label if settings["performance"]: self._add_performance_label(layout, TWC_time, ideal_time, tesla_time) navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"]) layout.addWidget(plot_widget) layout.addWidget(navigator) self._sync_main_plot_with_navigator(main_plot, ROI_region) main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region)) widget.setLayout(layout) return widget, reg_items, curve_items def build(self, data: list[list[Any]]) -> None: """ Создает набор виджетов по предоставленному списку данных. Предполагается, что data — это список элементов вида: [ [dataframe, points_pocket, useful_data], [dataframe, points_pocket, useful_data], ... ] """ widgets_datapack = [self._build_widget(data_sample) for data_sample in data] self._mediator.notify(self, widgets_datapack)