import pandas as pd from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel import pyqtgraph as pg import numpy as np from numpy import floating from typing import Optional, Any, NamedTuple from src.utils.base.base import BasePlotWidget from src.utils.base.base import BaseIdealDataBuilder class idealDataBuilder(BaseIdealDataBuilder): def get_closingDF(self) -> pd.DataFrame: return self._get_data(self.Ts['tclose'], self.calcPhaseClose) def get_compressionDF(self) -> pd.DataFrame: return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow) def get_openingDF(self) -> pd.DataFrame: return self._get_data(self.getMarkOpen(), self.calcPhaseOpen) def get_oncomingDF(self) -> pd.DataFrame: return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement) def get_weldingDF(self) -> pd.DataFrame: data = [] X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']) data.append({"time":0, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) data.append({"time":self.welding_time, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) return pd.DataFrame(data) def get_ideal_timings(self) -> list[float, float, float, float]: data = self.Ts ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen()] # TODO: add data['tmovement'], Oncoming не учитывается в производительности return ideal_timings class ProcessStage(NamedTuple): mean_value: floating[Any] start_index: int finish_index: int class PlotWidget(BasePlotWidget): def _create_curve_ideal(self, stage: str, signal: str, start_timestamp: float, finish_timestamp: float) -> Optional[pg.PlotDataItem]: data = self._stage_ideals[stage] if start_timestamp and finish_timestamp: plot = pg.PlotDataItem(x=start_timestamp+data["time"], y=data[signal["name"]], pen=signal["pen"]) return plot return None def _create_stage_region(self, stage: str, start_timestamp: float, finish_timestamp: float) -> Optional[pg.LinearRegionItem]: if start_timestamp and finish_timestamp: region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False) region.setBrush(pg.mkBrush(self._stage_colors[stage])) return region return None def _get_timestamp(self, stage: str, times: pd.Series, dataframe: pd.DataFrame) -> Optional[list[float]]: stage_diff = np.diff(dataframe[stage]) start_index = np.where(stage_diff == 1)[0] finish_index = np.where(stage_diff == -1)[0] if start_index.size: start_timestamp = times[start_index[0]] finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1] return start_timestamp, finish_timestamp return None @staticmethod def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]: plot_widget = pg.PlotWidget(title=title) plot_widget.showGrid(x=True, y=True) legend = pg.LegendItem((80, 60), offset=(70, 20)) legend.setParentItem(plot_widget.graphicsItem()) return plot_widget, legend def get_stage_info(self, stage: str, dataframe: pd.DataFrame, signal_name: str) -> Optional[ProcessStage]: if stage in self._stages: stage_diff = np.diff(dataframe[stage]) start_index = np.where(stage_diff == 1)[0] finish_index = np.where(stage_diff == -1)[0] data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else [] if data.size and start_index.size: start = start_index[0] finish = finish_index[0] if finish_index.size else (len(data) - 1) data_slice = data[start:finish] mean = np.mean(data_slice) return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish)) return None def _build_widget(self, dataframe: pd.DataFrame) -> QWidget: widget = QWidget() layout = QVBoxLayout() time_axis = dataframe["time"] dataframe_headers = dataframe.columns.tolist() for channel, description in self._plt_channels.items(): plot_widget, legend = self._init_plot_widget(title=channel) settings = description["Settings"] if settings["stages"] and all([stage in dataframe_headers for stage in self._stages]): for stage in self._stages: start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe) region = self._create_stage_region(stage, start_timestamp, finish_timestamp) if region: plot_widget.addItem(region) for signal in description["Ideal_signals"]: ideal_plot = self._create_curve_ideal(stage, signal, start_timestamp, finish_timestamp) if ideal_plot: plot_widget.addItem(ideal_plot) end_timestamp = time_axis[len(time_axis) - 1] region = self._create_stage_region("Oncoming", finish_timestamp, end_timestamp) if region: plot_widget.addItem(region) for signal in description["Ideal_signals"]: ideal_plot = self._create_curve_ideal("Oncoming", signal, finish_timestamp, end_timestamp) if ideal_plot: plot_widget.addItem(ideal_plot) if settings["performance"] and all([stage in dataframe_headers for stage in self._stages]): delta_timestamp = 0 for stage in self._stages: start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe) delta_timestamp += finish_timestamp - start_timestamp ideal_delta = self._opt.get_cycle_time() performance = round(ideal_delta/delta_timestamp*100, 2) performance_label = QLabel(f"Performance = {performance} %") layout.addWidget(performance_label) if settings["zoom"]: if max(time_axis) < 5.0: stages = [self.get_stage_info("Welding", dataframe, signal["name"]) for signal in description["Real_signals"]] if stages: means_raw = [stage.mean_value for stage in stages] mean = max(means_raw) start = time_axis[stages[0].start_index] finish = time_axis[stages[0].finish_index] overshoot = pg.BarGraphItem(x0=0, y0=mean - mean * 0.05, height=mean * 0.05 * 2, width=start, brush=pg.mkBrush([0, 250, 0, 100])) plot_widget.addItem(overshoot) stable = pg.BarGraphItem(x0=start, y0=mean - mean * 0.015, height=mean * 0.015 * 2, width=finish - start, brush=pg.mkBrush([0, 250, 0, 100])) plot_widget.addItem(stable) plot_widget.setYRange(mean - 260, mean + 260) plot_widget.setInteractive(False) else: max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]]) region = pg.LinearRegionItem([max_value - max_value * 0.015, max_value + max_value * 0.015], movable=False, orientation="horizontal") region.setBrush(pg.mkBrush([0, 250, 0, 100])) plot_widget.setYRange(max_value - 200, max_value + 200) plot_widget.setXRange(3.5, 4.5) plot_widget.addItem(region) plot_widget.setInteractive(False) for signal in description["Real_signals"]: if signal["name"] in dataframe_headers: plot = plot_widget.plot(time_axis, dataframe[signal["name"]], pen=signal["pen"]) legend.addItem(plot, signal["name"]) layout.addWidget(plot_widget) widget.setLayout(layout) return widget def build(self, data: list[pd.DataFrame]) -> None: widgets = [self._build_widget(data_sample) for data_sample in data] self._mediator.notify(self, widgets) def update_settings(self, data: list[dict]): self._initIdealBuilder(idealDataBuilder=idealDataBuilder, data=data)