import pyqtgraph as pg import numpy as np from utils import read_json from Requestinator import Request from DataParsinator import DataParser #нижний fe x1 import qt_settings as qts from OptAlgorithm import OptAlgorithm class Application: def __init__(self, opt: OptAlgorithm): pg.setConfigOptions(antialias=True) self.opt = opt self.scaler = 1000 self.parser = DataParser(self.scaler) self.r = Request() self._getIdealTimings() self._init_app() self.updateUML() self.WeldTime = 0.5 #[sec] self.alpha = 100 #[0-255 прозрачность фона] def _init_app(self): self.app = pg.mkQApp("Plotting") self.win = pg.GraphicsLayoutWidget(show=True, title="") self.win.resize(1000,600) self.win.setWindowTitle('') self.p11, self.l11 = self._init_graph('Electrode force, closure', 'Force', 'N', 'Time', 'ms') #self.p21, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms') #self.p31, _ = self._init_graph('Electrode force, compression', 'Force', 'N', 'Time', 'ms') self.win.nextRow() self.p12, self.l12 = self._init_graph('Rotor Position, closure', 'Posicion', 'mm', 'Time', 'ms') #self.p22, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms') #self.p32, _ = self._init_graph('Rotor Position, compression', 'Posicion', 'mm', 'Time', 'ms') self.win.nextRow() self.p13, self.l13 = self._init_graph('Rotor Speed, closure', 'Speed', 'mm/s', 'Time', 'ms') #self.p23, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms') #self.p33, _ = self._init_graph('Rotor Speed, compression', 'Speed', 'mm/s', 'Time', 'ms') self.win.nextRow() self.p12.setXLink(self.p11) self.p13.setXLink(self.p11) self.p11.setAutoVisible(x=False, y=True) self.p12.setAutoVisible(x=False, y=True) self.p13.setAutoVisible(x=False, y=True) def _init_graph(self, title, Yname, Yunits, Xname, Xunits): p11 = self.win.addPlot(title = title) p11.showGrid(x=True, y=True) p11.setLabel('left', Yname, units=Yunits) p11.setLabel('bottom', Xname, units=Xunits) legend1 = pg.LegendItem((80,60), offset=(70,20)) legend1.setParentItem(p11) return p11, legend1 def updateUML(self, path = '2024_10_28-17_03_34.csv'): real, client, ideal, bool = self._form_UMLdata(path) self._requestSVG(real, client, ideal, bool) def updatePlots(self): #self._plotRealData() times = np.arange(20000)/10 self._plotIdealData(times) def _requestSVG(self, real_data, client_data, ideal_data, bool_data): try: self.r.clear() self.r.addLineStyle('biba', 'red', 2) for i, [signal, changes] in enumerate(bool_data.items()): name = 'bool_'+str(i) self.r.addBinary(name, str(signal), 'biba') self.r.setTimestamps(name, changes) self.r.addConcise('RD', 'Real data') self.r.setTimestamps('RD', real_data) self.r.addConcise('CD', 'Client data') self.r.setTimestamps('CD', client_data) self.r.addConcise('ID', 'Ideal data') self.r.setTimestamps('ID', ideal_data) self.r.generateSVG() except Exception as e: print ('Ну, svg у нас нет') def _form_UMLdata(self, path): scaler = self.scaler sig = [ 'Electrode Closing Algorithm Execute', #Начало закрытия 'Electrode Closing Algorithm Done', 'STEP 3: ME Hold P2 AND Condition Start Force Control', #Конец закрытия и Начало набора усилия 'STEP 4: ME Force Control', 'Position Control ME', #Начало разъезда или 'Posision Control Activated FE' 'Position Control FE', #Начало разъезда 'Position Control Completed ME', #Конец разъезда 'Position Control Completed FE', #Конец разъезда 'STEP 4: ME Force Control Complete', #Конец набора усилия ] self.parser.setData(path) self.bool_dict = self.parser.getBoolDict() self.float_dict = self.parser.getFloatDict() closure = self.__getTime([sig[0], sig[1]]) compression = self.__getTime([sig[2], sig[3]]) #opening = self.__getTime([sig[4], sig[5], sig[6], sig[7]]) real_data = [ [closure[0], 'closure #green'], [closure[1], '{-}'], [compression[0], 'compression #green'], [compression[1], '{-}'], #[max(opening[0:2]), 'opening #green'], #[max(opening[2:4]), '{-}'], ] client_data = [ [0.0*scaler, 'closure'], [0.165*scaler, '{-}'], [0.166*scaler, 'compression'], [0.176*scaler, '{-}'], #[0.180*scaler, 'welding'], #[0.200*scaler, '{-}'], [0.210*scaler, 'opening'], [0.300*scaler, '{-}'], ] ideal_data = [ [0.0*scaler, 'closure #yellow'], [self.idealTime[0]*scaler, '{-}'], [(self.idealTime[0] + 0.0001)*scaler, 'compression #yellow'], [(self.idealTime[0] + self.idealTime[1])*scaler, '{-}'], #[0.*scaler, 'welding #yellow'], #[0.*scaler, '{-}'], [(self.idealTime[0] + self.idealTime[1] + 0.0001)*scaler, 'opening #yellow'], [(self.idealTime[0] + self.idealTime[1] + self.idealTime[2])*scaler, '{-}'], ] return real_data, client_data, ideal_data, self.bool_dict def __getTime(self, signals = '', states = ''): res = [] for i, sig in enumerate(signals): if states: state = states[i] else: state = 'high' index1 = next ((i for i, _ in enumerate(self.bool_dict[sig]) if _[1]==state), -1) time = self.bool_dict[sig][index1][0] res.append(time) return res def _getIdealTimings(self): data = self.opt.Ts self.idealTime = [data['tclose'], data['tgrow'], data['topen']] def _form_idealdatGraph(self, times): self.idealPhase0 = (self.idealTime[0])*self.scaler #Подъезд self.idealPhase1 = (self.idealTime[0]+ self.idealTime[1])*self.scaler #Сжатие self.idealPhase2 = (self.idealTime[0]+ self.idealTime[1] + self.WeldTime)*self.scaler #Сварка self.idealPhase3 = (self.idealTime[0]+ self.idealTime[1] + self.idealTime[2] + self.WeldTime)*self.scaler #Разъезд data = [] for time in times: if time <= self.idealPhase0: x_fe, x_me, v_fe, v_me, f = self.opt.calcPhaseClose(time/self.scaler) elif time <= self.idealPhase1: x_fe, x_me, v_fe, v_me, f = self.opt.calcPhaseGrow(time/self.scaler-self.idealTime[0]) elif time <= self.idealPhase2: x_fe, x_me, v_fe, v_me, f = data[-1] elif time <= self.idealPhase3: x_fe, x_me, v_fe, v_me, f = self.opt.calcPhaseOpen(time/self.scaler-self.idealTime[0]-self.idealTime[1]-self.WeldTime) else: x_fe, x_me, v_fe, v_me, f = 0, 0, 0, 0, 0 data.append([x_fe, x_me, v_fe, v_me, f]) data = np.array(data).T return data def _plotRealData(self): for i, (key, dat) in enumerate(self.float_dict.items()): dat = np.array(dat).T dat[0] = dat[0]*self.scaler curve = pg.PlotDataItem(dat[0], dat[1], pen=pg.mkPen(color=qts.colors[i], width=2), name=key, autoDownsample=True, downsample=True) if 'Electrode Force' in key: self.p11.addItem(curve) self.l11.addItem(curve, key) elif 'Rotor Position' in key: self.p12.addItem(curve) self.l12.addItem(curve, key) elif 'Rotor Speed' in key: self.p13.addItem(curve) self.l13.addItem(curve, key) return dat[0] def _plotIdealData(self, times): data = self._form_idealdatGraph(times) x_fe = pg.PlotDataItem(times, data[0]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='x_fe', autoDownsample=True, downsample=True) x_me = pg.PlotDataItem(times, data[1]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='x_me', autoDownsample=True, downsample=True) v_fe = pg.PlotDataItem(times, data[2]*1000, pen=pg.mkPen(color=qts.colors[8], width=2), name='v_fe', autoDownsample=True, downsample=True) v_me = pg.PlotDataItem(times, data[3]*1000, pen=pg.mkPen(color=qts.colors[9], width=2), name='v_me', autoDownsample=True, downsample=True) f = pg.PlotDataItem(times, data[4], pen=pg.mkPen(color=qts.colors[8], width=2), name='f', autoDownsample=True, downsample=True) self.p11.addItem(f) self.l11.addItem(f, 'Ideal force') self.p12.addItem(x_fe) self.l12.addItem(x_fe, 'FE POS') self.p12.addItem(x_me) self.l12.addItem(x_me, 'ME POS') self.p13.addItem(v_fe) self.l13.addItem(v_fe, 'FE VEL') self.p13.addItem(v_me) self.l13.addItem(v_me, 'ME VEL') self._addBackgroundSplitter() def _addBackgroundSplitter(self): alpha = self.alpha x = [[0, self.idealPhase0], [self.idealPhase0, self.idealPhase1], [self.idealPhase1, self.idealPhase2], [self.idealPhase2, self.idealPhase3]] y01 = [100000000000, 100000000000] y0_1 = [-100000000000, -100000000000] for i, _ in enumerate(x): a01 = pg.PlotDataItem(_, y01, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ') a0_1 = pg.PlotDataItem(_, y0_1, pen=pg.mkPen(color=qts.colors[8], width=2), name=' ') bg1 = pg.FillBetweenItem(a01, a0_1, qts.RGBA[i]+(alpha,)) bg2 = pg.FillBetweenItem(a01, a0_1, qts.RGBA[i]+(alpha,)) bg3 = pg.FillBetweenItem(a01, a0_1, qts.RGBA[i]+(alpha,)) self.p11.addItem(bg1) self.p12.addItem(bg2) self.p13.addItem(bg3) self.p11.setYRange(-1000, 5000) self.p12.setYRange(-50, 250) self.p13.setYRange(-400, 400) if __name__ == '__main__': operator_params = read_json("params/operator_params.json") system_params = read_json("params/system_params.json") opt_algorithm = OptAlgorithm(operator_config=operator_params, system_config=system_params) app = Application(opt_algorithm) app.updatePlots() pg.exec()