chore: функционал PlotWidget разнесен в два класса

This commit is contained in:
Andrew 2025-01-27 19:03:05 +03:00
parent b7a190ec54
commit 93be450da6

View File

@ -1,7 +1,8 @@
import copy
import traceback
import sys
from typing import Optional, Any
from typing import Optional, Tuple, List, Any
from dataclasses import dataclass
from PyQt5.QtWidgets import (
QWidget,
@ -17,7 +18,18 @@ from loguru import logger
import pyqtgraph as pg
import pandas as pd
from base.base import BasePlotWidget, GraphicPassport
from base.base import BasePlotWidget, GraphicPassport, PlotItems, PointPassport, UsefulGraphData
@dataclass
class ChannelTimings():
shift: float = 0
TWC_time: float = 0.0
ideal_time: float = 0.0
client_time: float = 0.0
worst_performance: float = 2
worst_timeframe: list[float] = [0, 0]
class PlotWidget(BasePlotWidget):
@ -27,25 +39,27 @@ class PlotWidget(BasePlotWidget):
Создает набор виджетов по предоставленному списку данных.
"""
try:
# TODO: Про инициализацию атрибутов класса где-то уже писал. На всякий случай: она делается в конструкторе.
self._datalen = len(data)
widgets_datapack = [self._build_widget(data_sample) for self._datastep, data_sample in enumerate(data)]
except:
# TODO: Добавить конкретные исключения.
tb = sys.exc_info()[2]
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
# TODO: Логи должны быть понятны обычному пользователю. Traceback что-то там - это не информативно и может
# быть непонятно, поэтому пишем осознанные сообщения. Если надо, то создаем собственные
# классы исключений. (Касается всего проекта - уже не первый раз натыкаюсь на это.)
logger.error(pymsg)
widgets_datapack = [QLabel(pymsg)]
except (KeyError, ValueError, TypeError) as e:
# Provide user-friendly log message, log traceback in debug.
logger.error(f"Возникла проблема при сборке данных графика: {e}")
logger.debug(traceback.format_exc())
error_label = QLabel("Произошла ошибка при формировании графиков. Пожалуйста, проверьте корректность данных.")
widgets_datapack = [error_label]
except Exception as e:
# Catch all remaining exceptions.
logger.error(f"Непредвиденная ошибка при формировании графиков: {e}")
logger.debug(traceback.format_exc())
error_label = QLabel("Непредвиденная ошибка при формировании графиков.")
widgets_datapack = [error_label]
finally:
# Notify mediator
self._mediator.notify(self, widgets_datapack)
def _downsample_data(self, x, y, max_points=5000):
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
# TODO: Какой тип данных у 'x'? Какой тип данных у 'y'? Надо добавить аннотации типов.
@staticmethod
def _downsample_data(x:list, y:list, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
@ -58,7 +72,7 @@ class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
main_plot: pg.PlotItem) -> Tuple[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
@ -80,13 +94,11 @@ class PlotWidget(BasePlotWidget):
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
# TODO: Возвращаемый результат не соответствует аннотированному в сигнатуре метода.
return navigator, ROI_region
return (navigator, ROI_region)
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotItem,
region: pg.LinearRegionItem):
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
@staticmethod
def _sync_main_plot_with_navigator(main_plot: pg.PlotItem,
region: pg.LinearRegionItem) -> None:
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
@ -96,34 +108,8 @@ class PlotWidget(BasePlotWidget):
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
# TODO: Методы _mirror_shift_data и _shift_data нарушают принцип DRY: дублирование кода.
# Сделать ОДИН метод, одним из входных параметров которого будет lambda-функция, которая применяется к dataframe.
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
@staticmethod
def _sync_navigator_with_main(main_plot: pg.PlotItem, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
@ -133,6 +119,213 @@ class PlotWidget(BasePlotWidget):
region.setRegion([x_min, x_max])
region.blockSignals(False)
def _add_performance_label(self,
layout: QVBoxLayout,
timings: ChannelTimings,
qt_items: dict) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
# TODO: Почему PlotWidget создает Label? Вынести в другое место.
tesla_TWC = round((1 - timings.TWC_time/timings.client_time)*100, 2) if timings.client_time else 0.0
tesla_ideal = round((1 - timings.ideal_time/timings.client_time)*100, 2) if timings.client_time else 0.0
TWC_ideal = round((timings.ideal_time/timings.TWC_time)*100, 2) if timings.TWC_time else 0.0
label_widget = QWidget()
label_layout = QHBoxLayout(label_widget)
start_label = QLabel("Сокращение длительности: ")
real_label = QLabel(f"фактическое = {tesla_TWC} % ")
if not tesla_TWC or not timings.TWC_time: real_label.setVisible(False)
ideal_label = QLabel(f"идеальное = {tesla_ideal} % ")
if not tesla_ideal: ideal_label.setVisible(False)
kdip_label = QLabel(f"КДИП = {TWC_ideal}% ")
if not TWC_ideal: kdip_label.setVisible(False)
label_layout.addWidget(start_label, alignment=Qt.AlignLeft)
label_layout.addWidget(real_label, alignment=Qt.AlignLeft)
label_layout.addWidget(ideal_label, alignment=Qt.AlignLeft)
label_layout.addWidget(kdip_label, alignment=Qt.AlignLeft)
spacer = QSpacerItem(1, 1, QSizePolicy.Expanding, QSizePolicy.Minimum)
label_layout.addSpacerItem(spacer)
self.set_style(label_widget)
layout.addWidget(label_widget)
qt_items["performance label"] = label_widget
qt_items["real performance"] = real_label
qt_items["ideal performance"] = ideal_label
qt_items["real to ideal performance"] = kdip_label
def _build_widget(self, graphic_passport: GraphicPassport) -> QWidget:
# TODO: Исходя из сигнатуры метода, он создает и возвращает виджет с графиками.
# Простыня "result_widget, reg_items, curve_items, qt_items" не похожа на виджет с графиком.
"""
Собирает графический виджет для одного набора данных.
"""
container_widget, container_layout, plot_layout, pyqt_container = self._generate_widget_container()
plotter = PlotItemGenerator(graphic_passport, len(self._plt_channels), self._stage_colors)
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_item, plot_timings = plotter.generate_plot_item(widget_num, channel, description, pyqt_container)
if widget_num == 0:
main_plot = plot_item
else:
# Связываем остальные графики с основным графиком
plot_item.setXLink(main_plot)
if description["settings"]["performance"]:
self._add_performance_label(container_layout, plot_timings, pyqt_container.qt_items)
plot_layout.addItem(plot_item, widget_num, 0)
navigator, ROI_region = self._create_navigator(plot_timings.worst_timeframe, main_plot)
if navigator is not None:
plot_layout.addItem(navigator, widget_num+1, 0)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
container_layout.addWidget(plot_layout)
container_widget.setProperty("pyqt_container", pyqt_container)
return container_widget
@staticmethod
def _generate_widget_container() -> Tuple[QWidget, QVBoxLayout, pg.GraphicsLayoutWidget, PlotItems]:
container_widget = QWidget()
container_layout = QVBoxLayout(container_widget)
plot_layout = pg.GraphicsLayoutWidget()
pyqt_container = PlotItems({"real":{}, "ideal":{}}, {"real":{}, "ideal":{}}, {})
return (container_widget, container_layout, plot_layout, pyqt_container)
def _update_status(self, widgsteps:int, pointsteps:int, cur_widg:int, cur_point:int):
if self._datalen:
sycle_start = self._datastep/self._datalen*100 + 1
period1 = 100/self._datalen
else:
sycle_start = 1
period1 = 100
period2 = period1/widgsteps if widgsteps != 0 else period1
period3 = period2/pointsteps if pointsteps != 0 else period2
progress = sycle_start + period2*cur_widg + period3*cur_point
self._controller.update_progress(progress)
class PlotItemGenerator:
def __init__(self,
graphic_passport: GraphicPassport,
widget_steps: int,
colors: dict) -> None:
self._stage_colors = colors
self._ideal_mode = graphic_passport.dataframe is None
if not self._ideal_mode:
dataframe_headers = graphic_passport.dataframe.columns.tolist()
point_steps = len(graphic_passport.points_pocket)
else:
dataframe_headers = []
point_steps = 1
self._datapack = [
graphic_passport.dataframe,
dataframe_headers,
graphic_passport.useful_data,
graphic_passport.points_pocket,
widget_steps,
point_steps
]
def generate_plot_item(self,
widget_num: int,
channel: str,
description: dict[str, Any],
pyqt_container: PlotItems) -> Tuple[pg.PlotItem, ChannelTimings]:
dataframe, dataframe_headers, useful_data, points_pocket, widget_steps, point_steps = self._datapack
plot_item, legend = self._init_plot_item(title=channel)
settings = description["Settings"]
timings = ChannelTimings()
timings.client_time = useful_data.client_time
# TODO: рассчитать корректный параметр range
if settings["mirror ME"] and not self._ideal_mode:
dataframe = self._shift_data(
"ME",
description["Real_signals"],
dataframe,
lambda x: useful_data.range_ME-x
)
# Итерация по точкам
for cur_point, data in enumerate(points_pocket):
point_data: PointPassport = data
ideal_data = copy.deepcopy(point_data.ideal_data)
is_last = (cur_point == len(points_pocket) - 1)
if self._ideal_mode:
timings, point_events, point_data.timeframe = self._generate_synthetic_events(timings, ideal_data)
else:
if settings["force compensation FE"]:
force = point_data.useful_data["force"]
k_hardness = useful_data.k_hardness
signals = description["Real_signals"]
dataframe = self._apply_force_compensation(force, k_hardness, dataframe, point_data.timeframe, signals)
if settings["stages"]:
self._add_stage_regions(plot_item, point_data.events, dataframe_headers, pyqt_container.regions, 75)
if settings["force accuracy"]:
force = point_data.useful_data["force"]
self._add_force_accuracy_region(point_events["Welding"], force, plot_item)
if settings["ideals"] and settings["mirror ME"]:
for stage in point_events.keys():
ideal_data[stage] = self._shift_data("ME", description["Ideal_signals"], ideal_data[stage], lambda x: useful_data.range_ME-x)
if settings["workpiece"]:
self._add_workpiece(point_data, plot_item)
if settings["ideals"]:
self._add_ideal_stage_regions(plot_item, ideal_data, point_events, pyqt_container.regions, 100)
self._add_ideal_signals(plot_item, legend, ideal_data, point_events, description["Ideal_signals"], pyqt_container.curves, is_last)
if settings["performance"]:
timings = self._calc_performance(timings, point_data, ideal_data, is_last)
self._update_status(widget_steps, point_steps, widget_num, cur_point)
# Добавляем реальные сигналы
if not self._ideal_mode:
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, pyqt_container.curves)
return (plot_item, timings)
@staticmethod
def _shift_data(valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
func: function) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(func )
return dataframe
@staticmethod
def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
plot_item = pg.PlotItem(title=title)
# Оптимизация отображения графиков
plot_item.setDownsampling(auto=True, mode='peak')
plot_item.showGrid(x=True, y=True)
plot_item.setClipToView(True)
legend = plot_item.addLegend(offset=(70, 20))
return plot_item, legend
def _create_curve_ideal(self,
signal: dict[str, Any],
ideal_data: pd.DataFrame,
@ -164,16 +357,6 @@ class PlotWidget(BasePlotWidget):
return region
return None
@staticmethod
def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
plot_item = pg.PlotItem(title=title)
# Оптимизация отображения графиков
plot_item.setDownsampling(auto=True, mode='peak')
plot_item.showGrid(x=True, y=True)
plot_item.setClipToView(True)
legend = plot_item.addLegend(offset=(70, 20))
return plot_item, legend
def _add_stage_regions(self,
plot_item: pg.PlotItem,
point_events: dict[str, list[float]],
@ -260,217 +443,78 @@ class PlotWidget(BasePlotWidget):
curve_items["real"].setdefault(signal["name"], {})
curve_items["real"][signal["name"]] = plot
def _add_performance_label(self,
layout: QVBoxLayout,
TWC_time: float,
ideal_time: float,
tesla_time: float,
qt_items: dict) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
# TODO: Почему PlotWidget создает Label? Вынести в другое место.
tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
def _add_force_accuracy_region(self, event:list, force: float, plot_item:pg.PlotItem) -> None:
modifier = 0.05
x1 = event[0]
dx = event[1] - x1
y1 = force*(1-modifier)
dy = force*(2*modifier)
label_widget = QWidget()
label_layout = QHBoxLayout(label_widget)
start_label = QLabel("Сокращение длительности: ")
real_label = QLabel(f"фактическое = {tesla_TWC} % ")
if not tesla_TWC or not TWC_time: real_label.setVisible(False)
ideal_label = QLabel(f"идеальное = {tesla_ideal} % ")
if not tesla_ideal: ideal_label.setVisible(False)
kdip_label = QLabel(f"КДИП = {TWC_ideal}% ")
if not TWC_ideal: kdip_label.setVisible(False)
label_layout.addWidget(start_label, alignment=Qt.AlignLeft)
label_layout.addWidget(real_label, alignment=Qt.AlignLeft)
label_layout.addWidget(ideal_label, alignment=Qt.AlignLeft)
label_layout.addWidget(kdip_label, alignment=Qt.AlignLeft)
spacer = QSpacerItem(1, 1, QSizePolicy.Expanding, QSizePolicy.Minimum)
label_layout.addSpacerItem(spacer)
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
rect_item.setPen(pg.mkPen('black', width=0))
plot_item.addItem(rect_item)
self.set_style(label_widget)
layout.addWidget(label_widget)
def _add_workpiece(self, point_data:PointPassport, plot_item: pg.PlotItem) -> None:
x1 = point_data.timeframe[0]
dx = point_data.timeframe[1] - x1
y1 = point_data.useful_data["L2"]*1000
dy = point_data.useful_data["thickness"]*1000
qt_items["performance label"] = label_widget
qt_items["real performance"] = real_label
qt_items["ideal performance"] = ideal_label
qt_items["real to ideal performance"] = kdip_label
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush('grey'))
rect_item.setPen(pg.mkPen('black', width=3))
plot_item.addItem(rect_item)
def _build_widget(self, graphic_passport: GraphicPassport) -> QWidget:
# TODO: Исходя из сигнатуры метода, он создает и возвращает виджет с графиками.
# Простыня "result_widget, reg_items, curve_items, qt_items" не похожа на виджет с графиком.
# TODO: Данный метод должен содержать только ту логику, которая относится непосредственно к графикам.
# Все остальное - вынести. Оставшийся метод декомпозировать - очень трудно вообще понять, что тут происходит.
"""
Собирает графический виджет для одного набора данных.
"""
result_widget = QWidget()
result_layout = QVBoxLayout(result_widget)
plot_layout = pg.GraphicsLayoutWidget()
reg_items = {"real":{}, "ideal":{}}
curve_items = {"real":{}, "ideal":{}}
qt_items = {}
dataframe = graphic_passport.dataframe
tesla_time = graphic_passport.useful_data["tesla_time"]
range_ME = graphic_passport.useful_data["range_ME"]
k_hardness = graphic_passport.useful_data["k_hardness"]
dat_is_none = dataframe is None
widget_steps = len(self._plt_channels)
if not dat_is_none:
dataframe_headers = dataframe.columns.tolist()
point_steps = len(graphic_passport.points_pocket)
else: point_steps = 1
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_item, legend = self._init_plot_item(title=channel)
settings = description["Settings"]
global_shift = 0
TWC_time = 0.0
ideal_time = 0.0
worst_perf = 2
# TODO: рассчитать корректный параметр range
if settings["mirror ME"] and not dat_is_none:
dataframe = self._mirror_shift_data(
"ME",
description["Real_signals"],
dataframe,
range_ME
)
# Итерация по точкам
for cur_point, point_data in enumerate(graphic_passport.points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data]
point_timeframe, ideal_dat, point_events, useful_p_data = point_data
ideal_data = copy.deepcopy(ideal_dat)
if dat_is_none:
worst_timeframe = point_timeframe = [global_shift, global_shift+ ideal_data["Ideal cycle"]]
point_events = {}
keys = list(ideal_data.keys())
shift = 0
for i, time in enumerate(ideal_data["Ideal timings"]):
point_events[keys[i]] = [global_shift+shift, global_shift+time+shift]
shift += time
global_shift +=ideal_data["Ideal cycle"]
# TODO: проверить корректность расчетов
if settings["force compensation FE"] and not dat_is_none:
force = useful_p_data["force"]
F_comp = - force/k_hardness
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
dataframe.loc[point_idxs] = self._shift_data("FE", description["Real_signals"], dataframe.loc[point_idxs], F_comp)
# Модифицируем данные для отображения гарфика
if settings["ideals"] and settings["mirror ME"]:
for stage in point_events.keys():
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
# Добавляем реальные стадии
if settings["stages"] and not dat_is_none:
self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75)
if settings["workpiece"]:
x1 = point_timeframe[0]
dx = point_timeframe[1] - x1
y1 = useful_p_data["L2"]*1000
dy = useful_p_data["thickness"]*1000
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush('grey'))
rect_item.setPen(pg.mkPen('black', width=3))
plot_item.addItem(rect_item)
if settings["force accuracy"]and not dat_is_none:
modifier = 0.05
x1 = point_events["Welding"][0]
dx = point_events["Welding"][1] - x1
force = useful_p_data["force"]
y1 = force*(1-modifier)
dy = force*(2*modifier)
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
rect_item.setPen(pg.mkPen('black', width=0))
plot_item.addItem(rect_item)
# Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]:
is_last_point = (cur_point == len(graphic_passport.points_pocket) - 1)
self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100)
self._add_ideal_signals(plot_item, legend, ideal_data, point_events, description["Ideal_signals"], curve_items, is_last_point)
# Подсчёт производительности
if settings["performance"]:
is_last_point = (cur_point == len(graphic_passport.points_pocket) - 1)
if is_last_point:
if not dat_is_none: TWC_delta = sum([point_events[stage][1] - point_events[stage][0]
for stage in ["Closing", "Squeeze", "Welding"]])
else: TWC_delta = 0
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
if not dat_is_none: TWC_delta = point_timeframe[1] - point_timeframe[0]
else: TWC_delta = 0
ideal_delta = ideal_data["Ideal cycle"]
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
# Считаем прогресс
self._update_status(widget_steps, point_steps, widget_num, cur_point)
# Добавляем реальные сигналы
if not dat_is_none:
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)
if widget_num == 0:
main_plot = plot_item
else:
# Связываем остальные графики с основным графиком
plot_item.setXLink(main_plot)
if settings["performance"]:
self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time, qt_items)
plot_layout.addItem(plot_item, widget_num, 0)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot)
if navigator is not None:
plot_layout.addItem(navigator, widget_num+1, 0)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
result_layout.addWidget(plot_layout)
return result_widget, reg_items, curve_items, qt_items
def _update_status(self, widgsteps:int, pointsteps:int, cur_widg:int, cur_point:int):
if self._datalen:
sycle_start = self._datastep/self._datalen*100 + 1
period1 = 100/self._datalen
def _calc_performance(self,
timings:ChannelTimings,
point_data: PointPassport,
ideal_data:dict,
is_last:bool) -> ChannelTimings:
if is_last:
if not self._ideal_mode:
TWC_delta = sum([point_data.events[stage][1] - point_data.events[stage][0]
for stage in ["Closing", "Squeeze", "Welding"]])
else: TWC_delta = 0
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
sycle_start = 1
period1 = 100
if not self._ideal_mode: TWC_delta = point_data.timeframe[1] - point_data.timeframe[0]
else: TWC_delta = 0
ideal_delta = ideal_data["Ideal cycle"]
period2 = period1/widgsteps if widgsteps != 0 else period1
period3 = period2/pointsteps if pointsteps != 0 else period2
progress = sycle_start + period2*cur_widg + period3*cur_point
# TODO: см. модуль mediator.py
self._controller.update_progress(progress)
timings.TWC_time += TWC_delta
timings.ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < timings.worst_performance:
timings.worst_performance = curr_perf
timings.worst_timeframe = point_data.timeframe
return timings
def _generate_synthetic_events(self,
timings:ChannelTimings,
ideal_data:dict) -> Tuple[ChannelTimings, dict, list[float]]:
timings.worst_timeframe = point_timeframe = [timings.shift, timings.shift+ ideal_data["Ideal cycle"]]
point_events = {}
keys = list(ideal_data.keys())
shift = 0
for i, time in enumerate(ideal_data["Ideal timings"]):
point_events[keys[i]] = [timings.shift+shift, timings.shift+time+shift]
shift += time
timings.shift +=ideal_data["Ideal cycle"]
return (timings, point_events, point_timeframe)
def _apply_force_compensation(self,
force: float,
k_hardness: float,
dataframe:pd.DataFrame,
point_timeframe:list,
real_signals:list[dict]) -> pd.DataFrame:
F_comp = - force/k_hardness
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
dataframe.loc[point_idxs] = self._shift_data("FE", real_signals, dataframe.loc[point_idxs], lambda x: x + F_comp)
return dataframe