From 6ac05f5e3de0aea15c23ebce3c9c1c36becdc8fd Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 11 Feb 2025 12:22:37 +0300 Subject: [PATCH] =?UTF-8?q?chore:=20=D0=B8=D0=B7=D0=BC=D0=B5=D0=BD=D0=B5?= =?UTF-8?q?=D0=BD=20=D0=BC=D0=BE=D0=B4=D1=83=D0=BB=D1=8C=20roboter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- params/plot_structure_params.json | 4 +- src/performance/roboter.py | 727 +++++++++++++++++------------- 2 files changed, 413 insertions(+), 318 deletions(-) diff --git a/params/plot_structure_params.json b/params/plot_structure_params.json index d312290..f0b4cf0 100644 --- a/params/plot_structure_params.json +++ b/params/plot_structure_params.json @@ -222,7 +222,7 @@ "pen": "g" }, { - "name": "Rotor Speed, mm/s ME", + "name": "Rotor Speed, deg/s ME", "pen": "b" }, { @@ -286,7 +286,7 @@ "pen": "g" }, { - "name": "Rotor Speed, mm/s ME", + "name": "Rotor Speed, deg/s ME", "pen": "b" }, { diff --git a/src/performance/roboter.py b/src/performance/roboter.py index 728b2fa..dac41a8 100644 --- a/src/performance/roboter.py +++ b/src/performance/roboter.py @@ -1,144 +1,223 @@ from __future__ import annotations import os -from typing import Tuple, Union, Optional +from typing import Tuple, Union, Optional, List from dataclasses import dataclass, field import numpy as np import pandas as pd from loguru import logger -from base.base import (BaseKukaDataParser, BaseKukaTextParser, - BaseTraceStageDetector, BaseTextStageDetector, - BaseRawTraceProcessor, KukaDataHead, - KukaTXT, Settings) +from base.base import ( + BaseKukaDataParser, BaseKukaTextParser, + BaseTraceStageDetector, BaseTextStageDetector, + BaseRawTraceProcessor, KukaDataHead, + KukaTXT, Settings +) class KukaDataParser(BaseKukaDataParser): + """ + Класс для парсинга данных Кука. - def parse(self, head_path: str) -> pd.DataFrame: - head = self._parse_dat_file(head_path) - body_path = os.path.join(os.path.dirname(head_path), head.filename) - dataframe = self._parse_r64_file(body_path, head) + Читает заголовочный файл (.dat) и соответствующий файл данных (.r64), + объединяет их в pandas.DataFrame. + """ + + def parse(self, head_filepath: str) -> pd.DataFrame: + """ + Основной метод парсинга. Читает заголовочный файл и файл данных. + + :param head_filepath: Путь к заголовочному файлу. + :return: DataFrame с объединёнными данными. + """ + header = self._parse_header_file(head_filepath) + body_filepath = os.path.join(os.path.dirname(head_filepath), header.filename) + dataframe = self._parse_body_file(body_filepath, header) return dataframe - def _parse_dat_file(self, path: str) -> KukaDataHead: - with open(path, 'r', encoding='cp1252') as file: - head = KukaDataHead(0, "", {}) - inside_channel = False - self._ch_name = None + def _parse_header_file(self, filepath: str) -> KukaDataHead: + """ + Парсит заголовочный файл (.dat) и извлекает информацию о каналах. + + :param filepath: Путь к заголовочному файлу. + :return: Объект KukaDataHead с информацией о данных. + """ + with open(filepath, 'r', encoding='cp1252') as file: + data_head = KukaDataHead(0, "", {}) + is_inside_channel = False + self._current_channel = None # Текущее название канала for line in file: line = line.strip() if line in ('#BEGINCHANNELHEADER', "#BEGINGLOBALHEADER"): - inside_channel = True - elif line in ('#ENDCHANNELHEADER' "#ENDGLOBALHEADER"): - inside_channel = False + is_inside_channel = True + elif line in ('#ENDCHANNELHEADER', "#ENDGLOBALHEADER"): + is_inside_channel = False else: - if inside_channel: - self._parse_head_file_line(line, head) - return head - - def _parse_head_file_line(self, line: str, head: KukaDataHead) -> None: - tag, data = line.split(',') + if is_inside_channel: + self._parse_header_line(line, data_head) + return data_head + + def _parse_header_line(self, line: str, data_head: KukaDataHead) -> None: + """ + Обрабатывает отдельную строку заголовочного файла и обновляет объект data_head. + + :param line: Строка из файла. + :param data_head: Объект KukaDataHead для обновления. + """ + tag, value = line.split(',') match tag: case '102': - head.rob_ID = data + data_head.rob_ID = value case '200': - self._ch_name = str(data) - head.channels[self._ch_name] = {} + self._current_channel = str(value) + data_head.channels[self._current_channel] = {} case '202': - head.channels[self._ch_name]['unit'] = str(data) + data_head.channels[self._current_channel]['unit'] = str(value) case '211': - head.filename = str(data) + data_head.filename = str(value) case '220': - head.channels[self._ch_name]['len'] = int(data) + data_head.channels[self._current_channel]['len'] = int(value) case '221': - head.channels[self._ch_name]['num'] = int(data) + data_head.channels[self._current_channel]['num'] = int(value) case '241': - head.channels[self._ch_name]['multiplyer'] = float(data) + # Переименовано в "multiplier" для повышения читаемости + data_head.channels[self._current_channel]['multiplier'] = float(value) - def _parse_r64_file(self, path: str, head: KukaDataHead) -> pd.DataFrame: - time_axis = self._build_time_axis(head) - mul, axes_names = self._extract_channels_data(head) - data_array = self._read_r64_file(path) + def _parse_body_file(self, filepath: str, data_head: KukaDataHead) -> pd.DataFrame: + """ + Парсит файл данных (.r64) и объединяет его с временной осью. + + :param filepath: Путь к файлу данных. + :param data_head: Заголовочная информация. + :return: DataFrame с данными. + """ + time_axis = self._build_time_axis(data_head) + multipliers, channel_names = self._extract_channels_data(data_head) + raw_data = self._read_r64_file(filepath) - if data_array.size % len(axes_names) != 0: - raise ValueError("Количество записей в {path} не кратно количеству найденных каналов ({ch_count})") + if raw_data.size % len(channel_names) != 0: + raise ValueError(f"Количество записей в {filepath} не кратно количеству найденных каналов ({len(channel_names)})") try: - axes = data_array.reshape(-1, len(axes_names)) + data_reshaped = raw_data.reshape(-1, len(channel_names)) except ValueError as e: - raise ValueError(f"Ошибка при изменении формы data_array: {e}") + raise ValueError(f"Ошибка при изменении формы raw_data: {e}") - dataframe = pd.concat([time_axis, pd.DataFrame(axes*mul, columns=axes_names)], axis=1) + # Применяем множители к данным каналов + df_data = pd.DataFrame(data_reshaped * multipliers, columns=channel_names) + dataframe = pd.concat([time_axis, df_data], axis=1) return dataframe - - @staticmethod - def _extract_channels_data(head: KukaDataHead) -> Tuple[np.ndarray, list]: - sorted_channels = sorted( - (item for item in head.channels.items() if item[0] != "Zeit"), - key=lambda item: item[1]['num'] - ) - mul = np.array([info['multiplyer'] for _, info in sorted_channels]) - names = [key for key, _ in sorted_channels] - return mul, names @staticmethod - def _build_time_axis(head: KukaDataHead) -> pd.Series: - len_timestamps = head.channels['Zeit']['len'] -1 - t_step = head.channels['Zeit']['multiplyer'] - time_axis = pd.Series(np.arange(0, len_timestamps*t_step, t_step)) + def _extract_channels_data(data_head: KukaDataHead) -> Tuple[np.ndarray, List[str]]: + """ + Извлекает данные по каналам, сортируя их по номеру. + + :param data_head: Заголовочная информация. + :return: Кортеж из массива множителей и списка имён каналов. + """ + sorted_channels = sorted( + (item for item in data_head.channels.items() if item[0] != "Zeit"), + key=lambda item: item[1]['num'] + ) + multipliers = np.array([info['multiplier'] for _, info in sorted_channels]) + channel_names = [key for key, _ in sorted_channels] + return multipliers, channel_names + + @staticmethod + def _build_time_axis(data_head: KukaDataHead) -> pd.Series: + """ + Строит временную ось на основе информации из канала 'Zeit'. + + :param data_head: Заголовочная информация. + :return: Серия pandas с временными метками. + """ + num_timestamps = data_head.channels['Zeit']['len'] - 1 + time_step = data_head.channels['Zeit']['multiplier'] + time_axis = pd.Series(np.arange(0, num_timestamps * time_step, time_step)) time_axis.name = 'time' return time_axis @staticmethod - def _read_r64_file(path: str) -> np.ndarray[float]: - with open(path, 'rb') as file: + def _read_r64_file(filepath: str) -> np.ndarray: + """ + Считывает бинарный файл (.r64) и возвращает массив чисел. + + :param filepath: Путь к файлу. + :return: Массив numpy с типом float. + """ + with open(filepath, 'rb') as file: data = file.read() - floats = np.frombuffer(data, dtype=' list[KukaTXT]: - self._datapacks = [] - with open(path, 'r') as file: - datapack = KukaTXT() + def parse(self, filepath: str) -> List[KukaTXT]: + """ + Парсит текстовый файл и возвращает список объектов KukaTXT. + + :param filepath: Путь к текстовому файлу. + :return: Список объектов KukaTXT. + """ + self._data_packs = [] + with open(filepath, 'r') as file: + current_pack = KukaTXT() for line in file: line = line.strip() - datapack = self._check_msg_flow(line, datapack) - return self._datapacks + current_pack = self._process_message_flow(line, current_pack) + return self._data_packs - def _check_msg_flow(self, line:str, datapack:KukaTXT) -> KukaTXT: + def _process_message_flow(self, line: str, current_pack: KukaTXT) -> KukaTXT: + """ + Обрабатывает поток сообщений, определяя начало и конец блока. + + :param line: Строка из файла. + :param current_pack: Текущий объект KukaTXT. + :return: Обновлённый объект KukaTXT. + """ if line == "#BEGINMOTIONINFO": - self._in_msg = True - datapack = KukaTXT() + self._in_message = True + current_pack = KukaTXT() elif line == "#ENDMOTIONINFO": - self._in_msg = False - self._datapacks.append(datapack) - datapack = KukaTXT() - elif self._in_msg: - datapack = self._process_line(line, datapack) - return datapack + self._in_message = False + self._data_packs.append(current_pack) + current_pack = KukaTXT() + elif self._in_message: + current_pack = self._process_line(line, current_pack) + return current_pack - def _process_line(self, line:str, datapack:KukaTXT) -> KukaTXT: - tag, data = line.split(":") + def _process_line(self, line: str, current_pack: KukaTXT) -> KukaTXT: + """ + Обрабатывает отдельную строку внутри блока сообщения. + + :param line: Строка с данными. + :param current_pack: Текущий объект KukaTXT. + :return: Обновлённый объект KukaTXT. + """ + tag, value = line.split(":") match tag: case "TIME": - datapack.time = float(data) + current_pack.time = float(value) case "ENDTIME": - datapack.endtime = float(data) + current_pack.endtime = float(value) case "MODULE": pass case "FUNCTION/PROCEDURE": - datapack.func = data + current_pack.func = value case "TYPE": - datapack.type_ = data + current_pack.type_ = value case "SIGNAL": - datapack.signal = data + current_pack.signal = value case "LINE": pass case "POINT NAME": @@ -165,255 +244,321 @@ class KukaTextParser(BaseKukaTextParser): pass case "LOAD A3": pass - return datapack + return current_pack class TraceStageDetector(BaseTraceStageDetector): + """ + Класс для детекции этапов (стадий) на основе данных трассировки. - def __init__(self, parent:TraceProcessor = None): + Определяет переходы между различными стадиями процесса по пороговым значениям. + """ + def __init__(self, parent: TraceProcessor = None): super().__init__(parent) - def _mark_timestamp(self, df:pd.DataFrame, index:int, time:float) -> Union[str, None]: - # Не интересные значения помечаем как "unknown" - if (time < self._parent._settings.filter["ROI_start"][0] or - time > self._parent._settings.filter["ROI_finish"][0]): - return "unknown" + @staticmethod + def is_closing(robot_velocity: float, actual_velocity: float, actual_force: float, thresholds: dict) -> bool: + """ + Определяет, находится ли робот в стадии закрытия. + + :param robot_velocity: Скорость робота. + :param actual_velocity: Фактическая скорость. + :param actual_force: Фактическое значение силы. + :param thresholds: Словарь пороговых значений. + :return: True, если условие стадии закрытия выполнено. + """ + return actual_velocity > thresholds['act_vel_min'] and abs(robot_velocity) < thresholds['rob_vel_thresh'] and actual_force < thresholds['act_force_close'] @staticmethod - def is_closing(rob_vel:float, - act_vel:float, - act_force:float, - th:dict) -> bool: - # act_vel > min, rob_vel ~ 0, act_force ниже порога - return act_vel > th['act_vel_min'] and abs(rob_vel) < th['rob_vel_thresh'] and act_force < th['act_force_close'] + def is_squeeze(robot_velocity: float, actual_velocity: float, actual_force: float, force_rate: float, thresholds: dict) -> bool: + """ + Определяет, находится ли робот в стадии сжатия. + + :param robot_velocity: Скорость робота. + :param actual_velocity: Фактическая скорость. + :param actual_force: Фактическое значение силы. + :param force_rate: Темп изменения силы. + :param thresholds: Словарь пороговых значений. + :return: True, если условие стадии сжатия выполнено. + """ + return abs(robot_velocity) < thresholds['rob_vel_thresh'] and actual_velocity < thresholds['act_vel_close'] and force_rate > thresholds['force_increase'] @staticmethod - def is_squeeze(rob_vel:float, - act_vel:float, - act_force:float, - force_rate:float, - th:dict) -> bool: - # rob_vel ~ 0, act_vel меньше, чем в closing, и резкий рост act_force - return abs(rob_vel) < th['rob_vel_thresh'] and act_vel < th['act_vel_close'] and force_rate > th['force_increase'] + def is_welding(robot_velocity: float, actual_velocity: float, actual_force: float, thresholds: dict) -> bool: + """ + Определяет, находится ли робот в стадии сварки. + + :param robot_velocity: Скорость робота. + :param actual_velocity: Фактическая скорость. + :param actual_force: Фактическое значение силы. + :param thresholds: Словарь пороговых значений. + :return: True, если условие стадии сварки выполнено. + """ + return abs(robot_velocity) < thresholds['rob_vel_thresh'] and abs(actual_velocity) < thresholds['act_vel_thresh'] and actual_force > thresholds['act_force_weld'] @staticmethod - def is_welding(rob_vel:float, - act_vel:float, - act_force:float, - th:dict) -> bool: - # обе скорости ≈ 0, act_force выше порога сварки - return abs(rob_vel) < th['rob_vel_thresh'] and abs(act_vel) < th['act_vel_thresh'] and act_force > th['act_force_weld'] + def is_relief(actual_velocity: float, actual_position_diff: float, force_rate: float, thresholds: dict) -> bool: + """ + Определяет, находится ли робот в стадии снятия усилия (relief). + + :param actual_velocity: Фактическая скорость. + :param actual_position_diff: Разница в позициях. + :param force_rate: Темп изменения силы. + :param thresholds: Словарь пороговых значений. + :return: True, если условие стадии снятия усилия выполнено. + """ + return force_rate < -thresholds['force_decrease'] and abs(actual_position_diff) < thresholds['act_pos_decrease'] and actual_velocity < -thresholds['act_vel_negative'] - @staticmethod - def is_relief(rob_vel:float, - act_vel:float, - act_force:float, - force_rate:float, - th:dict) -> bool: - # резкое падение act_force, отрицательный act_vel, малые rob_vel - return force_rate < -th['force_decrease'] and act_vel < -th['act_vel_negative'] and abs(rob_vel) < th['rob_vel_thresh'] - - def detect_stages(self, df:pd.DataFrame): + def detect_stages(self, df: pd.DataFrame) -> List[Tuple[str, float, float]]: + """ + Детектирует стадии процесса по данным трассировки. + + :param df: DataFrame с данными трассировки. + :return: Список кортежей (название стадии, время начала, время окончания). + """ timestamps = df['time'].to_list() n = len(df) - th = {key: item[0] for key, item in self._parent._settings.filter.items()} - # Вычисляем разностную производную силы - act_force = df["DriveMotorTorq_Act7"].values - force_diff = np.diff(act_force, prepend=act_force[0]) + # Извлекаем пороговые значения из настроек + thresholds = {key: item[0] for key, item in self._parent._settings.filter.items()} - states = [] + # Вычисляем производную силы и разницу позиций + actual_force = df["DriveMotorTorq_Act7"].values + force_diff = np.diff(actual_force, prepend=actual_force[0]) + actual_position = df["DriveMotorPos_Act7"].values + position_diff = np.diff(actual_position, prepend=actual_position[0]) + + stages = [] current_state = "Oncomming" state_start = timestamps[0] - # Проходим по всем записям + # Проходим по всем записям DataFrame for i in range(n): - rob_vel = df.loc[i, "CartVel_Act"] - act_vel = df.loc[i, "DriveMotorVel_Act7"] - act_force_val = df.loc[i, "DriveMotorTorq_Act7"] - force_rate = force_diff[i] + robot_velocity = df.loc[i, "CartVel_Act"] + actual_velocity = df.loc[i, "DriveMotorVel_Act7"] + force_value = df.loc[i, "DriveMotorTorq_Act7"] + current_force_rate = force_diff[i] + current_position_diff = position_diff[i] if current_state == "Oncomming": - if self.is_closing(rob_vel, act_vel, act_force_val, th): + if self.is_closing(robot_velocity, actual_velocity, force_value, thresholds): state_end = timestamps[i] - states.append(("Oncomming", state_start, state_end)) + stages.append(("Oncomming", state_start, state_end)) current_state = "Closing" state_start = timestamps[i] elif current_state == "Closing": - if self.is_squeeze(rob_vel, act_vel, act_force_val, force_rate, th): + if self.is_squeeze(robot_velocity, actual_velocity, force_value, current_force_rate, thresholds): state_end = timestamps[i] - states.append(("Closing", state_start, state_end)) + stages.append(("Closing", state_start, state_end)) current_state = "Squeeze" state_start = timestamps[i] elif current_state == "Squeeze": - if self.is_welding(rob_vel, act_vel, act_force_val, th): + if self.is_welding(robot_velocity, actual_velocity, force_value, thresholds): state_end = timestamps[i] - states.append(("Squeeze", state_start, state_end)) + stages.append(("Squeeze", state_start, state_end)) current_state = "Welding" state_start = timestamps[i] elif current_state == "Welding": - if self.is_relief(rob_vel, act_vel, act_force_val, force_rate, th): + if self.is_relief(actual_velocity, current_position_diff, current_force_rate, thresholds): state_end = timestamps[i] - states.append(("Welding", state_start, state_end)) + stages.append(("Welding", state_start, state_end)) current_state = "Relief" state_start = timestamps[i] elif current_state == "Relief": - # Когда признаки relief отпадают, считаем, что цикл завершён и возвращаемся в Oncomming - if not self.is_relief(rob_vel, act_vel, act_force_val, force_rate, th): + # Если признаки снятия усилия не соблюдаются, завершаем цикл и переходим в Oncomming + if not self.is_relief(actual_velocity, current_position_diff, current_force_rate, thresholds): state_end = timestamps[i] - states.append(("Relief", state_start, state_end)) + stages.append(("Relief", state_start, state_end)) current_state = "Oncomming" state_start = timestamps[i] # Фиксируем последний сегмент - states.append((current_state, state_start, timestamps[-1])) - return states + stages.append((current_state, state_start, timestamps[-1])) + return stages class TextStageDetector(BaseTextStageDetector): - - def __init__(self, parent :TraceProcessor = None): + """ + Класс для детекции сварочных стадий на основе текстовых данных. + """ + def __init__(self, parent: TraceProcessor = None): super().__init__(parent) - def detect_welding(self, data:list[KukaTXT]) -> list: + @staticmethod + def detect_welding(data: List[KukaTXT]) -> List[dict]: + """ + Детектирует этап сварки по текстовым сообщениям. + + :param data: Список объектов KukaTXT. + :return: Список словарей с информацией о сварке (стадия, время начала и окончания). + """ stages = [] - for i in range(len(data)): + for i in range(len(data) - 1): # Предотвращаем выход за пределы списка if data[i].func == " SPOT" and data[i].signal == " END": stages.append({ "stage": "welding", "start_time": data[i].time, - "end_time": data[i+1].time + "end_time": data[i + 1].time }) return stages class TraceProcessor(BaseRawTraceProcessor): - + """ + Основной класс для обработки трассировок. + + Объединяет данные, детектирует стадии и инициирует рендеринг через медиатор. + """ def __init__(self): self._settings = Settings() - dataparser = KukaDataParser() - textparser = KukaTextParser() - data_detector = TraceStageDetector(self) + data_parser = KukaDataParser() + text_parser = KukaTextParser() + trace_detector = TraceStageDetector(self) text_detector = TextStageDetector(self) - super().__init__(dataparser, textparser, data_detector, text_detector) + super().__init__(data_parser, text_parser, trace_detector, text_detector) - def prerender(self, data:list[str]) -> None: + def prerender(self, data: List[str]) -> None: + """ + Препроцессинг данных для отрисовки трейсов и обнаруженных событий. + :param data: Список путей к файлам данных. + """ rendered_data = self._render_data(data) self._mediator.prerender_TCW(self, rendered_data) def final_render(self) -> None: - + """ + Финальный рендеринг данных для отрисовки + трейсов клиента и расчета трейсов ТСК. + """ events = self._detect_stages(self._trace_df, self._text_data) - dataframe = self._rename_df_columns(self._trace_df) - self._mediator.render_TCW(self, [dataframe, events]) + renamed_df = self._rename_df_columns(self._trace_df) + part_position, max_open = self._detect_coords(renamed_df, events) + self._mediator.render_TCW(self, [renamed_df, events, [part_position, max_open]]) - def _render_data(self, data:list[str]) -> list[pd.DataFrame, dict]: + def _detect_coords(self, dataframe: pd.DataFrame, events: dict) -> Tuple[List[float], List[float]]: + """ + Детектирует координаты на основе событий и данных. + + :param dataframe: DataFrame с данными. + :param events: Словарь с событиями. + :return: Кортеж из списков: координаты детали и максимальное открытие электрода. + """ + weld_timings = events["Welding"] + oncommings = events["Oncomming"] + open_positions = [] + part_positions = [] + for i in range(len(weld_timings[0])): + weld_start = weld_timings[0][i] + weld_end = weld_timings[1][i] + onc_start = oncommings[0][i] + onc_end = oncommings[1][i] + pos_part = dataframe[(dataframe["time"] > weld_start) & (dataframe["time"] < weld_end)]["Electrode Position, mm ME"].mean() + part_positions.append(float(pos_part) / 1000) + pos_open = dataframe[(dataframe["time"] > onc_start) & (dataframe["time"] < onc_end)]["Electrode Position, mm ME"].abs().max() + open_positions.append(float(pos_open) / 1000) + return (part_positions, open_positions) + def _render_data(self, data: List[str]) -> List: + """ + Обрабатывает входные файлы данных и возвращает предварительно обработанные данные. + + :param data: Список путей к файлам (первый для трассировки, второй для текстовых данных). + :return: Список, содержащий DataFrame и события. + """ if data and len(data) == 2: - dat_filepath = data[0] - txt_filepath = data[1] + trace_filepath = data[0] + text_filepath = data[1] elif data: - dat_filepath = data[0] - txt_filepath = None + trace_filepath = data[0] + text_filepath = None else: - dat_filepath = None - txt_filepath = None + trace_filepath = None + text_filepath = None - self._trace_df = self._unpack_trace(dat_filepath) - self._text_data = self._unpack_text(txt_filepath) + self._trace_df = self._unpack_trace(trace_filepath) + self._text_data = self._unpack_text(text_filepath) events = self._detect_stages(self._trace_df, self._text_data) - dataframe = self._rename_df_columns(self._trace_df) - return [dataframe, events] + renamed_df = self._rename_df_columns(self._trace_df) + return [renamed_df, events] - def update_settings(self, data:Settings) -> None: - self._settings = data + def update_settings(self, settings: Settings) -> None: + """ + Обновляет настройки обработки. + + :param settings: Объект настроек. + """ + self._settings = settings - def _unpack_trace(self, dat_filepath:str = None) -> Optional[pd.DataFrame]: - if dat_filepath: - return self._dataparser.parse(dat_filepath) + def _unpack_trace(self, trace_filepath: str = None) -> Optional[pd.DataFrame]: + """ + Распаковывает трассировочные данные из файла. + + :param trace_filepath: Путь к файлу трассировки. + :return: DataFrame с данными или None. + """ + if trace_filepath: + return self._dataparser.parse(trace_filepath) return None - def _unpack_text(self, txt_filepath:str = None) -> Optional[list[KukaTXT]]: - if txt_filepath: - return self._textparser.parse(txt_filepath) + def _unpack_text(self, text_filepath: str = None) -> Optional[List[KukaTXT]]: + """ + Распаковывает текстовые данные из файла. + + :param text_filepath: Путь к текстовому файлу. + :return: Список объектов KukaTXT или None. + """ + if text_filepath: + return self._textparser.parse(text_filepath) return None - def _detect_stages(self, trace_df:pd.DataFrame, text_data:list) -> dict[list]: + def _detect_stages(self, trace_df: pd.DataFrame, text_data: List) -> Optional[dict]: + """ + Детектирует события на основе трассировочных и текстовых данных. + + :param trace_df: DataFrame с трассировочными данными. + :param text_data: Список текстовых данных. + :return: Словарь с событиями или None. + """ if trace_df is not None and text_data is not None: trace_stages = self._data_detector.detect_stages(trace_df) - #welding_stages = self._text_detector.detect_welding(text_data) - #events = self._form_events(trace_stages, welding_stages) - events = self.__form_events_2(trace_stages) + events = self._form_events(trace_stages) return events return None @staticmethod - def _normalize_events(events:dict[list]) -> dict[list]: - max_len = max([len(item_list[0]) if key != "Oncomming" else len(item_list[0])-1 for key, item_list in events.items() ]) - for key, item_list in events.items(): - list_len = len(item_list[0]) - if list_len < max_len: - logger.warning(f"_normalize_events - Ошибка детекции событий {key}: ожидалось {max_len} событий, получено {list_len}") - for i in range (max_len - list_len): - events[key][0].append(0) - events[key][1].append(1) - return events - @staticmethod - def _form_events(trace_stages:list, welding_stages:list) -> dict[list]: - idx = 0 + def _form_events(trace_stages: list) -> dict: + """ + Формирует словарь событий на основе списка этапов трассировки. + + :param trace_stages: Список этапов (кортежи: название, время начала, время окончания). + :return: Словарь с событиями. + """ events = { - "Closing":[[],[]], - "Squeeze":[[],[]], - "Welding":[[],[]], - "Relief":[[],[]], - "Oncomming":[[],[]] - } - for i in range(len(trace_stages)-1): - if trace_stages[i]['stage'] == 'Squeeze&Welding&Relief': - if trace_stages[i]['end_time'] > welding_stages[idx]['start_time']: - - events["Squeeze"][0].append(trace_stages[i]['start_time']) - events["Squeeze"][1].append(welding_stages[idx]['start_time']) - - events["Welding"][0].append(welding_stages[idx]['start_time']) - events["Welding"][1].append(welding_stages[idx]['end_time']) - - events["Relief"][0].append(welding_stages[idx]['end_time']) - events["Relief"][1].append(trace_stages[i+1]['start_time']) - idx += 1 - else: - events["Squeeze"][0].append(trace_stages[i]['start_time']) - events["Squeeze"][1].append(trace_stages[i]['end_time']) - elif trace_stages[i]['stage'] == 'unknown': - pass - else: - events[trace_stages[i]['stage']][0].append(trace_stages[i]['start_time']) - events[trace_stages[i]['stage']][1].append(trace_stages[i]['end_time']) - normalized_events = TraceProcessor._normalize_events(events) - return normalized_events - - @staticmethod - def __form_events_2(trace_stages:list): - events = { - "Closing":[[],[]], - "Squeeze":[[],[]], - "Welding":[[],[]], - "Relief":[[],[]], - "Oncomming":[[],[]] - } + "Closing": [[], []], + "Squeeze": [[], []], + "Welding": [[], []], + "Relief": [[], []], + "Oncomming": [[], []] + } for stage in trace_stages: - name, start_t, end_t = stage - events[name][0].append(start_t) - events[name][1].append(end_t) + name, start_time, end_time = stage + events[name][0].append(start_time) + events[name][1].append(end_time) return events @staticmethod - def _rename_df_columns(dataframe: pd.DataFrame) -> pd.DataFrame: + def _rename_df_columns(dataframe: pd.DataFrame) -> Optional[pd.DataFrame]: + """ + Переименовывает столбцы DataFrame на основе корректной карты соответствия. + + :param dataframe: Исходный DataFrame. + :return: DataFrame с переименованными столбцами или None в случае ошибки. + """ correct_mapping = { "time": ["Time", "Timestamp"], "Tool Coordinate, mm X": ["X_Act"], @@ -422,17 +567,17 @@ class TraceProcessor(BaseRawTraceProcessor): "Electrode Force, N ME": ["DriveMotorTorq_Act7"], "Electrode Position, mm ME": ["AxisPos_Act7"], "Electrode Speed, mm ME": ["AxisVel_Act7"], - "Rotor Position, mm FE": ["DriveMotorPos_Act7"], - "Rotor Speed, mm/s ME": ["DriveMotorVel_Act7"], + "Rotor Position, deg FE": ["DriveMotorPos_Act7"], + "Rotor Speed, deg/s ME": ["DriveMotorVel_Act7"], "Rotor Current, A ME": ["DriveMotorCurr_Act7"], "Cartesian Tool Speed, mm/s ME": ["CartVel_Act"], } - dataframe = dataframe.copy(deep=True) - working_mapping = {key: [item.lower() for item in items] for key, items in correct_mapping.items()} - try: + df_copy = dataframe.copy(deep=True) + working_mapping = {key: [item.lower() for item in items] for key, items in correct_mapping.items()} + new_columns = {} - for col in dataframe.columns: + for col in df_copy.columns: col_lower = col.lower() for key, values in working_mapping.items(): if col_lower in values: @@ -440,10 +585,9 @@ class TraceProcessor(BaseRawTraceProcessor): break else: new_columns[col] = col - - dataframe.rename(columns=new_columns, inplace=True) - dataframe = dataframe.loc[:, ~dataframe.columns.duplicated()] - return dataframe + df_copy.rename(columns=new_columns, inplace=True) + df_copy = df_copy.loc[:, ~df_copy.columns.duplicated()] + return df_copy except AttributeError as e: logger.error(f"_rename_df_columns - AttributeError: Проверьте, что переданный объект является DataFrame. {e}") return None @@ -451,82 +595,33 @@ class TraceProcessor(BaseRawTraceProcessor): logger.error(f"_rename_df_columns - Непредвиденная ошибка: {e}") return None -"Перемещение" -#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение + +""" +Примеры комментариев для этапов процесса: + +Перемещение: +# FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение # # ИЛИ # -#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками -#SIGNAL: BLENDING +# FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками +# SIGNAL: BLENDING -"Смыкание и набор усилия" -#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение электрода движения робота -# ... ... -#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение 0.5 мм роботом с движением электрода -#... ... -#FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - остановка в позиции (может быть набор усилия?) +Смыкание и набор усилия: +# FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение электрода с движением робота +# ... +# FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - перемещение 0.5 мм роботом с движением электрода +# ... +# FUNCTION/PROCEDURE: SGM_MOVE_TO_FORCE - остановка в позиции (может быть набор усилия?) -"Сварка" -#FUNCTION/PROCEDURE: SPOT - Начало сварочного процесса -#SIGNAL: START +Сварка: +# FUNCTION/PROCEDURE: SPOT - Начало сварочного процесса +# SIGNAL: START # -#FUNCTION/PROCEDURE: SPOT - Конец сварочного процесса -#SIGNAL: END - -"Снятие усилия и разъезд" -#FUNCTION/PROCEDURE: SGL_MoveToPos - Выход из контакта с точкой движением робота и электрода -#SIGNAL: START -# - -"Перемещение" -#FUNCTION/PROCEDURE: SW_RSP030TL01_SN - какое-то перемещение -# -# ИЛИ -# -#FUNCTION/PROCEDURE: SGL_MoveToPos - перемещение между точками -#SIGNAL: BLENDING - -if __name__ == '__main__': - - roboreader = KukaDataParser() - txt_reader = KukaTextParser() - detector_traces = TraceStageDetector(region_of_focus=[7.7, 42]) - detector_weldings = TextStageDetector() - - path1 = os.path.abspath("trace_samples/teslaSP_VelTCP_KRCIpo.dat") - path2 = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT") - - data1 = roboreader.parse(path1) - data2 = txt_reader.parse(path2) - - stages = detector_traces.detect_stages(data1) - weldings = detector_weldings.detect_welding(data2) - - counter = 0 - for st in stages: - print(st) - if st["stage"] == "force": - counter+=1 - print(counter) - print("=========================================") - for we in weldings: - print (we) - - - - """ - save = os.path.dirname(path) + "/sample.csv" - data.to_csv(save) - path = os.path.abspath("trace_samples/teslaSP_VelTCP_PROG.TXT") - txt_parser = TXT_Parser() - txt_parser.parse(path) - print(len(txt_parser._datapacks)) - """ - - - - - - - +# FUNCTION/PROCEDURE: SPOT - Конец сварочного процесса +# SIGNAL: END +Снятие усилия и разъезд: +# FUNCTION/PROCEDURE: SGL_MoveToPos - Выход из контакта с точкой движением робота и электрода +# SIGNAL: START +"""