from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem import copy import traceback import sys from loguru import logger import pyqtgraph as pg import pandas as pd from typing import Optional, Any from utils.base.base import BasePlotWidget class ProcessStage(): mean_value:int start_index:int finish_index:int class PlotWidget(BasePlotWidget): def _create_curve_ideal(self, signal: dict[str, Any], ideal_data: pd.DataFrame, start_timestamp: float, finish_timestamp: float) -> Optional[pg.PlotDataItem]: """ Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки. """ if start_timestamp is not None and finish_timestamp is not None: return pg.PlotDataItem( x=start_timestamp + ideal_data["time"], y=ideal_data[signal["name"]], pen=signal["pen"] ) return None def _create_stage_region(self, stage: str, start_timestamp: float, finish_timestamp: float, transparency: int) -> Optional[pg.LinearRegionItem]: """ Создает регион для определённого этапа, если заданы временные рамки. """ if start_timestamp is not None and finish_timestamp is not None: region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False) color = self._stage_colors.get(stage, [100, 100, 100, 100]) region.setBrush(pg.mkBrush(color[:3] + [transparency])) return region return None @staticmethod def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]: plot_item = pg.PlotItem(title=title) # Оптимизация отображения графиков plot_item.setDownsampling(auto=True, mode='peak') plot_item.showGrid(x=True, y=True) plot_item.setClipToView(True) legend = plot_item.addLegend(offset=(70, 20)) return plot_item, legend def _add_stage_regions(self, plot_item: pg.PlotItem, point_events: dict[str, list[float]], dataframe_headers: list[str], reg_items: dict, transparency: int = 75) -> None: """ Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма. """ stages = point_events.keys() if all(stage in dataframe_headers for stage in stages): for stage in stages: start_t, end_t = point_events[stage] region = self._create_stage_region(stage, start_t, end_t, transparency) if region is not None: region.setZValue(-20) plot_item.addItem(region) reg_items["real"].setdefault(stage, []) reg_items["real"][stage].append(region) def _add_ideal_stage_regions(self, plot_item: pg.PlotItem, ideal_data: dict[str, Any], point_events: dict[str, list[float]], reg_items: dict, transparency: int = 125) -> None: """ Добавляет регионы для идеальных этапов. """ ideal_timings = ideal_data["Ideal timings"] stages = list(point_events.keys()) for i, stage in enumerate(stages): start_t = point_events[stage][0] end_t = start_t + ideal_timings[i] region = self._create_stage_region(stage, start_t, end_t, transparency) if region: region.setZValue(-10) plot_item.addItem(region) reg_items["ideal"].setdefault(stage, []) reg_items["ideal"][stage].append(region) def _add_ideal_signals(self, plot_item: pg.PlotItem, ideal_data: dict[str, Any], point_events: dict[str, list[float]], ideal_signals: list[dict[str, Any]], curve_items: dict) -> None: """ Добавляет идеальные сигналы для каждого этапа. """ for stage in point_events.keys(): for signal in ideal_signals: curve = self._create_curve_ideal( signal, ideal_data[stage], point_events[stage][0], point_events[stage][1] ) if curve: curve.setZValue(50) plot_item.addItem(curve) curve_items["ideal"].setdefault(signal["name"], {}) curve_items["ideal"][signal["name"]].setdefault(stage, []) curve_items["ideal"][signal["name"]][stage].append(curve) def _add_real_signals(self, plot_item: pg.PlotItem, dataframe: pd.DataFrame, real_signals: list[dict[str, Any]], legend: pg.LegendItem, curve_items: dict) -> None: """ Добавляет реальные сигналы из dataframe на виджет. """ dataframe_headers = dataframe.columns.tolist() for signal in real_signals: if signal["name"] in dataframe_headers: plot = plot_item.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True) plot.setZValue(0) legend.addItem(plot, signal["name"]) curve_items["real"].setdefault(signal["name"], {}) curve_items["real"][signal["name"]] = plot def _add_performance_label(self, layout: QVBoxLayout, TWC_time: float, ideal_time: float, tesla_time: float) -> None: """ Добавляет QLabel с информацией о производительности. """ tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0 tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0 TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0 performance_label = QLabel( f"Сокращение длительности: фактическое = {tesla_TWC} %" ) #f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%" self.set_style(performance_label) layout.addWidget(performance_label) performance_label.update() def _build_widget(self, data: list[Any]) -> QWidget: """ Собирает графический виджет для одного набора данных. Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data]. """ result_widget = QWidget() result_layout = QVBoxLayout(result_widget) plot_layout = pg.GraphicsLayoutWidget() reg_items = {"real":{}, "ideal":{}} curve_items = {"real":{}, "ideal":{}} dataframe, points_pocket, useful_data = data tesla_time = useful_data["tesla_time"] range_ME = useful_data["range_ME"] k_hardness = useful_data["k_hardness"] dat_is_none = dataframe is None if not dat_is_none: dataframe_headers = dataframe.columns.tolist() for widget_num, (channel, description) in enumerate(self._plt_channels.items()): plot_item, legend = self._init_plot_item(title=channel) settings = description["Settings"] TWC_time = 0.0 ideal_time = 0.0 worst_perf = 2 # TODO: рассчитать корректный параметр range if settings["mirror ME"] and not dat_is_none: dataframe = self._mirror_shift_data( "ME", description["Real_signals"], dataframe, range_ME ) # Итерация по точкам for cur_point, point_data in enumerate(points_pocket): # point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data] point_timeframe, ideal_dat, point_events, useful_p_data = point_data ideal_data = copy.deepcopy(ideal_dat) if dat_is_none: worst_timeframe = point_timeframe = [0, ideal_data["Ideal cycle"]] point_events = {} keys = list(ideal_data.keys()) shift = 0 for i, time in enumerate(ideal_data["Ideal timings"]): point_events[keys[i]] = [shift, time+shift] shift += time # TODO: проверить корректность расчетов if settings["force compensation FE"] and not dat_is_none: force = useful_p_data["force"] F_comp = - force/k_hardness point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index dataframe.loc[point_idxs] = self._shift_data("FE", description["Real_signals"], dataframe.loc[point_idxs], F_comp) # Модифицируем данные для отображения гарфика if settings["ideals"] and settings["mirror ME"]: for stage in point_events.keys(): ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME) # Добавляем реальные стадии if settings["stages"] and not dat_is_none: self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75) if settings["workpiece"]: x1 = point_timeframe[0] dx = point_timeframe[1] - x1 y1 = useful_p_data["L2"]*1000 dy = useful_p_data["thickness"]*1000 rect_item = QGraphicsRectItem(x1, y1, dx, dy) rect_item.setZValue(-5) rect_item.setBrush(pg.mkBrush('grey')) rect_item.setPen(pg.mkPen('black', width=3)) plot_item.addItem(rect_item) if settings["force accuracy"]and not dat_is_none: modifier = 0.05 x1 = point_events["Welding"][0] dx = point_events["Welding"][1] - x1 force = useful_p_data["force"] y1 = force*(1-modifier) dy = force*(2*modifier) rect_item = QGraphicsRectItem(x1, y1, dx, dy) rect_item.setZValue(-5) rect_item.setBrush(pg.mkBrush((0,255,0, 50))) rect_item.setPen(pg.mkPen('black', width=0)) plot_item.addItem(rect_item) # Добавляем идеальные стадии и идеальные сигналы if settings["ideals"]: self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100) self._add_ideal_signals(plot_item, ideal_data, point_events, description["Ideal_signals"], curve_items) # Подсчёт производительности if settings["performance"]and not dat_is_none: is_last_point = (cur_point == len(points_pocket) - 1) if is_last_point: TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]]) ideal_delta = sum(ideal_data["Ideal timings"][0:3]) else: TWC_delta = point_timeframe[1] - point_timeframe[0] ideal_delta = ideal_data["Ideal cycle"] TWC_time += TWC_delta ideal_time += ideal_delta curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1 if curr_perf < worst_perf: worst_perf = curr_perf worst_timeframe = point_timeframe # Добавляем реальные сигналы if not dat_is_none: self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items) if widget_num == 0: main_plot = plot_item else: # Связываем остальные графики с основным графиком plot_item.setXLink(main_plot) if settings["performance"] and not dat_is_none: self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time) plot_layout.addItem(plot_item, widget_num, 0) navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot) if navigator is not None: plot_layout.addItem(navigator, widget_num+1, 0) self._sync_main_plot_with_navigator(main_plot, ROI_region) main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region)) result_layout.addWidget(plot_layout) return result_widget, reg_items, curve_items def build(self, data: list[list[Any]]) -> None: """ Создает набор виджетов по предоставленному списку данных. Предполагается, что data — это список элементов вида: [ [dataframe, points_pocket, useful_data], [dataframe, points_pocket, useful_data], ... ] """ try: widgets_datapack = [self._build_widget(data_sample) for data_sample in data] except: tb = sys.exc_info()[2] tbinfo = traceback.format_tb(tb)[0] pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1]) logger.error(pymsg) widgets_datapack = [QLabel(pymsg)] finally: self._mediator.notify(self, widgets_datapack)