WeldingSpotPerformance/src/gui/plotter.py

315 lines
14 KiB
Python
Raw Normal View History

import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel
import copy
import pyqtgraph as pg
from typing import Optional, Any
2024-12-05 13:18:53 +03:00
from utils.base.base import BasePlotWidget
class ProcessStage():
mean_value:int
start_index:int
finish_index:int
class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self,
signal: dict[str, Any],
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
"""
Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
return pg.PlotDataItem(
x=start_timestamp + ideal_data["time"],
y=ideal_data[signal["name"]],
pen=signal["pen"]
)
return None
def _create_stage_region(self,
stage: str,
start_timestamp: float,
finish_timestamp: float,
transparency: int) -> Optional[pg.LinearRegionItem]:
"""
Создает регион для определённого этапа, если заданы временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
color = self._stage_colors.get(stage, [100, 100, 100, 100])
region.setBrush(pg.mkBrush(color[:3] + [transparency]))
return region
return None
@staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title)
# Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True)
plot_widget.setClipToView(True)
legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
def _add_stage_regions(self,
plot_widget: pg.PlotWidget,
point_events: dict[str, list[float]],
dataframe_headers: list[str],
transparency: int = 75) -> None:
"""
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
"""
stages = point_events.keys()
if all(stage in dataframe_headers for stage in stages):
for stage in stages:
start_t, end_t = point_events[stage]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None:
plot_widget.addItem(region)
def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
transparency: int = 125) -> None:
"""
Добавляет регионы для идеальных этапов.
"""
ideal_timings = ideal_data["Ideal timings"]
stages = list(point_events.keys())
for i, stage in enumerate(stages):
start_t = point_events[stage][0]
end_t = start_t + ideal_timings[i]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region:
plot_widget.addItem(region)
def _add_ideal_signals(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None:
"""
Добавляет идеальные сигналы для каждого этапа.
"""
for stage in point_events.keys():
for signal in ideal_signals:
curve = self._create_curve_ideal(
signal,
ideal_data[stage],
point_events[stage][0],
point_events[stage][1]
)
if curve:
plot_widget.addItem(curve)
def _add_real_signals(self,
plot_widget: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None:
"""
Добавляет реальные сигналы из dataframe на виджет.
"""
dataframe_headers = dataframe.columns.tolist()
for signal in real_signals:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
legend.addItem(plot, signal["name"])
def _add_performance_label(self,
layout: QVBoxLayout,
TWC_time: float,
ideal_time: float,
tesla_time: float) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
)
self.set_style(performance_label)
layout.addWidget(performance_label)
performance_label.update()
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _build_widget(self, data: list[Any]) -> QWidget:
"""
Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
"""
widget = QWidget()
layout = QVBoxLayout(widget)
dataframe, points_pocket, useful_data = data
tesla_time = useful_data["tesla_time"]
range_ME = useful_data["range_ME"]
dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"]
TWC_time = 0.0
ideal_time = 0.0
worst_perf = 2
if settings["mirror ME"]:
dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME)
# Итерация по точкам
for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events]
point_timeframe, ideal_dat, point_events = point_data
ideal_data = copy.deepcopy(ideal_dat)
# Модифицируем данные для отображения гарфика
if settings["ideals"] and settings["mirror ME"]:
for stage in point_events.keys():
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
# Добавляем реальные стадии
if settings["stages"]:
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
# Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]:
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
# Подсчёт производительности
if settings["performance"]:
is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
TWC_delta = point_timeframe[1] - point_timeframe[0]
ideal_delta = ideal_data["Ideal cycle"]
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
# Добавляем реальные сигналы
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
if widget_num == 0:
main_plot = plot_widget
else:
# Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot)
# Если есть настройка производительности, добавляем label
if settings["performance"]:
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
layout.addWidget(plot_widget)
layout.addWidget(navigator)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout)
return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[list[Any]]) -> None:
"""
Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида:
[
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, tesla_time],
...
]
"""
widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets)