Compare commits

..

16 Commits

Author SHA1 Message Date
Andrew
9faa35f514 dev: добавлены обработчики ошибок при чтении файлов + добавлено автоматическое закрытие окон 2024-12-23 14:18:54 +03:00
ccfb6b9fcb fix: Исправлено смыкание на идеальной траектории 2024-12-23 16:35:39 +03:00
Andrew
625f3d7e01 fix: исправлено взаимодействие модулей 2024-12-19 14:25:11 +03:00
Andrew
d99fafd2ae dev: добавлено построение идеального графика 2024-12-19 13:23:27 +03:00
Andrew
bf155af58a fix: убрал отображение КДИП и идеальной производительности 2024-12-19 11:24:33 +03:00
Andrew
3c18e22d09 dev: добавлено меню выбора режима работы + реализован режим работы raport 2024-12-18 19:43:46 +03:00
Andrew
4c79aba8a4 style: изменены параметры запуска 2024-12-18 15:08:23 +03:00
Andrew
3c7040692e dev: изменил создание виджета с графиками 2024-12-18 14:54:40 +03:00
Andrew
c3d976f6a6 dev: добавил возможность рисовать поле допусков для силы 2024-12-18 13:52:05 +03:00
Andrew
7c0db070c9 style: увеличена толщина границ ROI для удобства взаимодействия 2024-12-18 13:03:30 +03:00
Andrew
b73eba7f7e chore: добавил описание функций в новый класс 2024-12-18 12:58:37 +03:00
Andrew
27d1cd63de dev: изменил отображение настроек, цветов, теперь их можно редактировать 2024-12-18 12:44:43 +03:00
Andrew
37b9c30f12 chore: удалены лишние импорты 2024-12-18 10:16:50 +03:00
Andrew
2e66a77ced fix: теперь график-навигатор имеет ограничения по перемещению ROI 2024-12-18 10:13:16 +03:00
Andrew
eaec12d3ff dev: добавлено окно, отображающее параметры каждого элемента графика 2024-12-17 20:40:04 +03:00
Andrew
365118bc22 chore: часть функций plotter перенесена в base 2024-12-17 16:14:07 +03:00
34 changed files with 876 additions and 419 deletions

3
.gitignore vendored
View File

@ -6,4 +6,5 @@
/venv /venv
*.txt *.txt
*.svg *.svg
/test.py /test.py
/output

View File

@ -81,19 +81,19 @@
], ],
"distance_l_2": [ "distance_l_2": [
0.0275, 0.0275,
0.03, 0.0255,
0.033, 0.0242,
0.033, 0.0245,
0.033, 0.0228,
0.033, 0.0236,
0.033, 0.0229,
0.033, 0.0248,
0.033, 0.024,
0.033, 0.0235,
0.033, 0.025,
0.033, 0.0276,
0.033, 0.0234,
0.033 0.0215
], ],
"distance_h_end1": [ "distance_h_end1": [
0.003, 0.003,

Binary file not shown.

View File

@ -3,75 +3,76 @@ from numpy import sqrt, arcsin, arccos, cos, sin
from OptAlgorithm.AutoConfigClass import AutoConfigClass from OptAlgorithm.AutoConfigClass import AutoConfigClass
from OptAlgorithm.ConstantCalculator import ConstantCalculator from OptAlgorithm.ConstantCalculator import ConstantCalculator
class OptTimeCalculator(AutoConfigClass): class OptTimeCalculator(AutoConfigClass):
params_list = [] params_list = []
def __init__(self, operator_config : dict, system_config : dict):
def __init__(self, operator_config: dict, system_config: dict):
cCalculator = ConstantCalculator(operator_config, system_config) cCalculator = ConstantCalculator(operator_config, system_config)
super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc()) super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc())
self.allTimes = {} self.allTimes = {}
self.check_eps = 1e-7
def tGrowNominal(self, F : float) -> float:
return arcsin(F/(self.Ftogrow)) * sqrt(self.mass_1/self.k_hardness_1) def tGrowNominal(self, F: float) -> float:
return arcsin(F / (self.Ftogrow)) * sqrt(self.mass_1 / self.k_hardness_1)
def Tclose(self, h1: float, h2: float) -> None: def Tclose(self, h1: float, h2: float) -> None:
v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1) v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1)
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow) v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
t1 = v0 / self.a_max_1 t1 = v0 / self.a_max_1
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0) t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 / 2)) / v0)
T1 = t1 + t2t T1 = t1 + t2t
t21 = sqrt(h2/self.a_max_2) t21 = sqrt(h2 / (self.a_max_2))
t21 = min(self.v_max_2/self.a_max_2, t21) t21 = min(self.v_max_2 / self.a_max_2, t21)
t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2) t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21 T2 = t22 + 2 * t21
Tclose = max(T1, T2) Tclose = max(T1, T2)
tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1) tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1)
tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2) tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2)
self.allTimes["tclose_1_acc"] = tclose_1_acc self.allTimes["tclose_1_acc"] = tclose_1_acc
self.allTimes["tclose_1_speed"] = tclose_1_speed self.allTimes["tclose_1_speed"] = tclose_1_speed
self.allTimes["tclose_2_acc"] = tclose_2_acc self.allTimes["tclose_2_acc"] = tclose_2_acc
self.allTimes["tclose_2_speed"] = tclose_2_speed self.allTimes["tclose_2_speed"] = tclose_2_speed
self.allTimes["tclose"] = Tclose self.allTimes["tclose"] = Tclose
def Topen(self, s1 : float, s2 : float, l1 : float, l2 : float, Fs1 : float, Fs2 : float = 0) -> None: def Topen(self, s1: float, s2: float, l1: float, l2: float, Fs1: float, Fs2: float = 0) -> None:
t11 = sqrt((l1 + Fs1)/self.a_max_1) t11 = sqrt((l1 + Fs1) / self.a_max_1)
t11 = min(self.v_max_1/self.a_max_1, t11) t11 = min(self.v_max_1 / self.a_max_1, t11)
t12 = max(0, ((l1+Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1) t12 = max(0, ((l1 + Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
T1 = t12 + 2 * t11 T1 = t12 + 2 * t11
offset = self.calcSecondOpenOffset(t11, t12, Fs1) offset = self.calcSecondOpenOffset(t11, t12, Fs1)
t21 = sqrt(l2/self.a_max_2) t21 = sqrt(l2 / self.a_max_2)
t21 = min(self.v_max_2/self.a_max_2, t21) t21 = min(self.v_max_2 / self.a_max_2, t21)
t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2) t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21 + offset T2 = t22 + 2 * t21 + offset
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1+Fs1) topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1 + Fs1)
offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1) offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1)
topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2) topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2)
self.allTimes["topen_1_acc"] = topen_1_acc self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_2_offset"] = offset self.allTimes["topen_2_offset"] = offset
self.allTimes["topen_1_acc"] = topen_1_acc self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_1_speed"] = topen_1_speed self.allTimes["topen_1_speed"] = topen_1_speed
self.allTimes["topen_2_acc"] = topen_2_acc self.allTimes["topen_2_acc"] = topen_2_acc
self.allTimes["topen_2_speed"] = topen_2_speed self.allTimes["topen_2_speed"] = topen_2_speed
if s1 >= l1: if s1 >= l1:
raise Exception("""S1 >= L1 - недопустимый сценарий, raise Exception("""S1 >= L1 - недопустимый сценарий,
проверьте distance_s_1, distance_h_end1""") проверьте distance_s_1, distance_h_end1""")
if s2 >= l2: if s2 >= l2:
raise Exception("""S2 >= L2 - недопустимый сценарий, raise Exception("""S2 >= L2 - недопустимый сценарий,
проверьте distance_s_2, distance_h_end2""") проверьте distance_s_2, distance_h_end2""")
s1 += Fs1 s1 += Fs1
topen_1_mark = sqrt(2 * s1 / self.a_max_1) topen_1_mark = sqrt(2 * s1 / self.a_max_1)
if topen_1_mark > topen_1_acc: if topen_1_mark > topen_1_acc:
@ -79,167 +80,173 @@ class OptTimeCalculator(AutoConfigClass):
v1 = topen_1_acc * self.a_max_1 v1 = topen_1_acc * self.a_max_1
if s1 > topen_1_speed * v1: if s1 > topen_1_speed * v1:
s1 -= topen_1_speed * v1 s1 -= topen_1_speed * v1
topen_1_mark = 2*topen_1_acc + topen_1_speed - sqrt(topen_1_acc**2 - 2*s1 / self.a_max_1) topen_1_mark = 2 * topen_1_acc + topen_1_speed - sqrt(topen_1_acc ** 2 - 2 * s1 / self.a_max_1)
else: else:
topen_1_mark = topen_1_acc + s1 / v1 topen_1_mark = topen_1_acc + s1 / v1
topen_2_mark = sqrt(2 * s2 / self.a_max_2) topen_2_mark = sqrt(2 * s2 / self.a_max_2)
if topen_2_mark > topen_2_acc: if topen_2_mark > topen_2_acc:
s2 -= topen_2_acc ** 2 * self.a_max_2 / 2 s2 -= topen_2_acc ** 2 * self.a_max_2 / 2
v2 = topen_2_acc * self.a_max_2 v2 = topen_2_acc * self.a_max_2
if s2 > topen_2_speed * v2: if s2 > topen_2_speed * v2:
s2 -= topen_2_speed * v2 s2 -= topen_2_speed * v2
topen_2_mark = 2*topen_2_acc + topen_2_speed - sqrt(topen_2_acc**2 - 2*s2 / self.a_max_2) topen_2_mark = 2 * topen_2_acc + topen_2_speed - sqrt(topen_2_acc ** 2 - 2 * s2 / self.a_max_2)
else: else:
topen_2_mark = topen_2_acc + s2 / v2 topen_2_mark = topen_2_acc + s2 / v2
self.allTimes["topen_1_mark"] = topen_1_mark self.allTimes["topen_1_mark"] = topen_1_mark
self.allTimes["topen_2_mark"] = topen_2_mark self.allTimes["topen_2_mark"] = topen_2_mark
def Tgrow(self) -> None:
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
vF0 = v0 * self.k_hardness_1
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0) def Tgrow(self) -> None:
tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed) v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
vF0 = v0 * self.k_hardness_1
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1 / (self.mass_1)) * self.Ftogrow)
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0 * vF0)
tspeed = sqrt(self.mass_1 / self.k_hardness_1) * (
arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1 / self.freq * vF0 * sin(
self.freq * tspeed)
eps = 1e1 eps = 1e1
if self.freq**2 * self.Ftogrow**2 - vFmax**2 < -eps: if self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 < -eps:
raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории
, проверьте параметры k_hardness_1, mass_1, k_prop""") , проверьте параметры k_hardness_1, mass_1, k_prop""")
Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2 + eps) Fmeet = 1 / self.freq * sqrt(self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 + eps)
Fstart_prop = self.Fstart_prop Fstart_prop = self.Fstart_prop
if Fmeet > Fstart_prop: if Fmeet > Fstart_prop:
raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора
, проверьте параметры v_max_1, k_prop""") , проверьте параметры v_max_1, k_prop""")
tmeet = (Fmeet - Fspeed)/vFmax tmeet = (Fmeet - Fspeed) / vFmax
tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet) tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet)
vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2) vp = 1 / sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow ** 2 - self.Fstart_prop ** 2)
ap = Fstart_prop / self.mass_1 ap = Fstart_prop / self.mass_1
tprop = 2*vp / ap tprop = 2 * vp / ap
self.allTimes["tspeed"] = tspeed self.allTimes["tspeed"] = tspeed
self.allTimes["tmeet"] = tmeet self.allTimes["tmeet"] = tmeet
self.allTimes["tend"] = tend self.allTimes["tend"] = tend
self.allTimes["tprop"] = tprop self.allTimes["tprop"] = tprop
self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop
def T(self, h1 : float, h2 : float, s1 : float, s2 : float, l1 : float, l2 : float) -> dict: def T(self, h1: float, h2: float, s1: float, s2: float, l1: float, l2: float) -> dict:
self.Tclose(h1, h2) self.Tclose(h1, h2)
self.Tgrow() self.Tgrow()
self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0) self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0)
return self.allTimes return self.allTimes
def Tmovement(self, closeAlgo, tmark) -> None: def Tmovement(self, closeAlgo, tmark) -> tuple[list, list]:
contact = [self.contact_distance_1, self.contact_distance_2] contact = [self.contact_distance_1, self.contact_distance_2]
v0s = [] v0s = []
pos0s = [] pos0s = []
for i in range(1,3): for i in range(1, 3):
if tmark < 0: if tmark < 0:
raise Exception("""Отрицательное время этапа раскрытия, raise Exception("""Отрицательное время этапа раскрытия,
проверьте distance_s_{1,2}, time_command""") проверьте distance_s_{1,2}, time_command""")
v0 = closeAlgo("V"+str(i), "Open", tmark) v0 = closeAlgo("V" + str(i), "Open", tmark)
v0s.append(v0) v0s.append(v0)
x0 = closeAlgo("X"+str(i), "Open", tmark) x0 = closeAlgo("X" + str(i), "Open", tmark)
x1 = contact[i-1] - self.__dict__["distance_h_end"+str(i)] x1 = contact[i - 1] - self.__dict__["distance_h_end" + str(i)]
x = x1 - x0 x = x1 - x0
pos0s.append(closeAlgo("X"+str(i), "Open", tmark)) pos0s.append(closeAlgo("X" + str(i), "Open", tmark))
Tfull = self.time_robot_movement Tfull = self.time_robot_movement
L = self.__dict__["distance_l_" + str(i)]
L = self.__dict__["distance_l_"+str(i)] maxL = contact[i - 1] - L - x0
maxL = contact[i-1] - L - x0
self.Tmovementi(i, x, Tfull, v0, maxL) self.Tmovementi(i, x, Tfull, v0, maxL)
return pos0s, v0s return pos0s, v0s
def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None: def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None:
v0 = abs(v0) v0 = abs(v0)
vmax = self.__dict__["v_max_"+str(i)] vmax = self.__dict__["v_max_" + str(i)]
a = self.__dict__["a_max_"+str(i)] a = self.__dict__["a_max_" + str(i)]
t3 = (Tfull + v0 / a) / 2 t3 = (Tfull + v0 / a) / 2
sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2) sqrtval = a ** 2 * (
a ** 2 * (Tfull + 2 * t3) ** 2 - 8 * a * Sfull + 2 * a * v0 * (Tfull + 2 * t3) - 3 * v0 ** 2)
if sqrtval < 0: if sqrtval < 0:
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время, raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""") проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""")
t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2) t1max = ((Tfull + 2 * t3) + v0 / a) / (2) - sqrt(sqrtval) * sqrt(2) / (4 * a ** 2)
t1 = min(t1max, (vmax- abs(v0))/a) t1 = min(t1max, (vmax - abs(v0)) / a)
t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a))) t1 = max(0, min(t1, -v0 / a + sqrt(v0 ** 2 / (a ** 2) + (abs(maxL) - v0 * v0 / a) / a)))
t31 = v0/a + t1 t31 = v0 / a + t1
t5max = (Tfull - v0/a)/2 - t1 t5max = (Tfull - v0 / a) / 2 - t1
v1 = v0 + a * t1 v1 = v0 + a * t1
S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2 S1 = v0 * t1 + a * t1 * t1 / 2 + v1 * t31 - a * t31 * t31 / 2
S2max = Sfull + S1 S2max = Sfull + S1
t5 = min(t5max, (vmax)/a, sqrt(S2max / a)) t5 = min(t5max, (vmax) / a, sqrt(S2max / a))
t3 = abs(v0)/a + t1 + t5 t3 = abs(v0) / a + t1 + t5
t32 = t5 t32 = t5
v1 = abs(v0+t1*a) v1 = abs(v0 + t1 * a)
v3 = abs(v0 + t1*a - t3*a) v3 = abs(v0 + t1 * a - t3 * a)
timeleft = Tfull - t1 - t5 - t3 timeleft = Tfull - t1 - t5 - t3
sq = -v0*t1 - a*t1**2/2 - v1 * t3 + a*t3**2/2 + v3*t5 - a*t5**2/2 sq = -v0 * t1 - a * t1 ** 2 / 2 - v1 * t3 + a * t3 ** 2 / 2 + v3 * t5 - a * t5 ** 2 / 2
Sleft = Sfull - sq Sleft = Sfull - sq
t2max = (timeleft - Sleft/v3) / (1 + v1/v3) t2max = (timeleft - Sleft / v3) / (1 + v1 / v3)
Smovement = -v0 * t1 - a/2 * t1**2 - v1 * t31 + a/2*t31**2 Smovement = -v0 * t1 - a / 2 * t1 ** 2 - v1 * t31 + a / 2 * t31 ** 2
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1)) t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement)) / v1))
t4 = max(0, Sleft/v3 + v1/v3 * t2) t4 = max(0, Sleft / v3 + v1 / v3 * t2)
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5) tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
self.allTimes["tmovement_"+str(i)+"_acc"] = t1 self.allTimes["tmovement_" + str(i) + "_acc"] = t1
self.allTimes["tmovement_"+str(i)+"_speed"] = t2 self.allTimes["tmovement_" + str(i) + "_speed"] = t2
self.allTimes["tmovement_"+str(i)+"_slow"] = t31 self.allTimes["tmovement_" + str(i) + "_slow"] = t31
self.allTimes["tmovement_"+str(i)+"_stay"] = tstay self.allTimes["tmovement_" + str(i) + "_stay"] = tstay
self.allTimes["tmovement_"+str(i)] = t1 + t2 + t31 + tstay self.allTimes["tmovement_" + str(i)] = t1 + t2 + t31 + tstay
self.allTimes["tpreclose_"+str(i)+"_slow"] = t32 self.allTimes["tpreclose_" + str(i) + "_slow"] = t32
self.allTimes["tpreclose_"+str(i)+"_speed"] = t4 self.allTimes["tpreclose_" + str(i) + "_speed"] = t4
self.allTimes["tpreclose_"+str(i)+"_acc"] = t5 self.allTimes["tpreclose_" + str(i) + "_acc"] = t5
self.allTimes["tpreclose_"+str(i)] = t32 + t4 + t5 self.allTimes["tpreclose_" + str(i)] = t32 + t4 + t5
T = Tfull T = Tfull
self.allTimes["tmovement"] = T self.allTimes["tmovement"] = T
def calcFirstClose(self, T : float, s : float) -> tuple[float, float]: def calcFirstClose(self, T: float, s: float) -> tuple[float, float]:
v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1) v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1)
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow) v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1)) t1 = T - sqrt(max(0, T ** 2 - 2 * s / self.a_max_1))
t1 = min(t1, v0 / self.a_max_1) if t1 > v0/ self.a_max_1 + self.check_eps:
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1)) raise Exception("""Мы вышли за границы разгона - смыкание FE, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
return t1, t2 return t1, t2
def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]: def calcFirstOpen(self, T: float, s: float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2 t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_1)) / 2
t1 = min(t1, self.v_max_1 / self.a_max_1) if t1 > self.v_max_1 / self.a_max_1 + self.check_eps:
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1)) raise Exception("""Мы вышли за границы разгона - раскрытие FE, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
return t1, t2 return t1, t2
def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]: def calcSecondOpen(self, T: float, s: float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2 t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
t1 = min(t1, self.v_max_2 / self.a_max_2) if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1)) raise Exception("""Мы вышли за границы разгона - раскрытие ME, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
return t1, t2 return t1, t2
def calcSecondClose(self, T : float, s : float) -> tuple[float, float]: def calcSecondClose(self, T: float, s: float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2 t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
t1 = min(t1, self.v_max_2 / self.a_max_2) if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1)) raise Exception("""Мы вышли за границы разгона - смыкание ME, вообще не знаю как так получилось""")
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
return t1, t2 return t1, t2
def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float: def calcSecondOpenOffset(self, t1: float, t2: float, sq: float) -> float:
s = sq * 1 s = sq * 1
offset = sqrt(2 * s / self.a_max_1) offset = sqrt(2 * s / self.a_max_1)
if offset > t1: if offset > t1:
s -= t1 ** 2 * self.a_max_1 / 2 s -= t1 ** 2 * self.a_max_1 / 2
v1 = t1 * self.a_max_1 v1 = t1 * self.a_max_1
if s > t2 * v1: if s > t2 * v1:
s -= t2 * v1 s -= t2 * v1
offset = 2*t1 + t2 - sqrt(t1**2 - 2*s / self.a_max_1) offset = 2 * t1 + t2 - sqrt(t1 ** 2 - 2 * s / self.a_max_1)
else: else:
offset = t1 + s / v1 offset = t1 + s / v1
return offset return offset

View File

@ -8,13 +8,21 @@ class Controller(BaseController):
signal_widgets = pyqtSignal(list) signal_widgets = pyqtSignal(list)
signal_settings = pyqtSignal(list) signal_settings = pyqtSignal(list)
signal_monitor = pyqtSignal(str) signal_raport_mode = pyqtSignal(str)
signal_seeking_mode = pyqtSignal()
def send_widgets(self, widgets: list[QWidget]) -> None: def send_widgets(self, widgets: list[QWidget]) -> None:
self.signal_widgets.emit(widgets) self.signal_widgets.emit(widgets)
def push_settings(self, settings: list[dict]) -> None: def update_settings(self, settings: list[dict]) -> None:
self.signal_settings.emit(settings) self.signal_settings.emit(settings)
def open_custom_file (self, filepath: str) -> None: def raport_mode(self, filepath: str) -> None:
self.signal_monitor.emit(filepath) self.signal_raport_mode.emit(filepath)
def seeking_mode(self) -> None:
self.signal_seeking_mode.emit()

View File

@ -1,6 +1,9 @@
import pandas as pd import pandas as pd
import traceback
import sys
from loguru import logger
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить. # FIXME: костыль для выключения предупреждения "replace deprecated"
pd.set_option('future.no_silent_downcasting', True) pd.set_option('future.no_silent_downcasting', True)
from utils.base.base import BaseDataConverter from utils.base.base import BaseDataConverter
@ -10,11 +13,23 @@ class DataConverter(BaseDataConverter):
@staticmethod @staticmethod
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame: def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()] try:
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0}) bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
return dataframe dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
return dataframe
except:
return None
def convert_data(self, files: list[str]) -> None: def convert_data(self, files: list[str]) -> None:
dataframes = [pd.read_csv(file) for file in files] try:
converted_dataframes = list(map(self._replace_bool, dataframes)) dataframes = [pd.read_csv(file) if file != '' else None for file in files]
self._mediator.notify(self, converted_dataframes) converted_dataframes = list(map(self._replace_bool, dataframes))
except:
# Get the traceback object
tb = sys.exc_info()[2]
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
logger.error(pymsg)
converted_dataframes = [None]
finally:
self._mediator.notify(self, converted_dataframes)

View File

@ -3,7 +3,9 @@ import pandas as pd
from typing import Union from typing import Union
from PyQt5.QtWidgets import QWidget from PyQt5.QtWidgets import QWidget
from utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BasePointPassportFormer from utils.base.base import (BaseMediator, BaseDirectoryMonitor,
BaseDataConverter, BasePlotWidget,
BasePointPassportFormer)
class Mediator(BaseMediator): class Mediator(BaseMediator):
@ -24,9 +26,9 @@ class Mediator(BaseMediator):
if issubclass(source.__class__, BasePlotWidget): if issubclass(source.__class__, BasePlotWidget):
self._controller.send_widgets(data) self._controller.send_widgets(data)
def push_settings(self, settings: list[dict]): def update_settings(self, settings: list[dict]):
self._monitor.update_settings(settings) self._monitor.update_settings(settings)
self._passportFormer.update_settings(settings) self._passportFormer.update_settings(settings)
self._monitor.force_all_dir() self._monitor.update_plots()

View File

@ -1,6 +1,4 @@
from time import sleep
import os import os
from loguru import logger from loguru import logger
from utils.base.base import BaseDirectoryMonitor from utils.base.base import BaseDirectoryMonitor
@ -9,47 +7,62 @@ from utils.base.base import BaseDirectoryMonitor
class DirectoryMonitor(BaseDirectoryMonitor): class DirectoryMonitor(BaseDirectoryMonitor):
def _init_state(self): def _init_state(self):
files = os.listdir(self._directory_path)
self._files = files self._files = [
os.path.join(self._directory_path, file)
for file in os.listdir(self._directory_path)
if file.lower().endswith('.csv')
]
self.update_timer.timeout.connect(self._monitor) self.update_timer.timeout.connect(self._monitor)
logger.info("Monitor initiated!") logger.info("Monitor initiated!")
def _monitor(self): def _monitor(self):
files = os.listdir(self._directory_path) current_files = [
new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x), os.path.join(self._directory_path, file)
filter(lambda x: x not in self._files, files)))) for file in os.listdir(self._directory_path)
if file.lower().endswith('.csv')
]
new_files = sorted(list(filter(lambda x: x not in self._files, current_files)))
if new_files: if new_files:
logger.info(f"New files detected: {new_files}") logger.info(f"New files detected: {new_files}")
self._mediator.notify(self, new_files) self._mediator.notify(self, new_files)
self._files = files self._files = current_files
if not files: if not current_files:
self._files = [] self._files = []
def update_settings(self, data: list[dict]) -> None: def update_settings(self, data: list[dict]) -> None:
if self.isActive:
self.stop()
_, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
if not os.path.exists(self._directory_path):
logger.error(f"Путь {self._directory_path} не существует.")
raise FileNotFoundError(f"Путь {self._directory_path} не существует.")
else:
self._init_state()
self.start()
else:
_, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
def update_plots(self):
if self._files is not None:
self._mediator.notify(self, self._files)
def custom_csv_extract_only(self, path: str):
self.stop() self.stop()
operator_params, system_params = data self._files.append(path)
self._directory_path = system_params['trace_storage_path'][0] if path is not None:
self._update_time = system_params['monitor_update_period'][0] self._mediator.notify(self, [path])
else:
self._mediator.notify(self, [None])
def start_seeking(self) -> None:
self._init_state() self._init_state()
self.start() self.start()
def force_all_dir(self):
files = os.listdir(self._directory_path)
self._files = files
all_files = []
for x in self._files:
path = os.path.join(self._directory_path, x)
all_files.append(path)
if all_files:
logger.info(f"Plotting all files: {all_files}")
self._mediator.notify(self, all_files)
else:
logger.info(f"Failed pushing {str(all_files)}")
def custom_dir_extract(self, dict_path:str ):
files = os.listdir(dict_path)
if files is not None:
all_files = [os.path.join(dict_path, file) for file in files]
self._mediator.notify(self, all_files)

View File

@ -1,6 +1,6 @@
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
import pandas as pd import pandas as pd
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
class idealDataBuilder(BaseIdealDataBuilder): class idealDataBuilder(BaseIdealDataBuilder):
@ -18,14 +18,15 @@ class idealDataBuilder(BaseIdealDataBuilder):
def get_weldingDF(self) -> pd.DataFrame: def get_weldingDF(self) -> pd.DataFrame:
data = [] data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']) X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
return pd.DataFrame(data) return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float]: def get_ideal_timings(self) -> list[float]:
data = self.Ts data = self.Ts
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']]
return ideal_timings return ideal_timings
class PassportFormer(BasePointPassportFormer): class PassportFormer(BasePointPassportFormer):
@ -36,49 +37,50 @@ class PassportFormer(BasePointPassportFormer):
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]: def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
system_settings = {key: value[0] for key, value in self._params[1].items()}
if dataframe is not None:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if point_quantity == 0:
return []
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
else:
events = None
point_quantity = 1
points_pocket = []
system_settings = {key: value[0] for key, value in self._params[1].items()}
tesla_time = sum(self._params[0].get("Tesla summary time", [])) tesla_time = sum(self._params[0].get("Tesla summary time", []))
useful_data = {"tesla_time": tesla_time, useful_data = {"tesla_time": tesla_time,
"range_ME": system_settings["Range ME, mm"], "range_ME": system_settings["Range ME, mm"],
"k_hardness": system_settings["k_hardness_1"] "k_hardness": system_settings["k_hardness_1"]
} }
points_pocket = [] for i in range(point_quantity):
operator_settings = {
time_is_valid = not dataframe["time"].isna().all() key: (value[i] if i < len(value) else value[0])
for key, value in self._params[0].items()
if time_is_valid: }
params_list = [operator_settings, system_settings]
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe :
for i in range(point_quantity): ideal_data = self._ideal_data_cashe[cache_key]
operator_settings = { print(f"Cache hit")
key: (value[i] if i < len(value) else value[0]) else:
for key, value in self._params[0].items() ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
} self._ideal_data_cashe[cache_key] = ideal_data
params_list = [operator_settings, system_settings] print(f"Cache miss. Computed and cached.")
cache_key = self._generate_cache_key(params_list) if events is not None:
if cache_key in self._ideal_data_cashe :
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
idx = i+1 if idx_shift else i idx = i+1 if idx_shift else i
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]] point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()} point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
useful_p_data = {"thickness": operator_settings["object_thickness"], else:
"L2": operator_settings["distance_l_2"], point_timeframe, point_events = None, None
"force": operator_settings["force_target"]} useful_p_data = {"thickness": operator_settings["object_thickness"],
"L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]}
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data]) points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
return dataframe, points_pocket, useful_data return dataframe, points_pocket, useful_data
def update_settings(self, params: list[dict, dict]): def update_settings(self, params: list[dict, dict]):
self._params = params self._params = params

Binary file not shown.

View File

@ -1,47 +1,148 @@
from datetime import datetime as dt from datetime import datetime as dt
from typing import Optional from typing import Optional
from PyQt5 import QtWidgets from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
from PyQt5.QtGui import QPixmap
from utils.base.base import BaseMainWindow, BaseController from utils.base.base import BaseMainWindow, BaseController
from gui.settings_window import settingsWindow from gui.settings_window import SystemSettings, OperatorSettings
from gui.reportGui import ReportSettings
class MainWindow(BaseMainWindow): class MainWindow(BaseMainWindow):
def __init__(self, def __init__(self,
controller: Optional[BaseController] = None): controller: Optional[BaseController] = None) -> None:
super().__init__() super().__init__()
self._controller = controller self._controller = controller
self.initUI() self._init_startUI()
self.set_style(self)
self.settings_button.clicked.connect(self._show_settings)
self.select_dir_button.clicked.connect(self._select_dir)
self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._updater_trigger)
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._updater_trigger)
def initUI(self) -> None: def _init_startUI(self) -> None:
self.operSettings = OperatorSettings("params/operator_params.json", 'Operator', self._transfer_settings)
self.sysSettings = SystemSettings("params/system_params.json", 'System', self._transfer_settings)
self.repSettings = ReportSettings()
self.tabWidget = QtWidgets.QTabWidget()
self._clear()
seeking_mode_btn = QtWidgets.QPushButton("Real time folder scanning")
seeking_mode_btn.setFixedWidth(300)
seeking_mode_btn.clicked.connect(self._init_seekingUI)
raport_mode_btn = QtWidgets.QPushButton("Raport editor")
raport_mode_btn.setFixedWidth(300)
raport_mode_btn.clicked.connect(self._init_raportUI)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(seeking_mode_btn)
button_layout.addWidget(raport_mode_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
mainLayout = self.layout()
label = QtWidgets.QLabel("Select work mode")
label.setStyleSheet(
"""QLabel{
color: #ffffff;
font-size: 40px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}"""
)
mainLayout.addWidget(label, alignment=Qt.AlignCenter)
mainLayout.addWidget(button_widget)
def _clear(self) -> None:
main = self.layout()
if self.layout() is not None:
while main.count():
child = main.takeAt(0)
if child.widget() is not None:
child.widget().deleteLater()
else: self.setLayout(QtWidgets.QVBoxLayout())
def _init_seekingUI(self) -> None:
self._clear()
self._transfer_settings()
self.tabWidget = QtWidgets.QTabWidget() self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True) self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab) self.tabWidget.tabCloseRequested.connect(self._close_tab)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.tabWidget)
self.settings_button = QtWidgets.QPushButton("Show settings") sys_settings_btn = QtWidgets.QPushButton("System settings")
self.settings_button.setFixedWidth(160) sys_settings_btn.setFixedWidth(200)
self.select_dir_button = QtWidgets.QPushButton("Open directory") sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
self.select_dir_button.setFixedWidth(175) oper_settings_btn = QtWidgets.QPushButton("Operator settings")
oper_settings_btn.setFixedWidth(200)
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
button_layout = QtWidgets.QHBoxLayout() button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2) button_layout.setSpacing(2)
button_layout.addWidget(self.settings_button) button_layout.addWidget(sys_settings_btn)
button_layout.addWidget(self.select_dir_button) button_layout.addWidget(oper_settings_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
title = QtWidgets.QLabel("online mode")
mainLayout = self.layout()
mainLayout.addWidget(self.tabWidget)
mainLayout.addWidget(button_widget)
mainLayout.addWidget(title, alignment=Qt.AlignRight)
self.resize(800,800)
self._controller.seeking_mode()
# TODO:push seeking to mediator
def _init_raportUI(self) -> None:
self._clear()
self._transfer_settings()
self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab)
sys_settings_btn = QtWidgets.QPushButton("System settings")
sys_settings_btn.setFixedWidth(185)
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
oper_settings_btn.setFixedWidth(185)
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
view_settings_btn = QtWidgets.QPushButton("Customize view")
view_settings_btn.setFixedWidth(185)
view_settings_btn.clicked.connect(self._customization_window)
save_screen_btn = QtWidgets.QPushButton("Save state")
save_screen_btn.setFixedWidth(185)
save_screen_btn.clicked.connect(self._save_plots)
open_file_btn = QtWidgets.QPushButton("Open file")
open_file_btn.setFixedWidth(185)
open_file_btn.clicked.connect(self._open_file)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(sys_settings_btn)
button_layout.addWidget(oper_settings_btn)
button_layout.addWidget(view_settings_btn)
button_layout.addWidget(save_screen_btn)
button_layout.addWidget(open_file_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
title = QtWidgets.QLabel("raport mode")
mainLayout = self.layout()
mainLayout.addWidget(self.tabWidget)
mainLayout.addWidget(button_widget)
mainLayout.addWidget(title, alignment=Qt.AlignRight)
self.resize(800,800)
self._controller.raport_mode(None)
#self._controller.raport_mode(path)
# TODO: push only one dir to monitor
layout.addLayout(button_layout)
self.setLayout(layout)
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None: def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
for plot_widget in plot_widgets: for plot_widget in plot_widgets:
widget, reg_items, curve_items = plot_widget
tab = QtWidgets.QWidget() tab = QtWidgets.QWidget()
tab.setProperty("reg_items", reg_items)
tab.setProperty("curve_items", curve_items)
grid = QtWidgets.QGridLayout() grid = QtWidgets.QGridLayout()
grid.addWidget(plot_widget) grid.addWidget(widget)
tab.setLayout(grid) tab.setLayout(grid)
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S')) self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
self.tabWidget.setCurrentWidget(tab) self.tabWidget.setCurrentWidget(tab)
@ -51,31 +152,61 @@ class MainWindow(BaseMainWindow):
for i in range(0, tab_count-2): for i in range(0, tab_count-2):
self._close_tab(i) self._close_tab(i)
def keyPressEvent(self, a0) -> None:
def keyPressEvent(self, a0):
if a0.key() == Qt.Key_F5: if a0.key() == Qt.Key_F5:
pass tab_count = self.tabWidget.count()
for i in range(0, tab_count):
self._close_tab(i)
def closeEvent(self, a0):
self.operSettings.close()
self.sysSettings.close()
self.repSettings.close()
super().closeEvent(a0)
def _show_settings(self): def _show_settings(self) -> None:
self.operSettings.show() self.operSettings.show()
self.sysSettings.show() self.sysSettings.show()
def push_settings(self) -> None:
self._updater_trigger()
def _updater_trigger(self) -> None: def _transfer_settings(self) -> None:
self.tabWidget.clear() self.tabWidget.clear()
operator_params = self.operSettings.getParams() operator_params = self.operSettings.getParams()
system_params = self.sysSettings.getParams() system_params = self.sysSettings.getParams()
self._controller.push_settings([operator_params, system_params]) self._controller.update_settings([operator_params, system_params])
def _close_tab(self, index:int) -> None: def _close_tab(self, index:int) -> None:
self.tabWidget.removeTab(index) self.tabWidget.removeTab(index)
def _open_file(self) -> None:
path = self._select_csv()
self._controller.raport_mode(path)
def _select_dir(self): def _select_csv(self) -> Optional[str]:
folder_path = QtWidgets.QFileDialog.getExistingDirectory(self, 'Select directory', "") CSV_path, _ = QtWidgets.QFileDialog.getOpenFileName(self,"Select csv file", "", "CSV Files (*.csv)")
if folder_path: if CSV_path:
self._controller.open_custom_file(folder_path) print(CSV_path)
return CSV_path
return None
def _customization_window(self) -> None:
tab = self.tabWidget.currentWidget()
reg_items = tab.property("reg_items")
curve_items = tab.property("curve_items")
self.repSettings.build(reg_items, curve_items)
def _save_plots(self) -> None:
filepath, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save file", "", "Image Files (*.png *.jpeg)")
tab = self.tabWidget.currentWidget()
pixmap = QPixmap(tab.size())
tab.render(pixmap)
pixmap.save(filepath)

View File

@ -1,9 +1,7 @@
import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
import copy import copy
import pyqtgraph as pg import pyqtgraph as pg
import pandas as pd
from typing import Optional, Any from typing import Optional, Any
from utils.base.base import BasePlotWidget from utils.base.base import BasePlotWidget
@ -14,56 +12,6 @@ class ProcessStage():
finish_index:int finish_index:int
class PlotWidget(BasePlotWidget): class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self, def _create_curve_ideal(self,
signal: dict[str, Any], signal: dict[str, Any],
@ -97,20 +45,20 @@ class PlotWidget(BasePlotWidget):
return None return None
@staticmethod @staticmethod
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]: def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
plot_widget = pg.PlotWidget(title=title) plot_item = pg.PlotItem(title=title)
# Оптимизация отображения графиков # Оптимизация отображения графиков
plot_widget.setDownsampling(auto=True, mode='peak') plot_item.setDownsampling(auto=True, mode='peak')
plot_widget.showGrid(x=True, y=True) plot_item.showGrid(x=True, y=True)
plot_widget.setClipToView(True) plot_item.setClipToView(True)
legend = pg.LegendItem((80, 60), offset=(70, 20)) legend = plot_item.addLegend(offset=(70, 20))
legend.setParentItem(plot_widget.graphicsItem()) return plot_item, legend
return plot_widget, legend
def _add_stage_regions(self, def _add_stage_regions(self,
plot_widget: pg.PlotWidget, plot_item: pg.PlotItem,
point_events: dict[str, list[float]], point_events: dict[str, list[float]],
dataframe_headers: list[str], dataframe_headers: list[str],
reg_items: dict,
transparency: int = 75) -> None: transparency: int = 75) -> None:
""" """
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма. Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
@ -122,12 +70,15 @@ class PlotWidget(BasePlotWidget):
region = self._create_stage_region(stage, start_t, end_t, transparency) region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None: if region is not None:
region.setZValue(-20) region.setZValue(-20)
plot_widget.addItem(region) plot_item.addItem(region)
reg_items["real"].setdefault(stage, [])
reg_items["real"][stage].append(region)
def _add_ideal_stage_regions(self, def _add_ideal_stage_regions(self,
plot_widget: pg.PlotWidget, plot_item: pg.PlotItem,
ideal_data: dict[str, Any], ideal_data: dict[str, Any],
point_events: dict[str, list[float]], point_events: dict[str, list[float]],
reg_items: dict,
transparency: int = 125) -> None: transparency: int = 125) -> None:
""" """
Добавляет регионы для идеальных этапов. Добавляет регионы для идеальных этапов.
@ -140,13 +91,16 @@ class PlotWidget(BasePlotWidget):
region = self._create_stage_region(stage, start_t, end_t, transparency) region = self._create_stage_region(stage, start_t, end_t, transparency)
if region: if region:
region.setZValue(-10) region.setZValue(-10)
plot_widget.addItem(region) plot_item.addItem(region)
reg_items["ideal"].setdefault(stage, [])
reg_items["ideal"][stage].append(region)
def _add_ideal_signals(self, def _add_ideal_signals(self,
plot_widget: pg.PlotWidget, plot_item: pg.PlotItem,
ideal_data: dict[str, Any], ideal_data: dict[str, Any],
point_events: dict[str, list[float]], point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]]) -> None: ideal_signals: list[dict[str, Any]],
curve_items: dict) -> None:
""" """
Добавляет идеальные сигналы для каждого этапа. Добавляет идеальные сигналы для каждого этапа.
""" """
@ -159,23 +113,29 @@ class PlotWidget(BasePlotWidget):
point_events[stage][1] point_events[stage][1]
) )
if curve: if curve:
curve.setZValue(10) curve.setZValue(50)
plot_widget.addItem(curve) plot_item.addItem(curve)
curve_items["ideal"].setdefault(signal["name"], {})
curve_items["ideal"][signal["name"]].setdefault(stage, [])
curve_items["ideal"][signal["name"]][stage].append(curve)
def _add_real_signals(self, def _add_real_signals(self,
plot_widget: pg.PlotWidget, plot_item: pg.PlotItem,
dataframe: pd.DataFrame, dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]], real_signals: list[dict[str, Any]],
legend: pg.LegendItem) -> None: legend: pg.LegendItem,
curve_items: dict) -> None:
""" """
Добавляет реальные сигналы из dataframe на виджет. Добавляет реальные сигналы из dataframe на виджет.
""" """
dataframe_headers = dataframe.columns.tolist() dataframe_headers = dataframe.columns.tolist()
for signal in real_signals: for signal in real_signals:
if signal["name"] in dataframe_headers: if signal["name"] in dataframe_headers:
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True) plot = plot_item.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
plot.setZValue(0) plot.setZValue(0)
legend.addItem(plot, signal["name"]) legend.addItem(plot, signal["name"])
curve_items["real"].setdefault(signal["name"], {})
curve_items["real"][signal["name"]] = plot
def _add_performance_label(self, def _add_performance_label(self,
layout: QVBoxLayout, layout: QVBoxLayout,
@ -190,52 +150,35 @@ class PlotWidget(BasePlotWidget):
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0 TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel( performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %, " f"Сокращение длительности: фактическое = {tesla_TWC} %"
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
) )
#f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
self.set_style(performance_label) self.set_style(performance_label)
layout.addWidget(performance_label) layout.addWidget(performance_label)
performance_label.update() performance_label.update()
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _build_widget(self, data: list[Any]) -> QWidget: def _build_widget(self, data: list[Any]) -> QWidget:
""" """
Собирает графический виджет для одного набора данных. Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data]. Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
""" """
widget = QWidget() result_widget = QWidget()
layout = QVBoxLayout(widget) result_layout = QVBoxLayout(result_widget)
plot_layout = pg.GraphicsLayoutWidget()
reg_items = {"real":{}, "ideal":{}}
curve_items = {"real":{}, "ideal":{}}
dataframe, points_pocket, useful_data = data dataframe, points_pocket, useful_data = data
tesla_time = useful_data["tesla_time"] tesla_time = useful_data["tesla_time"]
range_ME = useful_data["range_ME"] range_ME = useful_data["range_ME"]
k_hardness = useful_data["k_hardness"] k_hardness = useful_data["k_hardness"]
dataframe_headers = dataframe.columns.tolist()
dat_is_none = dataframe is None
if not dat_is_none: dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()): for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_widget, legend = self._init_plot_widget(title=channel) plot_item, legend = self._init_plot_item(title=channel)
settings = description["Settings"] settings = description["Settings"]
TWC_time = 0.0 TWC_time = 0.0
@ -243,17 +186,31 @@ class PlotWidget(BasePlotWidget):
worst_perf = 2 worst_perf = 2
# TODO: рассчитать корректный параметр range # TODO: рассчитать корректный параметр range
if settings["mirror ME"]: if settings["mirror ME"] and not dat_is_none:
dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME) dataframe = self._mirror_shift_data(
"ME",
description["Real_signals"],
dataframe,
range_ME
)
# Итерация по точкам # Итерация по точкам
for cur_point, point_data in enumerate(points_pocket): for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events] # point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data]
point_timeframe, ideal_dat, point_events, useful_p_data = point_data point_timeframe, ideal_dat, point_events, useful_p_data = point_data
ideal_data = copy.deepcopy(ideal_dat) ideal_data = copy.deepcopy(ideal_dat)
if dat_is_none:
worst_timeframe = point_timeframe = [0, ideal_data["Ideal cycle"]]
point_events = {}
keys = list(ideal_data.keys())
shift = 0
for i, time in enumerate(ideal_data["Ideal timings"]):
point_events[keys[i]] = [shift, time+shift]
shift += time
# TODO: проверить корректность расчетов # TODO: проверить корректность расчетов
if settings["force compensation FE"]: if settings["force compensation FE"] and not dat_is_none:
force = useful_p_data["force"] force = useful_p_data["force"]
F_comp = - force/k_hardness F_comp = - force/k_hardness
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
@ -266,10 +223,9 @@ class PlotWidget(BasePlotWidget):
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME) ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
# Добавляем реальные стадии # Добавляем реальные стадии
if settings["stages"]: if settings["stages"] and not dat_is_none:
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75) self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75)
# TODO: подобрать не вырвеглазные цвета, возможно ограничить зону
if settings["workpiece"]: if settings["workpiece"]:
x1 = point_timeframe[0] x1 = point_timeframe[0]
dx = point_timeframe[1] - x1 dx = point_timeframe[1] - x1
@ -280,15 +236,30 @@ class PlotWidget(BasePlotWidget):
rect_item.setZValue(-5) rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush('grey')) rect_item.setBrush(pg.mkBrush('grey'))
rect_item.setPen(pg.mkPen('black', width=3)) rect_item.setPen(pg.mkPen('black', width=3))
plot_widget.addItem(rect_item) plot_item.addItem(rect_item)
if settings["force accuracy"]and not dat_is_none:
modifier = 0.05
x1 = point_events["Welding"][0]
dx = point_events["Welding"][1] - x1
force = useful_p_data["force"]
y1 = force*(1-modifier)
dy = force*(2*modifier)
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
rect_item.setPen(pg.mkPen('black', width=0))
plot_item.addItem(rect_item)
# Добавляем идеальные стадии и идеальные сигналы # Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]: if settings["ideals"]:
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100) self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100)
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"]) self._add_ideal_signals(plot_item, ideal_data, point_events, description["Ideal_signals"], curve_items)
# Подсчёт производительности # Подсчёт производительности
if settings["performance"]: if settings["performance"]and not dat_is_none:
is_last_point = (cur_point == len(points_pocket) - 1) is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point: if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]]) TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
@ -299,53 +270,46 @@ class PlotWidget(BasePlotWidget):
TWC_time += TWC_delta TWC_time += TWC_delta
ideal_time += ideal_delta ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1 curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf: if curr_perf < worst_perf:
worst_perf = curr_perf worst_perf = curr_perf
worst_timeframe = point_timeframe worst_timeframe = point_timeframe
# Добавляем реальные сигналы # Добавляем реальные сигналы
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend) if not dat_is_none:
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)
if widget_num == 0: if widget_num == 0:
main_plot = plot_widget main_plot = plot_item
else: else:
# Связываем остальные графики с основным графиком # Связываем остальные графики с основным графиком
plot_widget.setXLink(main_plot) plot_item.setXLink(main_plot)
# Если есть настройка производительности, добавляем label if settings["performance"] and not dat_is_none:
if settings["performance"]: self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time)
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
layout.addWidget(plot_widget) plot_layout.addItem(plot_item, widget_num, 0)
layout.addWidget(navigator) navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot)
self._sync_main_plot_with_navigator(main_plot, ROI_region) if navigator is not None:
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region)) plot_layout.addItem(navigator, widget_num+1, 0)
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
widget.setLayout(layout) result_layout.addWidget(plot_layout)
return widget return result_widget, reg_items, curve_items
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[list[Any]]) -> None: def build(self, data: list[list[Any]]) -> None:
""" """
Создает набор виджетов по предоставленному списку данных. Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида: Предполагается, что data это список элементов вида:
[ [
[dataframe, points_pocket, tesla_time], [dataframe, points_pocket, useful_data],
[dataframe, points_pocket, tesla_time], [dataframe, points_pocket, useful_data],
... ...
] ]
""" """
widgets = [self._build_widget(data_sample) for data_sample in data] widgets_datapack = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets) self._mediator.notify(self, widgets_datapack)

169
src/gui/reportGui.py Normal file
View File

@ -0,0 +1,169 @@
import pyqtgraph as pg
from pyqtgraph.parametertree import Parameter, ParameterTree
from typing import Union
from PyQt5 import QtWidgets
class ReportSettings(QtWidgets.QWidget):
def build(self, reg_items: dict, curve_items: dict) -> None:
"""Создает ParameterTree для элементов всех графиков выбранной вкладки"""
self._clear()
param_tree = ParameterTree()
layout = self.layout()
layout.addWidget(param_tree)
body= [
self._generate_reg_params(reg_items),
self._generate_curve_params(curve_items)
]
# Добавляем параметры в дерево
params = Parameter.create(name='params', type='group', children=body)
params.sigTreeStateChanged.connect(
lambda: self._update_settings(reg_items, curve_items, params)
)
param_tree.setParameters(params, showTop=False)
self.show()
def _clear(self) -> None:
"""
Приводит виджет в готовое к работе состояние.
Удаляет все содержимое, если имеется
"""
main = self.layout()
if self.layout() is not None:
while main.count():
child = main.takeAt(0)
if child.widget() is not None:
child.widget().deleteLater()
else:
self.setLayout(QtWidgets.QVBoxLayout())
def _generate_reg_params(self,
reg_items: dict) -> dict:
"""Созадет реальные и идеальные секторы"""
res = {'name': 'Sectors', 'type': 'group', 'children': [
{'name': 'Real sectors', 'type': 'group', 'children': self._create_samples(reg_items["real"])},
{'name': 'Ideal sectors', 'type': 'group', 'children': self._create_samples(reg_items["ideal"])},
]}
return res
def _generate_curve_params(self,
curve_items: dict) -> dict:
"""Создает реальные и идеальные линии графиков"""
res = {'name': 'Plots', 'type': 'group', 'children': [
{'name': 'Real plots', 'type': 'group', 'children': self._create_samples(curve_items["real"])},
{'name': 'Ideal plots', 'type': 'group', 'children': self._create_ideal_curves(curve_items["ideal"])},
]}
return res
def _create_ideal_curves(self,
curve: dict) -> list[dict]:
"""Создает секторы с этапами циклограммы"""
res = []
for key, item in curve.items():
param = {'name': key, 'type': 'group', 'children': self._create_samples(item)}
res.append(param)
return res
def _create_samples(self,
sector: dict) -> list[dict]:
"""Создает список представленных элементов с их параметрами"""
res = []
for key, item in sector.items():
sample = item[0] if type(item) == list else item
param = {'name': key, 'type': 'group', 'children': self._create_settings(sample)}
res.append(param)
return res
def _create_settings(self,
item: Union[pg.LinearRegionItem, pg.PlotDataItem]) -> list[dict]:
"""Получает настройки для элемента"""
if type(item) == pg.LinearRegionItem:
pen = item.lines[0].pen
brush = item.brush
fill_color = brush.color().getRgb()
else:
pen = pg.mkPen(item.opts.get("pen"))
fill_color = None
line_color = pen.color().getRgb()
line_thickness = pen.width()
visibility = item.isVisible()
return [
{'name': 'Line color', 'type': 'color', 'value': line_color},
{'name': 'Line thickness', 'type': 'int', 'value': line_thickness, 'limits': (1, 10)},
{'name': 'Visibility', 'type': 'bool', 'value': visibility},
{'name': 'Fill color', 'type': 'color', 'value': fill_color},
]
def _update_settings(self,
reg_items: dict,
curve_items: dict,
params: Parameter) -> None:
"""Задает параметры элементов в соответствии с paramTree"""
real_sectors = params.child("Sectors").child("Real sectors")
ideal_sectors = params.child("Sectors").child("Ideal sectors")
real_plots = params.child("Plots").child("Real plots")
ideal_plots = params.child("Plots").child("Ideal plots")
self._set_sector_settings(reg_items["real"], real_sectors)
self._set_sector_settings(reg_items["ideal"], ideal_sectors)
self._set_plot_settings(curve_items["real"], real_plots)
for key, item_dict in curve_items["ideal"].items():
self._set_plot_settings(item_dict, ideal_plots.child(key))
def _set_sector_settings(self,
sectors: dict,
settings: Parameter) -> None:
"""Задает параметры секторов в соответствии с настройками"""
for key, item in sectors.items():
sample = settings.child(key)
line_color = sample.child("Line color").value()
line_width = sample.child("Line thickness").value()
visibility = sample.child("Visibility").value()
fill_color = sample.child("Fill color").value()
pen = pg.mkPen(color=line_color, width=line_width)
brush=pg.mkBrush(fill_color)
for reg in item:
reg.setVisible(visibility)
reg.lines[0].setPen(pen)
reg.lines[1].setPen(pen)
reg.setBrush(brush)
def _set_plot_settings(self,
curves:dict,
settings: Parameter) -> None:
"""Задает параметры кривых в соответствии с настройками"""
for key, item in curves.items():
sample = settings.child(key)
line_color = sample.child("Line color").value()
line_width = sample.child("Line thickness").value()
visibility = sample.child("Visibility").value()
pen = pg.mkPen(color=line_color, width=line_width)
if type(item) == list:
for curve in item:
curve.setVisible(visibility)
curve.setPen(pen)
else:
item.setVisible(visibility)
item.setPen(pen)

View File

@ -1,7 +1,8 @@
from typing import Callable, Optional, Any from typing import Callable, Optional, Any
from PyQt5.QtWidgets import ( from PyQt5.QtWidgets import (QWidget, QPushButton,
QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem QLineEdit, QHBoxLayout,
) QVBoxLayout, QLabel,
QTableWidget, QTableWidgetItem)
from PyQt5.QtGui import QIntValidator from PyQt5.QtGui import QIntValidator
from utils.json_tools import read_json, write_json from utils.json_tools import read_json, write_json
@ -11,7 +12,6 @@ class settingsWindow(QWidget):
def __init__(self, path: str, name: str, upd_func: Callable[[], None]): def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
""" """
Окно настроек для редактирования параметров. Окно настроек для редактирования параметров.
:param path: Путь к файлу настроек (JSON). :param path: Путь к файлу настроек (JSON).
:param name: Название набора настроек. :param name: Название набора настроек.
:param upd_func: Функция обновления (коллбэк). :param upd_func: Функция обновления (коллбэк).
@ -176,7 +176,16 @@ class settingsWindow(QWidget):
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count) self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
class SystemSettings(settingsWindow):
def __init__(self, path, name, upd_func):
super().__init__(path, name, upd_func)
self._num_points.setVisible(False)
def _expand(self):
pass
class OperatorSettings(settingsWindow):
pass
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -24,10 +24,9 @@ def main():
window.show() window.show()
controller.signal_widgets.connect(window.show_plot_tabs) controller.signal_widgets.connect(window.show_plot_tabs)
controller.signal_settings.connect(mediator.push_settings) controller.signal_settings.connect(mediator.update_settings)
controller.signal_monitor.connect(monitor.custom_dir_extract) controller.signal_raport_mode.connect(monitor.custom_csv_extract_only)
controller.signal_seeking_mode.connect(monitor.start_seeking)
window.push_settings()
sys.exit(app.exec_()) sys.exit(app.exec_())

43
src/testAlgo.py Normal file
View File

@ -0,0 +1,43 @@
from src.OptAlgorithm.OptAlgorithm import OptAlgorithm
from src.utils import read_json
from matplotlib import pyplot as plt, use
from numpy import cos, sin, sqrt, cbrt, arcsin, linspace, array
if __name__ == "__main__":
tq = 1
ts = linspace(0, tq, 200000)
operator_params = read_json("params/operator_params.json")
system_params = read_json("params/system_params.json")
non_array_operator_params = {}
i = 1
for key, value in operator_params.items():
if hasattr(value, "__len__"):
if len(value) > i:
non_array_operator_params[key] = value[i]
else:
non_array_operator_params[key] = value[0]
else:
non_array_operator_params[key] = value
non_array_system_params = {}
for key, value in system_params.items():
if hasattr(value, "__len__"):
if len(value) > i:
non_array_system_params[key] = value[i]
else:
non_array_system_params[key] = value[0]
else:
non_array_system_params[key] = value
opt = OptAlgorithm(non_array_operator_params, non_array_system_params)
Xs = array([opt.getVar("X1", t) for t in ts])
plt.plot(ts, Xs)
plt.show()

View File

@ -5,14 +5,11 @@ from typing import Optional, Union, Any
from cachetools import LRUCache from cachetools import LRUCache
import pandas as pd import pandas as pd
from PyQt5.QtCore import QThread, QObject, QTimer from PyQt5.QtCore import QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget from PyQt5.QtWidgets import QWidget, QTabWidget
from PyQt5.QtOpenGL import QGLWidget
from OptAlgorithm import OptAlgorithm from OptAlgorithm import OptAlgorithm
import pandas as pd
import pandas as pd
import numpy as np import numpy as np
import pyqtgraph as pg
@ -38,7 +35,7 @@ class BaseMediator:
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget], source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]): data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
... ...
def push_settings (self, data: list[dict]): def update_settings (self, data: list[dict]):
... ...
class BaseDirectoryMonitor: class BaseDirectoryMonitor:
@ -50,7 +47,8 @@ class BaseDirectoryMonitor:
super().__init__() super().__init__()
self._directory_path = None self._directory_path = None
self._update_time = None self._update_time = None
self.isActive = False
self._files = []
self._mediator = mediator self._mediator = mediator
@ -79,14 +77,19 @@ class BaseDirectoryMonitor:
self._files = files self._files = files
def start(self): def start(self):
self.isActive = True
self.update_timer.start(int(self._update_time)) self.update_timer.start(int(self._update_time))
def stop(self): def stop(self):
self.isActive = False
self.update_timer.stop() self.update_timer.stop()
def update_settings(self, data: list[dict]) -> None: def update_settings(self, data: list[dict]) -> None:
... ...
def update_plots(self) -> None:
...
def force_all_dir(self): def force_all_dir(self):
... ...
@ -128,7 +131,8 @@ class BasePlotWidget:
"ideals": True, "ideals": True,
"mirror ME": False, "mirror ME": False,
"workpiece": False, "workpiece": False,
"force compensation FE": False "force compensation FE": False,
"force accuracy":True
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -159,7 +163,8 @@ class BasePlotWidget:
"ideals": True, "ideals": True,
"mirror ME": True, "mirror ME": True,
"workpiece": True, "workpiece": True,
"force compensation FE": True "force compensation FE": True,
"force accuracy":False
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -190,7 +195,8 @@ class BasePlotWidget:
"ideals": True, "ideals": True,
"mirror ME": False, "mirror ME": False,
"workpiece": False, "workpiece": False,
"force compensation FE": False "force compensation FE": False,
"force accuracy":False
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -227,6 +233,88 @@ class BasePlotWidget:
font-family: "Segoe UI", sans-serif; font-family: "Segoe UI", sans-serif;
}""") }""")
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotItem(title="Navigator")
navigator.setFixedHeight(100)
# Получение кривых из main_plot
for curve in main_plot.listDataItems():
# Извлекаем данные из кривой
x, y = curve.getData()
curve_name = curve.opts.get("name", None)
signal_pen = curve.opts.get("pen", None)
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
ROI_region.setBounds([0, x[-1]])
navigator.addItem(ROI_region)
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotItem,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
@property @property
def mediator(self) -> BaseMediator: def mediator(self) -> BaseMediator:
return self._mediator return self._mediator
@ -252,12 +340,14 @@ class BaseController(QObject):
def send_widgets(self, widgets: list[QWidget]) -> None: def send_widgets(self, widgets: list[QWidget]) -> None:
... ...
def push_settings(self, settings: list[dict]) -> None: def update_settings(self, settings: list[dict]) -> None:
... ...
def open_custom_file (self, filepath: str) -> None: def raport_mode (self, filepath: str) -> None:
... ...
def seeking_mode(self) -> None:
...
class BaseIdealDataBuilder(OptAlgorithm): class BaseIdealDataBuilder(OptAlgorithm):
@ -270,10 +360,12 @@ class BaseIdealDataBuilder(OptAlgorithm):
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame: def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = [] data = []
for i in range (0, int(end_timestamp*self.mul)): for i in range (0, int(end_timestamp*self.mul)+1):
time = i/self.mul time = i/self.mul
X1, X2, V1, V2, F = func(time) X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F}) data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data) return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame: def get_closingDF(self) -> pd.DataFrame:
@ -305,6 +397,8 @@ class BaseMainWindow(QWidget):
def __init__(self, def __init__(self,
controller: Optional[BaseController] = None): controller: Optional[BaseController] = None):
super().__init__() super().__init__()
self.set_style(self)
self.resize(200,200)
self._controller = controller self._controller = controller
... ...