wip-behaviors #3

Merged
Pimpest merged 86 commits from wip-behaviors into main 2026-05-28 06:21:26 +00:00
4 changed files with 262 additions and 8 deletions
Showing only changes of commit ae77335ef9 - Show all commits

View File

@@ -10,6 +10,7 @@ from serial.tools import list_ports
from toid_interaction.mechanism.sekvenca_2026 import okreni, okreni_niz from toid_interaction.mechanism.sekvenca_2026 import okreni, okreni_niz
from toid_interaction.mechanism.zidovi_load import ZidoviAction from toid_interaction.mechanism.zidovi_load import ZidoviAction
from toid_interaction.mechanism.zidovi import Zidovi
from toid_interaction.mechanism.zupcanik import ZupcanikAction from toid_interaction.mechanism.zupcanik import ZupcanikAction
from toid_msgs.action import EmptyAction from toid_msgs.action import EmptyAction
from toid_msgs.srv import SendString from toid_msgs.srv import SendString
@@ -69,14 +70,14 @@ class InteracitionNode(Node):
def sequence2_cb(self, request: SendString.Request, response: SendString.Response): def sequence2_cb(self, request: SendString.Request, response: SendString.Response):
if self.step != 1: if self.step != 1:
return Empty.Response() return Empty.Response()
zidovi = ZidoviAction(self.st_motor_device_name)
zidovi.beli_zid(1) zidovi = Zidovi(self.st_motor_device_name)
zidovi.plavi_zid(1)
zidovi.zidovi(0, 1500, 600, 300, 100, 120)
okreni_niz(request.text) okreni_niz(request.text)
zidovi.plavi_zid(0, TargetPos=150) zidovi.zidovi(0, 1500, 450, 150, 100, 120)
zidovi.beli_zid(0, TargetPos=450)
self.step = 2 self.step = 2
return response return response
@@ -85,11 +86,10 @@ class InteracitionNode(Node):
return Empty.Response() return Empty.Response()
zupcanik = ZupcanikAction(self.st_motor_device_name) zupcanik = ZupcanikAction(self.st_motor_device_name)
zidovi = ZidoviAction(self.st_motor_device_name) zidovi = Zidovi(self.st_motor_device_name)
zupcanik.zupcanik(1, 1010, 25) zupcanik.zupcanik(1, 1010, 25)
zidovi.plavi_zid(0, TargetPos=150) zidovi.zidovi(0, 1500, 150, 150, 100, 120)
zidovi.beli_zid(0, TargetPos=150)
okreni(5) okreni(5)
self.step = 0 self.step = 0
return response return response

View File

@@ -75,6 +75,10 @@ class sts(protocol_packet_handler):
def ReadLoad(self, sts_id): def ReadLoad(self, sts_id):
sts_present_load, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_PRESENT_LOAD_L) # 60 sts_present_load, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_PRESENT_LOAD_L) # 60
return self.sts_tohost(sts_present_load, 15), sts_comm_result, sts_error return self.sts_tohost(sts_present_load, 15), sts_comm_result, sts_error
def ReadCurrent(self, sts_id):
sts_present_current, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_PRESENT_CURRENT_L) # 69
return self.sts_tohost(sts_present_current, 15), sts_comm_result, sts_error
def ReadMaxLoad(self, sts_id): def ReadMaxLoad(self, sts_id):
sts_max_load, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_LOAD_LIMIT) # 0x36 sts_max_load, sts_comm_result,sts_error = self.read2ByteTxRx(sts_id, STS_LOAD_LIMIT) # 0x36

View File

@@ -0,0 +1,83 @@
from .STservo_sdk.stservo_def import COMM_SUCCESS
from .STservo_sdk import sts
STS_MOVING_ACC = 0
def get_load(packetHandler : sts, id_motora: int) -> float:
sts_present_load, _, _ = packetHandler.ReadLoad(id_motora)
return (sts_present_load & ((1 << 10) - 1)) * 0.1
def get_current(packetHandler : sts, id_motora: int) -> float:
sts_present_current, _, _ = packetHandler.ReadCurrent(id_motora)
return (sts_present_current * 6.5)
def stop_motor(packetHandler : sts, id_motora: int):
packetHandler.WriteSpec(id_motora, 0, 0)
def print_motor_error(packetHandler : sts, sts_com_result, sts_error, id_motora):
if sts_com_result != COMM_SUCCESS:
print("[ID:%d] %s" % (id_motora, packetHandler.getTxRxResult(sts_com_result)))
if sts_error != 0:
print("[ID:%d] %s" % (id_motora, packetHandler.getRxPacketError(sts_error)))
def get_motor_pos_speed(packetHandler : sts, id_motora: int) -> tuple[float, float]:
sts_present_position, sts_present_speed, sts_com_result, sts_error = packetHandler.ReadPosSpeed(id_motora)
print_motor_error(packetHandler, sts_com_result, sts_error, id_motora)
return sts_present_position, sts_present_speed
def set_motor_speed(packetHandler : sts, id_motora: int, speed: float) -> tuple[float, float]:
sts_com_result, sts_error = packetHandler.WriteSpec(id_motora, speed, STS_MOVING_ACC)
print_motor_error(packetHandler, sts_com_result, sts_error, id_motora)
def normalize_dpos(curr: float, prev: float) -> float:
if abs(curr - prev) > 3000:
return 4096 - abs(curr - prev)
else:
return abs(curr - prev)
class CrniMotor:
def __init__(self, packetHandler : sts, id_motora: int):
self.packetHandler = packetHandler
self.id_motora = id_motora
self.targret_reached = False
self.brzina = 0
sts_comm_result, sts_error = packetHandler.WheelMode(id_motora)
if sts_comm_result != COMM_SUCCESS:
print("[ID:%01d] %s" % (id_motora, packetHandler.getTxRxResult(sts_comm_result)))
elif sts_error != 0:
print("[ID:%01d] %s" % (id_motora, packetHandler.getRxPacketError(sts_error)))
sts_present_position, sts_present_speed = get_motor_pos_speed(packetHandler, id_motora)
self.trenutna_pos = sts_present_position
self.prosla_pos = sts_present_position
self.predjeni_put = 0
packetHandler.SetMaxLoad(id_motora, 80)
def doCycle(self, target_pos: float, brzina: float) -> bool | None:
if not self.targret_reached:
sts_present_position, sts_present_speed = get_motor_pos_speed(self.packetHandler, self.id_motora)
self.trenutna_pos = sts_present_position
self.predjeni_put += normalize_dpos(self.trenutna_pos, self.prosla_pos)
self.prosla_pos = self.trenutna_pos
if self.predjeni_put >= target_pos:
self.stop()
self.targret_reached = True
else:
set_motor_speed(self.packetHandler, self.id_motora, brzina)
self.brzina = brzina
# TODO: if overload stop motor and return false:
return True
return True
def stop(self):
stop_motor(self.packetHandler, self.id_motora)
self.brzina = 0

View File

@@ -0,0 +1,167 @@
from .STservo_sdk import PortHandler, sts
from .motori import get_load, stop_motor, get_current, CrniMotor
import time
BAUDRATE = 1000000
STS_MOVING_ACC = 0
class Zidovi:
def __init__(self, devicename: str = '/dev/ttyUSB0'):
self.devicename = devicename
self.portHandler = None
self.packetHandler = None
def _open_port(self):
self.portHandler = PortHandler(self.devicename)
self.packetHandler = sts(self.portHandler)
if self.portHandler.openPort():
print("Succeeded to open the port")
else:
print("Failed to open the port")
raise RuntimeError(f"Failed to open port: {self.devicename}")
def _close_port(self):
if self.portHandler is not None:
for motor_id in (3, 5, 2, 4):
stopped = False
while not stopped:
try:
stop_motor(self.packetHandler, motor_id)
stopped = True
except Exception:
self.portHandler.ser.reset_input_buffer()
self.portHandler.ser.reset_output_buffer()
time.sleep(0.01)
self.portHandler.closePort()
def _stop_all(self):
# Reset serial buffers — a KeyboardInterrupt can fire mid-packet,
# leaving garbage in the buffers that would corrupt subsequent writes.
if self.portHandler is not None and self.portHandler.ser is not None:
self.portHandler.ser.reset_input_buffer()
self.portHandler.ser.reset_output_buffer()
for motor_id in (2, 3, 4, 5):
stopped = False
while not stopped:
try:
stop_motor(self.packetHandler, motor_id)
stopped = True
except Exception:
self.portHandler.ser.reset_input_buffer()
self.portHandler.ser.reset_output_buffer()
time.sleep(0.01)
def zidovi(self, smer, brzina=1500,
TargetPos_beli=600, TargetPos_plavi=300,
opterecenje_threshold_beli=120, opterecenje_threshold_plavi=150):
TargetPos_beli = TargetPos_beli * 4096 / 360
TargetPos_plavi = TargetPos_plavi * 4096 / 360
if smer == 1:
Speed_ID2 = brzina
Speed_ID4 = -brzina
Speed_ID3 = -brzina
Speed_ID5 = brzina
else:
Speed_ID2 = -brzina
Speed_ID4 = brzina
Speed_ID3 = brzina
Speed_ID5 = -brzina
self._open_port()
try:
motor2 = CrniMotor(self.packetHandler, 2)
motor3 = CrniMotor(self.packetHandler, 3)
motor4 = CrniMotor(self.packetHandler, 4)
motor5 = CrniMotor(self.packetHandler, 5)
brojac = 0
load_blue_count = 0
load_white_count = 0
while True:
curr_time = time.time()
if smer == 1:
motor3.doCycle(TargetPos_beli, Speed_ID3)
motor5.doCycle(TargetPos_beli, Speed_ID5)
if brojac >= 50:
motor2.doCycle(TargetPos_plavi, Speed_ID2)
motor4.doCycle(TargetPos_plavi, Speed_ID4)
else:
brojac += 1
else:
motor2.doCycle(TargetPos_plavi, Speed_ID2)
motor4.doCycle(TargetPos_plavi, Speed_ID4)
if brojac >= 50:
motor3.doCycle(TargetPos_beli, Speed_ID3)
motor5.doCycle(TargetPos_beli, Speed_ID5)
else:
brojac += 1
print("Time elapsed: %.4f seconds" % (time.time() - curr_time))
opterecenje2 = get_load(self.packetHandler, 2)
opterecenje4 = get_load(self.packetHandler, 4)
opterecenje3 = get_load(self.packetHandler, 3)
opterecenje5 = get_load(self.packetHandler, 5)
napon2 = get_current(self.packetHandler, 2)
napon4 = get_current(self.packetHandler, 4)
napon3 = get_current(self.packetHandler, 3)
napon5 = get_current(self.packetHandler, 5)
print("Current positions:")
print("[ID:002] PresPos:%d Load:%.1f Current:%.1f" % (motor2.prosla_pos, opterecenje2, napon2))
print("[ID:004] PresPos:%d Load:%.1f Current:%.1f" % (motor4.prosla_pos, opterecenje4, napon4))
print("[ID:003] PresPos:%d Load:%.1f Current:%.1f" % (motor3.prosla_pos, opterecenje3, napon3))
print("[ID:005] PresPos:%d Load:%.1f Current:%.1f" % (motor5.prosla_pos, opterecenje5, napon5))
if napon2 > opterecenje_threshold_plavi or napon4 > opterecenje_threshold_plavi:
load_blue_count+=1
if load_blue_count > 2:
print("High load detected: [ID:002] Load:%.1f [ID:004] Load:%.1f" % (opterecenje2, opterecenje4))
stop_motor(self.packetHandler, 2)
stop_motor(self.packetHandler, 4)
motor2.targret_reached = True
motor4.targret_reached = True
else:
load_blue_count = max(0, load_white_count - 1)
if napon3 > opterecenje_threshold_beli or napon5 > opterecenje_threshold_beli:
load_white_count+=1
if load_white_count > 2:
print("High load detected: [ID:003] Load:%.1f [ID:005] Load:%.1f" % (opterecenje3, opterecenje5))
stop_motor(self.packetHandler, 3)
stop_motor(self.packetHandler, 5)
motor3.targret_reached = True
motor5.targret_reached = True
else:
load_white_count = max(0, load_white_count - 1)
print("PredjeniPut ID2: %d deg" % (motor2.predjeni_put * 360 / 4096))
print("PredjeniPut ID4: %d deg" % (motor4.predjeni_put * 360 / 4096))
print("PredjeniPut ID3: %d deg" % (motor3.predjeni_put * 360 / 4096))
print("PredjeniPut ID5: %d deg" % (motor5.predjeni_put * 360 / 4096))
targets_reached = (
motor2.targret_reached
and motor3.targret_reached
and motor4.targret_reached
and motor5.targret_reached
)
if targets_reached:
print("Svi motori su stigli na cilj!")
break
return smer, motor2.predjeni_put, motor3.predjeni_put, motor4.predjeni_put, motor5.predjeni_put
except KeyboardInterrupt:
print("\nCtrl+C detected, stopping all motors.")
self._stop_all()
finally:
self._close_port()