Compare commits

..

No commits in common. "2d98c1c5b7eaa6271cbdf9ad8893df3f6291b21e" and "b9fe95a4aa916a76b1922dd81c4c682473f56e80" have entirely different histories.

22 changed files with 238 additions and 508 deletions

1
.gitignore vendored
View File

@ -1,7 +1,6 @@
/__pycache__
/tck_venv
/.venv
/.vscode
/.idea
/venv
*.txt

View File

@ -167,7 +167,7 @@
0.418,
0.454,
0.458,
0.44,
0.440,
0.49,
0.47,
0.44,
@ -192,7 +192,7 @@
0.0045
],
"force_target": [
4000.0,
5000.0,
5000.0,
5000.0,
5000.0,

View File

@ -1,6 +1,6 @@
{
"trace_storage_path": [
"D:/downloads/a22"
"/home/leonid/WorkUM/DevUFC/diagramm/performance"
],
"monitor_update_period": [
1000.0
@ -51,7 +51,7 @@
0.075
],
"time_capture": [
1000.0
100000.0
],
"UML_time_scaler": [
1000.0

Binary file not shown.

View File

@ -11,7 +11,6 @@ class Mediator(BaseMediator):
def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
if issubclass(source.__class__, BaseDirectoryMonitor):
self._converter.convert_data(data)

View File

@ -1,8 +1,6 @@
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
import pandas as pd
class idealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
@ -30,48 +28,39 @@ class idealDataBuilder(BaseIdealDataBuilder):
class PassportFormer(BasePointPassportFormer):
def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, int]]:
def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, list]]:
return_data = [self._build_passports_pocket(dataframe) for dataframe in data]
self._mediator.notify(self, return_data)
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, list]:
passports_pocket = []
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
system_settings = {key: value[0] for key, value in self._params[1].items()}
tesla_time = sum(self._params[0].get("Tesla summary time", []))
points_pocket = []
time_is_valid = not dataframe["time"].isna().all()
if time_is_valid:
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
for i in range(point_quantity):
operator_settings = {
key: (value[i] if i < len(value) else value[0])
for key, value in self._params[0].items()
}
for i in range(0, point_quantity):
if not dataframe["time"].isna().all():
operator_settings = {}
for key, value in self._params[0].items():
if len(value) > i:
operator_settings[key] = value[i]
else:
operator_settings[key] = value[0]
params_list = [operator_settings, system_settings]
cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe:
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
idx = i+1 if idx_shift else i
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
points_pocket.append([point_timeframe, ideal_data, point_events])
return dataframe, points_pocket, tesla_time
if i < point_quantity-1:
cut_time = events[self._stages[0]][0][i+1]
frame = dataframe[dataframe["time"] < cut_time]
dataframe = dataframe[dataframe["time"] >= cut_time]
else:
frame = dataframe
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
# TODO: определить время каждого цикла теслы
tesla_time = sum(self._params[0]["Tesla summary time"])
#tesla_events = sum([operator_settings[key] for key in self._tesla_stages])
passports_pocket.append([frame, ideal_data, point_events, tesla_time])
return passports_pocket
def update_settings(self, params: list[dict, dict]):
self._params = params

View File

@ -1,289 +1,182 @@
import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
from PyQt5.QtGui import QFont
import pyqtgraph as pg
from typing import Optional, Any
import numpy as np
from numpy import floating
from typing import Optional, Any, NamedTuple
from utils.base.base import BasePlotWidget
class ProcessStage():
mean_value:int
start_index:int
finish_index:int
class ProcessStage(NamedTuple):
mean_value: floating[Any]
start_index: int
finish_index: int
class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self,
signal: dict[str, Any],
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
"""
Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
return pg.PlotDataItem(
x=start_timestamp + ideal_data["time"],
y=ideal_data[signal["name"]],
pen=signal["pen"]
)
signal: str,
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
if start_timestamp and finish_timestamp:
plot = pg.PlotDataItem(x=start_timestamp+ideal_data["time"], y=ideal_data[signal["name"]], pen=signal["pen"])
return plot
return None
def _create_stage_region(self,
stage: str,
start_timestamp: float,
finish_timestamp: float,
transparency: int) -> Optional[pg.LinearRegionItem]:
"""
Создает регион для определённого этапа, если заданы временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
transparency:int) -> Optional[pg.LinearRegionItem]:
if start_timestamp and finish_timestamp:
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
color = self._stage_colors.get(stage, [100, 100, 100, 100])
region.setBrush(pg.mkBrush(color[:3] + [transparency]))
region.setBrush(pg.mkBrush(self._stage_colors[stage][:3] + [transparency]))
return region
return None
@staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title)
# Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True)
plot_widget.setClipToView(True)
legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
def _add_stage_regions(self,
plot_widget: pg.PlotWidget,
point_events: dict[str, list[float]],
dataframe_headers: list[str],
transparency: int = 75) -> None:
"""
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
"""
stages = point_events.keys()
if all(stage in dataframe_headers for stage in stages):
for stage in stages:
start_t, end_t = point_events[stage]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None:
plot_widget.addItem(region)
def get_stage_info(self,
stage: str,
dataframe: pd.DataFrame,
signal_name: str) -> Optional[ProcessStage]:
def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
transparency: int = 125) -> None:
"""
Добавляет регионы для идеальных этапов.
"""
ideal_timings = ideal_data["Ideal timings"]
stages = list(point_events.keys())
for i, stage in enumerate(stages):
start_t = point_events[stage][0]
end_t = start_t + ideal_timings[i]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region:
plot_widget.addItem(region)
stage_diff = np.diff(dataframe[stage])
start_index = np.where(stage_diff == 1)[0]
finish_index = np.where(stage_diff == -1)[0]
def _add_ideal_signals(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None:
"""
Добавляет идеальные сигналы для каждого этапа.
"""
for stage in point_events.keys():
for signal in ideal_signals:
curve = self._create_curve_ideal(
signal,
ideal_data[stage],
point_events[stage][0],
point_events[stage][1]
)
if curve:
plot_widget.addItem(curve)
data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else []
def _add_real_signals(self,
plot_widget: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None:
"""
Добавляет реальные сигналы из dataframe на виджет.
"""
dataframe_headers = dataframe.columns.tolist()
for signal in real_signals:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
legend.addItem(plot, signal["name"])
if data.size and start_index.size:
start = start_index[0]
finish = finish_index[0] if finish_index.size else (len(data) - 1)
data_slice = data[start:finish]
mean = np.mean(data_slice)
return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish))
return None
def _add_performance_label(self,
layout: QVBoxLayout,
TWC_time: float,
ideal_time: float,
tesla_time: float) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
)
self.set_style(performance_label)
layout.addWidget(performance_label)
performance_label.update()
def _build_widget(self, data: list[Any]) -> QWidget:
"""
Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, tesla_time].
"""
def _build_widget(self, data: list[pd.DataFrame, dict, dict, list]) -> QWidget:
widget = QWidget()
layout = QVBoxLayout(widget)
layout = QVBoxLayout()
dataframe, points_pocket, tesla_time = data
dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
for channel, description in self._plt_channels.items():
performance_list = []
df_continuous = pd.DataFrame({})
plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"]
TWC_time = 0.0
ideal_time = 0.0
worst_perf = 2
# Итерация по точкам
for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events]
point_timeframe, ideal_data, point_events = point_data
# Добавляем реальные стадии
if settings["stages"]:
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
# Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]:
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
# Подсчёт производительности
if settings["performance"]:
is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
TWC_delta = point_timeframe[1] - point_timeframe[0]
ideal_delta = ideal_data["Ideal cycle"]
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
# Добавляем реальные сигналы
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
if widget_num == 0:
main_plot = plot_widget
else:
# Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot)
# Если есть настройка производительности, добавляем label
if settings["performance"]:
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
TWC_time = 0
ideal_time = 0
for cur_point, (dataframe, ideal_data, events, tesla_time) in enumerate(data):
df_continuous = pd.concat([df_continuous, dataframe], axis=0)
dataframe_headers = dataframe.columns.tolist()
stages = events.keys()
if settings["stages"] and all([stage in dataframe_headers for stage in stages]):
for stage in stages:
start_t, end_t = events[stage]
region = self._create_stage_region(stage, start_t, end_t, 75)
if region:
plot_widget.addItem(region)
if settings["ideals"]:
for i, stage in enumerate(stages):
start_t, _ = events[stage]
end_t = start_t + ideal_data["Ideal timings"][i]
region = self._create_stage_region(stage, start_t, end_t, 125)
if region:
plot_widget.addItem(region)
for signal in description["Ideal_signals"]:
curve = self._create_curve_ideal(signal, ideal_data[stage], events[stage][0], events[stage][1])
if curve:
plot_widget.addItem(curve)
if settings["performance"]:
if cur_point == len(data) -1:
ideal_time += sum(ideal_data["Ideal timings"][0:3])
TWC_time += sum([events[stage][1] - events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
else:
for stage in stages:
TWC_time += events[stage][1] - events[stage][0]
ideal_time += ideal_data["Ideal cycle"]
if settings["zoom"]:
pass
"""if max(time_axis) < 5.0:
stages = [self.get_stage_info("Welding",
dataframe,
signal["name"]) for signal in description["Real_signals"]]
if stages:
means_raw = [stage.mean_value for stage in stages]
mean = max(means_raw)
start = time_axis[stages[0].start_index]
finish = time_axis[stages[0].finish_index]
overshoot = pg.BarGraphItem(x0=0,
y0=mean - mean * 0.05,
height=mean * 0.05 * 2,
width=start,
brush=pg.mkBrush([0, 250, 0, 100]))
plot_widget.addItem(overshoot)
stable = pg.BarGraphItem(x0=start,
y0=mean - mean * 0.015,
height=mean * 0.015 * 2,
width=finish - start,
brush=pg.mkBrush([0, 250, 0, 100]))
plot_widget.addItem(stable)
plot_widget.setYRange(mean - 260, mean + 260)
plot_widget.setInteractive(False)
else:
max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]])
region = pg.LinearRegionItem([max_value - max_value * 0.015,
max_value + max_value * 0.015],
movable=False,
orientation="horizontal")
region.setBrush(pg.mkBrush([0, 250, 0, 100]))
plot_widget.setYRange(max_value - 200, max_value + 200)
plot_widget.setXRange(3.5, 4.5)
plot_widget.addItem(region)
plot_widget.setInteractive(False)"""
for signal in description["Real_signals"]:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(df_continuous["time"], df_continuous[signal["name"]], pen=signal["pen"])
legend.addItem(plot, signal["name"])
if settings["performance"]:
tesla_TWC = round((1 - TWC_time/tesla_time)*100,2)
tesla_ideal = round((1 - ideal_time/tesla_time)*100,2)
TWC_ideal = round((ideal_time/TWC_time)*100,2)
performance_label = QLabel(f"Сокращение длительности: фактическое = {tesla_TWC} %, идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%")
layout.addWidget(performance_label)
performance_label.update()
layout.addWidget(plot_widget)
layout.addWidget(navigator)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout)
return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[list[Any]]) -> None:
"""
Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида:
[
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, tesla_time],
...
]
"""
def build(self, data: list[pd.DataFrame]) -> None:
widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets)

View File

@ -1,58 +1,41 @@
from typing import Callable, Optional, Any
from PyQt5.QtWidgets import (
QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
)
import pyqtgraph as pg
from PyQt5.QtWidgets import QWidget, QTableWidget, QVBoxLayout, QTableWidgetItem, QLabel, QPushButton, QLineEdit, QHBoxLayout
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QIntValidator
from utils.json_tools import read_json, write_json
from gui import qt_settings as qts
class settingsWindow(QWidget):
def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
"""
Окно настроек для редактирования параметров.
:param path: Путь к файлу настроек (JSON).
:param name: Название набора настроек.
:param upd_func: Функция обновления (коллбэк).
"""
super().__init__()
def __init__(self, path: str, name: str, upd_func):
super(settingsWindow, self).__init__()
self._settingsPath = path
self._name = name
self._data: dict[str, list[Any]] = {}
self._data = {}
self._upd_func = upd_func
self._num_points: Optional[QLineEdit] = None
self._param_table: Optional[QTableWidget] = None
self.load_settings()
self._init_ui()
def load_settings(self) -> None:
"""Загружает настройки из JSON-файла."""
data = read_json(self._settingsPath)
if isinstance(data, dict):
self._data = data
else:
self._data = {}
self._data = read_json(self._settingsPath)
def write_settings(self) -> None:
"""Записывает текущие настройки в JSON-файл."""
write_json(self._settingsPath, self._data)
def getParams(self) -> dict:
"""Возвращает текущий словарь параметров."""
return self._data
def _init_ui(self) -> None:
"""Инициализирует UI: кнопки, поля ввода, таблицу."""
save_button = QPushButton("Save")
restore_button = QPushButton("Restore")
def _init_ui(self) -> None:
save_button = QPushButton()
restore_button = QPushButton()
save_button.setText("Save")
restore_button.setText("Restore")
self._num_points = QLineEdit()
self._num_points.setPlaceholderText("Enter the number of welding points")
self._num_points.setValidator(QIntValidator())
control_layout = QHBoxLayout()
control_layout.addWidget(save_button)
control_layout.addWidget(restore_button)
@ -63,7 +46,7 @@ class settingsWindow(QWidget):
self._num_points.editingFinished.connect(self._expand)
self._param_table = QTableWidget()
self._populate_table()
self._restore()
layout = QVBoxLayout()
header = QLabel(self._name)
@ -72,115 +55,49 @@ class settingsWindow(QWidget):
layout.addWidget(self._param_table)
self.setLayout(layout)
self.setStyleSheet(qts.white_style)
#self.show()
def _populate_table(self) -> None:
"""Заполняет таблицу значениями из self._data."""
# Если нет данных для заполнения
if not self._data:
self._param_table.setRowCount(0)
self._param_table.setColumnCount(0)
return
def _save(self) -> dict:
self._data = {}
for i in range(self._param_table.rowCount()):
key = self._param_table.item(i, 0).text()
data = []
for j in range(1, self._param_table.columnCount()):
param = self._param_table.item(i, j).text()
if key != "trace_storage_path":
param = float(param)
data.append(param)
self._data[key] = data
self.write_settings()
self._upd_func()
# Предполагаем, что у всех ключей одинаковая длина списков параметров.
first_key = next(iter(self._data), None)
if first_key is None:
self._param_table.setRowCount(0)
self._param_table.setColumnCount(0)
return
column_count = len(self._data[first_key]) + 1
def _restore(self) -> None:
self._param_table.setRowCount(len(self._data))
self._param_table.setColumnCount(column_count)
key = next(iter(self._data))
self._param_table.setColumnCount(len(self._data[key])+1)
for i, (key, items) in enumerate(self._data.items()):
self._param_table.setColumnCount(len(self._data[key])+1)
self._param_table.setItem(i, 0, QTableWidgetItem(key))
for j, item in enumerate(items):
self._param_table.setItem(i, j+1, QTableWidgetItem(str(item)))
def _save(self) -> None:
"""Сохраняет текущие параметры из таблицы в self._data и вызывает _upd_func()."""
new_data = {}
row_count = self._param_table.rowCount()
col_count = self._param_table.columnCount()
for i in range(row_count):
key_item = self._param_table.item(i, 0)
if key_item is None:
continue
key = key_item.text()
# Если ключ пустой, пропускаем
if not key:
continue
row_data = []
for j in range(1, col_count):
cell_item = self._param_table.item(i, j)
if cell_item is None:
continue
param_str = cell_item.text()
# Если ключ не trace_storage_path, конвертируем в float
if key != "trace_storage_path":
try:
param = float(param_str)
except ValueError:
param = 0.0
else:
param = param_str
row_data.append(param)
new_data[key] = row_data
self._data = new_data
self.write_settings()
self._upd_func()
def _restore(self) -> None:
"""Перезагружает данные из файла и обновляет таблицу."""
self.load_settings()
self._populate_table()
def _expand(self) -> None:
"""Расширяет количество столбцов таблицы в зависимости от введённого значения."""
if not self._num_points:
return
num_points_text = self._num_points.text()
if not num_points_text.isdigit():
return
num_points = int(num_points_text)
if num_points < 0:
return
param=int(self._num_points.text())
prev_columns = self._param_table.columnCount()
desired_columns = num_points + 1
self._param_table.setColumnCount(param+1)
if prev_columns < param+1:
for i in range(prev_columns, param+1):
for j, (key, items) in enumerate(self._data.items()):
self._param_table.setItem(j, i, QTableWidgetItem(str(items[-1])))
if desired_columns <= prev_columns:
return
self._param_table.setColumnCount(desired_columns)
# Новые столбцы заполняем последним известным параметром для каждого ключа
for i, (key, items) in enumerate(self._data.items()):
# Если нет данных, пропускаем
if not items:
continue
last_value = str(items[-1])
for col in range(prev_columns, desired_columns):
self._param_table.setItem(i, col, QTableWidgetItem(last_value))
# Добавляем новый элемент также в self._data для консистентности
# После этого можно будет сохранить при нажатии Save
# Дополним также и в self._data
additional_count = desired_columns - prev_columns
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
if __name__ == '__main__':
import pyqtgraph as pg
app = pg.mkQApp("Parameter Tree Example")
window = settingsWindow('params\operator_params.json', 'operator')
app.exec()

View File

@ -1,5 +1,4 @@
import sys
import pyqtgraph as pg
from PyQt5 import QtWidgets
from gui.mainGui import MainWindow
@ -12,7 +11,6 @@ from controller.passportFormer import PassportFormer
def main():
pg.setConfigOptions(useOpenGL=False, antialias=False)
app = QtWidgets.QApplication(sys.argv)
monitor = DirectoryMonitor()
data_converter = DataConverter()

View File

@ -1,13 +1,11 @@
from __future__ import annotations
import os
from typing import Optional, Union, Any
from cachetools import LRUCache
from typing import Optional, Union
import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget
from PyQt5.QtOpenGL import QGLWidget
from OptAlgorithm import OptAlgorithm
import pandas as pd
import pandas as pd
@ -114,12 +112,12 @@ class BasePlotWidget:
self._stage_colors = {
"Closing": [220, 20, 60, 100], # Crimson
"Squeeze": [30, 144, 255, 100], # Dodger Blue
"Welding": [128, 128, 128, 100], # Gray
"Relief": [34, 139, 34, 100], # Forest Green
"Oncomming": [255, 165, 0, 100] # Orange
}
"Closing": [208, 28, 31, 100],
"Squeeze": [45, 51, 89, 150],
"Welding": [64, 64, 64, 100],
"Relief": [0, 134, 88, 100],
"Oncomming": [0, 79, 0, 100]
}
self._plt_channels = {
"Electrode Force, N & Welding Current, kA": {
"Settings": {
@ -210,14 +208,6 @@ class BasePlotWidget:
]
},
}
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
object.setStyleSheet(
"""QLabel {
color: #ffffff;
font-size: 26px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}""")
@property
def mediator(self) -> BaseMediator:
@ -256,7 +246,6 @@ class BaseIdealDataBuilder(OptAlgorithm):
self.welding_time = operator_params['time_wielding']
super().__init__(operator_params, system_params)
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)):
@ -362,44 +351,10 @@ class BasePointPassportFormer:
"Tesla welding",
"Tesla oncomming_relief"
]
self._params = []
self._ideal_data_cashe = LRUCache(maxsize=1000)
self._OptAlgorithm_operator_params = [
"dist_open_start_1",
"dist_open_start_2",
"dist_open_after_1",
"dist_open_after_2",
"dist_open_end_1",
"dist_open_end_2",
"dist_close_end_1",
"dist_close_end_2",
"time_command",
"time_robot_movement",
"object_thickness",
"force_target",
"force_capture",
"time_wielding"]
self._OptAlgorithm_system_params = [
"a_max_1",
"v_max_1",
"a_max_2",
"v_max_2",
"mass_1",
"mass_2",
"k_hardness_1",
"k_hardness_2",
"torque_max_1",
"torque_max_2",
"transmission_ratio_1",
"transmission_ratio_2",
"position_start_1",
"position_start_2",
"k_prop",
"time_capture"]
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
@ -408,45 +363,41 @@ class BasePointPassportFormer:
def _find_events(self,
signal: str,
times:pd.Series,
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
dataframe: pd.DataFrame) -> list[list[float]]:
start_idx, finish_idx = self._find_indexes(signal, dataframe)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
if start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
#print (start_idx)
start_list = times.loc[start_idx].tolist()
end_list = times.loc[finish_idx].tolist()
if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1]))
end_list.append(float(times[len(times)-1]))
return start_list, end_list
def _filter_events(self,
times: pd.Series,
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
dataframe: pd.DataFrame) -> list[dict[list[float]], int]:
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list)
if point_quantity == 0:
#TODO: добавить обработку исключения
return []
for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe)
temp = min([len(start_list), len(end_list)])
if temp < point_quantity:
print ("cant find enough", stage)
start_list += [0]*(point_quantity - temp)
end_list += [1]*(point_quantity - temp)
events[stage] = [start_list, end_list]
for i in range(temp, point_quantity):
print ("cant find enough", stage)
start_list.append(0)
end_list.append(1)
events[stage] = [start_list[:point_quantity], end_list[:point_quantity]]
#print(events, point_quantity)
return events, point_quantity
def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
params: list[dict] = None) -> dict:
self.opt = idealDataBuilder(params)
stage_ideals = {
"Closing": self._opt.get_closingDF(),
"Squeeze": self._opt.get_compressionDF(),
@ -464,22 +415,6 @@ class BasePointPassportFormer:
def update_settings(self, params: list) -> None:
...
def _generate_cache_key(self,
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
"""
Преобразует params_list в хешируемый ключ для кэша.
"""
operator_settings, system_settings = params_list
# Преобразуем словари в отсортированные кортежи пар ключ-значение
operator_tuple = frozenset((key, value)
for key, value in operator_settings.items()
if str(key) in self._OptAlgorithm_operator_params)
system_tuple = frozenset((key, value)
for key, value in system_settings.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
@property
def opt(self) -> BaseIdealDataBuilder: