dev: Разработан парсер TXT данных от куки

This commit is contained in:
Andrew 2025-01-29 16:23:30 +03:00
parent a8a9d6fdb6
commit cc1cd28a36
2 changed files with 232 additions and 118 deletions

View File

@ -1,26 +1,119 @@
from typing import Optional
import os
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from roboter import Performance
#TODO: Реализовать поиск времени простоя
class DowntimeAnalyzer:
def __init__(self) -> None:
...
class PerformanceProcessor:
def generate_downtime_report(self) -> list:
...
def calc_performance(self, path:str, TWC_raw:pd.DataFrame):
factory = Performance()
comm_df, rob_df, TWC_df = factory.job(path, TWC_raw)
def _separate_conditions(self, TWC_raw:pd.DataFrame, robot_raw:pd.DataFrame) -> Tuple[dict, dict]:
robot_splitter = RobotConditionSplitter()
TWC_splitter = TWC_ConditionSplitter()
#comm_splitter = CommConditionSplitter()
rob_conditions = robot_splitter.split(robot_raw)
TWC_conditions = TWC_splitter.split(TWC_raw)
#comm_df = comm_splitter.split(rob_conditions["communication"], TWC_conditions["communication"])
return (rob_conditions, TWC_conditions)
class ConditionSplitter:
def __init__(self):
self._signals = []
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
dataframe: pd.DataFrame,
signals: list[str]) -> Optional[dict[dict[pd.Series]]]:
intervals = {}
end_time = 0
for signal in signals:
start_idx, finish_idx = np.array(self._find_indexes(signal, dataframe))
start_series = dataframe.loc[start_idx, "time"].reset_index(drop=True)
end_series = dataframe.loc[finish_idx, "time"].reset_index(drop=True)
end_series.fillna(end_time)
intervals[signal] = {"rise": start_series,
"fall": end_series}
return intervals
def _form_intervals(self,
start: pd.Series,
end: pd.Series) -> dict:
if len(start) != len(end):
for i in range(1, len(end)):
if end[i-1] > start[i]:
start = start.drop(i).reset_index(drop=True)
intervals = {'start':start.tolist, 'end':end.tolist}
return intervals
class RobotConditionSplitter(ConditionSplitter):
def __init__(self):
self._signals = [
"$OUT3012",
"$IN3003",
"$OUT3003",
"$OUT3244"
]
def split(self, dataframe: pd.DataFrame) -> dict:
events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]}
point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"])
movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"])
conditions = {"communication":communication_sig, "waiting": point_interval, "moving": movement_interval}
return conditions
class TWC_ConditionSplitter(ConditionSplitter):
def __init__(self):
self._signals = [
"Closing",
"Squeeze",
"Welding",
"Relief",
"Oncoming"
]
def split(self, dataframe: pd.DataFrame) -> dict:
events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала
closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"])
squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"])
relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"])
oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"])
conditions = {
"communication":communication_sig,
'closing':closing_interval,
'squeeze':squeeze_interval,
'relief':relief_interval,
'oncoming':oncoming_interval
}
return conditions
class CommConditionSplitter(ConditionSplitter):
"""
Определяет промежуток, в который происходит взаимодействие между нодами.
"""
def split(self, node1: dict, node2: dict) -> pd.DataFrame:
n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received'])
n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received'])
return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)])
if __name__ == "__main__":
path = "D:\downloads\Test2\TeslaTIME29_71_KRCIO.dat"

View File

@ -1,7 +1,7 @@
from __future__ import annotations
import os
from typing import Optional, Tuple
from dataclasses import dataclass
from typing import Tuple
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
@ -14,6 +14,29 @@ class KukaDataHead:
channels: dict
@dataclass
class KukaTXT:
time: float = 0
endtime: float = 0
#module: str
func: str = ""
type_: str = ""
signal: str = ""
#line: int = 0
#point_name: str = ""
#point_coord: dict = field(default_factory=lambda: {})
#blending: str = ""
#blending_param: float = 0
#velocities: dict = field(default_factory=lambda: {})
#accelerarions: dict = field(default_factory=lambda: {})
#base: dict = field(default_factory=lambda: {})
#tool: dict = field(default_factory=lambda: {})
#ipo_mode: str = ""
#motion_mode: str = ""
#load: dict = field(default_factory=lambda: {})
#load_a3: dict = field(default_factory=lambda: {})
class KukaDataParser:
def parse(self, head_path: str) -> pd.DataFrame:
@ -95,127 +118,125 @@ class KukaDataParser:
data = file.read()
floats = np.frombuffer(data, dtype='<d')
return floats
#TODO: Реализовать поиск времени простоя
class DowntimeAnalyzer:
def __init__(self) -> None:
...
def generate_downtime_report(self) -> list:
...
def _separate_conditions(self, TWC_raw:pd.DataFrame, robot_raw:pd.DataFrame) -> Tuple[dict, dict]:
robot_splitter = RobotConditionSplitter()
TWC_splitter = TWC_ConditionSplitter()
#comm_splitter = CommConditionSplitter()
rob_conditions = robot_splitter.split(robot_raw)
TWC_conditions = TWC_splitter.split(TWC_raw)
#comm_df = comm_splitter.split(rob_conditions["communication"], TWC_conditions["communication"])
return (rob_conditions, TWC_conditions)
class ConditionSplitter:
class TXT_Parser:
def __init__(self):
self._signals = []
self._in_msg = False
self._datapacks = []
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx[0], finish_idx[0]
def _find_events(self,
dataframe: pd.DataFrame,
signals: list[str]) -> Optional[dict[dict[pd.Series]]]:
intervals = {}
end_time = 0
for signal in signals:
start_idx, finish_idx = np.array(self._find_indexes(signal, dataframe))
start_series = dataframe.loc[start_idx, "time"].reset_index(drop=True)
end_series = dataframe.loc[finish_idx, "time"].reset_index(drop=True)
end_series.fillna(end_time)
intervals[signal] = {"rise": start_series,
"fall": end_series}
return intervals
def _form_intervals(self,
start: pd.Series,
end: pd.Series) -> dict:
if len(start) != len(end):
for i in range(1, len(end)):
if end[i-1] > start[i]:
start = start.drop(i).reset_index(drop=True)
intervals = {'start':start.tolist, 'end':end.tolist}
return intervals
def parse(self, path:str ):
self._datapacks = []
with open(path, 'r') as file:
datapack = KukaTXT()
for line in file:
line = line.strip()
datapack = self._check_msg_flow(line, datapack)
class RobotConditionSplitter(ConditionSplitter):
def __init__(self):
self._signals = [
"$OUT3012",
"$IN3003",
"$OUT3003",
"$OUT3244"
]
def _check_msg_flow(self, line:str, datapack:KukaTXT) -> KukaTXT:
if line == "#BEGINMOTIONINFO":
self._in_msg = True
datapack = KukaTXT()
elif line == "#ENDMOTIONINFO":
self._in_msg = False
self._datapacks.append(datapack)
datapack = KukaTXT()
elif self._in_msg:
datapack = self._process_line(line, datapack)
return datapack
def split(self, dataframe: pd.DataFrame) -> dict:
events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]}
point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"])
movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"])
conditions = {"communication":communication_sig, "waiting": point_interval, "moving": movement_interval}
return conditions
def _process_line(self, line:str, datapack:KukaTXT) -> KukaTXT:
tag, data = line.split(":")
match tag:
case "TIME":
datapack.time = float(data)
case "ENDTIME":
datapack.endtime = float(data)
case "MODULE":
pass
case "FUNCTION/PROCEDURE":
datapack.func = data
case "TYPE":
datapack.type_ = data
case "SIGNAL":
datapack.signal = data
case "LINE":
pass
case "POINT NAME":
pass
case "POINT COORDINATES":
pass
case "BLENDING":
pass
case "BLENDING PARAMETER":
pass
case "VELOCITIES":
pass
case "ACCELERATIONS":
pass
case "BASE":
pass
case "TOOL":
pass
case "IPO MODE":
pass
case "MOTION MODE":
pass
case "LOAD":
pass
case "LOAD A3":
pass
return datapack
class TWC_ConditionSplitter(ConditionSplitter):
def __init__(self):
self._signals = [
"Closing",
"Squeeze",
"Welding",
"Relief",
"Oncoming"
]
"Перемещение"
#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение
#
# ИЛИ
#
#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками
#SIGNAL: BLENDING
def split(self, dataframe: pd.DataFrame) -> dict:
events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала
closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"])
squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"])
relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"])
oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"])
conditions = {
"communication":communication_sig,
'closing':closing_interval,
'squeeze':squeeze_interval,
'relief':relief_interval,
'oncoming':oncoming_interval
}
return conditions
"Смыкание и набор усилия"
#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение электрода движения робота
# ... ...
#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение 0.5 мм роботом с движением электрода
#... ...
#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - остановка в позиции (может быть набор усилия?)
"Сварка"
#FUNCTION/PROCEDURE: SPOT - Начало сварочного процесса
#SIGNAL: START
#
#FUNCTION/PROCEDURE: SPOT - Конец сварочного процесса
#SIGNAL: END
class CommConditionSplitter(ConditionSplitter):
"""
Определяет промежуток, в который происходит взаимодействие между нодами.
"""
def split(self, node1: dict, node2: dict) -> pd.DataFrame:
n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received'])
n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received'])
return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)])
"Снятие усилия и разъезд"
#FUNCTION/PROCEDURE: SGL_MoveToPos - Выход из контакта с точкой движением робота и электрода
#SIGNAL: START
#
"Перемещение"
#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение
#
# ИЛИ
#
#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками
#SIGNAL: BLENDING
if __name__ == '__main__':
roboreader = KukaDataParser()
"""
path = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat")
data = roboreader.parse(os.path.abspath(path))
save = os.path.dirname(path) + "/sample.csv"
data.to_csv(save)
data.to_csv(save)"""
path = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT")
txt_parser = TXT_Parser()
txt_parser.parse(path)
print(len(txt_parser._datapacks))