chore: переработана база + проведена декомпозиция методов в passport_former
This commit is contained in:
parent
38b9778431
commit
57be163907
272
src/base/base.py
272
src/base/base.py
@ -1,10 +1,9 @@
|
||||
from __future__ import annotations
|
||||
import os
|
||||
from typing import Optional, Union, Any
|
||||
from typing import Optional, Union
|
||||
from dataclasses import dataclass
|
||||
|
||||
from cachetools import LRUCache
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
import pandas as pd
|
||||
from PyQt5.QtCore import QObject, QTimer
|
||||
from PyQt5.QtWidgets import QWidget, QTabWidget, QMainWindow, QVBoxLayout
|
||||
@ -13,22 +12,32 @@ from OptAlgorithm import OptAlgorithm
|
||||
from utils.qt_settings import dark_style
|
||||
|
||||
|
||||
# TODO: Глобальное замечание: Базовые классы создаются для обозначения неких базовых свойств объектов.
|
||||
# Конкретная же реализация возлагается на классы - наследники.
|
||||
@dataclass
|
||||
class PointPassport:
|
||||
timeframe: list
|
||||
events: dict
|
||||
ideal_data: dict
|
||||
useful_data: dict
|
||||
|
||||
|
||||
@dataclass
|
||||
class GraphicPassport:
|
||||
dataframe: pd.DataFrame
|
||||
points_pocket: list[PointPassport]
|
||||
useful_data: dict
|
||||
|
||||
|
||||
class BaseMediator:
|
||||
def __init__(self,
|
||||
converter: BaseDataConverter,
|
||||
# TODO: passport_former
|
||||
passportFormer: BasePointPassportFormer,
|
||||
passport_former: BasePointPassportFormer,
|
||||
plot: BasePlotWidget,
|
||||
controller: BaseController,
|
||||
file_manager: BaseFileManager):
|
||||
self._converter = converter
|
||||
self._converter.mediator = self
|
||||
self._passportFormer = passportFormer
|
||||
self._passportFormer.mediator = self
|
||||
self._passport_former = passport_former
|
||||
self._passport_former.mediator = self
|
||||
self._plot = plot
|
||||
self._plot.mediator = self
|
||||
self._controller = controller
|
||||
@ -40,13 +49,8 @@ class BaseMediator:
|
||||
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
|
||||
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
|
||||
...
|
||||
# TODO: Отступы между методами класса - одна пустая строка.
|
||||
def update_settings (self, data: list[dict]):
|
||||
...
|
||||
|
||||
def update_status(self, msg: Union[str, float]) -> None:
|
||||
...
|
||||
# TODO: Отступы между классами - две пустых строки.
|
||||
|
||||
class BaseDirectoryMonitor:
|
||||
|
||||
update_timer = QTimer()
|
||||
@ -95,17 +99,6 @@ class BaseDirectoryMonitor:
|
||||
def pause(self):
|
||||
self.update_timer.stop()
|
||||
|
||||
# TODO: Почему монитор директорий обновляет какие-то настройки, графики и т.д.? Помним про
|
||||
# принцип единичной ответственности.
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
...
|
||||
|
||||
def update_plots(self) -> None:
|
||||
...
|
||||
|
||||
def force_all_dir(self):
|
||||
# TODO: По сигнатуре метода вообще не понятно, что он должен делать.
|
||||
...
|
||||
|
||||
class BaseDataConverter:
|
||||
def __init__(self, mediator: Optional[BaseMediator] = None):
|
||||
@ -238,11 +231,10 @@ class BasePlotWidget:
|
||||
]
|
||||
},
|
||||
}
|
||||
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
# TODO: object - зарезервированное слово. Использовать эти слова для именования переменных не правильно.
|
||||
# Если хочется использовать именно такое слово, можно написать "object_".
|
||||
object.setStyleSheet(
|
||||
|
||||
@staticmethod
|
||||
def set_style(object_: Union[QTabWidget, QWidget]) -> None:
|
||||
object_.setStyleSheet(
|
||||
"""QLabel {
|
||||
color: #ffffff;
|
||||
font-size: 26px;
|
||||
@ -250,96 +242,6 @@ class BasePlotWidget:
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
}""")
|
||||
|
||||
def _downsample_data(self, x, y, max_points=5000):
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
# TODO: Какой тип данных у 'x'? Какой тип данных у 'y'? Надо добавить аннотации типов.
|
||||
"""
|
||||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||||
"""
|
||||
if len(x) > max_points:
|
||||
factor = len(x) // max_points
|
||||
x_downsampled = x[::factor]
|
||||
y_downsampled = y[::factor]
|
||||
return x_downsampled, y_downsampled
|
||||
return x, y
|
||||
|
||||
def _create_navigator(self,
|
||||
time_region:tuple[float, float],
|
||||
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||||
"""
|
||||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||||
"""
|
||||
navigator = pg.PlotItem(title="Navigator")
|
||||
navigator.setFixedHeight(100)
|
||||
# Получение кривых из main_plot
|
||||
for curve in main_plot.listDataItems():
|
||||
# Извлекаем данные из кривой
|
||||
x, y = curve.getData()
|
||||
curve_name = curve.opts.get("name", None)
|
||||
signal_pen = curve.opts.get("pen", None)
|
||||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||||
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
|
||||
|
||||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
|
||||
ROI_region.setBounds([0, x[-1]])
|
||||
navigator.addItem(ROI_region)
|
||||
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
|
||||
|
||||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||||
# TODO: Возвращаемый результат не соответствует аннотированному в сигнатуре метода.
|
||||
return navigator, ROI_region
|
||||
|
||||
def _sync_main_plot_with_navigator(self,
|
||||
main_plot: pg.PlotItem,
|
||||
region: pg.LinearRegionItem):
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
"""
|
||||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||||
"""
|
||||
x_min, x_max = region.getRegion()
|
||||
if main_plot:
|
||||
main_plot.blockSignals(True)
|
||||
main_plot.setXRange(x_min, x_max, padding=0)
|
||||
main_plot.blockSignals(False)
|
||||
|
||||
# TODO: Методы _mirror_shift_data и _shift_data нарушают принцип DRY: дублирование кода.
|
||||
# Сделать ОДИН метод, одним из входных параметров которого будет lambda-функция, которая применяется к dataframe.
|
||||
def _mirror_shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||||
return dataframe
|
||||
|
||||
def _shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||||
return dataframe
|
||||
|
||||
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
"""
|
||||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||||
"""
|
||||
if region:
|
||||
x_min, x_max = main_plot
|
||||
region.blockSignals(True) # Предотвращаем рекурсию
|
||||
region.setRegion([x_min, x_max])
|
||||
region.blockSignals(False)
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
@ -347,16 +249,6 @@ class BasePlotWidget:
|
||||
@mediator.setter
|
||||
def mediator(self, mediator: BaseMediator) -> None:
|
||||
self._mediator = mediator
|
||||
|
||||
@property
|
||||
def opt(self) -> BaseIdealDataBuilder:
|
||||
return self._opt
|
||||
|
||||
@opt.setter
|
||||
def opt(self, opt: BaseIdealDataBuilder):
|
||||
# TODO: Атрибуты класса следует сначала определять в конструкторе класса.
|
||||
# TODO: Что такое opt? Optical? Option? Optimal? Optimus Prime?
|
||||
self._opt = opt
|
||||
|
||||
def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
|
||||
...
|
||||
@ -454,7 +346,6 @@ class BaseFileManager:
|
||||
...
|
||||
|
||||
|
||||
|
||||
class BaseIdealDataBuilder(OptAlgorithm):
|
||||
|
||||
def __init__(self, params: list[dict]):
|
||||
@ -462,17 +353,6 @@ class BaseIdealDataBuilder(OptAlgorithm):
|
||||
self.mul = system_params['time_capture']
|
||||
self.welding_time = operator_params['time_wielding']
|
||||
super().__init__(operator_params, system_params)
|
||||
|
||||
|
||||
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
|
||||
data = []
|
||||
for i in range (0, int(end_timestamp*self.mul)+1):
|
||||
time = i/self.mul
|
||||
X1, X2, V1, V2, F = func(time)
|
||||
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
X1, X2, V1, V2, F = func(end_timestamp)
|
||||
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
...
|
||||
@ -499,12 +379,12 @@ class BaseIdealDataBuilder(OptAlgorithm):
|
||||
result = sum(self.get_ideal_timings())
|
||||
return result
|
||||
|
||||
|
||||
class BaseMainWindow(QMainWindow):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.resize(200,200)
|
||||
self._controller = controller
|
||||
# Создаем центральный виджет и устанавливаем его
|
||||
self._central_widget = QWidget()
|
||||
self.setCentralWidget(self._central_widget)
|
||||
@ -515,17 +395,9 @@ class BaseMainWindow(QMainWindow):
|
||||
self.set_style(self)
|
||||
...
|
||||
|
||||
@property
|
||||
def controller(self) -> BaseController:
|
||||
return self._controller
|
||||
|
||||
@controller.setter
|
||||
def controller(self, controller: BaseController) -> None:
|
||||
# TODO: зачем делать setter для контроллера, если ты его экземпляр передаешь в конструктор?
|
||||
self._controller = controller
|
||||
|
||||
def set_style(self, object: Union[QTabWidget, QWidget, QMainWindow]) -> None:
|
||||
object.setStyleSheet(dark_style)
|
||||
|
||||
|
||||
class BasePointPassportFormer:
|
||||
|
||||
@ -580,98 +452,20 @@ class BasePointPassportFormer:
|
||||
"position_start_2",
|
||||
"k_prop",
|
||||
"time_capture"]
|
||||
|
||||
def _find_indexes(self,
|
||||
signal: str,
|
||||
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
|
||||
stage_diff = np.diff(dataframe[signal])
|
||||
start_idx = np.where(stage_diff == 1)
|
||||
finish_idx = np.where(stage_diff == -1)
|
||||
return start_idx[0], finish_idx[0]
|
||||
|
||||
def _find_events(self,
|
||||
signal: str,
|
||||
times:pd.Series,
|
||||
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
|
||||
start_idx, finish_idx = self._find_indexes(signal, dataframe)
|
||||
|
||||
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
|
||||
start_idx = np.insert(start_idx, 0, 0)
|
||||
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
|
||||
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
|
||||
if len(start_list) - len(end_list) == 1:
|
||||
end_list.append(float(times.iloc[-1]))
|
||||
|
||||
return start_list, end_list
|
||||
|
||||
def _filter_events(self,
|
||||
times: pd.Series,
|
||||
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
|
||||
events = {}
|
||||
point_quantity = 0
|
||||
if self._clear_stage in self._stages:
|
||||
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
|
||||
point_quantity = len(start_list)
|
||||
if point_quantity == 0:
|
||||
#TODO: добавить обработку исключения
|
||||
return []
|
||||
|
||||
for stage in self._stages:
|
||||
start_list, end_list = self._find_events(stage, times, dataframe)
|
||||
temp = min([len(start_list), len(end_list)])
|
||||
if temp < point_quantity:
|
||||
print ("cant find enough", stage)
|
||||
start_list += [0]*(point_quantity - temp)
|
||||
end_list += [1]*(point_quantity - temp)
|
||||
|
||||
events[stage] = [start_list, end_list]
|
||||
return events, point_quantity
|
||||
|
||||
def _build_ideal_data(self,
|
||||
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
|
||||
params: list[dict] = None) -> dict:
|
||||
self.opt = idealDataBuilder(params)
|
||||
stage_ideals = {
|
||||
"Closing": self._opt.get_closingDF(),
|
||||
"Squeeze": self._opt.get_compressionDF(),
|
||||
"Welding": self._opt.get_weldingDF(),
|
||||
"Relief": self._opt.get_openingDF(),
|
||||
"Oncomming": self._opt.get_oncomingDF(),
|
||||
"Ideal cycle": self._opt.get_cycle_time(),
|
||||
"Ideal timings": self._opt.get_ideal_timings()
|
||||
}
|
||||
return stage_ideals
|
||||
|
||||
def form_passports(self) -> list[list[pd.DataFrame, dict, list]]:
|
||||
def form_passports(self) -> list[GraphicPassport]:
|
||||
...
|
||||
|
||||
def update_settings(self, params: list) -> None:
|
||||
...
|
||||
|
||||
def _generate_cache_key(self,
|
||||
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
|
||||
"""
|
||||
Преобразует params_list в хешируемый ключ для кэша.
|
||||
"""
|
||||
operator_settings, system_settings = params_list
|
||||
|
||||
# Преобразуем словари в отсортированные кортежи пар ключ-значение
|
||||
operator_tuple = frozenset((key, value)
|
||||
for key, value in operator_settings.items()
|
||||
if str(key) in self._OptAlgorithm_operator_params)
|
||||
|
||||
system_tuple = frozenset((key, value)
|
||||
for key, value in system_settings.items()
|
||||
if str(key) in self._OptAlgorithm_system_params)
|
||||
return (operator_tuple, system_tuple)
|
||||
|
||||
@property
|
||||
def opt(self) -> BaseIdealDataBuilder:
|
||||
return self._opt
|
||||
def opt_algorithm(self) -> BaseIdealDataBuilder:
|
||||
return self._opt_algorithm
|
||||
|
||||
@opt.setter
|
||||
def opt(self, opt: BaseIdealDataBuilder):
|
||||
self._opt = opt
|
||||
@opt_algorithm.setter
|
||||
def opt_algorithm(self, opt_algorithm: BaseIdealDataBuilder):
|
||||
self._opt_algorithm = opt_algorithm
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
|
||||
@ -1,5 +1,3 @@
|
||||
from typing import Union
|
||||
|
||||
from PyQt5.QtWidgets import QWidget, QTabWidget
|
||||
from PyQt5.QtGui import QPixmap
|
||||
from PyQt5.QtCore import pyqtSignal
|
||||
|
||||
@ -21,7 +21,7 @@ class Mediator(BaseMediator):
|
||||
|
||||
if issubclass(source.__class__, BaseDataConverter):
|
||||
self._controller.update_progress(1)
|
||||
self._passportFormer.form_passports(data)
|
||||
self._passport_former.form_passports(data)
|
||||
|
||||
if issubclass(source.__class__, BasePointPassportFormer):
|
||||
self._controller.update_progress(2)
|
||||
@ -33,7 +33,7 @@ class Mediator(BaseMediator):
|
||||
|
||||
if issubclass(source.__class__, BaseController):
|
||||
self._file_manager.update_monitor_settings(data)
|
||||
self._passportFormer.update_settings(data)
|
||||
self._passport_former.update_settings(data)
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,107 +0,0 @@
|
||||
import traceback
|
||||
import sys
|
||||
|
||||
import pandas as pd
|
||||
from loguru import logger
|
||||
|
||||
from base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
||||
|
||||
|
||||
class idealDataBuilder(BaseIdealDataBuilder):
|
||||
# TODO: Имя класса с большой буквы.
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
|
||||
|
||||
def get_compressionDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
|
||||
|
||||
def get_openingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
|
||||
|
||||
def get_oncomingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
|
||||
|
||||
def get_weldingDF(self) -> pd.DataFrame:
|
||||
data = []
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
|
||||
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
|
||||
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_ideal_timings(self) -> list[float]:
|
||||
data = self.Ts
|
||||
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']]
|
||||
return ideal_timings
|
||||
|
||||
class PassportFormer(BasePointPassportFormer):
|
||||
|
||||
def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, int]]:
|
||||
# TODO: сигнатура метода не соответствует сигнатуре метода класса родителя
|
||||
try:
|
||||
return_data = [self._build_passports_pocket(dataframe) for dataframe in data]
|
||||
except:
|
||||
# TODO: обработка исключений!!!
|
||||
tb = sys.exc_info()[2]
|
||||
# TODO: Нормальные сообщения в лог!
|
||||
tbinfo = traceback.format_tb(tb)[0]
|
||||
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
|
||||
logger.error(pymsg)
|
||||
return_data = []
|
||||
finally:
|
||||
self._mediator.notify(self, return_data)
|
||||
|
||||
|
||||
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
|
||||
# TODO: Еще раз проверь, что написано в аннотации, и что метод возвращает.
|
||||
# TODO: Практически нечитаемо. Надо оптимизировать и декомпозировать.
|
||||
if dataframe is not None:
|
||||
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
|
||||
if point_quantity == 0:
|
||||
return []
|
||||
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
|
||||
else:
|
||||
events = None
|
||||
key = list(self._params[0].keys())[0]
|
||||
point_quantity = len(self._params[0][key])
|
||||
|
||||
points_pocket = []
|
||||
system_settings = {key: value[0] for key, value in self._params[1].items()}
|
||||
tesla_time = sum(self._params[0].get("Tesla summary time", []))
|
||||
useful_data = {"tesla_time": tesla_time,
|
||||
"range_ME": system_settings["Range ME, mm"],
|
||||
"k_hardness": system_settings["k_hardness_1"]
|
||||
}
|
||||
|
||||
for i in range(point_quantity):
|
||||
operator_settings = {
|
||||
key: (value[i] if i < len(value) else value[0])
|
||||
for key, value in self._params[0].items()
|
||||
}
|
||||
params_list = [operator_settings, system_settings]
|
||||
cache_key = self._generate_cache_key(params_list)
|
||||
# TODO: if-else заменить на:
|
||||
# ideal_data = self._ideal_data_cashe.get(cache_key, self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list))
|
||||
# self._ideal_data_cashe[cache_key] = ideal_data
|
||||
if cache_key in self._ideal_data_cashe :
|
||||
ideal_data = self._ideal_data_cashe[cache_key]
|
||||
else:
|
||||
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
|
||||
self._ideal_data_cashe[cache_key] = ideal_data
|
||||
# TODO: point_timeframe, point_events = None, None
|
||||
if events is not None:
|
||||
idx = i+1 if idx_shift else i
|
||||
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
|
||||
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
|
||||
else:
|
||||
# TODO: убрать else
|
||||
point_timeframe, point_events = None, None
|
||||
useful_p_data = {"thickness": operator_settings["object_thickness"],
|
||||
"L2": operator_settings["distance_l_2"],
|
||||
"force": operator_settings["force_target"]}
|
||||
|
||||
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
|
||||
return dataframe, points_pocket, useful_data
|
||||
|
||||
def update_settings(self, params: list[dict, dict]):
|
||||
self._params = params
|
||||
244
src/controller/passport_former.py
Normal file
244
src/controller/passport_former.py
Normal file
@ -0,0 +1,244 @@
|
||||
import traceback
|
||||
import sys
|
||||
from typing import Optional, Any
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from loguru import logger
|
||||
|
||||
from base.base import BasePointPassportFormer, BaseIdealDataBuilder, PointPassport, GraphicPassport
|
||||
|
||||
|
||||
class IdealDataBuilder(BaseIdealDataBuilder):
|
||||
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
|
||||
|
||||
def get_compressionDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
|
||||
|
||||
def get_openingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
|
||||
|
||||
def get_oncomingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
|
||||
|
||||
def get_weldingDF(self) -> pd.DataFrame:
|
||||
data = []
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
|
||||
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
|
||||
data.append({
|
||||
"time":0,
|
||||
"Position FE":X1,
|
||||
"Position ME":X2,
|
||||
"Rotor Speed FE":V1,
|
||||
"Rotor Speed ME":V2,
|
||||
"Force":F
|
||||
})
|
||||
data.append({
|
||||
"time":self.welding_time,
|
||||
"Position FE":X1,
|
||||
"Position ME":X2,
|
||||
"Rotor Speed FE":V1,
|
||||
"Rotor Speed ME":V2,
|
||||
"Force":F
|
||||
})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_ideal_timings(self) -> list[float]:
|
||||
data = self.Ts
|
||||
ideal_timings = [
|
||||
data['tclose'],
|
||||
data['tgrow'],
|
||||
self.welding_time,
|
||||
self.getMarkOpen(),
|
||||
data['tmovement']
|
||||
]
|
||||
return ideal_timings
|
||||
|
||||
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
|
||||
data = []
|
||||
for i in range (0, int(end_timestamp*self.mul)+1):
|
||||
time = i/self.mul
|
||||
X1, X2, V1, V2, F = func(time)
|
||||
data.append({
|
||||
"time":time,
|
||||
"Position FE":X1*1000,
|
||||
"Position ME":X2*1000,
|
||||
"Rotor Speed FE":V1*1000,
|
||||
"Rotor Speed ME":V2*1000,
|
||||
"Force":F
|
||||
})
|
||||
X1, X2, V1, V2, F = func(end_timestamp)
|
||||
data.append({
|
||||
"time":end_timestamp,
|
||||
"Position FE":X1*1000,
|
||||
"Position ME":X2*1000,
|
||||
"Rotor Speed FE":V1*1000,
|
||||
"Rotor Speed ME":V2*1000,
|
||||
"Force":F
|
||||
})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
|
||||
class PassportFormer(BasePointPassportFormer):
|
||||
|
||||
def form_passports(self, data: list[pd.DataFrame]) -> list[GraphicPassport]:
|
||||
try:
|
||||
return_data = [self._build_graphic_passport(dataframe) for dataframe in data]
|
||||
except:
|
||||
# TODO: обработка исключений!!!
|
||||
tb = sys.exc_info()[2]
|
||||
# TODO: Нормальные сообщения в лог!
|
||||
tbinfo = traceback.format_tb(tb)[0]
|
||||
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
|
||||
logger.error(pymsg)
|
||||
return_data = []
|
||||
finally:
|
||||
self._mediator.notify(self, return_data)
|
||||
|
||||
def update_settings(self, params: list[dict, dict]):
|
||||
self._params = params
|
||||
|
||||
@staticmethod
|
||||
def _find_indexes(signal: str,
|
||||
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
|
||||
stage_diff = np.diff(dataframe[signal])
|
||||
start_idx = np.where(stage_diff == 1)
|
||||
finish_idx = np.where(stage_diff == -1)
|
||||
return start_idx[0], finish_idx[0]
|
||||
|
||||
def _find_events(self,
|
||||
signal: str,
|
||||
times:pd.Series,
|
||||
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
|
||||
start_idx, finish_idx = self._find_indexes(signal, dataframe)
|
||||
|
||||
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
|
||||
start_idx = np.insert(start_idx, 0, 0)
|
||||
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
|
||||
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
|
||||
if len(start_list) - len(end_list) == 1:
|
||||
end_list.append(float(times.iloc[-1]))
|
||||
|
||||
return start_list, end_list
|
||||
|
||||
def _filter_events(self,
|
||||
times: pd.Series,
|
||||
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
|
||||
events = {}
|
||||
point_quantity = 0
|
||||
if self._clear_stage in self._stages:
|
||||
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
|
||||
point_quantity = len(start_list)
|
||||
if point_quantity == 0:
|
||||
#TODO: добавить обработку исключения
|
||||
return []
|
||||
|
||||
for stage in self._stages:
|
||||
start_list, end_list = self._find_events(stage, times, dataframe)
|
||||
temp = min([len(start_list), len(end_list)])
|
||||
if temp < point_quantity:
|
||||
print ("cant find enough", stage)
|
||||
start_list += [0]*(point_quantity - temp)
|
||||
end_list += [1]*(point_quantity - temp)
|
||||
|
||||
events[stage] = [start_list, end_list]
|
||||
return events, point_quantity
|
||||
|
||||
def _build_ideal_data(self,
|
||||
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
|
||||
params: list[dict] = None) -> dict:
|
||||
self.opt_algorithm = idealDataBuilder(params)
|
||||
stage_ideals = {
|
||||
"Closing": self._opt_algorithm.get_closingDF(),
|
||||
"Squeeze": self._opt_algorithm.get_compressionDF(),
|
||||
"Welding": self._opt_algorithm.get_weldingDF(),
|
||||
"Relief": self._opt_algorithm.get_openingDF(),
|
||||
"Oncomming": self._opt_algorithm.get_oncomingDF(),
|
||||
"Ideal cycle": self._opt_algorithm.get_cycle_time(),
|
||||
"Ideal timings": self._opt_algorithm.get_ideal_timings()
|
||||
}
|
||||
return stage_ideals
|
||||
|
||||
def _generate_cache_key(self,
|
||||
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
|
||||
"""
|
||||
Преобразует params_list в хешируемый ключ для кэша.
|
||||
"""
|
||||
operator_settings, system_settings = params_list
|
||||
|
||||
# Преобразуем словари в отсортированные кортежи пар ключ-значение
|
||||
operator_tuple = frozenset((key, value)
|
||||
for key, value in operator_settings.items()
|
||||
if str(key) in self._OptAlgorithm_operator_params)
|
||||
|
||||
system_tuple = frozenset((key, value)
|
||||
for key, value in system_settings.items()
|
||||
if str(key) in self._OptAlgorithm_system_params)
|
||||
return (operator_tuple, system_tuple)
|
||||
|
||||
def _build_graphic_passport(self, dataframe: pd.DataFrame) -> GraphicPassport:
|
||||
|
||||
if dataframe is not None:
|
||||
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
|
||||
if point_quantity == 0:
|
||||
return []
|
||||
else:
|
||||
events = None
|
||||
key = list(self._params[0].keys())[0]
|
||||
point_quantity = len(self._params[0][key])
|
||||
|
||||
graphic_passport = GraphicPassport()
|
||||
graphic_passport.dataframe = dataframe
|
||||
graphic_passport.points_pocket = []
|
||||
system_settings = {key: value[0] for key, value in self._params[1].items()}
|
||||
graphic_passport.useful_data = self._form_graphic_useful_data(system_settings)
|
||||
|
||||
for i in range(point_quantity):
|
||||
operator_settings = self._get_operator_settings_part(i)
|
||||
params_list = [operator_settings, system_settings]
|
||||
|
||||
point_passport = PointPassport()
|
||||
point_passport.ideal_data = self._form_point_ideal_data(params_list)
|
||||
point_passport.useful_data = self._form_point_useful_data(operator_settings)
|
||||
point_passport.timeframe, point_passport.events = self._form_point_events(events, i)
|
||||
|
||||
graphic_passport.points_pocket.append(point_passport)
|
||||
return graphic_passport
|
||||
|
||||
def _form_graphic_useful_data(self, system_settings:dict) -> dict:
|
||||
tesla_time = sum(self._params[0].get("Tesla summary time", []))
|
||||
useful_data = {"tesla_time": tesla_time,
|
||||
"range_ME": system_settings["Range ME, mm"],
|
||||
"k_hardness": system_settings["k_hardness_1"]
|
||||
}
|
||||
return useful_data
|
||||
|
||||
def _form_point_useful_data(self, operator_settings:dict) -> dict:
|
||||
useful_data = {"thickness": operator_settings["object_thickness"],
|
||||
"L2": operator_settings["distance_l_2"],
|
||||
"force": operator_settings["force_target"]}
|
||||
return useful_data
|
||||
|
||||
def _form_point_ideal_data(self, params_list:list) -> dict:
|
||||
cache_key = self._generate_cache_key(params_list)
|
||||
ideal_data = self._ideal_data_cashe.get(cache_key,
|
||||
self._build_ideal_data(idealDataBuilder=IdealDataBuilder, params=params_list))
|
||||
self._ideal_data_cashe[cache_key] = ideal_data
|
||||
return ideal_data
|
||||
|
||||
def _get_operator_settings_part(self, idx:int) -> dict:
|
||||
operator_settings = {
|
||||
key: (value[idx] if idx < len(value) else value[0])
|
||||
for key, value in self._params[0].items()
|
||||
}
|
||||
return operator_settings
|
||||
|
||||
def _form_point_events(self, events:dict, idx) -> list:
|
||||
timeframe, point_events = None, None
|
||||
if events is not None:
|
||||
idx_shift = idx+1 if events[self._stages[-1]][0][0] == 0 else idx
|
||||
timeframe = [events[self._stages[0]][0][idx], events[self._stages[-1]][1][idx_shift]]
|
||||
point_events = {key: [value[0][idx], value[1][idx]] for key, value in events.items()}
|
||||
return timeframe, point_events
|
||||
@ -13,17 +13,99 @@ import pyqtgraph as pg
|
||||
import pandas as pd
|
||||
|
||||
from base.base import BasePlotWidget
|
||||
# TODO: Навести порядок в импортах.
|
||||
|
||||
class ProcessStage():
|
||||
# TODO: Для чего тут наследование от ничего?
|
||||
# TODO: Если я правильно понял сценарий использования этого класса, то правильно это сделать либо отнаследовавшись
|
||||
# от NamedTuple, либо использовать @dataclass.
|
||||
mean_value:int
|
||||
start_index:int
|
||||
finish_index:int
|
||||
|
||||
class PlotWidget(BasePlotWidget):
|
||||
|
||||
def _downsample_data(self, x, y, max_points=5000):
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
# TODO: Какой тип данных у 'x'? Какой тип данных у 'y'? Надо добавить аннотации типов.
|
||||
"""
|
||||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||||
"""
|
||||
if len(x) > max_points:
|
||||
factor = len(x) // max_points
|
||||
x_downsampled = x[::factor]
|
||||
y_downsampled = y[::factor]
|
||||
return x_downsampled, y_downsampled
|
||||
return x, y
|
||||
|
||||
def _create_navigator(self,
|
||||
time_region:tuple[float, float],
|
||||
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||||
"""
|
||||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||||
"""
|
||||
navigator = pg.PlotItem(title="Navigator")
|
||||
navigator.setFixedHeight(100)
|
||||
# Получение кривых из main_plot
|
||||
for curve in main_plot.listDataItems():
|
||||
# Извлекаем данные из кривой
|
||||
x, y = curve.getData()
|
||||
curve_name = curve.opts.get("name", None)
|
||||
signal_pen = curve.opts.get("pen", None)
|
||||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||||
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
|
||||
|
||||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
|
||||
ROI_region.setBounds([0, x[-1]])
|
||||
navigator.addItem(ROI_region)
|
||||
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
|
||||
|
||||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||||
# TODO: Возвращаемый результат не соответствует аннотированному в сигнатуре метода.
|
||||
return navigator, ROI_region
|
||||
|
||||
def _sync_main_plot_with_navigator(self,
|
||||
main_plot: pg.PlotItem,
|
||||
region: pg.LinearRegionItem):
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
"""
|
||||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||||
"""
|
||||
x_min, x_max = region.getRegion()
|
||||
if main_plot:
|
||||
main_plot.blockSignals(True)
|
||||
main_plot.setXRange(x_min, x_max, padding=0)
|
||||
main_plot.blockSignals(False)
|
||||
|
||||
# TODO: Методы _mirror_shift_data и _shift_data нарушают принцип DRY: дублирование кода.
|
||||
# Сделать ОДИН метод, одним из входных параметров которого будет lambda-функция, которая применяется к dataframe.
|
||||
def _mirror_shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||||
return dataframe
|
||||
|
||||
def _shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||||
return dataframe
|
||||
|
||||
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
|
||||
# TODO: Данный метод статичный. Для обозначения подобных методов используется декоратор @staticmethod.
|
||||
"""
|
||||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||||
"""
|
||||
if region:
|
||||
x_min, x_max = main_plot
|
||||
region.blockSignals(True) # Предотвращаем рекурсию
|
||||
region.setRegion([x_min, x_max])
|
||||
region.blockSignals(False)
|
||||
|
||||
def _create_curve_ideal(self,
|
||||
signal: dict[str, Any],
|
||||
@ -383,8 +465,7 @@ class PlotWidget(BasePlotWidget):
|
||||
self._mediator.notify(self, widgets_datapack)
|
||||
|
||||
def _update_status(self, widgsteps:int, pointsteps:int, cur_widg:int, cur_point:int):
|
||||
# TODO: if self._datalen: ...
|
||||
if self._datalen != 0:
|
||||
if self._datalen:
|
||||
sycle_start = self._datastep/self._datalen*100 + 1
|
||||
period1 = 100/self._datalen
|
||||
else:
|
||||
|
||||
@ -105,10 +105,9 @@ class settingsWindow(QWidget):
|
||||
for i, (_, items) in enumerate(self._data.items()):
|
||||
for j, item in enumerate(items):
|
||||
self._param_table.setItem(i, j, QTableWidgetItem(str(item)))
|
||||
# TODO: if isinstance()...
|
||||
if type(item) == int:
|
||||
if isinstance(item, int):
|
||||
self._param_table.setItemDelegateForRow(i, int_delegate)
|
||||
elif type(item) == float:
|
||||
elif isinstance(item, float):
|
||||
self._param_table.setItemDelegateForRow(i, float_delegate)
|
||||
else:
|
||||
self._param_table.setItemDelegateForRow(i, str_delegate)
|
||||
|
||||
@ -9,7 +9,7 @@ from controller.mediator import Mediator
|
||||
from controller.converter import DataConverter
|
||||
from gui.plotter import PlotWidget
|
||||
from controller.controller import Controller
|
||||
from controller.passportFormer import PassportFormer
|
||||
from src.controller.passport_former import PassportFormer
|
||||
|
||||
# TODO: Именование модулей: lowercase / snake_case.
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user