Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/gui/app.py
#	src/gui/plot_window.py
#	src/main.py
#	src/utils/diagram_parser.py
This commit is contained in:
Леонид Титов 2024-12-05 12:03:37 +03:00
commit 88c8e34c67
49 changed files with 808 additions and 547 deletions

View File

@ -8,7 +8,7 @@
"dist_close_end_1": 0.005, "dist_close_end_1": 0.005,
"dist_close_end_2": 0.005, "dist_close_end_2": 0.005,
"time_wielding": 1, "time_wielding": 1,
"time_command": 0.06, "time_command": 0.0,
"time_robot_movement": 0.2, "time_robot_movement": 0.2,
"object_thickness": 0.0045, "object_thickness": 0.0045,
"force_target": 5000, "force_target": 5000,

View File

@ -1,11 +1,13 @@
{ {
"trace_storage_path": "D:/downloads/a22",
"monitor_update_period": 100,
"a_max_1": 5.41, "a_max_1": 5.41,
"v_max_1": 0.108, "v_max_1": 0.278,
"a_max_2": 35.81, "a_max_2": 35.81,
"v_max_2": 0.678, "v_max_2": 0.7,
"mass_1": 257, "mass_1": 270,
"mass_2": 1, "mass_2": 1,
"k_hardness_1": 1759291, "k_hardness_1": 2148570,
"k_hardness_2": 0, "k_hardness_2": 0,
"torque_max_1": 20, "torque_max_1": 20,
"torque_max_2": 0, "torque_max_2": 0,
@ -15,9 +17,6 @@
"position_start_2": 0.08, "position_start_2": 0.08,
"k_prop": 0.05, "k_prop": 0.05,
"time_capture": 100000, "time_capture": 100000,
"UML_time_scaler": 1000, "UML_time_scaler": 1000
"Closure_signal": "Closing",
"Squeeze_signal": "Squeeze", }
"Welding_signal": "Welding",
"Release_signal": "Relief"
}

View File

@ -8,13 +8,14 @@ class ConstantCalculator(AutoConfigClass):
super().__init__(ConstantCalculator.params_list, operator_config, system_config) super().__init__(ConstantCalculator.params_list, operator_config, system_config)
for param, value in self.__dict__.items():
if value is int and value < 0:
raise Exception("""Недопустимое значение параметра {param} < 0""")
def calc(self): def calc(self):
constants = {} constants = {}
#self.smin1t = self.smin1 - self.dblock / 2 #self.smin1t = self.smin1 - self.dblock / 2
#self.smin2t = self.smin2 - self.dblock / 2 #self.smin2t = self.smin2 - self.dblock / 2
#self.awork = self.umax / (self.l * self.m)
#self.fl = self.Fd * (1-self.kturn)
#self.flon = self.Fd * self.kturn
constants["Fprop"] = self.k_prop * self.force_target constants["Fprop"] = self.k_prop * self.force_target
constants["freq"] = sqrt(self.k_hardness_1 / self.mass_1) constants["freq"] = sqrt(self.k_hardness_1 / self.mass_1)
constants["eff_control"] = self.torque_max_1 / self.transmission_ratio_1 constants["eff_control"] = self.torque_max_1 / self.transmission_ratio_1

View File

@ -116,7 +116,8 @@ class OptAlgorithm(AutoConfigClass):
dF0 = self.a_max_1 * self.Ts["tclose_1_acc"] * self.k_hardness_1 dF0 = self.a_max_1 * self.Ts["tclose_1_acc"] * self.k_hardness_1
dFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow) dFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
self.Fmeet = 1/ self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - dFmax**2) eps = 1e1
self.Fmeet = 1/ self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - dFmax**2 + eps)
tspeed = self.Ts["tspeed"] tspeed = self.Ts["tspeed"]
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * dF0 * sin(self.freq * tspeed) Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * dF0 * sin(self.freq * tspeed)
if t < self.Ts["tspeed"]: if t < self.Ts["tspeed"]:

View File

@ -51,12 +51,10 @@ class OptTimeCalculator(AutoConfigClass):
t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2) t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21 + offset T2 = t22 + 2 * t21 + offset
Topen = max(T1, T2) topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1+Fs1)
topen_1_acc, topen_1_speed = self.calcFirstOpen(Topen, l1+Fs1)
offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1) offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1)
topen_2_acc, topen_2_speed = self.calcSecondOpen(Topen - offset, l2) topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2)
self.allTimes["topen_1_acc"] = topen_1_acc self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_2_offset"] = offset self.allTimes["topen_2_offset"] = offset
@ -67,11 +65,13 @@ class OptTimeCalculator(AutoConfigClass):
self.allTimes["topen_2_acc"] = topen_2_acc self.allTimes["topen_2_acc"] = topen_2_acc
self.allTimes["topen_2_speed"] = topen_2_speed self.allTimes["topen_2_speed"] = topen_2_speed
if s1 > l1: if s1 >= l1:
raise ValueError("S1 > L1 - недопустимый сценарий") raise Exception("""S1 >= L1 - недопустимый сценарий,
if s2 > l2: проверьте dist_open_after_1, dist_close_end_1""")
raise ValueError("S2 > L2 - недопустимый сценарий") if s2 >= l2:
raise Exception("""S2 >= L2 - недопустимый сценарий,
проверьте dist_open_after_2, dist_close_end_2""")
s1 += Fs1 s1 += Fs1
topen_1_mark = sqrt(2 * s1 / self.a_max_1) topen_1_mark = sqrt(2 * s1 / self.a_max_1)
if topen_1_mark > topen_1_acc: if topen_1_mark > topen_1_acc:
@ -95,7 +95,6 @@ class OptTimeCalculator(AutoConfigClass):
self.allTimes["topen_1_mark"] = topen_1_mark self.allTimes["topen_1_mark"] = topen_1_mark
self.allTimes["topen_2_mark"] = topen_2_mark self.allTimes["topen_2_mark"] = topen_2_mark
self.allTimes["topen"] = Topen
def Tgrow(self) -> None: def Tgrow(self) -> None:
@ -103,13 +102,19 @@ class OptTimeCalculator(AutoConfigClass):
vF0 = v0 * self.k_hardness_1 vF0 = v0 * self.k_hardness_1
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow) vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
l = sqrt(self.eff_control ** 2 + self.mass_1/self.k_hardness_1 * vF0**2)
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0) L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0)
tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L)) tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed) Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed)
Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2) eps = 1e1
if self.freq**2 * self.Ftogrow**2 - vFmax**2 < -eps:
raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории
, проверьте параметры k_hardness_1, mass_1, k_prop""")
Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2 + eps)
Fstart_prop = self.Fstart_prop Fstart_prop = self.Fstart_prop
if Fmeet > Fstart_prop:
raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора
, проверьте параметры v_max_1, k_prop""")
tmeet = (Fmeet - Fspeed)/vFmax tmeet = (Fmeet - Fspeed)/vFmax
tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet) tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet)
vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2) vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2)
@ -133,14 +138,15 @@ class OptTimeCalculator(AutoConfigClass):
v0s = [] v0s = []
pos0s = [] pos0s = []
for i in range(1,3): for i in range(1,3):
tq = tmark if tmark < 0:
assert tq > 0 raise Exception("""Отрицательное время этапа раскрытия,
v0 = closeAlgo("V"+str(i), "Open", tq) проверьте dist_open_after_{1,2}, time_command""")
v0 = closeAlgo("V"+str(i), "Open", tmark)
v0s.append(v0) v0s.append(v0)
x0 = closeAlgo("X"+str(i), "Open", tq) x0 = closeAlgo("X"+str(i), "Open", tmark)
x1 = contact[i-1] - self.__dict__["dist_close_end_"+str(i)] x1 = contact[i-1] - self.__dict__["dist_close_end_"+str(i)]
x = x1 - x0 x = x1 - x0
pos0s.append(closeAlgo("X"+str(i), "Open", tq)) pos0s.append(closeAlgo("X"+str(i), "Open", tmark))
Tfull = self.time_robot_movement Tfull = self.time_robot_movement
@ -156,7 +162,9 @@ class OptTimeCalculator(AutoConfigClass):
t3 = (Tfull + v0 / a) / 2 t3 = (Tfull + v0 / a) / 2
sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2) sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2)
assert sqrtval >= 0 if sqrtval < 0:
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
проверьте dist_open_after_{i}, dist_close_end_{i}, time_command, time_robot_movement""")
t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2) t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2)
t1 = min(t1max, (vmax- abs(v0))/a) t1 = min(t1max, (vmax- abs(v0))/a)
t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a))) t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a)))
@ -165,7 +173,7 @@ class OptTimeCalculator(AutoConfigClass):
t5max = (Tfull - v0/a)/2 - t1 t5max = (Tfull - v0/a)/2 - t1
v1 = v0 + a * t1 v1 = v0 + a * t1
S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2 S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2
S2max = abs(Sfull) + abs(S1) S2max = Sfull + S1
t5 = min(t5max, (vmax)/a, sqrt(S2max / a)) t5 = min(t5max, (vmax)/a, sqrt(S2max / a))
t3 = abs(v0)/a + t1 + t5 t3 = abs(v0)/a + t1 + t5
t32 = t5 t32 = t5
@ -181,7 +189,7 @@ class OptTimeCalculator(AutoConfigClass):
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1)) t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1))
t4 = max(0, Sleft/v3 + v1/v3 * t2) t4 = max(0, Sleft/v3 + v1/v3 * t2)
tstay = Tfull - t1 - t2 - t3 - t4 - t5 tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
self.allTimes["tmovement_"+str(i)+"_acc"] = t1 self.allTimes["tmovement_"+str(i)+"_acc"] = t1
self.allTimes["tmovement_"+str(i)+"_speed"] = t2 self.allTimes["tmovement_"+str(i)+"_speed"] = t2

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,16 @@
from PyQt5.QtWidgets import QWidget
from PyQt5.QtCore import pyqtSignal
from src.utils.base.base import BaseController
class Controller(BaseController):
signal_widgets = pyqtSignal(list)
signal_settings = pyqtSignal(list)
def send_widgets(self, widgets: list[QWidget]) -> None:
self.signal_widgets.emit(widgets)
def push_settings(self, settings: list[dict]) -> None:
self.signal_settings.emit(settings)

View File

@ -0,0 +1,20 @@
import pandas as pd
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
pd.set_option('future.no_silent_downcasting', True)
from src.utils.base.base import BaseDataConverter
class DataConverter(BaseDataConverter):
@staticmethod
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
return dataframe
def convert_data(self, files: list[str]) -> None:
dataframes = [pd.read_csv(file) for file in files]
converted_dataframes = list(map(self._replace_bool, dataframes))
self._mediator.notify(self, converted_dataframes)

View File

@ -0,0 +1,28 @@
import pandas as pd
from typing import Union
from PyQt5.QtWidgets import QWidget
from src.utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget
class Mediator(BaseMediator):
def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[QWidget]]):
if issubclass(source.__class__, BaseDirectoryMonitor):
self._converter.convert_data(data)
if issubclass(source.__class__, BaseDataConverter):
self._plot.build(data)
if issubclass(source.__class__, BasePlotWidget):
self._controller.send_widgets(data)
def push_settings(self, settings: list[dict]):
self._monitor.update_settings(settings)
self._plot.update_settings(settings)
self._monitor.force_all_dir()

51
src/controller/monitor.py Normal file
View File

@ -0,0 +1,51 @@
from time import sleep
import os
from loguru import logger
from src.utils.base.base import BaseDirectoryMonitor
class DirectoryMonitor(BaseDirectoryMonitor):
def _init_state(self):
files = os.listdir(self._directory_path)
self._files = files
self.update_timer.timeout.connect(self._monitor)
logger.info("Monitor initiated!")
def _monitor(self):
files = os.listdir(self._directory_path)
new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x),
filter(lambda x: x not in self._files, files))))
if new_files:
logger.info(f"New files detected: {new_files}")
self._mediator.notify(self, new_files)
self._files = files
if not files:
self._files = []
def update_settings(self, data: list[dict]) -> None:
self.stop()
operator_params, system_params = data
self._directory_path = system_params['trace_storage_path']
self._update_time = system_params['monitor_update_period']
self._init_state()
self.start()
def force_all_dir(self):
files = os.listdir(self._directory_path)
self._files = files
all_files = []
for x in self._files:
path = os.path.join(self._directory_path, x)
all_files.append(path)
if all_files:
logger.info(f"Plotting all files: {all_files}")
self._mediator.notify(self, all_files)
else:
logger.info(f"Failed pushing {str(all_files)}")

View File

@ -1,3 +1,2 @@
from .plot_window import Plotter from .plotter import PlotWidget
from .settings_window import settingsWindow from .settings_window import settingsWindow
from .app import tabWidgetGenerator

Binary file not shown.

Binary file not shown.

View File

@ -1,85 +0,0 @@
import pyqtgraph as pg
from utils import read_json, DiagramParser
from uml import Request, UMLCreator
from OptAlgorithm import OptAlgorithm
from gui import Plotter, settingsWindow
class tabWidgetGenerator:
def __init__(self, directory_to_save):
self.UMLgenerator = Request(server_url='http://www.plantuml.com/plantuml/svg/')
self.uml_creator = UMLCreator(request_generator=self.UMLgenerator, path_to_save=directory_to_save)
self.operator_params = read_json("params/operator_params.json")
self.system_params = read_json("params/system_params.json")
self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._update)
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._update)
self.paths = []
self.plotters = []
self.bool_dicts = []
self.float_dicts = []
self.timings_dicts = []
self.modes = []
self.names = []
def get_widget(self, path):
self.paths.append(path)
self.plotters.append(Plotter(show_settings_func=self._show_settings))
self._getParsedData(path)
self._update()
return self.plotters[-1].widget
def _get_ideal_timings(self, opt: OptAlgorithm) -> list[float]:
data = opt.Ts
ideal_time = [data['tclose'], data['tgrow'], opt.getMarkOpen(), data["tmovement"]]
return ideal_time
def _getParsedData(self, path):
self.names.append(path)
parser = DiagramParser(system_config=self.system_params)
parser.setData(path)
self.bool_dicts.append(parser.getBoolDict())
self.float_dicts.append(parser.getFloatDict())
self.timings_dicts.append(parser.getRealTimings())
self.modes.append(parser.getMode())
def _update(self, _ = None):
self.operator_params = self.operSettings.getParams()
self.system_params = self.sysSettings.getParams()
opt_algorithm = OptAlgorithm(operator_config=self.operator_params, system_config=self.system_params)
ideal_times = self._get_ideal_timings(opt_algorithm)
for i in range (len(self.plotters)):
self.plotters[i].update_data(operator_config=self.operator_params,
system_config=self.system_params,
opt=opt_algorithm,
bool_dict=self.bool_dicts[i],
ideal_time=ideal_times,
float_dict=self.float_dicts[i],
mode = self.modes[i],
timings_dict=self.timings_dicts[i])
self.uml_creator.update_uml(operator_config=self.operator_params,
system_config=self.system_params,
ideal_time=ideal_times,
bool_dict=self.bool_dicts[i],
float_dict=self.float_dicts[i],
mode = self.modes[i],
timings_dict=self.timings_dicts[i],
name = self.names[i])
def _show_settings(self):
self.operSettings.show()
self.sysSettings.show()
if __name__ == '__main__':
pg.mkQApp("Plotting")
temp = tabWidgetGenerator("")
widget = temp.get_widget("trace_samples/2024_11_08-19_30_49.csv")
widget.show()
pg.exec()

68
src/gui/mainGui.py Normal file
View File

@ -0,0 +1,68 @@
from datetime import datetime as dt
from typing import Optional
from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt
from src.utils.base.base import BaseMainWindow, BaseController
from src.gui.settings_window import settingsWindow
class MainWindow(BaseMainWindow):
def __init__(self,
controller: Optional[BaseController] = None):
super().__init__()
self._controller = controller
self.initUI()
self.set_style(self)
self.settings_button.clicked.connect(self._show_settings)
self.operSettings = settingsWindow("params\operator_params.json", 'Operator', self._updater_trigger)
self.sysSettings = settingsWindow("params\system_params.json", 'System', self._updater_trigger)
def initUI(self) -> None:
self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.tabWidget)
self.settings_button = QtWidgets.QPushButton("Show settings")
self.settings_button.setFixedWidth(160)
layout.addWidget(self.settings_button)
self.setLayout(layout)
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
for plot_widget in plot_widgets:
tab = QtWidgets.QWidget()
grid = QtWidgets.QGridLayout()
grid.addWidget(plot_widget)
tab.setLayout(grid)
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
self.tabWidget.setCurrentWidget(tab)
tab_count = self.tabWidget.count()
if tab_count > 10:
for i in range(0, tab_count-2):
self._close_tab(i)
def keyPressEvent(self, a0):
if a0.key() == Qt.Key_F5:
self.clear()
def _show_settings(self):
self.operSettings.show()
self.sysSettings.show()
def push_settings(self) -> None:
self._updater_trigger()
def _updater_trigger(self) -> None:
self.tabWidget.clear()
operator_params = self.operSettings.getParams()
system_params = self.sysSettings.getParams()
self._controller.push_settings([operator_params, system_params])
def _close_tab(self, index:int) -> None:
self.tabWidget.removeTab(index)

View File

@ -1,307 +0,0 @@
import pyqtgraph as pg
from pyqtgraph.Qt import QtWidgets
from PyQt5.QtCore import Qt
import numpy as np
from gui import qt_settings as qts
from OptAlgorithm import OptAlgorithm
class Plotter:
def __init__(self, show_settings_func):
pg.setConfigOptions(antialias=True)
self.alpha = 100 #[0-255 прозрачность фона]
self._init_ui()
self.settings_button.clicked.connect(show_settings_func)
def update_data(self,
system_config : dict,
operator_config: dict,
opt: OptAlgorithm,
ideal_time: list[float],
bool_dict: dict,
float_dict: dict,
timings_dict: dict,
mode: bool):
self.opt = opt
self.bool_dict = bool_dict
self.float_dict = float_dict
self.timings_dict = timings_dict
self.idealTime = ideal_time
self.theor_mode = mode
self.scaler = int(system_config['UML_time_scaler'])
self.WeldTime = operator_config['time_wielding'] #[sec]
self.WeldData = self.opt.calcPhaseGrow(self.idealTime[1])
self._updatePlots()
def _init_ui(self):
self.widget = QtWidgets.QWidget()
layout = QtWidgets.QVBoxLayout()
self.widget.setLayout(layout)
self.win = pg.GraphicsLayoutWidget(show=True, title="")
self.win.resize(1000,600)
self.win.setWindowTitle('')
layout.addWidget(self.win)
self.settings_button = QtWidgets.QPushButton("Show settings")
self.settings_button.setFixedWidth(160)
info_layout = QtWidgets.QHBoxLayout()
layout.addLayout(info_layout)
info_layout.addWidget(self.settings_button, alignment = Qt.AlignLeft)
info_layout.setSpacing(20)
self.efficiency = QtWidgets.QLabel()
info_layout.addWidget(self.efficiency, alignment = Qt.AlignRight)
self.p11, self.l11 = self._init_graph('Electrode force, closure', 'Force', 'N', 'Time', 'ms')
#self.p21, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms')
#self.p31, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms')
self.win.nextRow()
self.p12, self.l12 = self._init_graph('Rotor Position, closure', 'Position', 'mm', 'Time', 'ms')
#self.p22, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms')
#self.p32, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms')
self.win.nextRow()
self.p13, self.l13 = self._init_graph('Rotor Speed, closure', 'Speed', 'mm/s', 'Time', 'ms')
#self.p23, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms')
#self.p33, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms')
self.win.nextRow()
self.p12.setXLink(self.p11)
self.p13.setXLink(self.p11)
self.p11.setAutoVisible(x=False, y=True)
self.p12.setAutoVisible(x=False, y=True)
self.p13.setAutoVisible(x=False, y=True)
self.widget.setStyleSheet(qts.dark_style)
def _init_graph(self, title, Yname, Yunits, Xname, Xunits):
plot = self.win.addPlot(title = title)
plot.showGrid(x=True, y=True)
plot.setLabel('left', Yname, units=Yunits)
plot.setLabel('bottom', Xname, units=Xunits)
legend1 = pg.LegendItem((80,60), offset=(70,20))
legend1.setParentItem(plot)
return plot, legend1
def _updatePlots(self):
self.p11.clear()
self.l11.clear()
self.p12.clear()
self.l12.clear()
self.p13.clear()
self.l13.clear()
if not self.theor_mode:
self._plotRealData()
self._form_idealdatGraph()
self._calcOurScore()
def _calcOurScore (self):
success = []
start = np.array(self.timings_dict["closure"])[:, 0]
end = np.array(self.timings_dict["opening"])[:, 1]
points_timings = end-start
ideal_time = sum(self.idealTime[:3])+self.WeldTime
for point in points_timings:
success.append(int(ideal_time/point * 100))
if len(success) > 1:
maxS = max(success)
minS = min(success)
average = int(sum(success[:])/len(success))
self.efficiency.setText(f'Efficiency Maximum: {maxS}%, Average: {average}%, Minimum: {minS}%' )
else:
self.efficiency.setText(f'Efficiency Maximum: {success[0]}' )
self.efficiency.setStyleSheet(qts.BigSuccessLabel)
def _form_idealdatGraph(self):
if self.theor_mode:
self.timings_dict["closure"] = [[0, self.idealTime[0]]]
self.timings_dict["compression"] = [[self.idealTime[0], sum(self.idealTime[:2])]]
self.timings_dict["welding"] = [[sum(self.idealTime[:2]), sum(self.idealTime[:2])+self.WeldTime]]
self.timings_dict["opening"] = [[sum(self.idealTime[:2])+self.WeldTime, sum(self.idealTime[:3])+self.WeldTime]]
delta = 10 #points_per_ms
for key, items in self.timings_dict.items():
for item in items:
item_data = []
time_data = []
if key == 'closure':
ideal_time = self.idealTime[0]
calc = self.opt.calcPhaseClose
color = qts.RGBA[0]
for i in range(0, int(ideal_time*self.scaler)*delta):
time = i/delta
item_data.append(calc(time/self.scaler))
time_data.append(time+item[0]*self.scaler)
#print (item_data[-1], time_data[-1])
self._plotIdealData(np.array(time_data), np.array(item_data).T)
self._addBackgroundSplitter([item[0]*self.scaler,item[0]*self.scaler + time], color)
elif key == 'compression':
ideal_time = self.idealTime[1]
calc = self.opt.calcPhaseGrow
color = qts.RGBA[1]
for i in range(int(ideal_time*self.scaler)*delta, 0, -1):
time = i/delta
item_data.append(calc(time/self.scaler))
time_data.append(item[1]*self.scaler-(ideal_time*self.scaler-time))
#print (item_data[-1], time_data[-1])
self._plotIdealData(np.array(time_data), np.array(item_data).T)
self._addBackgroundSplitter([(item[1]-ideal_time)*self.scaler, item[1]*self.scaler], color)
temp = item_data[0][4]
x = [time_data[0], time_data[-1], time_data[-1]-0.0001]
y = [temp, temp, temp]
a1, b1, c1 = self._calculate_equidistant(x, y, 2.5, 3)
self.p11.addItem(a1)
self.p11.addItem(b1)
self.p11.addItem(c1)
elif key == 'welding':
ideal_time = self.WeldTime
calc = self._returnWeldData
color = qts.RGBA[2]
for i in range(0, int(ideal_time*self.scaler)*delta):
time = i/delta
item_data.append(calc(time/self.scaler))
time_data.append(time+item[0]*self.scaler)
#print (item_data[-1], time_data[-1])
self._plotIdealData(np.array(time_data), np.array(item_data).T)
self._addBackgroundSplitter([item[0]*self.scaler,item[0]*self.scaler + time], color)
x = [time_data[0], time_data[-1], time_data[-1]+0.0001]
y = [item_data[0][4], item_data[0][4], item_data[0][4]]
a1, b1, c1 = self._calculate_equidistant(x, y, 0.75, 3)
self.p11.addItem(a1)
self.p11.addItem(b1)
self.p11.addItem(c1)
elif key == 'opening':
calc = self.opt.calcPhaseOpen
ideal_time = self.idealTime[2]
ideal_closure = self.idealTime[3]
color = qts.RGBA[3]
color_closure = qts.RGBA[4]
for i in range(0, int(ideal_time*self.scaler)*delta):
time = i/delta
item_data.append(calc(time/self.scaler))
time_data.append(time+item[0]*self.scaler)
#print (item_data[-1], time_data[-1])
self._plotIdealData(np.array(time_data), np.array(item_data).T)
self._addBackgroundSplitter([item[0]*self.scaler,item[0]*self.scaler + time], color)
item_data = []
time_data = []
for i in range(0, int(ideal_closure*self.scaler)*delta):
time = i/delta
item_data.append(self.opt.calcPhaseMovement(time/self.scaler))
time_data.append(time+item[1]*self.scaler)
self._plotIdealData(np.array(time_data), np.array(item_data).T)
self._addBackgroundSplitter([item[1]*self.scaler,item[1]*self.scaler + time], color_closure)
def _returnWeldData(self, _):
return self.WeldData
def _plotRealData(self):
for i, (key, dat) in enumerate(self.float_dict.items()):
dat = np.array(dat).T
dat[0] = dat[0]*self.scaler
curve = pg.PlotDataItem(dat[0], dat[1], pen=pg.mkPen(color=qts.colors[i], width=2), name=key, autoDownsample=True, downsample=True)
if 'Electrode Force' in key:
self.p11.addItem(curve)
self.l11.addItem(curve, key)
elif 'Rotor Position' in key:
self.p12.addItem(curve)
self.l12.addItem(curve, key)
elif 'Rotor Speed' in key:
self.p13.addItem(curve)
self.l13.addItem(curve, key)
return dat[0]
def _plotIdealData(self, time, data):
x_fe = pg.PlotDataItem(time, data[0]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='x_fe', autoDownsample=True, downsample=True)
x_me = pg.PlotDataItem(time, data[1]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='x_me', autoDownsample=True, downsample=True)
v_fe = pg.PlotDataItem(time, data[2]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='v_fe', autoDownsample=True, downsample=True)
v_me = pg.PlotDataItem(time, data[3]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='v_me', autoDownsample=True, downsample=True)
f = pg.PlotDataItem(time, data[4], pen=pg.mkPen(color=qts.colors[8], width=2), name='f', autoDownsample=True, downsample=True)
self.p11.addItem(f)
#self.l11.addItem(f, 'Ideal force')
self.p12.addItem(x_fe)
#self.l12.addItem(x_fe, 'FE POS')
self.p12.addItem(x_me)
#self.l12.addItem(x_me, 'ME POS')
self.p13.addItem(v_fe)
#self.l13.addItem(v_fe, 'FE VEL')
self.p13.addItem(v_me)
#self.l13.addItem(v_me, 'ME VEL')
#self._addBackgroundSplitter()
#self._addEquidistances(time, data)
def _addBackgroundSplitter(self, x, color):
alpha = self.alpha
y01 = np.array([10000, 10000])
y0_1 = np.array([-10000, -10000])
a01 = pg.PlotDataItem(x, y01, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ')
a0_1 = pg.PlotDataItem(x, y0_1, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ')
bg1 = pg.FillBetweenItem(a01, a0_1, color+(alpha,))
bg2 = pg.FillBetweenItem(a01, a0_1, color+(alpha,))
bg3 = pg.FillBetweenItem(a01, a0_1, color+(alpha,))
self.p11.addItem(bg1)
self.p12.addItem(bg2)
self.p13.addItem(bg3)
self.p11.setYRange(-1000, 5000)
self.p12.setYRange(-50, 250)
self.p13.setYRange(-400, 400)
def _makeFiller(self, x1, y1, x2, y2, color):
alpha = self.alpha
eq1 = pg.PlotDataItem(x1, y1, pen=pg.mkPen(color='#000000', width=1))
eq2 = pg.PlotDataItem(x2, y2, pen=pg.mkPen(color='#000000', width=1))
bg = pg.FillBetweenItem(eq1, eq2, qts.RGBA[color]+(alpha,))
return eq1, eq2, bg
def _calculate_equidistant(self, x, y, percent, color):
if len(x) != len(y):
raise ValueError("x и y должны быть одного размера")
distance = max(y)/100*percent
x_eq1 = []
y_eq1 = []
x_eq2 = []
y_eq2 = []
for i in range(0, len(x) - 1):
dx = x[i + 1] - x[i]
dy = y[i + 1] - y[i]
length = np.sqrt(dx ** 2 + dy ** 2)
sinA = dy/length
sinB = dx/length
nx = -sinA*distance
ny = sinB*distance
x_eq1.append(x[i] + nx)
y_eq1.append(y[i] + ny)
x_eq2.append(x[i] - nx)
y_eq2.append(y[i] - ny)
return self._makeFiller(np.array(x_eq1), np.array(y_eq1), np.array(x_eq2), np.array(y_eq2), color)

210
src/gui/plotter.py Normal file
View File

@ -0,0 +1,210 @@
import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel
import pyqtgraph as pg
import numpy as np
from numpy import floating
from typing import Optional, Any, NamedTuple
from src.utils.base.base import BasePlotWidget
from src.utils.base.base import BaseIdealDataBuilder
class idealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
def get_compressionDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
def get_openingDF(self) -> pd.DataFrame:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
def get_oncomingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
def get_weldingDF(self) -> pd.DataFrame:
data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
data.append({"time":0, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
data.append({"time":self.welding_time, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float, float, float, float]:
data = self.Ts
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen()] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
return ideal_timings
class ProcessStage(NamedTuple):
mean_value: floating[Any]
start_index: int
finish_index: int
class PlotWidget(BasePlotWidget):
def _create_curve_ideal(self,
stage: str,
signal: str,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
data = self._stage_ideals[stage]
if start_timestamp and finish_timestamp:
plot = pg.PlotDataItem(x=start_timestamp+data["time"], y=data[signal["name"]], pen=signal["pen"])
return plot
return None
def _create_stage_region(self,
stage: str,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.LinearRegionItem]:
if start_timestamp and finish_timestamp:
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
region.setBrush(pg.mkBrush(self._stage_colors[stage]))
return region
return None
def _get_timestamp(self,
stage: str,
times: pd.Series,
dataframe: pd.DataFrame) -> Optional[list[float]]:
stage_diff = np.diff(dataframe[stage])
start_index = np.where(stage_diff == 1)[0]
finish_index = np.where(stage_diff == -1)[0]
if start_index.size:
start_timestamp = times[start_index[0]]
finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1]
return start_timestamp, finish_timestamp
return None
@staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title)
plot_widget.showGrid(x=True, y=True)
legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
def get_stage_info(self,
stage: str,
dataframe: pd.DataFrame,
signal_name: str) -> Optional[ProcessStage]:
if stage in self._stages:
stage_diff = np.diff(dataframe[stage])
start_index = np.where(stage_diff == 1)[0]
finish_index = np.where(stage_diff == -1)[0]
data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else []
if data.size and start_index.size:
start = start_index[0]
finish = finish_index[0] if finish_index.size else (len(data) - 1)
data_slice = data[start:finish]
mean = np.mean(data_slice)
return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish))
return None
def _build_widget(self, dataframe: pd.DataFrame) -> QWidget:
widget = QWidget()
layout = QVBoxLayout()
time_axis = dataframe["time"]
dataframe_headers = dataframe.columns.tolist()
for channel, description in self._plt_channels.items():
plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"]
if settings["stages"] and all([stage in dataframe_headers for stage in self._stages]):
for stage in self._stages:
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
region = self._create_stage_region(stage, start_timestamp, finish_timestamp)
if region:
plot_widget.addItem(region)
for signal in description["Ideal_signals"]:
ideal_plot = self._create_curve_ideal(stage, signal, start_timestamp, finish_timestamp)
if ideal_plot:
plot_widget.addItem(ideal_plot)
end_timestamp = time_axis[len(time_axis) - 1]
region = self._create_stage_region("Oncoming", finish_timestamp, end_timestamp)
if region:
plot_widget.addItem(region)
for signal in description["Ideal_signals"]:
ideal_plot = self._create_curve_ideal("Oncoming", signal, finish_timestamp, end_timestamp)
if ideal_plot:
plot_widget.addItem(ideal_plot)
if settings["performance"] and all([stage in dataframe_headers for stage in self._stages]):
delta_timestamp = 0
for stage in self._stages:
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
delta_timestamp += finish_timestamp - start_timestamp
ideal_delta = self._opt.get_cycle_time()
performance = round(ideal_delta/delta_timestamp*100, 2)
performance_label = QLabel(f"Performance = {performance} %")
layout.addWidget(performance_label)
if settings["zoom"]:
if max(time_axis) < 5.0:
stages = [self.get_stage_info("Welding",
dataframe,
signal["name"]) for signal in description["Real_signals"]]
if stages:
means_raw = [stage.mean_value for stage in stages]
mean = max(means_raw)
start = time_axis[stages[0].start_index]
finish = time_axis[stages[0].finish_index]
overshoot = pg.BarGraphItem(x0=0,
y0=mean - mean * 0.05,
height=mean * 0.05 * 2,
width=start,
brush=pg.mkBrush([0, 250, 0, 100]))
plot_widget.addItem(overshoot)
stable = pg.BarGraphItem(x0=start,
y0=mean - mean * 0.015,
height=mean * 0.015 * 2,
width=finish - start,
brush=pg.mkBrush([0, 250, 0, 100]))
plot_widget.addItem(stable)
plot_widget.setYRange(mean - 260, mean + 260)
plot_widget.setInteractive(False)
else:
max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]])
region = pg.LinearRegionItem([max_value - max_value * 0.015,
max_value + max_value * 0.015],
movable=False,
orientation="horizontal")
region.setBrush(pg.mkBrush([0, 250, 0, 100]))
plot_widget.setYRange(max_value - 200, max_value + 200)
plot_widget.setXRange(3.5, 4.5)
plot_widget.addItem(region)
plot_widget.setInteractive(False)
for signal in description["Real_signals"]:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(time_axis, dataframe[signal["name"]], pen=signal["pen"])
legend.addItem(plot, signal["name"])
layout.addWidget(plot_widget)
widget.setLayout(layout)
return widget
def build(self, data: list[pd.DataFrame]) -> None:
widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets)
def update_settings(self, data: list[dict]):
self._initIdealBuilder(idealDataBuilder=idealDataBuilder, data=data)

View File

@ -17,10 +17,10 @@ class settingsWindow(QtWidgets.QWidget):
self._init_ui() self._init_ui()
self.params.sigTreeStateChanged.connect(upd_func) self.params.sigTreeStateChanged.connect(upd_func)
def load_settings(self): def load_settings(self) -> None:
self.data = read_json(self.settingsPath) self.data = read_json(self.settingsPath)
def write_settings(self): def write_settings(self) -> None:
self.getParams() self.getParams()
write_json(self.settingsPath, self.data) write_json(self.settingsPath, self.data)
@ -31,7 +31,7 @@ class settingsWindow(QtWidgets.QWidget):
params.append({'name': 'Save', 'type': 'action'}) params.append({'name': 'Save', 'type': 'action'})
return params return params
def _init_ui(self): def _init_ui(self) -> None:
temp = self._getTreeStructure() temp = self._getTreeStructure()
self.params = Parameter.create(name=self.name, type='group', children=temp) self.params = Parameter.create(name=self.name, type='group', children=temp)
self.params.param('Save').sigActivated.connect(self.write_settings) self.params.param('Save').sigActivated.connect(self.write_settings)

View File

@ -1,115 +1,31 @@
import sys import sys
import os from PyQt5 import QtWidgets
import time
from PyQt5.QtWidgets import (
QApplication,
QMainWindow,
QTabWidget,
QWidget,
QVBoxLayout,
QLabel,
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject, QFileSystemWatcher
from PyQt5.QtGui import QIcon
from gui import qt_settings as qts
# Импортируйте ваш класс `app` здесь from src.gui.mainGui import MainWindow
# Предполагается, что класс `app` предоставляет интерфейс для отображения данных from src.controller.monitor import DirectoryMonitor
# Если `app` не предоставляет виджет, возможно, потребуется его модифицировать from src.controller.mediator import Mediator
from gui.app import tabWidgetGenerator from src.controller.converter import DataConverter
from src.gui.plotter import PlotWidget
from src.controller.controller import Controller
class DirectoryWatcher(QObject):
file_created = pyqtSignal(str)
def __init__(self, directory_path):
super().__init__()
self.directory_path = directory_path
self.watcher = QFileSystemWatcher()
self.watcher.addPath(self.directory_path)
self.existing_files = set(os.listdir(self.directory_path))
self.watcher.directoryChanged.connect(self.on_directory_changed)
def on_directory_changed(self):
try:
current_files = set(os.listdir(self.directory_path))
new_files = current_files - self.existing_files
self.existing_files = current_files
for file in new_files:
if file.lower().endswith(".csv"):
full_path = os.path.join(self.directory_path, file)
self.file_created.emit(full_path)
except Exception as e:
print(f"Ошибка при обработке изменений директории: {e}")
class MainWindow(QMainWindow):
def __init__(self, directory_to_watch):
super().__init__()
self.setWindowTitle("Мониторинг CSV-файлов")
self.setGeometry(100, 100, 800, 600)
self.tabs = QTabWidget()
self.tabs.setStyleSheet(qts.dark_style)
self.tabGen = tabWidgetGenerator(directory_to_watch)
self.handle_new_file()
self.setCentralWidget(self.tabs)
# self.setWindowIcon(QIcon("path_to_icon.png"))
self.start_directory_watcher(directory_to_watch)
def start_directory_watcher(self, directory_path):
self.watcher_thread = QThread()
self.watcher = DirectoryWatcher(directory_path)
self.watcher.moveToThread(self.watcher_thread)
self.watcher.file_created.connect(self.handle_new_file)
self.watcher_thread.started.connect(self.watcher.on_directory_changed)
self.watcher_thread.start()
def handle_new_file(self, file_path = None):
time.sleep(0.2)
if file_path:
file_name = os.path.basename(file_path)
else:
file_path = None
file_name = 'Ideal'
tab_widget = self.tabGen.get_widget(path=file_path)
tab = QWidget()
layout = QVBoxLayout()
layout.addWidget(tab_widget)
label = QLabel(f"{file_name}")
label.setAlignment(Qt.AlignCenter)
layout.addWidget(label)
tab.setLayout(layout)
self.tabs.addTab(tab, file_name)
def closeEvent(self, event):
self.watcher_thread.quit()
self.watcher_thread.wait()
event.accept()
def main(): def main():
directory_to_watch = "/home/andrei/Desktop/bla" app = QtWidgets.QApplication(sys.argv)
monitor = DirectoryMonitor()
if not os.path.isdir(directory_to_watch): data_converter = DataConverter()
print(f"Директория не найдена: {directory_to_watch}") plot_widget_builder = PlotWidget()
sys.exit(1) controller = Controller()
window = MainWindow(controller)
app_instance = QApplication(sys.argv) mediator = Mediator(monitor, data_converter, plot_widget_builder, controller)
window = MainWindow(directory_to_watch)
window.show() window.show()
sys.exit(app_instance.exec_())
controller.signal_widgets.connect(window.show_plot_tabs)
controller.signal_settings.connect(mediator.push_settings)
window.push_settings()
sys.exit(app.exec_())
if __name__ == "__main__": if __name__ == '__main__':
main() main()

Binary file not shown.

Binary file not shown.

Binary file not shown.

338
src/utils/base/base.py Normal file
View File

@ -0,0 +1,338 @@
from __future__ import annotations
import os
from typing import Optional, Union
import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget
from src.OptAlgorithm import OptAlgorithm
import pandas as pd
class BaseMediator:
def __init__(self,
monitor: BaseDirectoryMonitor,
converter: BaseDataConverter,
plot: BasePlotWidget,
controller: BaseController):
self._monitor = monitor
self._monitor.mediator = self
self._converter = converter
self._converter.mediator = self
self._plot = plot
self._plot.mediator = self
self._controller = controller
self._controller.mediator = self
def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BaseMainWindow],
data: Union[list[str], list[pd.DataFrame], list[QWidget], list[dict]]):
...
def push_settings (self, data: list[dict]):
...
class BaseDirectoryMonitor:
update_timer = QTimer()
def __init__(self,
mediator: Optional[BaseMediator] = None):
super().__init__()
self._directory_path = None
self._update_time = None
self._mediator = mediator
@property
def directory_path(self) -> str:
return self._directory_path
@property
def update_time(self) -> int:
return self._update_time
@property
def files(self) -> list[str]:
return self._files
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
def _init_state(self):
files = os.listdir(self._directory_path)
self._files = files
def start(self):
self.update_timer.start(self._update_time)
def stop(self):
self.update_timer.stop()
def update_settings(self, data: list[dict]) -> None:
...
def force_all_dir(self):
...
class BaseDataConverter:
def __init__(self, mediator: Optional[BaseMediator] = None):
self._mediator = mediator
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
def convert_data(self, files: list[str]) -> None:
...
class BasePlotWidget:
def __init__(self,
mediator: Optional[BaseMediator] = None):
super().__init__()
self._mediator = mediator
self._stages = [
"Closing",
"Squeeze",
"Welding",
"Relief"
]
self._stage_colors = {
"Closing": [208, 28, 31, 100],
"Squeeze": [45, 51, 89, 150],
"Welding": [247, 183, 24, 100],
"Relief": [0, 134, 88, 100],
"Oncoming": [222, 184, 135, 100]
}
self._plt_channels = {
"Electrode Force, N & Welding Current, kA": {
"Settings": {
"zoom": False,
"stages": True,
"performance": True
},
"Real_signals": [
{
"name": "Electrode Force, N ME",
"pen": 'r',
},
{
"name": "Electrode Force, N FE",
"pen": 'w',
},
{
"name": "Welding Current ME",
"pen": "y",
}
],
"Ideal_signals": [
{
"name": "Force",
"pen": {'color': 'g', 'width':3},
}
]
},
"Electrode Force, N": {
"Settings": {
"zoom": True,
"stages": False,
"performance": False
},
"Real_signals": [
{
"name": "Electrode Force, N ME",
"pen": 'r',
},
{
"name": "Electrode Force, N FE",
"pen": 'w',
}
],
"Ideal_signals": [
{
"name": "Force",
"pen": {'color': 'r', 'width':3},
}
]
},
"Electrode Speed, mm/s": {
"Settings": {
"zoom": False,
"stages": True,
"performance": False
},
"Real_signals": [
{
"name": "Rotor Speed, mm/s ME",
"pen": 'r',
"zoom": False
},
{
"name": "Rotor Speed, mm/s FE",
"pen": 'w',
"zoom": False
}
],
"Ideal_signals": [
{
"name": "Rotor Speed ME",
"pen": {'color': 'y', 'width':3},
"zoom": False
},
{
"name": "Rotor Speed FE",
"pen": {'color': 'g', 'width':3},
"zoom": False
}
]
},
}
def _initIdealBuilder(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
data: list[dict] = None):
self.opt = idealDataBuilder(data)
self._stage_ideals = {
"Closing": self._opt.get_closingDF(),
"Squeeze": self._opt.get_compressionDF(),
"Welding": self._opt.get_weldingDF(),
"Relief": self._opt.get_openingDF(),
"Oncoming": self._opt.get_oncomingDF()
}
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator
@property
def opt(self) -> BaseIdealDataBuilder:
return self._opt
@opt.setter
def opt(self, opt: BaseIdealDataBuilder):
self._opt = opt
def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
...
def update_settings(self, data: list[dict]) -> None:
...
class BaseController(QObject):
def send_widgets(self, widgets: list[QWidget]) -> None:
...
def push_settings(self, settings: list[dict]) -> None:
...
# FIXME: WeldingDF показывает только 1 секунду
class BaseIdealDataBuilder(OptAlgorithm):
def __init__(self, data: list[dict]):
operator_params, system_params = data
self.mul = system_params['time_capture']
self.welding_time = operator_params['time_wielding']
super().__init__(operator_params, system_params)
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)):
time = i/self.mul
X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Posicion FE":X1*1000,"Posicion ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame:
...
def get_compressionDF(self) -> pd.DataFrame:
...
def get_openingDF(self) -> pd.DataFrame:
...
def get_tmovementDF(self) -> pd.DataFrame:
...
def get_weldingDF(self) -> pd.DataFrame:
...
def get_oncomingDF(self) -> pd.DataFrame:
...
def get_ideal_timings(self) -> list[float, float, float, float]:
...
def get_cycle_time(self) -> float:
result = sum(self.get_ideal_timings())
return result
class BaseMainWindow(QWidget):
def __init__(self,
controller: Optional[BaseController] = None):
super().__init__()
self._controller = controller
...
@property
def controller(self) -> BaseController:
return self._controller
@controller.setter
def controller(self, controller: BaseController) -> None:
self._controller = controller
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
object.setStyleSheet("""
QWidget {
background-color: #0D1117;
font-family: "Segoe UI", sans-serif;
font-size: 14px;
}
QMessageBox {
background-color: #161B22;
font-family: "Segoe UI", sans-serif;
font-size: 14px;
}
QPushButton {
background-color: #FFCC00;
color: #0D1117;
padding: 12px 25px;
border: 2px solid #E6B800;
border-radius: 8px;
font-family: "Segoe UI", sans-serif;
font-size: 16px;
font-weight: bold;
}
QPushButton:hover:!disabled {
background-color: #FFD700;
}
QPushButton:disabled {
background-color: #555555;
color: #cccccc;
border: none;
}
QLabel {
color: #ffffff;
font-size: 16px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}
""")

View File

@ -7,7 +7,8 @@ class DiagramParser:
self.signals = [system_config["Closure_signal"], self.signals = [system_config["Closure_signal"],
system_config["Squeeze_signal"], system_config["Squeeze_signal"],
system_config["Welding_signal"], system_config["Welding_signal"],
system_config["Release_signal"]] system_config["Release_signal"],
system_config["Oncomming_signal"]]
self.boolDict = {} self.boolDict = {}
self.floatDict = {} self.floatDict = {}
@ -29,14 +30,11 @@ class DiagramParser:
self.floatDict[signalName] = self._getFloatChanges(signalName) self.floatDict[signalName] = self._getFloatChanges(signalName)
for key, items in self.boolDict.items(): for key, items in self.boolDict.items():
if key == self.signals[0]: if key == self.signals[0]: name = "closure"
name = "closure" elif key == self.signals[1]: name = "compression"
elif key == self.signals[1]: elif key == self.signals[2]: name = "welding"
name = "compression" elif key == self.signals[3]: name = "opening"
elif key == self.signals[2]: elif key == self.signals[4]: name = "oncomming"
name = "welding"
elif key == self.signals[3]:
name = "opening"
self.timingsDict[name] = [] self.timingsDict[name] = []
len_items = len(items) len_items = len(items)