dev: добавлен парсер r64 файлов

This commit is contained in:
Andrew 2024-12-02 18:41:02 +03:00
parent 40767d813a
commit 10e7cf2a37
4 changed files with 192 additions and 1 deletions

Binary file not shown.

View File

@ -0,0 +1,23 @@
from typing import Optional
import os
import numpy as np
import pandas as pd
from roboter import RobotPerformance, TWC_Performance
class PerformanceProcessor:
def calc_performance(self, path, df):
robot = RobotPerformance()
TWC = TWC_Performance()
rob_df = robot.job(path)
TWC_df = TWC.job(df)
if __name__ == "__main__":
path = "D:\downloads\Test2\TeslaTIME29_71_KRCIO.dat"
robot = RobotPerformance()
#TWC = TWC_Performance()
result = robot.job(path)
print (result)

166
src/performance/roboter.py Normal file
View File

@ -0,0 +1,166 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional
import os
import numpy as np
import pandas as pd
class BasePerformanceFactory(ABC):
@abstractmethod
def factory_method(self):
...
def job(self):
...
def _get_file_data(self, path) -> pd.DataFrame:
head, file = self._dat_parser(path)
self.dat_name = file[:-4]
path_r64 = os.path.dirname(path) + '\\' + file
time_axis, dataframe = self._r64_parser(path_r64, head)
dataframe = pd.concat([dataframe, time_axis], axis=1)
return dataframe
def _dat_parser(self, path: str) -> list[dict, str]:
with open(path, 'r') as file:
head = {'channels': 0}
inside_channel = False
channels = 0
for line in file:
line = line.strip()
if line == '#BEGINCHANNELHEADER' or line == "#BEGINGLOBALHEADER":
inside_channel = True
elif line == '#ENDCHANNELHEADER' or line == "#ENDGLOBALHEADER":
inside_channel = False
pass
# Формирование словаря
elif inside_channel:
_, data = line.split(',')
match _:
case '102':
head['rob_id'] = data
case '200':
ch_name = data
if ch_name != 'Zeit': channels +=1
head[ch_name] = {}
case '202':
head[ch_name]['unit'] = data
case '211':
file = data
case '220':
head[ch_name]['len'] = int(data)
case '221':
head[ch_name]['num'] = int(data)
case '241':
head[ch_name]['multiplyer'] = float(data)
head['channels'] = int(channels)
return head, file
def _r64_parser(self, path: str, head: dict) -> Optional[list[pd.Series, pd.DataFrame]]:
ch = head['channels']
keys = list(head.keys())[-ch:]
len_timestamps = head['Zeit']['len']
t_step = head['Zeit']['multiplyer']
time_axis = pd.Series(np.arange(0, len_timestamps*t_step, t_step))
time_axis.name = 'time'
dataframe = pd.DataFrame({})
with open(path, 'rb') as file:
data = file.read()
floats = np.frombuffer(data, dtype='<d') # Little-endian double
for key in keys:
step = head[key]['num']-1
result = pd.Series(np.array(floats[step::ch])* head[key]['multiplyer'])
result.name = key
dataframe = pd.concat([dataframe, result], axis=1)
return time_axis, dataframe
class BaseProduct(ABC):
def __init__(self):
self._signals = []
def _find_indexes(self,
signal: str,
dataframe: pd.DataFrame) -> list[list[float], list[float]]:
stage_diff = np.diff(dataframe[signal])
start_idx = np.where(stage_diff == 1)
finish_idx = np.where(stage_diff == -1)
return start_idx, finish_idx
def operation(self, dataframe: pd.DataFrame) -> pd.DataFrame:
all_idx = np.array([])
for signal in self._signals:
start_idx, finish_idx = np.array(self._find_indexes(signal, dataframe))
all_idx = np.hstack([all_idx, start_idx[0], finish_idx[0]])
all_idx = np.sort(np.array(all_idx, dtype="int64"))
result = dataframe.loc[all_idx, self._signals + ["time"]]
return result
class RobotPerformance(BasePerformanceFactory):
def factory_method(self) -> BaseProduct:
return RobotDF()
def job(self, path) -> pd.DataFrame:
product = self.factory_method()
dataframe = self._get_file_data(path)
rob_df = product.operation(dataframe)
in_point, in_moving, in_dialog = False, False, False
for index, row in rob_df.iterrows():
if row["$OUT3012"] == 1:
if not in_point:
in_point = True
start_point = row["time"]
else:
in_point = False
time_in_point = row["time"] - start_point
return rob_df
class TWC_Performance(BasePerformanceFactory):
def factory_method(self) -> BaseProduct:
return TWC_DF()
def job(self, TWC_DF: pd.DataFrame) -> pd.DataFrame:
product = self.factory_method()
result = product.operation(TWC_DF)
return result
class RobotDF(BaseProduct):
def __init__(self):
self._signals = [
"$OUT3012",
"$IN3003",
"$OUT3003"
]
class TWC_DF(BaseProduct):
def __init__(self):
self._signals = [
"Closing",
"Squeeze",
"Welding",
"Relief"
]

View File

@ -336,3 +336,5 @@ class BaseMainWindow(QWidget):
font-family: "Segoe UI", sans-serif; font-family: "Segoe UI", sans-serif;
} }
""") """)