chore: изменен модуль roboter

This commit is contained in:
Andrew 2025-02-11 12:22:37 +03:00
parent dea6d9899c
commit 6ac05f5e3d
2 changed files with 413 additions and 318 deletions

View File

@ -222,7 +222,7 @@
"pen": "g"
},
{
"name": "Rotor Speed, mm/s ME",
"name": "Rotor Speed, deg/s ME",
"pen": "b"
},
{
@ -286,7 +286,7 @@
"pen": "g"
},
{
"name": "Rotor Speed, mm/s ME",
"name": "Rotor Speed, deg/s ME",
"pen": "b"
},
{

View File

@ -1,144 +1,223 @@
from __future__ import annotations
import os
from typing import Tuple, Union, Optional
from typing import Tuple, Union, Optional, List
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
from loguru import logger
from base.base import (BaseKukaDataParser, BaseKukaTextParser,
from base.base import (
BaseKukaDataParser, BaseKukaTextParser,
BaseTraceStageDetector, BaseTextStageDetector,
BaseRawTraceProcessor, KukaDataHead,
KukaTXT, Settings)
KukaTXT, Settings
)
class KukaDataParser(BaseKukaDataParser):
"""
Класс для парсинга данных Кука.
def parse(self, head_path: str) -> pd.DataFrame:
head = self._parse_dat_file(head_path)
body_path = os.path.join(os.path.dirname(head_path), head.filename)
dataframe = self._parse_r64_file(body_path, head)
Читает заголовочный файл (.dat) и соответствующий файл данных (.r64),
объединяет их в pandas.DataFrame.
"""
def parse(self, head_filepath: str) -> pd.DataFrame:
"""
Основной метод парсинга. Читает заголовочный файл и файл данных.
:param head_filepath: Путь к заголовочному файлу.
:return: DataFrame с объединёнными данными.
"""
header = self._parse_header_file(head_filepath)
body_filepath = os.path.join(os.path.dirname(head_filepath), header.filename)
dataframe = self._parse_body_file(body_filepath, header)
return dataframe
def _parse_dat_file(self, path: str) -> KukaDataHead:
with open(path, 'r', encoding='cp1252') as file:
head = KukaDataHead(0, "", {})
inside_channel = False
self._ch_name = None
def _parse_header_file(self, filepath: str) -> KukaDataHead:
"""
Парсит заголовочный файл (.dat) и извлекает информацию о каналах.
:param filepath: Путь к заголовочному файлу.
:return: Объект KukaDataHead с информацией о данных.
"""
with open(filepath, 'r', encoding='cp1252') as file:
data_head = KukaDataHead(0, "", {})
is_inside_channel = False
self._current_channel = None # Текущее название канала
for line in file:
line = line.strip()
if line in ('#BEGINCHANNELHEADER', "#BEGINGLOBALHEADER"):
inside_channel = True
elif line in ('#ENDCHANNELHEADER' "#ENDGLOBALHEADER"):
inside_channel = False
is_inside_channel = True
elif line in ('#ENDCHANNELHEADER', "#ENDGLOBALHEADER"):
is_inside_channel = False
else:
if inside_channel:
self._parse_head_file_line(line, head)
return head
if is_inside_channel:
self._parse_header_line(line, data_head)
return data_head
def _parse_head_file_line(self, line: str, head: KukaDataHead) -> None:
tag, data = line.split(',')
def _parse_header_line(self, line: str, data_head: KukaDataHead) -> None:
"""
Обрабатывает отдельную строку заголовочного файла и обновляет объект data_head.
:param line: Строка из файла.
:param data_head: Объект KukaDataHead для обновления.
"""
tag, value = line.split(',')
match tag:
case '102':
head.rob_ID = data
data_head.rob_ID = value
case '200':
self._ch_name = str(data)
head.channels[self._ch_name] = {}
self._current_channel = str(value)
data_head.channels[self._current_channel] = {}
case '202':
head.channels[self._ch_name]['unit'] = str(data)
data_head.channels[self._current_channel]['unit'] = str(value)
case '211':
head.filename = str(data)
data_head.filename = str(value)
case '220':
head.channels[self._ch_name]['len'] = int(data)
data_head.channels[self._current_channel]['len'] = int(value)
case '221':
head.channels[self._ch_name]['num'] = int(data)
data_head.channels[self._current_channel]['num'] = int(value)
case '241':
head.channels[self._ch_name]['multiplyer'] = float(data)
# Переименовано в "multiplier" для повышения читаемости
data_head.channels[self._current_channel]['multiplier'] = float(value)
def _parse_r64_file(self, path: str, head: KukaDataHead) -> pd.DataFrame:
time_axis = self._build_time_axis(head)
mul, axes_names = self._extract_channels_data(head)
data_array = self._read_r64_file(path)
def _parse_body_file(self, filepath: str, data_head: KukaDataHead) -> pd.DataFrame:
"""
Парсит файл данных (.r64) и объединяет его с временной осью.
if data_array.size % len(axes_names) != 0:
raise ValueError("Количество записей в {path} не кратно количеству найденных каналов ({ch_count})")
:param filepath: Путь к файлу данных.
:param data_head: Заголовочная информация.
:return: DataFrame с данными.
"""
time_axis = self._build_time_axis(data_head)
multipliers, channel_names = self._extract_channels_data(data_head)
raw_data = self._read_r64_file(filepath)
if raw_data.size % len(channel_names) != 0:
raise ValueError(f"Количество записей в {filepath} не кратно количеству найденных каналов ({len(channel_names)})")
try:
axes = data_array.reshape(-1, len(axes_names))
data_reshaped = raw_data.reshape(-1, len(channel_names))
except ValueError as e:
raise ValueError(f"Ошибка при изменении формы data_array: {e}")
raise ValueError(f"Ошибка при изменении формы raw_data: {e}")
dataframe = pd.concat([time_axis, pd.DataFrame(axes*mul, columns=axes_names)], axis=1)
# Применяем множители к данным каналов
df_data = pd.DataFrame(data_reshaped * multipliers, columns=channel_names)
dataframe = pd.concat([time_axis, df_data], axis=1)
return dataframe
@staticmethod
def _extract_channels_data(head: KukaDataHead) -> Tuple[np.ndarray, list]:
def _extract_channels_data(data_head: KukaDataHead) -> Tuple[np.ndarray, List[str]]:
"""
Извлекает данные по каналам, сортируя их по номеру.
:param data_head: Заголовочная информация.
:return: Кортеж из массива множителей и списка имён каналов.
"""
sorted_channels = sorted(
(item for item in head.channels.items() if item[0] != "Zeit"),
(item for item in data_head.channels.items() if item[0] != "Zeit"),
key=lambda item: item[1]['num']
)
mul = np.array([info['multiplyer'] for _, info in sorted_channels])
names = [key for key, _ in sorted_channels]
return mul, names
multipliers = np.array([info['multiplier'] for _, info in sorted_channels])
channel_names = [key for key, _ in sorted_channels]
return multipliers, channel_names
@staticmethod
def _build_time_axis(head: KukaDataHead) -> pd.Series:
len_timestamps = head.channels['Zeit']['len'] -1
t_step = head.channels['Zeit']['multiplyer']
time_axis = pd.Series(np.arange(0, len_timestamps*t_step, t_step))
def _build_time_axis(data_head: KukaDataHead) -> pd.Series:
"""
Строит временную ось на основе информации из канала 'Zeit'.
:param data_head: Заголовочная информация.
:return: Серия pandas с временными метками.
"""
num_timestamps = data_head.channels['Zeit']['len'] - 1
time_step = data_head.channels['Zeit']['multiplier']
time_axis = pd.Series(np.arange(0, num_timestamps * time_step, time_step))
time_axis.name = 'time'
return time_axis
@staticmethod
def _read_r64_file(path: str) -> np.ndarray[float]:
with open(path, 'rb') as file:
def _read_r64_file(filepath: str) -> np.ndarray:
"""
Считывает бинарный файл (.r64) и возвращает массив чисел.
:param filepath: Путь к файлу.
:return: Массив numpy с типом float.
"""
with open(filepath, 'rb') as file:
data = file.read()
floats = np.frombuffer(data, dtype='<d')
return floats
numbers = np.frombuffer(data, dtype='<d')
return numbers
class KukaTextParser(BaseKukaTextParser):
"""
Класс для парсинга текстовых данных Кука.
Извлекает сообщения из файла.
"""
def __init__(self):
super().__init__()
self._in_msg = False
self._datapacks = []
self._in_message = False
self._data_packs: List[KukaTXT] = []
def parse(self, path:str ) -> list[KukaTXT]:
self._datapacks = []
with open(path, 'r') as file:
datapack = KukaTXT()
def parse(self, filepath: str) -> List[KukaTXT]:
"""
Парсит текстовый файл и возвращает список объектов KukaTXT.
:param filepath: Путь к текстовому файлу.
:return: Список объектов KukaTXT.
"""
self._data_packs = []
with open(filepath, 'r') as file:
current_pack = KukaTXT()
for line in file:
line = line.strip()
datapack = self._check_msg_flow(line, datapack)
return self._datapacks
current_pack = self._process_message_flow(line, current_pack)
return self._data_packs
def _check_msg_flow(self, line:str, datapack:KukaTXT) -> KukaTXT:
def _process_message_flow(self, line: str, current_pack: KukaTXT) -> KukaTXT:
"""
Обрабатывает поток сообщений, определяя начало и конец блока.
:param line: Строка из файла.
:param current_pack: Текущий объект KukaTXT.
:return: Обновлённый объект KukaTXT.
"""
if line == "#BEGINMOTIONINFO":
self._in_msg = True
datapack = KukaTXT()
self._in_message = True
current_pack = KukaTXT()
elif line == "#ENDMOTIONINFO":
self._in_msg = False
self._datapacks.append(datapack)
datapack = KukaTXT()
elif self._in_msg:
datapack = self._process_line(line, datapack)
return datapack
self._in_message = False
self._data_packs.append(current_pack)
current_pack = KukaTXT()
elif self._in_message:
current_pack = self._process_line(line, current_pack)
return current_pack
def _process_line(self, line:str, datapack:KukaTXT) -> KukaTXT:
tag, data = line.split(":")
def _process_line(self, line: str, current_pack: KukaTXT) -> KukaTXT:
"""
Обрабатывает отдельную строку внутри блока сообщения.
:param line: Строка с данными.
:param current_pack: Текущий объект KukaTXT.
:return: Обновлённый объект KukaTXT.
"""
tag, value = line.split(":")
match tag:
case "TIME":
datapack.time = float(data)
current_pack.time = float(value)
case "ENDTIME":
datapack.endtime = float(data)
current_pack.endtime = float(value)
case "MODULE":
pass
case "FUNCTION/PROCEDURE":
datapack.func = data
current_pack.func = value
case "TYPE":
datapack.type_ = data
current_pack.type_ = value
case "SIGNAL":
datapack.signal = data
current_pack.signal = value
case "LINE":
pass
case "POINT NAME":
@ -165,122 +244,159 @@ class KukaTextParser(BaseKukaTextParser):
pass
case "LOAD A3":
pass
return datapack
return current_pack
class TraceStageDetector(BaseTraceStageDetector):
"""
Класс для детекции этапов (стадий) на основе данных трассировки.
Определяет переходы между различными стадиями процесса по пороговым значениям.
"""
def __init__(self, parent: TraceProcessor = None):
super().__init__(parent)
def _mark_timestamp(self, df:pd.DataFrame, index:int, time:float) -> Union[str, None]:
# Не интересные значения помечаем как "unknown"
if (time < self._parent._settings.filter["ROI_start"][0] or
time > self._parent._settings.filter["ROI_finish"][0]):
return "unknown"
@staticmethod
def is_closing(robot_velocity: float, actual_velocity: float, actual_force: float, thresholds: dict) -> bool:
"""
Определяет, находится ли робот в стадии закрытия.
:param robot_velocity: Скорость робота.
:param actual_velocity: Фактическая скорость.
:param actual_force: Фактическое значение силы.
:param thresholds: Словарь пороговых значений.
:return: True, если условие стадии закрытия выполнено.
"""
return actual_velocity > thresholds['act_vel_min'] and abs(robot_velocity) < thresholds['rob_vel_thresh'] and actual_force < thresholds['act_force_close']
@staticmethod
def is_closing(rob_vel:float,
act_vel:float,
act_force:float,
th:dict) -> bool:
# act_vel > min, rob_vel ~ 0, act_force ниже порога
return act_vel > th['act_vel_min'] and abs(rob_vel) < th['rob_vel_thresh'] and act_force < th['act_force_close']
def is_squeeze(robot_velocity: float, actual_velocity: float, actual_force: float, force_rate: float, thresholds: dict) -> bool:
"""
Определяет, находится ли робот в стадии сжатия.
:param robot_velocity: Скорость робота.
:param actual_velocity: Фактическая скорость.
:param actual_force: Фактическое значение силы.
:param force_rate: Темп изменения силы.
:param thresholds: Словарь пороговых значений.
:return: True, если условие стадии сжатия выполнено.
"""
return abs(robot_velocity) < thresholds['rob_vel_thresh'] and actual_velocity < thresholds['act_vel_close'] and force_rate > thresholds['force_increase']
@staticmethod
def is_squeeze(rob_vel:float,
act_vel:float,
act_force:float,
force_rate:float,
th:dict) -> bool:
# rob_vel ~ 0, act_vel меньше, чем в closing, и резкий рост act_force
return abs(rob_vel) < th['rob_vel_thresh'] and act_vel < th['act_vel_close'] and force_rate > th['force_increase']
def is_welding(robot_velocity: float, actual_velocity: float, actual_force: float, thresholds: dict) -> bool:
"""
Определяет, находится ли робот в стадии сварки.
:param robot_velocity: Скорость робота.
:param actual_velocity: Фактическая скорость.
:param actual_force: Фактическое значение силы.
:param thresholds: Словарь пороговых значений.
:return: True, если условие стадии сварки выполнено.
"""
return abs(robot_velocity) < thresholds['rob_vel_thresh'] and abs(actual_velocity) < thresholds['act_vel_thresh'] and actual_force > thresholds['act_force_weld']
@staticmethod
def is_welding(rob_vel:float,
act_vel:float,
act_force:float,
th:dict) -> bool:
# обе скорости ≈ 0, act_force выше порога сварки
return abs(rob_vel) < th['rob_vel_thresh'] and abs(act_vel) < th['act_vel_thresh'] and act_force > th['act_force_weld']
def is_relief(actual_velocity: float, actual_position_diff: float, force_rate: float, thresholds: dict) -> bool:
"""
Определяет, находится ли робот в стадии снятия усилия (relief).
@staticmethod
def is_relief(rob_vel:float,
act_vel:float,
act_force:float,
force_rate:float,
th:dict) -> bool:
# резкое падение act_force, отрицательный act_vel, малые rob_vel
return force_rate < -th['force_decrease'] and act_vel < -th['act_vel_negative'] and abs(rob_vel) < th['rob_vel_thresh']
:param actual_velocity: Фактическая скорость.
:param actual_position_diff: Разница в позициях.
:param force_rate: Темп изменения силы.
:param thresholds: Словарь пороговых значений.
:return: True, если условие стадии снятия усилия выполнено.
"""
return force_rate < -thresholds['force_decrease'] and abs(actual_position_diff) < thresholds['act_pos_decrease'] and actual_velocity < -thresholds['act_vel_negative']
def detect_stages(self, df:pd.DataFrame):
def detect_stages(self, df: pd.DataFrame) -> List[Tuple[str, float, float]]:
"""
Детектирует стадии процесса по данным трассировки.
:param df: DataFrame с данными трассировки.
:return: Список кортежей (название стадии, время начала, время окончания).
"""
timestamps = df['time'].to_list()
n = len(df)
th = {key: item[0] for key, item in self._parent._settings.filter.items()}
# Вычисляем разностную производную силы
act_force = df["DriveMotorTorq_Act7"].values
force_diff = np.diff(act_force, prepend=act_force[0])
# Извлекаем пороговые значения из настроек
thresholds = {key: item[0] for key, item in self._parent._settings.filter.items()}
states = []
# Вычисляем производную силы и разницу позиций
actual_force = df["DriveMotorTorq_Act7"].values
force_diff = np.diff(actual_force, prepend=actual_force[0])
actual_position = df["DriveMotorPos_Act7"].values
position_diff = np.diff(actual_position, prepend=actual_position[0])
stages = []
current_state = "Oncomming"
state_start = timestamps[0]
# Проходим по всем записям
# Проходим по всем записям DataFrame
for i in range(n):
rob_vel = df.loc[i, "CartVel_Act"]
act_vel = df.loc[i, "DriveMotorVel_Act7"]
act_force_val = df.loc[i, "DriveMotorTorq_Act7"]
force_rate = force_diff[i]
robot_velocity = df.loc[i, "CartVel_Act"]
actual_velocity = df.loc[i, "DriveMotorVel_Act7"]
force_value = df.loc[i, "DriveMotorTorq_Act7"]
current_force_rate = force_diff[i]
current_position_diff = position_diff[i]
if current_state == "Oncomming":
if self.is_closing(rob_vel, act_vel, act_force_val, th):
if self.is_closing(robot_velocity, actual_velocity, force_value, thresholds):
state_end = timestamps[i]
states.append(("Oncomming", state_start, state_end))
stages.append(("Oncomming", state_start, state_end))
current_state = "Closing"
state_start = timestamps[i]
elif current_state == "Closing":
if self.is_squeeze(rob_vel, act_vel, act_force_val, force_rate, th):
if self.is_squeeze(robot_velocity, actual_velocity, force_value, current_force_rate, thresholds):
state_end = timestamps[i]
states.append(("Closing", state_start, state_end))
stages.append(("Closing", state_start, state_end))
current_state = "Squeeze"
state_start = timestamps[i]
elif current_state == "Squeeze":
if self.is_welding(rob_vel, act_vel, act_force_val, th):
if self.is_welding(robot_velocity, actual_velocity, force_value, thresholds):
state_end = timestamps[i]
states.append(("Squeeze", state_start, state_end))
stages.append(("Squeeze", state_start, state_end))
current_state = "Welding"
state_start = timestamps[i]
elif current_state == "Welding":
if self.is_relief(rob_vel, act_vel, act_force_val, force_rate, th):
if self.is_relief(actual_velocity, current_position_diff, current_force_rate, thresholds):
state_end = timestamps[i]
states.append(("Welding", state_start, state_end))
stages.append(("Welding", state_start, state_end))
current_state = "Relief"
state_start = timestamps[i]
elif current_state == "Relief":
# Когда признаки relief отпадают, считаем, что цикл завершён и возвращаемся в Oncomming
if not self.is_relief(rob_vel, act_vel, act_force_val, force_rate, th):
# Если признаки снятия усилия не соблюдаются, завершаем цикл и переходим в Oncomming
if not self.is_relief(actual_velocity, current_position_diff, current_force_rate, thresholds):
state_end = timestamps[i]
states.append(("Relief", state_start, state_end))
stages.append(("Relief", state_start, state_end))
current_state = "Oncomming"
state_start = timestamps[i]
# Фиксируем последний сегмент
states.append((current_state, state_start, timestamps[-1]))
return states
stages.append((current_state, state_start, timestamps[-1]))
return stages
class TextStageDetector(BaseTextStageDetector):
"""
Класс для детекции сварочных стадий на основе текстовых данных.
"""
def __init__(self, parent: TraceProcessor = None):
super().__init__(parent)
def detect_welding(self, data:list[KukaTXT]) -> list:
@staticmethod
def detect_welding(data: List[KukaTXT]) -> List[dict]:
"""
Детектирует этап сварки по текстовым сообщениям.
:param data: Список объектов KukaTXT.
:return: Список словарей с информацией о сварке (стадия, время начала и окончания).
"""
stages = []
for i in range(len(data)):
for i in range(len(data) - 1): # Предотвращаем выход за пределы списка
if data[i].func == " SPOT" and data[i].signal == " END":
stages.append({
"stage": "welding",
@ -291,114 +407,137 @@ class TextStageDetector(BaseTextStageDetector):
class TraceProcessor(BaseRawTraceProcessor):
"""
Основной класс для обработки трассировок.
Объединяет данные, детектирует стадии и инициирует рендеринг через медиатор.
"""
def __init__(self):
self._settings = Settings()
dataparser = KukaDataParser()
textparser = KukaTextParser()
data_detector = TraceStageDetector(self)
data_parser = KukaDataParser()
text_parser = KukaTextParser()
trace_detector = TraceStageDetector(self)
text_detector = TextStageDetector(self)
super().__init__(dataparser, textparser, data_detector, text_detector)
super().__init__(data_parser, text_parser, trace_detector, text_detector)
def prerender(self, data:list[str]) -> None:
def prerender(self, data: List[str]) -> None:
"""
Препроцессинг данных для отрисовки трейсов и обнаруженных событий.
:param data: Список путей к файлам данных.
"""
rendered_data = self._render_data(data)
self._mediator.prerender_TCW(self, rendered_data)
def final_render(self) -> None:
"""
Финальный рендеринг данных для отрисовки
трейсов клиента и расчета трейсов ТСК.
"""
events = self._detect_stages(self._trace_df, self._text_data)
dataframe = self._rename_df_columns(self._trace_df)
self._mediator.render_TCW(self, [dataframe, events])
renamed_df = self._rename_df_columns(self._trace_df)
part_position, max_open = self._detect_coords(renamed_df, events)
self._mediator.render_TCW(self, [renamed_df, events, [part_position, max_open]])
def _render_data(self, data:list[str]) -> list[pd.DataFrame, dict]:
def _detect_coords(self, dataframe: pd.DataFrame, events: dict) -> Tuple[List[float], List[float]]:
"""
Детектирует координаты на основе событий и данных.
:param dataframe: DataFrame с данными.
:param events: Словарь с событиями.
:return: Кортеж из списков: координаты детали и максимальное открытие электрода.
"""
weld_timings = events["Welding"]
oncommings = events["Oncomming"]
open_positions = []
part_positions = []
for i in range(len(weld_timings[0])):
weld_start = weld_timings[0][i]
weld_end = weld_timings[1][i]
onc_start = oncommings[0][i]
onc_end = oncommings[1][i]
pos_part = dataframe[(dataframe["time"] > weld_start) & (dataframe["time"] < weld_end)]["Electrode Position, mm ME"].mean()
part_positions.append(float(pos_part) / 1000)
pos_open = dataframe[(dataframe["time"] > onc_start) & (dataframe["time"] < onc_end)]["Electrode Position, mm ME"].abs().max()
open_positions.append(float(pos_open) / 1000)
return (part_positions, open_positions)
def _render_data(self, data: List[str]) -> List:
"""
Обрабатывает входные файлы данных и возвращает предварительно обработанные данные.
:param data: Список путей к файлам (первый для трассировки, второй для текстовых данных).
:return: Список, содержащий DataFrame и события.
"""
if data and len(data) == 2:
dat_filepath = data[0]
txt_filepath = data[1]
trace_filepath = data[0]
text_filepath = data[1]
elif data:
dat_filepath = data[0]
txt_filepath = None
trace_filepath = data[0]
text_filepath = None
else:
dat_filepath = None
txt_filepath = None
trace_filepath = None
text_filepath = None
self._trace_df = self._unpack_trace(dat_filepath)
self._text_data = self._unpack_text(txt_filepath)
self._trace_df = self._unpack_trace(trace_filepath)
self._text_data = self._unpack_text(text_filepath)
events = self._detect_stages(self._trace_df, self._text_data)
dataframe = self._rename_df_columns(self._trace_df)
return [dataframe, events]
renamed_df = self._rename_df_columns(self._trace_df)
return [renamed_df, events]
def update_settings(self, data:Settings) -> None:
self._settings = data
def update_settings(self, settings: Settings) -> None:
"""
Обновляет настройки обработки.
def _unpack_trace(self, dat_filepath:str = None) -> Optional[pd.DataFrame]:
if dat_filepath:
return self._dataparser.parse(dat_filepath)
:param settings: Объект настроек.
"""
self._settings = settings
def _unpack_trace(self, trace_filepath: str = None) -> Optional[pd.DataFrame]:
"""
Распаковывает трассировочные данные из файла.
:param trace_filepath: Путь к файлу трассировки.
:return: DataFrame с данными или None.
"""
if trace_filepath:
return self._dataparser.parse(trace_filepath)
return None
def _unpack_text(self, txt_filepath:str = None) -> Optional[list[KukaTXT]]:
if txt_filepath:
return self._textparser.parse(txt_filepath)
def _unpack_text(self, text_filepath: str = None) -> Optional[List[KukaTXT]]:
"""
Распаковывает текстовые данные из файла.
:param text_filepath: Путь к текстовому файлу.
:return: Список объектов KukaTXT или None.
"""
if text_filepath:
return self._textparser.parse(text_filepath)
return None
def _detect_stages(self, trace_df:pd.DataFrame, text_data:list) -> dict[list]:
def _detect_stages(self, trace_df: pd.DataFrame, text_data: List) -> Optional[dict]:
"""
Детектирует события на основе трассировочных и текстовых данных.
:param trace_df: DataFrame с трассировочными данными.
:param text_data: Список текстовых данных.
:return: Словарь с событиями или None.
"""
if trace_df is not None and text_data is not None:
trace_stages = self._data_detector.detect_stages(trace_df)
#welding_stages = self._text_detector.detect_welding(text_data)
#events = self._form_events(trace_stages, welding_stages)
events = self.__form_events_2(trace_stages)
events = self._form_events(trace_stages)
return events
return None
@staticmethod
def _normalize_events(events:dict[list]) -> dict[list]:
max_len = max([len(item_list[0]) if key != "Oncomming" else len(item_list[0])-1 for key, item_list in events.items() ])
for key, item_list in events.items():
list_len = len(item_list[0])
if list_len < max_len:
logger.warning(f"_normalize_events - Ошибка детекции событий {key}: ожидалось {max_len} событий, получено {list_len}")
for i in range (max_len - list_len):
events[key][0].append(0)
events[key][1].append(1)
return events
@staticmethod
def _form_events(trace_stages:list, welding_stages:list) -> dict[list]:
idx = 0
events = {
"Closing":[[],[]],
"Squeeze":[[],[]],
"Welding":[[],[]],
"Relief":[[],[]],
"Oncomming":[[],[]]
}
for i in range(len(trace_stages)-1):
if trace_stages[i]['stage'] == 'Squeeze&Welding&Relief':
if trace_stages[i]['end_time'] > welding_stages[idx]['start_time']:
def _form_events(trace_stages: list) -> dict:
"""
Формирует словарь событий на основе списка этапов трассировки.
events["Squeeze"][0].append(trace_stages[i]['start_time'])
events["Squeeze"][1].append(welding_stages[idx]['start_time'])
events["Welding"][0].append(welding_stages[idx]['start_time'])
events["Welding"][1].append(welding_stages[idx]['end_time'])
events["Relief"][0].append(welding_stages[idx]['end_time'])
events["Relief"][1].append(trace_stages[i+1]['start_time'])
idx += 1
else:
events["Squeeze"][0].append(trace_stages[i]['start_time'])
events["Squeeze"][1].append(trace_stages[i]['end_time'])
elif trace_stages[i]['stage'] == 'unknown':
pass
else:
events[trace_stages[i]['stage']][0].append(trace_stages[i]['start_time'])
events[trace_stages[i]['stage']][1].append(trace_stages[i]['end_time'])
normalized_events = TraceProcessor._normalize_events(events)
return normalized_events
@staticmethod
def __form_events_2(trace_stages:list):
:param trace_stages: Список этапов (кортежи: название, время начала, время окончания).
:return: Словарь с событиями.
"""
events = {
"Closing": [[], []],
"Squeeze": [[], []],
@ -407,13 +546,19 @@ class TraceProcessor(BaseRawTraceProcessor):
"Oncomming": [[], []]
}
for stage in trace_stages:
name, start_t, end_t = stage
events[name][0].append(start_t)
events[name][1].append(end_t)
name, start_time, end_time = stage
events[name][0].append(start_time)
events[name][1].append(end_time)
return events
@staticmethod
def _rename_df_columns(dataframe: pd.DataFrame) -> pd.DataFrame:
def _rename_df_columns(dataframe: pd.DataFrame) -> Optional[pd.DataFrame]:
"""
Переименовывает столбцы DataFrame на основе корректной карты соответствия.
:param dataframe: Исходный DataFrame.
:return: DataFrame с переименованными столбцами или None в случае ошибки.
"""
correct_mapping = {
"time": ["Time", "Timestamp"],
"Tool Coordinate, mm X": ["X_Act"],
@ -422,17 +567,17 @@ class TraceProcessor(BaseRawTraceProcessor):
"Electrode Force, N ME": ["DriveMotorTorq_Act7"],
"Electrode Position, mm ME": ["AxisPos_Act7"],
"Electrode Speed, mm ME": ["AxisVel_Act7"],
"Rotor Position, mm FE": ["DriveMotorPos_Act7"],
"Rotor Speed, mm/s ME": ["DriveMotorVel_Act7"],
"Rotor Position, deg FE": ["DriveMotorPos_Act7"],
"Rotor Speed, deg/s ME": ["DriveMotorVel_Act7"],
"Rotor Current, A ME": ["DriveMotorCurr_Act7"],
"Cartesian Tool Speed, mm/s ME": ["CartVel_Act"],
}
dataframe = dataframe.copy(deep=True)
try:
df_copy = dataframe.copy(deep=True)
working_mapping = {key: [item.lower() for item in items] for key, items in correct_mapping.items()}
try:
new_columns = {}
for col in dataframe.columns:
for col in df_copy.columns:
col_lower = col.lower()
for key, values in working_mapping.items():
if col_lower in values:
@ -440,10 +585,9 @@ class TraceProcessor(BaseRawTraceProcessor):
break
else:
new_columns[col] = col
dataframe.rename(columns=new_columns, inplace=True)
dataframe = dataframe.loc[:, ~dataframe.columns.duplicated()]
return dataframe
df_copy.rename(columns=new_columns, inplace=True)
df_copy = df_copy.loc[:, ~df_copy.columns.duplicated()]
return df_copy
except AttributeError as e:
logger.error(f"_rename_df_columns - AttributeError: Проверьте, что переданный объект является DataFrame. {e}")
return None
@ -451,7 +595,11 @@ class TraceProcessor(BaseRawTraceProcessor):
logger.error(f"_rename_df_columns - Непредвиденная ошибка: {e}")
return None
"Перемещение"
"""
Примеры комментариев для этапов процесса:
Перемещение:
# FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение
#
# ИЛИ
@ -459,74 +607,21 @@ class TraceProcessor(BaseRawTraceProcessor):
# FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками
# SIGNAL: BLENDING
"Смыкание и набор усилия"
#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение электрода движения робота
# ... ...
Смыкание и набор усилия:
# FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение электрода с движением робота
# ...
# FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение 0.5 мм роботом с движением электрода
#... ...
# ...
# FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - остановка в позиции (может быть набор усилия?)
"Сварка"
Сварка:
# FUNCTION/PROCEDURE: SPOT - Начало сварочного процесса
# SIGNAL: START
#
# FUNCTION/PROCEDURE: SPOT - Конец сварочного процесса
# SIGNAL: END
"Снятие усилия и разъезд"
Снятие усилия и разъезд:
# FUNCTION/PROCEDURE: SGL_MoveToPos - Выход из контакта с точкой движением робота и электрода
# SIGNAL: START
#
"Перемещение"
#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение
#
# ИЛИ
#
#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками
#SIGNAL: BLENDING
if __name__ == '__main__':
roboreader = KukaDataParser()
txt_reader = KukaTextParser()
detector_traces = TraceStageDetector(region_of_focus=[7.7, 42])
detector_weldings = TextStageDetector()
path1 = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat")
path2 = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT")
data1 = roboreader.parse(path1)
data2 = txt_reader.parse(path2)
stages = detector_traces.detect_stages(data1)
weldings = detector_weldings.detect_welding(data2)
counter = 0
for st in stages:
print(st)
if st["stage"] == "force":
counter+=1
print(counter)
print("=========================================")
for we in weldings:
print (we)
"""
save = os.path.dirname(path) + "/sample.csv"
data.to_csv(save)
path = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT")
txt_parser = TXT_Parser()
txt_parser.parse(path)
print(len(txt_parser._datapacks))
"""