Compare commits
No commits in common. "9faa35f514058f2414297ff97a9d3c82ea99548c" and "02268f1f7b18941aa61c2971509328a6c0be9170" have entirely different histories.
9faa35f514
...
02268f1f7b
3
.gitignore
vendored
3
.gitignore
vendored
@ -6,5 +6,4 @@
|
||||
/venv
|
||||
*.txt
|
||||
*.svg
|
||||
/test.py
|
||||
/output
|
||||
/test.py
|
||||
@ -81,19 +81,19 @@
|
||||
],
|
||||
"distance_l_2": [
|
||||
0.0275,
|
||||
0.0255,
|
||||
0.0242,
|
||||
0.0245,
|
||||
0.0228,
|
||||
0.0236,
|
||||
0.0229,
|
||||
0.0248,
|
||||
0.024,
|
||||
0.0235,
|
||||
0.025,
|
||||
0.0276,
|
||||
0.0234,
|
||||
0.0215
|
||||
0.03,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033,
|
||||
0.033
|
||||
],
|
||||
"distance_h_end1": [
|
||||
0.003,
|
||||
|
||||
BIN
profile_results.prof
Normal file
BIN
profile_results.prof
Normal file
Binary file not shown.
@ -3,76 +3,75 @@ from numpy import sqrt, arcsin, arccos, cos, sin
|
||||
from OptAlgorithm.AutoConfigClass import AutoConfigClass
|
||||
from OptAlgorithm.ConstantCalculator import ConstantCalculator
|
||||
|
||||
|
||||
class OptTimeCalculator(AutoConfigClass):
|
||||
|
||||
params_list = []
|
||||
|
||||
def __init__(self, operator_config: dict, system_config: dict):
|
||||
|
||||
def __init__(self, operator_config : dict, system_config : dict):
|
||||
|
||||
cCalculator = ConstantCalculator(operator_config, system_config)
|
||||
super().__init__(OptTimeCalculator.params_list, operator_config, system_config, cCalculator.calc())
|
||||
self.allTimes = {}
|
||||
self.check_eps = 1e-7
|
||||
|
||||
def tGrowNominal(self, F: float) -> float:
|
||||
return arcsin(F / (self.Ftogrow)) * sqrt(self.mass_1 / self.k_hardness_1)
|
||||
|
||||
def tGrowNominal(self, F : float) -> float:
|
||||
return arcsin(F/(self.Ftogrow)) * sqrt(self.mass_1/self.k_hardness_1)
|
||||
|
||||
def Tclose(self, h1: float, h2: float) -> None:
|
||||
v0q = min(sqrt(2 * self.a_max_1 * h1), self.v_max_1)
|
||||
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
|
||||
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
|
||||
t1 = v0 / self.a_max_1
|
||||
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 / 2)) / v0)
|
||||
t2t = max(0, (h1 - (self.a_max_1 * t1 * t1 /2)) / v0)
|
||||
T1 = t1 + t2t
|
||||
|
||||
t21 = sqrt(h2 / (self.a_max_2))
|
||||
t21 = min(self.v_max_2 / self.a_max_2, t21)
|
||||
|
||||
t21 = sqrt(h2/self.a_max_2)
|
||||
t21 = min(self.v_max_2/self.a_max_2, t21)
|
||||
t22 = max(0, (h2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
|
||||
T2 = t22 + 2 * t21
|
||||
|
||||
|
||||
Tclose = max(T1, T2)
|
||||
|
||||
|
||||
tclose_1_acc, tclose_1_speed = self.calcFirstClose(Tclose, h1)
|
||||
tclose_2_acc, tclose_2_speed = self.calcSecondClose(Tclose, h2)
|
||||
|
||||
|
||||
self.allTimes["tclose_1_acc"] = tclose_1_acc
|
||||
self.allTimes["tclose_1_speed"] = tclose_1_speed
|
||||
|
||||
|
||||
self.allTimes["tclose_2_acc"] = tclose_2_acc
|
||||
self.allTimes["tclose_2_speed"] = tclose_2_speed
|
||||
self.allTimes["tclose"] = Tclose
|
||||
|
||||
def Topen(self, s1: float, s2: float, l1: float, l2: float, Fs1: float, Fs2: float = 0) -> None:
|
||||
t11 = sqrt((l1 + Fs1) / self.a_max_1)
|
||||
t11 = min(self.v_max_1 / self.a_max_1, t11)
|
||||
t12 = max(0, ((l1 + Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
|
||||
|
||||
def Topen(self, s1 : float, s2 : float, l1 : float, l2 : float, Fs1 : float, Fs2 : float = 0) -> None:
|
||||
t11 = sqrt((l1 + Fs1)/self.a_max_1)
|
||||
t11 = min(self.v_max_1/self.a_max_1, t11)
|
||||
t12 = max(0, ((l1+Fs1) - (self.a_max_1 * t11 * t11)) / self.v_max_1)
|
||||
T1 = t12 + 2 * t11
|
||||
offset = self.calcSecondOpenOffset(t11, t12, Fs1)
|
||||
|
||||
t21 = sqrt(l2 / self.a_max_2)
|
||||
t21 = min(self.v_max_2 / self.a_max_2, t21)
|
||||
|
||||
t21 = sqrt(l2/self.a_max_2)
|
||||
t21 = min(self.v_max_2/self.a_max_2, t21)
|
||||
t22 = max(0, (l2 - (self.a_max_2 * t21 * t21)) / self.v_max_2)
|
||||
T2 = t22 + 2 * t21 + offset
|
||||
|
||||
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1 + Fs1)
|
||||
|
||||
topen_1_acc, topen_1_speed = self.calcFirstOpen(T1, l1+Fs1)
|
||||
offset = self.calcSecondOpenOffset(topen_1_acc, topen_1_speed, Fs1)
|
||||
|
||||
topen_2_acc, topen_2_speed = self.calcSecondOpen(T2 - offset, l2)
|
||||
|
||||
self.allTimes["topen_1_acc"] = topen_1_acc
|
||||
self.allTimes["topen_2_offset"] = offset
|
||||
|
||||
|
||||
self.allTimes["topen_1_acc"] = topen_1_acc
|
||||
self.allTimes["topen_2_offset"] = offset
|
||||
|
||||
self.allTimes["topen_1_acc"] = topen_1_acc
|
||||
self.allTimes["topen_1_speed"] = topen_1_speed
|
||||
|
||||
|
||||
self.allTimes["topen_2_acc"] = topen_2_acc
|
||||
self.allTimes["topen_2_speed"] = topen_2_speed
|
||||
|
||||
|
||||
if s1 >= l1:
|
||||
raise Exception("""S1 >= L1 - недопустимый сценарий,
|
||||
проверьте distance_s_1, distance_h_end1""")
|
||||
if s2 >= l2:
|
||||
raise Exception("""S2 >= L2 - недопустимый сценарий,
|
||||
проверьте distance_s_2, distance_h_end2""")
|
||||
|
||||
|
||||
s1 += Fs1
|
||||
topen_1_mark = sqrt(2 * s1 / self.a_max_1)
|
||||
if topen_1_mark > topen_1_acc:
|
||||
@ -80,173 +79,167 @@ class OptTimeCalculator(AutoConfigClass):
|
||||
v1 = topen_1_acc * self.a_max_1
|
||||
if s1 > topen_1_speed * v1:
|
||||
s1 -= topen_1_speed * v1
|
||||
topen_1_mark = 2 * topen_1_acc + topen_1_speed - sqrt(topen_1_acc ** 2 - 2 * s1 / self.a_max_1)
|
||||
topen_1_mark = 2*topen_1_acc + topen_1_speed - sqrt(topen_1_acc**2 - 2*s1 / self.a_max_1)
|
||||
else:
|
||||
topen_1_mark = topen_1_acc + s1 / v1
|
||||
|
||||
|
||||
topen_2_mark = sqrt(2 * s2 / self.a_max_2)
|
||||
if topen_2_mark > topen_2_acc:
|
||||
s2 -= topen_2_acc ** 2 * self.a_max_2 / 2
|
||||
v2 = topen_2_acc * self.a_max_2
|
||||
if s2 > topen_2_speed * v2:
|
||||
s2 -= topen_2_speed * v2
|
||||
topen_2_mark = 2 * topen_2_acc + topen_2_speed - sqrt(topen_2_acc ** 2 - 2 * s2 / self.a_max_2)
|
||||
topen_2_mark = 2*topen_2_acc + topen_2_speed - sqrt(topen_2_acc**2 - 2*s2 / self.a_max_2)
|
||||
else:
|
||||
topen_2_mark = topen_2_acc + s2 / v2
|
||||
|
||||
self.allTimes["topen_1_mark"] = topen_1_mark
|
||||
self.allTimes["topen_2_mark"] = topen_2_mark
|
||||
|
||||
|
||||
def Tgrow(self) -> None:
|
||||
|
||||
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
|
||||
|
||||
v0 = self.allTimes["tclose_1_acc"] * self.a_max_1
|
||||
vF0 = v0 * self.k_hardness_1
|
||||
|
||||
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1/(self.mass_1))* self.Ftogrow)
|
||||
|
||||
vFmax = min(self.v_max_1 * self.k_hardness_1, sqrt(self.k_hardness_1 / (self.mass_1)) * self.Ftogrow)
|
||||
|
||||
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0 * vF0)
|
||||
tspeed = sqrt(self.mass_1 / self.k_hardness_1) * (
|
||||
arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
|
||||
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1 / self.freq * vF0 * sin(
|
||||
self.freq * tspeed)
|
||||
L = sqrt(self.k_hardness_1 / self.mass_1 * self.eff_control ** 2 + vF0*vF0)
|
||||
tspeed = sqrt(self.mass_1/self.k_hardness_1) * (arcsin(vFmax / L) - arccos(sqrt(self.k_hardness_1 / self.mass_1) * self.eff_control / L))
|
||||
Fspeed = - self.eff_control * cos(self.freq * tspeed) + self.eff_control + 1/self.freq * vF0 * sin(self.freq * tspeed)
|
||||
eps = 1e1
|
||||
if self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 < -eps:
|
||||
if self.freq**2 * self.Ftogrow**2 - vFmax**2 < -eps:
|
||||
raise Exception("""Номинальная траектория набора усилия не может быть достигнута, максимальная скорость превысила скорость траектории
|
||||
, проверьте параметры k_hardness_1, mass_1, k_prop""")
|
||||
Fmeet = 1 / self.freq * sqrt(self.freq ** 2 * self.Ftogrow ** 2 - vFmax ** 2 + eps)
|
||||
Fmeet = 1/self.freq * sqrt(self.freq**2 * self.Ftogrow**2 - vFmax**2 + eps)
|
||||
Fstart_prop = self.Fstart_prop
|
||||
if Fmeet > Fstart_prop:
|
||||
raise Exception("""Номинальная траектория набора усилия была достигнута на фазе подпора
|
||||
, проверьте параметры v_max_1, k_prop""")
|
||||
tmeet = (Fmeet - Fspeed) / vFmax
|
||||
tmeet = (Fmeet - Fspeed)/vFmax
|
||||
tend = self.tGrowNominal(Fstart_prop) - self.tGrowNominal(Fmeet)
|
||||
vp = 1 / sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow ** 2 - self.Fstart_prop ** 2)
|
||||
vp = 1/sqrt(self.k_hardness_1 * self.mass_1) * sqrt(self.Ftogrow**2 - self.Fstart_prop**2)
|
||||
ap = Fstart_prop / self.mass_1
|
||||
tprop = 2 * vp / ap
|
||||
|
||||
tprop = 2*vp / ap
|
||||
|
||||
self.allTimes["tspeed"] = tspeed
|
||||
self.allTimes["tmeet"] = tmeet
|
||||
self.allTimes["tend"] = tend
|
||||
self.allTimes["tprop"] = tprop
|
||||
self.allTimes["tgrow"] = tspeed + tmeet + tend + tprop
|
||||
|
||||
def T(self, h1: float, h2: float, s1: float, s2: float, l1: float, l2: float) -> dict:
|
||||
|
||||
def T(self, h1 : float, h2 : float, s1 : float, s2 : float, l1 : float, l2 : float) -> dict:
|
||||
self.Tclose(h1, h2)
|
||||
self.Tgrow()
|
||||
self.Topen(s1, s2, l1, l2, self.force_target / self.k_hardness_1, 0)
|
||||
return self.allTimes
|
||||
|
||||
def Tmovement(self, closeAlgo, tmark) -> tuple[list, list]:
|
||||
|
||||
def Tmovement(self, closeAlgo, tmark) -> None:
|
||||
contact = [self.contact_distance_1, self.contact_distance_2]
|
||||
v0s = []
|
||||
pos0s = []
|
||||
for i in range(1, 3):
|
||||
for i in range(1,3):
|
||||
if tmark < 0:
|
||||
raise Exception("""Отрицательное время этапа раскрытия,
|
||||
проверьте distance_s_{1,2}, time_command""")
|
||||
v0 = closeAlgo("V" + str(i), "Open", tmark)
|
||||
v0 = closeAlgo("V"+str(i), "Open", tmark)
|
||||
v0s.append(v0)
|
||||
x0 = closeAlgo("X" + str(i), "Open", tmark)
|
||||
x1 = contact[i - 1] - self.__dict__["distance_h_end" + str(i)]
|
||||
x0 = closeAlgo("X"+str(i), "Open", tmark)
|
||||
x1 = contact[i-1] - self.__dict__["distance_h_end"+str(i)]
|
||||
x = x1 - x0
|
||||
pos0s.append(closeAlgo("X" + str(i), "Open", tmark))
|
||||
pos0s.append(closeAlgo("X"+str(i), "Open", tmark))
|
||||
Tfull = self.time_robot_movement
|
||||
|
||||
L = self.__dict__["distance_l_" + str(i)]
|
||||
maxL = contact[i - 1] - L - x0
|
||||
|
||||
|
||||
L = self.__dict__["distance_l_"+str(i)]
|
||||
maxL = contact[i-1] - L - x0
|
||||
self.Tmovementi(i, x, Tfull, v0, maxL)
|
||||
return pos0s, v0s
|
||||
|
||||
|
||||
def Tmovementi(self, i, Sfull, Tfull, v0, maxL) -> None:
|
||||
v0 = abs(v0)
|
||||
vmax = self.__dict__["v_max_" + str(i)]
|
||||
a = self.__dict__["a_max_" + str(i)]
|
||||
t3 = (Tfull + v0 / a) / 2
|
||||
|
||||
sqrtval = a ** 2 * (
|
||||
a ** 2 * (Tfull + 2 * t3) ** 2 - 8 * a * Sfull + 2 * a * v0 * (Tfull + 2 * t3) - 3 * v0 ** 2)
|
||||
vmax = self.__dict__["v_max_"+str(i)]
|
||||
a = self.__dict__["a_max_"+str(i)]
|
||||
t3 = (Tfull + v0 / a) / 2
|
||||
|
||||
sqrtval = a**2 * (a**2 * (Tfull+2*t3)**2 - 8 * a * Sfull + 2 * a* v0 * (Tfull+2*t3) - 3 *v0**2)
|
||||
if sqrtval < 0:
|
||||
raise Exception("""Невозможно с S_{i} добраться но H*_{i} за указанное время,
|
||||
проверьте distance_s_{i}, distance_h_end{i}, time_command, time_robot_movement""")
|
||||
t1max = ((Tfull + 2 * t3) + v0 / a) / (2) - sqrt(sqrtval) * sqrt(2) / (4 * a ** 2)
|
||||
t1 = min(t1max, (vmax - abs(v0)) / a)
|
||||
t1 = max(0, min(t1, -v0 / a + sqrt(v0 ** 2 / (a ** 2) + (abs(maxL) - v0 * v0 / a) / a)))
|
||||
|
||||
t31 = v0 / a + t1
|
||||
t5max = (Tfull - v0 / a) / 2 - t1
|
||||
t1max = ((Tfull+2*t3) + v0/a)/(2) - sqrt(sqrtval) * sqrt(2)/(4*a**2)
|
||||
t1 = min(t1max, (vmax- abs(v0))/a)
|
||||
t1 = max(0, min(t1, -v0/a + sqrt(v0**2 / (a**2) + (abs(maxL)-v0*v0/a)/a)))
|
||||
|
||||
t31 = v0/a + t1
|
||||
t5max = (Tfull - v0/a)/2 - t1
|
||||
v1 = v0 + a * t1
|
||||
S1 = v0 * t1 + a * t1 * t1 / 2 + v1 * t31 - a * t31 * t31 / 2
|
||||
S1 = v0*t1 + a*t1*t1/2 + v1*t31 - a*t31*t31/2
|
||||
S2max = Sfull + S1
|
||||
t5 = min(t5max, (vmax) / a, sqrt(S2max / a))
|
||||
t3 = abs(v0) / a + t1 + t5
|
||||
t5 = min(t5max, (vmax)/a, sqrt(S2max / a))
|
||||
t3 = abs(v0)/a + t1 + t5
|
||||
t32 = t5
|
||||
|
||||
v1 = abs(v0 + t1 * a)
|
||||
v3 = abs(v0 + t1 * a - t3 * a)
|
||||
|
||||
v1 = abs(v0+t1*a)
|
||||
v3 = abs(v0 + t1*a - t3*a)
|
||||
timeleft = Tfull - t1 - t5 - t3
|
||||
sq = -v0 * t1 - a * t1 ** 2 / 2 - v1 * t3 + a * t3 ** 2 / 2 + v3 * t5 - a * t5 ** 2 / 2
|
||||
sq = -v0*t1 - a*t1**2/2 - v1 * t3 + a*t3**2/2 + v3*t5 - a*t5**2/2
|
||||
Sleft = Sfull - sq
|
||||
|
||||
t2max = (timeleft - Sleft / v3) / (1 + v1 / v3)
|
||||
Smovement = -v0 * t1 - a / 2 * t1 ** 2 - v1 * t31 + a / 2 * t31 ** 2
|
||||
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement)) / v1))
|
||||
t4 = max(0, Sleft / v3 + v1 / v3 * t2)
|
||||
|
||||
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
|
||||
|
||||
self.allTimes["tmovement_" + str(i) + "_acc"] = t1
|
||||
self.allTimes["tmovement_" + str(i) + "_speed"] = t2
|
||||
self.allTimes["tmovement_" + str(i) + "_slow"] = t31
|
||||
self.allTimes["tmovement_" + str(i) + "_stay"] = tstay
|
||||
self.allTimes["tmovement_" + str(i)] = t1 + t2 + t31 + tstay
|
||||
self.allTimes["tpreclose_" + str(i) + "_slow"] = t32
|
||||
self.allTimes["tpreclose_" + str(i) + "_speed"] = t4
|
||||
self.allTimes["tpreclose_" + str(i) + "_acc"] = t5
|
||||
self.allTimes["tpreclose_" + str(i)] = t32 + t4 + t5
|
||||
|
||||
t2max = (timeleft - Sleft/v3) / (1 + v1/v3)
|
||||
Smovement = -v0 * t1 - a/2 * t1**2 - v1 * t31 + a/2*t31**2
|
||||
t2 = max(0, min(t2max, (abs(maxL) - abs(Smovement))/v1))
|
||||
t4 = max(0, Sleft/v3 + v1/v3 * t2)
|
||||
|
||||
tstay = max(0, Tfull - t1 - t2 - t3 - t4 - t5)
|
||||
|
||||
self.allTimes["tmovement_"+str(i)+"_acc"] = t1
|
||||
self.allTimes["tmovement_"+str(i)+"_speed"] = t2
|
||||
self.allTimes["tmovement_"+str(i)+"_slow"] = t31
|
||||
self.allTimes["tmovement_"+str(i)+"_stay"] = tstay
|
||||
self.allTimes["tmovement_"+str(i)] = t1 + t2 + t31 + tstay
|
||||
self.allTimes["tpreclose_"+str(i)+"_slow"] = t32
|
||||
self.allTimes["tpreclose_"+str(i)+"_speed"] = t4
|
||||
self.allTimes["tpreclose_"+str(i)+"_acc"] = t5
|
||||
self.allTimes["tpreclose_"+str(i)] = t32 + t4 + t5
|
||||
T = Tfull
|
||||
self.allTimes["tmovement"] = T
|
||||
|
||||
def calcFirstClose(self, T: float, s: float) -> tuple[float, float]:
|
||||
|
||||
def calcFirstClose(self, T : float, s : float) -> tuple[float, float]:
|
||||
v0q = min(sqrt(2 * self.a_max_1 * s), self.v_max_1)
|
||||
v0 = min(v0q, sqrt(1 / (self.k_hardness_1 * self.mass_1)) * self.Ftogrow)
|
||||
t1 = T - sqrt(max(0, T ** 2 - 2 * s / self.a_max_1))
|
||||
if t1 > v0/ self.a_max_1 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - смыкание FE, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
|
||||
v0 = min(v0q, sqrt(1/(self.k_hardness_1*self.mass_1))* self.Ftogrow)
|
||||
t1 = T - sqrt(max(0, T**2 - 2 * s / self.a_max_1))
|
||||
t1 = min(t1, v0 / self.a_max_1)
|
||||
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
|
||||
return t1, t2
|
||||
|
||||
def calcFirstOpen(self, T: float, s: float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_1)) / 2
|
||||
if t1 > self.v_max_1 / self.a_max_1 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - раскрытие FE, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_1 * t1 ** 2 / 2) / (self.a_max_1 * t1))
|
||||
|
||||
def calcFirstOpen(self, T : float, s : float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_1)) / 2
|
||||
t1 = min(t1, self.v_max_1 / self.a_max_1)
|
||||
t2 = max(0, (s - self.a_max_1*t1**2/2) / (self.a_max_1*t1))
|
||||
return t1, t2
|
||||
|
||||
def calcSecondOpen(self, T: float, s: float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
|
||||
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - раскрытие ME, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
|
||||
|
||||
def calcSecondOpen(self, T : float, s : float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
|
||||
t1 = min(t1, self.v_max_2 / self.a_max_2)
|
||||
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
|
||||
return t1, t2
|
||||
|
||||
def calcSecondClose(self, T: float, s: float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T ** 2 - 4 * s / self.a_max_2)) / 2
|
||||
if t1 > self.v_max_2 / self.a_max_2 + self.check_eps:
|
||||
raise Exception("""Мы вышли за границы разгона - смыкание ME, вообще не знаю как так получилось""")
|
||||
t2 = max(0, (s - self.a_max_2 * t1 ** 2) / (self.a_max_2 * t1))
|
||||
|
||||
def calcSecondClose(self, T : float, s : float) -> tuple[float, float]:
|
||||
t1 = T / 2 - sqrt(max(0, T**2 - 4 * s / self.a_max_2)) / 2
|
||||
t1 = min(t1, self.v_max_2 / self.a_max_2)
|
||||
t2 = max(0, (s - self.a_max_2*t1**2/2) / (self.a_max_2*t1))
|
||||
return t1, t2
|
||||
|
||||
def calcSecondOpenOffset(self, t1: float, t2: float, sq: float) -> float:
|
||||
|
||||
def calcSecondOpenOffset(self, t1 : float, t2 : float, sq : float) -> float:
|
||||
s = sq * 1
|
||||
offset = sqrt(2 * s / self.a_max_1)
|
||||
|
||||
|
||||
if offset > t1:
|
||||
s -= t1 ** 2 * self.a_max_1 / 2
|
||||
v1 = t1 * self.a_max_1
|
||||
if s > t2 * v1:
|
||||
s -= t2 * v1
|
||||
|
||||
offset = 2 * t1 + t2 - sqrt(t1 ** 2 - 2 * s / self.a_max_1)
|
||||
|
||||
offset = 2*t1 + t2 - sqrt(t1**2 - 2*s / self.a_max_1)
|
||||
else:
|
||||
offset = t1 + s / v1
|
||||
return offset
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -8,21 +8,13 @@ class Controller(BaseController):
|
||||
|
||||
signal_widgets = pyqtSignal(list)
|
||||
signal_settings = pyqtSignal(list)
|
||||
signal_raport_mode = pyqtSignal(str)
|
||||
signal_seeking_mode = pyqtSignal()
|
||||
signal_monitor = pyqtSignal(str)
|
||||
|
||||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||||
self.signal_widgets.emit(widgets)
|
||||
|
||||
def update_settings(self, settings: list[dict]) -> None:
|
||||
def push_settings(self, settings: list[dict]) -> None:
|
||||
self.signal_settings.emit(settings)
|
||||
|
||||
def raport_mode(self, filepath: str) -> None:
|
||||
self.signal_raport_mode.emit(filepath)
|
||||
|
||||
def seeking_mode(self) -> None:
|
||||
self.signal_seeking_mode.emit()
|
||||
|
||||
|
||||
|
||||
|
||||
def open_custom_file (self, filepath: str) -> None:
|
||||
self.signal_monitor.emit(filepath)
|
||||
@ -1,9 +1,6 @@
|
||||
import pandas as pd
|
||||
import traceback
|
||||
import sys
|
||||
from loguru import logger
|
||||
|
||||
# FIXME: костыль для выключения предупреждения "replace deprecated"
|
||||
#FIXME: костыль для выключения предупреждения "replace deprecated". Потом надо поправить.
|
||||
pd.set_option('future.no_silent_downcasting', True)
|
||||
|
||||
from utils.base.base import BaseDataConverter
|
||||
@ -13,23 +10,11 @@ class DataConverter(BaseDataConverter):
|
||||
|
||||
@staticmethod
|
||||
def _replace_bool(dataframe: pd.DataFrame) -> pd.DataFrame:
|
||||
try:
|
||||
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
|
||||
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
|
||||
return dataframe
|
||||
except:
|
||||
return None
|
||||
bool_columns = dataframe.columns[dataframe.isin([True, False]).any()]
|
||||
dataframe[bool_columns] = dataframe[bool_columns].replace({True: 1, False: 0})
|
||||
return dataframe
|
||||
|
||||
def convert_data(self, files: list[str]) -> None:
|
||||
try:
|
||||
dataframes = [pd.read_csv(file) if file != '' else None for file in files]
|
||||
converted_dataframes = list(map(self._replace_bool, dataframes))
|
||||
except:
|
||||
# Get the traceback object
|
||||
tb = sys.exc_info()[2]
|
||||
tbinfo = traceback.format_tb(tb)[0]
|
||||
pymsg = "Traceback info:\n" + tbinfo + "\nError Info:\n" + str(sys.exc_info()[1])
|
||||
logger.error(pymsg)
|
||||
converted_dataframes = [None]
|
||||
finally:
|
||||
self._mediator.notify(self, converted_dataframes)
|
||||
dataframes = [pd.read_csv(file) for file in files]
|
||||
converted_dataframes = list(map(self._replace_bool, dataframes))
|
||||
self._mediator.notify(self, converted_dataframes)
|
||||
|
||||
@ -3,9 +3,7 @@ import pandas as pd
|
||||
from typing import Union
|
||||
from PyQt5.QtWidgets import QWidget
|
||||
|
||||
from utils.base.base import (BaseMediator, BaseDirectoryMonitor,
|
||||
BaseDataConverter, BasePlotWidget,
|
||||
BasePointPassportFormer)
|
||||
from utils.base.base import BaseMediator, BaseDirectoryMonitor, BaseDataConverter, BasePlotWidget, BasePointPassportFormer
|
||||
|
||||
|
||||
class Mediator(BaseMediator):
|
||||
@ -26,9 +24,9 @@ class Mediator(BaseMediator):
|
||||
if issubclass(source.__class__, BasePlotWidget):
|
||||
self._controller.send_widgets(data)
|
||||
|
||||
def update_settings(self, settings: list[dict]):
|
||||
def push_settings(self, settings: list[dict]):
|
||||
self._monitor.update_settings(settings)
|
||||
self._passportFormer.update_settings(settings)
|
||||
self._monitor.update_plots()
|
||||
self._monitor.force_all_dir()
|
||||
|
||||
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
from time import sleep
|
||||
import os
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from utils.base.base import BaseDirectoryMonitor
|
||||
@ -7,62 +9,47 @@ from utils.base.base import BaseDirectoryMonitor
|
||||
class DirectoryMonitor(BaseDirectoryMonitor):
|
||||
|
||||
def _init_state(self):
|
||||
|
||||
self._files = [
|
||||
os.path.join(self._directory_path, file)
|
||||
for file in os.listdir(self._directory_path)
|
||||
if file.lower().endswith('.csv')
|
||||
]
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
|
||||
self.update_timer.timeout.connect(self._monitor)
|
||||
logger.info("Monitor initiated!")
|
||||
|
||||
def _monitor(self):
|
||||
current_files = [
|
||||
os.path.join(self._directory_path, file)
|
||||
for file in os.listdir(self._directory_path)
|
||||
if file.lower().endswith('.csv')
|
||||
]
|
||||
new_files = sorted(list(filter(lambda x: x not in self._files, current_files)))
|
||||
files = os.listdir(self._directory_path)
|
||||
new_files = sorted(list(map(lambda x: os.path.join(self._directory_path, x),
|
||||
filter(lambda x: x not in self._files, files))))
|
||||
if new_files:
|
||||
logger.info(f"New files detected: {new_files}")
|
||||
self._mediator.notify(self, new_files)
|
||||
self._files = current_files
|
||||
if not current_files:
|
||||
self._files = files
|
||||
if not files:
|
||||
self._files = []
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
if self.isActive:
|
||||
self.stop()
|
||||
_, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path'][0]
|
||||
self._update_time = system_params['monitor_update_period'][0]
|
||||
|
||||
if not os.path.exists(self._directory_path):
|
||||
logger.error(f"Путь {self._directory_path} не существует.")
|
||||
raise FileNotFoundError(f"Путь {self._directory_path} не существует.")
|
||||
else:
|
||||
self._init_state()
|
||||
self.start()
|
||||
else:
|
||||
_, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path'][0]
|
||||
self._update_time = system_params['monitor_update_period'][0]
|
||||
|
||||
def update_plots(self):
|
||||
if self._files is not None:
|
||||
self._mediator.notify(self, self._files)
|
||||
|
||||
def custom_csv_extract_only(self, path: str):
|
||||
self.stop()
|
||||
self._files.append(path)
|
||||
if path is not None:
|
||||
self._mediator.notify(self, [path])
|
||||
else:
|
||||
self._mediator.notify(self, [None])
|
||||
|
||||
|
||||
def start_seeking(self) -> None:
|
||||
operator_params, system_params = data
|
||||
self._directory_path = system_params['trace_storage_path'][0]
|
||||
self._update_time = system_params['monitor_update_period'][0]
|
||||
self._init_state()
|
||||
self.start()
|
||||
|
||||
def force_all_dir(self):
|
||||
files = os.listdir(self._directory_path)
|
||||
self._files = files
|
||||
all_files = []
|
||||
for x in self._files:
|
||||
path = os.path.join(self._directory_path, x)
|
||||
all_files.append(path)
|
||||
if all_files:
|
||||
logger.info(f"Plotting all files: {all_files}")
|
||||
self._mediator.notify(self, all_files)
|
||||
else:
|
||||
logger.info(f"Failed pushing {str(all_files)}")
|
||||
|
||||
def custom_dir_extract(self, dict_path:str ):
|
||||
files = os.listdir(dict_path)
|
||||
if files is not None:
|
||||
all_files = [os.path.join(dict_path, file) for file in files]
|
||||
self._mediator.notify(self, all_files)
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
||||
import pandas as pd
|
||||
|
||||
from utils.base.base import BasePointPassportFormer, BaseIdealDataBuilder
|
||||
|
||||
|
||||
class idealDataBuilder(BaseIdealDataBuilder):
|
||||
@ -18,15 +18,14 @@ class idealDataBuilder(BaseIdealDataBuilder):
|
||||
|
||||
def get_weldingDF(self) -> pd.DataFrame:
|
||||
data = []
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow']-0.0001)
|
||||
X1, X2, V1, V2 = X1*1000, X2*1000, V1*1000, V2*1000
|
||||
X1, X2, V1, V2, F = self.calcPhaseGrow(self.Ts['tgrow'])
|
||||
data.append({"time":0, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
data.append({"time":self.welding_time, "Position FE":X1,"Position ME":X2, "Rotor Speed FE":V1, "Rotor Speed ME":V2, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_ideal_timings(self) -> list[float]:
|
||||
data = self.Ts
|
||||
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']]
|
||||
ideal_timings = [data['tclose'], data['tgrow'], self.welding_time, self.getMarkOpen(), data['tmovement']] # TODO: add data['tmovement'], Oncoming не учитывается в производительности
|
||||
return ideal_timings
|
||||
|
||||
class PassportFormer(BasePointPassportFormer):
|
||||
@ -37,50 +36,49 @@ class PassportFormer(BasePointPassportFormer):
|
||||
|
||||
|
||||
def _build_passports_pocket(self, dataframe: pd.DataFrame) -> list[pd.DataFrame, dict, int]:
|
||||
|
||||
if dataframe is not None:
|
||||
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
|
||||
if point_quantity == 0:
|
||||
return []
|
||||
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
|
||||
else:
|
||||
events = None
|
||||
point_quantity = 1
|
||||
|
||||
points_pocket = []
|
||||
events, point_quantity = self._filter_events(dataframe["time"], dataframe)
|
||||
if point_quantity == 0:
|
||||
return []
|
||||
|
||||
system_settings = {key: value[0] for key, value in self._params[1].items()}
|
||||
|
||||
tesla_time = sum(self._params[0].get("Tesla summary time", []))
|
||||
useful_data = {"tesla_time": tesla_time,
|
||||
"range_ME": system_settings["Range ME, mm"],
|
||||
"k_hardness": system_settings["k_hardness_1"]
|
||||
}
|
||||
|
||||
for i in range(point_quantity):
|
||||
operator_settings = {
|
||||
key: (value[i] if i < len(value) else value[0])
|
||||
for key, value in self._params[0].items()
|
||||
}
|
||||
params_list = [operator_settings, system_settings]
|
||||
cache_key = self._generate_cache_key(params_list)
|
||||
if cache_key in self._ideal_data_cashe :
|
||||
ideal_data = self._ideal_data_cashe[cache_key]
|
||||
print(f"Cache hit")
|
||||
else:
|
||||
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
|
||||
self._ideal_data_cashe[cache_key] = ideal_data
|
||||
print(f"Cache miss. Computed and cached.")
|
||||
if events is not None:
|
||||
points_pocket = []
|
||||
|
||||
time_is_valid = not dataframe["time"].isna().all()
|
||||
|
||||
if time_is_valid:
|
||||
|
||||
idx_shift = True if events[self._stages[-1]][0][0] == 0 else False
|
||||
|
||||
for i in range(point_quantity):
|
||||
operator_settings = {
|
||||
key: (value[i] if i < len(value) else value[0])
|
||||
for key, value in self._params[0].items()
|
||||
}
|
||||
params_list = [operator_settings, system_settings]
|
||||
cache_key = self._generate_cache_key(params_list)
|
||||
if cache_key in self._ideal_data_cashe :
|
||||
ideal_data = self._ideal_data_cashe[cache_key]
|
||||
print(f"Cache hit")
|
||||
else:
|
||||
ideal_data = self._build_ideal_data(idealDataBuilder=idealDataBuilder, params=params_list)
|
||||
self._ideal_data_cashe[cache_key] = ideal_data
|
||||
print(f"Cache miss. Computed and cached.")
|
||||
idx = i+1 if idx_shift else i
|
||||
point_timeframe = [events[self._stages[0]][0][i], events[self._stages[-1]][1][idx]]
|
||||
point_events = {key: [value[0][i], value[1][i]] for key, value in events.items()}
|
||||
else:
|
||||
point_timeframe, point_events = None, None
|
||||
useful_p_data = {"thickness": operator_settings["object_thickness"],
|
||||
"L2": operator_settings["distance_l_2"],
|
||||
"force": operator_settings["force_target"]}
|
||||
useful_p_data = {"thickness": operator_settings["object_thickness"],
|
||||
"L2": operator_settings["distance_l_2"],
|
||||
"force": operator_settings["force_target"]}
|
||||
|
||||
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
|
||||
return dataframe, points_pocket, useful_data
|
||||
points_pocket.append([point_timeframe, ideal_data, point_events, useful_p_data])
|
||||
return dataframe, points_pocket, useful_data
|
||||
|
||||
def update_settings(self, params: list[dict, dict]):
|
||||
self._params = params
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,148 +1,47 @@
|
||||
from datetime import datetime as dt
|
||||
from typing import Optional
|
||||
|
||||
from PyQt5 import QtWidgets
|
||||
from PyQt5.QtCore import Qt
|
||||
from PyQt5.QtGui import QPixmap
|
||||
|
||||
from utils.base.base import BaseMainWindow, BaseController
|
||||
from gui.settings_window import SystemSettings, OperatorSettings
|
||||
from gui.reportGui import ReportSettings
|
||||
from gui.settings_window import settingsWindow
|
||||
|
||||
class MainWindow(BaseMainWindow):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None) -> None:
|
||||
controller: Optional[BaseController] = None):
|
||||
super().__init__()
|
||||
self._controller = controller
|
||||
self._init_startUI()
|
||||
|
||||
self.initUI()
|
||||
self.set_style(self)
|
||||
self.settings_button.clicked.connect(self._show_settings)
|
||||
self.select_dir_button.clicked.connect(self._select_dir)
|
||||
self.operSettings = settingsWindow("params/operator_params.json", 'Operator', self._updater_trigger)
|
||||
self.sysSettings = settingsWindow("params/system_params.json", 'System', self._updater_trigger)
|
||||
|
||||
def _init_startUI(self) -> None:
|
||||
self.operSettings = OperatorSettings("params/operator_params.json", 'Operator', self._transfer_settings)
|
||||
self.sysSettings = SystemSettings("params/system_params.json", 'System', self._transfer_settings)
|
||||
self.repSettings = ReportSettings()
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
|
||||
self._clear()
|
||||
seeking_mode_btn = QtWidgets.QPushButton("Real time folder scanning")
|
||||
seeking_mode_btn.setFixedWidth(300)
|
||||
seeking_mode_btn.clicked.connect(self._init_seekingUI)
|
||||
raport_mode_btn = QtWidgets.QPushButton("Raport editor")
|
||||
raport_mode_btn.setFixedWidth(300)
|
||||
raport_mode_btn.clicked.connect(self._init_raportUI)
|
||||
|
||||
button_layout = QtWidgets.QHBoxLayout()
|
||||
button_layout.setSpacing(2)
|
||||
button_layout.addWidget(seeking_mode_btn)
|
||||
button_layout.addWidget(raport_mode_btn)
|
||||
button_widget = QtWidgets.QWidget()
|
||||
button_widget.setLayout(button_layout)
|
||||
|
||||
mainLayout = self.layout()
|
||||
label = QtWidgets.QLabel("Select work mode")
|
||||
label.setStyleSheet(
|
||||
"""QLabel{
|
||||
color: #ffffff;
|
||||
font-size: 40px;
|
||||
font-weight: bold;
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
}"""
|
||||
)
|
||||
mainLayout.addWidget(label, alignment=Qt.AlignCenter)
|
||||
mainLayout.addWidget(button_widget)
|
||||
|
||||
def _clear(self) -> None:
|
||||
main = self.layout()
|
||||
if self.layout() is not None:
|
||||
while main.count():
|
||||
child = main.takeAt(0)
|
||||
if child.widget() is not None:
|
||||
child.widget().deleteLater()
|
||||
else: self.setLayout(QtWidgets.QVBoxLayout())
|
||||
|
||||
def _init_seekingUI(self) -> None:
|
||||
self._clear()
|
||||
self._transfer_settings()
|
||||
def initUI(self) -> None:
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
self.tabWidget.setTabsClosable(True)
|
||||
self.tabWidget.tabCloseRequested.connect(self._close_tab)
|
||||
layout = QtWidgets.QVBoxLayout()
|
||||
layout.addWidget(self.tabWidget)
|
||||
|
||||
sys_settings_btn = QtWidgets.QPushButton("System settings")
|
||||
sys_settings_btn.setFixedWidth(200)
|
||||
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
|
||||
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
|
||||
oper_settings_btn.setFixedWidth(200)
|
||||
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
|
||||
|
||||
self.settings_button = QtWidgets.QPushButton("Show settings")
|
||||
self.settings_button.setFixedWidth(160)
|
||||
self.select_dir_button = QtWidgets.QPushButton("Open directory")
|
||||
self.select_dir_button.setFixedWidth(175)
|
||||
button_layout = QtWidgets.QHBoxLayout()
|
||||
button_layout.setSpacing(2)
|
||||
button_layout.addWidget(sys_settings_btn)
|
||||
button_layout.addWidget(oper_settings_btn)
|
||||
button_widget = QtWidgets.QWidget()
|
||||
button_widget.setLayout(button_layout)
|
||||
|
||||
title = QtWidgets.QLabel("online mode")
|
||||
mainLayout = self.layout()
|
||||
mainLayout.addWidget(self.tabWidget)
|
||||
mainLayout.addWidget(button_widget)
|
||||
mainLayout.addWidget(title, alignment=Qt.AlignRight)
|
||||
self.resize(800,800)
|
||||
self._controller.seeking_mode()
|
||||
# TODO:push seeking to mediator
|
||||
|
||||
def _init_raportUI(self) -> None:
|
||||
self._clear()
|
||||
self._transfer_settings()
|
||||
|
||||
self.tabWidget = QtWidgets.QTabWidget()
|
||||
self.tabWidget.setTabsClosable(True)
|
||||
self.tabWidget.tabCloseRequested.connect(self._close_tab)
|
||||
sys_settings_btn = QtWidgets.QPushButton("System settings")
|
||||
sys_settings_btn.setFixedWidth(185)
|
||||
sys_settings_btn.clicked.connect(lambda: self.sysSettings.show())
|
||||
oper_settings_btn = QtWidgets.QPushButton("Operator settings")
|
||||
oper_settings_btn.setFixedWidth(185)
|
||||
oper_settings_btn.clicked.connect(lambda: self.operSettings.show())
|
||||
view_settings_btn = QtWidgets.QPushButton("Customize view")
|
||||
view_settings_btn.setFixedWidth(185)
|
||||
view_settings_btn.clicked.connect(self._customization_window)
|
||||
save_screen_btn = QtWidgets.QPushButton("Save state")
|
||||
save_screen_btn.setFixedWidth(185)
|
||||
save_screen_btn.clicked.connect(self._save_plots)
|
||||
open_file_btn = QtWidgets.QPushButton("Open file")
|
||||
open_file_btn.setFixedWidth(185)
|
||||
open_file_btn.clicked.connect(self._open_file)
|
||||
|
||||
button_layout = QtWidgets.QHBoxLayout()
|
||||
button_layout.setSpacing(2)
|
||||
button_layout.addWidget(sys_settings_btn)
|
||||
button_layout.addWidget(oper_settings_btn)
|
||||
button_layout.addWidget(view_settings_btn)
|
||||
button_layout.addWidget(save_screen_btn)
|
||||
button_layout.addWidget(open_file_btn)
|
||||
button_widget = QtWidgets.QWidget()
|
||||
button_widget.setLayout(button_layout)
|
||||
|
||||
title = QtWidgets.QLabel("raport mode")
|
||||
mainLayout = self.layout()
|
||||
mainLayout.addWidget(self.tabWidget)
|
||||
mainLayout.addWidget(button_widget)
|
||||
mainLayout.addWidget(title, alignment=Qt.AlignRight)
|
||||
self.resize(800,800)
|
||||
self._controller.raport_mode(None)
|
||||
|
||||
#self._controller.raport_mode(path)
|
||||
# TODO: push only one dir to monitor
|
||||
button_layout.addWidget(self.settings_button)
|
||||
button_layout.addWidget(self.select_dir_button)
|
||||
|
||||
layout.addLayout(button_layout)
|
||||
self.setLayout(layout)
|
||||
|
||||
def show_plot_tabs(self, plot_widgets: list[QtWidgets.QWidget]) -> None:
|
||||
for plot_widget in plot_widgets:
|
||||
widget, reg_items, curve_items = plot_widget
|
||||
|
||||
tab = QtWidgets.QWidget()
|
||||
tab.setProperty("reg_items", reg_items)
|
||||
tab.setProperty("curve_items", curve_items)
|
||||
grid = QtWidgets.QGridLayout()
|
||||
grid.addWidget(widget)
|
||||
grid.addWidget(plot_widget)
|
||||
tab.setLayout(grid)
|
||||
self.tabWidget.addTab(tab, "SF_trace_" + dt.now().strftime('%Y_%m_%d-%H_%M_%S'))
|
||||
self.tabWidget.setCurrentWidget(tab)
|
||||
@ -152,61 +51,31 @@ class MainWindow(BaseMainWindow):
|
||||
for i in range(0, tab_count-2):
|
||||
self._close_tab(i)
|
||||
|
||||
def keyPressEvent(self, a0) -> None:
|
||||
if a0.key() == Qt.Key_F5:
|
||||
tab_count = self.tabWidget.count()
|
||||
for i in range(0, tab_count):
|
||||
self._close_tab(i)
|
||||
|
||||
def closeEvent(self, a0):
|
||||
self.operSettings.close()
|
||||
self.sysSettings.close()
|
||||
self.repSettings.close()
|
||||
super().closeEvent(a0)
|
||||
|
||||
def _show_settings(self) -> None:
|
||||
def keyPressEvent(self, a0):
|
||||
if a0.key() == Qt.Key_F5:
|
||||
pass
|
||||
|
||||
def _show_settings(self):
|
||||
self.operSettings.show()
|
||||
self.sysSettings.show()
|
||||
|
||||
def push_settings(self) -> None:
|
||||
self._updater_trigger()
|
||||
|
||||
def _transfer_settings(self) -> None:
|
||||
def _updater_trigger(self) -> None:
|
||||
self.tabWidget.clear()
|
||||
operator_params = self.operSettings.getParams()
|
||||
system_params = self.sysSettings.getParams()
|
||||
self._controller.update_settings([operator_params, system_params])
|
||||
self._controller.push_settings([operator_params, system_params])
|
||||
|
||||
def _close_tab(self, index:int) -> None:
|
||||
self.tabWidget.removeTab(index)
|
||||
|
||||
def _open_file(self) -> None:
|
||||
path = self._select_csv()
|
||||
self._controller.raport_mode(path)
|
||||
|
||||
def _select_csv(self) -> Optional[str]:
|
||||
CSV_path, _ = QtWidgets.QFileDialog.getOpenFileName(self,"Select csv file", "", "CSV Files (*.csv)")
|
||||
if CSV_path:
|
||||
print(CSV_path)
|
||||
return CSV_path
|
||||
return None
|
||||
|
||||
def _customization_window(self) -> None:
|
||||
tab = self.tabWidget.currentWidget()
|
||||
reg_items = tab.property("reg_items")
|
||||
curve_items = tab.property("curve_items")
|
||||
self.repSettings.build(reg_items, curve_items)
|
||||
|
||||
def _save_plots(self) -> None:
|
||||
filepath, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save file", "", "Image Files (*.png *.jpeg)")
|
||||
tab = self.tabWidget.currentWidget()
|
||||
pixmap = QPixmap(tab.size())
|
||||
tab.render(pixmap)
|
||||
pixmap.save(filepath)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _select_dir(self):
|
||||
folder_path = QtWidgets.QFileDialog.getExistingDirectory(self, 'Select directory', "")
|
||||
if folder_path:
|
||||
self._controller.open_custom_file(folder_path)
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
import pandas as pd
|
||||
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QLabel, QGraphicsRectItem
|
||||
import copy
|
||||
|
||||
|
||||
import pyqtgraph as pg
|
||||
import pandas as pd
|
||||
from typing import Optional, Any
|
||||
|
||||
from utils.base.base import BasePlotWidget
|
||||
@ -12,6 +14,56 @@ class ProcessStage():
|
||||
finish_index:int
|
||||
|
||||
class PlotWidget(BasePlotWidget):
|
||||
|
||||
def _create_navigator(self,
|
||||
time_region:tuple[float, float],
|
||||
main_plot: pg.PlotWidget,
|
||||
dataframe: pd.DataFrame,
|
||||
real_signals: list[dict[str, Any]]) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||||
"""
|
||||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||||
"""
|
||||
navigator = pg.PlotWidget(title="Navigator")
|
||||
navigator.setFixedHeight(100)
|
||||
|
||||
for signal in real_signals:
|
||||
if signal["name"] in dataframe.columns:
|
||||
x = dataframe["time"]
|
||||
y = dataframe[signal["name"]]
|
||||
|
||||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||||
navigator.plot(x_downsampled, y_downsampled, pen=signal["pen"], name=signal["name"])
|
||||
|
||||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100))
|
||||
navigator.addItem(ROI_region)
|
||||
|
||||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||||
|
||||
return navigator, ROI_region
|
||||
|
||||
def _downsample_data(self, x, y, max_points=5000):
|
||||
"""
|
||||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||||
"""
|
||||
if len(x) > max_points:
|
||||
factor = len(x) // max_points
|
||||
x_downsampled = x[::factor]
|
||||
y_downsampled = y[::factor]
|
||||
return x_downsampled, y_downsampled
|
||||
return x, y
|
||||
|
||||
def _sync_main_plot_with_navigator(self,
|
||||
main_plot: pg.PlotWidget,
|
||||
region: pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||||
"""
|
||||
x_min, x_max = region.getRegion()
|
||||
if main_plot:
|
||||
main_plot.blockSignals(True)
|
||||
main_plot.setXRange(x_min, x_max, padding=0)
|
||||
main_plot.blockSignals(False)
|
||||
|
||||
def _create_curve_ideal(self,
|
||||
signal: dict[str, Any],
|
||||
@ -45,20 +97,20 @@ class PlotWidget(BasePlotWidget):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _init_plot_item(title: str) -> tuple[pg.PlotItem, pg.LegendItem]:
|
||||
plot_item = pg.PlotItem(title=title)
|
||||
def _init_plot_widget(title: str) -> tuple[pg.PlotWidget, pg.LegendItem]:
|
||||
plot_widget = pg.PlotWidget(title=title)
|
||||
# Оптимизация отображения графиков
|
||||
plot_item.setDownsampling(auto=True, mode='peak')
|
||||
plot_item.showGrid(x=True, y=True)
|
||||
plot_item.setClipToView(True)
|
||||
legend = plot_item.addLegend(offset=(70, 20))
|
||||
return plot_item, legend
|
||||
plot_widget.setDownsampling(auto=True, mode='peak')
|
||||
plot_widget.showGrid(x=True, y=True)
|
||||
plot_widget.setClipToView(True)
|
||||
legend = pg.LegendItem((80, 60), offset=(70, 20))
|
||||
legend.setParentItem(plot_widget.graphicsItem())
|
||||
return plot_widget, legend
|
||||
|
||||
def _add_stage_regions(self,
|
||||
plot_item: pg.PlotItem,
|
||||
plot_widget: pg.PlotWidget,
|
||||
point_events: dict[str, list[float]],
|
||||
dataframe_headers: list[str],
|
||||
reg_items: dict,
|
||||
transparency: int = 75) -> None:
|
||||
"""
|
||||
Добавляет регионы для реальных этапов, если все стадии есть в заголовках датафрейма.
|
||||
@ -70,15 +122,12 @@ class PlotWidget(BasePlotWidget):
|
||||
region = self._create_stage_region(stage, start_t, end_t, transparency)
|
||||
if region is not None:
|
||||
region.setZValue(-20)
|
||||
plot_item.addItem(region)
|
||||
reg_items["real"].setdefault(stage, [])
|
||||
reg_items["real"][stage].append(region)
|
||||
plot_widget.addItem(region)
|
||||
|
||||
def _add_ideal_stage_regions(self,
|
||||
plot_item: pg.PlotItem,
|
||||
plot_widget: pg.PlotWidget,
|
||||
ideal_data: dict[str, Any],
|
||||
point_events: dict[str, list[float]],
|
||||
reg_items: dict,
|
||||
transparency: int = 125) -> None:
|
||||
"""
|
||||
Добавляет регионы для идеальных этапов.
|
||||
@ -91,16 +140,13 @@ class PlotWidget(BasePlotWidget):
|
||||
region = self._create_stage_region(stage, start_t, end_t, transparency)
|
||||
if region:
|
||||
region.setZValue(-10)
|
||||
plot_item.addItem(region)
|
||||
reg_items["ideal"].setdefault(stage, [])
|
||||
reg_items["ideal"][stage].append(region)
|
||||
plot_widget.addItem(region)
|
||||
|
||||
def _add_ideal_signals(self,
|
||||
plot_item: pg.PlotItem,
|
||||
plot_widget: pg.PlotWidget,
|
||||
ideal_data: dict[str, Any],
|
||||
point_events: dict[str, list[float]],
|
||||
ideal_signals: list[dict[str, Any]],
|
||||
curve_items: dict) -> None:
|
||||
ideal_signals: list[dict[str, Any]]) -> None:
|
||||
"""
|
||||
Добавляет идеальные сигналы для каждого этапа.
|
||||
"""
|
||||
@ -113,29 +159,23 @@ class PlotWidget(BasePlotWidget):
|
||||
point_events[stage][1]
|
||||
)
|
||||
if curve:
|
||||
curve.setZValue(50)
|
||||
plot_item.addItem(curve)
|
||||
curve_items["ideal"].setdefault(signal["name"], {})
|
||||
curve_items["ideal"][signal["name"]].setdefault(stage, [])
|
||||
curve_items["ideal"][signal["name"]][stage].append(curve)
|
||||
|
||||
curve.setZValue(10)
|
||||
plot_widget.addItem(curve)
|
||||
|
||||
def _add_real_signals(self,
|
||||
plot_item: pg.PlotItem,
|
||||
plot_widget: pg.PlotWidget,
|
||||
dataframe: pd.DataFrame,
|
||||
real_signals: list[dict[str, Any]],
|
||||
legend: pg.LegendItem,
|
||||
curve_items: dict) -> None:
|
||||
legend: pg.LegendItem) -> None:
|
||||
"""
|
||||
Добавляет реальные сигналы из dataframe на виджет.
|
||||
"""
|
||||
dataframe_headers = dataframe.columns.tolist()
|
||||
for signal in real_signals:
|
||||
if signal["name"] in dataframe_headers:
|
||||
plot = plot_item.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
|
||||
plot = plot_widget.plot(dataframe["time"], dataframe[signal["name"]], pen=signal["pen"], fast=True)
|
||||
plot.setZValue(0)
|
||||
legend.addItem(plot, signal["name"])
|
||||
curve_items["real"].setdefault(signal["name"], {})
|
||||
curve_items["real"][signal["name"]] = plot
|
||||
|
||||
def _add_performance_label(self,
|
||||
layout: QVBoxLayout,
|
||||
@ -150,35 +190,52 @@ class PlotWidget(BasePlotWidget):
|
||||
TWC_ideal = round((ideal_time/TWC_time)*100, 2) if TWC_time else 0.0
|
||||
|
||||
performance_label = QLabel(
|
||||
f"Сокращение длительности: фактическое = {tesla_TWC} %"
|
||||
f"Сокращение длительности: фактическое = {tesla_TWC} %, "
|
||||
f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
|
||||
)
|
||||
#f"идеальное = {tesla_ideal} %; КДИП = {TWC_ideal}%"
|
||||
self.set_style(performance_label)
|
||||
layout.addWidget(performance_label)
|
||||
performance_label.update()
|
||||
|
||||
def _mirror_shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||||
return dataframe
|
||||
|
||||
def _shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||||
return dataframe
|
||||
|
||||
|
||||
def _build_widget(self, data: list[Any]) -> QWidget:
|
||||
"""
|
||||
Собирает графический виджет для одного набора данных.
|
||||
Параметр `data` предполагается списком: [dataframe, points_pocket, useful_data].
|
||||
"""
|
||||
result_widget = QWidget()
|
||||
result_layout = QVBoxLayout(result_widget)
|
||||
plot_layout = pg.GraphicsLayoutWidget()
|
||||
reg_items = {"real":{}, "ideal":{}}
|
||||
curve_items = {"real":{}, "ideal":{}}
|
||||
widget = QWidget()
|
||||
layout = QVBoxLayout(widget)
|
||||
|
||||
dataframe, points_pocket, useful_data = data
|
||||
tesla_time = useful_data["tesla_time"]
|
||||
range_ME = useful_data["range_ME"]
|
||||
k_hardness = useful_data["k_hardness"]
|
||||
|
||||
dat_is_none = dataframe is None
|
||||
|
||||
if not dat_is_none: dataframe_headers = dataframe.columns.tolist()
|
||||
dataframe_headers = dataframe.columns.tolist()
|
||||
|
||||
for widget_num, (channel, description) in enumerate(self._plt_channels.items()):
|
||||
plot_item, legend = self._init_plot_item(title=channel)
|
||||
plot_widget, legend = self._init_plot_widget(title=channel)
|
||||
settings = description["Settings"]
|
||||
|
||||
TWC_time = 0.0
|
||||
@ -186,31 +243,17 @@ class PlotWidget(BasePlotWidget):
|
||||
worst_perf = 2
|
||||
|
||||
# TODO: рассчитать корректный параметр range
|
||||
if settings["mirror ME"] and not dat_is_none:
|
||||
dataframe = self._mirror_shift_data(
|
||||
"ME",
|
||||
description["Real_signals"],
|
||||
dataframe,
|
||||
range_ME
|
||||
)
|
||||
if settings["mirror ME"]:
|
||||
dataframe = self._mirror_shift_data("ME", description["Real_signals"], dataframe, range_ME)
|
||||
|
||||
# Итерация по точкам
|
||||
for cur_point, point_data in enumerate(points_pocket):
|
||||
# point_data структура: [point_timeframe, ideal_data, point_events, useful_p_data]
|
||||
# point_data структура: [point_timeframe, ideal_data, point_events]
|
||||
point_timeframe, ideal_dat, point_events, useful_p_data = point_data
|
||||
ideal_data = copy.deepcopy(ideal_dat)
|
||||
|
||||
if dat_is_none:
|
||||
worst_timeframe = point_timeframe = [0, ideal_data["Ideal cycle"]]
|
||||
point_events = {}
|
||||
keys = list(ideal_data.keys())
|
||||
shift = 0
|
||||
for i, time in enumerate(ideal_data["Ideal timings"]):
|
||||
point_events[keys[i]] = [shift, time+shift]
|
||||
shift += time
|
||||
|
||||
|
||||
# TODO: проверить корректность расчетов
|
||||
if settings["force compensation FE"] and not dat_is_none:
|
||||
if settings["force compensation FE"]:
|
||||
force = useful_p_data["force"]
|
||||
F_comp = - force/k_hardness
|
||||
point_idxs = dataframe[(dataframe["time"] >= point_timeframe[0]) & (dataframe["time"] <= point_timeframe[1])].index
|
||||
@ -223,9 +266,10 @@ class PlotWidget(BasePlotWidget):
|
||||
ideal_data[stage] = self._mirror_shift_data("ME", description["Ideal_signals"], ideal_data[stage], range_ME)
|
||||
|
||||
# Добавляем реальные стадии
|
||||
if settings["stages"] and not dat_is_none:
|
||||
self._add_stage_regions(plot_item, point_events, dataframe_headers, reg_items, 75)
|
||||
if settings["stages"]:
|
||||
self._add_stage_regions(plot_widget, point_events, dataframe_headers, 75)
|
||||
|
||||
# TODO: подобрать не вырвеглазные цвета, возможно ограничить зону
|
||||
if settings["workpiece"]:
|
||||
x1 = point_timeframe[0]
|
||||
dx = point_timeframe[1] - x1
|
||||
@ -236,30 +280,15 @@ class PlotWidget(BasePlotWidget):
|
||||
rect_item.setZValue(-5)
|
||||
rect_item.setBrush(pg.mkBrush('grey'))
|
||||
rect_item.setPen(pg.mkPen('black', width=3))
|
||||
plot_item.addItem(rect_item)
|
||||
|
||||
if settings["force accuracy"]and not dat_is_none:
|
||||
modifier = 0.05
|
||||
|
||||
x1 = point_events["Welding"][0]
|
||||
dx = point_events["Welding"][1] - x1
|
||||
force = useful_p_data["force"]
|
||||
y1 = force*(1-modifier)
|
||||
dy = force*(2*modifier)
|
||||
|
||||
rect_item = QGraphicsRectItem(x1, y1, dx, dy)
|
||||
rect_item.setZValue(-5)
|
||||
rect_item.setBrush(pg.mkBrush((0,255,0, 50)))
|
||||
rect_item.setPen(pg.mkPen('black', width=0))
|
||||
plot_item.addItem(rect_item)
|
||||
plot_widget.addItem(rect_item)
|
||||
|
||||
# Добавляем идеальные стадии и идеальные сигналы
|
||||
if settings["ideals"]:
|
||||
self._add_ideal_stage_regions(plot_item, ideal_data, point_events, reg_items, 100)
|
||||
self._add_ideal_signals(plot_item, ideal_data, point_events, description["Ideal_signals"], curve_items)
|
||||
self._add_ideal_stage_regions(plot_widget, ideal_data, point_events, 100)
|
||||
self._add_ideal_signals(plot_widget, ideal_data, point_events, description["Ideal_signals"])
|
||||
|
||||
# Подсчёт производительности
|
||||
if settings["performance"]and not dat_is_none:
|
||||
if settings["performance"]:
|
||||
is_last_point = (cur_point == len(points_pocket) - 1)
|
||||
if is_last_point:
|
||||
TWC_delta = sum([point_events[stage][1] - point_events[stage][0] for stage in ["Closing", "Squeeze", "Welding"]])
|
||||
@ -270,46 +299,53 @@ class PlotWidget(BasePlotWidget):
|
||||
TWC_time += TWC_delta
|
||||
ideal_time += ideal_delta
|
||||
curr_perf = ideal_delta/TWC_delta if TWC_delta != 0 else 1
|
||||
|
||||
if curr_perf < worst_perf:
|
||||
worst_perf = curr_perf
|
||||
worst_timeframe = point_timeframe
|
||||
|
||||
# Добавляем реальные сигналы
|
||||
if not dat_is_none:
|
||||
self._add_real_signals(plot_item, dataframe, description["Real_signals"], legend, curve_items)
|
||||
self._add_real_signals(plot_widget, dataframe, description["Real_signals"], legend)
|
||||
if widget_num == 0:
|
||||
main_plot = plot_item
|
||||
main_plot = plot_widget
|
||||
else:
|
||||
# Связываем остальные графики с основным графиком
|
||||
plot_item.setXLink(main_plot)
|
||||
plot_widget.setXLink(main_plot)
|
||||
|
||||
if settings["performance"] and not dat_is_none:
|
||||
self._add_performance_label(result_layout, TWC_time, ideal_time, tesla_time)
|
||||
# Если есть настройка производительности, добавляем label
|
||||
if settings["performance"]:
|
||||
self._add_performance_label(layout, TWC_time, ideal_time, tesla_time)
|
||||
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot, dataframe, description["Real_signals"])
|
||||
|
||||
plot_layout.addItem(plot_item, widget_num, 0)
|
||||
|
||||
navigator, ROI_region = self._create_navigator(worst_timeframe, main_plot)
|
||||
if navigator is not None:
|
||||
plot_layout.addItem(navigator, widget_num+1, 0)
|
||||
self._sync_main_plot_with_navigator(main_plot, ROI_region)
|
||||
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
|
||||
layout.addWidget(plot_widget)
|
||||
|
||||
layout.addWidget(navigator)
|
||||
self._sync_main_plot_with_navigator(main_plot, ROI_region)
|
||||
main_plot.sigXRangeChanged.connect(lambda _, plot=main_plot, region=ROI_region: self._sync_navigator_with_main(main_plot=plot, region=region))
|
||||
|
||||
result_layout.addWidget(plot_layout)
|
||||
return result_widget, reg_items, curve_items
|
||||
widget.setLayout(layout)
|
||||
return widget
|
||||
|
||||
def _sync_navigator_with_main(self, main_plot: pg.PlotWidget, region:pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||||
"""
|
||||
if region:
|
||||
x_min, x_max = main_plot
|
||||
region.blockSignals(True) # Предотвращаем рекурсию
|
||||
region.setRegion([x_min, x_max])
|
||||
region.blockSignals(False)
|
||||
|
||||
def build(self, data: list[list[Any]]) -> None:
|
||||
"""
|
||||
Создает набор виджетов по предоставленному списку данных.
|
||||
Предполагается, что data — это список элементов вида:
|
||||
[
|
||||
[dataframe, points_pocket, useful_data],
|
||||
[dataframe, points_pocket, useful_data],
|
||||
[dataframe, points_pocket, tesla_time],
|
||||
[dataframe, points_pocket, tesla_time],
|
||||
...
|
||||
]
|
||||
"""
|
||||
widgets_datapack = [self._build_widget(data_sample) for data_sample in data]
|
||||
self._mediator.notify(self, widgets_datapack)
|
||||
widgets = [self._build_widget(data_sample) for data_sample in data]
|
||||
self._mediator.notify(self, widgets)
|
||||
|
||||
|
||||
|
||||
@ -1,169 +0,0 @@
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph.parametertree import Parameter, ParameterTree
|
||||
from typing import Union
|
||||
from PyQt5 import QtWidgets
|
||||
|
||||
|
||||
class ReportSettings(QtWidgets.QWidget):
|
||||
|
||||
def build(self, reg_items: dict, curve_items: dict) -> None:
|
||||
"""Создает ParameterTree для элементов всех графиков выбранной вкладки"""
|
||||
self._clear()
|
||||
param_tree = ParameterTree()
|
||||
layout = self.layout()
|
||||
layout.addWidget(param_tree)
|
||||
|
||||
body= [
|
||||
self._generate_reg_params(reg_items),
|
||||
self._generate_curve_params(curve_items)
|
||||
]
|
||||
# Добавляем параметры в дерево
|
||||
params = Parameter.create(name='params', type='group', children=body)
|
||||
params.sigTreeStateChanged.connect(
|
||||
lambda: self._update_settings(reg_items, curve_items, params)
|
||||
)
|
||||
param_tree.setParameters(params, showTop=False)
|
||||
self.show()
|
||||
|
||||
def _clear(self) -> None:
|
||||
"""
|
||||
Приводит виджет в готовое к работе состояние.
|
||||
Удаляет все содержимое, если имеется
|
||||
"""
|
||||
main = self.layout()
|
||||
if self.layout() is not None:
|
||||
while main.count():
|
||||
child = main.takeAt(0)
|
||||
if child.widget() is not None:
|
||||
child.widget().deleteLater()
|
||||
else:
|
||||
self.setLayout(QtWidgets.QVBoxLayout())
|
||||
|
||||
def _generate_reg_params(self,
|
||||
reg_items: dict) -> dict:
|
||||
|
||||
"""Созадет реальные и идеальные секторы"""
|
||||
|
||||
res = {'name': 'Sectors', 'type': 'group', 'children': [
|
||||
{'name': 'Real sectors', 'type': 'group', 'children': self._create_samples(reg_items["real"])},
|
||||
{'name': 'Ideal sectors', 'type': 'group', 'children': self._create_samples(reg_items["ideal"])},
|
||||
]}
|
||||
return res
|
||||
|
||||
def _generate_curve_params(self,
|
||||
curve_items: dict) -> dict:
|
||||
|
||||
"""Создает реальные и идеальные линии графиков"""
|
||||
|
||||
res = {'name': 'Plots', 'type': 'group', 'children': [
|
||||
{'name': 'Real plots', 'type': 'group', 'children': self._create_samples(curve_items["real"])},
|
||||
{'name': 'Ideal plots', 'type': 'group', 'children': self._create_ideal_curves(curve_items["ideal"])},
|
||||
]}
|
||||
return res
|
||||
|
||||
def _create_ideal_curves(self,
|
||||
curve: dict) -> list[dict]:
|
||||
|
||||
"""Создает секторы с этапами циклограммы"""
|
||||
|
||||
res = []
|
||||
for key, item in curve.items():
|
||||
param = {'name': key, 'type': 'group', 'children': self._create_samples(item)}
|
||||
res.append(param)
|
||||
return res
|
||||
|
||||
def _create_samples(self,
|
||||
sector: dict) -> list[dict]:
|
||||
|
||||
"""Создает список представленных элементов с их параметрами"""
|
||||
|
||||
res = []
|
||||
for key, item in sector.items():
|
||||
sample = item[0] if type(item) == list else item
|
||||
param = {'name': key, 'type': 'group', 'children': self._create_settings(sample)}
|
||||
res.append(param)
|
||||
return res
|
||||
|
||||
def _create_settings(self,
|
||||
item: Union[pg.LinearRegionItem, pg.PlotDataItem]) -> list[dict]:
|
||||
|
||||
"""Получает настройки для элемента"""
|
||||
|
||||
if type(item) == pg.LinearRegionItem:
|
||||
pen = item.lines[0].pen
|
||||
brush = item.brush
|
||||
fill_color = brush.color().getRgb()
|
||||
else:
|
||||
pen = pg.mkPen(item.opts.get("pen"))
|
||||
fill_color = None
|
||||
line_color = pen.color().getRgb()
|
||||
line_thickness = pen.width()
|
||||
visibility = item.isVisible()
|
||||
|
||||
return [
|
||||
{'name': 'Line color', 'type': 'color', 'value': line_color},
|
||||
{'name': 'Line thickness', 'type': 'int', 'value': line_thickness, 'limits': (1, 10)},
|
||||
{'name': 'Visibility', 'type': 'bool', 'value': visibility},
|
||||
{'name': 'Fill color', 'type': 'color', 'value': fill_color},
|
||||
]
|
||||
|
||||
def _update_settings(self,
|
||||
reg_items: dict,
|
||||
curve_items: dict,
|
||||
params: Parameter) -> None:
|
||||
|
||||
"""Задает параметры элементов в соответствии с paramTree"""
|
||||
|
||||
real_sectors = params.child("Sectors").child("Real sectors")
|
||||
ideal_sectors = params.child("Sectors").child("Ideal sectors")
|
||||
|
||||
real_plots = params.child("Plots").child("Real plots")
|
||||
ideal_plots = params.child("Plots").child("Ideal plots")
|
||||
|
||||
self._set_sector_settings(reg_items["real"], real_sectors)
|
||||
self._set_sector_settings(reg_items["ideal"], ideal_sectors)
|
||||
|
||||
self._set_plot_settings(curve_items["real"], real_plots)
|
||||
for key, item_dict in curve_items["ideal"].items():
|
||||
self._set_plot_settings(item_dict, ideal_plots.child(key))
|
||||
|
||||
def _set_sector_settings(self,
|
||||
sectors: dict,
|
||||
settings: Parameter) -> None:
|
||||
|
||||
"""Задает параметры секторов в соответствии с настройками"""
|
||||
|
||||
for key, item in sectors.items():
|
||||
sample = settings.child(key)
|
||||
line_color = sample.child("Line color").value()
|
||||
line_width = sample.child("Line thickness").value()
|
||||
visibility = sample.child("Visibility").value()
|
||||
fill_color = sample.child("Fill color").value()
|
||||
|
||||
pen = pg.mkPen(color=line_color, width=line_width)
|
||||
brush=pg.mkBrush(fill_color)
|
||||
for reg in item:
|
||||
reg.setVisible(visibility)
|
||||
reg.lines[0].setPen(pen)
|
||||
reg.lines[1].setPen(pen)
|
||||
reg.setBrush(brush)
|
||||
|
||||
def _set_plot_settings(self,
|
||||
curves:dict,
|
||||
settings: Parameter) -> None:
|
||||
|
||||
"""Задает параметры кривых в соответствии с настройками"""
|
||||
|
||||
for key, item in curves.items():
|
||||
sample = settings.child(key)
|
||||
line_color = sample.child("Line color").value()
|
||||
line_width = sample.child("Line thickness").value()
|
||||
visibility = sample.child("Visibility").value()
|
||||
pen = pg.mkPen(color=line_color, width=line_width)
|
||||
if type(item) == list:
|
||||
for curve in item:
|
||||
curve.setVisible(visibility)
|
||||
curve.setPen(pen)
|
||||
else:
|
||||
item.setVisible(visibility)
|
||||
item.setPen(pen)
|
||||
@ -1,8 +1,7 @@
|
||||
from typing import Callable, Optional, Any
|
||||
from PyQt5.QtWidgets import (QWidget, QPushButton,
|
||||
QLineEdit, QHBoxLayout,
|
||||
QVBoxLayout, QLabel,
|
||||
QTableWidget, QTableWidgetItem)
|
||||
from PyQt5.QtWidgets import (
|
||||
QWidget, QPushButton, QLineEdit, QHBoxLayout, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem
|
||||
)
|
||||
from PyQt5.QtGui import QIntValidator
|
||||
|
||||
from utils.json_tools import read_json, write_json
|
||||
@ -12,6 +11,7 @@ class settingsWindow(QWidget):
|
||||
def __init__(self, path: str, name: str, upd_func: Callable[[], None]):
|
||||
"""
|
||||
Окно настроек для редактирования параметров.
|
||||
|
||||
:param path: Путь к файлу настроек (JSON).
|
||||
:param name: Название набора настроек.
|
||||
:param upd_func: Функция обновления (коллбэк).
|
||||
@ -176,16 +176,7 @@ class settingsWindow(QWidget):
|
||||
self._data[key].extend([float(last_value) if key != "trace_storage_path" else last_value] * additional_count)
|
||||
|
||||
|
||||
class SystemSettings(settingsWindow):
|
||||
def __init__(self, path, name, upd_func):
|
||||
super().__init__(path, name, upd_func)
|
||||
self._num_points.setVisible(False)
|
||||
|
||||
def _expand(self):
|
||||
pass
|
||||
|
||||
class OperatorSettings(settingsWindow):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@ -24,9 +24,10 @@ def main():
|
||||
window.show()
|
||||
|
||||
controller.signal_widgets.connect(window.show_plot_tabs)
|
||||
controller.signal_settings.connect(mediator.update_settings)
|
||||
controller.signal_raport_mode.connect(monitor.custom_csv_extract_only)
|
||||
controller.signal_seeking_mode.connect(monitor.start_seeking)
|
||||
controller.signal_settings.connect(mediator.push_settings)
|
||||
controller.signal_monitor.connect(monitor.custom_dir_extract)
|
||||
|
||||
window.push_settings()
|
||||
|
||||
sys.exit(app.exec_())
|
||||
|
||||
|
||||
@ -1,43 +0,0 @@
|
||||
from src.OptAlgorithm.OptAlgorithm import OptAlgorithm
|
||||
from src.utils import read_json
|
||||
|
||||
from matplotlib import pyplot as plt, use
|
||||
|
||||
from numpy import cos, sin, sqrt, cbrt, arcsin, linspace, array
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
tq = 1
|
||||
ts = linspace(0, tq, 200000)
|
||||
|
||||
operator_params = read_json("params/operator_params.json")
|
||||
system_params = read_json("params/system_params.json")
|
||||
|
||||
non_array_operator_params = {}
|
||||
i = 1
|
||||
for key, value in operator_params.items():
|
||||
if hasattr(value, "__len__"):
|
||||
if len(value) > i:
|
||||
non_array_operator_params[key] = value[i]
|
||||
else:
|
||||
non_array_operator_params[key] = value[0]
|
||||
else:
|
||||
non_array_operator_params[key] = value
|
||||
|
||||
non_array_system_params = {}
|
||||
for key, value in system_params.items():
|
||||
if hasattr(value, "__len__"):
|
||||
if len(value) > i:
|
||||
non_array_system_params[key] = value[i]
|
||||
else:
|
||||
non_array_system_params[key] = value[0]
|
||||
else:
|
||||
non_array_system_params[key] = value
|
||||
|
||||
|
||||
opt = OptAlgorithm(non_array_operator_params, non_array_system_params)
|
||||
Xs = array([opt.getVar("X1", t) for t in ts])
|
||||
|
||||
|
||||
plt.plot(ts, Xs)
|
||||
plt.show()
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -5,11 +5,14 @@ from typing import Optional, Union, Any
|
||||
from cachetools import LRUCache
|
||||
|
||||
import pandas as pd
|
||||
from PyQt5.QtCore import QObject, QTimer
|
||||
from PyQt5.QtCore import QThread, QObject, QTimer
|
||||
from PyQt5.QtWidgets import QWidget, QTabWidget
|
||||
from PyQt5.QtOpenGL import QGLWidget
|
||||
from OptAlgorithm import OptAlgorithm
|
||||
import pandas as pd
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import pyqtgraph as pg
|
||||
|
||||
|
||||
|
||||
|
||||
@ -35,7 +38,7 @@ class BaseMediator:
|
||||
source: Union[BaseDirectoryMonitor, BaseDataConverter, BasePointPassportFormer, BasePlotWidget],
|
||||
data: Union[list[str], list[pd.DataFrame], list[list], list[QWidget]]):
|
||||
...
|
||||
def update_settings (self, data: list[dict]):
|
||||
def push_settings (self, data: list[dict]):
|
||||
...
|
||||
|
||||
class BaseDirectoryMonitor:
|
||||
@ -47,8 +50,7 @@ class BaseDirectoryMonitor:
|
||||
super().__init__()
|
||||
self._directory_path = None
|
||||
self._update_time = None
|
||||
self.isActive = False
|
||||
self._files = []
|
||||
|
||||
self._mediator = mediator
|
||||
|
||||
|
||||
@ -77,19 +79,14 @@ class BaseDirectoryMonitor:
|
||||
self._files = files
|
||||
|
||||
def start(self):
|
||||
self.isActive = True
|
||||
self.update_timer.start(int(self._update_time))
|
||||
|
||||
def stop(self):
|
||||
self.isActive = False
|
||||
self.update_timer.stop()
|
||||
|
||||
def update_settings(self, data: list[dict]) -> None:
|
||||
...
|
||||
|
||||
def update_plots(self) -> None:
|
||||
...
|
||||
|
||||
def force_all_dir(self):
|
||||
...
|
||||
|
||||
@ -131,8 +128,7 @@ class BasePlotWidget:
|
||||
"ideals": True,
|
||||
"mirror ME": False,
|
||||
"workpiece": False,
|
||||
"force compensation FE": False,
|
||||
"force accuracy":True
|
||||
"force compensation FE": False
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
@ -163,8 +159,7 @@ class BasePlotWidget:
|
||||
"ideals": True,
|
||||
"mirror ME": True,
|
||||
"workpiece": True,
|
||||
"force compensation FE": True,
|
||||
"force accuracy":False
|
||||
"force compensation FE": True
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
@ -195,8 +190,7 @@ class BasePlotWidget:
|
||||
"ideals": True,
|
||||
"mirror ME": False,
|
||||
"workpiece": False,
|
||||
"force compensation FE": False,
|
||||
"force accuracy":False
|
||||
"force compensation FE": False
|
||||
},
|
||||
"Real_signals": [
|
||||
{
|
||||
@ -233,88 +227,6 @@ class BasePlotWidget:
|
||||
font-family: "Segoe UI", sans-serif;
|
||||
}""")
|
||||
|
||||
def _downsample_data(self, x, y, max_points=5000):
|
||||
"""
|
||||
Понижает разрешение данных до заданного количества точек для улучшения производительности навигатора.
|
||||
"""
|
||||
if len(x) > max_points:
|
||||
factor = len(x) // max_points
|
||||
x_downsampled = x[::factor]
|
||||
y_downsampled = y[::factor]
|
||||
return x_downsampled, y_downsampled
|
||||
return x, y
|
||||
|
||||
def _create_navigator(self,
|
||||
time_region:tuple[float, float],
|
||||
main_plot: pg.PlotItem) -> list[pg.PlotWidget, pg.LinearRegionItem]:
|
||||
"""
|
||||
Создаёт график-навигатор, отображающий все данные в уменьшенном масштабе.
|
||||
"""
|
||||
navigator = pg.PlotItem(title="Navigator")
|
||||
navigator.setFixedHeight(100)
|
||||
# Получение кривых из main_plot
|
||||
for curve in main_plot.listDataItems():
|
||||
# Извлекаем данные из кривой
|
||||
x, y = curve.getData()
|
||||
curve_name = curve.opts.get("name", None)
|
||||
signal_pen = curve.opts.get("pen", None)
|
||||
x_downsampled, y_downsampled = self._downsample_data(x, y, max_points=1000)
|
||||
navigator.plot(x_downsampled, y_downsampled, pen=signal_pen, name=curve_name)
|
||||
|
||||
ROI_region = pg.LinearRegionItem(values=time_region, movable=True, brush=pg.mkBrush(0, 0, 255, 100), pen=pg.mkPen(width=4))
|
||||
ROI_region.setBounds([0, x[-1]])
|
||||
navigator.addItem(ROI_region)
|
||||
navigator.getViewBox().setLimits(xMin=0, xMax=x[-1])
|
||||
|
||||
# Связываем изменение региона навигатора с обновлением области просмотра основного графика
|
||||
ROI_region.sigRegionChanged.connect(lambda: self._sync_main_plot_with_navigator(main_plot, ROI_region))
|
||||
|
||||
return navigator, ROI_region
|
||||
|
||||
def _sync_main_plot_with_navigator(self,
|
||||
main_plot: pg.PlotItem,
|
||||
region: pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует область просмотра основного графика с регионом навигатора.
|
||||
"""
|
||||
x_min, x_max = region.getRegion()
|
||||
if main_plot:
|
||||
main_plot.blockSignals(True)
|
||||
main_plot.setXRange(x_min, x_max, padding=0)
|
||||
main_plot.blockSignals(False)
|
||||
|
||||
def _mirror_shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: shift-x)
|
||||
return dataframe
|
||||
|
||||
def _shift_data(self,
|
||||
valid_str: str,
|
||||
signals: list[dict],
|
||||
dataframe: pd.DataFrame,
|
||||
shift: float) -> pd.DataFrame:
|
||||
keys = dataframe.keys()
|
||||
for signal in signals:
|
||||
if valid_str in signal["name"] and signal["name"] in keys:
|
||||
dataframe[signal["name"]] = dataframe[signal["name"]].apply(lambda x: x + shift)
|
||||
return dataframe
|
||||
|
||||
def _sync_navigator_with_main(self, main_plot: pg.PlotItem, region:pg.LinearRegionItem):
|
||||
"""
|
||||
Синхронизирует регион навигатора с областью просмотра основного графика.
|
||||
"""
|
||||
if region:
|
||||
x_min, x_max = main_plot
|
||||
region.blockSignals(True) # Предотвращаем рекурсию
|
||||
region.setRegion([x_min, x_max])
|
||||
region.blockSignals(False)
|
||||
|
||||
@property
|
||||
def mediator(self) -> BaseMediator:
|
||||
return self._mediator
|
||||
@ -340,14 +252,12 @@ class BaseController(QObject):
|
||||
def send_widgets(self, widgets: list[QWidget]) -> None:
|
||||
...
|
||||
|
||||
def update_settings(self, settings: list[dict]) -> None:
|
||||
def push_settings(self, settings: list[dict]) -> None:
|
||||
...
|
||||
|
||||
def raport_mode (self, filepath: str) -> None:
|
||||
def open_custom_file (self, filepath: str) -> None:
|
||||
...
|
||||
|
||||
def seeking_mode(self) -> None:
|
||||
...
|
||||
|
||||
class BaseIdealDataBuilder(OptAlgorithm):
|
||||
|
||||
@ -360,12 +270,10 @@ class BaseIdealDataBuilder(OptAlgorithm):
|
||||
|
||||
def _get_data(self, end_timestamp:float, func:function) -> pd.DataFrame:
|
||||
data = []
|
||||
for i in range (0, int(end_timestamp*self.mul)+1):
|
||||
for i in range (0, int(end_timestamp*self.mul)):
|
||||
time = i/self.mul
|
||||
X1, X2, V1, V2, F = func(time)
|
||||
data.append({"time":time, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
X1, X2, V1, V2, F = func(end_timestamp)
|
||||
data.append({"time":end_timestamp, "Position FE":X1*1000,"Position ME":X2*1000, "Rotor Speed FE":V1*1000, "Rotor Speed ME":V2*1000, "Force":F})
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_closingDF(self) -> pd.DataFrame:
|
||||
@ -397,8 +305,6 @@ class BaseMainWindow(QWidget):
|
||||
def __init__(self,
|
||||
controller: Optional[BaseController] = None):
|
||||
super().__init__()
|
||||
self.set_style(self)
|
||||
self.resize(200,200)
|
||||
self._controller = controller
|
||||
...
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user