WeldingSpotPerformance/src/gui/plotter.py

290 lines
13 KiB
Python
Raw Normal View History

import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel
import pyqtgraph as pg
from typing import Optional, Any
2024-12-05 13:18:53 +03:00
from utils.base.base import BasePlotWidget
class ProcessStage():
mean_value:int
start_index:int
finish_index:int
class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self,
signal: dict[str, Any],
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
"""
Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
return pg.PlotDataItem(
x=start_timestamp + ideal_data["time"],
y=ideal_data[signal["name"]],
pen=signal["pen"]
)
return None
def _create_stage_region(self,
stage: str,
start_timestamp: float,
finish_timestamp: float,
transparency: int) -> Optional[pg.LinearRegionItem]:
"""
Создает регион для определённого этапа, если заданы временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
color = self._stage_colors.get(stage, [100, 100, 100, 100])
region.setBrush(pg.mkBrush(color[:3] + [transparency]))
return region
return None
@staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title)
# Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True)
plot_widget.setClipToView(True)
legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
def _add_stage_regions(self,
plot_widget: pg.PlotWidget,
point_events: dict[str, list[float]],
dataframe_headers: list[str],
transparency: int = 75) -> None:
"""
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
"""
stages = point_events.keys()
if all(stage in dataframe_headers for stage in stages):
for stage in stages:
start_t, end_t = point_events[stage]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None:
plot_widget.addItem(region)
def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
transparency: int = 125) -> None:
"""
Добавляет регионы для идеальных этапов.
"""
ideal_timings = ideal_data["Ideal timings"]
stages = list(point_events.keys())
for i, stage in enumerate(stages):
start_t = point_events[stage][0]
end_t = start_t + ideal_timings[i]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region:
plot_widget.addItem(region)
def _add_ideal_signals(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None:
"""
Добавляет идеальные сигналы для каждого этапа.
"""
for stage in point_events.keys():
for signal in ideal_signals:
curve = self._create_curve_ideal(
signal,
ideal_data[stage],
point_events[stage][0],
point_events[stage][1]
)
if curve:
plot_widget.addItem(curve)
def _add_real_signals(self,
plot_widget: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None:
"""
Добавляет реальные сигналы из dataframe на виджет.
"""
dataframe_headers = dataframe.columns.tolist()
for signal in real_signals:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
legend.addItem(plot, signal["name"])
def _add_performance_label(self,
layout: QVBoxLayout,
TWC_time: float,
ideal_time: float,
tesla_time: float) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
)
self.set_style(performance_label)
layout.addWidget(performance_label)
performance_label.update()
def _build_widget(self, data: list[Any]) -> QWidget:
"""
Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, tesla_time].
"""
widget = QWidget()
layout = QVBoxLayout(widget)
dataframe, points_pocket, tesla_time = data
dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"]
TWC_time = 0.0
ideal_time = 0.0
worst_perf = 2
# Итерация по точкам
for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events]
point_timeframe, ideal_data, point_events = point_data
# Добавляем реальные стадии
if settings["stages"]:
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
# Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]:
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
# Подсчёт производительности
if settings["performance"]:
is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
TWC_delta = point_timeframe[1] - point_timeframe[0]
ideal_delta = ideal_data["Ideal cycle"]
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
# Добавляем реальные сигналы
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
if widget_num == 0:
main_plot = plot_widget
else:
# Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot)
# Если есть настройка производительности, добавляем label
if settings["performance"]:
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
layout.addWidget(plot_widget)
layout.addWidget(navigator)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout)
return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[list[Any]]) -> None:
"""
Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида:
[
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, tesla_time],
...
]
"""
widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets)