dev: создан анализатор трейсов клиента, разделяющий датафрейм на стадии

This commit is contained in:
Andrew 2025-01-30 13:50:35 +03:00
parent cc1cd28a36
commit 3f7959e47c
2 changed files with 2293 additions and 6 deletions

View File

@ -1,6 +1,6 @@
from __future__ import annotations
import os
from typing import Tuple
from typing import Tuple, Union
from dataclasses import dataclass, field
import numpy as np
@ -120,18 +120,19 @@ class KukaDataParser:
return floats
class TXT_Parser:
class TextParser:
def __init__(self):
self._in_msg = False
self._datapacks = []
def parse(self, path:str ):
def parse(self, path:str ) -> list[KukaTXT]:
self._datapacks = []
with open(path, 'r') as file:
datapack = KukaTXT()
for line in file:
line = line.strip()
datapack = self._check_msg_flow(line, datapack)
return self._datapacks
def _check_msg_flow(self, line:str, datapack:KukaTXT) -> KukaTXT:
@ -189,6 +190,108 @@ class TXT_Parser:
pass
return datapack
class TraceStageDetector:
def __init__(
self,
rob_vel_zero_th: float = 0.008,
act_vel_zero_th: float = 10,
act_vel_touching: float = 2050,
act_vel_touching_th: float = 200,
act_curr_force_th: float = 0.5,
region_of_focus: list = [0, 100]
):
self.rob_vel_zero_th = rob_vel_zero_th
self.act_vel_zero_th = act_vel_zero_th
self.act_vel_touching = act_vel_touching
self.act_vel_touching_th = act_vel_touching_th
self.act_curr_force_th = act_curr_force_th
self.ROF = region_of_focus
def detect_stages(self, df: pd.DataFrame) -> list:
df = df.sort_index()
stage_markers = []
df_indices = df.index.to_list() # времена (в секундах) как список
for i in df_indices:
mark = self._mark_timestamp(df, i)
if mark: stage_markers.append(mark)
# Объединяем подряд идущие одинаковые метки в единые этапы.
stages = self._merge_marks_to_stages(stage_markers, df_indices)
return stages
def _mark_timestamp(self, df:pd.DataFrame, index:float) -> Union[str, None]:
# Не интересные значения помечаем как "unknown"
if index < self.ROF[0] or index > self.ROF[1]:
return "unknown"
rob_vel = df.loc[index, "CartVel_Act"]
act_vel = df.loc[index, "DriveMotorVel_Act7"]
act_curr = df.loc[index, "DriveMotorCurr_Act7"]
# 1) Перемещение (moving): если скорость |CartVel_Act| > vel_threshold
if abs(rob_vel) > self.rob_vel_zero_th:
return "moving"
# 2) Создание усилия (force): если |DriveMotorVel_Act7| < act_vel_zero_th и |DriveMotorCurr_Act7| > act_curr_force_th
if abs(act_vel) < self.act_vel_zero_th and abs(act_curr) > self.act_curr_force_th:
return "force"
# 3) Смыкание (closing): если скорость |DriveMotorVel_Act7| = act_vel_touching
if self.act_vel_touching_th > abs(abs(act_vel) - self.act_vel_touching):
return "closing"
return None
def _merge_marks_to_stages(self, stage_markers:list, df_indices:list) -> list:
stages = []
current_stage = stage_markers[0]
start_time = df_indices[0]
for i in range(1, len(stage_markers)):
if stage_markers[i] != current_stage:
end_time = df_indices[i - 1]
stages.append({
"stage": current_stage,
"start_time": start_time,
"end_time": end_time
})
current_stage = stage_markers[i]
start_time = df_indices[i]
stages.append({
"stage": current_stage,
"start_time": start_time,
"end_time": df_indices[-1]
})
return stages
class TextStageDetector:
def __init__(self,
region_of_focus: list = [0, 100]
):
self.ROF = region_of_focus
def detect_welding(self, data:list[KukaTXT]):
stages = []
for i in range(len(data)):
if data[i].func == " SPOT" and data[i].signal == " END":
stages.append({
"stage": "welding",
"start_time": data[i].time,
"end_time": data[i+1].time
})
return stages
"Перемещение"
#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение
#
@ -225,16 +328,41 @@ class TXT_Parser:
#SIGNAL: BLENDING
if __name__ == '__main__':
roboreader = KukaDataParser()
txt_reader = TextParser()
detector_traces = TraceStageDetector(region_of_focus=[7.7, 42])
detector_weldings = TextStageDetector()
path1 = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat")
path2 = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT")
data1 = roboreader.parse(path1)
data2 = txt_reader.parse(path2)
stages = detector_traces.detect_stages(data1)
weldings = detector_weldings.detect_welding(data2)
counter = 0
for st in stages:
print(st)
if st["stage"] == "force":
counter+=1
print(counter)
print("=========================================")
for we in weldings:
print (we)
"""
path = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat")
data = roboreader.parse(os.path.abspath(path))
save = os.path.dirname(path) + "/sample.csv"
data.to_csv(save)"""
data.to_csv(save)
path = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT")
txt_parser = TXT_Parser()
txt_parser.parse(path)
print(len(txt_parser._datapacks))
"""

File diff suppressed because it is too large Load Diff