Compare commits

...

30 Commits

Author SHA1 Message Date
Andrew
02268f1f7b style: изменил расположение слоев. поменял цвета 2024-12-17 15:27:03 +03:00
Andrew
45b28f2e15 fix: починил компенсацию gun flection 2024-12-16 16:49:01 +03:00
Andrew
d8f7b865b9 feature: добавлена кнопка для открытия любой папки для единовременной загрузки всех находящихся там файлов 2024-12-16 15:49:01 +03:00
Andrew
2294869130 dev: добавлена возможность учета компенсации прогибов в отображении позиции FE 2024-12-16 15:09:45 +03:00
Andrew
511325071b dev: добавил возможность отрисовывать заготовку как прямоугольник 2024-12-16 14:24:26 +03:00
Andrew
f8c07e18fc dev: добавил возможность отражения графика по оси абсцисс 2024-12-16 13:59:42 +03:00
16416d6011 fix: Исправлен некорректный подсчет точки смыкания 2024-12-16 14:05:02 +03:00
Andrew
647f137d33 fix: заменил библиотеку intervaltree на pandas 2024-12-16 10:26:13 +03:00
Andrew
2d98c1c5b7 style: увеличен шрифт производительности, изменены цвета областей графиков 2024-12-09 19:54:08 +03:00
Andrew
dcd04e21b6 fix: исправлен выбор смещения индекса при формировании временных границ точки 2024-12-09 17:48:33 +03:00
Andrew
4262450df1 fix: добавлена обработка исключения (трейс начинается не с oncoming) 2024-12-09 17:24:30 +03:00
Andrew
8968e3d57a chore: выполнил автоматизацию формирования паспорта точки и создания виджетов для плоттера 2024-12-09 16:05:34 +03:00
b9fe95a4aa style: Цвет "Oncoming" и исправление опечаток 2024-12-06 12:29:27 +03:00
9a8f8abbe4 docs: Отображение результативности 2024-12-06 11:12:08 +03:00
3954fe8ef9 fix: Загрузка времени обновления из настроек в мс 2024-12-06 11:11:47 +03:00
c6276f01c4 Merge remote-tracking branch 'origin/feature-performance-analysis' into feature-performance-analysis 2024-12-06 11:11:06 +03:00
3ce53cd79c fix: Параметры оператора под деталь Tesla 2024-12-06 11:11:00 +03:00
fd0a514282 fix: Исправлено ограничение по скорости в переходе Closing|Grow 2024-12-05 19:06:48 +03:00
68fc88a5d3 fix: Исправил обработку начала со статусом "Oncoming" 2024-12-05 15:32:09 +03:00
Andrew
45de3c76fc fix: Исправил загрузку настроек 2024-12-05 14:12:14 +03:00
Andrew
4a32be89be fix: Исправил "tesla summary time" 2024-12-05 14:10:59 +03:00
6a244afadd fix: Исправил обработку начала со статусом "Oncoming" 2024-12-05 13:27:54 +03:00
88003f4f02 fix: Исправил пакет src 2024-12-05 13:18:53 +03:00
Andrew
9f8e3d4710 dev: производительность теперь считается по циклу всей детали. изменены цвета 2024-12-05 12:02:07 +03:00
Andrew
d4f7d61607 dev: добавлены параметры теслы в operator_params 2024-12-05 11:19:55 +03:00
Andrew
507036e81c fix: добавлен сигнал "oncoming" в отображение и расчет производительности 2024-12-05 07:31:48 +03:00
Andrew
0a9b1c50b8 dev: добавлена возможность задавать параметры оператора для каждой точки 2024-12-04 20:01:30 +03:00
Andrew
a0d6cba386 chore: из класса plotterWidget вынесены не относящиеся к нему методы в BasePointPassportFormer 2024-12-03 17:21:22 +03:00
Andrew
cee262939b chore: логика поиска интервалов перенесена на более эффективную библиотеку intervaltree 2024-12-03 12:03:52 +03:00
Andrew
10e7cf2a37 dev: добавлен парсер r64 файлов 2024-12-02 18:41:02 +03:00
44 changed files with 12301 additions and 28767 deletions

4
.gitignore vendored
View File

@ -1,7 +1,9 @@
/__pycache__ /__pycache__
/tck_venv /tck_venv
/.venv /.venv
/.vscode
/.idea /.idea
/venv /venv
*.txt *.txt
*.svg *.svg
/test.py

View File

@ -1,16 +1,306 @@
{ {
"dist_open_start_1": 0.005, "distance_h_start_1": [
"dist_open_start_2": 0.005, 0.003,
"dist_open_after_1": 0.006, 0.003,
"dist_open_after_2": 0.006, 0.003,
"dist_open_end_1": 0.01, 0.003,
"dist_open_end_2": 0.05, 0.003,
"dist_close_end_1": 0.005, 0.003,
"dist_close_end_2": 0.005, 0.003,
"time_wielding": 1, 0.003,
"time_command": 0.0, 0.003,
"time_robot_movement": 0.2, 0.003,
"object_thickness": 0.0045, 0.003,
"force_target": 5000, 0.003,
"force_capture": 500 0.003,
0.003
],
"distance_h_start_2": [
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005
],
"distance_s_1": [
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001
],
"distance_s_2": [
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001,
0.001
],
"distance_l_1": [
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02,
0.02
],
"distance_l_2": [
0.0275,
0.03,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033
],
"distance_h_end1": [
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003,
0.003
],
"distance_h_end2": [
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005,
0.005
],
"time_wielding": [
1.332,
1.644,
1.644,
1.428,
1.284,
1.308,
1.272,
1.38,
1.416,
1.392,
1.38,
1.404,
1.452,
1.452
],
"time_command": [
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0
],
"time_robot_movement": [
0.314,
0.331,
0.356,
0.428,
0.418,
0.454,
0.458,
0.44,
0.49,
0.47,
0.44,
0.425,
0.464,
0.5
],
"object_thickness": [
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045,
0.0045
],
"force_target": [
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0,
5000.0
],
"force_capture": [
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0,
500.0
],
"Tesla closing": [
0.216,
0.228,
0.252,
0.216,
0.228,
0.216,
0.228,
0.228,
0.228,
0.216,
0.228,
0.216,
0.216,
0.216
],
"Tesla squeeze": [
0.276,
0.288,
0.264,
0.264,
0.276,
0.276,
0.312,
0.276,
0.24,
0.24,
0.24,
0.24,
0.24,
0.24
],
"Tesla welding": [
1.332,
1.644,
1.644,
1.428,
1.284,
1.308,
1.272,
1.38,
1.416,
1.392,
1.38,
1.404,
1.452,
1.452
],
"Tesla oncomming_relief": [
0.528,
0.528,
0.54,
0.636,
0.504,
0.468,
0.492,
0.54,
0.563,
0.588,
0.541,
0.564,
0.576,
0.507
],
"Tesla summary time": [
2.34,
2.652,
2.796,
2.4,
2.208,
2.34,
2.256,
2.544,
2.405,
2.405,
2.358,
2.37,
2.442,
1.908
]
} }

View File

@ -1,22 +1,62 @@
{ {
"trace_storage_path": "D:/downloads/a22", "trace_storage_path": [
"monitor_update_period": 100, "D:/downloads/a22"
"a_max_1": 5.41, ],
"v_max_1": 0.278, "monitor_update_period": [
"a_max_2": 35.81, 1000.0
"v_max_2": 0.7, ],
"mass_1": 270, "a_max_1": [
"mass_2": 1, 7.96
"k_hardness_1": 2148570, ],
"k_hardness_2": 0, "v_max_1": [
"torque_max_1": 20, 0.499
"torque_max_2": 0, ],
"transmission_ratio_1": 0.00125, "a_max_2": [
"transmission_ratio_2": 1, 35.81
"position_start_1": 0.08, ],
"position_start_2": 0.08, "v_max_2": [
"k_prop": 0.05, 0.75
"time_capture": 100000, ],
"UML_time_scaler": 1000 "mass_1": [
270.0
],
"mass_2": [
1.0
],
"k_hardness_1": [
2148570.0
],
"k_hardness_2": [
0.0
],
"torque_max_1": [
13.8
],
"torque_max_2": [
0.0
],
"transmission_ratio_1": [
0.00125
],
"transmission_ratio_2": [
1.0
],
"contact_distance_1": [
0.02
],
"contact_distance_2": [
0.081
],
"k_prop": [
0.075
],
"time_capture": [
1000.0
],
"UML_time_scaler": [
1000.0
],
"Range ME, mm": [
115.0
]
} }

BIN
profile_results.prof Normal file

Binary file not shown.

View File

@ -1,4 +1,4 @@
from src.OptAlgorithm.AutoConfigClass import AutoConfigClass from OptAlgorithm.AutoConfigClass import AutoConfigClass
from numpy import sqrt from numpy import sqrt
class ConstantCalculator(AutoConfigClass): class ConstantCalculator(AutoConfigClass):
@ -14,8 +14,6 @@ class ConstantCalculator(AutoConfigClass):
def calc(self): def calc(self):
constants = {} constants = {}
#self.smin1t = self.smin1 - self.dblock / 2
#self.smin2t = self.smin2 - self.dblock / 2
constants["Fprop"] = self.k_prop * self.force_target constants["Fprop"] = self.k_prop * self.force_target
constants["freq"] = sqrt(self.k_hardness_1 / self.mass_1) constants["freq"] = sqrt(self.k_hardness_1 / self.mass_1)
constants["eff_control"] = self.torque_max_1 / self.transmission_ratio_1 constants["eff_control"] = self.torque_max_1 / self.transmission_ratio_1

View File

@ -1,7 +1,7 @@
from src.OptAlgorithm.PhaseCalc import PhaseCalc from OptAlgorithm.PhaseCalc import PhaseCalc
from src.OptAlgorithm.OptTimeCalculator import OptTimeCalculator from OptAlgorithm.OptTimeCalculator import OptTimeCalculator
from src.OptAlgorithm.AutoConfigClass import AutoConfigClass from OptAlgorithm.AutoConfigClass import AutoConfigClass
from src.OptAlgorithm.ConstantCalculator import ConstantCalculator from OptAlgorithm.ConstantCalculator import ConstantCalculator
from numpy import cos, sin, sqrt, cbrt, arcsin from numpy import cos, sin, sqrt, cbrt, arcsin
@ -16,16 +16,18 @@ class OptAlgorithm(AutoConfigClass):
calc = OptTimeCalculator(operator_config, system_config) calc = OptTimeCalculator(operator_config, system_config)
self.Ts = calc.T(self.dist_open_start_1, self.Ts = calc.T(self.distance_h_start_1,
self.dist_open_start_2, self.distance_h_start_2,
self.dist_open_after_1, self.distance_s_1,
self.dist_open_after_2, self.distance_s_2,
self.dist_open_end_1, self.distance_l_1,
self.dist_open_end_2) self.distance_l_2)
self.x1_start = self.contact_distance_1 - self.distance_h_start_1
self.x1Contact = self.dist_open_start_1 + self.position_start_1 self.x2_start = self.contact_distance_2 - self.distance_h_start_2
self.x2Contact = self.dist_open_start_2 + self.position_start_2
self.x1_contact = self.contact_distance_1
self.x2_contact = self.contact_distance_2
self.pos0s, self.movementV0s = calc.Tmovement(self.getSpecific, self.getMarkOpen()) self.pos0s, self.movementV0s = calc.Tmovement(self.getSpecific, self.getMarkOpen())
@ -46,8 +48,7 @@ class OptAlgorithm(AutoConfigClass):
t2 = max(t - self.Ts["tclose_1_acc"], 0) t2 = max(t - self.Ts["tclose_1_acc"], 0)
x1 = self.a_max_1 * self.Ts["tclose_1_acc"] * t2 x1 = self.a_max_1 * self.Ts["tclose_1_acc"] * t2
return x0 + x1 + self.x1_start
return x0 + x1 + self.position_start_1
def V2Close(self, t: float): def V2Close(self, t: float):
if t < self.Ts["tclose_2_acc"]: if t < self.Ts["tclose_2_acc"]:
@ -68,7 +69,7 @@ class OptAlgorithm(AutoConfigClass):
t3 = max(min(t - self.Ts["tclose_2_speed"]- self.Ts["tclose_2_acc"], self.Ts["tclose_2_acc"]), 0) t3 = max(min(t - self.Ts["tclose_2_speed"]- self.Ts["tclose_2_acc"], self.Ts["tclose_2_acc"]), 0)
x2 = self.a_max_2 * self.Ts["tclose_2_acc"] * t3 - self.a_max_2 * t3 * t3 / 2 x2 = self.a_max_2 * self.Ts["tclose_2_acc"] * t3 - self.a_max_2 * t3 * t3 / 2
return x0 + x1 + x2 + self.position_start_2 return x0 + x1 + x2 + self.x2_start
def FClose(self, t: float): def FClose(self, t: float):
return 0 return 0
@ -95,7 +96,7 @@ class OptAlgorithm(AutoConfigClass):
def X1Grow(self, t: float): def X1Grow(self, t: float):
F = self.FGrow(t) F = self.FGrow(t)
x = F / self.k_hardness_1 x = F / self.k_hardness_1
return x + self.x1Contact return x + self.x1_contact
def V2Grow(self, t: float): def V2Grow(self, t: float):
""" """
@ -109,7 +110,7 @@ class OptAlgorithm(AutoConfigClass):
Считается, что верхний электрод не влияет на набор усилия, Считается, что верхний электрод не влияет на набор усилия,
функция не реализована!, возвращает 0. Устанавливайте kturn = 0 функция не реализована!, возвращает 0. Устанавливайте kturn = 0
""" """
return self.x2Contact return self.x2_contact
def FGrow(self, t: float): def FGrow(self, t: float):
v0 = self.a_max_1 * self.Ts["tclose_1_acc"] v0 = self.a_max_1 * self.Ts["tclose_1_acc"]
@ -161,7 +162,7 @@ class OptAlgorithm(AutoConfigClass):
t3 = max(min(t - self.Ts["topen_1_speed"]- self.Ts["topen_1_acc"], self.Ts["topen_1_acc"]), 0) t3 = max(min(t - self.Ts["topen_1_speed"]- self.Ts["topen_1_acc"], self.Ts["topen_1_acc"]), 0)
x2 = -self.a_max_1 * self.Ts["topen_1_acc"] * t3 + self.a_max_1 * t3 * t3 / 2 x2 = -self.a_max_1 * self.Ts["topen_1_acc"] * t3 + self.a_max_1 * t3 * t3 / 2
return xm + x0 + x1 + x2 + self.x1Contact return xm + x0 + x1 + x2 + self.x1_contact
def V2Open(self, t: float): def V2Open(self, t: float):
@ -187,18 +188,18 @@ class OptAlgorithm(AutoConfigClass):
t3 = max(min(t - self.Ts["topen_2_speed"]- self.Ts["topen_2_acc"], self.Ts["topen_2_acc"]), 0) t3 = max(min(t - self.Ts["topen_2_speed"]- self.Ts["topen_2_acc"], self.Ts["topen_2_acc"]), 0)
x2 = -self.a_max_2 * self.Ts["topen_2_acc"] * t3 + self.a_max_2 * t3 * t3 / 2 x2 = -self.a_max_2 * self.Ts["topen_2_acc"] * t3 + self.a_max_2 * t3 * t3 / 2
return x0 + x1 + x2 + self.x2Contact return x0 + x1 + x2 + self.x2_contact
def FOpen(self, t: float): def FOpen(self, t: float):
x1 = self.X1Open(t) x1 = self.X1Open(t)
x2 = self.X2Open(t) x2 = self.X2Open(t)
F = self.k_hardness_1 * max(0, (x1 - self.x1Contact)) F = self.k_hardness_1 * max(0, (x1 - self.x1_contact))
return F return F
def FMovement(self, t: float): def FMovement(self, t: float):
x1 = self.X1Movement(t) x1 = self.X1Movement(t)
x2 = self.X2Movement(t) x2 = self.X2Movement(t)
F = self.k_hardness_1 * max(0, (x1 - self.x1Contact)) F = self.k_hardness_1 * max(0, (x1 - self.x1_contact))
return F return F
def X1Movement(self, t: float): def X1Movement(self, t: float):

View File

@ -1,7 +1,7 @@
from numpy import sqrt, arcsin, arccos, cos, sin from numpy import sqrt, arcsin, arccos, cos, sin
from src.OptAlgorithm.AutoConfigClass import AutoConfigClass from OptAlgorithm.AutoConfigClass import AutoConfigClass
from src.OptAlgorithm.ConstantCalculator import ConstantCalculator from OptAlgorithm.ConstantCalculator import ConstantCalculator
class OptTimeCalculator(AutoConfigClass): class OptTimeCalculator(AutoConfigClass):
@ -19,7 +19,7 @@ class OptTimeCalculator(AutoConfigClass):
v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1) v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1)
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow) v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
t1 = v0 / self.a_max_1 t1 = v0 / self.a_max_1
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0q) t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0)
T1 = t1 + t2t T1 = t1 + t2t
t21 = sqrt(h2/self.a_max_2) t21 = sqrt(h2/self.a_max_2)
@ -67,10 +67,10 @@ class OptTimeCalculator(AutoConfigClass):
if s1 >= l1: if s1 >= l1:
raise Exception("""S1 >= L1 - недопустимый сценарий, raise Exception("""S1 >= L1 - недопустимый сценарий,
проверьте dist_open_after_1, dist_close_end_1""") проверьте distance_s_1, distance_h_end1""")
if s2 >= l2: if s2 >= l2:
raise Exception("""S2 >= L2 - недопустимый сценарий, raise Exception("""S2 >= L2 - недопустимый сценарий,
проверьте dist_open_after_2, dist_close_end_2""") проверьте distance_s_2, distance_h_end2""")
s1 += Fs1 s1 += Fs1
topen_1_mark = sqrt(2 * s1 / self.a_max_1) topen_1_mark = sqrt(2 * s1 / self.a_max_1)
@ -134,23 +134,23 @@ class OptTimeCalculator(AutoConfigClass):
return self.allTimes return self.allTimes
def Tmovement(self, closeAlgo, tmark) -> None: def Tmovement(self, closeAlgo, tmark) -> None:
contact = [self.dist_open_start_1 + self.position_start_1, self.dist_open_start_2 + self.position_start_2] contact = [self.contact_distance_1, self.contact_distance_2]
v0s = [] v0s = []
pos0s = [] pos0s = []
for i in range(1,3): for i in range(1,3):
if tmark < 0: if tmark < 0:
raise Exception("""Отрицательное время этапа раскрытия, raise Exception("""Отрицательное время этапа раскрытия,
проверьте dist_open_after_{1,2}, time_command""") проверьте distance_s_{1,2}, time_command""")
v0 = closeAlgo("V"+str(i), "Open", tmark) v0 = closeAlgo("V"+str(i), "Open", tmark)
v0s.append(v0) v0s.append(v0)
x0 = closeAlgo("X"+str(i), "Open", tmark) x0 = closeAlgo("X"+str(i), "Open", tmark)
x1 = contact[i-1] - self.__dict__["dist_close_end_"+str(i)] x1 = contact[i-1] - self.__dict__["distance_h_end"+str(i)]
x = x1 - x0 x = x1 - x0
pos0s.append(closeAlgo("X"+str(i), "Open", tmark)) pos0s.append(closeAlgo("X"+str(i), "Open", tmark))
Tfull = self.time_robot_movement Tfull = self.time_robot_movement
L = self.__dict__["dist_open_end_"+str(i)] L = self.__dict__["distance_l_"+str(i)]
maxL = contact[i-1] - L - x0 maxL = contact[i-1] - L - x0
self.Tmovementi(i, x, Tfull, v0, maxL) self.Tmovementi(i, x, Tfull, v0, maxL)
return pos0s, v0s return pos0s, v0s
@ -164,7 +164,7 @@ class OptTimeCalculator(AutoConfigClass):
sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2) sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2)
if sqrtval < 0: if sqrtval < 0:
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время, raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
проверьте dist_open_after_{i}, dist_close_end_{i}, time_command, time_robot_movement""") проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""")
t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2) t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2)
t1 = min(t1max, (vmax- abs(v0))/a) t1 = min(t1max, (vmax- abs(v0))/a)
t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a))) t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a)))
@ -204,27 +204,29 @@ class OptTimeCalculator(AutoConfigClass):
self.allTimes["tmovement"] = T self.allTimes["tmovement"] = T
def calcFirstClose(self, T : float, s : float) -> tuple[float, float]: def calcFirstClose(self, T : float, s : float) -> tuple[float, float]:
v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1)
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1)) t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1))
t1 = min(t1, self.v_max_1 / self.a_max_1) t1 = min(t1, v0 / self.a_max_1)
t2 = sqrt(max(0, T**2 - 2 * s / self.a_max_1)) t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
return t1, t2 return t1, t2
def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]: def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2 t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2
t1 = min(t1, self.v_max_1 / self.a_max_1) t1 = min(t1, self.v_max_1 / self.a_max_1)
t2 = sqrt(max(0, T * T - 4 * s / self.a_max_1)) t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
return t1, t2 return t1, t2
def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]: def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2 t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
t1 = min(t1, self.v_max_2 / self.a_max_2) t1 = min(t1, self.v_max_2 / self.a_max_2)
t2 = sqrt(max(0, T * T - 4 * s / self.a_max_2)) t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
return t1, t2 return t1, t2
def calcSecondClose(self, T : float, s : float) -> tuple[float, float]: def calcSecondClose(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2 t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
t1 = min(t1, self.v_max_2 / self.a_max_2) t1 = min(t1, self.v_max_2 / self.a_max_2)
t2 = sqrt(max(0, T * T - 4 * s / self.a_max_2)) t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
return t1, t2 return t1, t2
def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float: def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float:

Binary file not shown.

View File

@ -1,16 +1,20 @@
from PyQt5.QtWidgets import QWidget from PyQt5.QtWidgets import QWidget
from PyQt5.QtCore import pyqtSignal from PyQt5.QtCore import pyqtSignal
from src.utils.base.base import BaseController from utils.base.base import BaseController
class Controller(BaseController): class Controller(BaseController):
signal_widgets = pyqtSignal(list) signal_widgets = pyqtSignal(list)
signal_settings = pyqtSignal(list) signal_settings = pyqtSignal(list)
signal_monitor = pyqtSignal(str)
def send_widgets(self, widgets: list[QWidget]) -> None: def send_widgets(self, widgets: list[QWidget]) -> None:
self.signal_widgets.emit(widgets) self.signal_widgets.emit(widgets)
def push_settings(self, settings: list[dict]) -> None: def push_settings(self, settings: list[dict]) -> None:
self.signal_settings.emit(settings) self.signal_settings.emit(settings)
def open_custom_file (self, filepath: str) -> None:
self.signal_monitor.emit(filepath)

View File

@ -3,7 +3,7 @@ import pandas as pd
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить. #FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
pd.set_option('future.no_silent_downcasting', True) pd.set_option('future.no_silent_downcasting', True)
from src.utils.base.base import BaseDataConverter from utils.base.base import BaseDataConverter
class DataConverter(BaseDataConverter): class DataConverter(BaseDataConverter):

View File

@ -3,18 +3,22 @@ import pandas as pd
from typing import Union from typing import Union
from PyQt5.QtWidgets import QWidget from PyQt5.QtWidgets import QWidget
from src.utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget from utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BasePointPassportFormer
class Mediator(BaseMediator): class Mediator(BaseMediator):
def notify(self, def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget], source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[QWidget]]): data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
if issubclass(source.__class__, BaseDirectoryMonitor): if issubclass(source.__class__, BaseDirectoryMonitor):
self._converter.convert_data(data) self._converter.convert_data(data)
if issubclass(source.__class__, BaseDataConverter): if issubclass(source.__class__, BaseDataConverter):
self._passportFormer.form_passports(data)
if issubclass(source.__class__, BasePointPassportFormer):
self._plot.build(data) self._plot.build(data)
if issubclass(source.__class__, BasePlotWidget): if issubclass(source.__class__, BasePlotWidget):
@ -22,7 +26,7 @@ class Mediator(BaseMediator):
def push_settings(self, settings: list[dict]): def push_settings(self, settings: list[dict]):
self._monitor.update_settings(settings) self._monitor.update_settings(settings)
self._plot.update_settings(settings) self._passportFormer.update_settings(settings)
self._monitor.force_all_dir() self._monitor.force_all_dir()

View File

@ -3,7 +3,7 @@ import os
from loguru import logger from loguru import logger
from src.utils.base.base import BaseDirectoryMonitor from utils.base.base import BaseDirectoryMonitor
class DirectoryMonitor(BaseDirectoryMonitor): class DirectoryMonitor(BaseDirectoryMonitor):
@ -29,8 +29,8 @@ class DirectoryMonitor(BaseDirectoryMonitor):
def update_settings(self, data: list[dict]) -> None: def update_settings(self, data: list[dict]) -> None:
self.stop() self.stop()
operator_params, system_params = data operator_params, system_params = data
self._directory_path = system_params['trace_storage_path'] self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'] self._update_time = system_params['monitor_update_period'][0]
self._init_state() self._init_state()
self.start() self.start()
@ -47,5 +47,9 @@ class DirectoryMonitor(BaseDirectoryMonitor):
else: else:
logger.info(f"Failed pushing {str(all_files)}") logger.info(f"Failed pushing {str(all_files)}")
def custom_dir_extract(self, dict_path:str ):
files = os.listdir(dict_path)
if files is not None:
all_files = [os.path.join(dict_path, file) for file in files]
self._mediator.notify(self, all_files)

View File

@ -0,0 +1,84 @@
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
import pandas as pd
class idealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
def get_compressionDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
def get_openingDF(self) -> pd.DataFrame:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
def get_oncomingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
def get_weldingDF(self) -> pd.DataFrame:
data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float]:
data = self.Ts
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
return ideal_timings
class PassportFormer(BasePointPassportFormer):
def form_passports(self, data: list[pd.DataFrame]) -> list[list[pd.DataFrame, dict, int]]:
return_data = [self._build_passports_pocket(dataframe) for dataframe in data]
self._mediator.notify(self, return_data)
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
system_settings = {key: value[0] for key, value in self._params[1].items()}
tesla_time = sum(self._params[0].get("Tesla summary time", []))
useful_data = {"tesla_time": tesla_time,
"range_ME": system_settings["Range ME, mm"],
"k_hardness": system_settings["k_hardness_1"]
}
points_pocket = []
time_is_valid = not dataframe["time"].isna().all()
if time_is_valid:
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
for i in range(point_quantity):
operator_settings = {
key: (value[i] if i < len(value) else value[0])
for key, value in self._params[0].items()
}
params_list = [operator_settings, system_settings]
cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe :
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
idx = i+1 if idx_shift else i
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
useful_p_data = {"thickness": operator_settings["object_thickness"],
"L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]}
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
return dataframe, points_pocket, useful_data
def update_settings(self, params: list[dict, dict]):
self._params = params

View File

@ -3,8 +3,8 @@ from typing import Optional
from PyQt5 import QtWidgets from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
from src.utils.base.base import BaseMainWindow, BaseController from utils.base.base import BaseMainWindow, BaseController
from src.gui.settings_window import settingsWindow from gui.settings_window import settingsWindow
class MainWindow(BaseMainWindow): class MainWindow(BaseMainWindow):
def __init__(self, def __init__(self,
@ -14,8 +14,9 @@ class MainWindow(BaseMainWindow):
self.initUI() self.initUI()
self.set_style(self) self.set_style(self)
self.settings_button.clicked.connect(self._show_settings) self.settings_button.clicked.connect(self._show_settings)
self.operSettings = settingsWindow("params\operator_params.json", 'Operator', self._updater_trigger) self.select_dir_button.clicked.connect(self._select_dir)
self.sysSettings = settingsWindow("params\system_params.json", 'System', self._updater_trigger) self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._updater_trigger)
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._updater_trigger)
def initUI(self) -> None: def initUI(self) -> None:
self.tabWidget = QtWidgets.QTabWidget() self.tabWidget = QtWidgets.QTabWidget()
@ -23,9 +24,17 @@ class MainWindow(BaseMainWindow):
self.tabWidget.tabCloseRequested.connect(self._close_tab) self.tabWidget.tabCloseRequested.connect(self._close_tab)
layout = QtWidgets.QVBoxLayout() layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.tabWidget) layout.addWidget(self.tabWidget)
self.settings_button = QtWidgets.QPushButton("Show settings") self.settings_button = QtWidgets.QPushButton("Show settings")
self.settings_button.setFixedWidth(160) self.settings_button.setFixedWidth(160)
layout.addWidget(self.settings_button) self.select_dir_button = QtWidgets.QPushButton("Open directory")
self.select_dir_button.setFixedWidth(175)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(self.settings_button)
button_layout.addWidget(self.select_dir_button)
layout.addLayout(button_layout)
self.setLayout(layout) self.setLayout(layout)
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None: def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
@ -45,7 +54,7 @@ class MainWindow(BaseMainWindow):
def keyPressEvent(self, a0): def keyPressEvent(self, a0):
if a0.key() == Qt.Key_F5: if a0.key() == Qt.Key_F5:
self.clear() pass
def _show_settings(self): def _show_settings(self):
self.operSettings.show() self.operSettings.show()
@ -63,6 +72,10 @@ class MainWindow(BaseMainWindow):
def _close_tab(self, index:int) -> None: def _close_tab(self, index:int) -> None:
self.tabWidget.removeTab(index) self.tabWidget.removeTab(index)
def _select_dir(self):
folder_path = QtWidgets.QFileDialog.getExistingDirectory(self, 'Select directory', "")
if folder_path:
self._controller.open_custom_file(folder_path)

View File

@ -1,210 +1,351 @@
import pandas as pd import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
import copy
import pyqtgraph as pg import pyqtgraph as pg
import numpy as np from typing import Optional, Any
from numpy import floating
from typing import Optional, Any, NamedTuple
from src.utils.base.base import BasePlotWidget from utils.base.base import BasePlotWidget
from src.utils.base.base import BaseIdealDataBuilder
class idealDataBuilder(BaseIdealDataBuilder):
def get_closingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tclose'], self.calcPhaseClose)
def get_compressionDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tgrow'], self.calcPhaseGrow)
def get_openingDF(self) -> pd.DataFrame:
return self._get_data(self.getMarkOpen(), self.calcPhaseOpen)
def get_oncomingDF(self) -> pd.DataFrame:
return self._get_data(self.Ts['tmovement'], self.calcPhaseMovement)
def get_weldingDF(self) -> pd.DataFrame:
data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
data.append({"time":0, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
data.append({"time":self.welding_time, "Posicion FE":X1,"Posicion ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float, float, float, float]:
data = self.Ts
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen()] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
return ideal_timings
class ProcessStage(NamedTuple):
mean_value: floating[Any]
start_index: int
finish_index: int
class ProcessStage():
mean_value:int
start_index:int
finish_index:int
class PlotWidget(BasePlotWidget): class PlotWidget(BasePlotWidget):
def _create_curve_ideal(self,
stage: str,
signal: str,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
data = self._stage_ideals[stage]
if start_timestamp and finish_timestamp: def _create_navigator(self,
plot = pg.PlotDataItem(x=start_timestamp+data["time"], y=data[signal["name"]], pen=signal["pen"]) time_region:tuple[float, float],
return plot main_plot: pg.PlotWidget,
return None dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self,
signal: dict[str, Any],
ideal_data: pd.DataFrame,
start_timestamp: float,
finish_timestamp: float) -> Optional[pg.PlotDataItem]:
"""
Создаёт идеальную кривую для сигнала, если заданы корректные временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
return pg.PlotDataItem(
x=start_timestamp + ideal_data["time"],
y=ideal_data[signal["name"]],
pen=signal["pen"]
)
return None
def _create_stage_region(self, def _create_stage_region(self,
stage: str, stage: str,
start_timestamp: float, start_timestamp: float,
finish_timestamp: float) -> Optional[pg.LinearRegionItem]: finish_timestamp: float,
transparency: int) -> Optional[pg.LinearRegionItem]:
if start_timestamp and finish_timestamp: """
Создает регион для определённого этапа, если заданы временные рамки.
"""
if start_timestamp is not None and finish_timestamp is not None:
region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False) region = pg.LinearRegionItem([start_timestamp, finish_timestamp], movable=False)
region.setBrush(pg.mkBrush(self._stage_colors[stage])) color = self._stage_colors.get(stage, [100, 100, 100, 100])
region.setBrush(pg.mkBrush(color[:3] + [transparency]))
return region return region
return None return None
def _get_timestamp(self,
stage: str,
times: pd.Series,
dataframe: pd.DataFrame) -> Optional[list[float]]:
stage_diff = np.diff(dataframe[stage])
start_index = np.where(stage_diff == 1)[0]
finish_index = np.where(stage_diff == -1)[0]
if start_index.size:
start_timestamp = times[start_index[0]]
finish_timestamp = times[finish_index[0]] if finish_index.size else times[len(times) - 1]
return start_timestamp, finish_timestamp
return None
@staticmethod @staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]: def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title) plot_widget = pg.PlotWidget(title=title)
# Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True) plot_widget.showGrid(x=True, y=True)
plot_widget.setClipToView(True)
legend = pg.LegendItem((80, 60), offset=(70, 20)) legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem()) legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend return plot_widget, legend
def _add_stage_regions(self,
plot_widget: pg.PlotWidget,
point_events: dict[str, list[float]],
dataframe_headers: list[str],
transparency: int = 75) -> None:
"""
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
"""
stages = point_events.keys()
if all(stage in dataframe_headers for stage in stages):
for stage in stages:
start_t, end_t = point_events[stage]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None:
region.setZValue(-20)
plot_widget.addItem(region)
def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
transparency: int = 125) -> None:
"""
Добавляет регионы для идеальных этапов.
"""
ideal_timings = ideal_data["Ideal timings"]
stages = list(point_events.keys())
for i, stage in enumerate(stages):
start_t = point_events[stage][0]
end_t = start_t + ideal_timings[i]
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region:
region.setZValue(-10)
plot_widget.addItem(region)
def _add_ideal_signals(self,
plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None:
"""
Добавляет идеальные сигналы для каждого этапа.
"""
for stage in point_events.keys():
for signal in ideal_signals:
curve = self._create_curve_ideal(
signal,
ideal_data[stage],
point_events[stage][0],
point_events[stage][1]
)
if curve:
curve.setZValue(10)
plot_widget.addItem(curve)
def _add_real_signals(self,
plot_widget: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None:
"""
Добавляет реальные сигналы из dataframe на виджет.
"""
dataframe_headers = dataframe.columns.tolist()
for signal in real_signals:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
plot.setZValue(0)
legend.addItem(plot, signal["name"])
def _add_performance_label(self,
layout: QVBoxLayout,
TWC_time: float,
ideal_time: float,
tesla_time: float) -> None:
"""
Добавляет QLabel с информацией о производительности.
"""
tesla_TWC = round((1 - TWC_time/tesla_time)*100, 2) if tesla_time else 0.0
tesla_ideal = round((1 - ideal_time/tesla_time)*100, 2) if tesla_time else 0.0
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
)
self.set_style(performance_label)
layout.addWidget(performance_label)
performance_label.update()
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def get_stage_info(self,
stage: str,
dataframe: pd.DataFrame,
signal_name: str) -> Optional[ProcessStage]:
if stage in self._stages:
stage_diff = np.diff(dataframe[stage])
start_index = np.where(stage_diff == 1)[0]
finish_index = np.where(stage_diff == -1)[0]
data = dataframe[signal_name] if signal_name in dataframe.columns.tolist() else [] def _build_widget(self, data: list[Any]) -> QWidget:
"""
if data.size and start_index.size: Собирает графический виджет для одного набора данных.
start = start_index[0] Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
finish = finish_index[0] if finish_index.size else (len(data) - 1) """
data_slice = data[start:finish]
mean = np.mean(data_slice)
return ProcessStage(mean_value=mean, start_index=int(start), finish_index=int(finish))
return None
def _build_widget(self, dataframe: pd.DataFrame) -> QWidget:
widget = QWidget() widget = QWidget()
layout = QVBoxLayout() layout = QVBoxLayout(widget)
time_axis = dataframe["time"] dataframe, points_pocket, useful_data = data
tesla_time = useful_data["tesla_time"]
range_ME = useful_data["range_ME"]
k_hardness = useful_data["k_hardness"]
dataframe_headers = dataframe.columns.tolist() dataframe_headers = dataframe.columns.tolist()
for channel, description in self._plt_channels.items(): for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_widget, legend = self._init_plot_widget(title=channel) plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"] settings = description["Settings"]
if settings["stages"] and all([stage in dataframe_headers for stage in self._stages]):
for stage in self._stages:
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
region = self._create_stage_region(stage, start_timestamp, finish_timestamp)
if region:
plot_widget.addItem(region)
for signal in description["Ideal_signals"]:
ideal_plot = self._create_curve_ideal(stage, signal, start_timestamp, finish_timestamp)
if ideal_plot:
plot_widget.addItem(ideal_plot)
end_timestamp = time_axis[len(time_axis) - 1] TWC_time = 0.0
region = self._create_stage_region("Oncoming", finish_timestamp, end_timestamp) ideal_time = 0.0
if region: worst_perf = 2
plot_widget.addItem(region)
# TODO: рассчитать корректный параметр range
if settings["mirror ME"]:
dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME)
for signal in description["Ideal_signals"]: # Итерация по точкам
ideal_plot = self._create_curve_ideal("Oncoming", signal, finish_timestamp, end_timestamp) for cur_point, point_data in enumerate(points_pocket):
if ideal_plot: # point_data структура: [point_timeframe, ideal_data, point_events]
plot_widget.addItem(ideal_plot) point_timeframe, ideal_dat, point_events, useful_p_data = point_data
ideal_data = copy.deepcopy(ideal_dat)
# TODO: проверить корректность расчетов
if settings["force compensation FE"]:
force = useful_p_data["force"]
F_comp = - force/k_hardness
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
if settings["performance"] and all([stage in dataframe_headers for stage in self._stages]): dataframe.loc[point_idxs] = self._shift_data("FE", description["Real_signals"], dataframe.loc[point_idxs], F_comp)
delta_timestamp = 0
for stage in self._stages:
start_timestamp, finish_timestamp = self._get_timestamp(stage, time_axis, dataframe)
delta_timestamp += finish_timestamp - start_timestamp
ideal_delta = self._opt.get_cycle_time() # Модифицируем данные для отображения гарфика
performance = round(ideal_delta/delta_timestamp*100, 2) if settings["ideals"] and settings["mirror ME"]:
performance_label = QLabel(f"Performance = {performance} %") for stage in point_events.keys():
layout.addWidget(performance_label) ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
if settings["zoom"]: # Добавляем реальные стадии
if max(time_axis) < 5.0: if settings["stages"]:
stages = [self.get_stage_info("Welding", self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
dataframe,
signal["name"]) for signal in description["Real_signals"]]
if stages:
means_raw = [stage.mean_value for stage in stages]
mean = max(means_raw)
start = time_axis[stages[0].start_index]
finish = time_axis[stages[0].finish_index]
overshoot = pg.BarGraphItem(x0=0, # TODO: подобрать не вырвеглазные цвета, возможно ограничить зону
y0=mean - mean * 0.05, if settings["workpiece"]:
height=mean * 0.05 * 2, x1 = point_timeframe[0]
width=start, dx = point_timeframe[1] - x1
brush=pg.mkBrush([0, 250, 0, 100])) y1 = useful_p_data["L2"]*1000
plot_widget.addItem(overshoot) dy = useful_p_data["thickness"]*1000
stable = pg.BarGraphItem(x0=start, rect_item = QGraphicsRectItem(x1, y1, dx, dy)
y0=mean - mean * 0.015, rect_item.setZValue(-5)
height=mean * 0.015 * 2, rect_item.setBrush(pg.mkBrush('grey'))
width=finish - start, rect_item.setPen(pg.mkPen('black', width=3))
brush=pg.mkBrush([0, 250, 0, 100])) plot_widget.addItem(rect_item)
plot_widget.addItem(stable)
plot_widget.setYRange(mean - 260, mean + 260) # Добавляем идеальные стадии и идеальные сигналы
plot_widget.setInteractive(False) if settings["ideals"]:
else: self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
max_value = min([max(dataframe[signal["name"]]) for signal in description["Real_signals"]]) self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
region = pg.LinearRegionItem([max_value - max_value * 0.015,
max_value + max_value * 0.015],
movable=False,
orientation="horizontal")
region.setBrush(pg.mkBrush([0, 250, 0, 100])) # Подсчёт производительности
plot_widget.setYRange(max_value - 200, max_value + 200) if settings["performance"]:
plot_widget.setXRange(3.5, 4.5) is_last_point = (cur_point == len(points_pocket) - 1)
plot_widget.addItem(region) if is_last_point:
plot_widget.setInteractive(False) TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
ideal_delta = sum(ideal_data["Ideal timings"][0:3])
else:
TWC_delta = point_timeframe[1] - point_timeframe[0]
ideal_delta = ideal_data["Ideal cycle"]
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
for signal in description["Real_signals"]: # Добавляем реальные сигналы
if signal["name"] in dataframe_headers: self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
plot = plot_widget.plot(time_axis, dataframe[signal["name"]], pen=signal["pen"]) if widget_num == 0:
legend.addItem(plot, signal["name"]) main_plot = plot_widget
else:
# Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot)
# Если есть настройка производительности, добавляем label
if settings["performance"]:
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
layout.addWidget(plot_widget) layout.addWidget(plot_widget)
layout.addWidget(navigator)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout) widget.setLayout(layout)
return widget return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[pd.DataFrame]) -> None: def build(self, data: list[list[Any]]) -> None:
"""
Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида:
[
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, tesla_time],
...
]
"""
widgets = [self._build_widget(data_sample) for data_sample in data] widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets) self._mediator.notify(self, widgets)
def update_settings(self, data: list[dict]):
self._initIdealBuilder(idealDataBuilder=idealDataBuilder, data=data)

View File

@ -1,57 +1,186 @@
import pyqtgraph as pg from typing import Callable, Optional, Any
from pyqtgraph.Qt import QtWidgets from PyQt5.QtWidgets import (
from pyqtgraph.parametertree import Parameter, ParameterTree QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
)
from PyQt5.QtGui import QIntValidator
from src.utils.json_tools import read_json, write_json from utils.json_tools import read_json, write_json
from src.gui import qt_settings as qts from gui import qt_settings as qts
class settingsWindow(QtWidgets.QWidget): class settingsWindow(QWidget):
def __init__(self, path: str, name: str, upd_func): def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
super(settingsWindow, self).__init__() """
self.settingsPath = path Окно настроек для редактирования параметров.
self.name = name
self.data = {} :param path: Путь к файлу настроек (JSON).
self.params = None :param name: Название набора настроек.
:param upd_func: Функция обновления (коллбэк).
"""
super().__init__()
self._settingsPath = path
self._name = name
self._data: dict[str, list[Any]] = {}
self._upd_func = upd_func
self._num_points: Optional[QLineEdit] = None
self._param_table: Optional[QTableWidget] = None
self.load_settings() self.load_settings()
self._init_ui() self._init_ui()
self.params.sigTreeStateChanged.connect(upd_func)
def load_settings(self) -> None: def load_settings(self) -> None:
self.data = read_json(self.settingsPath) """Загружает настройки из JSON-файла."""
data = read_json(self._settingsPath)
if isinstance(data, dict):
self._data = data
else:
self._data = {}
def write_settings(self) -> None: def write_settings(self) -> None:
self.getParams() """Записывает текущие настройки в JSON-файл."""
write_json(self.settingsPath, self.data) write_json(self._settingsPath, self._data)
def getParams(self) -> dict:
"""Возвращает текущий словарь параметров."""
return self._data
def _getTreeStructure(self) -> list:
params = []
for key, value in self.data.items():
params.append({'name': str(key), 'type': type(value).__name__, 'value': value})
params.append({'name': 'Save', 'type': 'action'})
return params
def _init_ui(self) -> None: def _init_ui(self) -> None:
temp = self._getTreeStructure() """Инициализирует UI: кнопки, поля ввода, таблицу."""
self.params = Parameter.create(name=self.name, type='group', children=temp) save_button = QPushButton("Save")
self.params.param('Save').sigActivated.connect(self.write_settings) restore_button = QPushButton("Restore")
ParamsTree = ParameterTree() self._num_points = QLineEdit()
ParamsTree.setParameters(self.params, showTop=True) self._num_points.setPlaceholderText("Enter the number of welding points")
layout = QtWidgets.QGridLayout() self._num_points.setValidator(QIntValidator())
layout.addWidget(ParamsTree, 0,0)
control_layout = QHBoxLayout()
control_layout.addWidget(save_button)
control_layout.addWidget(restore_button)
control_layout.addWidget(self._num_points)
save_button.pressed.connect(self._save)
restore_button.pressed.connect(self._restore)
self._num_points.editingFinished.connect(self._expand)
self._param_table = QTableWidget()
self._populate_table()
layout = QVBoxLayout()
header = QLabel(self._name)
layout.addWidget(header)
layout.addLayout(control_layout)
layout.addWidget(self._param_table)
self.setLayout(layout) self.setLayout(layout)
self.setStyleSheet(qts.white_style) self.setStyleSheet(qts.white_style)
# self.show()
def _populate_table(self) -> None:
"""Заполняет таблицу значениями из self._data."""
# Если нет данных для заполнения
if not self._data:
self._param_table.setRowCount(0)
self._param_table.setColumnCount(0)
return
# Предполагаем, что у всех ключей одинаковая длина списков параметров.
first_key = next(iter(self._data), None)
if first_key is None:
self._param_table.setRowCount(0)
self._param_table.setColumnCount(0)
return
column_count = len(self._data[first_key]) + 1
self._param_table.setRowCount(len(self._data))
self._param_table.setColumnCount(column_count)
for i, (key, items) in enumerate(self._data.items()):
self._param_table.setItem(i, 0, QTableWidgetItem(key))
for j, item in enumerate(items):
self._param_table.setItem(i, j+1, QTableWidgetItem(str(item)))
def _save(self) -> None:
"""Сохраняет текущие параметры из таблицы в self._data и вызывает _upd_func()."""
new_data = {}
row_count = self._param_table.rowCount()
col_count = self._param_table.columnCount()
for i in range(row_count):
key_item = self._param_table.item(i, 0)
if key_item is None:
continue
key = key_item.text()
# Если ключ пустой, пропускаем
if not key:
continue
row_data = []
for j in range(1, col_count):
cell_item = self._param_table.item(i, j)
if cell_item is None:
continue
param_str = cell_item.text()
# Если ключ не trace_storage_path, конвертируем в float
if key != "trace_storage_path":
try:
param = float(param_str)
except ValueError:
param = 0.0
else:
param = param_str
row_data.append(param)
new_data[key] = row_data
self._data = new_data
self.write_settings()
self._upd_func()
def _restore(self) -> None:
"""Перезагружает данные из файла и обновляет таблицу."""
self.load_settings()
self._populate_table()
def _expand(self) -> None:
"""Расширяет количество столбцов таблицы в зависимости от введённого значения."""
if not self._num_points:
return
num_points_text = self._num_points.text()
if not num_points_text.isdigit():
return
num_points = int(num_points_text)
if num_points < 0:
return
prev_columns = self._param_table.columnCount()
desired_columns = num_points + 1
if desired_columns <= prev_columns:
return
self._param_table.setColumnCount(desired_columns)
# Новые столбцы заполняем последним известным параметром для каждого ключа
for i, (key, items) in enumerate(self._data.items()):
# Если нет данных, пропускаем
if not items:
continue
last_value = str(items[-1])
for col in range(prev_columns, desired_columns):
self._param_table.setItem(i, col, QTableWidgetItem(last_value))
# Добавляем новый элемент также в self._data для консистентности
# После этого можно будет сохранить при нажатии Save
# Дополним также и в self._data
additional_count = desired_columns - prev_columns
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
def getParams(self) -> dict:
self.data = {}
for p in self.params:
if p.name() != 'Save':
self.data[p.name()] = p.value()
return self.data
if __name__ == '__main__': if __name__ == '__main__':
import pyqtgraph as pg
app = pg.mkQApp("Parameter Tree Example") app = pg.mkQApp("Parameter Tree Example")
window = settingsWindow('params\operator_params.json', 'operator') window = settingsWindow('params\operator_params.json', 'operator')
app.exec() app.exec()

View File

@ -1,26 +1,31 @@
import sys import sys
import pyqtgraph as pg
from PyQt5 import QtWidgets from PyQt5 import QtWidgets
from src.gui.mainGui import MainWindow from gui.mainGui import MainWindow
from src.controller.monitor import DirectoryMonitor from controller.monitor import DirectoryMonitor
from src.controller.mediator import Mediator from controller.mediator import Mediator
from src.controller.converter import DataConverter from controller.converter import DataConverter
from src.gui.plotter import PlotWidget from gui.plotter import PlotWidget
from src.controller.controller import Controller from controller.controller import Controller
from controller.passportFormer import PassportFormer
def main(): def main():
pg.setConfigOptions(useOpenGL=False, antialias=False)
app = QtWidgets.QApplication(sys.argv) app = QtWidgets.QApplication(sys.argv)
monitor = DirectoryMonitor() monitor = DirectoryMonitor()
data_converter = DataConverter() data_converter = DataConverter()
plot_widget_builder = PlotWidget() plot_widget_builder = PlotWidget()
controller = Controller() controller = Controller()
passport_former = PassportFormer()
window = MainWindow(controller) window = MainWindow(controller)
mediator = Mediator(monitor, data_converter, plot_widget_builder, controller) mediator = Mediator(monitor, data_converter, passport_former, plot_widget_builder, controller)
window.show() window.show()
controller.signal_widgets.connect(window.show_plot_tabs) controller.signal_widgets.connect(window.show_plot_tabs)
controller.signal_settings.connect(mediator.push_settings) controller.signal_settings.connect(mediator.push_settings)
controller.signal_monitor.connect(monitor.custom_dir_extract)
window.push_settings() window.push_settings()

Binary file not shown.

View File

@ -0,0 +1,25 @@
from typing import Optional
import os
import numpy as np
import pandas as pd
from roboter import Performance
class PerformanceProcessor:
def calc_performance(self, path:str, TWC_raw:pd.DataFrame):
factory = Performance()
comm_df, rob_df, TWC_df = factory.job(path, TWC_raw)
if __name__ == "__main__":
path = "D:\downloads\Test2\TeslaTIME29_71_KRCIO.dat"

203
src/performance/roboter.py Normal file
View File

@ -0,0 +1,203 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional
import os
import numpy as np
import pandas as pd
class BasePerformanceFactory(ABC):
@abstractmethod
def factory_method(self):
...
def job(self):
...
def _get_file_data(self, path) -> pd.DataFrame:
head, file = self._dat_parser(path)
self.dat_name = file[:-4]
path_r64 = os.path.dirname(path) + '\\' + file
time_axis, dataframe = self._r64_parser(path_r64, head)
dataframe = pd.concat([dataframe, time_axis], axis=1)
return dataframe
def _dat_parser(self, path: str) -> list[dict, str]:
with open(path, 'r') as file:
head = {'channels': 0}
inside_channel = False
channels = 0
for line in file:
line = line.strip()
if line == '#BEGINCHANNELHEADER' or line == "#BEGINGLOBALHEADER":
inside_channel = True
elif line == '#ENDCHANNELHEADER' or line == "#ENDGLOBALHEADER":
inside_channel = False
pass
# Формирование словаря
elif inside_channel:
_, data = line.split(',')
match _:
case '102':
head['rob_id'] = data
case '200':
ch_name = data
if ch_name != 'Zeit': channels +=1
head[ch_name] = {}
case '202':
head[ch_name]['unit'] = data
case '211':
file = data
case '220':
head[ch_name]['len'] = int(data)
case '221':
head[ch_name]['num'] = int(data)
case '241':
head[ch_name]['multiplyer'] = float(data)
head['channels'] = int(channels)
return head, file
def _r64_parser(self, path: str, head: dict) -> Optional[list[pd.Series, pd.DataFrame]]:
ch = head['channels']
keys = list(head.keys())[-ch:]
len_timestamps = head['Zeit']['len']
t_step = head['Zeit']['multiplyer']
time_axis = pd.Series(np.arange(0, len_timestamps*t_step, t_step))
time_axis.name = 'time'
dataframe = pd.DataFrame({})
with open(path, 'rb') as file:
data = file.read()
floats = np.frombuffer(data, dtype='<d') # Little-endian double
for key in keys:
step = head[key]['num']-1
result = pd.Series(np.array(floats[step::ch])* head[key]['multiplyer'])
result.name = key
dataframe = pd.concat([dataframe, result], axis=1)
return time_axis, dataframe
class BaseProduct(ABC):
def __init__(self):
self._signals = []
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
dataframe: pd.DataFrame,
signals: list[str]) -> Optional[dict[dict[pd.Series]]]:
intervals = {}
end_time = 0
for signal in signals:
start_idx, finish_idx = np.array(self._find_indexes(signal, dataframe))
start_series = dataframe.loc[start_idx, "time"].reset_index(drop=True)
end_series = dataframe.loc[finish_idx, "time"].reset_index(drop=True)
end_series.fillna(end_time)
intervals[signal] = {"rise": start_series,
"fall": end_series}
return intervals
def _form_intervals(self,
start: pd.Series,
end: pd.Series) -> dict:
if len(start) != len(end):
for i in range(1, len(end)):
if end[i-1] > start[i]:
start = start.drop(i).reset_index(drop=True)
intervals = {'start':start.tolist, 'end':end.tolist}
return intervals
@abstractmethod
def operation(self):
...
class Performance(BasePerformanceFactory):
def robot_method(self) -> BaseProduct:
return RobotData()
def TWC_method(self) -> BaseProduct:
return TWC_Data()
def comm_method(self) -> BaseProduct:
return CommData()
def job(self, path:str, TWC_raw:pd.DataFrame) -> list[pd.DataFrame, list[pd.DataFrame], list[pd.DataFrame]]:
robot = self.robot_method()
TWC = self.TWC_method()
comm=self.comm_method()
dataframe = self._get_file_data(path)
rob_comm, rob_df = robot.operation(dataframe)
TWC_comm, TWC_df = TWC.operation(TWC_raw)
comm_df = comm.operation(rob_comm, TWC_comm)
return comm_df, rob_df, TWC_df
class RobotData(BaseProduct):
def __init__(self):
self._signals = [
"$OUT3012",
"$IN3003",
"$OUT3003",
"$OUT3244"
]
def operation(self, dataframe: pd.DataFrame) -> list[dict, pd.DataFrame]:
events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]}
point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"])
movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"])
return communication_sig, pd.DataFrame({'in_point':point_interval,
'in_move':movement_interval})
class TWC_Data(BaseProduct):
def __init__(self):
self._signals = [
"Closing",
"Squeeze",
"Welding",
"Relief",
"Oncoming"
]
def operation(self, dataframe: pd.DataFrame) -> list[dict, pd.DataFrame]:
events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала
closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"])
squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"])
relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"])
oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"])
return communication_sig, pd.DataFrame({'in_closing':closing_interval,
'in_squeeze':squeeze_interval,
'in_relief':relief_interval,
'in_oncoming':oncoming_interval})
class CommData(BaseProduct):
"""
Определяет промежуток, в который происходит взаимодействие между нодами.
Подразумевается следующая структура: node: tuple(list, list)
node[0] - время отправки пакетов. node[1] - время приема пакетов.
"""
def operation(self, node1: dict, node2: dict) -> pd.DataFrame:
n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received'])
n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received'])
return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)])

View File

@ -1,33 +1,42 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import Optional, Union from typing import Optional, Union, Any
from cachetools import LRUCache
import pandas as pd import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget from PyQt5.QtWidgets import QWidget, QTabWidget
from src.OptAlgorithm import OptAlgorithm from PyQt5.QtOpenGL import QGLWidget
from OptAlgorithm import OptAlgorithm
import pandas as pd import pandas as pd
import pandas as pd
import numpy as np
class BaseMediator: class BaseMediator:
def __init__(self, def __init__(self,
monitor: BaseDirectoryMonitor, monitor: BaseDirectoryMonitor,
converter: BaseDataConverter, converter: BaseDataConverter,
passportFormer: BasePointPassportFormer,
plot: BasePlotWidget, plot: BasePlotWidget,
controller: BaseController): controller: BaseController):
self._monitor = monitor self._monitor = monitor
self._monitor.mediator = self self._monitor.mediator = self
self._converter = converter self._converter = converter
self._converter.mediator = self self._converter.mediator = self
self._passportFormer = passportFormer
self._passportFormer.mediator = self
self._plot = plot self._plot = plot
self._plot.mediator = self self._plot.mediator = self
self._controller = controller self._controller = controller
self._controller.mediator = self self._controller.mediator = self
def notify(self, def notify(self,
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BaseMainWindow], source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[QWidget], list[dict]]): data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
... ...
def push_settings (self, data: list[dict]): def push_settings (self, data: list[dict]):
... ...
@ -70,7 +79,7 @@ class BaseDirectoryMonitor:
self._files = files self._files = files
def start(self): def start(self):
self.update_timer.start(self._update_time) self.update_timer.start(int(self._update_time))
def stop(self): def stop(self):
self.update_timer.stop() self.update_timer.stop()
@ -103,26 +112,23 @@ class BasePlotWidget:
super().__init__() super().__init__()
self._mediator = mediator self._mediator = mediator
self._stages = [
"Closing",
"Squeeze",
"Welding",
"Relief"
]
self._stage_colors = { self._stage_colors = {
"Closing": [208, 28, 31, 100], "Closing": [220, 20, 60, 100], # Crimson
"Squeeze": [45, 51, 89, 150], "Squeeze": [30, 144, 255, 100], # Dodger Blue
"Welding": [247, 183, 24, 100], "Welding": [128, 128, 128, 100], # Gray
"Relief": [0, 134, 88, 100], "Relief": [34, 139, 34, 100], # Forest Green
"Oncoming": [222, 184, 135, 100] "Oncomming": [255, 165, 0, 100] # Orange
} }
self._plt_channels = { self._plt_channels = {
"Electrode Force, N & Welding Current, kA": { "Electrode Force, N & Welding Current, kA": {
"Settings": { "Settings": {
"zoom": False, "zoom": False,
"stages": True, "stages": True,
"performance": True "performance": True,
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -145,26 +151,34 @@ class BasePlotWidget:
} }
] ]
}, },
"Electrode Force, N": { "Electrode Position, mm": {
"Settings": { "Settings": {
"zoom": True, "zoom": False,
"stages": False, "stages": True,
"performance": False "performance": False,
"ideals": True,
"mirror ME": True,
"workpiece": True,
"force compensation FE": True
}, },
"Real_signals": [ "Real_signals": [
{ {
"name": "Electrode Force, N ME", "name": "Rotor Position, mm ME",
"pen": 'r', "pen": {'color': 'r', 'width':2},
}, },
{ {
"name": "Electrode Force, N FE", "name": "Rotor Position, mm FE",
"pen": 'w', "pen": {'color': 'w', 'width':2},
} }
], ],
"Ideal_signals": [ "Ideal_signals": [
{ {
"name": "Force", "name": "Position ME",
"pen": {'color': 'r', 'width':3}, "pen": {'color': 'g', 'width':4},
},
{
"name": "Position FE",
"pen": {'color': 'b', 'width':4},
} }
] ]
}, },
@ -172,7 +186,11 @@ class BasePlotWidget:
"Settings": { "Settings": {
"zoom": False, "zoom": False,
"stages": True, "stages": True,
"performance": False "performance": False,
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -189,29 +207,26 @@ class BasePlotWidget:
"Ideal_signals": [ "Ideal_signals": [
{ {
"name": "Rotor Speed ME", "name": "Rotor Speed ME",
"pen": {'color': 'y', 'width':3}, "pen": {'color': 'g', 'width':3},
"zoom": False "zoom": False
}, },
{ {
"name": "Rotor Speed FE", "name": "Rotor Speed FE",
"pen": {'color': 'g', 'width':3}, "pen": {'color': 'b', 'width':3},
"zoom": False "zoom": False
} }
] ]
}, },
} }
def _initIdealBuilder(self, def set_style(self, object: Union[QTabWidget, QWidget]) -> None:
idealDataBuilder: Optional[BaseIdealDataBuilder] = None, object.setStyleSheet(
data: list[dict] = None): """QLabel {
self.opt = idealDataBuilder(data) color: #ffffff;
font-size: 26px;
self._stage_ideals = { font-weight: bold;
"Closing": self._opt.get_closingDF(), font-family: "Segoe UI", sans-serif;
"Squeeze": self._opt.get_compressionDF(), }""")
"Welding": self._opt.get_weldingDF(),
"Relief": self._opt.get_openingDF(),
"Oncoming": self._opt.get_oncomingDF()
}
@property @property
def mediator(self) -> BaseMediator: def mediator(self) -> BaseMediator:
return self._mediator return self._mediator
@ -231,8 +246,6 @@ class BasePlotWidget:
def build(self, data: list[pd.DataFrame]) -> list[QWidget]: def build(self, data: list[pd.DataFrame]) -> list[QWidget]:
... ...
def update_settings(self, data: list[dict]) -> None:
...
class BaseController(QObject): class BaseController(QObject):
@ -241,22 +254,26 @@ class BaseController(QObject):
def push_settings(self, settings: list[dict]) -> None: def push_settings(self, settings: list[dict]) -> None:
... ...
def open_custom_file (self, filepath: str) -> None:
...
# FIXME: WeldingDF показывает только 1 секунду
class BaseIdealDataBuilder(OptAlgorithm): class BaseIdealDataBuilder(OptAlgorithm):
def __init__(self, data: list[dict]):
operator_params, system_params = data def __init__(self, params: list[dict]):
operator_params, system_params = params
self.mul = system_params['time_capture'] self.mul = system_params['time_capture']
self.welding_time = operator_params['time_wielding'] self.welding_time = operator_params['time_wielding']
super().__init__(operator_params, system_params) super().__init__(operator_params, system_params)
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame: def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = [] data = []
for i in range (0, int(end_timestamp*self.mul)): for i in range (0, int(end_timestamp*self.mul)):
time = i/self.mul time = i/self.mul
X1, X2, V1, V2, F = func(time) X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Posicion FE":X1*1000,"Posicion ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F}) data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data) return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame: def get_closingDF(self) -> pd.DataFrame:
@ -335,4 +352,158 @@ class BaseMainWindow(QWidget):
font-weight: bold; font-weight: bold;
font-family: "Segoe UI", sans-serif; font-family: "Segoe UI", sans-serif;
} }
""") """)
class BasePointPassportFormer:
def __init__(self,
mediator: Optional[BaseMediator] = None):
self._mediator = mediator
self._clear_stage = "Welding"
self._stages = [
"Closing",
"Squeeze",
"Welding",
"Relief",
"Oncomming"
]
self._tesla_stages = [
"Tesla squeeze",
"Tesla closing",
"Tesla welding",
"Tesla oncomming_relief"
]
self._params = []
self._ideal_data_cashe = LRUCache(maxsize=1000)
self._OptAlgorithm_operator_params = [
"dist_open_start_1",
"dist_open_start_2",
"dist_open_after_1",
"dist_open_after_2",
"dist_open_end_1",
"dist_open_end_2",
"dist_close_end_1",
"dist_close_end_2",
"time_command",
"time_robot_movement",
"object_thickness",
"force_target",
"force_capture",
"time_wielding"]
self._OptAlgorithm_system_params = [
"a_max_1",
"v_max_1",
"a_max_2",
"v_max_2",
"mass_1",
"mass_2",
"k_hardness_1",
"k_hardness_2",
"torque_max_1",
"torque_max_2",
"transmission_ratio_1",
"transmission_ratio_2",
"position_start_1",
"position_start_2",
"k_prop",
"time_capture"]
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> tuple[np.ndarray, np.ndarray]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
signal: str,
times:pd.Series,
dataframe: pd.DataFrame) -> tuple[list[float], list[float]]:
start_idx, finish_idx = self._find_indexes(signal, dataframe)
if len(start_idx) > 0 and len(finish_idx) > 0 and start_idx[0] > finish_idx[0]:
start_idx = np.insert(start_idx, 0, 0)
start_list = times.iloc[start_idx].tolist() if len(start_idx) > 0 else []
end_list = times.iloc[finish_idx].tolist() if len(finish_idx) > 0 else []
if len(start_list) - len(end_list) == 1:
end_list.append(float(times.iloc[-1]))
return start_list, end_list
def _filter_events(self,
times: pd.Series,
dataframe: pd.DataFrame) -> tuple[dict[str, list[list[float]]], int]:
events = {}
point_quantity = 0
if self._clear_stage in self._stages:
start_list, end_list = self._find_events(self._clear_stage, times, dataframe)
point_quantity = len(start_list)
if point_quantity == 0:
#TODO: добавить обработку исключения
return []
for stage in self._stages:
start_list, end_list = self._find_events(stage, times, dataframe)
temp = min([len(start_list), len(end_list)])
if temp < point_quantity:
print ("cant find enough", stage)
start_list += [0]*(point_quantity - temp)
end_list += [1]*(point_quantity - temp)
events[stage] = [start_list, end_list]
return events, point_quantity
def _build_ideal_data(self,
idealDataBuilder: Optional[BaseIdealDataBuilder] = None,
params: list[dict] = None) -> dict:
self.opt = idealDataBuilder(params)
stage_ideals = {
"Closing": self._opt.get_closingDF(),
"Squeeze": self._opt.get_compressionDF(),
"Welding": self._opt.get_weldingDF(),
"Relief": self._opt.get_openingDF(),
"Oncomming": self._opt.get_oncomingDF(),
"Ideal cycle": self._opt.get_cycle_time(),
"Ideal timings": self._opt.get_ideal_timings()
}
return stage_ideals
def form_passports(self) -> list[list[pd.DataFrame, dict, list]]:
...
def update_settings(self, params: list) -> None:
...
def _generate_cache_key(self,
params_list: list[dict[str, Any]]) -> tuple[tuple[tuple[str, Any], ...], tuple[tuple[str, Any], ...]]:
"""
Преобразует params_list в хешируемый ключ для кэша.
"""
operator_settings, system_settings = params_list
# Преобразуем словари в отсортированные кортежи пар ключ-значение
operator_tuple = frozenset((key, value)
for key, value in operator_settings.items()
if str(key) in self._OptAlgorithm_operator_params)
system_tuple = frozenset((key, value)
for key, value in system_settings.items()
if str(key) in self._OptAlgorithm_system_params)
return (operator_tuple, system_tuple)
@property
def opt(self) -> BaseIdealDataBuilder:
return self._opt
@opt.setter
def opt(self, opt: BaseIdealDataBuilder):
self._opt = opt
@property
def mediator(self) -> BaseMediator:
return self._mediator
@mediator.setter
def mediator(self, mediator: BaseMediator) -> None:
self._mediator = mediator

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff