from PyQt5.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
                             QGraphicsRectItem, QSpacerItem, QSizePolicy)
from PyQt5.QtCore import Qt
import copy
import traceback
import sys
from loguru import logger
import pyqtgraph as pg
import pandas as pd
from typing import Optional, Any

from utils.base.base import BasePlotWidget


class ProcessStage():
    """Annotation-only container with three integer-annotated attributes.

    No defaults are declared, so the attributes exist only after a user of
    the class assigns them.
    """
    mean_value: int
    start_index: int
    finish_index: int


class PlotWidget(BasePlotWidget):
    """Builds pyqtgraph-based widgets for real and ideal signal data.

    Relies on members that are not defined in this class and are presumably
    provided by ``BasePlotWidget`` (``_plt_channels``, ``_stage_colors``,
    ``_mediator``, ``set_style``, ``_create_navigator``,
    ``_mirror_shift_data``, ``_mirror_shift_ideal``, ``_shift_data``,
    ``_sync_main_plot_with_navigator``, ``_sync_navigator_with_main``).
    """

    def _create_curve_ideal(self,
                            signal: dict[str, Any],
                            ideal_data: pd.DataFrame,
                            start_timestamp: float,
                            finish_timestamp: float) -> Optional[pg.PlotDataItem]:
        """Create the ideal curve of one signal.

        Args:
            signal: Signal description; ``"name"`` selects the column of
                ``ideal_data`` and ``"pen"`` is passed to the curve.
            ideal_data: Table with a ``"time"`` column and a column named
                after the signal.
            start_timestamp: Offset added to ``ideal_data["time"]``.
            finish_timestamp: Only checked for ``None``; not used otherwise.

        Returns:
            The curve, or ``None`` when either timestamp is ``None``.
        """
        if start_timestamp is None or finish_timestamp is None:
            return None
        return pg.PlotDataItem(
            x=start_timestamp + ideal_data["time"],
            y=ideal_data[signal["name"]],
            pen=signal["pen"],
        )

    def _create_stage_region(self,
                             stage: str,
                             start_timestamp: float,
                             finish_timestamp: float,
                             transparency: int) -> Optional[pg.LinearRegionItem]:
        """Create a non-movable region that highlights one stage.

        Args:
            stage: Stage name used to look up the colour in
                ``self._stage_colors``; unknown stages fall back to grey.
            start_timestamp: Left edge of the region.
            finish_timestamp: Right edge of the region.
            transparency: Alpha channel that replaces the colour's own alpha.

        Returns:
            The region, or ``None`` when either timestamp is ``None``.
        """
        if start_timestamp is None or finish_timestamp is None:
            return None
        region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
        color = self._stage_colors.get(stage, [100, 100, 100, 100])
        # Unpacking accepts both list and tuple colours; list + list
        # concatenation would raise TypeError for a tuple.
        region.setBrush(pg.mkBrush([*color[:3], transparency]))
        return region

    @staticmethod
    def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
        """Create a plot item with grid, legend and rendering optimisations.

        Args:
            title: Title of the plot.

        Returns:
            The plot item and the legend attached to it.
        """
        plot_item = pg.PlotItem(title=title)
        # Rendering optimisations: peak downsampling and clipping to the view.
        plot_item.setDownsampling(auto=True, mode='peak')
        plot_item.showGrid(x=True, y=True)
        plot_item.setClipToView(True)
        legend = plot_item.addLegend(offset=(70, 20))
        return plot_item, legend

    def _add_stage_regions(self,
                           plot_item: pg.PlotItem,
                           point_events: dict[str, list[float]],
                           dataframe_headers: list[str],
                           reg_items: dict,
                           transparency: int = 75) -> None:
        """Add regions for the real stages.

        Nothing is added unless every stage name of ``point_events`` is
        present in ``dataframe_headers``.

        Args:
            plot_item: Plot that receives the regions.
            point_events: Maps a stage name to ``[start, end]``.
            dataframe_headers: Column names of the real-data dataframe.
            reg_items: Registry; created regions are appended to
                ``reg_items["real"][stage]``.
            transparency: Alpha channel of the region brush.
        """
        if not all(stage in dataframe_headers for stage in point_events):
            return
        for stage, (start_t, end_t) in point_events.items():
            region = self._create_stage_region(stage, start_t, end_t, transparency)
            if region is None:
                continue
            region.setZValue(-20)
            plot_item.addItem(region)
            reg_items["real"].setdefault(stage, []).append(region)

    def _add_ideal_stage_regions(self,
                                 plot_item: pg.PlotItem,
                                 ideal_data: dict[str, Any],
                                 point_events: dict[str, list[float]],
                                 reg_items: dict,
                                 transparency: int = 125) -> None:
        """Add regions for the ideal stages.

        Each region starts at the real start of its stage and lasts for the
        matching entry of ``ideal_data["Ideal timings"]``; stages and timings
        are paired by position.

        Args:
            plot_item: Plot that receives the regions.
            ideal_data: Must contain ``"Ideal timings"``.
            point_events: Maps a stage name to ``[start, end]``.
            reg_items: Registry; created regions are appended to
                ``reg_items["ideal"][stage]``.
            transparency: Alpha channel of the region brush.
        """
        ideal_timings = ideal_data["Ideal timings"]
        for i, stage in enumerate(point_events):
            start_t = point_events[stage][0]
            end_t = start_t + ideal_timings[i]
            region = self._create_stage_region(stage, start_t, end_t, transparency)
            # Explicit None check, consistent with _add_stage_regions.
            if region is None:
                continue
            region.setZValue(-10)
            plot_item.addItem(region)
            reg_items["ideal"].setdefault(stage, []).append(region)

    def _add_ideal_signals(self,
                           plot_item: pg.PlotItem,
                           legend_item: pg.LegendItem,
                           ideal_data: dict[str, Any],
                           point_events: dict[str, list[float]],
                           ideal_signals: list[dict[str, Any]],
                           curve_items: dict,
                           is_last: bool) -> None:
        """Add the ideal curve of every signal for every stage.

        Args:
            plot_item: Plot that receives the curves.
            legend_item: Legend that receives an entry when ``is_last``.
            ideal_data: Maps a stage name to the ideal data of that stage.
            point_events: Maps a stage name to ``[start, end]``.
            ideal_signals: Signal descriptions (``"name"`` and ``"pen"``).
            curve_items: Registry; curves are appended to
                ``curve_items["ideal"][signal_name][stage]``.
            is_last: When true, each created curve is also added to the
                legend as ``"Ideal <name>"``.
        """
        for signal in ideal_signals:
            name = signal["name"]
            for stage, events in point_events.items():
                curve = self._create_curve_ideal(signal, ideal_data[stage], events[0], events[1])
                # Explicit None check instead of relying on Qt item truthiness.
                if curve is None:
                    continue
                curve.setZValue(50)
                plot_item.addItem(curve)
                per_signal = curve_items["ideal"].setdefault(name, {})
                per_signal.setdefault(stage, []).append(curve)
                if is_last:
                    legend_item.addItem(curve, "Ideal " + name)

    def _add_real_signals(self,
                          plot_item: pg.PlotItem,
                          dataframe: pd.DataFrame,
                          real_signals: list[dict[str, Any]],
                          legend: pg.LegendItem,
                          curve_items: dict) -> None:
        """Plot the real signals that are present in ``dataframe``.

        Args:
            plot_item: Plot that receives the curves.
            dataframe: Real data with a ``"time"`` column.
            real_signals: Signal descriptions (``"name"`` and ``"pen"``);
                signals without a matching column are skipped.
            legend: Legend that receives one entry per plotted signal.
            curve_items: Registry; the curve is stored in
                ``curve_items["real"][signal_name]``.
        """
        dataframe_headers = dataframe.columns.tolist()
        for signal in real_signals:
            name = signal["name"]
            if name not in dataframe_headers:
                continue
            plot = plot_item.plot(dataframe["time"], dataframe[name], pen=signal["pen"], fast=True)
            plot.setZValue(0)
            legend.addItem(plot, name)
            curve_items["real"][name] = plot

    def _add_performance_label(self,
                               layout: QVBoxLayout,
                               TWC_time: float,
                               ideal_time: float,
                               tesla_time: float,
                               qt_items: dict) -> None:
        """Add a row of labels with the performance figures.

        Percentages are rounded to two decimals; a zero divisor yields 0.0
        and the corresponding label is hidden.

        Args:
            layout: Layout that receives the label row.
            TWC_time: Accumulated real duration.
            ideal_time: Accumulated ideal duration.
            tesla_time: Reference duration the other two are compared to.
            qt_items: Registry; the row widget and the three value labels are
                stored under the keys ``"performance label"``,
                ``"real performance"``, ``"ideal performance"`` and
                ``"real to ideal performance"``.
        """
        tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
        tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
        TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0

        label_widget = QWidget()
        label_layout = QHBoxLayout(label_widget)

        start_label = QLabel("Сокращение длительности: ")
        real_label = QLabel(f"фактическое = {tesla_TWC} % ")
        ideal_label = QLabel(f"идеальное = {tesla_ideal} % ")
        kdip_label = QLabel(f"КДИП = {TWC_ideal}% ")

        # Hide every figure that could not be computed (or is exactly zero).
        if not tesla_TWC or not TWC_time:
            real_label.setVisible(False)
        if not tesla_ideal:
            ideal_label.setVisible(False)
        if not TWC_ideal:
            kdip_label.setVisible(False)

        for label in (start_label, real_label, ideal_label, kdip_label):
            label_layout.addWidget(label, alignment=Qt.AlignLeft)

        # The expanding spacer keeps the labels packed to the left.
        spacer = QSpacerItem(1, 1, QSizePolicy.Expanding, QSizePolicy.Minimum)
        label_layout.addSpacerItem(spacer)

        self.set_style(label_widget)
        layout.addWidget(label_widget)

        qt_items["performance label"] = label_widget
        qt_items["real performance"] = real_label
        qt_items["ideal performance"] = ideal_label
        qt_items["real to ideal performance"] = kdip_label

    def _build_widget(self, data: list[Any]) -> tuple[QWidget, dict, dict, dict]:
        """Assemble the graphical widget for one data set.

        One plot is created per entry of ``self._plt_channels``; all plots
        after the first are X-linked to the first one. When no dataframe is
        supplied only ideal data is drawn, laid out back to back from zero.

        Args:
            data: ``[dataframe, points_pocket, useful_data]``. ``dataframe``
                may be ``None``. Every item of ``points_pocket`` is
                ``[point_timeframe, ideal_data, point_events, useful_p_data]``.
                ``useful_data`` must contain ``"tesla_time"``, ``"range"``
                and ``"k_hardness"``.

        Returns:
            ``(result_widget, reg_items, curve_items, qt_items)``: the
            assembled widget plus the registries of created regions, curves
            and Qt labels.
        """
        result_widget = QWidget()
        result_layout = QVBoxLayout(result_widget)
        plot_layout = pg.GraphicsLayoutWidget()
        reg_items = {"real": {}, "ideal": {}}
        curve_items = {"real": {}, "ideal": {}}
        qt_items = {}

        dataframe, points_pocket, useful_data = data
        tesla_time = useful_data["tesla_time"]
        gun_range = useful_data["range"]
        k_hardness = useful_data["k_hardness"]
        dat_is_none = dataframe is None

        widget_steps = len(self._plt_channels)
        if not dat_is_none:
            dataframe_headers = dataframe.columns.tolist()
            point_steps = len(points_pocket)
        else:
            # NOTE(review): points_pocket is still iterated in this mode, so
            # the progress value may overshoot; confirm that 1 is intended.
            point_steps = 1

        # Pre-initialised so that a configuration without any "performance"
        # channel, without points or without channels no longer ends in a
        # NameError after the loops.
        main_plot = None
        worst_timeframe = None
        first_timeframe = None

        for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
            plot_item, legend = self._init_plot_item(title=channel)
            settings = description["Settings"]
            global_shift = 0

            TWC_time = 0.0
            ideal_time = 0.0
            worst_perf = 2

            # TODO: compute the correct value of the range parameter
            if settings["mirror ME"] and not dat_is_none:
                dataframe = self._mirror_shift_data(
                    "ME",
                    description["Real_signals"],
                    dataframe,
                    gun_range
                )

            last_point_index = len(points_pocket) - 1

            # Iterate over the points
            for cur_point, point_data in enumerate(points_pocket):
                point_timeframe, ideal_dat, point_events, useful_p_data = point_data
                # Deep copy: the ideal data may be replaced per stage below
                # and the caller's structure must stay untouched.
                ideal_data = copy.deepcopy(ideal_dat)
                is_last_point = (cur_point == last_point_index)

                if dat_is_none:
                    # No real data: synthesise the timeframe and stage events
                    # from the ideal timings, placing points back to back.
                    worst_timeframe = point_timeframe = [
                        global_shift, global_shift + ideal_data["Ideal cycle"]
                    ]
                    point_events = {}
                    keys = list(ideal_data.keys())
                    shift = 0
                    for i, duration in enumerate(ideal_data["Ideal timings"]):
                        point_events[keys[i]] = [global_shift + shift,
                                                 global_shift + duration + shift]
                        shift += duration
                    global_shift += ideal_data["Ideal cycle"]

                if first_timeframe is None:
                    first_timeframe = point_timeframe

                # TODO: verify the calculations; deliberately disabled via
                # the constant False in the condition.
                if False and settings["force compensation FE"] and not dat_is_none:
                    force = useful_p_data["force"]
                    F_comp = - force/k_hardness
                    point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) &
                                           (dataframe["time"] <= point_timeframe[1])].index
                    dataframe.loc[point_idxs] = self._shift_data(
                        "FE", description["Real_signals"], dataframe.loc[point_idxs], F_comp
                    )

                # Adjust the ideal data for display
                if settings["ideals"] and settings["mirror ME"]:
                    for stage in point_events:
                        ideal_data[stage] = self._mirror_shift_ideal(
                            "ME",
                            "FE",
                            description["Ideal_signals"],
                            ideal_data[stage],
                            gun_range,
                            useful_p_data["P1"]*1000,
                            useful_p_data["P2"]*1000
                        )

                # Real stages
                if settings["stages"] and not dat_is_none:
                    self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75)

                if settings["workpiece"]:
                    # Grey rectangle spanning the point's timeframe.
                    x1 = point_timeframe[0]
                    dx = point_timeframe[1] - x1
                    y1 = useful_p_data["position"]*1000
                    dy = useful_p_data["thickness"]*1000

                    rect_item = QGraphicsRectItem(x1, y1, dx, dy)
                    rect_item.setZValue(-5)
                    rect_item.setBrush(pg.mkBrush('grey'))
                    rect_item.setPen(pg.mkPen('black', width=3))
                    plot_item.addItem(rect_item)

                if settings["force accuracy"] and not dat_is_none:
                    # Green band of +/- 5 % around the force over "Welding".
                    modifier = 0.05

                    x1 = point_events["Welding"][0]
                    dx = point_events["Welding"][1] - x1
                    force = useful_p_data["force"]
                    y1 = force*(1-modifier)
                    dy = force*(2*modifier)

                    rect_item = QGraphicsRectItem(x1, y1, dx, dy)
                    rect_item.setZValue(-5)
                    rect_item.setBrush(pg.mkBrush((0, 255, 0, 50)))
                    rect_item.setPen(pg.mkPen('black', width=0))
                    plot_item.addItem(rect_item)

                # Ideal stages and ideal signals
                if settings["ideals"]:
                    self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100)
                    self._add_ideal_signals(plot_item, legend, ideal_data, point_events,
                                            description["Ideal_signals"], curve_items, is_last_point)

                # Performance accounting
                if settings["performance"]:
                    if is_last_point:
                        # Last point: only the first three stages are counted.
                        if not dat_is_none:
                            TWC_delta = sum(point_events[stage][1] - point_events[stage][0]
                                            for stage in ["Closing", "Squeeze", "Welding"])
                        else:
                            TWC_delta = 0
                        ideal_delta = sum(ideal_data["Ideal timings"][0:3])
                    else:
                        if not dat_is_none:
                            TWC_delta = point_timeframe[1] - point_timeframe[0]
                        else:
                            TWC_delta = 0
                        ideal_delta = ideal_data["Ideal cycle"]

                    TWC_time += TWC_delta
                    ideal_time += ideal_delta
                    curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1

                    if curr_perf < worst_perf:
                        worst_perf = curr_perf
                        worst_timeframe = point_timeframe

                # Report progress
                self._update_status(widget_steps, point_steps, widget_num, cur_point)

            # Real signals
            if not dat_is_none:
                self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)

            if widget_num == 0:
                main_plot = plot_item
            else:
                # Link the remaining plots to the main plot
                plot_item.setXLink(main_plot)

            if settings["performance"]:
                self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time, qt_items)

            plot_layout.addItem(plot_item, widget_num, 0)

        if worst_timeframe is None:
            # No channel computed performance: fall back to the first point.
            worst_timeframe = first_timeframe

        if main_plot is not None and worst_timeframe is not None:
            navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot)
            if navigator is not None:
                # The navigator goes into the row right below the last plot.
                plot_layout.addItem(navigator, widget_steps, 0)
                self._sync_main_plot_with_navigator(main_plot, ROI_region)
                # Signal arguments are swallowed by *_args; the captured
                # objects are keyword-only so emitted arguments can never
                # overwrite them.
                main_plot.sigXRangeChanged.connect(
                    lambda *_args, plot=main_plot, region=ROI_region:
                        self._sync_navigator_with_main(main_plot=plot, region=region)
                )

        result_layout.addWidget(plot_layout)
        return result_widget, reg_items, curve_items, qt_items

    def build(self, data: list[list[Any]]) -> None:
        """Build one widget pack per data set and hand the list to the mediator.

        Args:
            data: List of ``[dataframe, points_pocket, useful_data]`` items.

        On any ``Exception`` the error is logged and the mediator receives a
        single ``QLabel`` with the traceback and error text instead.
        ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted.
        """
        try:
            self._datalen = len(data)
            widgets_datapack = []
            for step, data_sample in enumerate(data):
                # _update_status reads the index of the current data set.
                self._datastep = step
                widgets_datapack.append(self._build_widget(data_sample))
        except Exception as exc:
            # Full traceback: the first frame alone is always this method
            # and does not show where the failure happened.
            tbinfo = "".join(traceback.format_tb(exc.__traceback__))
            pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(exc)
            logger.error(pymsg)
            widgets_datapack = [QLabel(pymsg)]
        self._mediator.notify(self, widgets_datapack)

    def _update_status(self, widgsteps: int, pointsteps: int, cur_widg: int, cur_point: int) -> None:
        """Compute the build progress and report it to the mediator.

        The 0-100 scale is split evenly between data sets, then between
        widgets, then between points; a zero divisor leaves the step
        undivided.

        Args:
            widgsteps: Number of widgets per data set.
            pointsteps: Number of points per widget.
            cur_widg: Index of the widget being built.
            cur_point: Index of the point being processed.
        """
        if self._datalen != 0:
            cycle_start = self._datastep/self._datalen*100 + 1
            dataset_span = 100/self._datalen
        else:
            cycle_start = 1
            dataset_span = 100

        widget_span = dataset_span/widgsteps if widgsteps != 0 else dataset_span
        point_span = widget_span/pointsteps if pointsteps != 0 else widget_span

        progress = cycle_start + widget_span*cur_widg + point_span*cur_point
        self._mediator.update_status(progress)