chore: выполнил автоматизацию формирования паспорта точки и создания виджетов для плоттера

This commit is contained in:
Andrew 2024-12-09 16:05:34 +03:00
parent b9fe95a4aa
commit 8968e3d57a
22 changed files with 483 additions and 221 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
/__pycache__
/tck_venv
/.venv
/.vscode
/.idea
/venv
*.txt

View File

@ -167,7 +167,7 @@
0.418,
0.454,
0.458,
0.440,
0.44,
0.49,
0.47,
0.44,
@ -192,7 +192,7 @@
0.0045
],
"force_target": [
5000.0,
4000.0,
5000.0,
5000.0,
5000.0,

View File

@ -1,6 +1,6 @@
{
"trace_storage_path": [
"/home/leonid/WorkUM/DevUFC/diagramm/performance"
"D:/downloads/a22"
],
"monitor_update_period": [
1000.0

BIN
profile_results.prof Normal file

Binary file not shown.

View File

@ -11,6 +11,7 @@ class Mediator(BaseMediator):
def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
if issubclass(source.__class__, BaseDirectoryMonitor):
self._converter.convert_data(data)

View File

@ -1,5 +1,8 @@
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
import pandas as pd
from typing import Any
class idealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame:
@ -27,40 +30,47 @@ class idealDataBuilder(BaseIdealDataBuilder):
return ideal_timings
class PassportFormer(BasePointPassportFormer):
def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, list]]:
def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, int]]:
return_data = [self._build_passports_pocket(dataframe) for dataframe in data]
self._mediator.notify(self, return_data)
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, list]:
passports_pocket = []
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
system_settings = {key: value[0] for key, value in self._params[1].items()}
for i in range(0, point_quantity):
if not dataframe["time"].isna().all():
operator_settings = {}
for key, value in self._params[0].items():
if len(value) > i:
operator_settings[key] = value[i]
else:
operator_settings[key] = value[0]
tesla_time = sum(self._params[0].get("Tesla summary time", []))
points_pocket = []
time_is_valid = not dataframe["time"].isna().all()
if time_is_valid:
for i in range(point_quantity):
operator_settings = {
key: (value[i] if i < len(value) else value[0])
for key, value in self._params[0].items()
}
params_list = [operator_settings, system_settings]
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
if i < point_quantity-1:
cut_time = events[self._stages[0]][0][i+1]
frame = dataframe[dataframe["time"] < cut_time]
dataframe = dataframe[dataframe["time"] >= cut_time]
else:
frame = dataframe
cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe:
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][i+1]]
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
# TODO: определить время каждого цикла теслы
tesla_time = sum(self._params[0]["Tesla summary time"])
#tesla_events = sum([operator_settings[key] for key in self._tesla_stages])
passports_pocket.append([frame, ideal_data, point_events, tesla_time])
return passports_pocket
points_pocket.append([point_timeframe, ideal_data, point_events])
return dataframe, points_pocket, tesla_time
def update_settings(self, params: list[dict, dict]):
self._params = params

View File

@ -1,182 +1,290 @@
import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
from PyQt5.QtGui import QFont
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel
import pyqtgraph as pg
import numpy as np
from numpy import floating
from typing import Optional, Any, NamedTuple
from typing import Optional, Any
from PyQt5.QtOpenGL import QGLWidget
from utils.base.base import BasePlotWidget
class ProcessStage(NamedTuple):
mean_value: floating[Any]
start_index: int
finish_index: int
class ProcessStage():
mean_value:int
start_index:int
finish_index:int
class PlotWidget(BasePlotWidget):
def _create_curve_ideal(self,
signal: str,
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
if start_timestamp and finish_timestamp:
plot = pg.PlotDataItem(x=start_timestamp+ideal_data["time"], y=ideal_data[signal["name"]], pen=signal["pen"])
return plot
return None
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
navigator.setBackground('d')
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 50))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self,
signal: dict[str, Any],
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
"""
Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
return pg.PlotDataItem(
x=start_timestamp + ideal_data["time"],
y=ideal_data[signal["name"]],
pen=signal["pen"]
)
return None
def _create_stage_region(self,
stage: str,
start_timestamp: float,
finish_timestamp: float,
transparency:int) -> Optional[pg.LinearRegionItem]:
if start_timestamp and finish_timestamp:
transparency: int) -> Optional[pg.LinearRegionItem]:
"""
Создает регион для определённого этапа, если заданы временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
region.setBrush(pg.mkBrush(self._stage_colors[stage][:3] + [transparency]))
color = self._stage_colors.get(stage, [100, 100, 100, 100])
region.setBrush(pg.mkBrush(color[:3] + [transparency]))
return region
return None
@staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title)
# Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True)
legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
def get_stage_info(self,
stage: str,
dataframe: pd.DataFrame,
signal_name: str) -> Optional[ProcessStage]:
stage_diff = np.diff(dataframe[stage])
start_index = np.where(stage_diff == 1)[0]
finish_index = np.where(stage_diff == -1)[0]
data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else []
def _add_stage_regions(self,
plot_widget: pg.PlotWidget,
point_events: dict[str, list[float]],
dataframe_headers: list[str],
transparency: int = 75) -> None:
"""
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
"""
stages = point_events.keys()
if all(stage in dataframe_headers for stage in stages):
for stage in stages:
start_t, end_t = point_events[stage]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None:
plot_widget.addItem(region)
if data.size and start_index.size:
start = start_index[0]
finish = finish_index[0] if finish_index.size else (len(data) - 1)
data_slice = data[start:finish]
mean = np.mean(data_slice)
return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish))
return None
def _build_widget(self, data: list[pd.DataFrame, dict, dict, list]) -> QWidget:
widget = QWidget()
layout = QVBoxLayout()
def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
transparency: int = 125) -> None:
"""
Добавляет регионы для идеальных этапов.
"""
ideal_timings = ideal_data["Ideal timings"]
stages = list(point_events.keys())
for i, stage in enumerate(stages):
start_t = point_events[stage][0]
end_t = start_t + ideal_timings[i]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region:
plot_widget.addItem(region)
for channel, description in self._plt_channels.items():
performance_list = []
df_continuous = pd.DataFrame({})
def _add_ideal_signals(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None:
"""
Добавляет идеальные сигналы для каждого этапа.
"""
for stage in point_events.keys():
for signal in ideal_signals:
curve = self._create_curve_ideal(
signal,
ideal_data[stage],
point_events[stage][0],
point_events[stage][1]
)
if curve:
plot_widget.addItem(curve)
def _add_real_signals(self,
plot_widget: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None:
"""
Добавляет реальные сигналы из dataframe на виджет.
"""
dataframe_headers = dataframe.columns.tolist()
for signal in real_signals:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
legend.addItem(plot, signal["name"])
def _add_performance_label(self,
layout: QVBoxLayout,
TWC_time: float,
ideal_time: float,
tesla_time: float) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
)
layout.addWidget(performance_label)
performance_label.update()
def _build_widget(self, data: list[Any]) -> QWidget:
"""
Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, tesla_time].
"""
widget = QGLWidget()
layout = QVBoxLayout(widget)
dataframe, points_pocket, tesla_time = data
dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"]
if settings["performance"]:
TWC_time = 0
ideal_time = 0
TWC_time = 0.0
ideal_time = 0.0
worst_perf = 2
for cur_point, (dataframe, ideal_data, events, tesla_time) in enumerate(data):
df_continuous = pd.concat([df_continuous, dataframe], axis=0)
dataframe_headers = dataframe.columns.tolist()
stages = events.keys()
# Итерация по точкам
for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events]
point_timeframe, ideal_data, point_events = point_data
if settings["stages"] and all([stage in dataframe_headers for stage in stages]):
for stage in stages:
start_t, end_t = events[stage]
region = self._create_stage_region(stage, start_t, end_t, 75)
if region:
plot_widget.addItem(region)
# Добавляем реальные стадии
if settings["stages"]:
self._add_stage_regions(plot_widget, point_events, dataframe_headers, transparency=75)
# Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]:
for i, stage in enumerate(stages):
start_t, _ = events[stage]
end_t = start_t + ideal_data["Ideal timings"][i]
region = self._create_stage_region(stage, start_t, end_t, 125)
if region:
plot_widget.addItem(region)
for signal in description["Ideal_signals"]:
curve = self._create_curve_ideal(signal, ideal_data[stage], events[stage][0], events[stage][1])
if curve:
plot_widget.addItem(curve)
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events)
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
# Подсчёт производительности
if settings["performance"]:
if cur_point == len(data) -1:
ideal_time += sum(ideal_data["Ideal timings"][0:3])
TWC_time += sum([events[stage][1] - events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
for stage in stages:
TWC_time += events[stage][1] - events[stage][0]
ideal_time += ideal_data["Ideal cycle"]
TWC_delta = point_timeframe[1] - point_timeframe[0]
ideal_delta = ideal_data["Ideal cycle"]
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
if settings["zoom"]:
pass
"""if max(time_axis) < 5.0:
stages = [self.get_stage_info("Welding",
dataframe,
signal["name"]) for signal in description["Real_signals"]]
if stages:
means_raw = [stage.mean_value for stage in stages]
mean = max(means_raw)
start = time_axis[stages[0].start_index]
finish = time_axis[stages[0].finish_index]
overshoot = pg.BarGraphItem(x0=0,
y0=mean - mean * 0.05,
height=mean * 0.05 * 2,
width=start,
brush=pg.mkBrush([0, 250, 0, 100]))
plot_widget.addItem(overshoot)
stable = pg.BarGraphItem(x0=start,
y0=mean - mean * 0.015,
height=mean * 0.015 * 2,
width=finish - start,
brush=pg.mkBrush([0, 250, 0, 100]))
plot_widget.addItem(stable)
plot_widget.setYRange(mean - 260, mean + 260)
plot_widget.setInteractive(False)
else:
max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]])
region = pg.LinearRegionItem([max_value - max_value * 0.015,
max_value + max_value * 0.015],
movable=False,
orientation="horizontal")
region.setBrush(pg.mkBrush([0, 250, 0, 100]))
plot_widget.setYRange(max_value - 200, max_value + 200)
plot_widget.setXRange(3.5, 4.5)
plot_widget.addItem(region)
plot_widget.setInteractive(False)"""
for signal in description["Real_signals"]:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(df_continuous["time"], df_continuous[signal["name"]], pen=signal["pen"])
legend.addItem(plot, signal["name"])
# Добавляем реальные сигналы
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
if widget_num == 0:
main_plot = plot_widget
else:
# Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot)
# Если есть настройка производительности, добавляем label
if settings["performance"]:
tesla_TWC = round((1 - TWC_time/tesla_time)*100,2)
tesla_ideal = round((1 - ideal_time/tesla_time)*100,2)
TWC_ideal = round((ideal_time/TWC_time)*100,2)
performance_label = QLabel(f"Сокращение длительности: фактическое = {tesla_TWC} %, идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%")
layout.addWidget(performance_label)
performance_label.update()
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
layout.addWidget(plot_widget)
layout.addWidget(navigator)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout)
return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[pd.DataFrame]) -> None:
def build(self, data: list[list[Any]]) -> None:
"""
Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида:
[
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, tesla_time],
...
]
"""
widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets)

View File

@ -1,41 +1,58 @@
import pyqtgraph as pg
from PyQt5.QtWidgets import QWidget, QTableWidget, QVBoxLayout, QTableWidgetItem, QLabel, QPushButton, QLineEdit, QHBoxLayout
from PyQt5.QtCore import Qt
from typing import Callable, Optional, Any
from PyQt5.QtWidgets import (
QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
)
from PyQt5.QtGui import QIntValidator
from utils.json_tools import read_json, write_json
from gui import qt_settings as qts
class settingsWindow(QWidget):
def __init__(self, path: str, name: str, upd_func):
super(settingsWindow, self).__init__()
def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
"""
Окно настроек для редактирования параметров.
:param path: Путь к файлу настроек (JSON).
:param name: Название набора настроек.
:param upd_func: Функция обновления (коллбэк).
"""
super().__init__()
self._settingsPath = path
self._name = name
self._data = {}
self._data: dict[str, list[Any]] = {}
self._upd_func = upd_func
self._num_points: Optional[QLineEdit] = None
self._param_table: Optional[QTableWidget] = None
self.load_settings()
self._init_ui()
def load_settings(self) -> None:
self._data = read_json(self._settingsPath)
"""Загружает настройки из JSON-файла."""
data = read_json(self._settingsPath)
if isinstance(data, dict):
self._data = data
else:
self._data = {}
def write_settings(self) -> None:
"""Записывает текущие настройки в JSON-файл."""
write_json(self._settingsPath, self._data)
def getParams(self) -> dict:
"""Возвращает текущий словарь параметров."""
return self._data
def _init_ui(self) -> None:
save_button = QPushButton()
restore_button = QPushButton()
save_button.setText("Save")
restore_button.setText("Restore")
"""Инициализирует UI: кнопки, поля ввода, таблицу."""
save_button = QPushButton("Save")
restore_button = QPushButton("Restore")
self._num_points = QLineEdit()
self._num_points.setPlaceholderText("Enter the number of welding points")
self._num_points.setValidator(QIntValidator())
control_layout = QHBoxLayout()
control_layout.addWidget(save_button)
control_layout.addWidget(restore_button)
@ -44,9 +61,9 @@ class settingsWindow(QWidget):
save_button.pressed.connect(self._save)
restore_button.pressed.connect(self._restore)
self._num_points.editingFinished.connect(self._expand)
self._param_table = QTableWidget()
self._restore()
self._populate_table()
layout = QVBoxLayout()
header = QLabel(self._name)
@ -55,49 +72,115 @@ class settingsWindow(QWidget):
layout.addWidget(self._param_table)
self.setLayout(layout)
self.setStyleSheet(qts.white_style)
#self.show()
def _save(self) -> dict:
self._data = {}
for i in range(self._param_table.rowCount()):
key = self._param_table.item(i, 0).text()
data = []
for j in range(1, self._param_table.columnCount()):
param = self._param_table.item(i, j).text()
if key != "trace_storage_path":
param = float(param)
data.append(param)
self._data[key] = data
self.write_settings()
self._upd_func()
def _restore(self) -> None:
def _populate_table(self) -> None:
"""Заполняет таблицу значениями из self._data."""
# Если нет данных для заполнения
if not self._data:
self._param_table.setRowCount(0)
self._param_table.setColumnCount(0)
return
# Предполагаем, что у всех ключей одинаковая длина списков параметров.
first_key = next(iter(self._data), None)
if first_key is None:
self._param_table.setRowCount(0)
self._param_table.setColumnCount(0)
return
column_count = len(self._data[first_key]) + 1
self._param_table.setRowCount(len(self._data))
key = next(iter(self._data))
self._param_table.setColumnCount(len(self._data[key])+1)
self._param_table.setColumnCount(column_count)
for i, (key, items) in enumerate(self._data.items()):
self._param_table.setColumnCount(len(self._data[key])+1)
self._param_table.setItem(i, 0, QTableWidgetItem(key))
for j, item in enumerate(items):
self._param_table.setItem(i, j+1, QTableWidgetItem(str(item)))
def _save(self) -> None:
"""Сохраняет текущие параметры из таблицы в self._data и вызывает _upd_func()."""
new_data = {}
row_count = self._param_table.rowCount()
col_count = self._param_table.columnCount()
for i in range(row_count):
key_item = self._param_table.item(i, 0)
if key_item is None:
continue
key = key_item.text()
# Если ключ пустой, пропускаем
if not key:
continue
row_data = []
for j in range(1, col_count):
cell_item = self._param_table.item(i, j)
if cell_item is None:
continue
param_str = cell_item.text()
# Если ключ не trace_storage_path, конвертируем в float
if key != "trace_storage_path":
try:
param = float(param_str)
except ValueError:
param = 0.0
else:
param = param_str
row_data.append(param)
new_data[key] = row_data
self._data = new_data
self.write_settings()
self._upd_func()
def _restore(self) -> None:
"""Перезагружает данные из файла и обновляет таблицу."""
self.load_settings()
self._populate_table()
def _expand(self) -> None:
param=int(self._num_points.text())
"""Расширяет количество столбцов таблицы в зависимости от введённого значения."""
if not self._num_points:
return
num_points_text = self._num_points.text()
if not num_points_text.isdigit():
return
num_points = int(num_points_text)
if num_points < 0:
return
prev_columns = self._param_table.columnCount()
self._param_table.setColumnCount(param+1)
if prev_columns < param+1:
for i in range(prev_columns, param+1):
for j, (key, items) in enumerate(self._data.items()):
self._param_table.setItem(j, i, QTableWidgetItem(str(items[-1])))
desired_columns = num_points + 1
if desired_columns <= prev_columns:
return
self._param_table.setColumnCount(desired_columns)
# Новые столбцы заполняем последним известным параметром для каждого ключа
for i, (key, items) in enumerate(self._data.items()):
# Если нет данных, пропускаем
if not items:
continue
last_value = str(items[-1])
for col in range(prev_columns, desired_columns):
self._param_table.setItem(i, col, QTableWidgetItem(last_value))
# Добавляем новый элемент также в self._data для консистентности
# После этого можно будет сохранить при нажатии Save
# Дополним также и в self._data
additional_count = desired_columns - prev_columns
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
if __name__ == '__main__':
import pyqtgraph as pg
app = pg.mkQApp("Parameter Tree Example")
window = settingsWindow('params\operator_params.json', 'operator')
app.exec()

View File

@ -1,4 +1,5 @@
import sys
import pyqtgraph as pg
from PyQt5 import QtWidgets
from gui.mainGui import MainWindow
@ -11,6 +12,7 @@ from controller.passportFormer import PassportFormer
def main():
pg.setConfigOptions(useOpenGL=False, antialias=False)
app = QtWidgets.QApplication(sys.argv)
monitor = DirectoryMonitor()
data_converter = DataConverter()

View File

@ -1,11 +1,13 @@
from __future__ import annotations
import os
from typing import Optional, Union
from typing import Optional, Union, Any
from cachetools import LRUCache
import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget
from PyQt5.QtOpenGL import QGLWidget
from OptAlgorithm import OptAlgorithm
import pandas as pd
import pandas as pd
@ -245,6 +247,7 @@ class BaseIdealDataBuilder(OptAlgorithm):
self.mul = system_params['time_capture']
self.welding_time = operator_params['time_wielding']
super().__init__(operator_params, system_params)
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
@ -351,10 +354,44 @@ class BasePointPassportFormer:
"Tesla welding",
"Tesla oncomming_relief"
]
self._params = []
self._ideal_data_cashe = LRUCache(maxsize=1000)
self._OptAlgorithm_operator_params = [
"dist_open_start_1",
"dist_open_start_2",
"dist_open_after_1",
"dist_open_after_2",
"dist_open_end_1",
"dist_open_end_2",
"dist_close_end_1",
"dist_close_end_2",
"time_command",
"time_robot_movement",
"object_thickness",
"force_target",
"force_capture",
"time_wielding"]
self._OptAlgorithm_system_params = [
"a_max_1",
"v_max_1",
"a_max_2",
"v_max_2",
"mass_1",
"mass_2",
"k_hardness_1",
"k_hardness_2",
"torque_max_1",
"torque_max_2",
"transmission_ratio_1",
"transmission_ratio_2",
"position_start_1",
"position_start_2",
"k_prop",
"time_capture"]
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
@ -363,41 +400,45 @@ class BasePointPassportFormer:
def _find_events(self,
signal: str,
times:pd.Series,
dataframe: pd.DataFrame) -> list[list[float]]:
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
start_idx, finish_idx = self._find_indexes(signal, dataframe)
if start_idx[0] > finish_idx[0]:
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
#print (start_idx)
start_list = times.loc[start_idx].tolist()
end_list = times.loc[finish_idx].tolist()
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1:
end_list.append(float(times[len(times)-1]))
end_list.append(float(times.iloc[-1]))
return start_list, end_list
def _filter_events(self,
times: pd.Series,
dataframe: pd.DataFrame) -> list[dict[list[float]], int]:
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list)
if point_quantity == 0:
#TODO: добавить обработку исключения
return []
for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe)
temp = min([len(start_list), len(end_list)])
if temp < point_quantity:
for i in range(temp, point_quantity):
print ("cant find enough", stage)
start_list.append(0)
end_list.append(1)
events[stage] = [start_list[:point_quantity], end_list[:point_quantity]]
#print(events, point_quantity)
print ("cant find enough", stage)
start_list += [0]*(point_quantity - temp)
end_list += [1]*(point_quantity - temp)
events[stage] = [start_list, end_list]
return events, point_quantity
def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
params: list[dict] = None) -> dict:
params: list[dict] = None) -> dict:
self.opt = idealDataBuilder(params)
stage_ideals = {
"Closing": self._opt.get_closingDF(),
"Squeeze": self._opt.get_compressionDF(),
@ -415,6 +456,22 @@ class BasePointPassportFormer:
def update_settings(self, params: list) -> None:
...
def _generate_cache_key(self,
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
"""
Преобразует params_list в хешируемый ключ для кэша.
"""
operator_settings, system_settings = params_list
# Преобразуем словари в отсортированные кортежи пар ключ-значение
operator_tuple = frozenset((key, value)
for key, value in operator_settings.items()
if str(key) in self._OptAlgorithm_operator_params)
system_tuple = frozenset((key, value)
for key, value in system_settings.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
@property
def opt(self) -> BaseIdealDataBuilder: