fix: заменил библиотеку intervaltree на pandas

This commit is contained in:
Andrew 2024-12-16 10:26:13 +03:00
parent 2d98c1c5b7
commit 647f137d33
4 changed files with 61 additions and 61 deletions

View File

@ -176,7 +176,7 @@
0.5
],
"object_thickness": [
0.0045,
0.0,
0.0045,
0.0045,
0.0045,
@ -192,7 +192,7 @@
0.0045
],
"force_target": [
4000.0,
5000.0,
5000.0,
5000.0,
5000.0,

View File

@ -42,7 +42,7 @@
1.0
],
"position_start_1": [
0.0175
0.02
],
"position_start_2": [
0.081

View File

@ -2,29 +2,17 @@ from typing import Optional
import os
import numpy as np
import pandas as pd
from intervaltree import Interval, IntervalTree
from roboter import RobotPerformance, TWC_Performance
from roboter import Performance
class PerformanceProcessor:
def calc_performance(self, path, df):
robot = RobotPerformance()
TWC = TWC_Performance()
point_tree, movement_tree, dialog_tree = robot.job(path)
closing_tree, squeeze_tree, relief_tree = TWC.job(df)
def calc_performance(self, path:str, TWC_raw:pd.DataFrame):
factory = Performance()
comm_df, rob_df, TWC_df = factory.job(path, TWC_raw)
dialog_inPoint = point_tree & dialog_tree
dialog_inMovement = movement_tree & dialog_tree
closing_inPoint = point_tree & closing_tree
closing_inMovement = movement_tree & closing_tree
squeeze_inPoint = point_tree & squeeze_tree
squeeze_inMovement = movement_tree & squeeze_tree
relief_inPoint = point_tree & relief_tree
relief_inMovement = movement_tree & relief_tree
@ -35,8 +23,3 @@ class PerformanceProcessor:
if __name__ == "__main__":
path = "D:\downloads\Test2\TeslaTIME29_71_KRCIO.dat"
robot = RobotPerformance()
#TWC = TWC_Performance()
result = robot.job(path)
print (result[0])
print (result[1])

View File

@ -5,7 +5,6 @@ import os
import numpy as np
import pandas as pd
from intervaltree import Interval, IntervalTree
class BasePerformanceFactory(ABC):
@ -108,74 +107,92 @@ class BaseProduct(ABC):
def _form_intervals(self,
start: pd.Series,
end: pd.Series) -> IntervalTree:
end: pd.Series) -> dict:
if len(start) != len(end):
for i in range(1, len(end)):
if end[i-1] > start[i]:
start = start.drop(i).reset_index(drop=True)
tree = IntervalTree(Interval(start[i], end[i], i) for i in range(len (start)))
return tree
intervals = {'start':start.tolist, 'end':end.tolist}
return intervals
@abstractmethod
def operation(self):
...
class RobotPerformance(BasePerformanceFactory):
class Performance(BasePerformanceFactory):
def factory_method(self) -> BaseProduct:
return RobotDF()
def robot_method(self) -> BaseProduct:
return RobotData()
def job(self, path) -> list[pd.DataFrame]:
product = self.factory_method()
def TWC_method(self) -> BaseProduct:
return TWC_Data()
def comm_method(self) -> BaseProduct:
return CommData()
def job(self, path:str, TWC_raw:pd.DataFrame) -> list[pd.DataFrame, list[pd.DataFrame], list[pd.DataFrame]]:
robot = self.robot_method()
TWC = self.TWC_method()
comm=self.comm_method()
dataframe = self._get_file_data(path)
rob_df = product.operation(dataframe)
return rob_df
class TWC_Performance(BasePerformanceFactory):
def factory_method(self) -> BaseProduct:
return TWC_DF()
def job(self, TWC_DF: pd.DataFrame) -> list[pd.DataFrame]:
product = self.factory_method()
result = product.operation(TWC_DF)
return result
rob_comm, rob_df = robot.operation(dataframe)
TWC_comm, TWC_df = TWC.operation(TWC_raw)
comm_df = comm.operation(rob_comm, TWC_comm)
return comm_df, rob_df, TWC_df
class RobotDF(BaseProduct):
class RobotData(BaseProduct):
def __init__(self):
self._signals = [
"$OUT3012",
"$IN3003",
"$OUT3003"
"$OUT3003",
"$OUT3244"
]
def operation(self, dataframe: pd.DataFrame) -> list[IntervalTree]:
def operation(self, dataframe: pd.DataFrame) -> list[dict, pd.DataFrame]:
events = self._find_events(dataframe, self._signals)
point_tree = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"])
movement_tree = self._form_intervals(start=events["$OUT3003"]["fall"], end=events["$OUT3012"]["rise"])
dialog_tree = []
return point_tree, movement_tree, dialog_tree
communication_sig = {'sent':events["$OUT3244"]["fall"], 'received':events[""][""]}
point_interval = self._form_intervals(start=events["$OUT3012"]["rise"], end=events["$OUT3012"]["fall"])
movement_interval = self._form_intervals(start=events["$OUT3244"]["rise"], end=events["$OUT3244"]["fall"])
return communication_sig, pd.DataFrame({'in_point':point_interval,
'in_move':movement_interval})
class TWC_DF(BaseProduct):
class TWC_Data(BaseProduct):
def __init__(self):
self._signals = [
"Closing",
"Squeeze",
"Welding",
"Relief"
"Relief",
"Oncoming"
]
def operation(self, dataframe: pd.DataFrame) -> list[IntervalTree]:
def operation(self, dataframe: pd.DataFrame) -> list[dict, pd.DataFrame]:
events = self._find_events(dataframe, self._signals)
closing_tree = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"])
squeeze_tree = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"])
relief_tree = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"])
return closing_tree, squeeze_tree, relief_tree
communication_sig = {'sent':events[""][""], 'received':events[""][""]} #ситара что-то делает -конец сигнала
closing_interval = self._form_intervals(start=events["Closing"]["rise"], end=events["Closing"]["fall"])
squeeze_interval = self._form_intervals(start=events["Squeeze"]["rise"], end=events["Squeeze"]["fall"])
relief_interval = self._form_intervals(start=events["Relief"]["rise"], end=events["Relief"]["fall"])
oncoming_interval = self._form_intervals(start=events["Oncoming"]["rise"], end=events["Oncoming"]["fall"])
return communication_sig, pd.DataFrame({'in_closing':closing_interval,
'in_squeeze':squeeze_interval,
'in_relief':relief_interval,
'in_oncoming':oncoming_interval})
class CommData(BaseProduct):
"""
Определяет промежуток, в который происходит взаимодействие между нодами.
Подразумевается следующая структура: node: tuple(list, list)
node[0] - время отправки пакетов. node[1] - время приема пакетов.
"""
def operation(self, node1: dict, node2: dict) -> pd.DataFrame:
n1_to_n2 = self._form_intervals(start=node1['sent'], end=node2['received'])
n2_to_n1 = self._form_intervals(start=node2['sent'], end=node1['received'])
return pd.concat([pd.DataFrame(n1_to_n2), pd.DataFrame(n2_to_n1)])