chore: из класса plotterWidget вынесены не относящиеся к нему методы в BasePointPassportFormer
This commit is contained in:
parent
cee262939b
commit
a0d6cba386
Binary file not shown.
@ -5,8 +5,7 @@ import numpy as np
|
||||
from numpy import floating
|
||||
from typing import Optional, Any, NamedTuple
|
||||
|
||||
from src.utils.base.base import BasePlotWidget
|
||||
from src.utils.base.base import BaseIdealDataBuilder
|
||||
from src.utils.base.base import BasePlotWidget, BasePointPassportFormer, BaseIdealDataBuilder
|
||||
|
||||
|
||||
class idealDataBuilder(BaseIdealDataBuilder):
|
||||
@ -41,7 +40,7 @@ class ProcessStage(NamedTuple):
|
||||
finish_index: int
|
||||
|
||||
|
||||
class PlotWidget(BasePlotWidget):
|
||||
class PlotWidget(BasePlotWidget, BasePointPassportFormer):
|
||||
def _create_curve_ideal(self,
|
||||
stage: str,
|
||||
signal: str,
|
||||
@ -64,19 +63,6 @@ class PlotWidget(BasePlotWidget):
|
||||
region.setBrush(pg.mkBrush(self._stage_colors[stage]))
|
||||
return region
|
||||
return None
|
||||
|
||||
def _get_timestamp(self,
|
||||
stage: str,
|
||||
times: pd.Series,
|
||||
dataframe: pd.DataFrame) -> Optional[list[float]]:
|
||||
stage_diff = np.diff(dataframe[stage])
|
||||
start_index = np.where(stage_diff == 1)[0]
|
||||
finish_index = np.where(stage_diff == -1)[0]
|
||||
if start_index.size:
|
||||
start_timestamp = times[start_index[0]]
|
||||
finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1]
|
||||
return start_timestamp, finish_timestamp
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
|
||||
@ -116,38 +102,32 @@ class PlotWidget(BasePlotWidget):
|
||||
plot_widget, legend = self._init_plot_widget(title=channel)
|
||||
|
||||
settings = description["Settings"]
|
||||
if settings["stages"] and all([stage in dataframe_headers for stage in self._stages]):
|
||||
for stage in self._stages:
|
||||
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
|
||||
region = self._create_stage_region(stage, start_timestamp, finish_timestamp)
|
||||
if region:
|
||||
plot_widget.addItem(region)
|
||||
|
||||
for signal in description["Ideal_signals"]:
|
||||
ideal_plot = self._create_curve_ideal(stage, signal, start_timestamp, finish_timestamp)
|
||||
if ideal_plot:
|
||||
plot_widget.addItem(ideal_plot)
|
||||
if (settings["stages"] or settings["performance"]) and all([stage in dataframe_headers for stage in self._stages]):
|
||||
events = self._filter_events(time_axis, dataframe)
|
||||
point_quantity = len(events[self._clear_stage][0])
|
||||
if settings["stages"]:
|
||||
for stage in self._stages:
|
||||
start_t, end_t = events[stage]
|
||||
for i in range(len(start_t)):
|
||||
region = self._create_stage_region(stage, start_t[i], end_t[i])
|
||||
if region:
|
||||
plot_widget.addItem(region)
|
||||
for signal in description["Ideal_signals"]:
|
||||
ideal_plot = self._create_curve_ideal(stage, signal, start_t[i], end_t[i])
|
||||
if ideal_plot:
|
||||
plot_widget.addItem(ideal_plot)
|
||||
|
||||
end_timestamp = time_axis[len(time_axis) - 1]
|
||||
region = self._create_stage_region("Oncoming", finish_timestamp, end_timestamp)
|
||||
if region:
|
||||
plot_widget.addItem(region)
|
||||
|
||||
for signal in description["Ideal_signals"]:
|
||||
ideal_plot = self._create_curve_ideal("Oncoming", signal, finish_timestamp, end_timestamp)
|
||||
if ideal_plot:
|
||||
plot_widget.addItem(ideal_plot)
|
||||
|
||||
if settings["performance"] and all([stage in dataframe_headers for stage in self._stages]):
|
||||
delta_timestamp = 0
|
||||
for stage in self._stages:
|
||||
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
|
||||
delta_timestamp += finish_timestamp - start_timestamp
|
||||
|
||||
ideal_delta = self._opt.get_cycle_time()
|
||||
performance = round(ideal_delta/delta_timestamp*100, 2)
|
||||
performance_label = QLabel(f"Performance = {performance} %")
|
||||
layout.addWidget(performance_label)
|
||||
if settings["performance"]:
|
||||
ideal_delta = self._opt.get_cycle_time()
|
||||
delta = np.zeros(point_quantity)
|
||||
for stage in self._stages:
|
||||
try:
|
||||
start_stage, stop_stage = events[stage]
|
||||
delta += np.array(stop_stage)-np.array(start_stage)
|
||||
except: print("Signal ", stage, " is abnormal..." )
|
||||
performance_list = ideal_delta/delta*100
|
||||
performance_label = QLabel(f"Performance: best = {performance_list.max()} %, worse = {performance_list.min()} %, average = {performance_list.mean()}")
|
||||
layout.addWidget(performance_label)
|
||||
|
||||
if settings["zoom"]:
|
||||
if max(time_axis) < 5.0:
|
||||
@ -203,8 +183,8 @@ class PlotWidget(BasePlotWidget):
|
||||
widgets = [self._build_widget(data_sample) for data_sample in data]
|
||||
self._mediator.notify(self, widgets)
|
||||
|
||||
def update_settings(self, data: list[dict]):
|
||||
self._initIdealBuilder(idealDataBuilder=idealDataBuilder, data=data)
|
||||
def update_settings(self, params: list[dict]):
|
||||
self._initIdealBuilder(idealDataBuilder=idealDataBuilder, params=params)
|
||||
|
||||
|
||||
|
||||
|
||||
Binary file not shown.
@ -8,6 +8,10 @@ from PyQt5.QtCore import QThread, QObject, QTimer
|
||||
from PyQt5.QtWidgets import QWidget, QTabWidget
|
||||
from src.OptAlgorithm import OptAlgorithm
|
||||
import pandas as pd
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
|
||||
|
||||
|
||||
class BaseMediator:
|
||||
@ -102,20 +106,14 @@ class BasePlotWidget:
|
||||
mediator: Optional[BaseMediator] = None):
|
||||
super().__init__()
|
||||
self._mediator = mediator
|
||||
|
||||
self._stages = [
|
||||
"Closing",
|
||||
"Squeeze",
|
||||
"Welding",
|
||||
"Relief"
|
||||
]
|
||||
|
||||
|
||||
self._stage_colors = {
|
||||
"Closing": [208, 28, 31, 100],
|
||||
"Squeeze": [45, 51, 89, 150],
|
||||
"Welding": [247, 183, 24, 100],
|
||||
"Relief": [0, 134, 88, 100],
|
||||
"Oncoming": [222, 184, 135, 100]
|
||||
"Oncomming": [222, 184, 135, 100]
|
||||
}
|
||||
self._plt_channels = {
|
||||
"Electrode Force, N & Welding Current, kA": {
|
||||
@ -200,18 +198,7 @@ class BasePlotWidget:
|
||||
]
|
||||
},
|
||||
}
|
||||
def _initIdealBuilder(self,
|
||||
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
|
||||
data: list[dict] = None):
|
||||
self.opt = idealDataBuilder(data)
|
||||
|
||||
self._stage_ideals = {
|
||||
"Closing": self._opt.get_closingDF(),
|
||||
"Squeeze": self._opt.get_compressionDF(),
|
||||
"Welding": self._opt.get_weldingDF(),
|
||||
"Relief": self._opt.get_openingDF(),
|
||||
"Oncoming": self._opt.get_oncomingDF()
|
||||
}
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
@ -243,10 +230,9 @@ class BaseController(QObject):
|
||||
...
|
||||
|
||||
|
||||
# FIXME: WeldingDF показывает только 1 секунду
|
||||
class BaseIdealDataBuilder(OptAlgorithm):
|
||||
def __init__(self, data: list[dict]):
|
||||
operator_params, system_params = data
|
||||
def __init__(self, params: list[dict]):
|
||||
operator_params, system_params = params
|
||||
self.mul = system_params['time_capture']
|
||||
self.welding_time = operator_params['time_wielding']
|
||||
super().__init__(operator_params, system_params)
|
||||
@ -337,4 +323,70 @@ class BaseMainWindow(QWidget):
|
||||
}
|
||||
""")
|
||||
|
||||
|
||||
class BasePointPassportFormer:
|
||||
|
||||
def __init__(self):
|
||||
self._clear_stage = "Welding"
|
||||
self._stages = [
|
||||
"Closing",
|
||||
"Squeeze",
|
||||
"Welding",
|
||||
"Relief",
|
||||
]
|
||||
|
||||
def _find_indexes(self,
|
||||
signal: str,
|
||||
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
|
||||
stage_diff = np.diff(dataframe[signal])
|
||||
start_idx = np.where(stage_diff == 1)
|
||||
finish_idx = np.where(stage_diff == -1)
|
||||
return start_idx[0], finish_idx[0]
|
||||
|
||||
def _find_events(self,
|
||||
signal: str,
|
||||
times:pd.Series,
|
||||
dataframe: pd.DataFrame) -> list[list[float]]:
|
||||
start_idx, finish_idx = self._find_indexes(signal, dataframe)
|
||||
start_list = times.loc[start_idx].tolist()
|
||||
end_list = times.loc[finish_idx].tolist()
|
||||
if len(start_list) - len(end_list) == 1:
|
||||
end_list.append(float(times[len(times)-1]))
|
||||
return start_list, end_list
|
||||
|
||||
def _filter_events(self,
|
||||
times: pd.Series,
|
||||
dataframe: pd.DataFrame) -> dict[list[float]]:
|
||||
events = {}
|
||||
if self._clear_stage in self._stages:
|
||||
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
|
||||
point_quantity = len(start_list)
|
||||
for stage in self._stages:
|
||||
start_list, end_list = self._find_events(stage, times, dataframe)
|
||||
events[stage] = [start_list[:point_quantity], end_list[:point_quantity]]
|
||||
return events
|
||||
|
||||
def _initIdealBuilder(self,
|
||||
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
|
||||
params: list[dict] = None):
|
||||
self.opt = idealDataBuilder(params)
|
||||
|
||||
self._stage_ideals = {
|
||||
"Closing": self._opt.get_closingDF(),
|
||||
"Squeeze": self._opt.get_compressionDF(),
|
||||
"Welding": self._opt.get_weldingDF(),
|
||||
"Relief": self._opt.get_openingDF(),
|
||||
"Oncomming": self._opt.get_oncomingDF()
|
||||
}
|
||||
|
||||
def _create_curve_ideal(self,
|
||||
stage: str):
|
||||
data = self._stage_ideals[stage]
|
||||
...
|
||||
|
||||
@property
|
||||
def opt(self) -> BaseIdealDataBuilder:
|
||||
return self._opt
|
||||
|
||||
@opt.setter
|
||||
def opt(self, opt: BaseIdealDataBuilder):
|
||||
self._opt = opt
|
||||
Loading…
Reference in New Issue
Block a user