2024-11-25 14:01:09 +03:00
|
|
|
import pandas as pd
|
|
|
|
|
from PyQt5.QtWidgets import QWidget, QVBoxLayout
|
|
|
|
|
import pyqtgraph as pg
|
|
|
|
|
import numpy as np
|
|
|
|
|
from numpy import floating
|
|
|
|
|
from typing import Optional, Any, NamedTuple
|
|
|
|
|
|
|
|
|
|
from src.utils.base.base import BasePlotWidget
|
2024-11-25 17:20:00 +03:00
|
|
|
from src.utils.base.base import BaseIdealDataBuilder
|
2024-11-25 14:01:09 +03:00
|
|
|
|
|
|
|
|
|
2024-11-25 17:20:00 +03:00
|
|
|
class idealDataBuilder(BaseIdealDataBuilder):
|
|
|
|
|
def get_closingDF(self) -> pd.DataFrame:
|
|
|
|
|
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
|
|
|
|
|
|
|
|
|
|
def get_compressionDF(self) -> pd.DataFrame:
|
|
|
|
|
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
|
|
|
|
|
|
|
|
|
|
def get_openingDF(self) -> pd.DataFrame:
|
|
|
|
|
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
|
|
|
|
|
|
|
|
|
|
def get_tmovementDF(self) -> pd.DataFrame:
|
|
|
|
|
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
|
|
|
|
|
|
|
|
|
|
def get_weldingDF(self, end_time: float) -> pd.DataFrame:
|
|
|
|
|
data = []
|
|
|
|
|
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
|
|
|
|
|
data.append({"time":0, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
|
|
|
|
data.append({"time":end_time, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
|
|
|
|
return pd.DataFrame(data)
|
|
|
|
|
|
|
|
|
|
def get_ideal_timings(self) -> list[float, float, float, float]:
|
|
|
|
|
data = self.Ts
|
|
|
|
|
ideal_timings = [data['tclose'], data['tgrow'], self.getMarkOpen(), data["tmovement"]]
|
|
|
|
|
return ideal_timings
|
|
|
|
|
|
|
|
|
|
|
2024-11-25 14:01:09 +03:00
|
|
|
class ProcessStage(NamedTuple):
|
|
|
|
|
mean_value: floating[Any]
|
|
|
|
|
start_index: int
|
|
|
|
|
finish_index: int
|
|
|
|
|
|
|
|
|
|
|
2024-11-25 17:20:00 +03:00
|
|
|
class PlotWidget(BasePlotWidget):
|
2024-11-25 14:01:09 +03:00
|
|
|
def _create_stage_ideal(self,
|
|
|
|
|
stage: str,
|
|
|
|
|
signal: str,
|
|
|
|
|
times: pd.Series,
|
|
|
|
|
dataframe: pd.DataFrame) -> Optional[pg.LinearRegionItem]:
|
|
|
|
|
stage_diff = np.diff(dataframe[stage])
|
|
|
|
|
start_index = np.where(stage_diff == 1)[0]
|
|
|
|
|
finish_index = np.where(stage_diff == -1)[0]
|
2024-11-25 17:20:00 +03:00
|
|
|
data = self._stage_ideals[stage]
|
2024-11-25 14:01:09 +03:00
|
|
|
|
|
|
|
|
if start_index.size:
|
|
|
|
|
start_timestamp = times[start_index[0]]
|
|
|
|
|
finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1]
|
|
|
|
|
plot = pg.PlotDataItem(x=start_timestamp+data["time"], y=data[signal["name"]], pen=signal["pen"])
|
|
|
|
|
return plot
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _create_stage_region(self,
|
|
|
|
|
stage: str,
|
|
|
|
|
times: pd.Series,
|
|
|
|
|
dataframe: pd.DataFrame) -> Optional[pg.LinearRegionItem]:
|
|
|
|
|
stage_diff = np.diff(dataframe[stage])
|
|
|
|
|
start_index = np.where(stage_diff == 1)[0]
|
|
|
|
|
finish_index = np.where(stage_diff == -1)[0]
|
|
|
|
|
|
|
|
|
|
if start_index.size:
|
|
|
|
|
start_timestamp = times[start_index[0]]
|
|
|
|
|
finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1]
|
|
|
|
|
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
|
|
|
|
|
region.setBrush(pg.mkBrush(self._stage_colors[stage]))
|
|
|
|
|
return region
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
2024-11-25 17:20:00 +03:00
|
|
|
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
|
2024-11-25 14:01:09 +03:00
|
|
|
plot_widget = pg.PlotWidget(title=title)
|
|
|
|
|
plot_widget.showGrid(x=True, y=True)
|
|
|
|
|
legend = pg.LegendItem((80, 60), offset=(70, 20))
|
|
|
|
|
legend.setParentItem(plot_widget.graphicsItem())
|
|
|
|
|
return plot_widget, legend
|
|
|
|
|
|
|
|
|
|
def get_stage_info(self,
|
|
|
|
|
stage: str,
|
|
|
|
|
dataframe: pd.DataFrame,
|
|
|
|
|
signal_name: str) -> Optional[ProcessStage]:
|
|
|
|
|
if stage in self._stages:
|
|
|
|
|
stage_diff = np.diff(dataframe[stage])
|
|
|
|
|
start_index = np.where(stage_diff == 1)[0]
|
|
|
|
|
finish_index = np.where(stage_diff == -1)[0]
|
|
|
|
|
|
|
|
|
|
data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else []
|
|
|
|
|
|
|
|
|
|
if data.size and start_index.size:
|
|
|
|
|
start = start_index[0]
|
|
|
|
|
finish = finish_index[0] if finish_index.size else (len(data) - 1)
|
|
|
|
|
data_slice = data[start:finish]
|
|
|
|
|
mean = np.mean(data_slice)
|
|
|
|
|
return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish))
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _build_widget(self, dataframe: pd.DataFrame) -> QWidget:
|
|
|
|
|
widget = QWidget()
|
|
|
|
|
layout = QVBoxLayout()
|
|
|
|
|
|
|
|
|
|
time_axis = dataframe["time"]
|
|
|
|
|
dataframe_headers = dataframe.columns.tolist()
|
|
|
|
|
|
|
|
|
|
for channel, description in self._plt_channels.items():
|
|
|
|
|
plot_widget, legend = self._init_plot_widget(title=channel)
|
|
|
|
|
|
|
|
|
|
settings = description["Settings"]
|
|
|
|
|
if settings["stages"] and all([stage in dataframe_headers for stage in self._stages]):
|
|
|
|
|
for stage in self._stages:
|
|
|
|
|
region = self._create_stage_region(stage, time_axis, dataframe)
|
|
|
|
|
for signal in description["Ideal_signals"]:
|
|
|
|
|
ideal_plot = self._create_stage_ideal(stage, signal ,time_axis, dataframe)
|
|
|
|
|
if ideal_plot:
|
|
|
|
|
plot_widget.addItem(ideal_plot)
|
|
|
|
|
|
|
|
|
|
if region:
|
|
|
|
|
plot_widget.addItem(region)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if settings["zoom"]:
|
|
|
|
|
if max(time_axis) < 5.0:
|
|
|
|
|
stages = [self.get_stage_info("Welding",
|
|
|
|
|
dataframe,
|
|
|
|
|
signal["name"]) for signal in description["Real_signals"]]
|
|
|
|
|
if stages:
|
|
|
|
|
means_raw = [stage.mean_value for stage in stages]
|
|
|
|
|
mean = max(means_raw)
|
|
|
|
|
start = time_axis[stages[0].start_index]
|
|
|
|
|
finish = time_axis[stages[0].finish_index]
|
|
|
|
|
|
|
|
|
|
overshoot = pg.BarGraphItem(x0=0,
|
|
|
|
|
y0=mean - mean * 0.05,
|
|
|
|
|
height=mean * 0.05 * 2,
|
|
|
|
|
width=start,
|
|
|
|
|
brush=pg.mkBrush([0, 250, 0, 100]))
|
|
|
|
|
plot_widget.addItem(overshoot)
|
|
|
|
|
|
|
|
|
|
stable = pg.BarGraphItem(x0=start,
|
|
|
|
|
y0=mean - mean * 0.015,
|
|
|
|
|
height=mean * 0.015 * 2,
|
|
|
|
|
width=finish - start,
|
|
|
|
|
brush=pg.mkBrush([0, 250, 0, 100]))
|
|
|
|
|
plot_widget.addItem(stable)
|
|
|
|
|
|
|
|
|
|
plot_widget.setYRange(mean - 260, mean + 260)
|
|
|
|
|
plot_widget.setInteractive(False)
|
|
|
|
|
else:
|
|
|
|
|
max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]])
|
|
|
|
|
region = pg.LinearRegionItem([max_value - max_value * 0.015,
|
|
|
|
|
max_value + max_value * 0.015],
|
|
|
|
|
movable=False,
|
|
|
|
|
orientation="horizontal")
|
|
|
|
|
|
|
|
|
|
region.setBrush(pg.mkBrush([0, 250, 0, 100]))
|
|
|
|
|
plot_widget.setYRange(max_value - 200, max_value + 200)
|
|
|
|
|
plot_widget.setXRange(3.5, 4.5)
|
|
|
|
|
plot_widget.addItem(region)
|
|
|
|
|
plot_widget.setInteractive(False)
|
|
|
|
|
|
|
|
|
|
for signal in description["Real_signals"]:
|
|
|
|
|
if signal["name"] in dataframe_headers:
|
|
|
|
|
plot = plot_widget.plot(time_axis, dataframe[signal["name"]], pen=signal["pen"])
|
|
|
|
|
legend.addItem(plot, signal["name"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
layout.addWidget(plot_widget)
|
|
|
|
|
|
|
|
|
|
widget.setLayout(layout)
|
|
|
|
|
return widget
|
|
|
|
|
|
|
|
|
|
def build(self, data: list[pd.DataFrame]) -> None:
|
|
|
|
|
widgets = [self._build_widget(data_sample) for data_sample in data]
|
|
|
|
|
self._mediator.notify(self, widgets)
|
|
|
|
|
|
2024-11-25 17:20:00 +03:00
|
|
|
def update_settings(self, data: list[dict]):
|
|
|
|
|
self._initIdealBuilder(idealDataBuilder=idealDataBuilder, data=data)
|
|
|
|
|
|
|
|
|
|
|
2024-11-25 14:01:09 +03:00
|
|
|
|