Compare commits

...

16 Commits

Author SHA1 Message Date
Andrew
9faa35f514 dev: добавлены обработчики ошибок при чтении файлов + добавлено автоматическое закрытие окон 2024-12-23 14:18:54 +03:00
ccfb6b9fcb fix: Исправлено смыкание на идеальной траектории 2024-12-23 16:35:39 +03:00
Andrew
625f3d7e01 fix: исправлено взаимодействие модулей 2024-12-19 14:25:11 +03:00
Andrew
d99fafd2ae dev: добавлено построение идеального графика 2024-12-19 13:23:27 +03:00
Andrew
bf155af58a fix: убрал отображение КДИП и идеальной производительности 2024-12-19 11:24:33 +03:00
Andrew
3c18e22d09 dev: добавлено меню выбора режима работы + реализован режим работы raport 2024-12-18 19:43:46 +03:00
Andrew
4c79aba8a4 style: изменены параметры запуска 2024-12-18 15:08:23 +03:00
Andrew
3c7040692e dev: изменил создание виджета с графиками 2024-12-18 14:54:40 +03:00
Andrew
c3d976f6a6 dev: добавил возможность рисовать поле допусков для силы 2024-12-18 13:52:05 +03:00
Andrew
7c0db070c9 style: увеличена толщина границ ROI для удобства взаимодействия 2024-12-18 13:03:30 +03:00
Andrew
b73eba7f7e chore: добавил описание функций в новый класс 2024-12-18 12:58:37 +03:00
Andrew
27d1cd63de dev: изменил отображение настроек, цветов, теперь их можно редактировать 2024-12-18 12:44:43 +03:00
Andrew
37b9c30f12 chore: удалены лишние импорты 2024-12-18 10:16:50 +03:00
Andrew
2e66a77ced fix: теперь график-навигатор имеет ограничения по перемещению ROI 2024-12-18 10:13:16 +03:00
Andrew
eaec12d3ff dev: добавлено окно, отображающее параметры каждого элемента графика 2024-12-17 20:40:04 +03:00
Andrew
365118bc22 chore: часть функций plotter перенесена в base 2024-12-17 16:14:07 +03:00
34 changed files with 876 additions and 419 deletions

3
.gitignore vendored
View File

@ -6,4 +6,5 @@
/venv
*.txt
*.svg
/test.py
/test.py
/output

View File

@ -81,19 +81,19 @@
],
"distance_l_2": [
0.0275,
0.03,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033,
0.033
0.0255,
0.0242,
0.0245,
0.0228,
0.0236,
0.0229,
0.0248,
0.024,
0.0235,
0.025,
0.0276,
0.0234,
0.0215
],
"distance_h_end1": [
0.003,

Binary file not shown.

View File

@ -3,75 +3,76 @@ from numpy import sqrt, arcsin, arccos, cos, sin
from OptAlgorithm.AutoConfigClass import AutoConfigClass
from OptAlgorithm.ConstantCalculator import ConstantCalculator
class OptTimeCalculator(AutoConfigClass):
params_list = []
def __init__(self, operator_config : dict, system_config : dict):
def __init__(self, operator_config: dict, system_config: dict):
cCalculator = ConstantCalculator(operator_config, system_config)
super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc())
self.allTimes = {}
def tGrowNominal(self, F : float) -> float:
return arcsin(F/(self.Ftogrow)) * sqrt(self.mass_1/self.k_hardness_1)
self.check_eps = 1e-7
def tGrowNominal(self, F: float) -> float:
return arcsin(F / (self.Ftogrow)) * sqrt(self.mass_1 / self.k_hardness_1)
def Tclose(self, h1: float, h2: float) -> None:
v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1)
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
t1 = v0 / self.a_max_1
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0)
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 / 2)) / v0)
T1 = t1 + t2t
t21 = sqrt(h2/self.a_max_2)
t21 = min(self.v_max_2/self.a_max_2, t21)
t21 = sqrt(h2 / (self.a_max_2))
t21 = min(self.v_max_2 / self.a_max_2, t21)
t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21
Tclose = max(T1, T2)
tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1)
tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2)
self.allTimes["tclose_1_acc"] = tclose_1_acc
self.allTimes["tclose_1_speed"] = tclose_1_speed
self.allTimes["tclose_2_acc"] = tclose_2_acc
self.allTimes["tclose_2_speed"] = tclose_2_speed
self.allTimes["tclose"] = Tclose
def Topen(self, s1 : float, s2 : float, l1 : float, l2 : float, Fs1 : float, Fs2 : float = 0) -> None:
t11 = sqrt((l1 + Fs1)/self.a_max_1)
t11 = min(self.v_max_1/self.a_max_1, t11)
t12 = max(0, ((l1+Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
def Topen(self, s1: float, s2: float, l1: float, l2: float, Fs1: float, Fs2: float = 0) -> None:
t11 = sqrt((l1 + Fs1) / self.a_max_1)
t11 = min(self.v_max_1 / self.a_max_1, t11)
t12 = max(0, ((l1 + Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
T1 = t12 + 2 * t11
offset = self.calcSecondOpenOffset(t11, t12, Fs1)
t21 = sqrt(l2/self.a_max_2)
t21 = min(self.v_max_2/self.a_max_2, t21)
t21 = sqrt(l2 / self.a_max_2)
t21 = min(self.v_max_2 / self.a_max_2, t21)
t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21 + offset
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1+Fs1)
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1 + Fs1)
offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1)
topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2)
self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_2_offset"] = offset
self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_2_offset"] = offset
self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_1_speed"] = topen_1_speed
self.allTimes["topen_2_acc"] = topen_2_acc
self.allTimes["topen_2_speed"] = topen_2_speed
if s1 >= l1:
raise Exception("""S1 >= L1 - недопустимый сценарий,
проверьте distance_s_1, distance_h_end1""")
if s2 >= l2:
raise Exception("""S2 >= L2 - недопустимый сценарий,
проверьте distance_s_2, distance_h_end2""")
s1 += Fs1
topen_1_mark = sqrt(2 * s1 / self.a_max_1)
if topen_1_mark > topen_1_acc:
@ -79,167 +80,173 @@ class OptTimeCalculator(AutoConfigClass):
v1 = topen_1_acc * self.a_max_1
if s1 > topen_1_speed * v1:
s1 -= topen_1_speed * v1
topen_1_mark = 2*topen_1_acc + topen_1_speed - sqrt(topen_1_acc**2 - 2*s1 / self.a_max_1)
topen_1_mark = 2 * topen_1_acc + topen_1_speed - sqrt(topen_1_acc ** 2 - 2 * s1 / self.a_max_1)
else:
topen_1_mark = topen_1_acc + s1 / v1
topen_2_mark = sqrt(2 * s2 / self.a_max_2)
if topen_2_mark > topen_2_acc:
s2 -= topen_2_acc ** 2 * self.a_max_2 / 2
v2 = topen_2_acc * self.a_max_2
if s2 > topen_2_speed * v2:
s2 -= topen_2_speed * v2
topen_2_mark = 2*topen_2_acc + topen_2_speed - sqrt(topen_2_acc**2 - 2*s2 / self.a_max_2)
topen_2_mark = 2 * topen_2_acc + topen_2_speed - sqrt(topen_2_acc ** 2 - 2 * s2 / self.a_max_2)
else:
topen_2_mark = topen_2_acc + s2 / v2
self.allTimes["topen_1_mark"] = topen_1_mark
self.allTimes["topen_2_mark"] = topen_2_mark
def Tgrow(self) -> None:
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
vF0 = v0 * self.k_hardness_1
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0)
tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed)
def Tgrow(self) -> None:
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
vF0 = v0 * self.k_hardness_1
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1 / (self.mass_1)) * self.Ftogrow)
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0 * vF0)
tspeed = sqrt(self.mass_1 / self.k_hardness_1) * (
arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1 / self.freq * vF0 * sin(
self.freq * tspeed)
eps = 1e1
if self.freq**2 * self.Ftogrow**2 - vFmax**2 < -eps:
if self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 < -eps:
raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории
, проверьте параметры k_hardness_1, mass_1, k_prop""")
Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2 + eps)
Fmeet = 1 / self.freq * sqrt(self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 + eps)
Fstart_prop = self.Fstart_prop
if Fmeet > Fstart_prop:
raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора
, проверьте параметры v_max_1, k_prop""")
tmeet = (Fmeet - Fspeed)/vFmax
tmeet = (Fmeet - Fspeed) / vFmax
tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet)
vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2)
vp = 1 / sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow ** 2 - self.Fstart_prop ** 2)
ap = Fstart_prop / self.mass_1
tprop = 2*vp / ap
tprop = 2 * vp / ap
self.allTimes["tspeed"] = tspeed
self.allTimes["tmeet"] = tmeet
self.allTimes["tend"] = tend
self.allTimes["tprop"] = tprop
self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop
def T(self, h1 : float, h2 : float, s1 : float, s2 : float, l1 : float, l2 : float) -> dict:
def T(self, h1: float, h2: float, s1: float, s2: float, l1: float, l2: float) -> dict:
self.Tclose(h1, h2)
self.Tgrow()
self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0)
return self.allTimes
def Tmovement(self, closeAlgo, tmark) -> None:
def Tmovement(self, closeAlgo, tmark) -> tuple[list, list]:
contact = [self.contact_distance_1, self.contact_distance_2]
v0s = []
pos0s = []
for i in range(1,3):
for i in range(1, 3):
if tmark < 0:
raise Exception("""Отрицательное время этапа раскрытия,
проверьте distance_s_{1,2}, time_command""")
v0 = closeAlgo("V"+str(i), "Open", tmark)
v0 = closeAlgo("V" + str(i), "Open", tmark)
v0s.append(v0)
x0 = closeAlgo("X"+str(i), "Open", tmark)
x1 = contact[i-1] - self.__dict__["distance_h_end"+str(i)]
x0 = closeAlgo("X" + str(i), "Open", tmark)
x1 = contact[i - 1] - self.__dict__["distance_h_end" + str(i)]
x = x1 - x0
pos0s.append(closeAlgo("X"+str(i), "Open", tmark))
pos0s.append(closeAlgo("X" + str(i), "Open", tmark))
Tfull = self.time_robot_movement
L = self.__dict__["distance_l_"+str(i)]
maxL = contact[i-1] - L - x0
L = self.__dict__["distance_l_" + str(i)]
maxL = contact[i - 1] - L - x0
self.Tmovementi(i, x, Tfull, v0, maxL)
return pos0s, v0s
def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None:
v0 = abs(v0)
vmax = self.__dict__["v_max_"+str(i)]
a = self.__dict__["a_max_"+str(i)]
t3 = (Tfull + v0 / a) / 2
sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2)
vmax = self.__dict__["v_max_" + str(i)]
a = self.__dict__["a_max_" + str(i)]
t3 = (Tfull + v0 / a) / 2
sqrtval = a ** 2 * (
a ** 2 * (Tfull + 2 * t3) ** 2 - 8 * a * Sfull + 2 * a * v0 * (Tfull + 2 * t3) - 3 * v0 ** 2)
if sqrtval < 0:
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""")
t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2)
t1 = min(t1max, (vmax- abs(v0))/a)
t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a)))
t31 = v0/a + t1
t5max = (Tfull - v0/a)/2 - t1
t1max = ((Tfull + 2 * t3) + v0 / a) / (2) - sqrt(sqrtval) * sqrt(2) / (4 * a ** 2)
t1 = min(t1max, (vmax - abs(v0)) / a)
t1 = max(0, min(t1, -v0 / a + sqrt(v0 ** 2 / (a ** 2) + (abs(maxL) - v0 * v0 / a) / a)))
t31 = v0 / a + t1
t5max = (Tfull - v0 / a) / 2 - t1
v1 = v0 + a * t1
S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2
S1 = v0 * t1 + a * t1 * t1 / 2 + v1 * t31 - a * t31 * t31 / 2
S2max = Sfull + S1
t5 = min(t5max, (vmax)/a, sqrt(S2max / a))
t3 = abs(v0)/a + t1 + t5
t5 = min(t5max, (vmax) / a, sqrt(S2max / a))
t3 = abs(v0) / a + t1 + t5
t32 = t5
v1 = abs(v0+t1*a)
v3 = abs(v0 + t1*a - t3*a)
v1 = abs(v0 + t1 * a)
v3 = abs(v0 + t1 * a - t3 * a)
timeleft = Tfull - t1 - t5 - t3
sq = -v0*t1 - a*t1**2/2 - v1 * t3 + a*t3**2/2 + v3*t5 - a*t5**2/2
sq = -v0 * t1 - a * t1 ** 2 / 2 - v1 * t3 + a * t3 ** 2 / 2 + v3 * t5 - a * t5 ** 2 / 2
Sleft = Sfull - sq
t2max = (timeleft - Sleft/v3) / (1 + v1/v3)
Smovement = -v0 * t1 - a/2 * t1**2 - v1 * t31 + a/2*t31**2
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1))
t4 = max(0, Sleft/v3 + v1/v3 * t2)
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
self.allTimes["tmovement_"+str(i)+"_acc"] = t1
self.allTimes["tmovement_"+str(i)+"_speed"] = t2
self.allTimes["tmovement_"+str(i)+"_slow"] = t31
self.allTimes["tmovement_"+str(i)+"_stay"] = tstay
self.allTimes["tmovement_"+str(i)] = t1 + t2 + t31 + tstay
self.allTimes["tpreclose_"+str(i)+"_slow"] = t32
self.allTimes["tpreclose_"+str(i)+"_speed"] = t4
self.allTimes["tpreclose_"+str(i)+"_acc"] = t5
self.allTimes["tpreclose_"+str(i)] = t32 + t4 + t5
t2max = (timeleft - Sleft / v3) / (1 + v1 / v3)
Smovement = -v0 * t1 - a / 2 * t1 ** 2 - v1 * t31 + a / 2 * t31 ** 2
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement)) / v1))
t4 = max(0, Sleft / v3 + v1 / v3 * t2)
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
self.allTimes["tmovement_" + str(i) + "_acc"] = t1
self.allTimes["tmovement_" + str(i) + "_speed"] = t2
self.allTimes["tmovement_" + str(i) + "_slow"] = t31
self.allTimes["tmovement_" + str(i) + "_stay"] = tstay
self.allTimes["tmovement_" + str(i)] = t1 + t2 + t31 + tstay
self.allTimes["tpreclose_" + str(i) + "_slow"] = t32
self.allTimes["tpreclose_" + str(i) + "_speed"] = t4
self.allTimes["tpreclose_" + str(i) + "_acc"] = t5
self.allTimes["tpreclose_" + str(i)] = t32 + t4 + t5
T = Tfull
self.allTimes["tmovement"] = T
def calcFirstClose(self, T : float, s : float) -> tuple[float, float]:
def calcFirstClose(self, T: float, s: float) -> tuple[float, float]:
v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1)
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1))
t1 = min(t1, v0 / self.a_max_1)
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
t1 = T - sqrt(max(0, T ** 2 - 2 * s / self.a_max_1))
if t1 > v0/ self.a_max_1 + self.check_eps:
raise Exception("""Мы вышли за границы разгона - смыкание FE, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
return t1, t2
def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2
t1 = min(t1, self.v_max_1 / self.a_max_1)
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
def calcFirstOpen(self, T: float, s: float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_1)) / 2
if t1 > self.v_max_1 / self.a_max_1 + self.check_eps:
raise Exception("""Мы вышли за границы разгона - раскрытие FE, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
return t1, t2
def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
t1 = min(t1, self.v_max_2 / self.a_max_2)
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
def calcSecondOpen(self, T: float, s: float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
raise Exception("""Мы вышли за границы разгона - раскрытие ME, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
return t1, t2
def calcSecondClose(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
t1 = min(t1, self.v_max_2 / self.a_max_2)
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
def calcSecondClose(self, T: float, s: float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
raise Exception("""Мы вышли за границы разгона - смыкание ME, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
return t1, t2
def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float:
def calcSecondOpenOffset(self, t1: float, t2: float, sq: float) -> float:
s = sq * 1
offset = sqrt(2 * s / self.a_max_1)
if offset > t1:
s -= t1 ** 2 * self.a_max_1 / 2
v1 = t1 * self.a_max_1
if s > t2 * v1:
s -= t2 * v1
offset = 2*t1 + t2 - sqrt(t1**2 - 2*s / self.a_max_1)
offset = 2 * t1 + t2 - sqrt(t1 ** 2 - 2 * s / self.a_max_1)
else:
offset = t1 + s / v1
return offset

View File

@ -8,13 +8,21 @@ class Controller(BaseController):
signal_widgets = pyqtSignal(list)
signal_settings = pyqtSignal(list)
signal_monitor = pyqtSignal(str)
signal_raport_mode = pyqtSignal(str)
signal_seeking_mode = pyqtSignal()
def send_widgets(self, widgets: list[QWidget]) -> None:
self.signal_widgets.emit(widgets)
def push_settings(self, settings: list[dict]) -> None:
def update_settings(self, settings: list[dict]) -> None:
self.signal_settings.emit(settings)
def open_custom_file (self, filepath: str) -> None:
self.signal_monitor.emit(filepath)
def raport_mode(self, filepath: str) -> None:
self.signal_raport_mode.emit(filepath)
def seeking_mode(self) -> None:
self.signal_seeking_mode.emit()

View File

@ -1,6 +1,9 @@
import pandas as pd
import traceback
import sys
from loguru import logger
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
# FIXME: костыль для выключения предупреждения "replace deprecated"
pd.set_option('future.no_silent_downcasting', True)
from utils.base.base import BaseDataConverter
@ -10,11 +13,23 @@ class DataConverter(BaseDataConverter):
@staticmethod
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
return dataframe
try:
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
return dataframe
except:
return None
def convert_data(self, files: list[str]) -> None:
dataframes = [pd.read_csv(file) for file in files]
converted_dataframes = list(map(self._replace_bool, dataframes))
self._mediator.notify(self, converted_dataframes)
try:
dataframes = [pd.read_csv(file) if file != '' else None for file in files]
converted_dataframes = list(map(self._replace_bool, dataframes))
except:
# Get the traceback object
tb = sys.exc_info()[2]
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
logger.error(pymsg)
converted_dataframes = [None]
finally:
self._mediator.notify(self, converted_dataframes)

View File

@ -3,7 +3,9 @@ import pandas as pd
from typing import Union
from PyQt5.QtWidgets import QWidget
from utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BasePointPassportFormer
from utils.base.base import (BaseMediator, BaseDirectoryMonitor,
BaseDataConverter, BasePlotWidget,
BasePointPassportFormer)
class Mediator(BaseMediator):
@ -24,9 +26,9 @@ class Mediator(BaseMediator):
if issubclass(source.__class__, BasePlotWidget):
self._controller.send_widgets(data)
def push_settings(self, settings: list[dict]):
def update_settings(self, settings: list[dict]):
self._monitor.update_settings(settings)
self._passportFormer.update_settings(settings)
self._monitor.force_all_dir()
self._monitor.update_plots()

View File

@ -1,6 +1,4 @@
from time import sleep
import os
from loguru import logger
from utils.base.base import BaseDirectoryMonitor
@ -9,47 +7,62 @@ from utils.base.base import BaseDirectoryMonitor
class DirectoryMonitor(BaseDirectoryMonitor):
def _init_state(self):
files = os.listdir(self._directory_path)
self._files = files
self._files = [
os.path.join(self._directory_path, file)
for file in os.listdir(self._directory_path)
if file.lower().endswith('.csv')
]
self.update_timer.timeout.connect(self._monitor)
logger.info("Monitor initiated!")
def _monitor(self):
files = os.listdir(self._directory_path)
new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x),
filter(lambda x: x not in self._files, files))))
current_files = [
os.path.join(self._directory_path, file)
for file in os.listdir(self._directory_path)
if file.lower().endswith('.csv')
]
new_files = sorted(list(filter(lambda x: x not in self._files, current_files)))
if new_files:
logger.info(f"New files detected: {new_files}")
self._mediator.notify(self, new_files)
self._files = files
if not files:
self._files = current_files
if not current_files:
self._files = []
def update_settings(self, data: list[dict]) -> None:
if self.isActive:
self.stop()
_, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
if not os.path.exists(self._directory_path):
logger.error(f"Путь {self._directory_path} не существует.")
raise FileNotFoundError(f"Путь {self._directory_path} не существует.")
else:
self._init_state()
self.start()
else:
_, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
def update_plots(self):
if self._files is not None:
self._mediator.notify(self, self._files)
def custom_csv_extract_only(self, path: str):
self.stop()
operator_params, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
self._files.append(path)
if path is not None:
self._mediator.notify(self, [path])
else:
self._mediator.notify(self, [None])
def start_seeking(self) -> None:
self._init_state()
self.start()
def force_all_dir(self):
files = os.listdir(self._directory_path)
self._files = files
all_files = []
for x in self._files:
path = os.path.join(self._directory_path, x)
all_files.append(path)
if all_files:
logger.info(f"Plotting all files: {all_files}")
self._mediator.notify(self, all_files)
else:
logger.info(f"Failed pushing {str(all_files)}")
def custom_dir_extract(self, dict_path:str ):
files = os.listdir(dict_path)
if files is not None:
all_files = [os.path.join(dict_path, file) for file in files]
self._mediator.notify(self, all_files)

View File

@ -1,6 +1,6 @@
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
import pandas as pd
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
class idealDataBuilder(BaseIdealDataBuilder):
@ -18,14 +18,15 @@ class idealDataBuilder(BaseIdealDataBuilder):
def get_weldingDF(self) -> pd.DataFrame:
data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float]:
data = self.Ts
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']]
return ideal_timings
class PassportFormer(BasePointPassportFormer):
@ -36,49 +37,50 @@ class PassportFormer(BasePointPassportFormer):
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
system_settings = {key: value[0] for key, value in self._params[1].items()}
if dataframe is not None:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
else:
events = None
point_quantity = 1
points_pocket = []
system_settings = {key: value[0] for key, value in self._params[1].items()}
tesla_time = sum(self._params[0].get("Tesla summary time", []))
useful_data = {"tesla_time": tesla_time,
"range_ME": system_settings["Range ME, mm"],
"k_hardness": system_settings["k_hardness_1"]
}
points_pocket = []
time_is_valid = not dataframe["time"].isna().all()
if time_is_valid:
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
for i in range(point_quantity):
operator_settings = {
key: (value[i] if i < len(value) else value[0])
for key, value in self._params[0].items()
}
params_list = [operator_settings, system_settings]
cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe :
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
for i in range(point_quantity):
operator_settings = {
key: (value[i] if i < len(value) else value[0])
for key, value in self._params[0].items()
}
params_list = [operator_settings, system_settings]
cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe :
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
if events is not None:
idx = i+1 if idx_shift else i
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
useful_p_data = {"thickness": operator_settings["object_thickness"],
"L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]}
else:
point_timeframe, point_events = None, None
useful_p_data = {"thickness": operator_settings["object_thickness"],
"L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]}
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
return dataframe, points_pocket, useful_data
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
return dataframe, points_pocket, useful_data
def update_settings(self, params: list[dict, dict]):
self._params = params

Binary file not shown.

View File

@ -1,47 +1,148 @@
from datetime import datetime as dt
from typing import Optional
from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QPixmap
from utils.base.base import BaseMainWindow, BaseController
from gui.settings_window import settingsWindow
from gui.settings_window import SystemSettings, OperatorSettings
from gui.reportGui import ReportSettings
class MainWindow(BaseMainWindow):
def __init__(self,
controller: Optional[BaseController] = None):
controller: Optional[BaseController] = None) -> None:
super().__init__()
self._controller = controller
self.initUI()
self.set_style(self)
self.settings_button.clicked.connect(self._show_settings)
self.select_dir_button.clicked.connect(self._select_dir)
self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._updater_trigger)
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._updater_trigger)
self._init_startUI()
def initUI(self) -> None:
def _init_startUI(self) -> None:
self.operSettings = OperatorSettings("params/operator_params.json", 'Operator', self._transfer_settings)
self.sysSettings = SystemSettings("params/system_params.json", 'System', self._transfer_settings)
self.repSettings = ReportSettings()
self.tabWidget = QtWidgets.QTabWidget()
self._clear()
seeking_mode_btn = QtWidgets.QPushButton("Real time folder scanning")
seeking_mode_btn.setFixedWidth(300)
seeking_mode_btn.clicked.connect(self._init_seekingUI)
raport_mode_btn = QtWidgets.QPushButton("Raport editor")
raport_mode_btn.setFixedWidth(300)
raport_mode_btn.clicked.connect(self._init_raportUI)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(seeking_mode_btn)
button_layout.addWidget(raport_mode_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
mainLayout = self.layout()
label = QtWidgets.QLabel("Select work mode")
label.setStyleSheet(
"""QLabel{
color: #ffffff;
font-size: 40px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}"""
)
mainLayout.addWidget(label, alignment=Qt.AlignCenter)
mainLayout.addWidget(button_widget)
def _clear(self) -> None:
main = self.layout()
if self.layout() is not None:
while main.count():
child = main.takeAt(0)
if child.widget() is not None:
child.widget().deleteLater()
else: self.setLayout(QtWidgets.QVBoxLayout())
def _init_seekingUI(self) -> None:
self._clear()
self._transfer_settings()
self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.tabWidget)
self.settings_button = QtWidgets.QPushButton("Show settings")
self.settings_button.setFixedWidth(160)
self.select_dir_button = QtWidgets.QPushButton("Open directory")
self.select_dir_button.setFixedWidth(175)
sys_settings_btn = QtWidgets.QPushButton("System settings")
sys_settings_btn.setFixedWidth(200)
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
oper_settings_btn.setFixedWidth(200)
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(self.settings_button)
button_layout.addWidget(self.select_dir_button)
button_layout.addWidget(sys_settings_btn)
button_layout.addWidget(oper_settings_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
title = QtWidgets.QLabel("online mode")
mainLayout = self.layout()
mainLayout.addWidget(self.tabWidget)
mainLayout.addWidget(button_widget)
mainLayout.addWidget(title, alignment=Qt.AlignRight)
self.resize(800,800)
self._controller.seeking_mode()
# TODO:push seeking to mediator
def _init_raportUI(self) -> None:
self._clear()
self._transfer_settings()
self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab)
sys_settings_btn = QtWidgets.QPushButton("System settings")
sys_settings_btn.setFixedWidth(185)
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
oper_settings_btn.setFixedWidth(185)
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
view_settings_btn = QtWidgets.QPushButton("Customize view")
view_settings_btn.setFixedWidth(185)
view_settings_btn.clicked.connect(self._customization_window)
save_screen_btn = QtWidgets.QPushButton("Save state")
save_screen_btn.setFixedWidth(185)
save_screen_btn.clicked.connect(self._save_plots)
open_file_btn = QtWidgets.QPushButton("Open file")
open_file_btn.setFixedWidth(185)
open_file_btn.clicked.connect(self._open_file)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(sys_settings_btn)
button_layout.addWidget(oper_settings_btn)
button_layout.addWidget(view_settings_btn)
button_layout.addWidget(save_screen_btn)
button_layout.addWidget(open_file_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
title = QtWidgets.QLabel("raport mode")
mainLayout = self.layout()
mainLayout.addWidget(self.tabWidget)
mainLayout.addWidget(button_widget)
mainLayout.addWidget(title, alignment=Qt.AlignRight)
self.resize(800,800)
self._controller.raport_mode(None)
#self._controller.raport_mode(path)
# TODO: push only one dir to monitor
layout.addLayout(button_layout)
self.setLayout(layout)
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
for plot_widget in plot_widgets:
widget, reg_items, curve_items = plot_widget
tab = QtWidgets.QWidget()
tab.setProperty("reg_items", reg_items)
tab.setProperty("curve_items", curve_items)
grid = QtWidgets.QGridLayout()
grid.addWidget(plot_widget)
grid.addWidget(widget)
tab.setLayout(grid)
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
self.tabWidget.setCurrentWidget(tab)
@ -51,31 +152,61 @@ class MainWindow(BaseMainWindow):
for i in range(0, tab_count-2):
self._close_tab(i)
def keyPressEvent(self, a0):
def keyPressEvent(self, a0) -> None:
if a0.key() == Qt.Key_F5:
pass
tab_count = self.tabWidget.count()
for i in range(0, tab_count):
self._close_tab(i)
def closeEvent(self, a0):
self.operSettings.close()
self.sysSettings.close()
self.repSettings.close()
super().closeEvent(a0)
def _show_settings(self):
def _show_settings(self) -> None:
self.operSettings.show()
self.sysSettings.show()
def push_settings(self) -> None:
self._updater_trigger()
def _updater_trigger(self) -> None:
def _transfer_settings(self) -> None:
self.tabWidget.clear()
operator_params = self.operSettings.getParams()
system_params = self.sysSettings.getParams()
self._controller.push_settings([operator_params, system_params])
self._controller.update_settings([operator_params, system_params])
def _close_tab(self, index:int) -> None:
self.tabWidget.removeTab(index)
def _open_file(self) -> None:
path = self._select_csv()
self._controller.raport_mode(path)
def _select_dir(self):
folder_path = QtWidgets.QFileDialog.getExistingDirectory(self, 'Select directory', "")
if folder_path:
self._controller.open_custom_file(folder_path)
def _select_csv(self) -> Optional[str]:
CSV_path, _ = QtWidgets.QFileDialog.getOpenFileName(self,"Select csv file", "", "CSV Files (*.csv)")
if CSV_path:
print(CSV_path)
return CSV_path
return None
def _customization_window(self) -> None:
tab = self.tabWidget.currentWidget()
reg_items = tab.property("reg_items")
curve_items = tab.property("curve_items")
self.repSettings.build(reg_items, curve_items)
def _save_plots(self) -> None:
filepath, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save file", "", "Image Files (*.png *.jpeg)")
tab = self.tabWidget.currentWidget()
pixmap = QPixmap(tab.size())
tab.render(pixmap)
pixmap.save(filepath)

View File

@ -1,9 +1,7 @@
import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
import copy
import pyqtgraph as pg
import pandas as pd
from typing import Optional, Any
from utils.base.base import BasePlotWidget
@ -14,56 +12,6 @@ class ProcessStage():
finish_index:int
class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self,
signal: dict[str, Any],
@ -97,20 +45,20 @@ class PlotWidget(BasePlotWidget):
return None
@staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title)
def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
plot_item = pg.PlotItem(title=title)
# Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True)
plot_widget.setClipToView(True)
legend = pg.LegendItem((80, 60), offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
plot_item.setDownsampling(auto=True, mode='peak')
plot_item.showGrid(x=True, y=True)
plot_item.setClipToView(True)
legend = plot_item.addLegend(offset=(70, 20))
return plot_item, legend
def _add_stage_regions(self,
plot_widget: pg.PlotWidget,
plot_item: pg.PlotItem,
point_events: dict[str, list[float]],
dataframe_headers: list[str],
reg_items: dict,
transparency: int = 75) -> None:
"""
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
@ -122,12 +70,15 @@ class PlotWidget(BasePlotWidget):
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None:
region.setZValue(-20)
plot_widget.addItem(region)
plot_item.addItem(region)
reg_items["real"].setdefault(stage, [])
reg_items["real"][stage].append(region)
def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget,
plot_item: pg.PlotItem,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
reg_items: dict,
transparency: int = 125) -> None:
"""
Добавляет регионы для идеальных этапов.
@ -140,13 +91,16 @@ class PlotWidget(BasePlotWidget):
region = self._create_stage_region(stage, start_t, end_t, transparency)
if region:
region.setZValue(-10)
plot_widget.addItem(region)
plot_item.addItem(region)
reg_items["ideal"].setdefault(stage, [])
reg_items["ideal"][stage].append(region)
def _add_ideal_signals(self,
plot_widget: pg.PlotWidget,
plot_item: pg.PlotItem,
ideal_data: dict[str, Any],
point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None:
ideal_signals: list[dict[str, Any]],
curve_items: dict) -> None:
"""
Добавляет идеальные сигналы для каждого этапа.
"""
@ -159,23 +113,29 @@ class PlotWidget(BasePlotWidget):
point_events[stage][1]
)
if curve:
curve.setZValue(10)
plot_widget.addItem(curve)
curve.setZValue(50)
plot_item.addItem(curve)
curve_items["ideal"].setdefault(signal["name"], {})
curve_items["ideal"][signal["name"]].setdefault(stage, [])
curve_items["ideal"][signal["name"]][stage].append(curve)
def _add_real_signals(self,
plot_widget: pg.PlotWidget,
plot_item: pg.PlotItem,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None:
legend: pg.LegendItem,
curve_items: dict) -> None:
"""
Добавляет реальные сигналы из dataframe на виджет.
"""
dataframe_headers = dataframe.columns.tolist()
for signal in real_signals:
if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
plot = plot_item.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
plot.setZValue(0)
legend.addItem(plot, signal["name"])
curve_items["real"].setdefault(signal["name"], {})
curve_items["real"][signal["name"]] = plot
def _add_performance_label(self,
layout: QVBoxLayout,
@ -190,52 +150,35 @@ class PlotWidget(BasePlotWidget):
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
f"Сокращение длительности: фактическое = {tesla_TWC} %"
)
#f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
self.set_style(performance_label)
layout.addWidget(performance_label)
performance_label.update()
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _build_widget(self, data: list[Any]) -> QWidget:
"""
Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
"""
widget = QWidget()
layout = QVBoxLayout(widget)
result_widget = QWidget()
result_layout = QVBoxLayout(result_widget)
plot_layout = pg.GraphicsLayoutWidget()
reg_items = {"real":{}, "ideal":{}}
curve_items = {"real":{}, "ideal":{}}
dataframe, points_pocket, useful_data = data
tesla_time = useful_data["tesla_time"]
range_ME = useful_data["range_ME"]
k_hardness = useful_data["k_hardness"]
dataframe_headers = dataframe.columns.tolist()
dat_is_none = dataframe is None
if not dat_is_none: dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_widget, legend = self._init_plot_widget(title=channel)
plot_item, legend = self._init_plot_item(title=channel)
settings = description["Settings"]
TWC_time = 0.0
@ -243,17 +186,31 @@ class PlotWidget(BasePlotWidget):
worst_perf = 2
# TODO: рассчитать корректный параметр range
if settings["mirror ME"]:
dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME)
if settings["mirror ME"] and not dat_is_none:
dataframe = self._mirror_shift_data(
"ME",
description["Real_signals"],
dataframe,
range_ME
)
# Итерация по точкам
for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events]
# point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data]
point_timeframe, ideal_dat, point_events, useful_p_data = point_data
ideal_data = copy.deepcopy(ideal_dat)
if dat_is_none:
worst_timeframe = point_timeframe = [0, ideal_data["Ideal cycle"]]
point_events = {}
keys = list(ideal_data.keys())
shift = 0
for i, time in enumerate(ideal_data["Ideal timings"]):
point_events[keys[i]] = [shift, time+shift]
shift += time
# TODO: проверить корректность расчетов
if settings["force compensation FE"]:
if settings["force compensation FE"] and not dat_is_none:
force = useful_p_data["force"]
F_comp = - force/k_hardness
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
@ -266,10 +223,9 @@ class PlotWidget(BasePlotWidget):
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
# Добавляем реальные стадии
if settings["stages"]:
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
if settings["stages"] and not dat_is_none:
self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75)
# TODO: подобрать не вырвеглазные цвета, возможно ограничить зону
if settings["workpiece"]:
x1 = point_timeframe[0]
dx = point_timeframe[1] - x1
@ -280,15 +236,30 @@ class PlotWidget(BasePlotWidget):
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush('grey'))
rect_item.setPen(pg.mkPen('black', width=3))
plot_widget.addItem(rect_item)
plot_item.addItem(rect_item)
if settings["force accuracy"]and not dat_is_none:
modifier = 0.05
x1 = point_events["Welding"][0]
dx = point_events["Welding"][1] - x1
force = useful_p_data["force"]
y1 = force*(1-modifier)
dy = force*(2*modifier)
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
rect_item.setPen(pg.mkPen('black', width=0))
plot_item.addItem(rect_item)
# Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]:
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100)
self._add_ideal_signals(plot_item, ideal_data, point_events, description["Ideal_signals"], curve_items)
# Подсчёт производительности
if settings["performance"]:
if settings["performance"]and not dat_is_none:
is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
@ -299,53 +270,46 @@ class PlotWidget(BasePlotWidget):
TWC_time += TWC_delta
ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf:
worst_perf = curr_perf
worst_timeframe = point_timeframe
# Добавляем реальные сигналы
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
if not dat_is_none:
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)
if widget_num == 0:
main_plot = plot_widget
main_plot = plot_item
else:
# Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot)
plot_item.setXLink(main_plot)
# Если есть настройка производительности, добавляем label
if settings["performance"]:
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
if settings["performance"] and not dat_is_none:
self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time)
layout.addWidget(plot_widget)
layout.addWidget(navigator)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
plot_layout.addItem(plot_item, widget_num, 0)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot)
if navigator is not None:
plot_layout.addItem(navigator, widget_num+1, 0)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout)
return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
result_layout.addWidget(plot_layout)
return result_widget, reg_items, curve_items
def build(self, data: list[list[Any]]) -> None:
"""
Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида:
[
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, useful_data],
[dataframe, points_pocket, useful_data],
...
]
"""
widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets)
widgets_datapack = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets_datapack)

169
src/gui/reportGui.py Normal file
View File

@ -0,0 +1,169 @@
import pyqtgraph as pg
from pyqtgraph.parametertree import Parameter, ParameterTree
from typing import Union
from PyQt5 import QtWidgets
class ReportSettings(QtWidgets.QWidget):
def build(self, reg_items: dict, curve_items: dict) -> None:
"""Создает ParameterTree для элементов всех графиков выбранной вкладки"""
self._clear()
param_tree = ParameterTree()
layout = self.layout()
layout.addWidget(param_tree)
body= [
self._generate_reg_params(reg_items),
self._generate_curve_params(curve_items)
]
# Добавляем параметры в дерево
params = Parameter.create(name='params', type='group', children=body)
params.sigTreeStateChanged.connect(
lambda: self._update_settings(reg_items, curve_items, params)
)
param_tree.setParameters(params, showTop=False)
self.show()
def _clear(self) -> None:
"""
Приводит виджет в готовое к работе состояние.
Удаляет все содержимое, если имеется
"""
main = self.layout()
if self.layout() is not None:
while main.count():
child = main.takeAt(0)
if child.widget() is not None:
child.widget().deleteLater()
else:
self.setLayout(QtWidgets.QVBoxLayout())
def _generate_reg_params(self,
reg_items: dict) -> dict:
"""Созадет реальные и идеальные секторы"""
res = {'name': 'Sectors', 'type': 'group', 'children': [
{'name': 'Real sectors', 'type': 'group', 'children': self._create_samples(reg_items["real"])},
{'name': 'Ideal sectors', 'type': 'group', 'children': self._create_samples(reg_items["ideal"])},
]}
return res
def _generate_curve_params(self,
curve_items: dict) -> dict:
"""Создает реальные и идеальные линии графиков"""
res = {'name': 'Plots', 'type': 'group', 'children': [
{'name': 'Real plots', 'type': 'group', 'children': self._create_samples(curve_items["real"])},
{'name': 'Ideal plots', 'type': 'group', 'children': self._create_ideal_curves(curve_items["ideal"])},
]}
return res
def _create_ideal_curves(self,
curve: dict) -> list[dict]:
"""Создает секторы с этапами циклограммы"""
res = []
for key, item in curve.items():
param = {'name': key, 'type': 'group', 'children': self._create_samples(item)}
res.append(param)
return res
def _create_samples(self,
sector: dict) -> list[dict]:
"""Создает список представленных элементов с их параметрами"""
res = []
for key, item in sector.items():
sample = item[0] if type(item) == list else item
param = {'name': key, 'type': 'group', 'children': self._create_settings(sample)}
res.append(param)
return res
def _create_settings(self,
item: Union[pg.LinearRegionItem, pg.PlotDataItem]) -> list[dict]:
"""Получает настройки для элемента"""
if type(item) == pg.LinearRegionItem:
pen = item.lines[0].pen
brush = item.brush
fill_color = brush.color().getRgb()
else:
pen = pg.mkPen(item.opts.get("pen"))
fill_color = None
line_color = pen.color().getRgb()
line_thickness = pen.width()
visibility = item.isVisible()
return [
{'name': 'Line color', 'type': 'color', 'value': line_color},
{'name': 'Line thickness', 'type': 'int', 'value': line_thickness, 'limits': (1, 10)},
{'name': 'Visibility', 'type': 'bool', 'value': visibility},
{'name': 'Fill color', 'type': 'color', 'value': fill_color},
]
def _update_settings(self,
reg_items: dict,
curve_items: dict,
params: Parameter) -> None:
"""Задает параметры элементов в соответствии с paramTree"""
real_sectors = params.child("Sectors").child("Real sectors")
ideal_sectors = params.child("Sectors").child("Ideal sectors")
real_plots = params.child("Plots").child("Real plots")
ideal_plots = params.child("Plots").child("Ideal plots")
self._set_sector_settings(reg_items["real"], real_sectors)
self._set_sector_settings(reg_items["ideal"], ideal_sectors)
self._set_plot_settings(curve_items["real"], real_plots)
for key, item_dict in curve_items["ideal"].items():
self._set_plot_settings(item_dict, ideal_plots.child(key))
def _set_sector_settings(self,
sectors: dict,
settings: Parameter) -> None:
"""Задает параметры секторов в соответствии с настройками"""
for key, item in sectors.items():
sample = settings.child(key)
line_color = sample.child("Line color").value()
line_width = sample.child("Line thickness").value()
visibility = sample.child("Visibility").value()
fill_color = sample.child("Fill color").value()
pen = pg.mkPen(color=line_color, width=line_width)
brush=pg.mkBrush(fill_color)
for reg in item:
reg.setVisible(visibility)
reg.lines[0].setPen(pen)
reg.lines[1].setPen(pen)
reg.setBrush(brush)
def _set_plot_settings(self,
curves:dict,
settings: Parameter) -> None:
"""Задает параметры кривых в соответствии с настройками"""
for key, item in curves.items():
sample = settings.child(key)
line_color = sample.child("Line color").value()
line_width = sample.child("Line thickness").value()
visibility = sample.child("Visibility").value()
pen = pg.mkPen(color=line_color, width=line_width)
if type(item) == list:
for curve in item:
curve.setVisible(visibility)
curve.setPen(pen)
else:
item.setVisible(visibility)
item.setPen(pen)

View File

@ -1,7 +1,8 @@
from typing import Callable, Optional, Any
from PyQt5.QtWidgets import (
QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
)
from PyQt5.QtWidgets import (QWidget, QPushButton,
QLineEdit, QHBoxLayout,
QVBoxLayout, QLabel,
QTableWidget, QTableWidgetItem)
from PyQt5.QtGui import QIntValidator
from utils.json_tools import read_json, write_json
@ -11,7 +12,6 @@ class settingsWindow(QWidget):
def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
"""
Окно настроек для редактирования параметров.
:param path: Путь к файлу настроек (JSON).
:param name: Название набора настроек.
:param upd_func: Функция обновления (коллбэк).
@ -176,7 +176,16 @@ class settingsWindow(QWidget):
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
class SystemSettings(settingsWindow):
def __init__(self, path, name, upd_func):
super().__init__(path, name, upd_func)
self._num_points.setVisible(False)
def _expand(self):
pass
class OperatorSettings(settingsWindow):
pass
if __name__ == '__main__':

View File

@ -24,10 +24,9 @@ def main():
window.show()
controller.signal_widgets.connect(window.show_plot_tabs)
controller.signal_settings.connect(mediator.push_settings)
controller.signal_monitor.connect(monitor.custom_dir_extract)
window.push_settings()
controller.signal_settings.connect(mediator.update_settings)
controller.signal_raport_mode.connect(monitor.custom_csv_extract_only)
controller.signal_seeking_mode.connect(monitor.start_seeking)
sys.exit(app.exec_())

43
src/testAlgo.py Normal file
View File

@ -0,0 +1,43 @@
from src.OptAlgorithm.OptAlgorithm import OptAlgorithm
from src.utils import read_json
from matplotlib import pyplot as plt, use
from numpy import cos, sin, sqrt, cbrt, arcsin, linspace, array
if __name__ == "__main__":
tq = 1
ts = linspace(0, tq, 200000)
operator_params = read_json("params/operator_params.json")
system_params = read_json("params/system_params.json")
non_array_operator_params = {}
i = 1
for key, value in operator_params.items():
if hasattr(value, "__len__"):
if len(value) > i:
non_array_operator_params[key] = value[i]
else:
non_array_operator_params[key] = value[0]
else:
non_array_operator_params[key] = value
non_array_system_params = {}
for key, value in system_params.items():
if hasattr(value, "__len__"):
if len(value) > i:
non_array_system_params[key] = value[i]
else:
non_array_system_params[key] = value[0]
else:
non_array_system_params[key] = value
opt = OptAlgorithm(non_array_operator_params, non_array_system_params)
Xs = array([opt.getVar("X1", t) for t in ts])
plt.plot(ts, Xs)
plt.show()

View File

@ -5,14 +5,11 @@ from typing import Optional, Union, Any
from cachetools import LRUCache
import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtCore import QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget
from PyQt5.QtOpenGL import QGLWidget
from OptAlgorithm import OptAlgorithm
import pandas as pd
import pandas as pd
import numpy as np
import pyqtgraph as pg
@ -38,7 +35,7 @@ class BaseMediator:
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
...
def push_settings (self, data: list[dict]):
def update_settings (self, data: list[dict]):
...
class BaseDirectoryMonitor:
@ -50,7 +47,8 @@ class BaseDirectoryMonitor:
super().__init__()
self._directory_path = None
self._update_time = None
self.isActive = False
self._files = []
self._mediator = mediator
@ -79,14 +77,19 @@ class BaseDirectoryMonitor:
self._files = files
def start(self):
self.isActive = True
self.update_timer.start(int(self._update_time))
def stop(self):
self.isActive = False
self.update_timer.stop()
def update_settings(self, data: list[dict]) -> None:
...
def update_plots(self) -> None:
...
def force_all_dir(self):
...
@ -128,7 +131,8 @@ class BasePlotWidget:
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False
"force compensation FE": False,
"force accuracy":True
},
"Real_signals": [
{
@ -159,7 +163,8 @@ class BasePlotWidget:
"ideals": True,
"mirror ME": True,
"workpiece": True,
"force compensation FE": True
"force compensation FE": True,
"force accuracy":False
},
"Real_signals": [
{
@ -190,7 +195,8 @@ class BasePlotWidget:
"ideals": True,
"mirror ME": False,
"workpiece": False,
"force compensation FE": False
"force compensation FE": False,
"force accuracy":False
},
"Real_signals": [
{
@ -227,6 +233,88 @@ class BasePlotWidget:
font-family: "Segoe UI", sans-serif;
}""")
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotItem(title="Navigator")
navigator.setFixedHeight(100)
# Получение кривых из main_plot
for curve in main_plot.listDataItems():
# Извлекаем данные из кривой
x, y = curve.getData()
curve_name = curve.opts.get("name", None)
signal_pen = curve.opts.get("pen", None)
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
ROI_region.setBounds([0, x[-1]])
navigator.addItem(ROI_region)
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotItem,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
@property
def mediator(self) -> BaseMediator:
return self._mediator
@ -252,12 +340,14 @@ class BaseController(QObject):
def send_widgets(self, widgets: list[QWidget]) -> None:
...
def push_settings(self, settings: list[dict]) -> None:
def update_settings(self, settings: list[dict]) -> None:
...
def open_custom_file (self, filepath: str) -> None:
def raport_mode (self, filepath: str) -> None:
...
def seeking_mode(self) -> None:
...
class BaseIdealDataBuilder(OptAlgorithm):
@ -270,10 +360,12 @@ class BaseIdealDataBuilder(OptAlgorithm):
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = []
for i in range (0, int(end_timestamp*self.mul)):
for i in range (0, int(end_timestamp*self.mul)+1):
time = i/self.mul
X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame:
@ -305,6 +397,8 @@ class BaseMainWindow(QWidget):
def __init__(self,
controller: Optional[BaseController] = None):
super().__init__()
self.set_style(self)
self.resize(200,200)
self._controller = controller
...