From cc1cd28a362ee528f3bbb921463bda2a367b65c2 Mon Sep 17 00:00:00 2001 From: Andrew Date: Wed, 29 Jan 2025 16:23:30 +0300 Subject: [PATCH] =?UTF-8?q?dev:=20=D0=A0=D0=B0=D0=B7=D1=80=D0=B0=D0=B1?= =?UTF-8?q?=D0=BE=D1=82=D0=B0=D0=BD=20=D0=BF=D0=B0=D1=80=D1=81=D0=B5=D1=80?= =?UTF-8?q?=20TXT=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20=D0=BE=D1=82=20?= =?UTF-8?q?=D0=BA=D1=83=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/performance/performance.py | 117 +++++++++++++++-- src/performance/roboter.py | 233 ++++++++++++++++++--------------- 2 files changed, 232 insertions(+), 118 deletions(-) diff --git a/src/performance/performance.py b/src/performance/performance.py index 7e86dbf..e240215 100644 --- a/src/performance/performance.py +++ b/src/performance/performance.py @@ -1,26 +1,119 @@ -from typing import Optional -import os +from typing import Optional, Tuple import numpy as np import pandas as pd -from roboter import Performance + +#TODO: Реализовать поиск времени простоя +class DowntimeAnalyzer: + def __init__(self) -> None: + ... -class PerformanceProcessor: + def generate_downtime_report(self) -> list: + ... - def calc_performance(self, path:str, TWC_raw:pd.DataFrame): - factory = Performance() - comm_df, rob_df, TWC_df = factory.job(path, TWC_raw) - + def _separate_conditions(self, TWC_raw:pd.DataFrame, robot_raw:pd.DataFrame) -> Tuple[dict, dict]: + robot_splitter = RobotConditionSplitter() + TWC_splitter = TWC_ConditionSplitter() + #comm_splitter = CommConditionSplitter() + rob_conditions = robot_splitter.split(robot_raw) + TWC_conditions = TWC_splitter.split(TWC_raw) + #comm_df = comm_splitter.split(rob_conditions["communication"], TWC_conditions["communication"]) + return (rob_conditions, TWC_conditions) +class ConditionSplitter: + def __init__(self): + self._signals = [] + + def _find_indexes(self, + signal: str, + dataframe: pd.DataFrame) -> list[list[float], list[float]]: + stage_diff = np.diff(dataframe[signal]) + start_idx = np.where(stage_diff == 1) + finish_idx = np.where(stage_diff == -1) + return start_idx[0], finish_idx[0] + + def _find_events(self, + dataframe: pd.DataFrame, + signals: list[str]) -> Optional[dict[dict[pd.Series]]]: + intervals = {} + end_time = 0 + for signal in signals: + start_idx, finish_idx = np.array(self._find_indexes(signal, dataframe)) + start_series = dataframe.loc[start_idx, "time"].reset_index(drop=True) + end_series = dataframe.loc[finish_idx, "time"].reset_index(drop=True) + end_series.fillna(end_time) + intervals[signal] = {"rise": start_series, + "fall": end_series} + return intervals + + def _form_intervals(self, + start: pd.Series, + end: pd.Series) -> dict: + if len(start) != len(end): + for i in range(1, len(end)): + if end[i-1] > start[i]: + start = start.drop(i).reset_index(drop=True) + intervals = {'start':start.tolist, 'end':end.tolist} + return intervals + + +class RobotConditionSplitter(ConditionSplitter): + def __init__(self): + self._signals = [ + "$OUT3012", + "$IN3003", + "$OUT3003", + "$OUT3244" + ] + + def split(self, dataframe: pd.DataFrame) -> dict: + events = self._find_events(dataframe, self._signals) + communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]} + point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"]) + movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"]) + conditions = {"communication":communication_sig, "waiting": point_interval, "moving": movement_interval} + return conditions + + +class TWC_ConditionSplitter(ConditionSplitter): + def __init__(self): + self._signals = [ + "Closing", + "Squeeze", + "Welding", + "Relief", + "Oncoming" + ] + + def split(self, dataframe: pd.DataFrame) -> dict: + events = self._find_events(dataframe, self._signals) + communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала + closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"]) + squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"]) + relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"]) + oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"]) + conditions = { + "communication":communication_sig, + 'closing':closing_interval, + 'squeeze':squeeze_interval, + 'relief':relief_interval, + 'oncoming':oncoming_interval + } + return conditions +class CommConditionSplitter(ConditionSplitter): + """ + Определяет промежуток, в который происходит взаимодействие между нодами. + """ - - - - + def split(self, node1: dict, node2: dict) -> pd.DataFrame: + n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received']) + n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received']) + return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)]) + if __name__ == "__main__": path = "D:\downloads\Test2\TeslaTIME29_71_KRCIO.dat" diff --git a/src/performance/roboter.py b/src/performance/roboter.py index 07fa556..2158213 100644 --- a/src/performance/roboter.py +++ b/src/performance/roboter.py @@ -1,7 +1,7 @@ from __future__ import annotations import os -from typing import Optional, Tuple -from dataclasses import dataclass +from typing import Tuple +from dataclasses import dataclass, field import numpy as np import pandas as pd @@ -14,6 +14,29 @@ class KukaDataHead: channels: dict +@dataclass +class KukaTXT: + time: float = 0 + endtime: float = 0 + #module: str + func: str = "" + type_: str = "" + signal: str = "" + #line: int = 0 + #point_name: str = "" + #point_coord: dict = field(default_factory=lambda: {}) + #blending: str = "" + #blending_param: float = 0 + #velocities: dict = field(default_factory=lambda: {}) + #accelerarions: dict = field(default_factory=lambda: {}) + #base: dict = field(default_factory=lambda: {}) + #tool: dict = field(default_factory=lambda: {}) + #ipo_mode: str = "" + #motion_mode: str = "" + #load: dict = field(default_factory=lambda: {}) + #load_a3: dict = field(default_factory=lambda: {}) + + class KukaDataParser: def parse(self, head_path: str) -> pd.DataFrame: @@ -95,127 +118,125 @@ class KukaDataParser: data = file.read() floats = np.frombuffer(data, dtype=' None: - ... - - def generate_downtime_report(self) -> list: - ... - - def _separate_conditions(self, TWC_raw:pd.DataFrame, robot_raw:pd.DataFrame) -> Tuple[dict, dict]: - robot_splitter = RobotConditionSplitter() - TWC_splitter = TWC_ConditionSplitter() - #comm_splitter = CommConditionSplitter() - rob_conditions = robot_splitter.split(robot_raw) - TWC_conditions = TWC_splitter.split(TWC_raw) - #comm_df = comm_splitter.split(rob_conditions["communication"], TWC_conditions["communication"]) - return (rob_conditions, TWC_conditions) -class ConditionSplitter: +class TXT_Parser: def __init__(self): - self._signals = [] + self._in_msg = False + self._datapacks = [] - def _find_indexes(self, - signal: str, - dataframe: pd.DataFrame) -> list[list[float], list[float]]: - stage_diff = np.diff(dataframe[signal]) - start_idx = np.where(stage_diff == 1) - finish_idx = np.where(stage_diff == -1) - return start_idx[0], finish_idx[0] - - def _find_events(self, - dataframe: pd.DataFrame, - signals: list[str]) -> Optional[dict[dict[pd.Series]]]: - intervals = {} - end_time = 0 - for signal in signals: - start_idx, finish_idx = np.array(self._find_indexes(signal, dataframe)) - start_series = dataframe.loc[start_idx, "time"].reset_index(drop=True) - end_series = dataframe.loc[finish_idx, "time"].reset_index(drop=True) - end_series.fillna(end_time) - intervals[signal] = {"rise": start_series, - "fall": end_series} - return intervals - - def _form_intervals(self, - start: pd.Series, - end: pd.Series) -> dict: - if len(start) != len(end): - for i in range(1, len(end)): - if end[i-1] > start[i]: - start = start.drop(i).reset_index(drop=True) - intervals = {'start':start.tolist, 'end':end.tolist} - return intervals - + def parse(self, path:str ): + self._datapacks = [] + with open(path, 'r') as file: + datapack = KukaTXT() + for line in file: + line = line.strip() + datapack = self._check_msg_flow(line, datapack) + -class RobotConditionSplitter(ConditionSplitter): - def __init__(self): - self._signals = [ - "$OUT3012", - "$IN3003", - "$OUT3003", - "$OUT3244" - ] + def _check_msg_flow(self, line:str, datapack:KukaTXT) -> KukaTXT: + if line == "#BEGINMOTIONINFO": + self._in_msg = True + datapack = KukaTXT() + elif line == "#ENDMOTIONINFO": + self._in_msg = False + self._datapacks.append(datapack) + datapack = KukaTXT() + elif self._in_msg: + datapack = self._process_line(line, datapack) + return datapack - def split(self, dataframe: pd.DataFrame) -> dict: - events = self._find_events(dataframe, self._signals) - communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]} - point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"]) - movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"]) - conditions = {"communication":communication_sig, "waiting": point_interval, "moving": movement_interval} - return conditions + def _process_line(self, line:str, datapack:KukaTXT) -> KukaTXT: + tag, data = line.split(":") + match tag: + case "TIME": + datapack.time = float(data) + case "ENDTIME": + datapack.endtime = float(data) + case "MODULE": + pass + case "FUNCTION/PROCEDURE": + datapack.func = data + case "TYPE": + datapack.type_ = data + case "SIGNAL": + datapack.signal = data + case "LINE": + pass + case "POINT NAME": + pass + case "POINT COORDINATES": + pass + case "BLENDING": + pass + case "BLENDING PARAMETER": + pass + case "VELOCITIES": + pass + case "ACCELERATIONS": + pass + case "BASE": + pass + case "TOOL": + pass + case "IPO MODE": + pass + case "MOTION MODE": + pass + case "LOAD": + pass + case "LOAD A3": + pass + return datapack - -class TWC_ConditionSplitter(ConditionSplitter): - def __init__(self): - self._signals = [ - "Closing", - "Squeeze", - "Welding", - "Relief", - "Oncoming" - ] +"Перемещение" +#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение +# +# ИЛИ +# +#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками +#SIGNAL: BLENDING - def split(self, dataframe: pd.DataFrame) -> dict: - events = self._find_events(dataframe, self._signals) - communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала - closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"]) - squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"]) - relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"]) - oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"]) - conditions = { - "communication":communication_sig, - 'closing':closing_interval, - 'squeeze':squeeze_interval, - 'relief':relief_interval, - 'oncoming':oncoming_interval - } - return conditions +"Смыкание и набор усилия" +#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение электрода движения робота +# ... ... +#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение 0.5 мм роботом с движением электрода +#... ... +#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - остановка в позиции (может быть набор усилия?) +"Сварка" +#FUNCTION/PROCEDURE: SPOT - Начало сварочного процесса +#SIGNAL: START +# +#FUNCTION/PROCEDURE: SPOT - Конец сварочного процесса +#SIGNAL: END -class CommConditionSplitter(ConditionSplitter): - """ - Определяет промежуток, в который происходит взаимодействие между нодами. - """ - - def split(self, node1: dict, node2: dict) -> pd.DataFrame: - n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received']) - n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received']) - return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)]) - +"Снятие усилия и разъезд" +#FUNCTION/PROCEDURE: SGL_MoveToPos - Выход из контакта с точкой движением робота и электрода +#SIGNAL: START +# +"Перемещение" +#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение +# +# ИЛИ +# +#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками +#SIGNAL: BLENDING if __name__ == '__main__': roboreader = KukaDataParser() - + """ path = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat") data = roboreader.parse(os.path.abspath(path)) save = os.path.dirname(path) + "/sample.csv" - data.to_csv(save) + data.to_csv(save)""" + path = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT") + txt_parser = TXT_Parser() + txt_parser.parse(path) + print(len(txt_parser._datapacks)) + +