Compare commits
16 Commits
02268f1f7b
...
9faa35f514
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9faa35f514 | ||
| ccfb6b9fcb | |||
|
|
625f3d7e01 | ||
|
|
d99fafd2ae | ||
|
|
bf155af58a | ||
|
|
3c18e22d09 | ||
|
|
4c79aba8a4 | ||
|
|
3c7040692e | ||
|
|
c3d976f6a6 | ||
|
|
7c0db070c9 | ||
|
|
b73eba7f7e | ||
|
|
27d1cd63de | ||
|
|
37b9c30f12 | ||
|
|
2e66a77ced | ||
|
|
eaec12d3ff | ||
|
|
365118bc22 |
3
.gitignore
vendored
3
.gitignore
vendored
@ -6,4 +6,5 @@
|
||||
/venv
|
||||
*.txt
|
||||
*.svg
|
||||
/test.py
|
||||
/test.py
|
||||
/output
|
||||
|
||||
@ -81,19 +81,19 @@
|
||||
],
|
||||
"distance_l_2": [
|
||||
0.0275,
|
||||
0.03,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033
|
||||
0.0255,
|
||||
0.0242,
|
||||
0.0245,
|
||||
0.0228,
|
||||
0.0236,
|
||||
0.0229,
|
||||
0.0248,
|
||||
0.024,
|
||||
0.0235,
|
||||
0.025,
|
||||
0.0276,
|
||||
0.0234,
|
||||
0.0215
|
||||
],
|
||||
"distance_h_end1": [
|
||||
0.003,
|
||||
|
||||
Binary file not shown.
@ -3,75 +3,76 @@ from numpy import sqrt, arcsin, arccos, cos, sin
|
||||
from OptAlgorithm.AutoConfigClass import AutoConfigClass
|
||||
from OptAlgorithm.ConstantCalculator import ConstantCalculator
|
||||
|
||||
|
||||
class OptTimeCalculator(AutoConfigClass):
|
||||
|
||||
params_list = []
|
||||
def __init__(self, operator_config : dict, system_config : dict):
|
||||
|
||||
|
||||
def __init__(self, operator_config: dict, system_config: dict):
|
||||
|
||||
cCalculator = ConstantCalculator(operator_config, system_config)
|
||||
super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc())
|
||||
self.allTimes = {}
|
||||
|
||||
def tGrowNominal(self, F : float) -> float:
|
||||
return arcsin(F/(self.Ftogrow)) * sqrt(self.mass_1/self.k_hardness_1)
|
||||
self.check_eps = 1e-7
|
||||
|
||||
def tGrowNominal(self, F: float) -> float:
|
||||
return arcsin(F / (self.Ftogrow)) * sqrt(self.mass_1 / self.k_hardness_1)
|
||||
|
||||
def Tclose(self, h1: float, h2: float) -> None:
|
||||
v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1)
|
||||
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
|
||||
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
|
||||
t1 = v0 / self.a_max_1
|
||||
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0)
|
||||
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 / 2)) / v0)
|
||||
T1 = t1 + t2t
|
||||
|
||||
t21 = sqrt(h2/self.a_max_2)
|
||||
t21 = min(self.v_max_2/self.a_max_2, t21)
|
||||
|
||||
t21 = sqrt(h2 / (self.a_max_2))
|
||||
t21 = min(self.v_max_2 / self.a_max_2, t21)
|
||||
t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
|
||||
T2 = t22 + 2 * t21
|
||||
|
||||
|
||||
Tclose = max(T1, T2)
|
||||
|
||||
|
||||
tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1)
|
||||
tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2)
|
||||
|
||||
|
||||
self.allTimes["tclose_1_acc"] = tclose_1_acc
|
||||
self.allTimes["tclose_1_speed"] = tclose_1_speed
|
||||
|
||||
|
||||
self.allTimes["tclose_2_acc"] = tclose_2_acc
|
||||
self.allTimes["tclose_2_speed"] = tclose_2_speed
|
||||
self.allTimes["tclose"] = Tclose
|
||||
|
||||
def Topen(self, s1 : float, s2 : float, l1 : float, l2 : float, Fs1 : float, Fs2 : float = 0) -> None:
|
||||
t11 = sqrt((l1 + Fs1)/self.a_max_1)
|
||||
t11 = min(self.v_max_1/self.a_max_1, t11)
|
||||
t12 = max(0, ((l1+Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
|
||||
|
||||
def Topen(self, s1: float, s2: float, l1: float, l2: float, Fs1: float, Fs2: float = 0) -> None:
|
||||
t11 = sqrt((l1 + Fs1) / self.a_max_1)
|
||||
t11 = min(self.v_max_1 / self.a_max_1, t11)
|
||||
t12 = max(0, ((l1 + Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
|
||||
T1 = t12 + 2 * t11
|
||||
offset = self.calcSecondOpenOffset(t11, t12, Fs1)
|
||||
|
||||
t21 = sqrt(l2/self.a_max_2)
|
||||
t21 = min(self.v_max_2/self.a_max_2, t21)
|
||||
|
||||
t21 = sqrt(l2 / self.a_max_2)
|
||||
t21 = min(self.v_max_2 / self.a_max_2, t21)
|
||||
t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
|
||||
T2 = t22 + 2 * t21 + offset
|
||||
|
||||
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1+Fs1)
|
||||
|
||||
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1 + Fs1)
|
||||
offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1)
|
||||
|
||||
topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2)
|
||||
|
||||
self.allTimes["topen_1_acc"] = topen_1_acc
|
||||
self.allTimes["topen_2_offset"] = offset
|
||||
|
||||
|
||||
self.allTimes["topen_1_acc"] = topen_1_acc
|
||||
self.allTimes["topen_2_offset"] = offset
|
||||
|
||||
self.allTimes["topen_1_acc"] = topen_1_acc
|
||||
self.allTimes["topen_1_speed"] = topen_1_speed
|
||||
|
||||
|
||||
self.allTimes["topen_2_acc"] = topen_2_acc
|
||||
self.allTimes["topen_2_speed"] = topen_2_speed
|
||||
|
||||
|
||||
if s1 >= l1:
|
||||
raise Exception("""S1 >= L1 - недопустимый сценарий,
|
||||
проверьте distance_s_1, distance_h_end1""")
|
||||
if s2 >= l2:
|
||||
raise Exception("""S2 >= L2 - недопустимый сценарий,
|
||||
проверьте distance_s_2, distance_h_end2""")
|
||||
|
||||
|
||||
s1 += Fs1
|
||||
topen_1_mark = sqrt(2 * s1 / self.a_max_1)
|
||||
if topen_1_mark > topen_1_acc:
|
||||
@ -79,167 +80,173 @@ class OptTimeCalculator(AutoConfigClass):
|
||||
v1 = topen_1_acc * self.a_max_1
|
||||
if s1 > topen_1_speed * v1:
|
||||
s1 -= topen_1_speed * v1
|
||||
topen_1_mark = 2*topen_1_acc + topen_1_speed - sqrt(topen_1_acc**2 - 2*s1 / self.a_max_1)
|
||||
topen_1_mark = 2 * topen_1_acc + topen_1_speed - sqrt(topen_1_acc ** 2 - 2 * s1 / self.a_max_1)
|
||||
else:
|
||||
topen_1_mark = topen_1_acc + s1 / v1
|
||||
|
||||
|
||||
topen_2_mark = sqrt(2 * s2 / self.a_max_2)
|
||||
if topen_2_mark > topen_2_acc:
|
||||
s2 -= topen_2_acc ** 2 * self.a_max_2 / 2
|
||||
v2 = topen_2_acc * self.a_max_2
|
||||
if s2 > topen_2_speed * v2:
|
||||
s2 -= topen_2_speed * v2
|
||||
topen_2_mark = 2*topen_2_acc + topen_2_speed - sqrt(topen_2_acc**2 - 2*s2 / self.a_max_2)
|
||||
topen_2_mark = 2 * topen_2_acc + topen_2_speed - sqrt(topen_2_acc ** 2 - 2 * s2 / self.a_max_2)
|
||||
else:
|
||||
topen_2_mark = topen_2_acc + s2 / v2
|
||||
|
||||
self.allTimes["topen_1_mark"] = topen_1_mark
|
||||
self.allTimes["topen_2_mark"] = topen_2_mark
|
||||
|
||||
def Tgrow(self) -> None:
|
||||
|
||||
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
|
||||
vF0 = v0 * self.k_hardness_1
|
||||
|
||||
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
|
||||
|
||||
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0)
|
||||
tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
|
||||
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed)
|
||||
def Tgrow(self) -> None:
|
||||
|
||||
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
|
||||
vF0 = v0 * self.k_hardness_1
|
||||
|
||||
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1 / (self.mass_1)) * self.Ftogrow)
|
||||
|
||||
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0 * vF0)
|
||||
tspeed = sqrt(self.mass_1 / self.k_hardness_1) * (
|
||||
arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
|
||||
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1 / self.freq * vF0 * sin(
|
||||
self.freq * tspeed)
|
||||
eps = 1e1
|
||||
if self.freq**2 * self.Ftogrow**2 - vFmax**2 < -eps:
|
||||
if self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 < -eps:
|
||||
raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории
|
||||
, проверьте параметры k_hardness_1, mass_1, k_prop""")
|
||||
Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2 + eps)
|
||||
Fmeet = 1 / self.freq * sqrt(self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 + eps)
|
||||
Fstart_prop = self.Fstart_prop
|
||||
if Fmeet > Fstart_prop:
|
||||
raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора
|
||||
, проверьте параметры v_max_1, k_prop""")
|
||||
tmeet = (Fmeet - Fspeed)/vFmax
|
||||
tmeet = (Fmeet - Fspeed) / vFmax
|
||||
tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet)
|
||||
vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2)
|
||||
vp = 1 / sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow ** 2 - self.Fstart_prop ** 2)
|
||||
ap = Fstart_prop / self.mass_1
|
||||
tprop = 2*vp / ap
|
||||
|
||||
tprop = 2 * vp / ap
|
||||
|
||||
self.allTimes["tspeed"] = tspeed
|
||||
self.allTimes["tmeet"] = tmeet
|
||||
self.allTimes["tend"] = tend
|
||||
self.allTimes["tprop"] = tprop
|
||||
self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop
|
||||
|
||||
def T(self, h1 : float, h2 : float, s1 : float, s2 : float, l1 : float, l2 : float) -> dict:
|
||||
|
||||
def T(self, h1: float, h2: float, s1: float, s2: float, l1: float, l2: float) -> dict:
|
||||
self.Tclose(h1, h2)
|
||||
self.Tgrow()
|
||||
self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0)
|
||||
return self.allTimes
|
||||
|
||||
def Tmovement(self, closeAlgo, tmark) -> None:
|
||||
|
||||
def Tmovement(self, closeAlgo, tmark) -> tuple[list, list]:
|
||||
contact = [self.contact_distance_1, self.contact_distance_2]
|
||||
v0s = []
|
||||
pos0s = []
|
||||
for i in range(1,3):
|
||||
for i in range(1, 3):
|
||||
if tmark < 0:
|
||||
raise Exception("""Отрицательное время этапа раскрытия,
|
||||
проверьте distance_s_{1,2}, time_command""")
|
||||
v0 = closeAlgo("V"+str(i), "Open", tmark)
|
||||
v0 = closeAlgo("V" + str(i), "Open", tmark)
|
||||
v0s.append(v0)
|
||||
x0 = closeAlgo("X"+str(i), "Open", tmark)
|
||||
x1 = contact[i-1] - self.__dict__["distance_h_end"+str(i)]
|
||||
x0 = closeAlgo("X" + str(i), "Open", tmark)
|
||||
x1 = contact[i - 1] - self.__dict__["distance_h_end" + str(i)]
|
||||
x = x1 - x0
|
||||
pos0s.append(closeAlgo("X"+str(i), "Open", tmark))
|
||||
pos0s.append(closeAlgo("X" + str(i), "Open", tmark))
|
||||
Tfull = self.time_robot_movement
|
||||
|
||||
|
||||
L = self.__dict__["distance_l_"+str(i)]
|
||||
maxL = contact[i-1] - L - x0
|
||||
|
||||
L = self.__dict__["distance_l_" + str(i)]
|
||||
maxL = contact[i - 1] - L - x0
|
||||
self.Tmovementi(i, x, Tfull, v0, maxL)
|
||||
return pos0s, v0s
|
||||
|
||||
|
||||
def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None:
|
||||
v0 = abs(v0)
|
||||
vmax = self.__dict__["v_max_"+str(i)]
|
||||
a = self.__dict__["a_max_"+str(i)]
|
||||
t3 = (Tfull + v0 / a) / 2
|
||||
|
||||
sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2)
|
||||
vmax = self.__dict__["v_max_" + str(i)]
|
||||
a = self.__dict__["a_max_" + str(i)]
|
||||
t3 = (Tfull + v0 / a) / 2
|
||||
|
||||
sqrtval = a ** 2 * (
|
||||
a ** 2 * (Tfull + 2 * t3) ** 2 - 8 * a * Sfull + 2 * a * v0 * (Tfull + 2 * t3) - 3 * v0 ** 2)
|
||||
if sqrtval < 0:
|
||||
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
|
||||
проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""")
|
||||
t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2)
|
||||
t1 = min(t1max, (vmax- abs(v0))/a)
|
||||
t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a)))
|
||||
|
||||
t31 = v0/a + t1
|
||||
t5max = (Tfull - v0/a)/2 - t1
|
||||
t1max = ((Tfull + 2 * t3) + v0 / a) / (2) - sqrt(sqrtval) * sqrt(2) / (4 * a ** 2)
|
||||
t1 = min(t1max, (vmax - abs(v0)) / a)
|
||||
t1 = max(0, min(t1, -v0 / a + sqrt(v0 ** 2 / (a ** 2) + (abs(maxL) - v0 * v0 / a) / a)))
|
||||
|
||||
t31 = v0 / a + t1
|
||||
t5max = (Tfull - v0 / a) / 2 - t1
|
||||
v1 = v0 + a * t1
|
||||
S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2
|
||||
S1 = v0 * t1 + a * t1 * t1 / 2 + v1 * t31 - a * t31 * t31 / 2
|
||||
S2max = Sfull + S1
|
||||
t5 = min(t5max, (vmax)/a, sqrt(S2max / a))
|
||||
t3 = abs(v0)/a + t1 + t5
|
||||
t5 = min(t5max, (vmax) / a, sqrt(S2max / a))
|
||||
t3 = abs(v0) / a + t1 + t5
|
||||
t32 = t5
|
||||
|
||||
v1 = abs(v0+t1*a)
|
||||
v3 = abs(v0 + t1*a - t3*a)
|
||||
|
||||
v1 = abs(v0 + t1 * a)
|
||||
v3 = abs(v0 + t1 * a - t3 * a)
|
||||
timeleft = Tfull - t1 - t5 - t3
|
||||
sq = -v0*t1 - a*t1**2/2 - v1 * t3 + a*t3**2/2 + v3*t5 - a*t5**2/2
|
||||
sq = -v0 * t1 - a * t1 ** 2 / 2 - v1 * t3 + a * t3 ** 2 / 2 + v3 * t5 - a * t5 ** 2 / 2
|
||||
Sleft = Sfull - sq
|
||||
|
||||
t2max = (timeleft - Sleft/v3) / (1 + v1/v3)
|
||||
Smovement = -v0 * t1 - a/2 * t1**2 - v1 * t31 + a/2*t31**2
|
||||
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1))
|
||||
t4 = max(0, Sleft/v3 + v1/v3 * t2)
|
||||
|
||||
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
|
||||
|
||||
self.allTimes["tmovement_"+str(i)+"_acc"] = t1
|
||||
self.allTimes["tmovement_"+str(i)+"_speed"] = t2
|
||||
self.allTimes["tmovement_"+str(i)+"_slow"] = t31
|
||||
self.allTimes["tmovement_"+str(i)+"_stay"] = tstay
|
||||
self.allTimes["tmovement_"+str(i)] = t1 + t2 + t31 + tstay
|
||||
self.allTimes["tpreclose_"+str(i)+"_slow"] = t32
|
||||
self.allTimes["tpreclose_"+str(i)+"_speed"] = t4
|
||||
self.allTimes["tpreclose_"+str(i)+"_acc"] = t5
|
||||
self.allTimes["tpreclose_"+str(i)] = t32 + t4 + t5
|
||||
|
||||
t2max = (timeleft - Sleft / v3) / (1 + v1 / v3)
|
||||
Smovement = -v0 * t1 - a / 2 * t1 ** 2 - v1 * t31 + a / 2 * t31 ** 2
|
||||
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement)) / v1))
|
||||
t4 = max(0, Sleft / v3 + v1 / v3 * t2)
|
||||
|
||||
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
|
||||
|
||||
self.allTimes["tmovement_" + str(i) + "_acc"] = t1
|
||||
self.allTimes["tmovement_" + str(i) + "_speed"] = t2
|
||||
self.allTimes["tmovement_" + str(i) + "_slow"] = t31
|
||||
self.allTimes["tmovement_" + str(i) + "_stay"] = tstay
|
||||
self.allTimes["tmovement_" + str(i)] = t1 + t2 + t31 + tstay
|
||||
self.allTimes["tpreclose_" + str(i) + "_slow"] = t32
|
||||
self.allTimes["tpreclose_" + str(i) + "_speed"] = t4
|
||||
self.allTimes["tpreclose_" + str(i) + "_acc"] = t5
|
||||
self.allTimes["tpreclose_" + str(i)] = t32 + t4 + t5
|
||||
T = Tfull
|
||||
self.allTimes["tmovement"] = T
|
||||
|
||||
def calcFirstClose(self, T : float, s : float) -> tuple[float, float]:
|
||||
|
||||
def calcFirstClose(self, T: float, s: float) -> tuple[float, float]:
|
||||
v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1)
|
||||
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
|
||||
t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1))
|
||||
t1 = min(t1, v0 / self.a_max_1)
|
||||
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
|
||||
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
|
||||
t1 = T - sqrt(max(0, T ** 2 - 2 * s / self.a_max_1))
|
||||
if t1 > v0/ self.a_max_1 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - смыкание FE, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
|
||||
return t1, t2
|
||||
|
||||
def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2
|
||||
t1 = min(t1, self.v_max_1 / self.a_max_1)
|
||||
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
|
||||
|
||||
def calcFirstOpen(self, T: float, s: float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_1)) / 2
|
||||
if t1 > self.v_max_1 / self.a_max_1 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - раскрытие FE, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
|
||||
return t1, t2
|
||||
|
||||
def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
|
||||
t1 = min(t1, self.v_max_2 / self.a_max_2)
|
||||
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
|
||||
|
||||
def calcSecondOpen(self, T: float, s: float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
|
||||
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - раскрытие ME, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
|
||||
return t1, t2
|
||||
|
||||
def calcSecondClose(self, T : float, s : float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
|
||||
t1 = min(t1, self.v_max_2 / self.a_max_2)
|
||||
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
|
||||
|
||||
def calcSecondClose(self, T: float, s: float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
|
||||
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - смыкание ME, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
|
||||
return t1, t2
|
||||
|
||||
def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float:
|
||||
|
||||
def calcSecondOpenOffset(self, t1: float, t2: float, sq: float) -> float:
|
||||
s = sq * 1
|
||||
offset = sqrt(2 * s / self.a_max_1)
|
||||
|
||||
|
||||
if offset > t1:
|
||||
s -= t1 ** 2 * self.a_max_1 / 2
|
||||
v1 = t1 * self.a_max_1
|
||||
if s > t2 * v1:
|
||||
s -= t2 * v1
|
||||
|
||||
offset = 2*t1 + t2 - sqrt(t1**2 - 2*s / self.a_max_1)
|
||||
|
||||
offset = 2 * t1 + t2 - sqrt(t1 ** 2 - 2 * s / self.a_max_1)
|
||||
else:
|
||||
offset = t1 + s / v1
|
||||
return offset
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -8,13 +8,21 @@ class Controller(BaseController):
|
||||
|
||||
signal_widgets = pyqtSignal(list)
|
||||
signal_settings = pyqtSignal(list)
|
||||
signal_monitor = pyqtSignal(str)
|
||||
signal_raport_mode = pyqtSignal(str)
|
||||
signal_seeking_mode = pyqtSignal()
|
||||
|
||||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||||
self.signal_widgets.emit(widgets)
|
||||
|
||||
def push_settings(self, settings: list[dict]) -> None:
|
||||
def update_settings(self, settings: list[dict]) -> None:
|
||||
self.signal_settings.emit(settings)
|
||||
|
||||
def open_custom_file (self, filepath: str) -> None:
|
||||
self.signal_monitor.emit(filepath)
|
||||
def raport_mode(self, filepath: str) -> None:
|
||||
self.signal_raport_mode.emit(filepath)
|
||||
|
||||
def seeking_mode(self) -> None:
|
||||
self.signal_seeking_mode.emit()
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
import pandas as pd
|
||||
import traceback
|
||||
import sys
|
||||
from loguru import logger
|
||||
|
||||
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
|
||||
# FIXME: костыль для выключения предупреждения "replace deprecated"
|
||||
pd.set_option('future.no_silent_downcasting', True)
|
||||
|
||||
from utils.base.base import BaseDataConverter
|
||||
@ -10,11 +13,23 @@ class DataConverter(BaseDataConverter):
|
||||
|
||||
@staticmethod
|
||||
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
|
||||
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
|
||||
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
|
||||
return dataframe
|
||||
try:
|
||||
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
|
||||
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
|
||||
return dataframe
|
||||
except:
|
||||
return None
|
||||
|
||||
def convert_data(self, files: list[str]) -> None:
|
||||
dataframes = [pd.read_csv(file) for file in files]
|
||||
converted_dataframes = list(map(self._replace_bool, dataframes))
|
||||
self._mediator.notify(self, converted_dataframes)
|
||||
try:
|
||||
dataframes = [pd.read_csv(file) if file != '' else None for file in files]
|
||||
converted_dataframes = list(map(self._replace_bool, dataframes))
|
||||
except:
|
||||
# Get the traceback object
|
||||
tb = sys.exc_info()[2]
|
||||
tbinfo = traceback.format_tb(tb)[0]
|
||||
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
|
||||
logger.error(pymsg)
|
||||
converted_dataframes = [None]
|
||||
finally:
|
||||
self._mediator.notify(self, converted_dataframes)
|
||||
|
||||
@ -3,7 +3,9 @@ import pandas as pd
|
||||
from typing import Union
|
||||
from PyQt5.QtWidgets import QWidget
|
||||
|
||||
from utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BasePointPassportFormer
|
||||
from utils.base.base import (BaseMediator, BaseDirectoryMonitor,
|
||||
BaseDataConverter, BasePlotWidget,
|
||||
BasePointPassportFormer)
|
||||
|
||||
|
||||
class Mediator(BaseMediator):
|
||||
@ -24,9 +26,9 @@ class Mediator(BaseMediator):
|
||||
if issubclass(source.__class__, BasePlotWidget):
|
||||
self._controller.send_widgets(data)
|
||||
|
||||
def push_settings(self, settings: list[dict]):
|
||||
def update_settings(self, settings: list[dict]):
|
||||
self._monitor.update_settings(settings)
|
||||
self._passportFormer.update_settings(settings)
|
||||
self._monitor.force_all_dir()
|
||||
self._monitor.update_plots()
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
from time import sleep
|
||||
import os
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from utils.base.base import BaseDirectoryMonitor
|
||||
@ -9,47 +7,62 @@ from utils.base.base import BaseDirectoryMonitor
|
||||
class DirectoryMonitor(BaseDirectoryMonitor):
|
||||
|
||||
def _init_state(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
|
||||
self._files = [
|
||||
os.path.join(self._directory_path, file)
|
||||
for file in os.listdir(self._directory_path)
|
||||
if file.lower().endswith('.csv')
|
||||
]
|
||||
|
||||
self.update_timer.timeout.connect(self._monitor)
|
||||
logger.info("Monitor initiated!")
|
||||
|
||||
def _monitor(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x),
|
||||
filter(lambda x: x not in self._files, files))))
|
||||
current_files = [
|
||||
os.path.join(self._directory_path, file)
|
||||
for file in os.listdir(self._directory_path)
|
||||
if file.lower().endswith('.csv')
|
||||
]
|
||||
new_files = sorted(list(filter(lambda x: x not in self._files, current_files)))
|
||||
if new_files:
|
||||
logger.info(f"New files detected: {new_files}")
|
||||
self._mediator.notify(self, new_files)
|
||||
self._files = files
|
||||
if not files:
|
||||
self._files = current_files
|
||||
if not current_files:
|
||||
self._files = []
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
if self.isActive:
|
||||
self.stop()
|
||||
_, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path'][0]
|
||||
self._update_time = system_params['monitor_update_period'][0]
|
||||
|
||||
if not os.path.exists(self._directory_path):
|
||||
logger.error(f"Путь {self._directory_path} не существует.")
|
||||
raise FileNotFoundError(f"Путь {self._directory_path} не существует.")
|
||||
else:
|
||||
self._init_state()
|
||||
self.start()
|
||||
else:
|
||||
_, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path'][0]
|
||||
self._update_time = system_params['monitor_update_period'][0]
|
||||
|
||||
def update_plots(self):
|
||||
if self._files is not None:
|
||||
self._mediator.notify(self, self._files)
|
||||
|
||||
def custom_csv_extract_only(self, path: str):
|
||||
self.stop()
|
||||
operator_params, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path'][0]
|
||||
self._update_time = system_params['monitor_update_period'][0]
|
||||
self._files.append(path)
|
||||
if path is not None:
|
||||
self._mediator.notify(self, [path])
|
||||
else:
|
||||
self._mediator.notify(self, [None])
|
||||
|
||||
|
||||
def start_seeking(self) -> None:
|
||||
self._init_state()
|
||||
self.start()
|
||||
|
||||
def force_all_dir(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
all_files = []
|
||||
for x in self._files:
|
||||
path = os.path.join(self._directory_path, x)
|
||||
all_files.append(path)
|
||||
if all_files:
|
||||
logger.info(f"Plotting all files: {all_files}")
|
||||
self._mediator.notify(self, all_files)
|
||||
else:
|
||||
logger.info(f"Failed pushing {str(all_files)}")
|
||||
|
||||
def custom_dir_extract(self, dict_path:str ):
|
||||
files = os.listdir(dict_path)
|
||||
if files is not None:
|
||||
all_files = [os.path.join(dict_path, file) for file in files]
|
||||
self._mediator.notify(self, all_files)
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
||||
import pandas as pd
|
||||
|
||||
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
||||
|
||||
|
||||
class idealDataBuilder(BaseIdealDataBuilder):
|
||||
@ -18,14 +18,15 @@ class idealDataBuilder(BaseIdealDataBuilder):
|
||||
|
||||
def get_weldingDF(self) -> pd.DataFrame:
|
||||
data = []
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
|
||||
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
|
||||
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_ideal_timings(self) -> list[float]:
|
||||
data = self.Ts
|
||||
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
|
||||
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']]
|
||||
return ideal_timings
|
||||
|
||||
class PassportFormer(BasePointPassportFormer):
|
||||
@ -36,49 +37,50 @@ class PassportFormer(BasePointPassportFormer):
|
||||
|
||||
|
||||
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
|
||||
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
|
||||
if point_quantity == 0:
|
||||
return []
|
||||
|
||||
system_settings = {key: value[0] for key, value in self._params[1].items()}
|
||||
|
||||
if dataframe is not None:
|
||||
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
|
||||
if point_quantity == 0:
|
||||
return []
|
||||
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
|
||||
else:
|
||||
events = None
|
||||
point_quantity = 1
|
||||
|
||||
points_pocket = []
|
||||
system_settings = {key: value[0] for key, value in self._params[1].items()}
|
||||
tesla_time = sum(self._params[0].get("Tesla summary time", []))
|
||||
useful_data = {"tesla_time": tesla_time,
|
||||
"range_ME": system_settings["Range ME, mm"],
|
||||
"k_hardness": system_settings["k_hardness_1"]
|
||||
}
|
||||
|
||||
points_pocket = []
|
||||
|
||||
time_is_valid = not dataframe["time"].isna().all()
|
||||
|
||||
if time_is_valid:
|
||||
|
||||
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
|
||||
|
||||
for i in range(point_quantity):
|
||||
operator_settings = {
|
||||
key: (value[i] if i < len(value) else value[0])
|
||||
for key, value in self._params[0].items()
|
||||
}
|
||||
params_list = [operator_settings, system_settings]
|
||||
cache_key = self._generate_cache_key(params_list)
|
||||
if cache_key in self._ideal_data_cashe :
|
||||
ideal_data = self._ideal_data_cashe[cache_key]
|
||||
print(f"Cache hit")
|
||||
else:
|
||||
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
|
||||
self._ideal_data_cashe[cache_key] = ideal_data
|
||||
print(f"Cache miss. Computed and cached.")
|
||||
for i in range(point_quantity):
|
||||
operator_settings = {
|
||||
key: (value[i] if i < len(value) else value[0])
|
||||
for key, value in self._params[0].items()
|
||||
}
|
||||
params_list = [operator_settings, system_settings]
|
||||
cache_key = self._generate_cache_key(params_list)
|
||||
if cache_key in self._ideal_data_cashe :
|
||||
ideal_data = self._ideal_data_cashe[cache_key]
|
||||
print(f"Cache hit")
|
||||
else:
|
||||
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
|
||||
self._ideal_data_cashe[cache_key] = ideal_data
|
||||
print(f"Cache miss. Computed and cached.")
|
||||
if events is not None:
|
||||
idx = i+1 if idx_shift else i
|
||||
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
|
||||
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
|
||||
useful_p_data = {"thickness": operator_settings["object_thickness"],
|
||||
"L2": operator_settings["distance_l_2"],
|
||||
"force": operator_settings["force_target"]}
|
||||
else:
|
||||
point_timeframe, point_events = None, None
|
||||
useful_p_data = {"thickness": operator_settings["object_thickness"],
|
||||
"L2": operator_settings["distance_l_2"],
|
||||
"force": operator_settings["force_target"]}
|
||||
|
||||
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
|
||||
return dataframe, points_pocket, useful_data
|
||||
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
|
||||
return dataframe, points_pocket, useful_data
|
||||
|
||||
def update_settings(self, params: list[dict, dict]):
|
||||
self._params = params
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
BIN
src/gui/__pycache__/reportGui.cpython-310.pyc
Normal file
BIN
src/gui/__pycache__/reportGui.cpython-310.pyc
Normal file
Binary file not shown.
Binary file not shown.
@ -1,47 +1,148 @@
|
||||
from datetime import datetime as dt
|
||||
from typing import Optional
|
||||
|
||||
from PyQt5 import QtWidgets
|
||||
from PyQt5.QtCore import Qt
|
||||
from PyQt5.QtGui import QPixmap
|
||||
|
||||
from utils.base.base import BaseMainWindow, BaseController
|
||||
from gui.settings_window import settingsWindow
|
||||
from gui.settings_window import SystemSettings, OperatorSettings
|
||||
from gui.reportGui import ReportSettings
|
||||
|
||||
class MainWindow(BaseMainWindow):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None):
|
||||
controller: Optional[BaseController] = None) -> None:
|
||||
super().__init__()
|
||||
self._controller = controller
|
||||
self.initUI()
|
||||
self.set_style(self)
|
||||
self.settings_button.clicked.connect(self._show_settings)
|
||||
self.select_dir_button.clicked.connect(self._select_dir)
|
||||
self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._updater_trigger)
|
||||
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._updater_trigger)
|
||||
self._init_startUI()
|
||||
|
||||
|
||||
def initUI(self) -> None:
|
||||
def _init_startUI(self) -> None:
|
||||
self.operSettings = OperatorSettings("params/operator_params.json", 'Operator', self._transfer_settings)
|
||||
self.sysSettings = SystemSettings("params/system_params.json", 'System', self._transfer_settings)
|
||||
self.repSettings = ReportSettings()
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
|
||||
self._clear()
|
||||
seeking_mode_btn = QtWidgets.QPushButton("Real time folder scanning")
|
||||
seeking_mode_btn.setFixedWidth(300)
|
||||
seeking_mode_btn.clicked.connect(self._init_seekingUI)
|
||||
raport_mode_btn = QtWidgets.QPushButton("Raport editor")
|
||||
raport_mode_btn.setFixedWidth(300)
|
||||
raport_mode_btn.clicked.connect(self._init_raportUI)
|
||||
|
||||
button_layout = QtWidgets.QHBoxLayout()
|
||||
button_layout.setSpacing(2)
|
||||
button_layout.addWidget(seeking_mode_btn)
|
||||
button_layout.addWidget(raport_mode_btn)
|
||||
button_widget = QtWidgets.QWidget()
|
||||
button_widget.setLayout(button_layout)
|
||||
|
||||
mainLayout = self.layout()
|
||||
label = QtWidgets.QLabel("Select work mode")
|
||||
label.setStyleSheet(
|
||||
"""QLabel{
|
||||
color: #ffffff;
|
||||
font-size: 40px;
|
||||
font-weight: bold;
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
}"""
|
||||
)
|
||||
mainLayout.addWidget(label, alignment=Qt.AlignCenter)
|
||||
mainLayout.addWidget(button_widget)
|
||||
|
||||
def _clear(self) -> None:
|
||||
main = self.layout()
|
||||
if self.layout() is not None:
|
||||
while main.count():
|
||||
child = main.takeAt(0)
|
||||
if child.widget() is not None:
|
||||
child.widget().deleteLater()
|
||||
else: self.setLayout(QtWidgets.QVBoxLayout())
|
||||
|
||||
def _init_seekingUI(self) -> None:
|
||||
self._clear()
|
||||
self._transfer_settings()
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
self.tabWidget.setTabsClosable(True)
|
||||
self.tabWidget.tabCloseRequested.connect(self._close_tab)
|
||||
layout = QtWidgets.QVBoxLayout()
|
||||
layout.addWidget(self.tabWidget)
|
||||
|
||||
self.settings_button = QtWidgets.QPushButton("Show settings")
|
||||
self.settings_button.setFixedWidth(160)
|
||||
self.select_dir_button = QtWidgets.QPushButton("Open directory")
|
||||
self.select_dir_button.setFixedWidth(175)
|
||||
sys_settings_btn = QtWidgets.QPushButton("System settings")
|
||||
sys_settings_btn.setFixedWidth(200)
|
||||
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
|
||||
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
|
||||
oper_settings_btn.setFixedWidth(200)
|
||||
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
|
||||
|
||||
button_layout = QtWidgets.QHBoxLayout()
|
||||
button_layout.setSpacing(2)
|
||||
button_layout.addWidget(self.settings_button)
|
||||
button_layout.addWidget(self.select_dir_button)
|
||||
button_layout.addWidget(sys_settings_btn)
|
||||
button_layout.addWidget(oper_settings_btn)
|
||||
button_widget = QtWidgets.QWidget()
|
||||
button_widget.setLayout(button_layout)
|
||||
|
||||
title = QtWidgets.QLabel("online mode")
|
||||
mainLayout = self.layout()
|
||||
mainLayout.addWidget(self.tabWidget)
|
||||
mainLayout.addWidget(button_widget)
|
||||
mainLayout.addWidget(title, alignment=Qt.AlignRight)
|
||||
self.resize(800,800)
|
||||
self._controller.seeking_mode()
|
||||
# TODO:push seeking to mediator
|
||||
|
||||
def _init_raportUI(self) -> None:
|
||||
self._clear()
|
||||
self._transfer_settings()
|
||||
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
self.tabWidget.setTabsClosable(True)
|
||||
self.tabWidget.tabCloseRequested.connect(self._close_tab)
|
||||
sys_settings_btn = QtWidgets.QPushButton("System settings")
|
||||
sys_settings_btn.setFixedWidth(185)
|
||||
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
|
||||
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
|
||||
oper_settings_btn.setFixedWidth(185)
|
||||
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
|
||||
view_settings_btn = QtWidgets.QPushButton("Customize view")
|
||||
view_settings_btn.setFixedWidth(185)
|
||||
view_settings_btn.clicked.connect(self._customization_window)
|
||||
save_screen_btn = QtWidgets.QPushButton("Save state")
|
||||
save_screen_btn.setFixedWidth(185)
|
||||
save_screen_btn.clicked.connect(self._save_plots)
|
||||
open_file_btn = QtWidgets.QPushButton("Open file")
|
||||
open_file_btn.setFixedWidth(185)
|
||||
open_file_btn.clicked.connect(self._open_file)
|
||||
|
||||
button_layout = QtWidgets.QHBoxLayout()
|
||||
button_layout.setSpacing(2)
|
||||
button_layout.addWidget(sys_settings_btn)
|
||||
button_layout.addWidget(oper_settings_btn)
|
||||
button_layout.addWidget(view_settings_btn)
|
||||
button_layout.addWidget(save_screen_btn)
|
||||
button_layout.addWidget(open_file_btn)
|
||||
button_widget = QtWidgets.QWidget()
|
||||
button_widget.setLayout(button_layout)
|
||||
|
||||
title = QtWidgets.QLabel("raport mode")
|
||||
mainLayout = self.layout()
|
||||
mainLayout.addWidget(self.tabWidget)
|
||||
mainLayout.addWidget(button_widget)
|
||||
mainLayout.addWidget(title, alignment=Qt.AlignRight)
|
||||
self.resize(800,800)
|
||||
self._controller.raport_mode(None)
|
||||
|
||||
#self._controller.raport_mode(path)
|
||||
# TODO: push only one dir to monitor
|
||||
|
||||
layout.addLayout(button_layout)
|
||||
self.setLayout(layout)
|
||||
|
||||
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
|
||||
for plot_widget in plot_widgets:
|
||||
widget, reg_items, curve_items = plot_widget
|
||||
|
||||
tab = QtWidgets.QWidget()
|
||||
tab.setProperty("reg_items", reg_items)
|
||||
tab.setProperty("curve_items", curve_items)
|
||||
grid = QtWidgets.QGridLayout()
|
||||
grid.addWidget(plot_widget)
|
||||
grid.addWidget(widget)
|
||||
tab.setLayout(grid)
|
||||
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
|
||||
self.tabWidget.setCurrentWidget(tab)
|
||||
@ -51,31 +152,61 @@ class MainWindow(BaseMainWindow):
|
||||
for i in range(0, tab_count-2):
|
||||
self._close_tab(i)
|
||||
|
||||
|
||||
def keyPressEvent(self, a0):
|
||||
def keyPressEvent(self, a0) -> None:
|
||||
if a0.key() == Qt.Key_F5:
|
||||
pass
|
||||
tab_count = self.tabWidget.count()
|
||||
for i in range(0, tab_count):
|
||||
self._close_tab(i)
|
||||
|
||||
def closeEvent(self, a0):
|
||||
self.operSettings.close()
|
||||
self.sysSettings.close()
|
||||
self.repSettings.close()
|
||||
super().closeEvent(a0)
|
||||
|
||||
def _show_settings(self):
|
||||
def _show_settings(self) -> None:
|
||||
self.operSettings.show()
|
||||
self.sysSettings.show()
|
||||
|
||||
def push_settings(self) -> None:
|
||||
self._updater_trigger()
|
||||
|
||||
def _updater_trigger(self) -> None:
|
||||
def _transfer_settings(self) -> None:
|
||||
self.tabWidget.clear()
|
||||
operator_params = self.operSettings.getParams()
|
||||
system_params = self.sysSettings.getParams()
|
||||
self._controller.push_settings([operator_params, system_params])
|
||||
self._controller.update_settings([operator_params, system_params])
|
||||
|
||||
def _close_tab(self, index:int) -> None:
|
||||
self.tabWidget.removeTab(index)
|
||||
|
||||
def _open_file(self) -> None:
|
||||
path = self._select_csv()
|
||||
self._controller.raport_mode(path)
|
||||
|
||||
def _select_dir(self):
|
||||
folder_path = QtWidgets.QFileDialog.getExistingDirectory(self, 'Select directory', "")
|
||||
if folder_path:
|
||||
self._controller.open_custom_file(folder_path)
|
||||
def _select_csv(self) -> Optional[str]:
|
||||
CSV_path, _ = QtWidgets.QFileDialog.getOpenFileName(self,"Select csv file", "", "CSV Files (*.csv)")
|
||||
if CSV_path:
|
||||
print(CSV_path)
|
||||
return CSV_path
|
||||
return None
|
||||
|
||||
def _customization_window(self) -> None:
|
||||
tab = self.tabWidget.currentWidget()
|
||||
reg_items = tab.property("reg_items")
|
||||
curve_items = tab.property("curve_items")
|
||||
self.repSettings.build(reg_items, curve_items)
|
||||
|
||||
def _save_plots(self) -> None:
|
||||
filepath, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save file", "", "Image Files (*.png *.jpeg)")
|
||||
tab = self.tabWidget.currentWidget()
|
||||
pixmap = QPixmap(tab.size())
|
||||
tab.render(pixmap)
|
||||
pixmap.save(filepath)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,9 +1,7 @@
|
||||
import pandas as pd
|
||||
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
|
||||
import copy
|
||||
|
||||
|
||||
import pyqtgraph as pg
|
||||
import pandas as pd
|
||||
from typing import Optional, Any
|
||||
|
||||
from utils.base.base import BasePlotWidget
|
||||
@ -14,56 +12,6 @@ class ProcessStage():
|
||||
finish_index:int
|
||||
|
||||
class PlotWidget(BasePlotWidget):
|
||||
|
||||
def _create_navigator(self,
|
||||
time_region:tuple[float, float],
|
||||
main_plot: pg.PlotWidget,
|
||||
dataframe: pd.DataFrame,
|
||||
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||||
"""
|
||||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||||
"""
|
||||
navigator = pg.PlotWidget(title="Navigator")
|
||||
navigator.setFixedHeight(100)
|
||||
|
||||
for signal in real_signals:
|
||||
if signal["name"] in dataframe.columns:
|
||||
x = dataframe["time"]
|
||||
y = dataframe[signal["name"]]
|
||||
|
||||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||||
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
|
||||
|
||||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
|
||||
navigator.addItem(ROI_region)
|
||||
|
||||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||||
|
||||
return navigator, ROI_region
|
||||
|
||||
def _downsample_data(self, x, y, max_points=5000):
|
||||
"""
|
||||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||||
"""
|
||||
if len(x) > max_points:
|
||||
factor = len(x) // max_points
|
||||
x_downsampled = x[::factor]
|
||||
y_downsampled = y[::factor]
|
||||
return x_downsampled, y_downsampled
|
||||
return x, y
|
||||
|
||||
def _sync_main_plot_with_navigator(self,
|
||||
main_plot: pg.PlotWidget,
|
||||
region: pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||||
"""
|
||||
x_min, x_max = region.getRegion()
|
||||
if main_plot:
|
||||
main_plot.blockSignals(True)
|
||||
main_plot.setXRange(x_min, x_max, padding=0)
|
||||
main_plot.blockSignals(False)
|
||||
|
||||
def _create_curve_ideal(self,
|
||||
signal: dict[str, Any],
|
||||
@ -97,20 +45,20 @@ class PlotWidget(BasePlotWidget):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
|
||||
plot_widget = pg.PlotWidget(title=title)
|
||||
def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
|
||||
plot_item = pg.PlotItem(title=title)
|
||||
# Оптимизация отображения графиков
|
||||
plot_widget.setDownsampling(auto=True, mode='peak')
|
||||
plot_widget.showGrid(x=True, y=True)
|
||||
plot_widget.setClipToView(True)
|
||||
legend = pg.LegendItem((80, 60), offset=(70, 20))
|
||||
legend.setParentItem(plot_widget.graphicsItem())
|
||||
return plot_widget, legend
|
||||
plot_item.setDownsampling(auto=True, mode='peak')
|
||||
plot_item.showGrid(x=True, y=True)
|
||||
plot_item.setClipToView(True)
|
||||
legend = plot_item.addLegend(offset=(70, 20))
|
||||
return plot_item, legend
|
||||
|
||||
def _add_stage_regions(self,
|
||||
plot_widget: pg.PlotWidget,
|
||||
plot_item: pg.PlotItem,
|
||||
point_events: dict[str, list[float]],
|
||||
dataframe_headers: list[str],
|
||||
reg_items: dict,
|
||||
transparency: int = 75) -> None:
|
||||
"""
|
||||
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
|
||||
@ -122,12 +70,15 @@ class PlotWidget(BasePlotWidget):
|
||||
region = self._create_stage_region(stage, start_t, end_t, transparency)
|
||||
if region is not None:
|
||||
region.setZValue(-20)
|
||||
plot_widget.addItem(region)
|
||||
plot_item.addItem(region)
|
||||
reg_items["real"].setdefault(stage, [])
|
||||
reg_items["real"][stage].append(region)
|
||||
|
||||
def _add_ideal_stage_regions(self,
|
||||
plot_widget: pg.PlotWidget,
|
||||
plot_item: pg.PlotItem,
|
||||
ideal_data: dict[str, Any],
|
||||
point_events: dict[str, list[float]],
|
||||
reg_items: dict,
|
||||
transparency: int = 125) -> None:
|
||||
"""
|
||||
Добавляет регионы для идеальных этапов.
|
||||
@ -140,13 +91,16 @@ class PlotWidget(BasePlotWidget):
|
||||
region = self._create_stage_region(stage, start_t, end_t, transparency)
|
||||
if region:
|
||||
region.setZValue(-10)
|
||||
plot_widget.addItem(region)
|
||||
plot_item.addItem(region)
|
||||
reg_items["ideal"].setdefault(stage, [])
|
||||
reg_items["ideal"][stage].append(region)
|
||||
|
||||
def _add_ideal_signals(self,
|
||||
plot_widget: pg.PlotWidget,
|
||||
plot_item: pg.PlotItem,
|
||||
ideal_data: dict[str, Any],
|
||||
point_events: dict[str, list[float]],
|
||||
ideal_signals: list[dict[str, Any]]) -> None:
|
||||
ideal_signals: list[dict[str, Any]],
|
||||
curve_items: dict) -> None:
|
||||
"""
|
||||
Добавляет идеальные сигналы для каждого этапа.
|
||||
"""
|
||||
@ -159,23 +113,29 @@ class PlotWidget(BasePlotWidget):
|
||||
point_events[stage][1]
|
||||
)
|
||||
if curve:
|
||||
curve.setZValue(10)
|
||||
plot_widget.addItem(curve)
|
||||
|
||||
curve.setZValue(50)
|
||||
plot_item.addItem(curve)
|
||||
curve_items["ideal"].setdefault(signal["name"], {})
|
||||
curve_items["ideal"][signal["name"]].setdefault(stage, [])
|
||||
curve_items["ideal"][signal["name"]][stage].append(curve)
|
||||
|
||||
def _add_real_signals(self,
|
||||
plot_widget: pg.PlotWidget,
|
||||
plot_item: pg.PlotItem,
|
||||
dataframe: pd.DataFrame,
|
||||
real_signals: list[dict[str, Any]],
|
||||
legend: pg.LegendItem) -> None:
|
||||
legend: pg.LegendItem,
|
||||
curve_items: dict) -> None:
|
||||
"""
|
||||
Добавляет реальные сигналы из dataframe на виджет.
|
||||
"""
|
||||
dataframe_headers = dataframe.columns.tolist()
|
||||
for signal in real_signals:
|
||||
if signal["name"] in dataframe_headers:
|
||||
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
|
||||
plot = plot_item.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
|
||||
plot.setZValue(0)
|
||||
legend.addItem(plot, signal["name"])
|
||||
curve_items["real"].setdefault(signal["name"], {})
|
||||
curve_items["real"][signal["name"]] = plot
|
||||
|
||||
def _add_performance_label(self,
|
||||
layout: QVBoxLayout,
|
||||
@ -190,52 +150,35 @@ class PlotWidget(BasePlotWidget):
|
||||
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
|
||||
|
||||
performance_label = QLabel(
|
||||
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
|
||||
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
|
||||
f"Сокращение длительности: фактическое = {tesla_TWC} %"
|
||||
)
|
||||
#f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
|
||||
self.set_style(performance_label)
|
||||
layout.addWidget(performance_label)
|
||||
performance_label.update()
|
||||
|
||||
def _mirror_shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||||
return dataframe
|
||||
|
||||
def _shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||||
return dataframe
|
||||
|
||||
|
||||
def _build_widget(self, data: list[Any]) -> QWidget:
|
||||
"""
|
||||
Собирает графический виджет для одного набора данных.
|
||||
Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
|
||||
"""
|
||||
widget = QWidget()
|
||||
layout = QVBoxLayout(widget)
|
||||
result_widget = QWidget()
|
||||
result_layout = QVBoxLayout(result_widget)
|
||||
plot_layout = pg.GraphicsLayoutWidget()
|
||||
reg_items = {"real":{}, "ideal":{}}
|
||||
curve_items = {"real":{}, "ideal":{}}
|
||||
|
||||
dataframe, points_pocket, useful_data = data
|
||||
tesla_time = useful_data["tesla_time"]
|
||||
range_ME = useful_data["range_ME"]
|
||||
k_hardness = useful_data["k_hardness"]
|
||||
dataframe_headers = dataframe.columns.tolist()
|
||||
|
||||
dat_is_none = dataframe is None
|
||||
|
||||
if not dat_is_none: dataframe_headers = dataframe.columns.tolist()
|
||||
|
||||
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
|
||||
plot_widget, legend = self._init_plot_widget(title=channel)
|
||||
plot_item, legend = self._init_plot_item(title=channel)
|
||||
settings = description["Settings"]
|
||||
|
||||
TWC_time = 0.0
|
||||
@ -243,17 +186,31 @@ class PlotWidget(BasePlotWidget):
|
||||
worst_perf = 2
|
||||
|
||||
# TODO: рассчитать корректный параметр range
|
||||
if settings["mirror ME"]:
|
||||
dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME)
|
||||
if settings["mirror ME"] and not dat_is_none:
|
||||
dataframe = self._mirror_shift_data(
|
||||
"ME",
|
||||
description["Real_signals"],
|
||||
dataframe,
|
||||
range_ME
|
||||
)
|
||||
|
||||
# Итерация по точкам
|
||||
for cur_point, point_data in enumerate(points_pocket):
|
||||
# point_data структура: [point_timeframe, ideal_data, point_events]
|
||||
# point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data]
|
||||
point_timeframe, ideal_dat, point_events, useful_p_data = point_data
|
||||
ideal_data = copy.deepcopy(ideal_dat)
|
||||
|
||||
|
||||
if dat_is_none:
|
||||
worst_timeframe = point_timeframe = [0, ideal_data["Ideal cycle"]]
|
||||
point_events = {}
|
||||
keys = list(ideal_data.keys())
|
||||
shift = 0
|
||||
for i, time in enumerate(ideal_data["Ideal timings"]):
|
||||
point_events[keys[i]] = [shift, time+shift]
|
||||
shift += time
|
||||
|
||||
# TODO: проверить корректность расчетов
|
||||
if settings["force compensation FE"]:
|
||||
if settings["force compensation FE"] and not dat_is_none:
|
||||
force = useful_p_data["force"]
|
||||
F_comp = - force/k_hardness
|
||||
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
|
||||
@ -266,10 +223,9 @@ class PlotWidget(BasePlotWidget):
|
||||
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
|
||||
|
||||
# Добавляем реальные стадии
|
||||
if settings["stages"]:
|
||||
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
|
||||
if settings["stages"] and not dat_is_none:
|
||||
self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75)
|
||||
|
||||
# TODO: подобрать не вырвеглазные цвета, возможно ограничить зону
|
||||
if settings["workpiece"]:
|
||||
x1 = point_timeframe[0]
|
||||
dx = point_timeframe[1] - x1
|
||||
@ -280,15 +236,30 @@ class PlotWidget(BasePlotWidget):
|
||||
rect_item.setZValue(-5)
|
||||
rect_item.setBrush(pg.mkBrush('grey'))
|
||||
rect_item.setPen(pg.mkPen('black', width=3))
|
||||
plot_widget.addItem(rect_item)
|
||||
plot_item.addItem(rect_item)
|
||||
|
||||
if settings["force accuracy"]and not dat_is_none:
|
||||
modifier = 0.05
|
||||
|
||||
x1 = point_events["Welding"][0]
|
||||
dx = point_events["Welding"][1] - x1
|
||||
force = useful_p_data["force"]
|
||||
y1 = force*(1-modifier)
|
||||
dy = force*(2*modifier)
|
||||
|
||||
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
|
||||
rect_item.setZValue(-5)
|
||||
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
|
||||
rect_item.setPen(pg.mkPen('black', width=0))
|
||||
plot_item.addItem(rect_item)
|
||||
|
||||
# Добавляем идеальные стадии и идеальные сигналы
|
||||
if settings["ideals"]:
|
||||
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
|
||||
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
|
||||
self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100)
|
||||
self._add_ideal_signals(plot_item, ideal_data, point_events, description["Ideal_signals"], curve_items)
|
||||
|
||||
# Подсчёт производительности
|
||||
if settings["performance"]:
|
||||
if settings["performance"]and not dat_is_none:
|
||||
is_last_point = (cur_point == len(points_pocket) - 1)
|
||||
if is_last_point:
|
||||
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
|
||||
@ -299,53 +270,46 @@ class PlotWidget(BasePlotWidget):
|
||||
TWC_time += TWC_delta
|
||||
ideal_time += ideal_delta
|
||||
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
|
||||
|
||||
if curr_perf < worst_perf:
|
||||
worst_perf = curr_perf
|
||||
worst_timeframe = point_timeframe
|
||||
|
||||
# Добавляем реальные сигналы
|
||||
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
|
||||
if not dat_is_none:
|
||||
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)
|
||||
if widget_num == 0:
|
||||
main_plot = plot_widget
|
||||
main_plot = plot_item
|
||||
else:
|
||||
# Связываем остальные графики с основным графиком
|
||||
plot_widget.setXLink(main_plot)
|
||||
plot_item.setXLink(main_plot)
|
||||
|
||||
# Если есть настройка производительности, добавляем label
|
||||
if settings["performance"]:
|
||||
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
|
||||
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
|
||||
if settings["performance"] and not dat_is_none:
|
||||
self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time)
|
||||
|
||||
layout.addWidget(plot_widget)
|
||||
|
||||
layout.addWidget(navigator)
|
||||
self._sync_main_plot_with_navigator(main_plot, ROI_region)
|
||||
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
|
||||
plot_layout.addItem(plot_item, widget_num, 0)
|
||||
|
||||
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot)
|
||||
if navigator is not None:
|
||||
plot_layout.addItem(navigator, widget_num+1, 0)
|
||||
self._sync_main_plot_with_navigator(main_plot, ROI_region)
|
||||
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
|
||||
|
||||
widget.setLayout(layout)
|
||||
return widget
|
||||
|
||||
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||||
"""
|
||||
if region:
|
||||
x_min, x_max = main_plot
|
||||
region.blockSignals(True) # Предотвращаем рекурсию
|
||||
region.setRegion([x_min, x_max])
|
||||
region.blockSignals(False)
|
||||
result_layout.addWidget(plot_layout)
|
||||
return result_widget, reg_items, curve_items
|
||||
|
||||
def build(self, data: list[list[Any]]) -> None:
|
||||
"""
|
||||
Создает набор виджетов по предоставленному списку данных.
|
||||
Предполагается, что data — это список элементов вида:
|
||||
[
|
||||
[dataframe, points_pocket, tesla_time],
|
||||
[dataframe, points_pocket, tesla_time],
|
||||
[dataframe, points_pocket, useful_data],
|
||||
[dataframe, points_pocket, useful_data],
|
||||
...
|
||||
]
|
||||
"""
|
||||
widgets = [self._build_widget(data_sample) for data_sample in data]
|
||||
self._mediator.notify(self, widgets)
|
||||
widgets_datapack = [self._build_widget(data_sample) for data_sample in data]
|
||||
self._mediator.notify(self, widgets_datapack)
|
||||
|
||||
|
||||
|
||||
169
src/gui/reportGui.py
Normal file
169
src/gui/reportGui.py
Normal file
@ -0,0 +1,169 @@
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph.parametertree import Parameter, ParameterTree
|
||||
from typing import Union
|
||||
from PyQt5 import QtWidgets
|
||||
|
||||
|
||||
class ReportSettings(QtWidgets.QWidget):
|
||||
|
||||
def build(self, reg_items: dict, curve_items: dict) -> None:
|
||||
"""Создает ParameterTree для элементов всех графиков выбранной вкладки"""
|
||||
self._clear()
|
||||
param_tree = ParameterTree()
|
||||
layout = self.layout()
|
||||
layout.addWidget(param_tree)
|
||||
|
||||
body= [
|
||||
self._generate_reg_params(reg_items),
|
||||
self._generate_curve_params(curve_items)
|
||||
]
|
||||
# Добавляем параметры в дерево
|
||||
params = Parameter.create(name='params', type='group', children=body)
|
||||
params.sigTreeStateChanged.connect(
|
||||
lambda: self._update_settings(reg_items, curve_items, params)
|
||||
)
|
||||
param_tree.setParameters(params, showTop=False)
|
||||
self.show()
|
||||
|
||||
def _clear(self) -> None:
|
||||
"""
|
||||
Приводит виджет в готовое к работе состояние.
|
||||
Удаляет все содержимое, если имеется
|
||||
"""
|
||||
main = self.layout()
|
||||
if self.layout() is not None:
|
||||
while main.count():
|
||||
child = main.takeAt(0)
|
||||
if child.widget() is not None:
|
||||
child.widget().deleteLater()
|
||||
else:
|
||||
self.setLayout(QtWidgets.QVBoxLayout())
|
||||
|
||||
def _generate_reg_params(self,
|
||||
reg_items: dict) -> dict:
|
||||
|
||||
"""Созадет реальные и идеальные секторы"""
|
||||
|
||||
res = {'name': 'Sectors', 'type': 'group', 'children': [
|
||||
{'name': 'Real sectors', 'type': 'group', 'children': self._create_samples(reg_items["real"])},
|
||||
{'name': 'Ideal sectors', 'type': 'group', 'children': self._create_samples(reg_items["ideal"])},
|
||||
]}
|
||||
return res
|
||||
|
||||
def _generate_curve_params(self,
|
||||
curve_items: dict) -> dict:
|
||||
|
||||
"""Создает реальные и идеальные линии графиков"""
|
||||
|
||||
res = {'name': 'Plots', 'type': 'group', 'children': [
|
||||
{'name': 'Real plots', 'type': 'group', 'children': self._create_samples(curve_items["real"])},
|
||||
{'name': 'Ideal plots', 'type': 'group', 'children': self._create_ideal_curves(curve_items["ideal"])},
|
||||
]}
|
||||
return res
|
||||
|
||||
def _create_ideal_curves(self,
|
||||
curve: dict) -> list[dict]:
|
||||
|
||||
"""Создает секторы с этапами циклограммы"""
|
||||
|
||||
res = []
|
||||
for key, item in curve.items():
|
||||
param = {'name': key, 'type': 'group', 'children': self._create_samples(item)}
|
||||
res.append(param)
|
||||
return res
|
||||
|
||||
def _create_samples(self,
|
||||
sector: dict) -> list[dict]:
|
||||
|
||||
"""Создает список представленных элементов с их параметрами"""
|
||||
|
||||
res = []
|
||||
for key, item in sector.items():
|
||||
sample = item[0] if type(item) == list else item
|
||||
param = {'name': key, 'type': 'group', 'children': self._create_settings(sample)}
|
||||
res.append(param)
|
||||
return res
|
||||
|
||||
def _create_settings(self,
|
||||
item: Union[pg.LinearRegionItem, pg.PlotDataItem]) -> list[dict]:
|
||||
|
||||
"""Получает настройки для элемента"""
|
||||
|
||||
if type(item) == pg.LinearRegionItem:
|
||||
pen = item.lines[0].pen
|
||||
brush = item.brush
|
||||
fill_color = brush.color().getRgb()
|
||||
else:
|
||||
pen = pg.mkPen(item.opts.get("pen"))
|
||||
fill_color = None
|
||||
line_color = pen.color().getRgb()
|
||||
line_thickness = pen.width()
|
||||
visibility = item.isVisible()
|
||||
|
||||
return [
|
||||
{'name': 'Line color', 'type': 'color', 'value': line_color},
|
||||
{'name': 'Line thickness', 'type': 'int', 'value': line_thickness, 'limits': (1, 10)},
|
||||
{'name': 'Visibility', 'type': 'bool', 'value': visibility},
|
||||
{'name': 'Fill color', 'type': 'color', 'value': fill_color},
|
||||
]
|
||||
|
||||
def _update_settings(self,
|
||||
reg_items: dict,
|
||||
curve_items: dict,
|
||||
params: Parameter) -> None:
|
||||
|
||||
"""Задает параметры элементов в соответствии с paramTree"""
|
||||
|
||||
real_sectors = params.child("Sectors").child("Real sectors")
|
||||
ideal_sectors = params.child("Sectors").child("Ideal sectors")
|
||||
|
||||
real_plots = params.child("Plots").child("Real plots")
|
||||
ideal_plots = params.child("Plots").child("Ideal plots")
|
||||
|
||||
self._set_sector_settings(reg_items["real"], real_sectors)
|
||||
self._set_sector_settings(reg_items["ideal"], ideal_sectors)
|
||||
|
||||
self._set_plot_settings(curve_items["real"], real_plots)
|
||||
for key, item_dict in curve_items["ideal"].items():
|
||||
self._set_plot_settings(item_dict, ideal_plots.child(key))
|
||||
|
||||
def _set_sector_settings(self,
|
||||
sectors: dict,
|
||||
settings: Parameter) -> None:
|
||||
|
||||
"""Задает параметры секторов в соответствии с настройками"""
|
||||
|
||||
for key, item in sectors.items():
|
||||
sample = settings.child(key)
|
||||
line_color = sample.child("Line color").value()
|
||||
line_width = sample.child("Line thickness").value()
|
||||
visibility = sample.child("Visibility").value()
|
||||
fill_color = sample.child("Fill color").value()
|
||||
|
||||
pen = pg.mkPen(color=line_color, width=line_width)
|
||||
brush=pg.mkBrush(fill_color)
|
||||
for reg in item:
|
||||
reg.setVisible(visibility)
|
||||
reg.lines[0].setPen(pen)
|
||||
reg.lines[1].setPen(pen)
|
||||
reg.setBrush(brush)
|
||||
|
||||
def _set_plot_settings(self,
|
||||
curves:dict,
|
||||
settings: Parameter) -> None:
|
||||
|
||||
"""Задает параметры кривых в соответствии с настройками"""
|
||||
|
||||
for key, item in curves.items():
|
||||
sample = settings.child(key)
|
||||
line_color = sample.child("Line color").value()
|
||||
line_width = sample.child("Line thickness").value()
|
||||
visibility = sample.child("Visibility").value()
|
||||
pen = pg.mkPen(color=line_color, width=line_width)
|
||||
if type(item) == list:
|
||||
for curve in item:
|
||||
curve.setVisible(visibility)
|
||||
curve.setPen(pen)
|
||||
else:
|
||||
item.setVisible(visibility)
|
||||
item.setPen(pen)
|
||||
@ -1,7 +1,8 @@
|
||||
from typing import Callable, Optional, Any
|
||||
from PyQt5.QtWidgets import (
|
||||
QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
|
||||
)
|
||||
from PyQt5.QtWidgets import (QWidget, QPushButton,
|
||||
QLineEdit, QHBoxLayout,
|
||||
QVBoxLayout, QLabel,
|
||||
QTableWidget, QTableWidgetItem)
|
||||
from PyQt5.QtGui import QIntValidator
|
||||
|
||||
from utils.json_tools import read_json, write_json
|
||||
@ -11,7 +12,6 @@ class settingsWindow(QWidget):
|
||||
def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
|
||||
"""
|
||||
Окно настроек для редактирования параметров.
|
||||
|
||||
:param path: Путь к файлу настроек (JSON).
|
||||
:param name: Название набора настроек.
|
||||
:param upd_func: Функция обновления (коллбэк).
|
||||
@ -176,7 +176,16 @@ class settingsWindow(QWidget):
|
||||
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
|
||||
|
||||
|
||||
|
||||
class SystemSettings(settingsWindow):
|
||||
def __init__(self, path, name, upd_func):
|
||||
super().__init__(path, name, upd_func)
|
||||
self._num_points.setVisible(False)
|
||||
|
||||
def _expand(self):
|
||||
pass
|
||||
|
||||
class OperatorSettings(settingsWindow):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@ -24,10 +24,9 @@ def main():
|
||||
window.show()
|
||||
|
||||
controller.signal_widgets.connect(window.show_plot_tabs)
|
||||
controller.signal_settings.connect(mediator.push_settings)
|
||||
controller.signal_monitor.connect(monitor.custom_dir_extract)
|
||||
|
||||
window.push_settings()
|
||||
controller.signal_settings.connect(mediator.update_settings)
|
||||
controller.signal_raport_mode.connect(monitor.custom_csv_extract_only)
|
||||
controller.signal_seeking_mode.connect(monitor.start_seeking)
|
||||
|
||||
sys.exit(app.exec_())
|
||||
|
||||
|
||||
43
src/testAlgo.py
Normal file
43
src/testAlgo.py
Normal file
@ -0,0 +1,43 @@
|
||||
from src.OptAlgorithm.OptAlgorithm import OptAlgorithm
|
||||
from src.utils import read_json
|
||||
|
||||
from matplotlib import pyplot as plt, use
|
||||
|
||||
from numpy import cos, sin, sqrt, cbrt, arcsin, linspace, array
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
tq = 1
|
||||
ts = linspace(0, tq, 200000)
|
||||
|
||||
operator_params = read_json("params/operator_params.json")
|
||||
system_params = read_json("params/system_params.json")
|
||||
|
||||
non_array_operator_params = {}
|
||||
i = 1
|
||||
for key, value in operator_params.items():
|
||||
if hasattr(value, "__len__"):
|
||||
if len(value) > i:
|
||||
non_array_operator_params[key] = value[i]
|
||||
else:
|
||||
non_array_operator_params[key] = value[0]
|
||||
else:
|
||||
non_array_operator_params[key] = value
|
||||
|
||||
non_array_system_params = {}
|
||||
for key, value in system_params.items():
|
||||
if hasattr(value, "__len__"):
|
||||
if len(value) > i:
|
||||
non_array_system_params[key] = value[i]
|
||||
else:
|
||||
non_array_system_params[key] = value[0]
|
||||
else:
|
||||
non_array_system_params[key] = value
|
||||
|
||||
|
||||
opt = OptAlgorithm(non_array_operator_params, non_array_system_params)
|
||||
Xs = array([opt.getVar("X1", t) for t in ts])
|
||||
|
||||
|
||||
plt.plot(ts, Xs)
|
||||
plt.show()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -5,14 +5,11 @@ from typing import Optional, Union, Any
|
||||
from cachetools import LRUCache
|
||||
|
||||
import pandas as pd
|
||||
from PyQt5.QtCore import QThread, QObject, QTimer
|
||||
from PyQt5.QtCore import QObject, QTimer
|
||||
from PyQt5.QtWidgets import QWidget, QTabWidget
|
||||
from PyQt5.QtOpenGL import QGLWidget
|
||||
from OptAlgorithm import OptAlgorithm
|
||||
import pandas as pd
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
import pyqtgraph as pg
|
||||
|
||||
|
||||
|
||||
@ -38,7 +35,7 @@ class BaseMediator:
|
||||
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
|
||||
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
|
||||
...
|
||||
def push_settings (self, data: list[dict]):
|
||||
def update_settings (self, data: list[dict]):
|
||||
...
|
||||
|
||||
class BaseDirectoryMonitor:
|
||||
@ -50,7 +47,8 @@ class BaseDirectoryMonitor:
|
||||
super().__init__()
|
||||
self._directory_path = None
|
||||
self._update_time = None
|
||||
|
||||
self.isActive = False
|
||||
self._files = []
|
||||
self._mediator = mediator
|
||||
|
||||
|
||||
@ -79,14 +77,19 @@ class BaseDirectoryMonitor:
|
||||
self._files = files
|
||||
|
||||
def start(self):
|
||||
self.isActive = True
|
||||
self.update_timer.start(int(self._update_time))
|
||||
|
||||
def stop(self):
|
||||
self.isActive = False
|
||||
self.update_timer.stop()
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
...
|
||||
|
||||
def update_plots(self) -> None:
|
||||
...
|
||||
|
||||
def force_all_dir(self):
|
||||
...
|
||||
|
||||
@ -128,7 +131,8 @@ class BasePlotWidget:
|
||||
"ideals": True,
|
||||
"mirror ME": False,
|
||||
"workpiece": False,
|
||||
"force compensation FE": False
|
||||
"force compensation FE": False,
|
||||
"force accuracy":True
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
@ -159,7 +163,8 @@ class BasePlotWidget:
|
||||
"ideals": True,
|
||||
"mirror ME": True,
|
||||
"workpiece": True,
|
||||
"force compensation FE": True
|
||||
"force compensation FE": True,
|
||||
"force accuracy":False
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
@ -190,7 +195,8 @@ class BasePlotWidget:
|
||||
"ideals": True,
|
||||
"mirror ME": False,
|
||||
"workpiece": False,
|
||||
"force compensation FE": False
|
||||
"force compensation FE": False,
|
||||
"force accuracy":False
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
@ -227,6 +233,88 @@ class BasePlotWidget:
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
}""")
|
||||
|
||||
def _downsample_data(self, x, y, max_points=5000):
|
||||
"""
|
||||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||||
"""
|
||||
if len(x) > max_points:
|
||||
factor = len(x) // max_points
|
||||
x_downsampled = x[::factor]
|
||||
y_downsampled = y[::factor]
|
||||
return x_downsampled, y_downsampled
|
||||
return x, y
|
||||
|
||||
def _create_navigator(self,
|
||||
time_region:tuple[float, float],
|
||||
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||||
"""
|
||||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||||
"""
|
||||
navigator = pg.PlotItem(title="Navigator")
|
||||
navigator.setFixedHeight(100)
|
||||
# Получение кривых из main_plot
|
||||
for curve in main_plot.listDataItems():
|
||||
# Извлекаем данные из кривой
|
||||
x, y = curve.getData()
|
||||
curve_name = curve.opts.get("name", None)
|
||||
signal_pen = curve.opts.get("pen", None)
|
||||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||||
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
|
||||
|
||||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
|
||||
ROI_region.setBounds([0, x[-1]])
|
||||
navigator.addItem(ROI_region)
|
||||
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
|
||||
|
||||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||||
|
||||
return navigator, ROI_region
|
||||
|
||||
def _sync_main_plot_with_navigator(self,
|
||||
main_plot: pg.PlotItem,
|
||||
region: pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||||
"""
|
||||
x_min, x_max = region.getRegion()
|
||||
if main_plot:
|
||||
main_plot.blockSignals(True)
|
||||
main_plot.setXRange(x_min, x_max, padding=0)
|
||||
main_plot.blockSignals(False)
|
||||
|
||||
def _mirror_shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||||
return dataframe
|
||||
|
||||
def _shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||||
return dataframe
|
||||
|
||||
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||||
"""
|
||||
if region:
|
||||
x_min, x_max = main_plot
|
||||
region.blockSignals(True) # Предотвращаем рекурсию
|
||||
region.setRegion([x_min, x_max])
|
||||
region.blockSignals(False)
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
@ -252,12 +340,14 @@ class BaseController(QObject):
|
||||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||||
...
|
||||
|
||||
def push_settings(self, settings: list[dict]) -> None:
|
||||
def update_settings(self, settings: list[dict]) -> None:
|
||||
...
|
||||
|
||||
def open_custom_file (self, filepath: str) -> None:
|
||||
def raport_mode (self, filepath: str) -> None:
|
||||
...
|
||||
|
||||
def seeking_mode(self) -> None:
|
||||
...
|
||||
|
||||
class BaseIdealDataBuilder(OptAlgorithm):
|
||||
|
||||
@ -270,10 +360,12 @@ class BaseIdealDataBuilder(OptAlgorithm):
|
||||
|
||||
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
|
||||
data = []
|
||||
for i in range (0, int(end_timestamp*self.mul)):
|
||||
for i in range (0, int(end_timestamp*self.mul)+1):
|
||||
time = i/self.mul
|
||||
X1, X2, V1, V2, F = func(time)
|
||||
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
X1, X2, V1, V2, F = func(end_timestamp)
|
||||
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
@ -305,6 +397,8 @@ class BaseMainWindow(QWidget):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None):
|
||||
super().__init__()
|
||||
self.set_style(self)
|
||||
self.resize(200,200)
|
||||
self._controller = controller
|
||||
...
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user