Compare commits
9 Commits
918663fa44
...
40767d813a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
40767d813a | ||
|
|
dcb05aad76 | ||
|
|
d0a146246e | ||
|
|
84d3e3a88d | ||
|
|
384d52309f | ||
|
|
056d9fedb8 | ||
|
|
67e4a9ea70 | ||
|
|
72b002dea3 | ||
|
|
a04517a15d |
@ -8,7 +8,7 @@
|
||||
"dist_close_end_1": 0.005,
|
||||
"dist_close_end_2": 0.005,
|
||||
"time_wielding": 1,
|
||||
"time_command": 0.06,
|
||||
"time_command": 0.0,
|
||||
"time_robot_movement": 0.2,
|
||||
"object_thickness": 0.0045,
|
||||
"force_target": 5000,
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
{
|
||||
"trace_storage_path": "D:/downloads/a22",
|
||||
"monitor_update_period": 100,
|
||||
"a_max_1": 5.41,
|
||||
"v_max_1": 0.278,
|
||||
"a_max_2": 35.81,
|
||||
@ -13,12 +15,8 @@
|
||||
"transmission_ratio_2": 1,
|
||||
"position_start_1": 0.08,
|
||||
"position_start_2": 0.08,
|
||||
"k_prop": 0.075,
|
||||
"time_capture": 10,
|
||||
"UML_time_scaler": 1000,
|
||||
"Closure_signal": "Closing",
|
||||
"Squeeze_signal": "Squeeze",
|
||||
"Welding_signal": "Welding",
|
||||
"Release_signal": "Relief",
|
||||
"Oncomming_signal": "Oncomming"
|
||||
"k_prop": 0.05,
|
||||
"time_capture": 100000,
|
||||
"UML_time_scaler": 1000
|
||||
|
||||
}
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
src/controller/__pycache__/controller.cpython-310.pyc
Normal file
BIN
src/controller/__pycache__/controller.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/controller/__pycache__/converter.cpython-310.pyc
Normal file
BIN
src/controller/__pycache__/converter.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/controller/__pycache__/ideal_data_builder.cpython-310.pyc
Normal file
BIN
src/controller/__pycache__/ideal_data_builder.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/controller/__pycache__/mediator.cpython-310.pyc
Normal file
BIN
src/controller/__pycache__/mediator.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/controller/__pycache__/monitor.cpython-310.pyc
Normal file
BIN
src/controller/__pycache__/monitor.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/controller/__pycache__/monitor.cpython-311.pyc
Normal file
BIN
src/controller/__pycache__/monitor.cpython-311.pyc
Normal file
Binary file not shown.
16
src/controller/controller.py
Normal file
16
src/controller/controller.py
Normal file
@ -0,0 +1,16 @@
|
||||
from PyQt5.QtWidgets import QWidget
|
||||
from PyQt5.QtCore import pyqtSignal
|
||||
|
||||
from src.utils.base.base import BaseController
|
||||
|
||||
|
||||
class Controller(BaseController):
|
||||
|
||||
signal_widgets = pyqtSignal(list)
|
||||
signal_settings = pyqtSignal(list)
|
||||
|
||||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||||
self.signal_widgets.emit(widgets)
|
||||
|
||||
def push_settings(self, settings: list[dict]) -> None:
|
||||
self.signal_settings.emit(settings)
|
||||
20
src/controller/converter.py
Normal file
20
src/controller/converter.py
Normal file
@ -0,0 +1,20 @@
|
||||
import pandas as pd
|
||||
|
||||
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
|
||||
pd.set_option('future.no_silent_downcasting', True)
|
||||
|
||||
from src.utils.base.base import BaseDataConverter
|
||||
|
||||
|
||||
class DataConverter(BaseDataConverter):
|
||||
|
||||
@staticmethod
|
||||
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
|
||||
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
|
||||
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
|
||||
return dataframe
|
||||
|
||||
def convert_data(self, files: list[str]) -> None:
|
||||
dataframes = [pd.read_csv(file) for file in files]
|
||||
converted_dataframes = list(map(self._replace_bool, dataframes))
|
||||
self._mediator.notify(self, converted_dataframes)
|
||||
28
src/controller/mediator.py
Normal file
28
src/controller/mediator.py
Normal file
@ -0,0 +1,28 @@
|
||||
import pandas as pd
|
||||
|
||||
from typing import Union
|
||||
from PyQt5.QtWidgets import QWidget
|
||||
|
||||
from src.utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget
|
||||
|
||||
|
||||
class Mediator(BaseMediator):
|
||||
|
||||
def notify(self,
|
||||
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget],
|
||||
data: Union[list[str], list[pd.DataFrame], list[QWidget]]):
|
||||
if issubclass(source.__class__, BaseDirectoryMonitor):
|
||||
self._converter.convert_data(data)
|
||||
|
||||
if issubclass(source.__class__, BaseDataConverter):
|
||||
self._plot.build(data)
|
||||
|
||||
if issubclass(source.__class__, BasePlotWidget):
|
||||
self._controller.send_widgets(data)
|
||||
|
||||
def push_settings(self, settings: list[dict]):
|
||||
self._monitor.update_settings(settings)
|
||||
self._plot.update_settings(settings)
|
||||
self._monitor.force_all_dir()
|
||||
|
||||
|
||||
51
src/controller/monitor.py
Normal file
51
src/controller/monitor.py
Normal file
@ -0,0 +1,51 @@
|
||||
from time import sleep
|
||||
import os
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from src.utils.base.base import BaseDirectoryMonitor
|
||||
|
||||
|
||||
class DirectoryMonitor(BaseDirectoryMonitor):
|
||||
|
||||
def _init_state(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
|
||||
self.update_timer.timeout.connect(self._monitor)
|
||||
logger.info("Monitor initiated!")
|
||||
|
||||
def _monitor(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x),
|
||||
filter(lambda x: x not in self._files, files))))
|
||||
if new_files:
|
||||
logger.info(f"New files detected: {new_files}")
|
||||
self._mediator.notify(self, new_files)
|
||||
self._files = files
|
||||
if not files:
|
||||
self._files = []
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
self.stop()
|
||||
operator_params, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path']
|
||||
self._update_time = system_params['monitor_update_period']
|
||||
self._init_state()
|
||||
self.start()
|
||||
|
||||
def force_all_dir(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
all_files = []
|
||||
for x in self._files:
|
||||
path = os.path.join(self._directory_path, x)
|
||||
all_files.append(path)
|
||||
if all_files:
|
||||
logger.info(f"Plotting all files: {all_files}")
|
||||
self._mediator.notify(self, all_files)
|
||||
else:
|
||||
logger.info(f"Failed pushing {str(all_files)}")
|
||||
|
||||
|
||||
|
||||
@ -1,3 +1,2 @@
|
||||
from .plot_window import Plotter
|
||||
from .plotter import PlotWidget
|
||||
from .settings_window import settingsWindow
|
||||
from .app import tabWidgetGenerator
|
||||
|
||||
Binary file not shown.
Binary file not shown.
BIN
src/gui/__pycache__/mainGui.cpython-310.pyc
Normal file
BIN
src/gui/__pycache__/mainGui.cpython-310.pyc
Normal file
Binary file not shown.
Binary file not shown.
BIN
src/gui/__pycache__/plotter.cpython-310.pyc
Normal file
BIN
src/gui/__pycache__/plotter.cpython-310.pyc
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,85 +0,0 @@
|
||||
import pyqtgraph as pg
|
||||
|
||||
from src.utils import read_json, DiagramParser
|
||||
from src.uml import Request, UMLCreator
|
||||
from src.OptAlgorithm import OptAlgorithm
|
||||
from src.gui import Plotter, settingsWindow
|
||||
|
||||
|
||||
class tabWidgetGenerator:
|
||||
def __init__(self, directory_to_save):
|
||||
self.UMLgenerator = Request(server_url='http://www.plantuml.com/plantuml/svg/')
|
||||
self.uml_creator = UMLCreator(request_generator=self.UMLgenerator, path_to_save=directory_to_save)
|
||||
self.operator_params = read_json("params/operator_params.json")
|
||||
self.system_params = read_json("params/system_params.json")
|
||||
self.operSettings = settingsWindow("params\operator_params.json", 'Operator', self._update)
|
||||
self.sysSettings = settingsWindow("params\system_params.json", 'System', self._update)
|
||||
|
||||
self.paths = []
|
||||
self.plotters = []
|
||||
self.bool_dicts = []
|
||||
self.float_dicts = []
|
||||
self.timings_dicts = []
|
||||
self.modes = []
|
||||
self.names = []
|
||||
|
||||
def get_widget(self, path):
|
||||
self.paths.append(path)
|
||||
self.plotters.append(Plotter(show_settings_func=self._show_settings))
|
||||
self._getParsedData(path)
|
||||
self._update()
|
||||
return self.plotters[-1].widget
|
||||
|
||||
def _get_ideal_timings(self, opt: OptAlgorithm) -> list[float]:
|
||||
data = opt.Ts
|
||||
ideal_time = [data['tclose'], data['tgrow'], opt.getMarkOpen(), data["tmovement"]]
|
||||
return ideal_time
|
||||
|
||||
def _getParsedData(self, path):
|
||||
self.names.append(path)
|
||||
parser = DiagramParser(system_config=self.system_params)
|
||||
parser.setData(path)
|
||||
self.bool_dicts.append(parser.getBoolDict())
|
||||
self.float_dicts.append(parser.getFloatDict())
|
||||
self.timings_dicts.append(parser.getRealTimings())
|
||||
self.modes.append(parser.getMode())
|
||||
|
||||
|
||||
def _update(self, _ = None):
|
||||
self.operator_params = self.operSettings.getParams()
|
||||
self.system_params = self.sysSettings.getParams()
|
||||
|
||||
opt_algorithm = OptAlgorithm(operator_config=self.operator_params, system_config=self.system_params)
|
||||
ideal_times = self._get_ideal_timings(opt_algorithm)
|
||||
|
||||
for i in range (len(self.plotters)):
|
||||
self.plotters[i].update_data(operator_config=self.operator_params,
|
||||
system_config=self.system_params,
|
||||
opt=opt_algorithm,
|
||||
bool_dict=self.bool_dicts[i],
|
||||
ideal_time=ideal_times,
|
||||
float_dict=self.float_dicts[i],
|
||||
mode = self.modes[i],
|
||||
timings_dict=self.timings_dicts[i])
|
||||
|
||||
self.uml_creator.update_uml(operator_config=self.operator_params,
|
||||
system_config=self.system_params,
|
||||
ideal_time=ideal_times,
|
||||
bool_dict=self.bool_dicts[i],
|
||||
float_dict=self.float_dicts[i],
|
||||
mode = self.modes[i],
|
||||
timings_dict=self.timings_dicts[i],
|
||||
name = self.names[i])
|
||||
|
||||
|
||||
def _show_settings(self):
|
||||
self.operSettings.show()
|
||||
self.sysSettings.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pg.mkQApp("Plotting")
|
||||
temp = tabWidgetGenerator()
|
||||
widget = temp.get_widget("trace_samples/2024_11_08-19_30_49.csv")
|
||||
widget.show()
|
||||
pg.exec()
|
||||
68
src/gui/mainGui.py
Normal file
68
src/gui/mainGui.py
Normal file
@ -0,0 +1,68 @@
|
||||
from datetime import datetime as dt
|
||||
from typing import Optional
|
||||
|
||||
from PyQt5 import QtWidgets
|
||||
from PyQt5.QtCore import Qt
|
||||
from src.utils.base.base import BaseMainWindow, BaseController
|
||||
from src.gui.settings_window import settingsWindow
|
||||
|
||||
class MainWindow(BaseMainWindow):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None):
|
||||
super().__init__()
|
||||
self._controller = controller
|
||||
self.initUI()
|
||||
self.set_style(self)
|
||||
self.settings_button.clicked.connect(self._show_settings)
|
||||
self.operSettings = settingsWindow("params\operator_params.json", 'Operator', self._updater_trigger)
|
||||
self.sysSettings = settingsWindow("params\system_params.json", 'System', self._updater_trigger)
|
||||
|
||||
def initUI(self) -> None:
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
self.tabWidget.setTabsClosable(True)
|
||||
self.tabWidget.tabCloseRequested.connect(self._close_tab)
|
||||
layout = QtWidgets.QVBoxLayout()
|
||||
layout.addWidget(self.tabWidget)
|
||||
self.settings_button = QtWidgets.QPushButton("Show settings")
|
||||
self.settings_button.setFixedWidth(160)
|
||||
layout.addWidget(self.settings_button)
|
||||
self.setLayout(layout)
|
||||
|
||||
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
|
||||
for plot_widget in plot_widgets:
|
||||
tab = QtWidgets.QWidget()
|
||||
grid = QtWidgets.QGridLayout()
|
||||
grid.addWidget(plot_widget)
|
||||
tab.setLayout(grid)
|
||||
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
|
||||
self.tabWidget.setCurrentWidget(tab)
|
||||
|
||||
tab_count = self.tabWidget.count()
|
||||
if tab_count > 10:
|
||||
for i in range(0, tab_count-2):
|
||||
self._close_tab(i)
|
||||
|
||||
|
||||
def keyPressEvent(self, a0):
|
||||
if a0.key() == Qt.Key_F5:
|
||||
self.clear()
|
||||
|
||||
def _show_settings(self):
|
||||
self.operSettings.show()
|
||||
self.sysSettings.show()
|
||||
|
||||
def push_settings(self) -> None:
|
||||
self._updater_trigger()
|
||||
|
||||
def _updater_trigger(self) -> None:
|
||||
self.tabWidget.clear()
|
||||
operator_params = self.operSettings.getParams()
|
||||
system_params = self.sysSettings.getParams()
|
||||
self._controller.push_settings([operator_params, system_params])
|
||||
|
||||
def _close_tab(self, index:int) -> None:
|
||||
self.tabWidget.removeTab(index)
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,307 +0,0 @@
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph.Qt import QtWidgets
|
||||
from PyQt5.QtCore import Qt
|
||||
import numpy as np
|
||||
|
||||
from src.gui import qt_settings as qts
|
||||
|
||||
from src.OptAlgorithm import OptAlgorithm
|
||||
|
||||
|
||||
class Plotter:
|
||||
def __init__(self, show_settings_func):
|
||||
pg.setConfigOptions(antialias=True)
|
||||
self.alpha = 100 #[0-255 прозрачность фона]
|
||||
self._init_ui()
|
||||
self.settings_button.clicked.connect(show_settings_func)
|
||||
|
||||
|
||||
def update_data(self,
|
||||
system_config : dict,
|
||||
operator_config: dict,
|
||||
opt: OptAlgorithm,
|
||||
ideal_time: list[float],
|
||||
bool_dict: dict,
|
||||
float_dict: dict,
|
||||
timings_dict: dict,
|
||||
mode: bool):
|
||||
self.opt = opt
|
||||
self.bool_dict = bool_dict
|
||||
self.float_dict = float_dict
|
||||
self.timings_dict = timings_dict
|
||||
self.idealTime = ideal_time
|
||||
self.theor_mode = mode
|
||||
self.scaler = int(system_config['UML_time_scaler'])
|
||||
self.WeldTime = operator_config['time_wielding'] #[sec]
|
||||
self.WeldData = self.opt.calcPhaseGrow(self.idealTime[1])
|
||||
self._updatePlots()
|
||||
|
||||
|
||||
def _init_ui(self):
|
||||
self.widget = QtWidgets.QWidget()
|
||||
layout = QtWidgets.QVBoxLayout()
|
||||
self.widget.setLayout(layout)
|
||||
|
||||
self.win = pg.GraphicsLayoutWidget(show=True, title="")
|
||||
self.win.resize(1000,600)
|
||||
self.win.setWindowTitle('')
|
||||
|
||||
layout.addWidget(self.win)
|
||||
self.settings_button = QtWidgets.QPushButton("Show settings")
|
||||
self.settings_button.setFixedWidth(160)
|
||||
info_layout = QtWidgets.QHBoxLayout()
|
||||
layout.addLayout(info_layout)
|
||||
info_layout.addWidget(self.settings_button, alignment = Qt.AlignLeft)
|
||||
info_layout.setSpacing(20)
|
||||
self.efficiency = QtWidgets.QLabel()
|
||||
info_layout.addWidget(self.efficiency, alignment = Qt.AlignRight)
|
||||
|
||||
|
||||
|
||||
|
||||
self.p11, self.l11 = self._init_graph('Electrode force, closure', 'Force', 'N', 'Time', 'ms')
|
||||
#self.p21, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms')
|
||||
#self.p31, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms')
|
||||
self.win.nextRow()
|
||||
self.p12, self.l12 = self._init_graph('Rotor Position, closure', 'Posicion', 'mm', 'Time', 'ms')
|
||||
#self.p22, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms')
|
||||
#self.p32, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms')
|
||||
self.win.nextRow()
|
||||
self.p13, self.l13 = self._init_graph('Rotor Speed, closure', 'Speed', 'mm/s', 'Time', 'ms')
|
||||
#self.p23, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms')
|
||||
#self.p33, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms')
|
||||
self.win.nextRow()
|
||||
|
||||
self.p12.setXLink(self.p11)
|
||||
self.p13.setXLink(self.p11)
|
||||
|
||||
self.p11.setAutoVisible(x=False, y=True)
|
||||
self.p12.setAutoVisible(x=False, y=True)
|
||||
self.p13.setAutoVisible(x=False, y=True)
|
||||
self.widget.setStyleSheet(qts.dark_style)
|
||||
|
||||
def _init_graph(self, title, Yname, Yunits, Xname, Xunits):
|
||||
plot = self.win.addPlot(title = title)
|
||||
plot.showGrid(x=True, y=True)
|
||||
plot.setLabel('left', Yname, units=Yunits)
|
||||
plot.setLabel('bottom', Xname, units=Xunits)
|
||||
legend1 = pg.LegendItem((80,60), offset=(70,20))
|
||||
legend1.setParentItem(plot)
|
||||
return plot, legend1
|
||||
|
||||
def _updatePlots(self):
|
||||
self.p11.clear()
|
||||
self.l11.clear()
|
||||
self.p12.clear()
|
||||
self.l12.clear()
|
||||
self.p13.clear()
|
||||
self.l13.clear()
|
||||
|
||||
if not self.theor_mode:
|
||||
self._plotRealData()
|
||||
self._form_idealdatGraph()
|
||||
self._calcOurScore()
|
||||
|
||||
def _calcOurScore (self):
|
||||
success = []
|
||||
start = np.array(self.timings_dict["closure"])[:, 0]
|
||||
end = np.array(self.timings_dict["opening"])[:, 1]
|
||||
points_timings = end-start
|
||||
|
||||
ideal_time = sum(self.idealTime[:3])+self.WeldTime
|
||||
for point in points_timings:
|
||||
success.append(int(ideal_time/point * 100))
|
||||
if len(success) > 1:
|
||||
maxS = max(success)
|
||||
minS = min(success)
|
||||
average = int(sum(success[:])/len(success))
|
||||
self.efficiency.setText(f'Efficiency Maximum: {maxS}%, Average: {average}%, Minimum: {minS}%' )
|
||||
else:
|
||||
self.efficiency.setText(f'Efficiency Maximum: {success[0]}' )
|
||||
self.efficiency.setStyleSheet(qts.BigSuccessLabel)
|
||||
|
||||
|
||||
def _form_idealdatGraph(self):
|
||||
|
||||
if self.theor_mode:
|
||||
self.timings_dict["closure"] = [[0, self.idealTime[0]]]
|
||||
self.timings_dict["compression"] = [[self.idealTime[0], sum(self.idealTime[:2])]]
|
||||
self.timings_dict["welding"] = [[sum(self.idealTime[:2]), sum(self.idealTime[:2])+self.WeldTime]]
|
||||
self.timings_dict["opening"] = [[sum(self.idealTime[:2])+self.WeldTime, sum(self.idealTime[:3])+self.WeldTime]]
|
||||
|
||||
delta = 10 #points_per_ms
|
||||
for key, items in self.timings_dict.items():
|
||||
for item in items:
|
||||
item_data = []
|
||||
time_data = []
|
||||
|
||||
if key == 'closure':
|
||||
ideal_time = self.idealTime[0]
|
||||
calc = self.opt.calcPhaseClose
|
||||
color = qts.RGBA[0]
|
||||
for i in range(0, int(ideal_time*self.scaler)*delta):
|
||||
time = i/delta
|
||||
item_data.append(calc(time/self.scaler))
|
||||
time_data.append(time+item[0]*self.scaler)
|
||||
#print (item_data[-1], time_data[-1])
|
||||
self._plotIdealData(np.array(time_data), np.array(item_data).T)
|
||||
self._addBackgroundSplitter([item[0]*self.scaler,item[0]*self.scaler + time], color)
|
||||
|
||||
elif key == 'compression':
|
||||
ideal_time = self.idealTime[1]
|
||||
calc = self.opt.calcPhaseGrow
|
||||
color = qts.RGBA[1]
|
||||
|
||||
for i in range(int(ideal_time*self.scaler)*delta, 0, -1):
|
||||
time = i/delta
|
||||
item_data.append(calc(time/self.scaler))
|
||||
time_data.append(item[1]*self.scaler-(ideal_time*self.scaler-time))
|
||||
#print (item_data[-1], time_data[-1])
|
||||
self._plotIdealData(np.array(time_data), np.array(item_data).T)
|
||||
self._addBackgroundSplitter([(item[1]-ideal_time)*self.scaler, item[1]*self.scaler], color)
|
||||
|
||||
temp = item_data[0][4]
|
||||
x = [time_data[0], time_data[-1], time_data[-1]-0.0001]
|
||||
y = [temp, temp, temp]
|
||||
a1, b1, c1 = self._calculate_equidistant(x, y, 2.5, 3)
|
||||
self.p11.addItem(a1)
|
||||
self.p11.addItem(b1)
|
||||
self.p11.addItem(c1)
|
||||
|
||||
elif key == 'welding':
|
||||
ideal_time = self.WeldTime
|
||||
calc = self._returnWeldData
|
||||
color = qts.RGBA[2]
|
||||
|
||||
for i in range(0, int(ideal_time*self.scaler)*delta):
|
||||
time = i/delta
|
||||
item_data.append(calc(time/self.scaler))
|
||||
time_data.append(time+item[0]*self.scaler)
|
||||
#print (item_data[-1], time_data[-1])
|
||||
self._plotIdealData(np.array(time_data), np.array(item_data).T)
|
||||
self._addBackgroundSplitter([item[0]*self.scaler,item[0]*self.scaler + time], color)
|
||||
|
||||
x = [time_data[0], time_data[-1], time_data[-1]+0.0001]
|
||||
y = [item_data[0][4], item_data[0][4], item_data[0][4]]
|
||||
a1, b1, c1 = self._calculate_equidistant(x, y, 0.75, 3)
|
||||
self.p11.addItem(a1)
|
||||
self.p11.addItem(b1)
|
||||
self.p11.addItem(c1)
|
||||
|
||||
elif key == 'opening':
|
||||
calc = self.opt.calcPhaseOpen
|
||||
ideal_time = self.idealTime[2]
|
||||
ideal_closure = self.idealTime[3]
|
||||
color = qts.RGBA[3]
|
||||
color_closure = qts.RGBA[4]
|
||||
|
||||
for i in range(0, int(ideal_time*self.scaler)*delta):
|
||||
time = i/delta
|
||||
item_data.append(calc(time/self.scaler))
|
||||
time_data.append(time+item[0]*self.scaler)
|
||||
#print (item_data[-1], time_data[-1])
|
||||
self._plotIdealData(np.array(time_data), np.array(item_data).T)
|
||||
self._addBackgroundSplitter([item[0]*self.scaler,item[0]*self.scaler + time], color)
|
||||
|
||||
item_data = []
|
||||
time_data = []
|
||||
for i in range(0, int(ideal_closure*self.scaler)*delta):
|
||||
time = i/delta
|
||||
item_data.append(self.opt.calcPhaseMovement(time/self.scaler))
|
||||
time_data.append(time+item[1]*self.scaler)
|
||||
self._plotIdealData(np.array(time_data), np.array(item_data).T)
|
||||
self._addBackgroundSplitter([item[1]*self.scaler,item[1]*self.scaler + time], color_closure)
|
||||
|
||||
def _returnWeldData(self, _):
|
||||
return self.WeldData
|
||||
|
||||
|
||||
|
||||
def _plotRealData(self):
|
||||
for i, (key, dat) in enumerate(self.float_dict.items()):
|
||||
dat = np.array(dat).T
|
||||
dat[0] = dat[0]*self.scaler
|
||||
curve = pg.PlotDataItem(dat[0], dat[1], pen=pg.mkPen(color=qts.colors[i], width=2), name=key, autoDownsample=True, downsample=True)
|
||||
if 'Electrode Force' in key:
|
||||
self.p11.addItem(curve)
|
||||
self.l11.addItem(curve, key)
|
||||
elif 'Rotor Position' in key:
|
||||
self.p12.addItem(curve)
|
||||
self.l12.addItem(curve, key)
|
||||
elif 'Rotor Speed' in key:
|
||||
self.p13.addItem(curve)
|
||||
self.l13.addItem(curve, key)
|
||||
return dat[0]
|
||||
|
||||
def _plotIdealData(self, time, data):
|
||||
x_fe = pg.PlotDataItem(time, data[0]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='x_fe', autoDownsample=True, downsample=True)
|
||||
x_me = pg.PlotDataItem(time, data[1]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='x_me', autoDownsample=True, downsample=True)
|
||||
v_fe = pg.PlotDataItem(time, data[2]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='v_fe', autoDownsample=True, downsample=True)
|
||||
v_me = pg.PlotDataItem(time, data[3]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='v_me', autoDownsample=True, downsample=True)
|
||||
f = pg.PlotDataItem(time, data[4], pen=pg.mkPen(color=qts.colors[8], width=2), name='f', autoDownsample=True, downsample=True)
|
||||
|
||||
self.p11.addItem(f)
|
||||
#self.l11.addItem(f, 'Ideal force')
|
||||
|
||||
self.p12.addItem(x_fe)
|
||||
#self.l12.addItem(x_fe, 'FE POS')
|
||||
self.p12.addItem(x_me)
|
||||
#self.l12.addItem(x_me, 'ME POS')
|
||||
|
||||
self.p13.addItem(v_fe)
|
||||
#self.l13.addItem(v_fe, 'FE VEL')
|
||||
self.p13.addItem(v_me)
|
||||
#self.l13.addItem(v_me, 'ME VEL')
|
||||
#self._addBackgroundSplitter()
|
||||
#self._addEquidistances(time, data)
|
||||
|
||||
def _addBackgroundSplitter(self, x, color):
|
||||
alpha = self.alpha
|
||||
y01 = np.array([10000, 10000])
|
||||
y0_1 = np.array([-10000, -10000])
|
||||
a01 = pg.PlotDataItem(x, y01, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ')
|
||||
a0_1 = pg.PlotDataItem(x, y0_1, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ')
|
||||
bg1 = pg.FillBetweenItem(a01, a0_1, color+(alpha,))
|
||||
bg2 = pg.FillBetweenItem(a01, a0_1, color+(alpha,))
|
||||
bg3 = pg.FillBetweenItem(a01, a0_1, color+(alpha,))
|
||||
self.p11.addItem(bg1)
|
||||
self.p12.addItem(bg2)
|
||||
self.p13.addItem(bg3)
|
||||
|
||||
self.p11.setYRange(-1000, 5000)
|
||||
self.p12.setYRange(-50, 250)
|
||||
self.p13.setYRange(-400, 400)
|
||||
|
||||
|
||||
def _makeFiller(self, x1, y1, x2, y2, color):
|
||||
alpha = self.alpha
|
||||
eq1 = pg.PlotDataItem(x1, y1, pen=pg.mkPen(color='#000000', width=1))
|
||||
eq2 = pg.PlotDataItem(x2, y2, pen=pg.mkPen(color='#000000', width=1))
|
||||
bg = pg.FillBetweenItem(eq1, eq2, qts.RGBA[color]+(alpha,))
|
||||
return eq1, eq2, bg
|
||||
|
||||
|
||||
def _calculate_equidistant(self, x, y, percent, color):
|
||||
if len(x) != len(y):
|
||||
raise ValueError("x и y должны быть одного размера")
|
||||
distance = max(y)/100*percent
|
||||
x_eq1 = []
|
||||
y_eq1 = []
|
||||
x_eq2 = []
|
||||
y_eq2 = []
|
||||
|
||||
for i in range(0, len(x) - 1):
|
||||
dx = x[i + 1] - x[i]
|
||||
dy = y[i + 1] - y[i]
|
||||
length = np.sqrt(dx ** 2 + dy ** 2)
|
||||
sinA = dy/length
|
||||
sinB = dx/length
|
||||
|
||||
nx = -sinA*distance
|
||||
ny = sinB*distance
|
||||
x_eq1.append(x[i] + nx)
|
||||
y_eq1.append(y[i] + ny)
|
||||
x_eq2.append(x[i] - nx)
|
||||
y_eq2.append(y[i] - ny)
|
||||
|
||||
return self._makeFiller(np.array(x_eq1), np.array(y_eq1), np.array(x_eq2), np.array(y_eq2), color)
|
||||
210
src/gui/plotter.py
Normal file
210
src/gui/plotter.py
Normal file
@ -0,0 +1,210 @@
|
||||
import pandas as pd
|
||||
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel
|
||||
import pyqtgraph as pg
|
||||
import numpy as np
|
||||
from numpy import floating
|
||||
from typing import Optional, Any, NamedTuple
|
||||
|
||||
from src.utils.base.base import BasePlotWidget
|
||||
from src.utils.base.base import BaseIdealDataBuilder
|
||||
|
||||
|
||||
class idealDataBuilder(BaseIdealDataBuilder):
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
|
||||
|
||||
def get_compressionDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
|
||||
|
||||
def get_openingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
|
||||
|
||||
def get_oncomingDF(self) -> pd.DataFrame:
|
||||
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
|
||||
|
||||
def get_weldingDF(self) -> pd.DataFrame:
|
||||
data = []
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
|
||||
data.append({"time":0, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
data.append({"time":self.welding_time, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_ideal_timings(self) -> list[float, float, float, float]:
|
||||
data = self.Ts
|
||||
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen()] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
|
||||
return ideal_timings
|
||||
|
||||
|
||||
class ProcessStage(NamedTuple):
|
||||
mean_value: floating[Any]
|
||||
start_index: int
|
||||
finish_index: int
|
||||
|
||||
|
||||
class PlotWidget(BasePlotWidget):
|
||||
def _create_curve_ideal(self,
|
||||
stage: str,
|
||||
signal: str,
|
||||
start_timestamp: float,
|
||||
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
|
||||
data = self._stage_ideals[stage]
|
||||
|
||||
if start_timestamp and finish_timestamp:
|
||||
plot = pg.PlotDataItem(x=start_timestamp+data["time"], y=data[signal["name"]], pen=signal["pen"])
|
||||
return plot
|
||||
return None
|
||||
|
||||
def _create_stage_region(self,
|
||||
stage: str,
|
||||
start_timestamp: float,
|
||||
finish_timestamp: float) -> Optional[pg.LinearRegionItem]:
|
||||
|
||||
if start_timestamp and finish_timestamp:
|
||||
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
|
||||
region.setBrush(pg.mkBrush(self._stage_colors[stage]))
|
||||
return region
|
||||
return None
|
||||
|
||||
def _get_timestamp(self,
|
||||
stage: str,
|
||||
times: pd.Series,
|
||||
dataframe: pd.DataFrame) -> Optional[list[float]]:
|
||||
stage_diff = np.diff(dataframe[stage])
|
||||
start_index = np.where(stage_diff == 1)[0]
|
||||
finish_index = np.where(stage_diff == -1)[0]
|
||||
if start_index.size:
|
||||
start_timestamp = times[start_index[0]]
|
||||
finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1]
|
||||
return start_timestamp, finish_timestamp
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
|
||||
plot_widget = pg.PlotWidget(title=title)
|
||||
plot_widget.showGrid(x=True, y=True)
|
||||
legend = pg.LegendItem((80, 60), offset=(70, 20))
|
||||
legend.setParentItem(plot_widget.graphicsItem())
|
||||
return plot_widget, legend
|
||||
|
||||
def get_stage_info(self,
|
||||
stage: str,
|
||||
dataframe: pd.DataFrame,
|
||||
signal_name: str) -> Optional[ProcessStage]:
|
||||
if stage in self._stages:
|
||||
stage_diff = np.diff(dataframe[stage])
|
||||
start_index = np.where(stage_diff == 1)[0]
|
||||
finish_index = np.where(stage_diff == -1)[0]
|
||||
|
||||
data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else []
|
||||
|
||||
if data.size and start_index.size:
|
||||
start = start_index[0]
|
||||
finish = finish_index[0] if finish_index.size else (len(data) - 1)
|
||||
data_slice = data[start:finish]
|
||||
mean = np.mean(data_slice)
|
||||
return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish))
|
||||
return None
|
||||
|
||||
def _build_widget(self, dataframe: pd.DataFrame) -> QWidget:
|
||||
widget = QWidget()
|
||||
layout = QVBoxLayout()
|
||||
|
||||
time_axis = dataframe["time"]
|
||||
dataframe_headers = dataframe.columns.tolist()
|
||||
|
||||
for channel, description in self._plt_channels.items():
|
||||
plot_widget, legend = self._init_plot_widget(title=channel)
|
||||
|
||||
settings = description["Settings"]
|
||||
if settings["stages"] and all([stage in dataframe_headers for stage in self._stages]):
|
||||
for stage in self._stages:
|
||||
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
|
||||
region = self._create_stage_region(stage, start_timestamp, finish_timestamp)
|
||||
if region:
|
||||
plot_widget.addItem(region)
|
||||
|
||||
for signal in description["Ideal_signals"]:
|
||||
ideal_plot = self._create_curve_ideal(stage, signal, start_timestamp, finish_timestamp)
|
||||
if ideal_plot:
|
||||
plot_widget.addItem(ideal_plot)
|
||||
|
||||
end_timestamp = time_axis[len(time_axis) - 1]
|
||||
region = self._create_stage_region("Oncoming", finish_timestamp, end_timestamp)
|
||||
if region:
|
||||
plot_widget.addItem(region)
|
||||
|
||||
for signal in description["Ideal_signals"]:
|
||||
ideal_plot = self._create_curve_ideal("Oncoming", signal, finish_timestamp, end_timestamp)
|
||||
if ideal_plot:
|
||||
plot_widget.addItem(ideal_plot)
|
||||
|
||||
if settings["performance"] and all([stage in dataframe_headers for stage in self._stages]):
|
||||
delta_timestamp = 0
|
||||
for stage in self._stages:
|
||||
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
|
||||
delta_timestamp += finish_timestamp - start_timestamp
|
||||
|
||||
ideal_delta = self._opt.get_cycle_time()
|
||||
performance = round(ideal_delta/delta_timestamp*100, 2)
|
||||
performance_label = QLabel(f"Performance = {performance} %")
|
||||
layout.addWidget(performance_label)
|
||||
|
||||
if settings["zoom"]:
|
||||
if max(time_axis) < 5.0:
|
||||
stages = [self.get_stage_info("Welding",
|
||||
dataframe,
|
||||
signal["name"]) for signal in description["Real_signals"]]
|
||||
if stages:
|
||||
means_raw = [stage.mean_value for stage in stages]
|
||||
mean = max(means_raw)
|
||||
start = time_axis[stages[0].start_index]
|
||||
finish = time_axis[stages[0].finish_index]
|
||||
|
||||
overshoot = pg.BarGraphItem(x0=0,
|
||||
y0=mean - mean * 0.05,
|
||||
height=mean * 0.05 * 2,
|
||||
width=start,
|
||||
brush=pg.mkBrush([0, 250, 0, 100]))
|
||||
plot_widget.addItem(overshoot)
|
||||
|
||||
stable = pg.BarGraphItem(x0=start,
|
||||
y0=mean - mean * 0.015,
|
||||
height=mean * 0.015 * 2,
|
||||
width=finish - start,
|
||||
brush=pg.mkBrush([0, 250, 0, 100]))
|
||||
plot_widget.addItem(stable)
|
||||
|
||||
plot_widget.setYRange(mean - 260, mean + 260)
|
||||
plot_widget.setInteractive(False)
|
||||
else:
|
||||
max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]])
|
||||
region = pg.LinearRegionItem([max_value - max_value * 0.015,
|
||||
max_value + max_value * 0.015],
|
||||
movable=False,
|
||||
orientation="horizontal")
|
||||
|
||||
region.setBrush(pg.mkBrush([0, 250, 0, 100]))
|
||||
plot_widget.setYRange(max_value - 200, max_value + 200)
|
||||
plot_widget.setXRange(3.5, 4.5)
|
||||
plot_widget.addItem(region)
|
||||
plot_widget.setInteractive(False)
|
||||
|
||||
for signal in description["Real_signals"]:
|
||||
if signal["name"] in dataframe_headers:
|
||||
plot = plot_widget.plot(time_axis, dataframe[signal["name"]], pen=signal["pen"])
|
||||
legend.addItem(plot, signal["name"])
|
||||
|
||||
layout.addWidget(plot_widget)
|
||||
|
||||
widget.setLayout(layout)
|
||||
return widget
|
||||
|
||||
def build(self, data: list[pd.DataFrame]) -> None:
|
||||
widgets = [self._build_widget(data_sample) for data_sample in data]
|
||||
self._mediator.notify(self, widgets)
|
||||
|
||||
def update_settings(self, data: list[dict]):
|
||||
self._initIdealBuilder(idealDataBuilder=idealDataBuilder, data=data)
|
||||
|
||||
|
||||
|
||||
@ -17,10 +17,10 @@ class settingsWindow(QtWidgets.QWidget):
|
||||
self._init_ui()
|
||||
self.params.sigTreeStateChanged.connect(upd_func)
|
||||
|
||||
def load_settings(self):
|
||||
def load_settings(self) -> None:
|
||||
self.data = read_json(self.settingsPath)
|
||||
|
||||
def write_settings(self):
|
||||
def write_settings(self) -> None:
|
||||
self.getParams()
|
||||
write_json(self.settingsPath, self.data)
|
||||
|
||||
@ -31,7 +31,7 @@ class settingsWindow(QtWidgets.QWidget):
|
||||
params.append({'name': 'Save', 'type': 'action'})
|
||||
return params
|
||||
|
||||
def _init_ui(self):
|
||||
def _init_ui(self) -> None:
|
||||
temp = self._getTreeStructure()
|
||||
self.params = Parameter.create(name=self.name, type='group', children=temp)
|
||||
self.params.param('Save').sigActivated.connect(self.write_settings)
|
||||
|
||||
128
src/main.py
128
src/main.py
@ -1,115 +1,31 @@
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
from PyQt5.QtWidgets import (
|
||||
QApplication,
|
||||
QMainWindow,
|
||||
QTabWidget,
|
||||
QWidget,
|
||||
QVBoxLayout,
|
||||
QLabel,
|
||||
)
|
||||
from PyQt5.QtCore import Qt, QThread, pyqtSignal, QObject, QFileSystemWatcher
|
||||
from PyQt5.QtGui import QIcon
|
||||
from src.gui import qt_settings as qts
|
||||
from PyQt5 import QtWidgets
|
||||
|
||||
# Импортируйте ваш класс `app` здесь
|
||||
# Предполагается, что класс `app` предоставляет интерфейс для отображения данных
|
||||
# Если `app` не предоставляет виджет, возможно, потребуется его модифицировать
|
||||
from src.gui.app import tabWidgetGenerator
|
||||
|
||||
|
||||
class DirectoryWatcher(QObject):
|
||||
|
||||
file_created = pyqtSignal(str)
|
||||
|
||||
def __init__(self, directory_path):
|
||||
super().__init__()
|
||||
self.directory_path = directory_path
|
||||
self.watcher = QFileSystemWatcher()
|
||||
self.watcher.addPath(self.directory_path)
|
||||
self.existing_files = set(os.listdir(self.directory_path))
|
||||
self.watcher.directoryChanged.connect(self.on_directory_changed)
|
||||
|
||||
|
||||
def on_directory_changed(self, _):
|
||||
try:
|
||||
current_files = set(os.listdir(self.directory_path))
|
||||
new_files = current_files - self.existing_files
|
||||
self.existing_files = current_files
|
||||
for file in new_files:
|
||||
if file.lower().endswith(".csv"):
|
||||
full_path = os.path.join(self.directory_path, file)
|
||||
self.file_created.emit(full_path)
|
||||
except Exception as e:
|
||||
print(f"Ошибка при обработке изменений директории: {e}")
|
||||
|
||||
|
||||
class MainWindow(QMainWindow):
|
||||
def __init__(self, directory_to_watch):
|
||||
super().__init__()
|
||||
self.setWindowTitle("Мониторинг CSV-файлов")
|
||||
self.setGeometry(100, 100, 800, 600)
|
||||
|
||||
self.tabs = QTabWidget()
|
||||
self.tabs.setStyleSheet(qts.dark_style)
|
||||
self.tabGen = tabWidgetGenerator(directory_to_watch)
|
||||
self.handle_new_file()
|
||||
self.setCentralWidget(self.tabs)
|
||||
|
||||
# self.setWindowIcon(QIcon("path_to_icon.png"))
|
||||
self.start_directory_watcher(directory_to_watch)
|
||||
|
||||
|
||||
def start_directory_watcher(self, directory_path):
|
||||
self.watcher_thread = QThread()
|
||||
self.watcher = DirectoryWatcher(directory_path)
|
||||
self.watcher.moveToThread(self.watcher_thread)
|
||||
|
||||
self.watcher.file_created.connect(self.handle_new_file)
|
||||
self.watcher_thread.started.connect(self.watcher.on_directory_changed)
|
||||
|
||||
self.watcher_thread.start()
|
||||
|
||||
def handle_new_file(self, file_path = None):
|
||||
time.sleep(0.2)
|
||||
if file_path:
|
||||
file_name = os.path.basename(file_path)
|
||||
else:
|
||||
file_path = None
|
||||
file_name = 'Ideal'
|
||||
|
||||
tab_widget = self.tabGen.get_widget(path=file_path)
|
||||
|
||||
tab = QWidget()
|
||||
layout = QVBoxLayout()
|
||||
layout.addWidget(tab_widget)
|
||||
|
||||
label = QLabel(f"{file_name}")
|
||||
label.setAlignment(Qt.AlignCenter)
|
||||
layout.addWidget(label)
|
||||
tab.setLayout(layout)
|
||||
|
||||
self.tabs.addTab(tab, file_name)
|
||||
|
||||
def closeEvent(self, event):
|
||||
self.watcher_thread.quit()
|
||||
self.watcher_thread.wait()
|
||||
event.accept()
|
||||
from src.gui.mainGui import MainWindow
|
||||
from src.controller.monitor import DirectoryMonitor
|
||||
from src.controller.mediator import Mediator
|
||||
from src.controller.converter import DataConverter
|
||||
from src.gui.plotter import PlotWidget
|
||||
from src.controller.controller import Controller
|
||||
|
||||
|
||||
def main():
|
||||
directory_to_watch = "D:/downloads/a22"
|
||||
|
||||
if not os.path.isdir(directory_to_watch):
|
||||
print(f"Директория не найдена: {directory_to_watch}")
|
||||
sys.exit(1)
|
||||
|
||||
app_instance = QApplication(sys.argv)
|
||||
window = MainWindow(directory_to_watch)
|
||||
app = QtWidgets.QApplication(sys.argv)
|
||||
monitor = DirectoryMonitor()
|
||||
data_converter = DataConverter()
|
||||
plot_widget_builder = PlotWidget()
|
||||
controller = Controller()
|
||||
window = MainWindow(controller)
|
||||
mediator = Mediator(monitor, data_converter, plot_widget_builder, controller)
|
||||
window.show()
|
||||
sys.exit(app_instance.exec_())
|
||||
|
||||
controller.signal_widgets.connect(window.show_plot_tabs)
|
||||
controller.signal_settings.connect(mediator.push_settings)
|
||||
|
||||
window.push_settings()
|
||||
|
||||
sys.exit(app.exec_())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
src/utils/__pycache__/base.cpython-310.pyc
Normal file
BIN
src/utils/__pycache__/base.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/utils/__pycache__/base_widgets.cpython-310.pyc
Normal file
BIN
src/utils/__pycache__/base_widgets.cpython-310.pyc
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
src/utils/base/__pycache__/base.cpython-310.pyc
Normal file
BIN
src/utils/base/__pycache__/base.cpython-310.pyc
Normal file
Binary file not shown.
BIN
src/utils/base/__pycache__/base_widgets.cpython-310.pyc
Normal file
BIN
src/utils/base/__pycache__/base_widgets.cpython-310.pyc
Normal file
Binary file not shown.
338
src/utils/base/base.py
Normal file
338
src/utils/base/base.py
Normal file
@ -0,0 +1,338 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Optional, Union
|
||||
|
||||
import pandas as pd
|
||||
from PyQt5.QtCore import QThread, QObject, QTimer
|
||||
from PyQt5.QtWidgets import QWidget, QTabWidget
|
||||
from src.OptAlgorithm import OptAlgorithm
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class BaseMediator:
|
||||
def __init__(self,
|
||||
monitor: BaseDirectoryMonitor,
|
||||
converter: BaseDataConverter,
|
||||
plot: BasePlotWidget,
|
||||
controller: BaseController):
|
||||
self._monitor = monitor
|
||||
self._monitor.mediator = self
|
||||
self._converter = converter
|
||||
self._converter.mediator = self
|
||||
self._plot = plot
|
||||
self._plot.mediator = self
|
||||
self._controller = controller
|
||||
self._controller.mediator = self
|
||||
|
||||
def notify(self,
|
||||
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BaseMainWindow],
|
||||
data: Union[list[str], list[pd.DataFrame], list[QWidget], list[dict]]):
|
||||
...
|
||||
def push_settings (self, data: list[dict]):
|
||||
...
|
||||
|
||||
class BaseDirectoryMonitor:
|
||||
|
||||
update_timer = QTimer()
|
||||
|
||||
def __init__(self,
|
||||
mediator: Optional[BaseMediator] = None):
|
||||
super().__init__()
|
||||
self._directory_path = None
|
||||
self._update_time = None
|
||||
|
||||
self._mediator = mediator
|
||||
|
||||
|
||||
@property
|
||||
def directory_path(self) -> str:
|
||||
return self._directory_path
|
||||
|
||||
@property
|
||||
def update_time(self) -> int:
|
||||
return self._update_time
|
||||
|
||||
@property
|
||||
def files(self) -> list[str]:
|
||||
return self._files
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
|
||||
@mediator.setter
|
||||
def mediator(self, mediator: BaseMediator) -> None:
|
||||
self._mediator = mediator
|
||||
|
||||
def _init_state(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
|
||||
def start(self):
|
||||
self.update_timer.start(self._update_time)
|
||||
|
||||
def stop(self):
|
||||
self.update_timer.stop()
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
...
|
||||
|
||||
def force_all_dir(self):
|
||||
...
|
||||
|
||||
class BaseDataConverter:
|
||||
def __init__(self, mediator: Optional[BaseMediator] = None):
|
||||
self._mediator = mediator
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
|
||||
@mediator.setter
|
||||
def mediator(self, mediator: BaseMediator) -> None:
|
||||
self._mediator = mediator
|
||||
|
||||
def convert_data(self, files: list[str]) -> None:
|
||||
...
|
||||
|
||||
|
||||
class BasePlotWidget:
|
||||
def __init__(self,
|
||||
mediator: Optional[BaseMediator] = None):
|
||||
super().__init__()
|
||||
self._mediator = mediator
|
||||
|
||||
self._stages = [
|
||||
"Closing",
|
||||
"Squeeze",
|
||||
"Welding",
|
||||
"Relief"
|
||||
]
|
||||
|
||||
self._stage_colors = {
|
||||
"Closing": [208, 28, 31, 100],
|
||||
"Squeeze": [45, 51, 89, 150],
|
||||
"Welding": [247, 183, 24, 100],
|
||||
"Relief": [0, 134, 88, 100],
|
||||
"Oncoming": [222, 184, 135, 100]
|
||||
}
|
||||
self._plt_channels = {
|
||||
"Electrode Force, N & Welding Current, kA": {
|
||||
"Settings": {
|
||||
"zoom": False,
|
||||
"stages": True,
|
||||
"performance": True
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
"name": "Electrode Force, N ME",
|
||||
"pen": 'r',
|
||||
},
|
||||
{
|
||||
"name": "Electrode Force, N FE",
|
||||
"pen": 'w',
|
||||
},
|
||||
{
|
||||
"name": "Welding Current ME",
|
||||
"pen": "y",
|
||||
}
|
||||
],
|
||||
"Ideal_signals": [
|
||||
{
|
||||
"name": "Force",
|
||||
"pen": {'color': 'g', 'width':3},
|
||||
}
|
||||
]
|
||||
},
|
||||
"Electrode Force, N": {
|
||||
"Settings": {
|
||||
"zoom": True,
|
||||
"stages": False,
|
||||
"performance": False
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
"name": "Electrode Force, N ME",
|
||||
"pen": 'r',
|
||||
},
|
||||
{
|
||||
"name": "Electrode Force, N FE",
|
||||
"pen": 'w',
|
||||
}
|
||||
],
|
||||
"Ideal_signals": [
|
||||
{
|
||||
"name": "Force",
|
||||
"pen": {'color': 'r', 'width':3},
|
||||
}
|
||||
]
|
||||
},
|
||||
"Electrode Speed, mm/s": {
|
||||
"Settings": {
|
||||
"zoom": False,
|
||||
"stages": True,
|
||||
"performance": False
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
"name": "Rotor Speed, mm/s ME",
|
||||
"pen": 'r',
|
||||
"zoom": False
|
||||
},
|
||||
{
|
||||
"name": "Rotor Speed, mm/s FE",
|
||||
"pen": 'w',
|
||||
"zoom": False
|
||||
}
|
||||
],
|
||||
"Ideal_signals": [
|
||||
{
|
||||
"name": "Rotor Speed ME",
|
||||
"pen": {'color': 'y', 'width':3},
|
||||
"zoom": False
|
||||
},
|
||||
{
|
||||
"name": "Rotor Speed FE",
|
||||
"pen": {'color': 'g', 'width':3},
|
||||
"zoom": False
|
||||
}
|
||||
]
|
||||
},
|
||||
}
|
||||
def _initIdealBuilder(self,
|
||||
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
|
||||
data: list[dict] = None):
|
||||
self.opt = idealDataBuilder(data)
|
||||
|
||||
self._stage_ideals = {
|
||||
"Closing": self._opt.get_closingDF(),
|
||||
"Squeeze": self._opt.get_compressionDF(),
|
||||
"Welding": self._opt.get_weldingDF(),
|
||||
"Relief": self._opt.get_openingDF(),
|
||||
"Oncoming": self._opt.get_oncomingDF()
|
||||
}
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
|
||||
@mediator.setter
|
||||
def mediator(self, mediator: BaseMediator) -> None:
|
||||
self._mediator = mediator
|
||||
|
||||
@property
|
||||
def opt(self) -> BaseIdealDataBuilder:
|
||||
return self._opt
|
||||
|
||||
@opt.setter
|
||||
def opt(self, opt: BaseIdealDataBuilder):
|
||||
self._opt = opt
|
||||
|
||||
def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
|
||||
...
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
...
|
||||
|
||||
class BaseController(QObject):
|
||||
|
||||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||||
...
|
||||
|
||||
def push_settings(self, settings: list[dict]) -> None:
|
||||
...
|
||||
|
||||
|
||||
# FIXME: WeldingDF показывает только 1 секунду
|
||||
class BaseIdealDataBuilder(OptAlgorithm):
|
||||
def __init__(self, data: list[dict]):
|
||||
operator_params, system_params = data
|
||||
self.mul = system_params['time_capture']
|
||||
self.welding_time = operator_params['time_wielding']
|
||||
super().__init__(operator_params, system_params)
|
||||
|
||||
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
|
||||
data = []
|
||||
for i in range (0, int(end_timestamp*self.mul)):
|
||||
time = i/self.mul
|
||||
X1, X2, V1, V2, F = func(time)
|
||||
data.append({"time":time, "Posicion FE":X1*1000,"Posicion ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
...
|
||||
|
||||
def get_compressionDF(self) -> pd.DataFrame:
|
||||
...
|
||||
|
||||
def get_openingDF(self) -> pd.DataFrame:
|
||||
...
|
||||
|
||||
def get_tmovementDF(self) -> pd.DataFrame:
|
||||
...
|
||||
|
||||
def get_weldingDF(self) -> pd.DataFrame:
|
||||
...
|
||||
|
||||
def get_oncomingDF(self) -> pd.DataFrame:
|
||||
...
|
||||
|
||||
def get_ideal_timings(self) -> list[float, float, float, float]:
|
||||
...
|
||||
|
||||
def get_cycle_time(self) -> float:
|
||||
result = sum(self.get_ideal_timings())
|
||||
return result
|
||||
|
||||
class BaseMainWindow(QWidget):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None):
|
||||
super().__init__()
|
||||
self._controller = controller
|
||||
...
|
||||
|
||||
@property
|
||||
def controller(self) -> BaseController:
|
||||
return self._controller
|
||||
|
||||
@controller.setter
|
||||
def controller(self, controller: BaseController) -> None:
|
||||
self._controller = controller
|
||||
|
||||
def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
|
||||
object.setStyleSheet("""
|
||||
QWidget {
|
||||
background-color: #0D1117;
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
font-size: 14px;
|
||||
}
|
||||
QMessageBox {
|
||||
background-color: #161B22;
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
font-size: 14px;
|
||||
}
|
||||
QPushButton {
|
||||
background-color: #FFCC00;
|
||||
color: #0D1117;
|
||||
padding: 12px 25px;
|
||||
border: 2px solid #E6B800;
|
||||
border-radius: 8px;
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
}
|
||||
QPushButton:hover:!disabled {
|
||||
background-color: #FFD700;
|
||||
}
|
||||
QPushButton:disabled {
|
||||
background-color: #555555;
|
||||
color: #cccccc;
|
||||
border: none;
|
||||
}
|
||||
QLabel {
|
||||
color: #ffffff;
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
}
|
||||
""")
|
||||
Loading…
Reference in New Issue
Block a user