WeldingSpotPerformance/src/utils/base/base.py

583 lines
20 KiB
Python
Raw Normal View History

from __future__ import annotations
import os
from typing import Optional, Union, Any
from cachetools import LRUCache
import numpy as np
import pyqtgraph as pg
import pandas as pd
from PyQt5.QtCore import QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget, QMainWindow, QVBoxLayout
2024-12-05 13:18:53 +03:00
from OptAlgorithm import OptAlgorithm
from utils.qt_settings import dark_style
class BaseMediator:
def __init__(self,
monitor: BaseDirectoryMonitor,
converter: BaseDataConverter,
passportFormer: BasePointPassportFormer,
plot: BasePlotWidget,
controller: BaseController):
self._monitor = monitor
self._monitor.mediator = self
self._converter = converter
self._converter.mediator = self
self._passportFormer = passportFormer
self._passportFormer.mediator = self
self._plot = plot
self._plot.mediator = self
self._controller = controller
self._controller.mediator = self
def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
...
def update_settings (self, data: list[dict]):
...
class BaseDirectoryMonitor:
update_timer = QTimer()
def __init__(self,
mediator: Optional[BaseMediator] = None):
super().__init__()
self._directory_path = None
self._update_time = None
self.isActive = False
self._files = []
self._mediator = mediator
@property
def directory_path(self) -> str:
return self._directory_path
@property
def update_time(self) -> int:
return self._update_time
@property
def files(self) -> list[str]:
return self._files
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
def _init_state(self):
files = os.listdir(self._directory_path)
self._files = files
def start(self):
self.isActive = True
self.update_timer.start(int(self._update_time))
def stop(self):
self.isActive = False
self.update_timer.stop()
def update_settings(self, data: list[dict]) -> None:
...
def update_plots(self) -> None:
...
def force_all_dir(self):
...
class BaseDataConverter:
def __init__(self, mediator: Optional[BaseMediator] = None):
self._mediator = mediator
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
def convert_data(self, files: list[str]) -> None:
...
class BasePlotWidget:
def __init__(self,
mediator: Optional[BaseMediator] = None):
super().__init__()
self._mediator = mediator
self._stage_colors = {
"Closing": [220, 20, 60, 100], # Crimson
"Squeeze": [30, 144, 255, 100], # Dodger Blue
"Welding": [128, 128, 128, 100], # Gray
"Relief": [34, 139, 34, 100], # Forest Green
"Oncomming": [255, 165, 0, 100] # Orange
}
self._plt_channels = {
"Electrode Force, N & Welding Current, kA": {
"Settings": {
"zoom": False,
"stages": True,
"performance": True,
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False,
"force accuracy":True
},
"Real_signals": [
{
"name": "Electrode Force, N ME",
"pen": 'r',
},
{
"name": "Electrode Force, N FE",
"pen": 'w',
},
{
"name": "Welding Current ME",
"pen": "y",
}
],
"Ideal_signals": [
{
"name": "Force",
"pen": {'color': 'g', 'width':3},
}
]
},
"Electrode Position, mm": {
"Settings": {
"zoom": False,
"stages": True,
"performance": False,
"ideals": True,
"mirror ME": True,
"workpiece": True,
"force compensation FE": True,
"force accuracy":False
},
"Real_signals": [
{
"name": "Rotor Position, mm ME",
"pen": {'color': 'r', 'width':2},
},
{
"name": "Rotor Position, mm FE",
"pen": {'color': 'w', 'width':2},
}
],
"Ideal_signals": [
{
"name": "Position ME",
"pen": {'color': 'g', 'width':4},
},
{
"name": "Position FE",
"pen": {'color': 'b', 'width':4},
}
]
},
"Electrode Speed, mm/s": {
"Settings": {
"zoom": False,
"stages": True,
"performance": False,
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False,
"force accuracy":False
},
"Real_signals": [
{
"name": "Rotor Speed, mm/s ME",
"pen": 'r',
"zoom": False
},
{
"name": "Rotor Speed, mm/s FE",
"pen": 'w',
"zoom": False
}
],
"Ideal_signals": [
{
"name": "Rotor Speed ME",
"pen": {'color': 'g', 'width':3},
"zoom": False
},
{
"name": "Rotor Speed FE",
"pen": {'color': 'b', 'width':3},
"zoom": False
}
]
},
}
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
object.setStyleSheet(
"""QLabel {
color: #ffffff;
font-size: 26px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}""")
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotItem(title="Navigator")
navigator.setFixedHeight(100)
# Получение кривых из main_plot
for curve in main_plot.listDataItems():
# Извлекаем данные из кривой
x, y = curve.getData()
curve_name = curve.opts.get("name", None)
signal_pen = curve.opts.get("pen", None)
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
ROI_region.setBounds([0, x[-1]])
navigator.addItem(ROI_region)
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotItem,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
@property
def opt(self) -> BaseIdealDataBuilder:
return self._opt
@opt.setter
def opt(self, opt: BaseIdealDataBuilder):
self._opt = opt
def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
...
class BaseController(QObject):
def send_widgets(self, widgets: list[QWidget]) -> None:
...
def update_settings(self, settings: list[dict]) -> None:
...
def raport_mode (self) -> None:
...
def seeking_mode(self) -> None:
...
def open_file(self, filepath: str) -> None:
...
def update_plots(self) -> None:
...
class BaseIdealDataBuilder(OptAlgorithm):
def __init__(self, params: list[dict]):
operator_params, system_params = params
self.mul = system_params['time_capture']
self.welding_time = operator_params['time_wielding']
super().__init__(operator_params, system_params)
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)+1):
time = i/self.mul
X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame:
...
def get_compressionDF(self) -> pd.DataFrame:
...
def get_openingDF(self) -> pd.DataFrame:
...
def get_tmovementDF(self) -> pd.DataFrame:
...
def get_weldingDF(self) -> pd.DataFrame:
...
def get_oncomingDF(self) -> pd.DataFrame:
...
def get_ideal_timings(self) -> list[float, float, float, float]:
...
def get_cycle_time(self) -> float:
result = sum(self.get_ideal_timings())
return result
class BaseMainWindow(QMainWindow):
def __init__(self,
controller: Optional[BaseController] = None):
super().__init__()
self.resize(200,200)
self._controller = controller
# Создаем центральный виджет и устанавливаем его
self._central_widget = QWidget()
self.setCentralWidget(self._central_widget)
# Устанавливаем основной вертикальный макет для центрального виджета
self._central_layout = QVBoxLayout()
self._central_widget.setLayout(self._central_layout)
self.set_style(self)
...
@property
def controller(self) -> BaseController:
return self._controller
@controller.setter
def controller(self, controller: BaseController) -> None:
self._controller = controller
def set_style(self, object: Union[QTabWidget, QWidget, QMainWindow]) -> None:
object.setStyleSheet(dark_style)
class BasePointPassportFormer:
def __init__(self,
mediator: Optional[BaseMediator] = None):
self._mediator = mediator
self._clear_stage = "Welding"
self._stages = [
"Closing",
"Squeeze",
"Welding",
"Relief",
"Oncomming"
]
self._tesla_stages = [
"Tesla squeeze",
"Tesla closing",
"Tesla welding",
"Tesla oncomming_relief"
]
self._params = []
self._ideal_data_cashe = LRUCache(maxsize=1000)
self._OptAlgorithm_operator_params = [
"dist_open_start_1",
"dist_open_start_2",
"dist_open_after_1",
"dist_open_after_2",
"dist_open_end_1",
"dist_open_end_2",
"dist_close_end_1",
"dist_close_end_2",
"time_command",
"time_robot_movement",
"object_thickness",
"force_target",
"force_capture",
"time_wielding"]
self._OptAlgorithm_system_params = [
"a_max_1",
"v_max_1",
"a_max_2",
"v_max_2",
"mass_1",
"mass_2",
"k_hardness_1",
"k_hardness_2",
"torque_max_1",
"torque_max_2",
"transmission_ratio_1",
"transmission_ratio_2",
"position_start_1",
"position_start_2",
"k_prop",
"time_capture"]
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
signal: str,
times:pd.Series,
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
start_idx, finish_idx = self._find_indexes(signal, dataframe)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1]))
return start_list, end_list
def _filter_events(self,
times: pd.Series,
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list)
if point_quantity == 0:
#TODO: добавить обработку исключения
return []
for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe)
2024-12-05 13:18:53 +03:00
temp = min([len(start_list), len(end_list)])
if temp < point_quantity:
print ("cant find enough", stage)
start_list += [0]*(point_quantity - temp)
end_list += [1]*(point_quantity - temp)
events[stage] = [start_list, end_list]
return events, point_quantity
def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
params: list[dict] = None) -> dict:
self.opt = idealDataBuilder(params)
stage_ideals = {
"Closing": self._opt.get_closingDF(),
"Squeeze": self._opt.get_compressionDF(),
"Welding": self._opt.get_weldingDF(),
"Relief": self._opt.get_openingDF(),
"Oncomming": self._opt.get_oncomingDF(),
"Ideal cycle": self._opt.get_cycle_time(),
"Ideal timings": self._opt.get_ideal_timings()
}
return stage_ideals
def form_passports(self) -> list[list[pd.DataFrame, dict, list]]:
...
def update_settings(self, params: list) -> None:
...
def _generate_cache_key(self,
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
"""
Преобразует params_list в хешируемый ключ для кэша.
"""
operator_settings, system_settings = params_list
# Преобразуем словари в отсортированные кортежи пар ключ-значение
operator_tuple = frozenset((key, value)
for key, value in operator_settings.items()
if str(key) in self._OptAlgorithm_operator_params)
system_tuple = frozenset((key, value)
for key, value in system_settings.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
@property
def opt(self) -> BaseIdealDataBuilder:
return self._opt
@opt.setter
def opt(self, opt: BaseIdealDataBuilder):
self._opt = opt
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator