chore: переработан roboter, парсер трейсов вынесен в отдельный класс

This commit is contained in:
Andrew 2025-01-28 16:57:49 +03:00
parent e4a7b39307
commit a8a9d6fdb6
4 changed files with 4105 additions and 133 deletions

View File

@ -1,104 +1,121 @@
from abc import ABC, abstractmethod
import os
from typing import Optional
from __future__ import annotations from __future__ import annotations
import os
from typing import Optional, Tuple
from dataclasses import dataclass
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@dataclass
class KukaDataHead:
rob_ID: int
filename: str
channels: dict
class BasePerformanceFactory(ABC):
# TODO: Абстракция делается с целью определения интерфейса и только: внутри никаких реализаций быть не должно.
# Сначала делаешь абстракцию, затем от нее наследуешься и делаешь нечто базовое, затем конкретную реализацию.
# Есть вариант не делать базовое нечто, а сразу переходить к реализации.
# TODO: Убрать из этого класса все реализации и имплементировать соответствующие методы уже в наследниках. class KukaDataParser:
# TODO: Возможно стоит отдельно вынести все касающееся парсинга в отдельный класс (на подумать).
@abstractmethod def parse(self, head_path: str) -> pd.DataFrame:
def factory_method(self): head = self._parse_dat_file(head_path)
... body_path = os.path.join(os.path.dirname(head_path), head.filename)
# TODO: @abstractmethod dataframe = self._parse_r64_file(body_path, head)
def job(self):
...
def _get_file_data(self, path) -> pd.DataFrame:
head, file = self._dat_parser(path)
self.dat_name = file[:-4]
path_r64 = os.path.dirname(path) + '\\' + file
time_axis, dataframe = self._r64_parser(path_r64, head)
dataframe = pd.concat([dataframe, time_axis], axis=1)
return dataframe return dataframe
def _dat_parser(self, path: str) -> list[dict, str]: def _parse_dat_file(self, path: str) -> KukaDataHead:
with open(path, 'r') as file: with open(path, 'r', encoding='cp1252') as file:
head = {'channels': 0} head = KukaDataHead(0, "", {})
inside_channel = False inside_channel = False
channels = 0 self._ch_name = None
for line in file: for line in file:
line = line.strip() line = line.strip()
# TODO: if line in ("#BEGINCHANNELHEADER", "#BEGINGLOBALHEADER")... if line in ('#BEGINCHANNELHEADER', "#BEGINGLOBALHEADER"):
if line == '#BEGINCHANNELHEADER' or line == "#BEGINGLOBALHEADER":
inside_channel = True inside_channel = True
# TODO: if line in ("#ENDCHANNELHEADER", "#ENDGLOBALHEADER")... elif line in ('#ENDCHANNELHEADER' "#ENDGLOBALHEADER"):
elif line == '#ENDCHANNELHEADER' or line == "#ENDGLOBALHEADER":
inside_channel = False inside_channel = False
# TODO: для чего тут pass? else:
pass if inside_channel:
# TODO: inside_channel = line in ("#BEGINCHANNELHEADER", "#BEGINGLOBALHEADER") self._parse_head_file_line(line, head)
# Формирование словаря return head
# TODO: Предыдущие условия связаны были с line, а теперь под elif уже абсолютно другая логика.
# Не надо так делать.
elif inside_channel:
# TODO: нижним подчеркиванием обозначается переменная, которая как бы есть, но не нужна.
# Ты же на что-то проверяешь, т.е. используешь. Назови нормально.
_, data = line.split(',')
match _:
# TODO: тут default case не нужен?
case '102':
head['rob_id'] = data
case '200':
ch_name = data
if ch_name != 'Zeit': channels +=1
head[ch_name] = {}
case '202':
head[ch_name]['unit'] = data
case '211':
file = data
case '220':
head[ch_name]['len'] = int(data)
case '221':
head[ch_name]['num'] = int(data)
case '241':
head[ch_name]['multiplyer'] = float(data)
head['channels'] = int(channels)
# TODO: Метод декомпозировать и переписать.
return head, file
def _r64_parser(self, path: str, head: dict) -> Optional[list[pd.Series, pd.DataFrame]]: def _parse_head_file_line(self, line: str, head: KukaDataHead) -> None:
# TODO: Метод декомпозировать и переписать. tag, data = line.split(',')
ch = head['channels'] match tag:
keys = list(head.keys())[-ch:] case '102':
len_timestamps = head['Zeit']['len'] head.rob_ID = data
t_step = head['Zeit']['multiplyer'] case '200':
self._ch_name = str(data)
head.channels[self._ch_name] = {}
case '202':
head.channels[self._ch_name]['unit'] = str(data)
case '211':
head.filename = str(data)
case '220':
head.channels[self._ch_name]['len'] = int(data)
case '221':
head.channels[self._ch_name]['num'] = int(data)
case '241':
head.channels[self._ch_name]['multiplyer'] = float(data)
def _parse_r64_file(self, path: str, head: KukaDataHead) -> pd.DataFrame:
time_axis = self._build_time_axis(head)
mul, axes_names = self._extract_channels_data(head)
data_array = self._read_r64_file(path)
if data_array.size % len(axes_names) != 0:
raise ValueError("Количество записей в {path} не кратно количеству найденных каналов ({ch_count})")
try:
axes = data_array.reshape(-1, len(axes_names))
except ValueError as e:
raise ValueError(f"Ошибка при изменении формы data_array: {e}")
return pd.DataFrame(axes*mul, columns=axes_names, index=time_axis)
@staticmethod
def _extract_channels_data(head: KukaDataHead) -> Tuple[np.ndarray, list]:
sorted_channels = sorted(
(item for item in head.channels.items() if item[0] != "Zeit"),
key=lambda item: item[1]['num']
)
mul = np.array([info['multiplyer'] for _, info in sorted_channels])
names = [key for key, _ in sorted_channels]
return mul, names
@staticmethod
def _build_time_axis(head: KukaDataHead) -> pd.Series:
len_timestamps = head.channels['Zeit']['len'] -1
t_step = head.channels['Zeit']['multiplyer']
time_axis = pd.Series(np.arange(0, len_timestamps*t_step, t_step)) time_axis = pd.Series(np.arange(0, len_timestamps*t_step, t_step))
time_axis.name = 'time' time_axis.name = 'time'
dataframe = pd.DataFrame({}) return time_axis
@staticmethod
def _read_r64_file(path: str) -> np.ndarray[float]:
with open(path, 'rb') as file: with open(path, 'rb') as file:
data = file.read() data = file.read()
floats = np.frombuffer(data, dtype='<d') # Little-endian double floats = np.frombuffer(data, dtype='<d')
for key in keys: return floats
step = head[key]['num']-1
result = pd.Series(np.array(floats[step::ch])* head[key]['multiplyer'])
result.name = key
dataframe = pd.concat([dataframe, result], axis=1)
return time_axis, dataframe
class BaseProduct(ABC): #TODO: Реализовать поиск времени простоя
# TODO: Аналогично BasePerformanceFactory: снова в абстракции куча реализаций. class DowntimeAnalyzer:
# TODO: Может стоит дать классу название осмысленное, а не из статьи по паттернам? =) def __init__(self) -> None:
...
def generate_downtime_report(self) -> list:
...
def _separate_conditions(self, TWC_raw:pd.DataFrame, robot_raw:pd.DataFrame) -> Tuple[dict, dict]:
robot_splitter = RobotConditionSplitter()
TWC_splitter = TWC_ConditionSplitter()
#comm_splitter = CommConditionSplitter()
rob_conditions = robot_splitter.split(robot_raw)
TWC_conditions = TWC_splitter.split(TWC_raw)
#comm_df = comm_splitter.split(rob_conditions["communication"], TWC_conditions["communication"])
return (rob_conditions, TWC_conditions)
class ConditionSplitter:
def __init__(self): def __init__(self):
self._signals = [] self._signals = []
@ -134,42 +151,8 @@ class BaseProduct(ABC):
intervals = {'start':start.tolist, 'end':end.tolist} intervals = {'start':start.tolist, 'end':end.tolist}
return intervals return intervals
@abstractmethod
def operation(self):
# TODO: Аналогично: что за операция? Что она делает? Не понятно.
...
class RobotConditionSplitter(ConditionSplitter):
class Performance(BasePerformanceFactory):
# TODO: где реализация factory_method? Если не нужен, то надо его убрать из интерфейса.
# TODO: данный класс, простите, лебедь раком щуку. На все руки мастер. И то создает, и это создает... И принцип
# единичной ответственности нарушает, и принцип подстановки...
# Если реализуешь паттерн "Фабричный метод", то реализуй его правильно: для каждого порождаемого объекта свой
# класс - создатель, классы наследники в полной мере имплементируют методы класса - родителя...
def robot_method(self) -> BaseProduct:
return RobotData()
def TWC_method(self) -> BaseProduct:
return TWC_Data()
def comm_method(self) -> BaseProduct:
return CommData()
def job(self, path:str, TWC_raw:pd.DataFrame) -> list[pd.DataFrame, list[pd.DataFrame], list[pd.DataFrame]]:
# TODO: сигнатура метода не соответствует той, которая определена в родительском классе.
robot = self.robot_method()
TWC = self.TWC_method()
comm=self.comm_method()
dataframe = self._get_file_data(path)
# TODO: operation в родителе определен без входных параметров, используется с ними. Не надо так делать.
rob_comm, rob_df = robot.operation(dataframe)
TWC_comm, TWC_df = TWC.operation(TWC_raw)
comm_df = comm.operation(rob_comm, TWC_comm)
return comm_df, rob_df, TWC_df
class RobotData(BaseProduct):
def __init__(self): def __init__(self):
self._signals = [ self._signals = [
"$OUT3012", "$OUT3012",
@ -178,18 +161,16 @@ class RobotData(BaseProduct):
"$OUT3244" "$OUT3244"
] ]
def operation(self, dataframe: pd.DataFrame) -> list[dict, pd.DataFrame]: def split(self, dataframe: pd.DataFrame) -> dict:
events = self._find_events(dataframe, self._signals) events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]} communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]}
point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"]) point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"])
movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"]) movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"])
# TODO: возвращаемый результат не соответствует аннотированному. conditions = {"communication":communication_sig, "waiting": point_interval, "moving": movement_interval}
# TODO: зачем возвращать DataFrame? В этом есть какой-то философский смысл? return conditions
return communication_sig, pd.DataFrame({'in_point':point_interval,
'in_move':movement_interval})
class TWC_Data(BaseProduct): class TWC_ConditionSplitter(ConditionSplitter):
def __init__(self): def __init__(self):
self._signals = [ self._signals = [
"Closing", "Closing",
@ -199,34 +180,43 @@ class TWC_Data(BaseProduct):
"Oncoming" "Oncoming"
] ]
def operation(self, dataframe: pd.DataFrame) -> list[dict, pd.DataFrame]: def split(self, dataframe: pd.DataFrame) -> dict:
events = self._find_events(dataframe, self._signals) events = self._find_events(dataframe, self._signals)
communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала
closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"]) closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"])
squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"]) squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"])
relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"]) relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"])
oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"]) oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"])
# TODO: возвращаемый результат не соответствует аннотированному. conditions = {
# TODO: зачем возвращать DataFrame? В этом есть какой-то философский смысл? "communication":communication_sig,
return communication_sig, pd.DataFrame({'in_closing':closing_interval, 'closing':closing_interval,
'in_squeeze':squeeze_interval, 'squeeze':squeeze_interval,
'in_relief':relief_interval, 'relief':relief_interval,
'in_oncoming':oncoming_interval}) 'oncoming':oncoming_interval
}
return conditions
class CommData(BaseProduct):
class CommConditionSplitter(ConditionSplitter):
""" """
Определяет промежуток, в который происходит взаимодействие между нодами. Определяет промежуток, в который происходит взаимодействие между нодами.
Подразумевается следующая структура: node: tuple(list, list)
node[0] - время отправки пакетов. node[1] - время приема пакетов.
""" """
def operation(self, node1: dict, node2: dict) -> pd.DataFrame: def split(self, node1: dict, node2: dict) -> pd.DataFrame:
n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received']) n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received'])
n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received']) n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received'])
return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)]) return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)])
if __name__ == '__main__':
roboreader = KukaDataParser()
path = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat")
data = roboreader.parse(os.path.abspath(path))
save = os.path.dirname(path) + "/sample.csv"
data.to_csv(save)

3807
trace_samples/sample.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,175 @@
DIAEXTENDED
#BEGINGLOBALHEADER
1,DOS
101,---
102,rob "#KR210R2700_2 C4 FLR"
103,---
104,27.10.2022
105,11:48:43
106,668500000
110,#dd.mm.yyyy HH:MM:SS ns
111,9.9000000000E+34
112,High -> Low
#ENDGLOBALHEADER
#BEGINCHANNELHEADER
200,Zeit
201,Zeitkanal
202,sec
210,IMPLIZIT
220,3806
240,0.00000
241,1.20000000e-02
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,X_Act
201,IpoActual
202,mm
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,1
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,Y_Act
201,IpoActual
202,mm
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,2
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,Z_Act
201,IpoActual
202,mm
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,3
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,AxisPos_Act7
201,IpoActual
202,mm
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,4
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,DriveMotorPos_Act7
201,AxisValAtMMInterface
202,°
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,5
240,0
241,5.729578e+01
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,DriveMotorVel_Act7
201,AxisValAtMMInterface
202,°/s
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,6
240,0
241,5.729578e+01
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,DriveMotorTorq_Act7
201,AxisValAtMMInterface
202,Nm
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,7
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,DriveMotorCurr_Act7
201,AxisValAtMMInterface
202,A
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,8
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,AxisVel_Act7
201,IpoVelocities
202,mm/s
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,9
240,0
241,1.000000e+00
#ENDCHANNELHEADER
#BEGINCHANNELHEADER
200,CartVel_Act
201,IpoVelocities
202,m/s
210,EXPLIZIT
211,teslaSP_VelTCP_KRCIpo.r64
213,BLOCK
214,REAL64
220,3806
221,10
240,0
241,1.000000e-03
#ENDCHANNELHEADER

Binary file not shown.