From ae77335ef981078796b2aec096756a2280c57c22 Mon Sep 17 00:00:00 2001 From: Pimpest <82343504+pimpest@users.noreply.github.com> Date: Thu, 16 Apr 2026 06:36:04 +0200 Subject: [PATCH] New mechanism code for going in same direction --- .../toid_interaction/interaction_node.py | 16 +- .../mechanism/STservo_sdk/sts.py | 4 + .../toid_interaction/mechanism/motori.py | 83 +++++++++ .../toid_interaction/mechanism/zidovi.py | 167 ++++++++++++++++++ 4 files changed, 262 insertions(+), 8 deletions(-) create mode 100644 toid_interaction/toid_interaction/mechanism/motori.py create mode 100644 toid_interaction/toid_interaction/mechanism/zidovi.py diff --git a/toid_interaction/toid_interaction/interaction_node.py b/toid_interaction/toid_interaction/interaction_node.py index f3c225d..c496f07 100644 --- a/toid_interaction/toid_interaction/interaction_node.py +++ b/toid_interaction/toid_interaction/interaction_node.py @@ -10,6 +10,7 @@ from serial.tools import list_ports from toid_interaction.mechanism.sekvenca_2026 import okreni, okreni_niz from toid_interaction.mechanism.zidovi_load import ZidoviAction +from toid_interaction.mechanism.zidovi import Zidovi from toid_interaction.mechanism.zupcanik import ZupcanikAction from toid_msgs.action import EmptyAction from toid_msgs.srv import SendString @@ -69,14 +70,14 @@ class InteracitionNode(Node): def sequence2_cb(self, request: SendString.Request, response: SendString.Response): if self.step != 1: return Empty.Response() - zidovi = ZidoviAction(self.st_motor_device_name) - zidovi.beli_zid(1) - zidovi.plavi_zid(1) + + zidovi = Zidovi(self.st_motor_device_name) + + zidovi.zidovi(0, 1500, 600, 300, 100, 120) okreni_niz(request.text) - zidovi.plavi_zid(0, TargetPos=150) - zidovi.beli_zid(0, TargetPos=450) + zidovi.zidovi(0, 1500, 450, 150, 100, 120) self.step = 2 return response @@ -85,11 +86,10 @@ class InteracitionNode(Node): return Empty.Response() zupcanik = ZupcanikAction(self.st_motor_device_name) - zidovi = ZidoviAction(self.st_motor_device_name) + zidovi = Zidovi(self.st_motor_device_name) zupcanik.zupcanik(1, 1010, 25) - zidovi.plavi_zid(0, TargetPos=150) - zidovi.beli_zid(0, TargetPos=150) + zidovi.zidovi(0, 1500, 150, 150, 100, 120) okreni(5) self.step = 0 return response diff --git a/toid_interaction/toid_interaction/mechanism/STservo_sdk/sts.py b/toid_interaction/toid_interaction/mechanism/STservo_sdk/sts.py index 3123996..4941c79 100644 --- a/toid_interaction/toid_interaction/mechanism/STservo_sdk/sts.py +++ b/toid_interaction/toid_interaction/mechanism/STservo_sdk/sts.py @@ -75,6 +75,10 @@ class sts(protocol_packet_handler): def ReadLoad(self, sts_id): sts_present_load, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_PRESENT_LOAD_L) # 60 return self.sts_tohost(sts_present_load, 15), sts_comm_result, sts_error + + def ReadCurrent(self, sts_id): + sts_present_current, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_PRESENT_CURRENT_L) # 69 + return self.sts_tohost(sts_present_current, 15), sts_comm_result, sts_error def ReadMaxLoad(self, sts_id): sts_max_load, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_LOAD_LIMIT) # 0x36 diff --git a/toid_interaction/toid_interaction/mechanism/motori.py b/toid_interaction/toid_interaction/mechanism/motori.py new file mode 100644 index 0000000..2e3f05c --- /dev/null +++ b/toid_interaction/toid_interaction/mechanism/motori.py @@ -0,0 +1,83 @@ +from .STservo_sdk.stservo_def import COMM_SUCCESS +from .STservo_sdk import sts + +STS_MOVING_ACC = 0 + +def get_load(packetHandler : sts, id_motora: int) -> float: + sts_present_load, _, _ = packetHandler.ReadLoad(id_motora) + return (sts_present_load & ((1 << 10) - 1)) * 0.1 + +def get_current(packetHandler : sts, id_motora: int) -> float: + sts_present_current, _, _ = packetHandler.ReadCurrent(id_motora) + return (sts_present_current * 6.5) + +def stop_motor(packetHandler : sts, id_motora: int): + packetHandler.WriteSpec(id_motora, 0, 0) + +def print_motor_error(packetHandler : sts, sts_com_result, sts_error, id_motora): + if sts_com_result != COMM_SUCCESS: + print("[ID:%d] %s" % (id_motora, packetHandler.getTxRxResult(sts_com_result))) + if sts_error != 0: + print("[ID:%d] %s" % (id_motora, packetHandler.getRxPacketError(sts_error))) + +def get_motor_pos_speed(packetHandler : sts, id_motora: int) -> tuple[float, float]: + sts_present_position, sts_present_speed, sts_com_result, sts_error = packetHandler.ReadPosSpeed(id_motora) + print_motor_error(packetHandler, sts_com_result, sts_error, id_motora) + + return sts_present_position, sts_present_speed + +def set_motor_speed(packetHandler : sts, id_motora: int, speed: float) -> tuple[float, float]: + sts_com_result, sts_error = packetHandler.WriteSpec(id_motora, speed, STS_MOVING_ACC) + print_motor_error(packetHandler, sts_com_result, sts_error, id_motora) + +def normalize_dpos(curr: float, prev: float) -> float: + if abs(curr - prev) > 3000: + return 4096 - abs(curr - prev) + else: + return abs(curr - prev) + +class CrniMotor: + def __init__(self, packetHandler : sts, id_motora: int): + self.packetHandler = packetHandler + self.id_motora = id_motora + self.targret_reached = False + self.brzina = 0 + + + sts_comm_result, sts_error = packetHandler.WheelMode(id_motora) + if sts_comm_result != COMM_SUCCESS: + print("[ID:%01d] %s" % (id_motora, packetHandler.getTxRxResult(sts_comm_result))) + elif sts_error != 0: + print("[ID:%01d] %s" % (id_motora, packetHandler.getRxPacketError(sts_error))) + + sts_present_position, sts_present_speed = get_motor_pos_speed(packetHandler, id_motora) + self.trenutna_pos = sts_present_position + self.prosla_pos = sts_present_position + self.predjeni_put = 0 + + packetHandler.SetMaxLoad(id_motora, 80) + + + def doCycle(self, target_pos: float, brzina: float) -> bool | None: + if not self.targret_reached: + sts_present_position, sts_present_speed = get_motor_pos_speed(self.packetHandler, self.id_motora) + self.trenutna_pos = sts_present_position + self.predjeni_put += normalize_dpos(self.trenutna_pos, self.prosla_pos) + self.prosla_pos = self.trenutna_pos + + if self.predjeni_put >= target_pos: + self.stop() + self.targret_reached = True + else: + set_motor_speed(self.packetHandler, self.id_motora, brzina) + self.brzina = brzina + + # TODO: if overload stop motor and return false: + + return True + return True + + def stop(self): + stop_motor(self.packetHandler, self.id_motora) + self.brzina = 0 + \ No newline at end of file diff --git a/toid_interaction/toid_interaction/mechanism/zidovi.py b/toid_interaction/toid_interaction/mechanism/zidovi.py new file mode 100644 index 0000000..4ee2140 --- /dev/null +++ b/toid_interaction/toid_interaction/mechanism/zidovi.py @@ -0,0 +1,167 @@ +from .STservo_sdk import PortHandler, sts +from .motori import get_load, stop_motor, get_current, CrniMotor +import time + + +BAUDRATE = 1000000 +STS_MOVING_ACC = 0 + + +class Zidovi: + def __init__(self, devicename: str = '/dev/ttyUSB0'): + self.devicename = devicename + self.portHandler = None + self.packetHandler = None + + def _open_port(self): + self.portHandler = PortHandler(self.devicename) + self.packetHandler = sts(self.portHandler) + if self.portHandler.openPort(): + print("Succeeded to open the port") + else: + print("Failed to open the port") + raise RuntimeError(f"Failed to open port: {self.devicename}") + + def _close_port(self): + if self.portHandler is not None: + for motor_id in (3, 5, 2, 4): + stopped = False + while not stopped: + try: + stop_motor(self.packetHandler, motor_id) + stopped = True + except Exception: + self.portHandler.ser.reset_input_buffer() + self.portHandler.ser.reset_output_buffer() + time.sleep(0.01) + self.portHandler.closePort() + + def _stop_all(self): + # Reset serial buffers — a KeyboardInterrupt can fire mid-packet, + # leaving garbage in the buffers that would corrupt subsequent writes. + if self.portHandler is not None and self.portHandler.ser is not None: + self.portHandler.ser.reset_input_buffer() + self.portHandler.ser.reset_output_buffer() + for motor_id in (2, 3, 4, 5): + stopped = False + while not stopped: + try: + stop_motor(self.packetHandler, motor_id) + stopped = True + except Exception: + self.portHandler.ser.reset_input_buffer() + self.portHandler.ser.reset_output_buffer() + time.sleep(0.01) + + def zidovi(self, smer, brzina=1500, + TargetPos_beli=600, TargetPos_plavi=300, + opterecenje_threshold_beli=120, opterecenje_threshold_plavi=150): + TargetPos_beli = TargetPos_beli * 4096 / 360 + TargetPos_plavi = TargetPos_plavi * 4096 / 360 + + if smer == 1: + Speed_ID2 = brzina + Speed_ID4 = -brzina + Speed_ID3 = -brzina + Speed_ID5 = brzina + else: + Speed_ID2 = -brzina + Speed_ID4 = brzina + Speed_ID3 = brzina + Speed_ID5 = -brzina + + self._open_port() + try: + motor2 = CrniMotor(self.packetHandler, 2) + motor3 = CrniMotor(self.packetHandler, 3) + motor4 = CrniMotor(self.packetHandler, 4) + motor5 = CrniMotor(self.packetHandler, 5) + + brojac = 0 + + + load_blue_count = 0 + load_white_count = 0 + + while True: + curr_time = time.time() + + if smer == 1: + motor3.doCycle(TargetPos_beli, Speed_ID3) + motor5.doCycle(TargetPos_beli, Speed_ID5) + if brojac >= 50: + motor2.doCycle(TargetPos_plavi, Speed_ID2) + motor4.doCycle(TargetPos_plavi, Speed_ID4) + else: + brojac += 1 + else: + motor2.doCycle(TargetPos_plavi, Speed_ID2) + motor4.doCycle(TargetPos_plavi, Speed_ID4) + if brojac >= 50: + motor3.doCycle(TargetPos_beli, Speed_ID3) + motor5.doCycle(TargetPos_beli, Speed_ID5) + else: + brojac += 1 + + print("Time elapsed: %.4f seconds" % (time.time() - curr_time)) + + opterecenje2 = get_load(self.packetHandler, 2) + opterecenje4 = get_load(self.packetHandler, 4) + opterecenje3 = get_load(self.packetHandler, 3) + opterecenje5 = get_load(self.packetHandler, 5) + + napon2 = get_current(self.packetHandler, 2) + napon4 = get_current(self.packetHandler, 4) + napon3 = get_current(self.packetHandler, 3) + napon5 = get_current(self.packetHandler, 5) + + print("Current positions:") + print("[ID:002] PresPos:%d Load:%.1f Current:%.1f" % (motor2.prosla_pos, opterecenje2, napon2)) + print("[ID:004] PresPos:%d Load:%.1f Current:%.1f" % (motor4.prosla_pos, opterecenje4, napon4)) + print("[ID:003] PresPos:%d Load:%.1f Current:%.1f" % (motor3.prosla_pos, opterecenje3, napon3)) + print("[ID:005] PresPos:%d Load:%.1f Current:%.1f" % (motor5.prosla_pos, opterecenje5, napon5)) + + if napon2 > opterecenje_threshold_plavi or napon4 > opterecenje_threshold_plavi: + + load_blue_count+=1 + if load_blue_count > 2: + print("High load detected: [ID:002] Load:%.1f [ID:004] Load:%.1f" % (opterecenje2, opterecenje4)) + stop_motor(self.packetHandler, 2) + stop_motor(self.packetHandler, 4) + motor2.targret_reached = True + motor4.targret_reached = True + else: + load_blue_count = max(0, load_white_count - 1) + + if napon3 > opterecenje_threshold_beli or napon5 > opterecenje_threshold_beli: + load_white_count+=1 + if load_white_count > 2: + print("High load detected: [ID:003] Load:%.1f [ID:005] Load:%.1f" % (opterecenje3, opterecenje5)) + stop_motor(self.packetHandler, 3) + stop_motor(self.packetHandler, 5) + motor3.targret_reached = True + motor5.targret_reached = True + else: + load_white_count = max(0, load_white_count - 1) + + print("PredjeniPut ID2: %d deg" % (motor2.predjeni_put * 360 / 4096)) + print("PredjeniPut ID4: %d deg" % (motor4.predjeni_put * 360 / 4096)) + print("PredjeniPut ID3: %d deg" % (motor3.predjeni_put * 360 / 4096)) + print("PredjeniPut ID5: %d deg" % (motor5.predjeni_put * 360 / 4096)) + + targets_reached = ( + motor2.targret_reached + and motor3.targret_reached + and motor4.targret_reached + and motor5.targret_reached + ) + if targets_reached: + print("Svi motori su stigli na cilj!") + break + + return smer, motor2.predjeni_put, motor3.predjeni_put, motor4.predjeni_put, motor5.predjeni_put + except KeyboardInterrupt: + print("\nCtrl+C detected, stopping all motors.") + self._stop_all() + finally: + self._close_port()