Compare commits

..

No commits in common. "9faa35f514058f2414297ff97a9d3c82ea99548c" and "02268f1f7b18941aa61c2971509328a6c0be9170" have entirely different histories.

34 changed files with 418 additions and 875 deletions

3
.gitignore vendored
View File

@ -6,5 +6,4 @@
/venv /venv
*.txt *.txt
*.svg *.svg
/test.py /test.py
/output

View File

@ -81,19 +81,19 @@
], ],
"distance_l_2": [ "distance_l_2": [
0.0275, 0.0275,
0.0255, 0.03,
0.0242, 0.033,
0.0245, 0.033,
0.0228, 0.033,
0.0236, 0.033,
0.0229, 0.033,
0.0248, 0.033,
0.024, 0.033,
0.0235, 0.033,
0.025, 0.033,
0.0276, 0.033,
0.0234, 0.033,
0.0215 0.033
], ],
"distance_h_end1": [ "distance_h_end1": [
0.003, 0.003,

BIN
profile_results.prof Normal file

Binary file not shown.

View File

@ -3,76 +3,75 @@ from numpy import sqrt, arcsin, arccos, cos, sin
from OptAlgorithm.AutoConfigClass import AutoConfigClass from OptAlgorithm.AutoConfigClass import AutoConfigClass
from OptAlgorithm.ConstantCalculator import ConstantCalculator from OptAlgorithm.ConstantCalculator import ConstantCalculator
class OptTimeCalculator(AutoConfigClass): class OptTimeCalculator(AutoConfigClass):
params_list = [] params_list = []
def __init__(self, operator_config : dict, system_config : dict):
def __init__(self, operator_config: dict, system_config: dict):
cCalculator = ConstantCalculator(operator_config, system_config) cCalculator = ConstantCalculator(operator_config, system_config)
super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc()) super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc())
self.allTimes = {} self.allTimes = {}
self.check_eps = 1e-7
def tGrowNominal(self, F : float) -> float:
def tGrowNominal(self, F: float) -> float: return arcsin(F/(self.Ftogrow)) * sqrt(self.mass_1/self.k_hardness_1)
return arcsin(F / (self.Ftogrow)) * sqrt(self.mass_1 / self.k_hardness_1)
def Tclose(self, h1: float, h2: float) -> None: def Tclose(self, h1: float, h2: float) -> None:
v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1) v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1)
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow) v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
t1 = v0 / self.a_max_1 t1 = v0 / self.a_max_1
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 / 2)) / v0) t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0)
T1 = t1 + t2t T1 = t1 + t2t
t21 = sqrt(h2 / (self.a_max_2)) t21 = sqrt(h2/self.a_max_2)
t21 = min(self.v_max_2 / self.a_max_2, t21) t21 = min(self.v_max_2/self.a_max_2, t21)
t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2) t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21 T2 = t22 + 2 * t21
Tclose = max(T1, T2) Tclose = max(T1, T2)
tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1) tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1)
tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2) tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2)
self.allTimes["tclose_1_acc"] = tclose_1_acc self.allTimes["tclose_1_acc"] = tclose_1_acc
self.allTimes["tclose_1_speed"] = tclose_1_speed self.allTimes["tclose_1_speed"] = tclose_1_speed
self.allTimes["tclose_2_acc"] = tclose_2_acc self.allTimes["tclose_2_acc"] = tclose_2_acc
self.allTimes["tclose_2_speed"] = tclose_2_speed self.allTimes["tclose_2_speed"] = tclose_2_speed
self.allTimes["tclose"] = Tclose self.allTimes["tclose"] = Tclose
def Topen(self, s1: float, s2: float, l1: float, l2: float, Fs1: float, Fs2: float = 0) -> None: def Topen(self, s1 : float, s2 : float, l1 : float, l2 : float, Fs1 : float, Fs2 : float = 0) -> None:
t11 = sqrt((l1 + Fs1) / self.a_max_1) t11 = sqrt((l1 + Fs1)/self.a_max_1)
t11 = min(self.v_max_1 / self.a_max_1, t11) t11 = min(self.v_max_1/self.a_max_1, t11)
t12 = max(0, ((l1 + Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1) t12 = max(0, ((l1+Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
T1 = t12 + 2 * t11 T1 = t12 + 2 * t11
offset = self.calcSecondOpenOffset(t11, t12, Fs1) offset = self.calcSecondOpenOffset(t11, t12, Fs1)
t21 = sqrt(l2 / self.a_max_2) t21 = sqrt(l2/self.a_max_2)
t21 = min(self.v_max_2 / self.a_max_2, t21) t21 = min(self.v_max_2/self.a_max_2, t21)
t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2) t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
T2 = t22 + 2 * t21 + offset T2 = t22 + 2 * t21 + offset
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1 + Fs1) topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1+Fs1)
offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1) offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1)
topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2) topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2)
self.allTimes["topen_1_acc"] = topen_1_acc self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_2_offset"] = offset self.allTimes["topen_2_offset"] = offset
self.allTimes["topen_1_acc"] = topen_1_acc self.allTimes["topen_1_acc"] = topen_1_acc
self.allTimes["topen_1_speed"] = topen_1_speed self.allTimes["topen_1_speed"] = topen_1_speed
self.allTimes["topen_2_acc"] = topen_2_acc self.allTimes["topen_2_acc"] = topen_2_acc
self.allTimes["topen_2_speed"] = topen_2_speed self.allTimes["topen_2_speed"] = topen_2_speed
if s1 >= l1: if s1 >= l1:
raise Exception("""S1 >= L1 - недопустимый сценарий, raise Exception("""S1 >= L1 - недопустимый сценарий,
проверьте distance_s_1, distance_h_end1""") проверьте distance_s_1, distance_h_end1""")
if s2 >= l2: if s2 >= l2:
raise Exception("""S2 >= L2 - недопустимый сценарий, raise Exception("""S2 >= L2 - недопустимый сценарий,
проверьте distance_s_2, distance_h_end2""") проверьте distance_s_2, distance_h_end2""")
s1 += Fs1 s1 += Fs1
topen_1_mark = sqrt(2 * s1 / self.a_max_1) topen_1_mark = sqrt(2 * s1 / self.a_max_1)
if topen_1_mark > topen_1_acc: if topen_1_mark > topen_1_acc:
@ -80,173 +79,167 @@ class OptTimeCalculator(AutoConfigClass):
v1 = topen_1_acc * self.a_max_1 v1 = topen_1_acc * self.a_max_1
if s1 > topen_1_speed * v1: if s1 > topen_1_speed * v1:
s1 -= topen_1_speed * v1 s1 -= topen_1_speed * v1
topen_1_mark = 2 * topen_1_acc + topen_1_speed - sqrt(topen_1_acc ** 2 - 2 * s1 / self.a_max_1) topen_1_mark = 2*topen_1_acc + topen_1_speed - sqrt(topen_1_acc**2 - 2*s1 / self.a_max_1)
else: else:
topen_1_mark = topen_1_acc + s1 / v1 topen_1_mark = topen_1_acc + s1 / v1
topen_2_mark = sqrt(2 * s2 / self.a_max_2) topen_2_mark = sqrt(2 * s2 / self.a_max_2)
if topen_2_mark > topen_2_acc: if topen_2_mark > topen_2_acc:
s2 -= topen_2_acc ** 2 * self.a_max_2 / 2 s2 -= topen_2_acc ** 2 * self.a_max_2 / 2
v2 = topen_2_acc * self.a_max_2 v2 = topen_2_acc * self.a_max_2
if s2 > topen_2_speed * v2: if s2 > topen_2_speed * v2:
s2 -= topen_2_speed * v2 s2 -= topen_2_speed * v2
topen_2_mark = 2 * topen_2_acc + topen_2_speed - sqrt(topen_2_acc ** 2 - 2 * s2 / self.a_max_2) topen_2_mark = 2*topen_2_acc + topen_2_speed - sqrt(topen_2_acc**2 - 2*s2 / self.a_max_2)
else: else:
topen_2_mark = topen_2_acc + s2 / v2 topen_2_mark = topen_2_acc + s2 / v2
self.allTimes["topen_1_mark"] = topen_1_mark self.allTimes["topen_1_mark"] = topen_1_mark
self.allTimes["topen_2_mark"] = topen_2_mark self.allTimes["topen_2_mark"] = topen_2_mark
def Tgrow(self) -> None: def Tgrow(self) -> None:
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1 v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
vF0 = v0 * self.k_hardness_1 vF0 = v0 * self.k_hardness_1
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1 / (self.mass_1)) * self.Ftogrow) L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0)
tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0 * vF0) Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed)
tspeed = sqrt(self.mass_1 / self.k_hardness_1) * (
arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1 / self.freq * vF0 * sin(
self.freq * tspeed)
eps = 1e1 eps = 1e1
if self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 < -eps: if self.freq**2 * self.Ftogrow**2 - vFmax**2 < -eps:
raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории
, проверьте параметры k_hardness_1, mass_1, k_prop""") , проверьте параметры k_hardness_1, mass_1, k_prop""")
Fmeet = 1 / self.freq * sqrt(self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 + eps) Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2 + eps)
Fstart_prop = self.Fstart_prop Fstart_prop = self.Fstart_prop
if Fmeet > Fstart_prop: if Fmeet > Fstart_prop:
raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора
, проверьте параметры v_max_1, k_prop""") , проверьте параметры v_max_1, k_prop""")
tmeet = (Fmeet - Fspeed) / vFmax tmeet = (Fmeet - Fspeed)/vFmax
tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet) tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet)
vp = 1 / sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow ** 2 - self.Fstart_prop ** 2) vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2)
ap = Fstart_prop / self.mass_1 ap = Fstart_prop / self.mass_1
tprop = 2 * vp / ap tprop = 2*vp / ap
self.allTimes["tspeed"] = tspeed self.allTimes["tspeed"] = tspeed
self.allTimes["tmeet"] = tmeet self.allTimes["tmeet"] = tmeet
self.allTimes["tend"] = tend self.allTimes["tend"] = tend
self.allTimes["tprop"] = tprop self.allTimes["tprop"] = tprop
self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop
def T(self, h1: float, h2: float, s1: float, s2: float, l1: float, l2: float) -> dict: def T(self, h1 : float, h2 : float, s1 : float, s2 : float, l1 : float, l2 : float) -> dict:
self.Tclose(h1, h2) self.Tclose(h1, h2)
self.Tgrow() self.Tgrow()
self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0) self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0)
return self.allTimes return self.allTimes
def Tmovement(self, closeAlgo, tmark) -> tuple[list, list]: def Tmovement(self, closeAlgo, tmark) -> None:
contact = [self.contact_distance_1, self.contact_distance_2] contact = [self.contact_distance_1, self.contact_distance_2]
v0s = [] v0s = []
pos0s = [] pos0s = []
for i in range(1, 3): for i in range(1,3):
if tmark < 0: if tmark < 0:
raise Exception("""Отрицательное время этапа раскрытия, raise Exception("""Отрицательное время этапа раскрытия,
проверьте distance_s_{1,2}, time_command""") проверьте distance_s_{1,2}, time_command""")
v0 = closeAlgo("V" + str(i), "Open", tmark) v0 = closeAlgo("V"+str(i), "Open", tmark)
v0s.append(v0) v0s.append(v0)
x0 = closeAlgo("X" + str(i), "Open", tmark) x0 = closeAlgo("X"+str(i), "Open", tmark)
x1 = contact[i - 1] - self.__dict__["distance_h_end" + str(i)] x1 = contact[i-1] - self.__dict__["distance_h_end"+str(i)]
x = x1 - x0 x = x1 - x0
pos0s.append(closeAlgo("X" + str(i), "Open", tmark)) pos0s.append(closeAlgo("X"+str(i), "Open", tmark))
Tfull = self.time_robot_movement Tfull = self.time_robot_movement
L = self.__dict__["distance_l_" + str(i)]
maxL = contact[i - 1] - L - x0 L = self.__dict__["distance_l_"+str(i)]
maxL = contact[i-1] - L - x0
self.Tmovementi(i, x, Tfull, v0, maxL) self.Tmovementi(i, x, Tfull, v0, maxL)
return pos0s, v0s return pos0s, v0s
def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None: def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None:
v0 = abs(v0) v0 = abs(v0)
vmax = self.__dict__["v_max_" + str(i)] vmax = self.__dict__["v_max_"+str(i)]
a = self.__dict__["a_max_" + str(i)] a = self.__dict__["a_max_"+str(i)]
t3 = (Tfull + v0 / a) / 2 t3 = (Tfull + v0 / a) / 2
sqrtval = a ** 2 * ( sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2)
a ** 2 * (Tfull + 2 * t3) ** 2 - 8 * a * Sfull + 2 * a * v0 * (Tfull + 2 * t3) - 3 * v0 ** 2)
if sqrtval < 0: if sqrtval < 0:
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время, raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""") проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""")
t1max = ((Tfull + 2 * t3) + v0 / a) / (2) - sqrt(sqrtval) * sqrt(2) / (4 * a ** 2) t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2)
t1 = min(t1max, (vmax - abs(v0)) / a) t1 = min(t1max, (vmax- abs(v0))/a)
t1 = max(0, min(t1, -v0 / a + sqrt(v0 ** 2 / (a ** 2) + (abs(maxL) - v0 * v0 / a) / a))) t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a)))
t31 = v0 / a + t1 t31 = v0/a + t1
t5max = (Tfull - v0 / a) / 2 - t1 t5max = (Tfull - v0/a)/2 - t1
v1 = v0 + a * t1 v1 = v0 + a * t1
S1 = v0 * t1 + a * t1 * t1 / 2 + v1 * t31 - a * t31 * t31 / 2 S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2
S2max = Sfull + S1 S2max = Sfull + S1
t5 = min(t5max, (vmax) / a, sqrt(S2max / a)) t5 = min(t5max, (vmax)/a, sqrt(S2max / a))
t3 = abs(v0) / a + t1 + t5 t3 = abs(v0)/a + t1 + t5
t32 = t5 t32 = t5
v1 = abs(v0 + t1 * a) v1 = abs(v0+t1*a)
v3 = abs(v0 + t1 * a - t3 * a) v3 = abs(v0 + t1*a - t3*a)
timeleft = Tfull - t1 - t5 - t3 timeleft = Tfull - t1 - t5 - t3
sq = -v0 * t1 - a * t1 ** 2 / 2 - v1 * t3 + a * t3 ** 2 / 2 + v3 * t5 - a * t5 ** 2 / 2 sq = -v0*t1 - a*t1**2/2 - v1 * t3 + a*t3**2/2 + v3*t5 - a*t5**2/2
Sleft = Sfull - sq Sleft = Sfull - sq
t2max = (timeleft - Sleft / v3) / (1 + v1 / v3) t2max = (timeleft - Sleft/v3) / (1 + v1/v3)
Smovement = -v0 * t1 - a / 2 * t1 ** 2 - v1 * t31 + a / 2 * t31 ** 2 Smovement = -v0 * t1 - a/2 * t1**2 - v1 * t31 + a/2*t31**2
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement)) / v1)) t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1))
t4 = max(0, Sleft / v3 + v1 / v3 * t2) t4 = max(0, Sleft/v3 + v1/v3 * t2)
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5) tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
self.allTimes["tmovement_" + str(i) + "_acc"] = t1 self.allTimes["tmovement_"+str(i)+"_acc"] = t1
self.allTimes["tmovement_" + str(i) + "_speed"] = t2 self.allTimes["tmovement_"+str(i)+"_speed"] = t2
self.allTimes["tmovement_" + str(i) + "_slow"] = t31 self.allTimes["tmovement_"+str(i)+"_slow"] = t31
self.allTimes["tmovement_" + str(i) + "_stay"] = tstay self.allTimes["tmovement_"+str(i)+"_stay"] = tstay
self.allTimes["tmovement_" + str(i)] = t1 + t2 + t31 + tstay self.allTimes["tmovement_"+str(i)] = t1 + t2 + t31 + tstay
self.allTimes["tpreclose_" + str(i) + "_slow"] = t32 self.allTimes["tpreclose_"+str(i)+"_slow"] = t32
self.allTimes["tpreclose_" + str(i) + "_speed"] = t4 self.allTimes["tpreclose_"+str(i)+"_speed"] = t4
self.allTimes["tpreclose_" + str(i) + "_acc"] = t5 self.allTimes["tpreclose_"+str(i)+"_acc"] = t5
self.allTimes["tpreclose_" + str(i)] = t32 + t4 + t5 self.allTimes["tpreclose_"+str(i)] = t32 + t4 + t5
T = Tfull T = Tfull
self.allTimes["tmovement"] = T self.allTimes["tmovement"] = T
def calcFirstClose(self, T: float, s: float) -> tuple[float, float]: def calcFirstClose(self, T : float, s : float) -> tuple[float, float]:
v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1) v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1)
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow) v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
t1 = T - sqrt(max(0, T ** 2 - 2 * s / self.a_max_1)) t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1))
if t1 > v0/ self.a_max_1 + self.check_eps: t1 = min(t1, v0 / self.a_max_1)
raise Exception("""Мы вышли за границы разгона - смыкание FE, вообще не знаю как так получилось""") t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
return t1, t2 return t1, t2
def calcFirstOpen(self, T: float, s: float) -> tuple[float, float]: def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_1)) / 2 t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2
if t1 > self.v_max_1 / self.a_max_1 + self.check_eps: t1 = min(t1, self.v_max_1 / self.a_max_1)
raise Exception("""Мы вышли за границы разгона - раскрытие FE, вообще не знаю как так получилось""") t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
return t1, t2 return t1, t2
def calcSecondOpen(self, T: float, s: float) -> tuple[float, float]: def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2 t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps: t1 = min(t1, self.v_max_2 / self.a_max_2)
raise Exception("""Мы вышли за границы разгона - раскрытие ME, вообще не знаю как так получилось""") t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
return t1, t2 return t1, t2
def calcSecondClose(self, T: float, s: float) -> tuple[float, float]: def calcSecondClose(self, T : float, s : float) -> tuple[float, float]:
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2 t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps: t1 = min(t1, self.v_max_2 / self.a_max_2)
raise Exception("""Мы вышли за границы разгона - смыкание ME, вообще не знаю как так получилось""") t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
return t1, t2 return t1, t2
def calcSecondOpenOffset(self, t1: float, t2: float, sq: float) -> float: def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float:
s = sq * 1 s = sq * 1
offset = sqrt(2 * s / self.a_max_1) offset = sqrt(2 * s / self.a_max_1)
if offset > t1: if offset > t1:
s -= t1 ** 2 * self.a_max_1 / 2 s -= t1 ** 2 * self.a_max_1 / 2
v1 = t1 * self.a_max_1 v1 = t1 * self.a_max_1
if s > t2 * v1: if s > t2 * v1:
s -= t2 * v1 s -= t2 * v1
offset = 2 * t1 + t2 - sqrt(t1 ** 2 - 2 * s / self.a_max_1) offset = 2*t1 + t2 - sqrt(t1**2 - 2*s / self.a_max_1)
else: else:
offset = t1 + s / v1 offset = t1 + s / v1
return offset return offset

View File

@ -8,21 +8,13 @@ class Controller(BaseController):
signal_widgets = pyqtSignal(list) signal_widgets = pyqtSignal(list)
signal_settings = pyqtSignal(list) signal_settings = pyqtSignal(list)
signal_raport_mode = pyqtSignal(str) signal_monitor = pyqtSignal(str)
signal_seeking_mode = pyqtSignal()
def send_widgets(self, widgets: list[QWidget]) -> None: def send_widgets(self, widgets: list[QWidget]) -> None:
self.signal_widgets.emit(widgets) self.signal_widgets.emit(widgets)
def update_settings(self, settings: list[dict]) -> None: def push_settings(self, settings: list[dict]) -> None:
self.signal_settings.emit(settings) self.signal_settings.emit(settings)
def raport_mode(self, filepath: str) -> None: def open_custom_file (self, filepath: str) -> None:
self.signal_raport_mode.emit(filepath) self.signal_monitor.emit(filepath)
def seeking_mode(self) -> None:
self.signal_seeking_mode.emit()

View File

@ -1,9 +1,6 @@
import pandas as pd import pandas as pd
import traceback
import sys
from loguru import logger
# FIXME: костыль для выключения предупреждения "replace deprecated" #FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
pd.set_option('future.no_silent_downcasting', True) pd.set_option('future.no_silent_downcasting', True)
from utils.base.base import BaseDataConverter from utils.base.base import BaseDataConverter
@ -13,23 +10,11 @@ class DataConverter(BaseDataConverter):
@staticmethod @staticmethod
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame: def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
try: bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()] dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0}) return dataframe
return dataframe
except:
return None
def convert_data(self, files: list[str]) -> None: def convert_data(self, files: list[str]) -> None:
try: dataframes = [pd.read_csv(file) for file in files]
dataframes = [pd.read_csv(file) if file != '' else None for file in files] converted_dataframes = list(map(self._replace_bool, dataframes))
converted_dataframes = list(map(self._replace_bool, dataframes)) self._mediator.notify(self, converted_dataframes)
except:
# Get the traceback object
tb = sys.exc_info()[2]
tbinfo = traceback.format_tb(tb)[0]
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
logger.error(pymsg)
converted_dataframes = [None]
finally:
self._mediator.notify(self, converted_dataframes)

View File

@ -3,9 +3,7 @@ import pandas as pd
from typing import Union from typing import Union
from PyQt5.QtWidgets import QWidget from PyQt5.QtWidgets import QWidget
from utils.base.base import (BaseMediator, BaseDirectoryMonitor, from utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BasePointPassportFormer
BaseDataConverter, BasePlotWidget,
BasePointPassportFormer)
class Mediator(BaseMediator): class Mediator(BaseMediator):
@ -26,9 +24,9 @@ class Mediator(BaseMediator):
if issubclass(source.__class__, BasePlotWidget): if issubclass(source.__class__, BasePlotWidget):
self._controller.send_widgets(data) self._controller.send_widgets(data)
def update_settings(self, settings: list[dict]): def push_settings(self, settings: list[dict]):
self._monitor.update_settings(settings) self._monitor.update_settings(settings)
self._passportFormer.update_settings(settings) self._passportFormer.update_settings(settings)
self._monitor.update_plots() self._monitor.force_all_dir()

View File

@ -1,4 +1,6 @@
from time import sleep
import os import os
from loguru import logger from loguru import logger
from utils.base.base import BaseDirectoryMonitor from utils.base.base import BaseDirectoryMonitor
@ -7,62 +9,47 @@ from utils.base.base import BaseDirectoryMonitor
class DirectoryMonitor(BaseDirectoryMonitor): class DirectoryMonitor(BaseDirectoryMonitor):
def _init_state(self): def _init_state(self):
files = os.listdir(self._directory_path)
self._files = [ self._files = files
os.path.join(self._directory_path, file)
for file in os.listdir(self._directory_path)
if file.lower().endswith('.csv')
]
self.update_timer.timeout.connect(self._monitor) self.update_timer.timeout.connect(self._monitor)
logger.info("Monitor initiated!") logger.info("Monitor initiated!")
def _monitor(self): def _monitor(self):
current_files = [ files = os.listdir(self._directory_path)
os.path.join(self._directory_path, file) new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x),
for file in os.listdir(self._directory_path) filter(lambda x: x not in self._files, files))))
if file.lower().endswith('.csv')
]
new_files = sorted(list(filter(lambda x: x not in self._files, current_files)))
if new_files: if new_files:
logger.info(f"New files detected: {new_files}") logger.info(f"New files detected: {new_files}")
self._mediator.notify(self, new_files) self._mediator.notify(self, new_files)
self._files = current_files self._files = files
if not current_files: if not files:
self._files = [] self._files = []
def update_settings(self, data: list[dict]) -> None: def update_settings(self, data: list[dict]) -> None:
if self.isActive:
self.stop()
_, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
if not os.path.exists(self._directory_path):
logger.error(f"Путь {self._directory_path} не существует.")
raise FileNotFoundError(f"Путь {self._directory_path} не существует.")
else:
self._init_state()
self.start()
else:
_, system_params = data
self._directory_path = system_params['trace_storage_path'][0]
self._update_time = system_params['monitor_update_period'][0]
def update_plots(self):
if self._files is not None:
self._mediator.notify(self, self._files)
def custom_csv_extract_only(self, path: str):
self.stop() self.stop()
self._files.append(path) operator_params, system_params = data
if path is not None: self._directory_path = system_params['trace_storage_path'][0]
self._mediator.notify(self, [path]) self._update_time = system_params['monitor_update_period'][0]
else:
self._mediator.notify(self, [None])
def start_seeking(self) -> None:
self._init_state() self._init_state()
self.start() self.start()
def force_all_dir(self):
files = os.listdir(self._directory_path)
self._files = files
all_files = []
for x in self._files:
path = os.path.join(self._directory_path, x)
all_files.append(path)
if all_files:
logger.info(f"Plotting all files: {all_files}")
self._mediator.notify(self, all_files)
else:
logger.info(f"Failed pushing {str(all_files)}")
def custom_dir_extract(self, dict_path:str ):
files = os.listdir(dict_path)
if files is not None:
all_files = [os.path.join(dict_path, file) for file in files]
self._mediator.notify(self, all_files)

View File

@ -1,6 +1,6 @@
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
import pandas as pd import pandas as pd
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
class idealDataBuilder(BaseIdealDataBuilder): class idealDataBuilder(BaseIdealDataBuilder):
@ -18,15 +18,14 @@ class idealDataBuilder(BaseIdealDataBuilder):
def get_weldingDF(self) -> pd.DataFrame: def get_weldingDF(self) -> pd.DataFrame:
data = [] data = []
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001) X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F}) data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
return pd.DataFrame(data) return pd.DataFrame(data)
def get_ideal_timings(self) -> list[float]: def get_ideal_timings(self) -> list[float]:
data = self.Ts data = self.Ts
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
return ideal_timings return ideal_timings
class PassportFormer(BasePointPassportFormer): class PassportFormer(BasePointPassportFormer):
@ -37,50 +36,49 @@ class PassportFormer(BasePointPassportFormer):
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]: def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
if dataframe is not None: if point_quantity == 0:
events, point_quantity = self._filter_events(dataframe["time"], dataframe) return []
if point_quantity == 0:
return []
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
else:
events = None
point_quantity = 1
points_pocket = []
system_settings = {key: value[0] for key, value in self._params[1].items()} system_settings = {key: value[0] for key, value in self._params[1].items()}
tesla_time = sum(self._params[0].get("Tesla summary time", [])) tesla_time = sum(self._params[0].get("Tesla summary time", []))
useful_data = {"tesla_time": tesla_time, useful_data = {"tesla_time": tesla_time,
"range_ME": system_settings["Range ME, mm"], "range_ME": system_settings["Range ME, mm"],
"k_hardness": system_settings["k_hardness_1"] "k_hardness": system_settings["k_hardness_1"]
} }
for i in range(point_quantity): points_pocket = []
operator_settings = {
key: (value[i] if i < len(value) else value[0]) time_is_valid = not dataframe["time"].isna().all()
for key, value in self._params[0].items()
} if time_is_valid:
params_list = [operator_settings, system_settings]
cache_key = self._generate_cache_key(params_list) idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
if cache_key in self._ideal_data_cashe :
ideal_data = self._ideal_data_cashe[cache_key] for i in range(point_quantity):
print(f"Cache hit") operator_settings = {
else: key: (value[i] if i < len(value) else value[0])
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list) for key, value in self._params[0].items()
self._ideal_data_cashe[cache_key] = ideal_data }
print(f"Cache miss. Computed and cached.") params_list = [operator_settings, system_settings]
if events is not None: cache_key = self._generate_cache_key(params_list)
if cache_key in self._ideal_data_cashe :
ideal_data = self._ideal_data_cashe[cache_key]
print(f"Cache hit")
else:
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
self._ideal_data_cashe[cache_key] = ideal_data
print(f"Cache miss. Computed and cached.")
idx = i+1 if idx_shift else i idx = i+1 if idx_shift else i
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]] point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()} point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
else: useful_p_data = {"thickness": operator_settings["object_thickness"],
point_timeframe, point_events = None, None "L2": operator_settings["distance_l_2"],
useful_p_data = {"thickness": operator_settings["object_thickness"], "force": operator_settings["force_target"]}
"L2": operator_settings["distance_l_2"],
"force": operator_settings["force_target"]}
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data]) points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
return dataframe, points_pocket, useful_data return dataframe, points_pocket, useful_data
def update_settings(self, params: list[dict, dict]): def update_settings(self, params: list[dict, dict]):
self._params = params self._params = params

View File

@ -1,148 +1,47 @@
from datetime import datetime as dt from datetime import datetime as dt
from typing import Optional from typing import Optional
from PyQt5 import QtWidgets from PyQt5 import QtWidgets
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
from PyQt5.QtGui import QPixmap
from utils.base.base import BaseMainWindow, BaseController from utils.base.base import BaseMainWindow, BaseController
from gui.settings_window import SystemSettings, OperatorSettings from gui.settings_window import settingsWindow
from gui.reportGui import ReportSettings
class MainWindow(BaseMainWindow): class MainWindow(BaseMainWindow):
def __init__(self, def __init__(self,
controller: Optional[BaseController] = None) -> None: controller: Optional[BaseController] = None):
super().__init__() super().__init__()
self._controller = controller self._controller = controller
self._init_startUI() self.initUI()
self.set_style(self)
self.settings_button.clicked.connect(self._show_settings)
self.select_dir_button.clicked.connect(self._select_dir)
self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._updater_trigger)
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._updater_trigger)
def _init_startUI(self) -> None: def initUI(self) -> None:
self.operSettings = OperatorSettings("params/operator_params.json", 'Operator', self._transfer_settings)
self.sysSettings = SystemSettings("params/system_params.json", 'System', self._transfer_settings)
self.repSettings = ReportSettings()
self.tabWidget = QtWidgets.QTabWidget()
self._clear()
seeking_mode_btn = QtWidgets.QPushButton("Real time folder scanning")
seeking_mode_btn.setFixedWidth(300)
seeking_mode_btn.clicked.connect(self._init_seekingUI)
raport_mode_btn = QtWidgets.QPushButton("Raport editor")
raport_mode_btn.setFixedWidth(300)
raport_mode_btn.clicked.connect(self._init_raportUI)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(seeking_mode_btn)
button_layout.addWidget(raport_mode_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
mainLayout = self.layout()
label = QtWidgets.QLabel("Select work mode")
label.setStyleSheet(
"""QLabel{
color: #ffffff;
font-size: 40px;
font-weight: bold;
font-family: "Segoe UI", sans-serif;
}"""
)
mainLayout.addWidget(label, alignment=Qt.AlignCenter)
mainLayout.addWidget(button_widget)
def _clear(self) -> None:
main = self.layout()
if self.layout() is not None:
while main.count():
child = main.takeAt(0)
if child.widget() is not None:
child.widget().deleteLater()
else: self.setLayout(QtWidgets.QVBoxLayout())
def _init_seekingUI(self) -> None:
self._clear()
self._transfer_settings()
self.tabWidget = QtWidgets.QTabWidget() self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True) self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab) self.tabWidget.tabCloseRequested.connect(self._close_tab)
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self.tabWidget)
sys_settings_btn = QtWidgets.QPushButton("System settings") self.settings_button = QtWidgets.QPushButton("Show settings")
sys_settings_btn.setFixedWidth(200) self.settings_button.setFixedWidth(160)
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show()) self.select_dir_button = QtWidgets.QPushButton("Open directory")
oper_settings_btn = QtWidgets.QPushButton("Operator settings") self.select_dir_button.setFixedWidth(175)
oper_settings_btn.setFixedWidth(200)
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
button_layout = QtWidgets.QHBoxLayout() button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2) button_layout.setSpacing(2)
button_layout.addWidget(sys_settings_btn) button_layout.addWidget(self.settings_button)
button_layout.addWidget(oper_settings_btn) button_layout.addWidget(self.select_dir_button)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
title = QtWidgets.QLabel("online mode")
mainLayout = self.layout()
mainLayout.addWidget(self.tabWidget)
mainLayout.addWidget(button_widget)
mainLayout.addWidget(title, alignment=Qt.AlignRight)
self.resize(800,800)
self._controller.seeking_mode()
# TODO:push seeking to mediator
def _init_raportUI(self) -> None:
self._clear()
self._transfer_settings()
self.tabWidget = QtWidgets.QTabWidget()
self.tabWidget.setTabsClosable(True)
self.tabWidget.tabCloseRequested.connect(self._close_tab)
sys_settings_btn = QtWidgets.QPushButton("System settings")
sys_settings_btn.setFixedWidth(185)
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
oper_settings_btn.setFixedWidth(185)
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
view_settings_btn = QtWidgets.QPushButton("Customize view")
view_settings_btn.setFixedWidth(185)
view_settings_btn.clicked.connect(self._customization_window)
save_screen_btn = QtWidgets.QPushButton("Save state")
save_screen_btn.setFixedWidth(185)
save_screen_btn.clicked.connect(self._save_plots)
open_file_btn = QtWidgets.QPushButton("Open file")
open_file_btn.setFixedWidth(185)
open_file_btn.clicked.connect(self._open_file)
button_layout = QtWidgets.QHBoxLayout()
button_layout.setSpacing(2)
button_layout.addWidget(sys_settings_btn)
button_layout.addWidget(oper_settings_btn)
button_layout.addWidget(view_settings_btn)
button_layout.addWidget(save_screen_btn)
button_layout.addWidget(open_file_btn)
button_widget = QtWidgets.QWidget()
button_widget.setLayout(button_layout)
title = QtWidgets.QLabel("raport mode")
mainLayout = self.layout()
mainLayout.addWidget(self.tabWidget)
mainLayout.addWidget(button_widget)
mainLayout.addWidget(title, alignment=Qt.AlignRight)
self.resize(800,800)
self._controller.raport_mode(None)
#self._controller.raport_mode(path)
# TODO: push only one dir to monitor
layout.addLayout(button_layout)
self.setLayout(layout)
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None: def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
for plot_widget in plot_widgets: for plot_widget in plot_widgets:
widget, reg_items, curve_items = plot_widget
tab = QtWidgets.QWidget() tab = QtWidgets.QWidget()
tab.setProperty("reg_items", reg_items)
tab.setProperty("curve_items", curve_items)
grid = QtWidgets.QGridLayout() grid = QtWidgets.QGridLayout()
grid.addWidget(widget) grid.addWidget(plot_widget)
tab.setLayout(grid) tab.setLayout(grid)
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S')) self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
self.tabWidget.setCurrentWidget(tab) self.tabWidget.setCurrentWidget(tab)
@ -152,61 +51,31 @@ class MainWindow(BaseMainWindow):
for i in range(0, tab_count-2): for i in range(0, tab_count-2):
self._close_tab(i) self._close_tab(i)
def keyPressEvent(self, a0) -> None:
if a0.key() == Qt.Key_F5:
tab_count = self.tabWidget.count()
for i in range(0, tab_count):
self._close_tab(i)
def closeEvent(self, a0):
self.operSettings.close()
self.sysSettings.close()
self.repSettings.close()
super().closeEvent(a0)
def _show_settings(self) -> None: def keyPressEvent(self, a0):
if a0.key() == Qt.Key_F5:
pass
def _show_settings(self):
self.operSettings.show() self.operSettings.show()
self.sysSettings.show() self.sysSettings.show()
def push_settings(self) -> None:
self._updater_trigger()
def _transfer_settings(self) -> None: def _updater_trigger(self) -> None:
self.tabWidget.clear() self.tabWidget.clear()
operator_params = self.operSettings.getParams() operator_params = self.operSettings.getParams()
system_params = self.sysSettings.getParams() system_params = self.sysSettings.getParams()
self._controller.update_settings([operator_params, system_params]) self._controller.push_settings([operator_params, system_params])
def _close_tab(self, index:int) -> None: def _close_tab(self, index:int) -> None:
self.tabWidget.removeTab(index) self.tabWidget.removeTab(index)
def _open_file(self) -> None:
path = self._select_csv()
self._controller.raport_mode(path)
def _select_csv(self) -> Optional[str]: def _select_dir(self):
CSV_path, _ = QtWidgets.QFileDialog.getOpenFileName(self,"Select csv file", "", "CSV Files (*.csv)") folder_path = QtWidgets.QFileDialog.getExistingDirectory(self, 'Select directory', "")
if CSV_path: if folder_path:
print(CSV_path) self._controller.open_custom_file(folder_path)
return CSV_path
return None
def _customization_window(self) -> None:
tab = self.tabWidget.currentWidget()
reg_items = tab.property("reg_items")
curve_items = tab.property("curve_items")
self.repSettings.build(reg_items, curve_items)
def _save_plots(self) -> None:
filepath, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save file", "", "Image Files (*.png *.jpeg)")
tab = self.tabWidget.currentWidget()
pixmap = QPixmap(tab.size())
tab.render(pixmap)
pixmap.save(filepath)

View File

@ -1,7 +1,9 @@
import pandas as pd
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
import copy import copy
import pyqtgraph as pg import pyqtgraph as pg
import pandas as pd
from typing import Optional, Any from typing import Optional, Any
from utils.base.base import BasePlotWidget from utils.base.base import BasePlotWidget
@ -12,6 +14,56 @@ class ProcessStage():
finish_index:int finish_index:int
class PlotWidget(BasePlotWidget): class PlotWidget(BasePlotWidget):
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotWidget,
dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotWidget(title="Navigator")
navigator.setFixedHeight(100)
for signal in real_signals:
if signal["name"] in dataframe.columns:
x = dataframe["time"]
y = dataframe[signal["name"]]
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
navigator.addItem(ROI_region)
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotWidget,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _create_curve_ideal(self, def _create_curve_ideal(self,
signal: dict[str, Any], signal: dict[str, Any],
@ -45,20 +97,20 @@ class PlotWidget(BasePlotWidget):
return None return None
@staticmethod @staticmethod
def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]: def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
plot_item = pg.PlotItem(title=title) plot_widget = pg.PlotWidget(title=title)
# Оптимизация отображения графиков # Оптимизация отображения графиков
plot_item.setDownsampling(auto=True, mode='peak') plot_widget.setDownsampling(auto=True, mode='peak')
plot_item.showGrid(x=True, y=True) plot_widget.showGrid(x=True, y=True)
plot_item.setClipToView(True) plot_widget.setClipToView(True)
legend = plot_item.addLegend(offset=(70, 20)) legend = pg.LegendItem((80, 60), offset=(70, 20))
return plot_item, legend legend.setParentItem(plot_widget.graphicsItem())
return plot_widget, legend
def _add_stage_regions(self, def _add_stage_regions(self,
plot_item: pg.PlotItem, plot_widget: pg.PlotWidget,
point_events: dict[str, list[float]], point_events: dict[str, list[float]],
dataframe_headers: list[str], dataframe_headers: list[str],
reg_items: dict,
transparency: int = 75) -> None: transparency: int = 75) -> None:
""" """
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма. Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
@ -70,15 +122,12 @@ class PlotWidget(BasePlotWidget):
region = self._create_stage_region(stage, start_t, end_t, transparency) region = self._create_stage_region(stage, start_t, end_t, transparency)
if region is not None: if region is not None:
region.setZValue(-20) region.setZValue(-20)
plot_item.addItem(region) plot_widget.addItem(region)
reg_items["real"].setdefault(stage, [])
reg_items["real"][stage].append(region)
def _add_ideal_stage_regions(self, def _add_ideal_stage_regions(self,
plot_item: pg.PlotItem, plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any], ideal_data: dict[str, Any],
point_events: dict[str, list[float]], point_events: dict[str, list[float]],
reg_items: dict,
transparency: int = 125) -> None: transparency: int = 125) -> None:
""" """
Добавляет регионы для идеальных этапов. Добавляет регионы для идеальных этапов.
@ -91,16 +140,13 @@ class PlotWidget(BasePlotWidget):
region = self._create_stage_region(stage, start_t, end_t, transparency) region = self._create_stage_region(stage, start_t, end_t, transparency)
if region: if region:
region.setZValue(-10) region.setZValue(-10)
plot_item.addItem(region) plot_widget.addItem(region)
reg_items["ideal"].setdefault(stage, [])
reg_items["ideal"][stage].append(region)
def _add_ideal_signals(self, def _add_ideal_signals(self,
plot_item: pg.PlotItem, plot_widget: pg.PlotWidget,
ideal_data: dict[str, Any], ideal_data: dict[str, Any],
point_events: dict[str, list[float]], point_events: dict[str, list[float]],
ideal_signals: list[dict[str, Any]], ideal_signals: list[dict[str, Any]]) -> None:
curve_items: dict) -> None:
""" """
Добавляет идеальные сигналы для каждого этапа. Добавляет идеальные сигналы для каждого этапа.
""" """
@ -113,29 +159,23 @@ class PlotWidget(BasePlotWidget):
point_events[stage][1] point_events[stage][1]
) )
if curve: if curve:
curve.setZValue(50) curve.setZValue(10)
plot_item.addItem(curve) plot_widget.addItem(curve)
curve_items["ideal"].setdefault(signal["name"], {})
curve_items["ideal"][signal["name"]].setdefault(stage, [])
curve_items["ideal"][signal["name"]][stage].append(curve)
def _add_real_signals(self, def _add_real_signals(self,
plot_item: pg.PlotItem, plot_widget: pg.PlotWidget,
dataframe: pd.DataFrame, dataframe: pd.DataFrame,
real_signals: list[dict[str, Any]], real_signals: list[dict[str, Any]],
legend: pg.LegendItem, legend: pg.LegendItem) -> None:
curve_items: dict) -> None:
""" """
Добавляет реальные сигналы из dataframe на виджет. Добавляет реальные сигналы из dataframe на виджет.
""" """
dataframe_headers = dataframe.columns.tolist() dataframe_headers = dataframe.columns.tolist()
for signal in real_signals: for signal in real_signals:
if signal["name"] in dataframe_headers: if signal["name"] in dataframe_headers:
plot = plot_item.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True) plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
plot.setZValue(0) plot.setZValue(0)
legend.addItem(plot, signal["name"]) legend.addItem(plot, signal["name"])
curve_items["real"].setdefault(signal["name"], {})
curve_items["real"][signal["name"]] = plot
def _add_performance_label(self, def _add_performance_label(self,
layout: QVBoxLayout, layout: QVBoxLayout,
@ -150,35 +190,52 @@ class PlotWidget(BasePlotWidget):
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0 TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
performance_label = QLabel( performance_label = QLabel(
f"Сокращение длительности: фактическое = {tesla_TWC} %" f"Сокращение длительности: фактическое = {tesla_TWC} %, "
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
) )
#f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
self.set_style(performance_label) self.set_style(performance_label)
layout.addWidget(performance_label) layout.addWidget(performance_label)
performance_label.update() performance_label.update()
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _build_widget(self, data: list[Any]) -> QWidget: def _build_widget(self, data: list[Any]) -> QWidget:
""" """
Собирает графический виджет для одного набора данных. Собирает графический виджет для одного набора данных.
Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data]. Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
""" """
result_widget = QWidget() widget = QWidget()
result_layout = QVBoxLayout(result_widget) layout = QVBoxLayout(widget)
plot_layout = pg.GraphicsLayoutWidget()
reg_items = {"real":{}, "ideal":{}}
curve_items = {"real":{}, "ideal":{}}
dataframe, points_pocket, useful_data = data dataframe, points_pocket, useful_data = data
tesla_time = useful_data["tesla_time"] tesla_time = useful_data["tesla_time"]
range_ME = useful_data["range_ME"] range_ME = useful_data["range_ME"]
k_hardness = useful_data["k_hardness"] k_hardness = useful_data["k_hardness"]
dataframe_headers = dataframe.columns.tolist()
dat_is_none = dataframe is None
if not dat_is_none: dataframe_headers = dataframe.columns.tolist()
for widget_num, (channel, description) in enumerate(self._plt_channels.items()): for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
plot_item, legend = self._init_plot_item(title=channel) plot_widget, legend = self._init_plot_widget(title=channel)
settings = description["Settings"] settings = description["Settings"]
TWC_time = 0.0 TWC_time = 0.0
@ -186,31 +243,17 @@ class PlotWidget(BasePlotWidget):
worst_perf = 2 worst_perf = 2
# TODO: рассчитать корректный параметр range # TODO: рассчитать корректный параметр range
if settings["mirror ME"] and not dat_is_none: if settings["mirror ME"]:
dataframe = self._mirror_shift_data( dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME)
"ME",
description["Real_signals"],
dataframe,
range_ME
)
# Итерация по точкам # Итерация по точкам
for cur_point, point_data in enumerate(points_pocket): for cur_point, point_data in enumerate(points_pocket):
# point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data] # point_data структура: [point_timeframe, ideal_data, point_events]
point_timeframe, ideal_dat, point_events, useful_p_data = point_data point_timeframe, ideal_dat, point_events, useful_p_data = point_data
ideal_data = copy.deepcopy(ideal_dat) ideal_data = copy.deepcopy(ideal_dat)
if dat_is_none:
worst_timeframe = point_timeframe = [0, ideal_data["Ideal cycle"]]
point_events = {}
keys = list(ideal_data.keys())
shift = 0
for i, time in enumerate(ideal_data["Ideal timings"]):
point_events[keys[i]] = [shift, time+shift]
shift += time
# TODO: проверить корректность расчетов # TODO: проверить корректность расчетов
if settings["force compensation FE"] and not dat_is_none: if settings["force compensation FE"]:
force = useful_p_data["force"] force = useful_p_data["force"]
F_comp = - force/k_hardness F_comp = - force/k_hardness
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
@ -223,9 +266,10 @@ class PlotWidget(BasePlotWidget):
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME) ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
# Добавляем реальные стадии # Добавляем реальные стадии
if settings["stages"] and not dat_is_none: if settings["stages"]:
self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75) self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
# TODO: подобрать не вырвеглазные цвета, возможно ограничить зону
if settings["workpiece"]: if settings["workpiece"]:
x1 = point_timeframe[0] x1 = point_timeframe[0]
dx = point_timeframe[1] - x1 dx = point_timeframe[1] - x1
@ -236,30 +280,15 @@ class PlotWidget(BasePlotWidget):
rect_item.setZValue(-5) rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush('grey')) rect_item.setBrush(pg.mkBrush('grey'))
rect_item.setPen(pg.mkPen('black', width=3)) rect_item.setPen(pg.mkPen('black', width=3))
plot_item.addItem(rect_item) plot_widget.addItem(rect_item)
if settings["force accuracy"]and not dat_is_none:
modifier = 0.05
x1 = point_events["Welding"][0]
dx = point_events["Welding"][1] - x1
force = useful_p_data["force"]
y1 = force*(1-modifier)
dy = force*(2*modifier)
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
rect_item.setZValue(-5)
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
rect_item.setPen(pg.mkPen('black', width=0))
plot_item.addItem(rect_item)
# Добавляем идеальные стадии и идеальные сигналы # Добавляем идеальные стадии и идеальные сигналы
if settings["ideals"]: if settings["ideals"]:
self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100) self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
self._add_ideal_signals(plot_item, ideal_data, point_events, description["Ideal_signals"], curve_items) self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
# Подсчёт производительности # Подсчёт производительности
if settings["performance"]and not dat_is_none: if settings["performance"]:
is_last_point = (cur_point == len(points_pocket) - 1) is_last_point = (cur_point == len(points_pocket) - 1)
if is_last_point: if is_last_point:
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]]) TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
@ -270,46 +299,53 @@ class PlotWidget(BasePlotWidget):
TWC_time += TWC_delta TWC_time += TWC_delta
ideal_time += ideal_delta ideal_time += ideal_delta
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1 curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
if curr_perf < worst_perf: if curr_perf < worst_perf:
worst_perf = curr_perf worst_perf = curr_perf
worst_timeframe = point_timeframe worst_timeframe = point_timeframe
# Добавляем реальные сигналы # Добавляем реальные сигналы
if not dat_is_none: self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)
if widget_num == 0: if widget_num == 0:
main_plot = plot_item main_plot = plot_widget
else: else:
# Связываем остальные графики с основным графиком # Связываем остальные графики с основным графиком
plot_item.setXLink(main_plot) plot_widget.setXLink(main_plot)
if settings["performance"] and not dat_is_none: # Если есть настройка производительности, добавляем label
self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time) if settings["performance"]:
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
plot_layout.addItem(plot_item, widget_num, 0) layout.addWidget(plot_widget)
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot) layout.addWidget(navigator)
if navigator is not None: self._sync_main_plot_with_navigator(main_plot, ROI_region)
plot_layout.addItem(navigator, widget_num+1, 0) main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
self._sync_main_plot_with_navigator(main_plot, ROI_region)
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
result_layout.addWidget(plot_layout) widget.setLayout(layout)
return result_widget, reg_items, curve_items return widget
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
def build(self, data: list[list[Any]]) -> None: def build(self, data: list[list[Any]]) -> None:
""" """
Создает набор виджетов по предоставленному списку данных. Создает набор виджетов по предоставленному списку данных.
Предполагается, что data это список элементов вида: Предполагается, что data это список элементов вида:
[ [
[dataframe, points_pocket, useful_data], [dataframe, points_pocket, tesla_time],
[dataframe, points_pocket, useful_data], [dataframe, points_pocket, tesla_time],
... ...
] ]
""" """
widgets_datapack = [self._build_widget(data_sample) for data_sample in data] widgets = [self._build_widget(data_sample) for data_sample in data]
self._mediator.notify(self, widgets_datapack) self._mediator.notify(self, widgets)

View File

@ -1,169 +0,0 @@
import pyqtgraph as pg
from pyqtgraph.parametertree import Parameter, ParameterTree
from typing import Union
from PyQt5 import QtWidgets
class ReportSettings(QtWidgets.QWidget):
def build(self, reg_items: dict, curve_items: dict) -> None:
"""Создает ParameterTree для элементов всех графиков выбранной вкладки"""
self._clear()
param_tree = ParameterTree()
layout = self.layout()
layout.addWidget(param_tree)
body= [
self._generate_reg_params(reg_items),
self._generate_curve_params(curve_items)
]
# Добавляем параметры в дерево
params = Parameter.create(name='params', type='group', children=body)
params.sigTreeStateChanged.connect(
lambda: self._update_settings(reg_items, curve_items, params)
)
param_tree.setParameters(params, showTop=False)
self.show()
def _clear(self) -> None:
"""
Приводит виджет в готовое к работе состояние.
Удаляет все содержимое, если имеется
"""
main = self.layout()
if self.layout() is not None:
while main.count():
child = main.takeAt(0)
if child.widget() is not None:
child.widget().deleteLater()
else:
self.setLayout(QtWidgets.QVBoxLayout())
def _generate_reg_params(self,
reg_items: dict) -> dict:
"""Созадет реальные и идеальные секторы"""
res = {'name': 'Sectors', 'type': 'group', 'children': [
{'name': 'Real sectors', 'type': 'group', 'children': self._create_samples(reg_items["real"])},
{'name': 'Ideal sectors', 'type': 'group', 'children': self._create_samples(reg_items["ideal"])},
]}
return res
def _generate_curve_params(self,
curve_items: dict) -> dict:
"""Создает реальные и идеальные линии графиков"""
res = {'name': 'Plots', 'type': 'group', 'children': [
{'name': 'Real plots', 'type': 'group', 'children': self._create_samples(curve_items["real"])},
{'name': 'Ideal plots', 'type': 'group', 'children': self._create_ideal_curves(curve_items["ideal"])},
]}
return res
def _create_ideal_curves(self,
curve: dict) -> list[dict]:
"""Создает секторы с этапами циклограммы"""
res = []
for key, item in curve.items():
param = {'name': key, 'type': 'group', 'children': self._create_samples(item)}
res.append(param)
return res
def _create_samples(self,
sector: dict) -> list[dict]:
"""Создает список представленных элементов с их параметрами"""
res = []
for key, item in sector.items():
sample = item[0] if type(item) == list else item
param = {'name': key, 'type': 'group', 'children': self._create_settings(sample)}
res.append(param)
return res
def _create_settings(self,
item: Union[pg.LinearRegionItem, pg.PlotDataItem]) -> list[dict]:
"""Получает настройки для элемента"""
if type(item) == pg.LinearRegionItem:
pen = item.lines[0].pen
brush = item.brush
fill_color = brush.color().getRgb()
else:
pen = pg.mkPen(item.opts.get("pen"))
fill_color = None
line_color = pen.color().getRgb()
line_thickness = pen.width()
visibility = item.isVisible()
return [
{'name': 'Line color', 'type': 'color', 'value': line_color},
{'name': 'Line thickness', 'type': 'int', 'value': line_thickness, 'limits': (1, 10)},
{'name': 'Visibility', 'type': 'bool', 'value': visibility},
{'name': 'Fill color', 'type': 'color', 'value': fill_color},
]
def _update_settings(self,
reg_items: dict,
curve_items: dict,
params: Parameter) -> None:
"""Задает параметры элементов в соответствии с paramTree"""
real_sectors = params.child("Sectors").child("Real sectors")
ideal_sectors = params.child("Sectors").child("Ideal sectors")
real_plots = params.child("Plots").child("Real plots")
ideal_plots = params.child("Plots").child("Ideal plots")
self._set_sector_settings(reg_items["real"], real_sectors)
self._set_sector_settings(reg_items["ideal"], ideal_sectors)
self._set_plot_settings(curve_items["real"], real_plots)
for key, item_dict in curve_items["ideal"].items():
self._set_plot_settings(item_dict, ideal_plots.child(key))
def _set_sector_settings(self,
sectors: dict,
settings: Parameter) -> None:
"""Задает параметры секторов в соответствии с настройками"""
for key, item in sectors.items():
sample = settings.child(key)
line_color = sample.child("Line color").value()
line_width = sample.child("Line thickness").value()
visibility = sample.child("Visibility").value()
fill_color = sample.child("Fill color").value()
pen = pg.mkPen(color=line_color, width=line_width)
brush=pg.mkBrush(fill_color)
for reg in item:
reg.setVisible(visibility)
reg.lines[0].setPen(pen)
reg.lines[1].setPen(pen)
reg.setBrush(brush)
def _set_plot_settings(self,
curves:dict,
settings: Parameter) -> None:
"""Задает параметры кривых в соответствии с настройками"""
for key, item in curves.items():
sample = settings.child(key)
line_color = sample.child("Line color").value()
line_width = sample.child("Line thickness").value()
visibility = sample.child("Visibility").value()
pen = pg.mkPen(color=line_color, width=line_width)
if type(item) == list:
for curve in item:
curve.setVisible(visibility)
curve.setPen(pen)
else:
item.setVisible(visibility)
item.setPen(pen)

View File

@ -1,8 +1,7 @@
from typing import Callable, Optional, Any from typing import Callable, Optional, Any
from PyQt5.QtWidgets import (QWidget, QPushButton, from PyQt5.QtWidgets import (
QLineEdit, QHBoxLayout, QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
QVBoxLayout, QLabel, )
QTableWidget, QTableWidgetItem)
from PyQt5.QtGui import QIntValidator from PyQt5.QtGui import QIntValidator
from utils.json_tools import read_json, write_json from utils.json_tools import read_json, write_json
@ -12,6 +11,7 @@ class settingsWindow(QWidget):
def __init__(self, path: str, name: str, upd_func: Callable[[], None]): def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
""" """
Окно настроек для редактирования параметров. Окно настроек для редактирования параметров.
:param path: Путь к файлу настроек (JSON). :param path: Путь к файлу настроек (JSON).
:param name: Название набора настроек. :param name: Название набора настроек.
:param upd_func: Функция обновления (коллбэк). :param upd_func: Функция обновления (коллбэк).
@ -176,16 +176,7 @@ class settingsWindow(QWidget):
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count) self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
class SystemSettings(settingsWindow):
def __init__(self, path, name, upd_func):
super().__init__(path, name, upd_func)
self._num_points.setVisible(False)
def _expand(self):
pass
class OperatorSettings(settingsWindow):
pass
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -24,9 +24,10 @@ def main():
window.show() window.show()
controller.signal_widgets.connect(window.show_plot_tabs) controller.signal_widgets.connect(window.show_plot_tabs)
controller.signal_settings.connect(mediator.update_settings) controller.signal_settings.connect(mediator.push_settings)
controller.signal_raport_mode.connect(monitor.custom_csv_extract_only) controller.signal_monitor.connect(monitor.custom_dir_extract)
controller.signal_seeking_mode.connect(monitor.start_seeking)
window.push_settings()
sys.exit(app.exec_()) sys.exit(app.exec_())

View File

@ -1,43 +0,0 @@
from src.OptAlgorithm.OptAlgorithm import OptAlgorithm
from src.utils import read_json
from matplotlib import pyplot as plt, use
from numpy import cos, sin, sqrt, cbrt, arcsin, linspace, array
if __name__ == "__main__":
tq = 1
ts = linspace(0, tq, 200000)
operator_params = read_json("params/operator_params.json")
system_params = read_json("params/system_params.json")
non_array_operator_params = {}
i = 1
for key, value in operator_params.items():
if hasattr(value, "__len__"):
if len(value) > i:
non_array_operator_params[key] = value[i]
else:
non_array_operator_params[key] = value[0]
else:
non_array_operator_params[key] = value
non_array_system_params = {}
for key, value in system_params.items():
if hasattr(value, "__len__"):
if len(value) > i:
non_array_system_params[key] = value[i]
else:
non_array_system_params[key] = value[0]
else:
non_array_system_params[key] = value
opt = OptAlgorithm(non_array_operator_params, non_array_system_params)
Xs = array([opt.getVar("X1", t) for t in ts])
plt.plot(ts, Xs)
plt.show()

View File

@ -5,11 +5,14 @@ from typing import Optional, Union, Any
from cachetools import LRUCache from cachetools import LRUCache
import pandas as pd import pandas as pd
from PyQt5.QtCore import QObject, QTimer from PyQt5.QtCore import QThread, QObject, QTimer
from PyQt5.QtWidgets import QWidget, QTabWidget from PyQt5.QtWidgets import QWidget, QTabWidget
from PyQt5.QtOpenGL import QGLWidget
from OptAlgorithm import OptAlgorithm from OptAlgorithm import OptAlgorithm
import pandas as pd
import pandas as pd
import numpy as np import numpy as np
import pyqtgraph as pg
@ -35,7 +38,7 @@ class BaseMediator:
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget], source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]): data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
... ...
def update_settings (self, data: list[dict]): def push_settings (self, data: list[dict]):
... ...
class BaseDirectoryMonitor: class BaseDirectoryMonitor:
@ -47,8 +50,7 @@ class BaseDirectoryMonitor:
super().__init__() super().__init__()
self._directory_path = None self._directory_path = None
self._update_time = None self._update_time = None
self.isActive = False
self._files = []
self._mediator = mediator self._mediator = mediator
@ -77,19 +79,14 @@ class BaseDirectoryMonitor:
self._files = files self._files = files
def start(self): def start(self):
self.isActive = True
self.update_timer.start(int(self._update_time)) self.update_timer.start(int(self._update_time))
def stop(self): def stop(self):
self.isActive = False
self.update_timer.stop() self.update_timer.stop()
def update_settings(self, data: list[dict]) -> None: def update_settings(self, data: list[dict]) -> None:
... ...
def update_plots(self) -> None:
...
def force_all_dir(self): def force_all_dir(self):
... ...
@ -131,8 +128,7 @@ class BasePlotWidget:
"ideals": True, "ideals": True,
"mirror ME": False, "mirror ME": False,
"workpiece": False, "workpiece": False,
"force compensation FE": False, "force compensation FE": False
"force accuracy":True
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -163,8 +159,7 @@ class BasePlotWidget:
"ideals": True, "ideals": True,
"mirror ME": True, "mirror ME": True,
"workpiece": True, "workpiece": True,
"force compensation FE": True, "force compensation FE": True
"force accuracy":False
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -195,8 +190,7 @@ class BasePlotWidget:
"ideals": True, "ideals": True,
"mirror ME": False, "mirror ME": False,
"workpiece": False, "workpiece": False,
"force compensation FE": False, "force compensation FE": False
"force accuracy":False
}, },
"Real_signals": [ "Real_signals": [
{ {
@ -233,88 +227,6 @@ class BasePlotWidget:
font-family: "Segoe UI", sans-serif; font-family: "Segoe UI", sans-serif;
}""") }""")
def _downsample_data(self, x, y, max_points=5000):
"""
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
"""
if len(x) > max_points:
factor = len(x) // max_points
x_downsampled = x[::factor]
y_downsampled = y[::factor]
return x_downsampled, y_downsampled
return x, y
def _create_navigator(self,
time_region:tuple[float, float],
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
"""
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
"""
navigator = pg.PlotItem(title="Navigator")
navigator.setFixedHeight(100)
# Получение кривых из main_plot
for curve in main_plot.listDataItems():
# Извлекаем данные из кривой
x, y = curve.getData()
curve_name = curve.opts.get("name", None)
signal_pen = curve.opts.get("pen", None)
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
ROI_region.setBounds([0, x[-1]])
navigator.addItem(ROI_region)
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
return navigator, ROI_region
def _sync_main_plot_with_navigator(self,
main_plot: pg.PlotItem,
region: pg.LinearRegionItem):
"""
Синхронизирует область просмотра основного графика с регионом навигатора.
"""
x_min, x_max = region.getRegion()
if main_plot:
main_plot.blockSignals(True)
main_plot.setXRange(x_min, x_max, padding=0)
main_plot.blockSignals(False)
def _mirror_shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
return dataframe
def _shift_data(self,
valid_str: str,
signals: list[dict],
dataframe: pd.DataFrame,
shift: float) -> pd.DataFrame:
keys = dataframe.keys()
for signal in signals:
if valid_str in signal["name"] and signal["name"] in keys:
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
return dataframe
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
"""
Синхронизирует регион навигатора с областью просмотра основного графика.
"""
if region:
x_min, x_max = main_plot
region.blockSignals(True) # Предотвращаем рекурсию
region.setRegion([x_min, x_max])
region.blockSignals(False)
@property @property
def mediator(self) -> BaseMediator: def mediator(self) -> BaseMediator:
return self._mediator return self._mediator
@ -340,14 +252,12 @@ class BaseController(QObject):
def send_widgets(self, widgets: list[QWidget]) -> None: def send_widgets(self, widgets: list[QWidget]) -> None:
... ...
def update_settings(self, settings: list[dict]) -> None: def push_settings(self, settings: list[dict]) -> None:
... ...
def raport_mode (self, filepath: str) -> None: def open_custom_file (self, filepath: str) -> None:
... ...
def seeking_mode(self) -> None:
...
class BaseIdealDataBuilder(OptAlgorithm): class BaseIdealDataBuilder(OptAlgorithm):
@ -360,12 +270,10 @@ class BaseIdealDataBuilder(OptAlgorithm):
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame: def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
data = [] data = []
for i in range (0, int(end_timestamp*self.mul)+1): for i in range (0, int(end_timestamp*self.mul)):
time = i/self.mul time = i/self.mul
X1, X2, V1, V2, F = func(time) X1, X2, V1, V2, F = func(time)
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F}) data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
X1, X2, V1, V2, F = func(end_timestamp)
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
return pd.DataFrame(data) return pd.DataFrame(data)
def get_closingDF(self) -> pd.DataFrame: def get_closingDF(self) -> pd.DataFrame:
@ -397,8 +305,6 @@ class BaseMainWindow(QWidget):
def __init__(self, def __init__(self,
controller: Optional[BaseController] = None): controller: Optional[BaseController] = None):
super().__init__() super().__init__()
self.set_style(self)
self.resize(200,200)
self._controller = controller self._controller = controller
... ...