589 lines
16 KiB
Python
589 lines
16 KiB
Python
from __future__ import annotations
|
||
import os
|
||
from typing import Optional, Union
|
||
from dataclasses import dataclass
|
||
|
||
from cachetools import LRUCache
|
||
import pandas as pd
|
||
from PyQt5.QtCore import QObject, QTimer
|
||
from PyQt5.QtWidgets import QWidget, QTabWidget, QMainWindow, QVBoxLayout
|
||
|
||
from OptAlgorithm import OptAlgorithm
|
||
from utils.qt_settings import dark_style
|
||
|
||
@dataclass
class KukaTXT:
    """Record of timing and signal text fields; every field has a default.

    ``BaseKukaTextParser.parse`` in this module is annotated as returning a
    list of these records.
    """

    time: float = 0
    endtime: float = 0
    func: str = ""
    type_: str = ""
    signal: str = ""

    # Fields that are currently disabled (kept for reference):
    # module: str
    # line: int = 0
    # point_name: str = ""
    # point_coord: dict = field(default_factory=lambda: {})
    # blending: str = ""
    # blending_param: float = 0
    # velocities: dict = field(default_factory=lambda: {})
    # accelerarions: dict = field(default_factory=lambda: {})
    # base: dict = field(default_factory=lambda: {})
    # tool: dict = field(default_factory=lambda: {})
    # ipo_mode: str = ""
    # motion_mode: str = ""
    # load: dict = field(default_factory=lambda: {})
    # load_a3: dict = field(default_factory=lambda: {})
|
||
|
||
|
||
@dataclass
class KukaDataHead:
    """Header record: a numeric id, a file name and a channel mapping.

    All three fields are required (no defaults).
    """

    rob_ID: int
    filename: str
    channels: dict
|
||
|
||
|
||
@dataclass
class PlotItems:
    """Bundle of three required dictionaries: regions, curves and Qt items."""

    regions: dict
    curves: dict
    qt_items: dict
|
||
|
||
|
||
@dataclass
class PointPassport:
    """Per-point record holding a timeframe list and three dictionaries.

    All fields are required (no defaults).
    """

    timeframe: list
    events: dict
    ideal_data: dict
    useful_data: dict
|
||
|
||
|
||
@dataclass
class UsefulGraphData:
    """Three required float values attached to a graph."""

    client_time: float
    range_ME: float
    k_hardness: float
|
||
|
||
|
||
@dataclass
class GraphicPassport:
    """Groups a DataFrame with its point passports and graph-level data.

    All fields are required (no defaults).
    """

    dataframe: pd.DataFrame
    points_pocket: list[PointPassport]
    useful_data: UsefulGraphData
|
||
|
||
|
||
@dataclass
class Settings:
    """Pair of required settings dictionaries: ``operator`` and ``system``."""

    operator: dict
    system: dict
|
||
|
||
|
||
class BaseMediator:
    """Holds the collaborating components and registers itself on each one."""

    def __init__(self,
                 converter: BaseDataConverter,
                 passport_former: BasePointPassportFormer,
                 plot: BasePlotWidget,
                 controller: BaseController,
                 file_manager: BaseFileManager,
                 trace_processor: BaseRawTraceProcessor):
        """Store each component privately and set its ``mediator`` to ``self``."""
        wiring = (
            ("_converter", converter),
            ("_passport_former", passport_former),
            ("_plot", plot),
            ("_controller", controller),
            ("_file_manager", file_manager),
            ("_trace_processor", trace_processor),
        )
        # Keep the store-then-register sequence per component, in argument order.
        for attr_name, component in wiring:
            setattr(self, attr_name, component)
            component.mediator = self

    def notify(self,
               source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget, BaseRawTraceProcessor],
               data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget], pd.DataFrame]):
        """Placeholder: does nothing in this base class and returns None."""

    def prerender(self, data: list[str]) -> None:
        """Placeholder: does nothing in this base class."""
|
||
|
||
class BaseDirectoryMonitor:
    """Base for a timer-driven directory monitor.

    Keeps a directory path, an update interval, an ``isActive`` flag, the
    list of names last read from the directory and an optional file manager.
    """

    # NOTE(review): this timer is created once, when the class body runs, so
    # every instance shares the same QTimer object - confirm this is intended.
    update_timer = QTimer()

    def __init__(self,
                 file_manager: Optional[BaseFileManager] = None):
        """Start inactive, with no directory, no interval and no known files."""
        super().__init__()
        self._file_manager = file_manager
        self._directory_path = None
        self._update_time = None
        self._files = []
        self.isActive = False

    @property
    def directory_path(self) -> str:
        """Directory path currently stored (``None`` until something sets it)."""
        return self._directory_path

    @property
    def update_time(self) -> int:
        """Interval passed to the timer by ``start`` (``None`` until set)."""
        return self._update_time

    @property
    def files(self) -> list[str]:
        """Names captured by the last ``init_state`` call."""
        return self._files

    @property
    def file_manager(self) -> BaseFileManager:
        """File manager attached to this monitor."""
        return self._file_manager

    @file_manager.setter
    def file_manager(self, file_manager: BaseFileManager) -> None:
        self._file_manager = file_manager

    def init_state(self):
        """Replace the stored file list with the directory's current listing."""
        # NOTE(review): _directory_path is still None unless a subclass or
        # caller assigned it first - verify before relying on the result.
        self._files = os.listdir(self._directory_path)

    def start(self):
        """Mark the monitor active and start the timer with the stored interval."""
        self.isActive = True
        interval = int(self._update_time)
        self.update_timer.start(interval)

    def stop(self):
        """Mark the monitor inactive and stop the timer."""
        self.isActive = False
        self.update_timer.stop()

    def pause(self):
        """Stop the timer while leaving ``isActive`` unchanged."""
        self.update_timer.stop()
|
||
|
||
|
||
class BaseDataConverter:
    """Base converter that only keeps a reference to its mediator."""

    def __init__(self, mediator: Optional[BaseMediator] = None):
        """Remember the mediator, which may be supplied later via the setter."""
        self._mediator = mediator

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this converter is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def convert_data(self, files: list[str]) -> None:
        """Placeholder: does nothing in this base class."""
|
||
|
||
|
||
class BasePlotWidget:
    """Base plot builder holding stage colours and per-channel plot settings."""

    def __init__(self,
                 mediator: Optional[BaseMediator] = None,
                 controller: BaseController = None):
        """Store the collaborators and build fresh configuration dictionaries.

        The configuration is created inside the constructor, so each instance
        owns its own dictionaries and lists.
        """
        super().__init__()
        self._mediator: BaseMediator = mediator
        self._controller: BaseController = controller
        self._datalen: int = 0
        self._datastep: int = 0

        # Four-component colour lists keyed by stage name.
        self._stage_colors = {
            "Closing": [220, 20, 60, 100],    # Crimson
            "Squeeze": [30, 144, 255, 100],   # Dodger Blue
            "Welding": [128, 128, 128, 100],  # Gray
            "Relief": [34, 139, 34, 100],     # Forest Green
            "Oncomming": [255, 165, 0, 100],  # Orange
        }

        force_channel = {
            "Settings": {
                "zoom": False,
                "stages": True,
                "performance": True,
                "ideals": True,
                "mirror ME": False,
                "workpiece": False,
                "force compensation FE": False,
                "force accuracy": True,
            },
            "Real_signals": [
                {"name": "Electrode Force, N ME", "pen": 'r'},
                {"name": "Electrode Force, N FE", "pen": 'w'},
                # Disabled signal, kept for reference:
                # {"name": "Welding Current ME", "pen": "y"}
            ],
            "Ideal_signals": [
                {"name": "Force", "pen": {'color': 'g', 'width': 3}},
            ],
        }

        position_channel = {
            "Settings": {
                "zoom": False,
                "stages": True,
                "performance": False,
                "ideals": True,
                "mirror ME": True,
                "workpiece": True,
                "force compensation FE": True,
                "force accuracy": False,
            },
            "Real_signals": [
                {"name": "Rotor Position, mm ME", "pen": {'color': 'r', 'width': 2}},
                {"name": "Rotor Position, mm FE", "pen": {'color': 'w', 'width': 2}},
            ],
            "Ideal_signals": [
                {"name": "Position ME", "pen": {'color': 'g', 'width': 4}},
                {"name": "Position FE", "pen": {'color': 'b', 'width': 4}},
            ],
        }

        speed_channel = {
            "Settings": {
                "zoom": False,
                "stages": True,
                "performance": False,
                "ideals": True,
                "mirror ME": False,
                "workpiece": False,
                "force compensation FE": False,
                "force accuracy": False,
            },
            "Real_signals": [
                {"name": "Rotor Speed, mm/s ME", "pen": 'r', "zoom": False},
                {"name": "Rotor Speed, mm/s FE", "pen": 'w', "zoom": False},
            ],
            "Ideal_signals": [
                {"name": "Rotor Speed ME", "pen": {'color': 'g', 'width': 3}, "zoom": False},
                {"name": "Rotor Speed FE", "pen": {'color': 'b', 'width': 3}, "zoom": False},
            ],
        }

        # Insertion order is part of the configuration: force, position, speed.
        self._plt_channels = {
            "Electrode Force, N & Welding Current, kA": force_channel,
            "Electrode Position, mm": position_channel,
            "Electrode Speed, mm/s": speed_channel,
        }

    @staticmethod
    def set_style(object_: Union[QTabWidget, QWidget]) -> None:
        """Apply the QLabel stylesheet below to ``object_`` via ``setStyleSheet``."""
        object_.setStyleSheet(
            """QLabel {
                color: #ffffff;
                font-size: 26px;
                font-weight: bold;
                font-family: "Segoe UI", sans-serif;
            }""")

    @property
    def controller(self) -> BaseController:
        """Controller given at construction (read-only property)."""
        return self._controller

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this widget is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
        """Placeholder: does nothing in this base class and returns None."""

    def build_raw_trace(self, data: pd.DataFrame) -> None:
        """Placeholder: does nothing in this base class."""
|
||
|
||
class BaseController(QObject):
    """QObject-based controller base holding a mediator and a file manager.

    Every action method is a placeholder that does nothing in this class.
    """

    def __init__(self,
                 mediator: Optional[BaseMediator] = None,
                 file_manager: Optional[BaseFileManager] = None):
        """Initialise the QObject base and store both collaborators."""
        super().__init__()
        self._mediator = mediator
        self._file_manager = file_manager

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this controller is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    @property
    def file_manager(self) -> BaseFileManager:
        """File manager this controller is attached to."""
        return self._file_manager

    @file_manager.setter
    def file_manager(self, file_manager: BaseFileManager) -> None:
        self._file_manager = file_manager

    def send_widgets(self, widgets: list[QWidget]) -> None:
        """Placeholder: does nothing in this base class."""

    def update_settings(self, settings: list[dict]) -> None:
        """Placeholder: does nothing in this base class."""

    def set_working_mode(self, mode: int) -> None:
        """Placeholder: does nothing in this base class."""

    def open_file(self, filepath: str) -> None:
        """Placeholder: does nothing in this base class."""

    def open_dir(self, dirpath: str) -> None:
        """Placeholder: does nothing in this base class."""

    def update_plots(self) -> None:
        """Placeholder: does nothing in this base class."""

    def update_status(self, msg: str) -> None:
        """Placeholder: does nothing in this base class."""

    def update_progress(self, progress: int) -> None:
        """Placeholder: does nothing in this base class."""
|
||
|
||
|
||
class BaseFileManager:
    """Base file manager holding a mediator, a monitor and a set of paths.

    Every action method is a placeholder that does nothing in this class.
    """

    def __init__(self,
                 mediator: Optional[BaseMediator] = None,
                 monitor: Optional[BaseDirectoryMonitor] = None):
        """Store both collaborators and start with an empty path set."""
        self._paths_library = set()
        self._mediator = mediator
        self._monitor = monitor

    @property
    def paths_library(self) -> set:
        """The path set itself (not a copy); read-only property."""
        return self._paths_library

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this file manager is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator

    @property
    def monitor(self) -> BaseDirectoryMonitor:
        """Directory monitor this file manager is attached to."""
        return self._monitor

    @monitor.setter
    def monitor(self, monitor: BaseDirectoryMonitor) -> None:
        self._monitor = monitor

    def replot_all(self) -> None:
        """Placeholder: does nothing in this base class."""

    def open_custom_file(self, path: str) -> None:
        """Placeholder: does nothing in this base class."""

    def open_raw_traces_dir(self, path: str) -> None:
        """Placeholder: does nothing in this base class."""

    def set_mode(self, num: int) -> None:
        """Placeholder: does nothing in this base class."""

    def update_monitor_settings(self, settings: list[dict]) -> None:
        """Placeholder: does nothing in this base class."""

    def add_new_paths(self, paths: list[str]):
        """Placeholder: does nothing in this base class and returns None."""
|
||
|
||
|
||
class BaseIdealDataBuilder(OptAlgorithm):
    """Base builder layered on ``OptAlgorithm``.

    The ``get_*DF`` methods and ``get_ideal_timings`` are placeholders that
    return None here.
    """

    def __init__(self, settings: Settings):
        """Read two values from ``settings`` and initialise ``OptAlgorithm``.

        Args:
            settings: must provide ``system['time_capture']`` and
                ``operator['time_wielding']``; both dictionaries are then
                passed to ``OptAlgorithm.__init__``.

        Raises:
            KeyError: if either key is missing.
        """
        # Assigned before the base initialiser runs, as in the original order.
        self.mul = settings.system['time_capture']
        self.welding_time = settings.operator['time_wielding']
        super().__init__(settings.system, settings.operator)

    def get_closingDF(self) -> pd.DataFrame:
        """Placeholder: returns None in this base class."""

    def get_compressionDF(self) -> pd.DataFrame:
        """Placeholder: returns None in this base class."""

    def get_openingDF(self) -> pd.DataFrame:
        """Placeholder: returns None in this base class."""

    def get_tmovementDF(self) -> pd.DataFrame:
        """Placeholder: returns None in this base class."""

    def get_weldingDF(self) -> pd.DataFrame:
        """Placeholder: returns None in this base class."""

    def get_oncomingDF(self) -> pd.DataFrame:
        """Placeholder: returns None in this base class."""

    # The previous annotation, list[float, float, float, float], is not a
    # valid parameterisation of ``list`` (it takes one type argument).
    def get_ideal_timings(self) -> list[float]:
        """Placeholder: returns None in this base class."""

    def get_cycle_time(self) -> float:
        """Return the sum of the values from ``get_ideal_timings()``.

        Raises:
            TypeError: while ``get_ideal_timings`` is still the placeholder
                above, because it returns None.
        """
        return sum(self.get_ideal_timings())
|
||
|
||
|
||
class BaseMainWindow(QMainWindow):
    """Main-window base with a central widget, a vertical layout and a style."""

    def __init__(self):
        """Resize to 200x200, install the central widget and layout, apply the style."""
        super().__init__()
        self.resize(200, 200)

        # Create the central widget and install it on the window.
        central = QWidget()
        self._central_widget = central
        self.setCentralWidget(central)

        # Give the central widget its main vertical layout.
        layout = QVBoxLayout()
        self._central_layout = layout
        central.setLayout(layout)

        self.set_style(self)

    def set_style(self, object: Union[QTabWidget, QWidget, QMainWindow]) -> None:
        """Apply the imported ``dark_style`` stylesheet to ``object``."""
        object.setStyleSheet(dark_style)
|
||
|
||
|
||
class BasePointPassportFormer:
    """Base passport former holding stage names, parameter names and a cache.

    ``form_passports`` and ``update_settings`` are placeholders that do
    nothing in this class.
    """

    # Class-level default so the ``opt_algorithm`` getter returns None instead
    # of raising AttributeError before the setter has been used.
    _opt_algorithm = None

    def __init__(self,
                 mediator: Optional[BaseMediator] = None):
        """Store the mediator and set up the stage and parameter name lists."""
        self._mediator = mediator
        self._clear_stage = "Welding"
        # NOTE(review): this stores the Settings class itself, not an
        # instance - confirm it is meant as a placeholder.
        self._settings = Settings
        self._stages = [
            "Closing",
            "Squeeze",
            "Welding",
            "Relief",
            "Oncomming"
        ]
        self._tesla_stages = [
            "Tesla squeeze",
            "Tesla closing",
            "Tesla welding",
            "Tesla oncomming_relief"
        ]
        # LRU cache capped at 1000 entries.
        self._ideal_data_cashe = LRUCache(maxsize=1000)
        self._OptAlgorithm_operator_params = [
            "dist_open_start_1",
            "dist_open_start_2",
            "dist_open_after_1",
            "dist_open_after_2",
            "dist_open_end_1",
            "dist_open_end_2",
            "dist_close_end_1",
            "dist_close_end_2",
            "time_command",
            "time_robot_movement",
            "object_thickness",
            "force_target",
            "force_capture",
            "time_wielding"]
        self._OptAlgorithm_system_params = [
            "a_max_1",
            "v_max_1",
            "a_max_2",
            "v_max_2",
            "mass_1",
            "mass_2",
            "k_hardness_1",
            "k_hardness_2",
            "torque_max_1",
            "torque_max_2",
            "transmission_ratio_1",
            "transmission_ratio_2",
            "position_start_1",
            "position_start_2",
            "k_prop",
            "time_capture"]

    def form_passports(self) -> list[GraphicPassport]:
        """Placeholder: does nothing in this base class and returns None."""

    def update_settings(self, params: list) -> None:
        """Placeholder: does nothing in this base class."""

    @property
    def opt_algorithm(self) -> BaseIdealDataBuilder:
        """Ideal-data builder assigned through the setter, or None if unset."""
        return self._opt_algorithm

    @opt_algorithm.setter
    def opt_algorithm(self, opt_algorithm: BaseIdealDataBuilder):
        self._opt_algorithm = opt_algorithm

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this former is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator
|
||
|
||
|
||
class BaseRawTraceProcessor:
    """Base raw-trace processor holding two parsers and an optional mediator."""

    def __init__(self,
                 dataparser: BaseKukaDataParser,
                 textparser: BaseKukaTextParser,
                 mediator: Optional[BaseMediator] = None):
        """Store the parsers and the mediator; no trace or text data yet."""
        self._dataparser = dataparser
        self._textparser = textparser
        self._mediator = mediator
        self._trace_df = None
        self._text_data = None

    def prerender(self, data: list[str]) -> None:
        """Placeholder: does nothing in this base class."""

    @property
    def mediator(self) -> BaseMediator:
        """Mediator this processor is attached to."""
        return self._mediator

    @mediator.setter
    def mediator(self, mediator: BaseMediator) -> None:
        self._mediator = mediator
|
||
|
||
|
||
class BaseKukaDataParser:
    """Base data parser; ``parse`` is a placeholder in this class."""

    def __init__(self):
        """Start with no channel name stored."""
        self._ch_name = None

    def parse(self, head_path: str) -> pd.DataFrame:
        """Placeholder: does nothing in this base class and returns None."""
|
||
|
||
|
||
class BaseKukaTextParser:
    """Base text parser; ``parse`` is a placeholder in this class."""

    def __init__(self):
        """Start with no message flag and no data packs stored."""
        self._in_msg = None
        self._datapacks = None

    def parse(self, path: str) -> list[KukaTXT]:
        """Placeholder: does nothing in this base class and returns None."""
|