603 lines
21 KiB
Python
603 lines
21 KiB
Python
from __future__ import annotations
|
||
|
||
import os
|
||
from typing import Optional, Union, Any
|
||
from cachetools import LRUCache
|
||
|
||
import pandas as pd
|
||
from PyQt5.QtCore import QObject, QTimer
|
||
from PyQt5.QtWidgets import QWidget, QTabWidget
|
||
from OptAlgorithm import OptAlgorithm
|
||
import numpy as np
|
||
import pyqtgraph as pg
|
||
|
||
|
||
|
||
class BaseMediator:
|
||
def __init__(self,
|
||
monitor: BaseDirectoryMonitor,
|
||
converter: BaseDataConverter,
|
||
passportFormer: BasePointPassportFormer,
|
||
plot: BasePlotWidget,
|
||
controller: BaseController):
|
||
self._monitor = monitor
|
||
self._monitor.mediator = self
|
||
self._converter = converter
|
||
self._converter.mediator = self
|
||
self._passportFormer = passportFormer
|
||
self._passportFormer.mediator = self
|
||
self._plot = plot
|
||
self._plot.mediator = self
|
||
self._controller = controller
|
||
self._controller.mediator = self
|
||
|
||
def notify(self,
|
||
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
|
||
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
|
||
...
|
||
def update_settings (self, data: list[dict]):
|
||
...
|
||
|
||
class BaseDirectoryMonitor:
|
||
|
||
update_timer = QTimer()
|
||
|
||
def __init__(self,
|
||
mediator: Optional[BaseMediator] = None):
|
||
super().__init__()
|
||
self._directory_path = None
|
||
self._update_time = None
|
||
self.isActive = False
|
||
self._files = []
|
||
self._mediator = mediator
|
||
|
||
|
||
@property
|
||
def directory_path(self) -> str:
|
||
return self._directory_path
|
||
|
||
@property
|
||
def update_time(self) -> int:
|
||
return self._update_time
|
||
|
||
@property
|
||
def files(self) -> list[str]:
|
||
return self._files
|
||
|
||
@property
|
||
def mediator(self) -> BaseMediator:
|
||
return self._mediator
|
||
|
||
@mediator.setter
|
||
def mediator(self, mediator: BaseMediator) -> None:
|
||
self._mediator = mediator
|
||
|
||
def _init_state(self):
|
||
files = os.listdir(self._directory_path)
|
||
self._files = files
|
||
|
||
def start(self):
|
||
self.isActive = True
|
||
self.update_timer.start(int(self._update_time))
|
||
|
||
def stop(self):
|
||
self.isActive = False
|
||
self.update_timer.stop()
|
||
|
||
def update_settings(self, data: list[dict]) -> None:
|
||
...
|
||
|
||
def update_plots(self) -> None:
|
||
...
|
||
|
||
def force_all_dir(self):
|
||
...
|
||
|
||
class BaseDataConverter:
|
||
def __init__(self, mediator: Optional[BaseMediator] = None):
|
||
self._mediator = mediator
|
||
|
||
@property
|
||
def mediator(self) -> BaseMediator:
|
||
return self._mediator
|
||
|
||
@mediator.setter
|
||
def mediator(self, mediator: BaseMediator) -> None:
|
||
self._mediator = mediator
|
||
|
||
def convert_data(self, files: list[str]) -> None:
|
||
...
|
||
|
||
|
||
class BasePlotWidget:
|
||
def __init__(self,
|
||
mediator: Optional[BaseMediator] = None):
|
||
super().__init__()
|
||
self._mediator = mediator
|
||
|
||
self._stage_colors = {
|
||
"Closing": [220, 20, 60, 100], # Crimson
|
||
"Squeeze": [30, 144, 255, 100], # Dodger Blue
|
||
"Welding": [128, 128, 128, 100], # Gray
|
||
"Relief": [34, 139, 34, 100], # Forest Green
|
||
"Oncomming": [255, 165, 0, 100] # Orange
|
||
}
|
||
self._plt_channels = {
|
||
"Electrode Force, N & Welding Current, kA": {
|
||
"Settings": {
|
||
"zoom": False,
|
||
"stages": True,
|
||
"performance": True,
|
||
"ideals": True,
|
||
"mirror ME": False,
|
||
"workpiece": False,
|
||
"force compensation FE": False,
|
||
"force accuracy":True
|
||
},
|
||
"Real_signals": [
|
||
{
|
||
"name": "Electrode Force, N ME",
|
||
"pen": 'r',
|
||
},
|
||
{
|
||
"name": "Electrode Force, N FE",
|
||
"pen": 'w',
|
||
},
|
||
{
|
||
"name": "Welding Current ME",
|
||
"pen": "y",
|
||
}
|
||
],
|
||
"Ideal_signals": [
|
||
{
|
||
"name": "Force",
|
||
"pen": {'color': 'g', 'width':3},
|
||
}
|
||
]
|
||
},
|
||
"Electrode Position, mm": {
|
||
"Settings": {
|
||
"zoom": False,
|
||
"stages": True,
|
||
"performance": False,
|
||
"ideals": True,
|
||
"mirror ME": True,
|
||
"workpiece": True,
|
||
"force compensation FE": True,
|
||
"force accuracy":False
|
||
},
|
||
"Real_signals": [
|
||
{
|
||
"name": "Rotor Position, mm ME",
|
||
"pen": {'color': 'r', 'width':2},
|
||
},
|
||
{
|
||
"name": "Rotor Position, mm FE",
|
||
"pen": {'color': 'w', 'width':2},
|
||
}
|
||
],
|
||
"Ideal_signals": [
|
||
{
|
||
"name": "Position ME",
|
||
"pen": {'color': 'g', 'width':4},
|
||
},
|
||
{
|
||
"name": "Position FE",
|
||
"pen": {'color': 'b', 'width':4},
|
||
}
|
||
]
|
||
},
|
||
"Electrode Speed, mm/s": {
|
||
"Settings": {
|
||
"zoom": False,
|
||
"stages": True,
|
||
"performance": False,
|
||
"ideals": True,
|
||
"mirror ME": False,
|
||
"workpiece": False,
|
||
"force compensation FE": False,
|
||
"force accuracy":False
|
||
},
|
||
"Real_signals": [
|
||
{
|
||
"name": "Rotor Speed, mm/s ME",
|
||
"pen": 'r',
|
||
"zoom": False
|
||
},
|
||
{
|
||
"name": "Rotor Speed, mm/s FE",
|
||
"pen": 'w',
|
||
"zoom": False
|
||
}
|
||
],
|
||
"Ideal_signals": [
|
||
{
|
||
"name": "Rotor Speed ME",
|
||
"pen": {'color': 'g', 'width':3},
|
||
"zoom": False
|
||
},
|
||
{
|
||
"name": "Rotor Speed FE",
|
||
"pen": {'color': 'b', 'width':3},
|
||
"zoom": False
|
||
}
|
||
]
|
||
},
|
||
}
|
||
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
|
||
object.setStyleSheet(
|
||
"""QLabel {
|
||
color: #ffffff;
|
||
font-size: 26px;
|
||
font-weight: bold;
|
||
font-family: "Segoe UI", sans-serif;
|
||
}""")
|
||
|
||
def _downsample_data(self, x, y, max_points=5000):
|
||
"""
|
||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||
"""
|
||
if len(x) > max_points:
|
||
factor = len(x) // max_points
|
||
x_downsampled = x[::factor]
|
||
y_downsampled = y[::factor]
|
||
return x_downsampled, y_downsampled
|
||
return x, y
|
||
|
||
def _create_navigator(self,
|
||
time_region:tuple[float, float],
|
||
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||
"""
|
||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||
"""
|
||
navigator = pg.PlotItem(title="Navigator")
|
||
navigator.setFixedHeight(100)
|
||
# Получение кривых из main_plot
|
||
for curve in main_plot.listDataItems():
|
||
# Извлекаем данные из кривой
|
||
x, y = curve.getData()
|
||
curve_name = curve.opts.get("name", None)
|
||
signal_pen = curve.opts.get("pen", None)
|
||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
|
||
|
||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
|
||
ROI_region.setBounds([0, x[-1]])
|
||
navigator.addItem(ROI_region)
|
||
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
|
||
|
||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||
|
||
return navigator, ROI_region
|
||
|
||
def _sync_main_plot_with_navigator(self,
|
||
main_plot: pg.PlotItem,
|
||
region: pg.LinearRegionItem):
|
||
"""
|
||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||
"""
|
||
x_min, x_max = region.getRegion()
|
||
if main_plot:
|
||
main_plot.blockSignals(True)
|
||
main_plot.setXRange(x_min, x_max, padding=0)
|
||
main_plot.blockSignals(False)
|
||
|
||
def _mirror_shift_data(self,
|
||
valid_str: str,
|
||
signals: list[dict],
|
||
dataframe: pd.DataFrame,
|
||
shift: float) -> pd.DataFrame:
|
||
keys = dataframe.keys()
|
||
for signal in signals:
|
||
if valid_str in signal["name"] and signal["name"] in keys:
|
||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||
return dataframe
|
||
|
||
def _shift_data(self,
|
||
valid_str: str,
|
||
signals: list[dict],
|
||
dataframe: pd.DataFrame,
|
||
shift: float) -> pd.DataFrame:
|
||
keys = dataframe.keys()
|
||
for signal in signals:
|
||
if valid_str in signal["name"] and signal["name"] in keys:
|
||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||
return dataframe
|
||
|
||
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
|
||
"""
|
||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||
"""
|
||
if region:
|
||
x_min, x_max = main_plot
|
||
region.blockSignals(True) # Предотвращаем рекурсию
|
||
region.setRegion([x_min, x_max])
|
||
region.blockSignals(False)
|
||
|
||
@property
|
||
def mediator(self) -> BaseMediator:
|
||
return self._mediator
|
||
|
||
@mediator.setter
|
||
def mediator(self, mediator: BaseMediator) -> None:
|
||
self._mediator = mediator
|
||
|
||
@property
|
||
def opt(self) -> BaseIdealDataBuilder:
|
||
return self._opt
|
||
|
||
@opt.setter
|
||
def opt(self, opt: BaseIdealDataBuilder):
|
||
self._opt = opt
|
||
|
||
def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
|
||
...
|
||
|
||
|
||
class BaseController(QObject):
|
||
|
||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||
...
|
||
|
||
def update_settings(self, settings: list[dict]) -> None:
|
||
...
|
||
|
||
def raport_mode (self, filepath: str) -> None:
|
||
...
|
||
|
||
def seeking_mode(self) -> None:
|
||
...
|
||
|
||
class BaseIdealDataBuilder(OptAlgorithm):
|
||
|
||
def __init__(self, params: list[dict]):
|
||
operator_params, system_params = params
|
||
self.mul = system_params['time_capture']
|
||
self.welding_time = operator_params['time_wielding']
|
||
super().__init__(operator_params, system_params)
|
||
|
||
|
||
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
|
||
data = []
|
||
for i in range (0, int(end_timestamp*self.mul)+1):
|
||
time = i/self.mul
|
||
X1, X2, V1, V2, F = func(time)
|
||
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||
X1, X2, V1, V2, F = func(end_timestamp)
|
||
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||
return pd.DataFrame(data)
|
||
|
||
def get_closingDF(self) -> pd.DataFrame:
|
||
...
|
||
|
||
def get_compressionDF(self) -> pd.DataFrame:
|
||
...
|
||
|
||
def get_openingDF(self) -> pd.DataFrame:
|
||
...
|
||
|
||
def get_tmovementDF(self) -> pd.DataFrame:
|
||
...
|
||
|
||
def get_weldingDF(self) -> pd.DataFrame:
|
||
...
|
||
|
||
def get_oncomingDF(self) -> pd.DataFrame:
|
||
...
|
||
|
||
def get_ideal_timings(self) -> list[float, float, float, float]:
|
||
...
|
||
|
||
def get_cycle_time(self) -> float:
|
||
result = sum(self.get_ideal_timings())
|
||
return result
|
||
|
||
class BaseMainWindow(QWidget):
|
||
def __init__(self,
|
||
controller: Optional[BaseController] = None):
|
||
super().__init__()
|
||
self.set_style(self)
|
||
self.resize(200,200)
|
||
self._controller = controller
|
||
...
|
||
|
||
@property
|
||
def controller(self) -> BaseController:
|
||
return self._controller
|
||
|
||
@controller.setter
|
||
def controller(self, controller: BaseController) -> None:
|
||
self._controller = controller
|
||
|
||
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
|
||
object.setStyleSheet("""
|
||
QWidget {
|
||
background-color: #0D1117;
|
||
font-family: "Segoe UI", sans-serif;
|
||
font-size: 14px;
|
||
}
|
||
QMessageBox {
|
||
background-color: #161B22;
|
||
font-family: "Segoe UI", sans-serif;
|
||
font-size: 14px;
|
||
}
|
||
QPushButton {
|
||
background-color: #FFCC00;
|
||
color: #0D1117;
|
||
padding: 12px 25px;
|
||
border: 2px solid #E6B800;
|
||
border-radius: 8px;
|
||
font-family: "Segoe UI", sans-serif;
|
||
font-size: 16px;
|
||
font-weight: bold;
|
||
}
|
||
QPushButton:hover:!disabled {
|
||
background-color: #FFD700;
|
||
}
|
||
QPushButton:disabled {
|
||
background-color: #555555;
|
||
color: #cccccc;
|
||
border: none;
|
||
}
|
||
QLabel {
|
||
color: #ffffff;
|
||
font-size: 16px;
|
||
font-weight: bold;
|
||
font-family: "Segoe UI", sans-serif;
|
||
}
|
||
""")
|
||
|
||
class BasePointPassportFormer:
|
||
|
||
def __init__(self,
|
||
mediator: Optional[BaseMediator] = None):
|
||
self._mediator = mediator
|
||
self._clear_stage = "Welding"
|
||
self._stages = [
|
||
"Closing",
|
||
"Squeeze",
|
||
"Welding",
|
||
"Relief",
|
||
"Oncomming"
|
||
]
|
||
self._tesla_stages = [
|
||
"Tesla squeeze",
|
||
"Tesla closing",
|
||
"Tesla welding",
|
||
"Tesla oncomming_relief"
|
||
]
|
||
self._params = []
|
||
self._ideal_data_cashe = LRUCache(maxsize=1000)
|
||
self._OptAlgorithm_operator_params = [
|
||
"dist_open_start_1",
|
||
"dist_open_start_2",
|
||
"dist_open_after_1",
|
||
"dist_open_after_2",
|
||
"dist_open_end_1",
|
||
"dist_open_end_2",
|
||
"dist_close_end_1",
|
||
"dist_close_end_2",
|
||
"time_command",
|
||
"time_robot_movement",
|
||
"object_thickness",
|
||
"force_target",
|
||
"force_capture",
|
||
"time_wielding"]
|
||
self._OptAlgorithm_system_params = [
|
||
"a_max_1",
|
||
"v_max_1",
|
||
"a_max_2",
|
||
"v_max_2",
|
||
"mass_1",
|
||
"mass_2",
|
||
"k_hardness_1",
|
||
"k_hardness_2",
|
||
"torque_max_1",
|
||
"torque_max_2",
|
||
"transmission_ratio_1",
|
||
"transmission_ratio_2",
|
||
"position_start_1",
|
||
"position_start_2",
|
||
"k_prop",
|
||
"time_capture"]
|
||
|
||
def _find_indexes(self,
|
||
signal: str,
|
||
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
|
||
stage_diff = np.diff(dataframe[signal])
|
||
start_idx = np.where(stage_diff == 1)
|
||
finish_idx = np.where(stage_diff == -1)
|
||
return start_idx[0], finish_idx[0]
|
||
|
||
def _find_events(self,
|
||
signal: str,
|
||
times:pd.Series,
|
||
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
|
||
start_idx, finish_idx = self._find_indexes(signal, dataframe)
|
||
|
||
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
|
||
start_idx = np.insert(start_idx, 0, 0)
|
||
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
|
||
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
|
||
if len(start_list) - len(end_list) == 1:
|
||
end_list.append(float(times.iloc[-1]))
|
||
|
||
return start_list, end_list
|
||
|
||
def _filter_events(self,
|
||
times: pd.Series,
|
||
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
|
||
events = {}
|
||
point_quantity = 0
|
||
if self._clear_stage in self._stages:
|
||
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
|
||
point_quantity = len(start_list)
|
||
if point_quantity == 0:
|
||
#TODO: добавить обработку исключения
|
||
return []
|
||
|
||
for stage in self._stages:
|
||
start_list, end_list = self._find_events(stage, times, dataframe)
|
||
temp = min([len(start_list), len(end_list)])
|
||
if temp < point_quantity:
|
||
print ("cant find enough", stage)
|
||
start_list += [0]*(point_quantity - temp)
|
||
end_list += [1]*(point_quantity - temp)
|
||
|
||
events[stage] = [start_list, end_list]
|
||
return events, point_quantity
|
||
|
||
def _build_ideal_data(self,
|
||
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
|
||
params: list[dict] = None) -> dict:
|
||
self.opt = idealDataBuilder(params)
|
||
stage_ideals = {
|
||
"Closing": self._opt.get_closingDF(),
|
||
"Squeeze": self._opt.get_compressionDF(),
|
||
"Welding": self._opt.get_weldingDF(),
|
||
"Relief": self._opt.get_openingDF(),
|
||
"Oncomming": self._opt.get_oncomingDF(),
|
||
"Ideal cycle": self._opt.get_cycle_time(),
|
||
"Ideal timings": self._opt.get_ideal_timings()
|
||
}
|
||
return stage_ideals
|
||
|
||
def form_passports(self) -> list[list[pd.DataFrame, dict, list]]:
|
||
...
|
||
|
||
def update_settings(self, params: list) -> None:
|
||
...
|
||
|
||
def _generate_cache_key(self,
|
||
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
|
||
"""
|
||
Преобразует params_list в хешируемый ключ для кэша.
|
||
"""
|
||
operator_settings, system_settings = params_list
|
||
|
||
# Преобразуем словари в отсортированные кортежи пар ключ-значение
|
||
operator_tuple = frozenset((key, value)
|
||
for key, value in operator_settings.items()
|
||
if str(key) in self._OptAlgorithm_operator_params)
|
||
|
||
system_tuple = frozenset((key, value)
|
||
for key, value in system_settings.items()
|
||
if str(key) in self._OptAlgorithm_system_params)
|
||
return (operator_tuple, system_tuple)
|
||
|
||
@property
|
||
def opt(self) -> BaseIdealDataBuilder:
|
||
return self._opt
|
||
|
||
@opt.setter
|
||
def opt(self, opt: BaseIdealDataBuilder):
|
||
self._opt = opt
|
||
|
||
@property
|
||
def mediator(self) -> BaseMediator:
|
||
return self._mediator
|
||
|
||
@mediator.setter
|
||
def mediator(self, mediator: BaseMediator) -> None:
|
||
self._mediator = mediator |