255 lines
11 KiB
Python
255 lines
11 KiB
Python
import pyqtgraph as pg
|
||
import numpy as np
|
||
|
||
from utils import read_json, DataParser
|
||
from Requestinator import Request
|
||
#нижний fe x1
|
||
import qt_settings as qts
|
||
from OptAlgorithm import OptAlgorithm
|
||
|
||
|
||
class Application:
|
||
def __init__(self, opt: OptAlgorithm):
|
||
pg.setConfigOptions(antialias=True)
|
||
self.opt = opt
|
||
self.scaler = 1000
|
||
self.parser = DataParser(self.scaler)
|
||
self.r = Request()
|
||
self._getIdealTimings()
|
||
self._init_app()
|
||
self.updateUML()
|
||
self.WeldTime = 0.5 #[sec]
|
||
self.alpha = 100 #[0-255 прозрачность фона]
|
||
|
||
def _init_app(self):
|
||
self.app = pg.mkQApp("Plotting")
|
||
self.win = pg.GraphicsLayoutWidget(show=True, title="")
|
||
self.win.resize(1000,600)
|
||
self.win.setWindowTitle('')
|
||
|
||
self.p11, self.l11 = self._init_graph('Electrode force, closure', 'Force', 'N', 'Time', 'ms')
|
||
#self.p21, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms')
|
||
#self.p31, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms')
|
||
self.win.nextRow()
|
||
self.p12, self.l12 = self._init_graph('Rotor Position, closure', 'Posicion', 'mm', 'Time', 'ms')
|
||
#self.p22, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms')
|
||
#self.p32, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms')
|
||
self.win.nextRow()
|
||
self.p13, self.l13 = self._init_graph('Rotor Speed, closure', 'Speed', 'mm/s', 'Time', 'ms')
|
||
#self.p23, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms')
|
||
#self.p33, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms')
|
||
self.win.nextRow()
|
||
|
||
self.p12.setXLink(self.p11)
|
||
self.p13.setXLink(self.p11)
|
||
|
||
self.p11.setAutoVisible(x=False, y=True)
|
||
self.p12.setAutoVisible(x=False, y=True)
|
||
self.p13.setAutoVisible(x=False, y=True)
|
||
|
||
def _init_graph(self, title, Yname, Yunits, Xname, Xunits):
|
||
p11 = self.win.addPlot(title = title)
|
||
p11.showGrid(x=True, y=True)
|
||
p11.setLabel('left', Yname, units=Yunits)
|
||
p11.setLabel('bottom', Xname, units=Xunits)
|
||
legend1 = pg.LegendItem((80,60), offset=(70,20))
|
||
legend1.setParentItem(p11)
|
||
return p11, legend1
|
||
|
||
def updateUML(self, path = '2024_10_28-17_03_34.csv'):
|
||
real, client, ideal, bool = self._form_UMLdata(path)
|
||
self._requestSVG(real, client, ideal, bool)
|
||
|
||
def updatePlots(self):
|
||
self._plotRealData()
|
||
times = np.arange(20000)/10
|
||
self._plotIdealData(times)
|
||
|
||
def _requestSVG(self, real_data, client_data, ideal_data, bool_data):
|
||
try:
|
||
|
||
self.r.clear()
|
||
self.r.addLineStyle('biba', 'red', 2)
|
||
for i, [signal, changes] in enumerate(bool_data.items()):
|
||
name = 'bool_'+str(i)
|
||
self.r.addBinary(name, str(signal), 'biba')
|
||
self.r.setTimestamps(name, changes)
|
||
|
||
self.r.addConcise('RD', 'Real data')
|
||
self.r.setTimestamps('RD', real_data)
|
||
self.r.addConcise('CD', 'Client data')
|
||
self.r.setTimestamps('CD', client_data)
|
||
self.r.addConcise('ID', 'Ideal data')
|
||
self.r.setTimestamps('ID', ideal_data)
|
||
|
||
self.r.generateSVG()
|
||
except Exception as e:
|
||
print ('Ну, svg у нас нет')
|
||
|
||
|
||
def _form_UMLdata(self, path):
|
||
scaler = self.scaler
|
||
sig = [
|
||
'Electrode Closing Algorithm Execute', #Начало закрытия
|
||
'Electrode Closing Algorithm Done',
|
||
'STEP 3: ME Hold P2 AND Condition Start Force Control', #Конец закрытия и Начало набора усилия
|
||
'STEP 4: ME Force Control',
|
||
'Position Control ME', #Начало разъезда или 'Posision Control Activated FE'
|
||
'Position Control FE', #Начало разъезда
|
||
'Position Control Completed ME', #Конец разъезда
|
||
'Position Control Completed FE', #Конец разъезда
|
||
'STEP 4: ME Force Control Complete', #Конец набора усилия
|
||
]
|
||
|
||
self.parser.setData(path)
|
||
self.bool_dict = self.parser.getBoolDict()
|
||
self.float_dict = self.parser.getFloatDict()
|
||
|
||
closure = self.__getTime([sig[0], sig[1]])
|
||
compression = self.__getTime([sig[2], sig[3]])
|
||
#opening = self.__getTime([sig[4], sig[5], sig[6], sig[7]])
|
||
|
||
real_data = [
|
||
[closure[0], 'closure #green'],
|
||
[closure[1], '{-}'],
|
||
[compression[0], 'compression #green'],
|
||
[compression[1], '{-}'],
|
||
#[max(opening[0:2]), 'opening #green'],
|
||
#[max(opening[2:4]), '{-}'],
|
||
]
|
||
|
||
client_data = [
|
||
[0.0*scaler, 'closure'],
|
||
[0.165*scaler, '{-}'],
|
||
[0.166*scaler, 'compression'],
|
||
[0.176*scaler, '{-}'],
|
||
#[0.180*scaler, 'welding'],
|
||
#[0.200*scaler, '{-}'],
|
||
[0.210*scaler, 'opening'],
|
||
[0.300*scaler, '{-}'],
|
||
]
|
||
|
||
ideal_data = [
|
||
[0.0*scaler, 'closure #yellow'],
|
||
[self.idealTime[0]*scaler, '{-}'],
|
||
[(self.idealTime[0] + 0.0001)*scaler, 'compression #yellow'],
|
||
[(self.idealTime[0] + self.idealTime[1])*scaler, '{-}'],
|
||
#[0.*scaler, 'welding #yellow'],
|
||
#[0.*scaler, '{-}'],
|
||
[(self.idealTime[0] + self.idealTime[1] + 0.0001)*scaler, 'opening #yellow'],
|
||
[(self.idealTime[0] + self.idealTime[1] + self.idealTime[2])*scaler, '{-}'],
|
||
]
|
||
return real_data, client_data, ideal_data, self.bool_dict
|
||
|
||
def __getTime(self, signals = '', states = ''):
|
||
res = []
|
||
for i, sig in enumerate(signals):
|
||
if states: state = states[i]
|
||
else: state = 'high'
|
||
index1 = next ((i for i, _ in enumerate(self.bool_dict[sig]) if _[1]==state), -1)
|
||
time = self.bool_dict[sig][index1][0]
|
||
res.append(time)
|
||
return res
|
||
|
||
|
||
def _getIdealTimings(self):
|
||
data = self.opt.Ts
|
||
self.idealTime = [data['tclose'], data['tgrow'], self.opt.getMarkOpen()]
|
||
|
||
def _form_idealdatGraph(self, times):
|
||
self.idealPhase0 = (self.idealTime[0])*self.scaler #Подъезд
|
||
self.idealPhase1 = (self.idealTime[0]+ self.idealTime[1])*self.scaler #Сжатие
|
||
self.idealPhase2 = (self.idealTime[0]+ self.idealTime[1] + self.WeldTime)*self.scaler #Сварка
|
||
self.idealPhase3 = (self.idealTime[0]+ self.idealTime[1] + self.idealTime[2] + self.WeldTime)*self.scaler #Разъезд
|
||
data = []
|
||
for time in times:
|
||
if time <= self.idealPhase0:
|
||
x_fe, x_me, v_fe, v_me, f = self.opt.calcPhaseClose(time/self.scaler)
|
||
elif time <= self.idealPhase1:
|
||
x_fe, x_me, v_fe, v_me, f = self.opt.calcPhaseGrow(time/self.scaler-self.idealTime[0])
|
||
elif time <= self.idealPhase2:
|
||
x_fe, x_me, v_fe, v_me, f = data[-1]
|
||
elif time <= self.idealPhase3:
|
||
x_fe, x_me, v_fe, v_me, f = self.opt.calcPhaseOpen(time/self.scaler-self.idealTime[0]-self.idealTime[1]-self.WeldTime)
|
||
else:
|
||
x_fe, x_me, v_fe, v_me, f = 0, 0, 0, 0, 0
|
||
data.append([x_fe, x_me, v_fe, v_me, f])
|
||
data = np.array(data).T
|
||
return data
|
||
|
||
def _plotRealData(self):
|
||
for i, (key, dat) in enumerate(self.float_dict.items()):
|
||
dat = np.array(dat).T
|
||
dat[0] = dat[0]*self.scaler
|
||
curve = pg.PlotDataItem(dat[0], dat[1], pen=pg.mkPen(color=qts.colors[i], width=2), name=key, autoDownsample=True, downsample=True)
|
||
if 'Electrode Force' in key:
|
||
self.p11.addItem(curve)
|
||
self.l11.addItem(curve, key)
|
||
elif 'Rotor Position' in key:
|
||
self.p12.addItem(curve)
|
||
self.l12.addItem(curve, key)
|
||
elif 'Rotor Speed' in key:
|
||
self.p13.addItem(curve)
|
||
self.l13.addItem(curve, key)
|
||
return dat[0]
|
||
|
||
def _plotIdealData(self, times):
|
||
data = self._form_idealdatGraph(times)
|
||
|
||
x_fe = pg.PlotDataItem(times, data[0]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='x_fe', autoDownsample=True, downsample=True)
|
||
x_me = pg.PlotDataItem(times, data[1]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='x_me', autoDownsample=True, downsample=True)
|
||
v_fe = pg.PlotDataItem(times, data[2]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='v_fe', autoDownsample=True, downsample=True)
|
||
v_me = pg.PlotDataItem(times, data[3]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='v_me', autoDownsample=True, downsample=True)
|
||
f = pg.PlotDataItem(times, data[4], pen=pg.mkPen(color=qts.colors[8], width=2), name='f', autoDownsample=True, downsample=True)
|
||
|
||
self.p11.addItem(f)
|
||
self.l11.addItem(f, 'Ideal force')
|
||
|
||
self.p12.addItem(x_fe)
|
||
self.l12.addItem(x_fe, 'FE POS')
|
||
self.p12.addItem(x_me)
|
||
self.l12.addItem(x_me, 'ME POS')
|
||
|
||
self.p13.addItem(v_fe)
|
||
self.l13.addItem(v_fe, 'FE VEL')
|
||
self.p13.addItem(v_me)
|
||
self.l13.addItem(v_me, 'ME VEL')
|
||
self._addBackgroundSplitter()
|
||
|
||
def _addBackgroundSplitter(self):
|
||
alpha = self.alpha
|
||
x = [[0, self.idealPhase0],
|
||
[self.idealPhase0, self.idealPhase1],
|
||
[self.idealPhase1, self.idealPhase2],
|
||
[self.idealPhase2, self.idealPhase3]]
|
||
y01 = [100000000000, 100000000000]
|
||
y0_1 = [-100000000000, -100000000000]
|
||
for i, _ in enumerate(x):
|
||
a01 = pg.PlotDataItem(_, y01, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ')
|
||
a0_1 = pg.PlotDataItem(_, y0_1, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ')
|
||
bg1 = pg.FillBetweenItem(a01, a0_1, qts.RGBA[i]+(alpha,))
|
||
bg2 = pg.FillBetweenItem(a01, a0_1, qts.RGBA[i]+(alpha,))
|
||
bg3 = pg.FillBetweenItem(a01, a0_1, qts.RGBA[i]+(alpha,))
|
||
self.p11.addItem(bg1)
|
||
self.p12.addItem(bg2)
|
||
self.p13.addItem(bg3)
|
||
|
||
self.p11.setYRange(-1000, 5000)
|
||
self.p12.setYRange(-50, 250)
|
||
self.p13.setYRange(-400, 400)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
if __name__ == '__main__':
|
||
operator_params = read_json("params/operator_params.json")
|
||
system_params = read_json("params/system_params.json")
|
||
opt_algorithm = OptAlgorithm(operator_config=operator_params, system_config=system_params)
|
||
app = Application(opt_algorithm)
|
||
app.updatePlots()
|
||
pg.exec()
|
||
|