WeldingSpotPerformance/src/base/base.py

682 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import os
from typing import Optional, Union, Any
from cachetools import LRUCache
import numpy as np
import pyqtgraph as pg
import pandas as pd
from PyQt5.QtCore import QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget, QMainWindow, QVBoxLayout
from OptAlgorithm import OptAlgorithm
from utils.qt_settings import dark_style
# TODO: Глобальное замечание: Базовые классы создаются для обозначения неких базовых свойств объектов.
# Конкретная же реализация возлагается на классы - наследники.
class BaseMediator:
def __init__(self,
converter: BaseDataConverter,
# TODO: passport_former
passportFormer: BasePointPassportFormer,
plot: BasePlotWidget,
controller: BaseController,
file_manager: BaseFileManager):
self._converter = converter
self._converter.mediator = self
self._passportFormer = passportFormer
self._passportFormer.mediator = self
self._plot = plot
self._plot.mediator = self
self._controller = controller
self._controller.mediator = self
self._file_manager = file_manager
self._file_manager.mediator = self
def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
...
# TODO: Отступы между методами класса - одна пустая строка.
def update_settings (self, data: list[dict]):
...
def update_status(self, msg: Union[str, float]) -> None:
...
# TODO: Отступы между классами - две пустых строки.
class BaseDirectoryMonitor:
update_timer = QTimer()
def __init__(self,
file_manager: Optional[BaseFileManager] = None):
super().__init__()
self._directory_path = None
self._update_time = None
self.isActive = False
self._files = []
self._file_manager = file_manager
@property
def directory_path(self) -> str:
return self._directory_path
@property
def update_time(self) -> int:
return self._update_time
@property
def files(self) -> list[str]:
return self._files
@property
def file_manager(self) -> BaseFileManager:
return self._file_manager
@file_manager.setter
def file_manager(self, file_manager: BaseFileManager) -> None:
self._file_manager = file_manager
def init_state(self):
files = os.listdir(self._directory_path)
self._files = files
def start(self):
self.isActive = True
self.update_timer.start(int(self._update_time))
def stop(self):
self.isActive = False
self.update_timer.stop()
def pause(self):
self.update_timer.stop()
# TODO: Почему монитор директорий обновляет какие-то настройки, графики и т.д.? Помним про
# принцип единичной ответственности.
def update_settings(self, data: list[dict]) -> None:
...
def update_plots(self) -> None:
...
def force_all_dir(self):
# TODO: По сигнатуре метода вообще не понятно, что он должен делать.
...
class BaseDataConverter:
def __init__(self, mediator: Optional[BaseMediator] = None):
self._mediator = mediator
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
def convert_data(self, files: list[str]) -> None:
...
class BasePlotWidget:
def __init__(self,
mediator: Optional[BaseMediator] = None):
super().__init__()
self._mediator = mediator
self._stage_colors = {
"Closing": [220, 20, 60, 100], # Crimson
"Squeeze": [30, 144, 255, 100], # Dodger Blue
"Welding": [128, 128, 128, 100], # Gray
"Relief": [34, 139, 34, 100], # Forest Green
"Oncomming": [255, 165, 0, 100] # Orange
}
self._plt_channels = {
"Electrode Force, N & Welding Current, kA": {
"Settings": {
"zoom": False,
"stages": True,
"performance": True,
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False,
"force accuracy":True
},
"Real_signals": [
{
"name": "Electrode Force, N ME",
"pen": 'r',
},
{
"name": "Electrode Force, N FE",
"pen": 'w',
},
{
"name": "Welding Current ME",
"pen": "y",
}
],
"Ideal_signals": [
{
"name": "Force",
"pen": {'color': 'g', 'width':3},
}
]
},
"Electrode Position, mm": {
"Settings": {
"zoom": False,
"stages": True,
"performance": False,
"ideals": True,
"mirror ME": True,
"workpiece": True,
"force compensation FE": True,
"force accuracy":False
},
"Real_signals": [
{
"name": "Rotor Position, mm ME",
"pen": {'color': 'r', 'width':2},
},
{
"name": "Rotor Position, mm FE",
"pen": {'color': 'w', 'width':2},
}
],
"Ideal_signals": [
{
"name": "Position ME",
"pen": {'color': 'g', 'width':4},
},
{
"name": "Position FE",
"pen": {'color': 'b', 'width':4},
}
]
},
"Electrode Speed, mm/s": {
"Settings": {
"zoom": False,
"stages": True,
"performance": False,
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False,
"force accuracy":False
},
"Real_signals": [
{
"name": "Rotor Speed, mm/s ME",
"pen": 'r',
"zoom": False
},
{
"name": "Rotor Speed, mm/s FE",
"pen": 'w',
"zoom": False
}
],
"Ideal_signals": [
{
"name": "Rotor Speed ME",
"pen": {'color': 'g', 'width':3},
"zoom": False
},
{
"name": "Rotor Speed FE",
"pen": {'color': 'b', 'width':3},
"zoom": False
}
]
},
}
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
# TODO: object - зарезервированное слово. Использовать эти слова для именования переменных не правильно.
# Если хочется использовать именно такое слово, можно написать "object_".
object.setStyleSheet(
"""QLabel {
color: #ffffff;
font-size: 26px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}""")
def _downsample_data(self, x, y, max_points=5000):
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
# TODO: Какой тип данных у 'x'? Какой тип данных у 'y'? Надо добавить аннотации типов.
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotItem(title="Navigator")
navigator.setFixedHeight(100)
# Получение кривых из main_plot
for curve in main_plot.listDataItems():
# Извлекаем данные из кривой
x, y = curve.getData()
curve_name = curve.opts.get("name", None)
signal_pen = curve.opts.get("pen", None)
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
ROI_region.setBounds([0, x[-1]])
navigator.addItem(ROI_region)
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
# TODO: Возвращаемый результат не соответствует аннотированному в сигнатуре метода.
return navigator, ROI_region
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotItem,
region: pg.LinearRegionItem):
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
# TODO: Методы _mirror_shift_data и _shift_data нарушают принцип DRY: дублирование кода.
# Сделать ОДИН метод, одним из входных параметров которого будет lambda-функция, которая применяется к dataframe.
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
@property
def opt(self) -> BaseIdealDataBuilder:
return self._opt
@opt.setter
def opt(self, opt: BaseIdealDataBuilder):
# TODO: Атрибуты класса следует сначала определять в конструкторе класса.
# TODO: Что такое opt? Optical? Option? Optimal? Optimus Prime?
self._opt = opt
def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
...
class BaseController(QObject):
def __init__(self,
mediator: Optional[BaseMediator] = None,
file_manager: Optional[BaseFileManager] = None):
super().__init__()
self._mediator = mediator
self._file_manager = file_manager
def send_widgets(self, widgets: list[QWidget]) -> None:
...
def update_settings(self, settings: list[dict]) -> None:
...
def set_working_mode(self, mode:int) -> None:
...
def open_file(self, filepath: str) -> None:
...
def update_plots(self) -> None:
...
def update_status(self, msg:str) -> None:
...
def update_progress(self, progress:int) -> None:
...
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
@property
def file_manager(self) -> BaseFileManager:
return self._file_manager
@file_manager.setter
def file_manager(self, file_manager: BaseFileManager) -> None:
self._file_manager = file_manager
class BaseFileManager:
def __init__(self,
mediator: Optional[BaseMediator] = None,
monitor: Optional[BaseDirectoryMonitor] = None):
self._mediator = mediator
self._monitor = monitor
self._paths_library = set()
@property
def paths_library(self) -> set:
return self._paths_library
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
@property
def monitor(self) -> BaseDirectoryMonitor:
return self._monitor
@monitor.setter
def monitor(self, monitor: BaseDirectoryMonitor) -> None:
self._monitor = monitor
def replot_all(self) -> None:
...
def open_custom_file(self, path:str) -> None:
...
def set_mode(self, num:int) -> None:
...
def update_monitor_settings(self, settings:list[dict]) -> None:
...
def add_new_paths(self, paths:list[str]):
...
class BaseIdealDataBuilder(OptAlgorithm):
def __init__(self, params: list[dict]):
operator_params, system_params = params
self.mul = system_params['time_capture']
self.welding_time = operator_params['time_wielding']
super().__init__(operator_params, system_params)
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)+1):
time = i/self.mul
X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame:
...
def get_compressionDF(self) -> pd.DataFrame:
...
def get_openingDF(self) -> pd.DataFrame:
...
def get_tmovementDF(self) -> pd.DataFrame:
...
def get_weldingDF(self) -> pd.DataFrame:
...
def get_oncomingDF(self) -> pd.DataFrame:
...
def get_ideal_timings(self) -> list[float, float, float, float]:
...
def get_cycle_time(self) -> float:
result = sum(self.get_ideal_timings())
return result
class BaseMainWindow(QMainWindow):
def __init__(self,
controller: Optional[BaseController] = None):
super().__init__()
self.resize(200,200)
self._controller = controller
# Создаем центральный виджет и устанавливаем его
self._central_widget = QWidget()
self.setCentralWidget(self._central_widget)
# Устанавливаем основной вертикальный макет для центрального виджета
self._central_layout = QVBoxLayout()
self._central_widget.setLayout(self._central_layout)
self.set_style(self)
...
@property
def controller(self) -> BaseController:
return self._controller
@controller.setter
def controller(self, controller: BaseController) -> None:
# TODO: зачем делать setter для контроллера, если ты его экземпляр передаешь в конструктор?
self._controller = controller
def set_style(self, object: Union[QTabWidget, QWidget, QMainWindow]) -> None:
object.setStyleSheet(dark_style)
class BasePointPassportFormer:
def __init__(self,
mediator: Optional[BaseMediator] = None):
self._mediator = mediator
self._clear_stage = "Welding"
self._stages = [
"Closing",
"Squeeze",
"Welding",
"Relief",
"Oncomming"
]
self._tesla_stages = [
"Tesla squeeze",
"Tesla closing",
"Tesla welding",
"Tesla oncomming_relief"
]
self._params = []
self._ideal_data_cashe = LRUCache(maxsize=1000)
self._OptAlgorithm_operator_params = [
"dist_open_start_1",
"dist_open_start_2",
"dist_open_after_1",
"dist_open_after_2",
"dist_open_end_1",
"dist_open_end_2",
"dist_close_end_1",
"dist_close_end_2",
"time_command",
"time_robot_movement",
"object_thickness",
"force_target",
"force_capture",
"time_wielding"]
self._OptAlgorithm_system_params = [
"a_max_1",
"v_max_1",
"a_max_2",
"v_max_2",
"mass_1",
"mass_2",
"k_hardness_1",
"k_hardness_2",
"torque_max_1",
"torque_max_2",
"transmission_ratio_1",
"transmission_ratio_2",
"position_start_1",
"position_start_2",
"k_prop",
"time_capture"]
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
signal: str,
times:pd.Series,
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
start_idx, finish_idx = self._find_indexes(signal, dataframe)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1]))
return start_list, end_list
def _filter_events(self,
times: pd.Series,
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list)
if point_quantity == 0:
#TODO: добавить обработку исключения
return []
for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe)
temp = min([len(start_list), len(end_list)])
if temp < point_quantity:
print ("cant find enough", stage)
start_list += [0]*(point_quantity - temp)
end_list += [1]*(point_quantity - temp)
events[stage] = [start_list, end_list]
return events, point_quantity
def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
params: list[dict] = None) -> dict:
self.opt = idealDataBuilder(params)
stage_ideals = {
"Closing": self._opt.get_closingDF(),
"Squeeze": self._opt.get_compressionDF(),
"Welding": self._opt.get_weldingDF(),
"Relief": self._opt.get_openingDF(),
"Oncomming": self._opt.get_oncomingDF(),
"Ideal cycle": self._opt.get_cycle_time(),
"Ideal timings": self._opt.get_ideal_timings()
}
return stage_ideals
def form_passports(self) -> list[list[pd.DataFrame, dict, list]]:
...
def update_settings(self, params: list) -> None:
...
def _generate_cache_key(self,
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
"""
Преобразует params_list в хешируемый ключ для кэша.
"""
operator_settings, system_settings = params_list
# Преобразуем словари в отсортированные кортежи пар ключ-значение
operator_tuple = frozenset((key, value)
for key, value in operator_settings.items()
if str(key) in self._OptAlgorithm_operator_params)
system_tuple = frozenset((key, value)
for key, value in system_settings.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
@property
def opt(self) -> BaseIdealDataBuilder:
return self._opt
@opt.setter
def opt(self, opt: BaseIdealDataBuilder):
self._opt = opt
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator