118 lines
3.4 KiB
Python
118 lines
3.4 KiB
Python
import rclpy
|
|
import rclpy.callback_groups
|
|
import rclpy.executors
|
|
from rclpy.node import Node
|
|
from std_srvs.srv import Empty
|
|
|
|
from gpiozero import Button, OutputDevice
|
|
|
|
from serial.tools import list_ports
|
|
|
|
from toid_interaction.mechanism.sekvenca_2026 import okreni, okreni_niz
|
|
from toid_interaction.mechanism.zidovi_load import ZidoviAction
|
|
from toid_interaction.mechanism.zidovi import Zidovi
|
|
from toid_interaction.mechanism.zupcanik import ZupcanikAction
|
|
from toid_msgs.action import EmptyAction
|
|
from toid_msgs.srv import SendString
|
|
|
|
from rclpy.action.server import ServerGoalHandle
|
|
from rclpy.action.server import ActionServer
|
|
|
|
|
|
class InteracitionNode(Node):
|
|
step: int = 0
|
|
btn_: Button
|
|
output_pin_: OutputDevice
|
|
start_pin_action_: ActionServer
|
|
|
|
def __init__(self):
|
|
super().__init__("ToidInteractionNode")
|
|
|
|
self.find_sigma()
|
|
|
|
self.srv = self.create_service(Empty, "/sequence1", self.sequence1_cb)
|
|
self.get_logger().info("Service 'sequence1' ready.")
|
|
|
|
self.srv = self.create_service(SendString, "/sequence2", self.sequence2_cb)
|
|
self.get_logger().info("Service 'sequence2' ready.")
|
|
|
|
self.srv = self.create_service(Empty, "/sequence3", self.sequence3_cb)
|
|
self.get_logger().info("Service 'sequence3' ready.")
|
|
|
|
self.btn_ = Button(17)
|
|
self.output_pin_ = OutputDevice(27)
|
|
|
|
self.start_pin_action_ = ActionServer(
|
|
self, EmptyAction, "/start_plug", execute_callback=self.start_plug_action_cb,
|
|
callback_group=rclpy.callback_groups.ReentrantCallbackGroup()
|
|
)
|
|
|
|
self.get_logger().info("Action 'start_plug' ready.")
|
|
|
|
|
|
def find_sigma(self):
|
|
for port_info in list_ports.comports():
|
|
if port_info.vid == 0x1A86 and port_info.pid == 0x55D3:
|
|
break
|
|
|
|
print(port_info.device)
|
|
self.st_motor_device_name = port_info.device
|
|
|
|
def sequence1_cb(self, request, response):
|
|
if self.step != 0:
|
|
return Empty.Response()
|
|
okreni(5)
|
|
zupcanik = ZupcanikAction(self.st_motor_device_name)
|
|
zupcanik.zupcanik(1, -1015, 25)
|
|
self.step = 1
|
|
return response
|
|
|
|
def sequence2_cb(self, request: SendString.Request, response: SendString.Response):
|
|
if self.step != 1:
|
|
return SendString.Response()
|
|
|
|
zidovi = Zidovi(self.st_motor_device_name)
|
|
|
|
zidovi.zidovi(1, 1500, 600, 210, 120, 120)
|
|
|
|
okreni_niz(request.text)
|
|
|
|
zidovi.zidovi(0, 1500, 420, 50, 120, 120)
|
|
self.step = 2
|
|
return response
|
|
|
|
def sequence3_cb(self, request, response):
|
|
if self.step != 2:
|
|
return Empty.Response()
|
|
|
|
zupcanik = ZupcanikAction(self.st_motor_device_name)
|
|
zidovi = Zidovi(self.st_motor_device_name)
|
|
|
|
zupcanik.zupcanik(1, 1015, 25)
|
|
zidovi.zidovi(0, 1500, 180, 160, 120, 120)
|
|
okreni(5)
|
|
self.step = 0
|
|
return response
|
|
|
|
async def start_plug_action_cb(self, goal_handle: ServerGoalHandle):
|
|
while not self.btn_.is_active:
|
|
pass
|
|
while self.btn_.is_active:
|
|
pass
|
|
goal_handle.succeed()
|
|
return EmptyAction.Result()
|
|
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
|
|
node = InteracitionNode()
|
|
executor = rclpy.executors.MultiThreadedExecutor()
|
|
executor.add_node(node)
|
|
executor.spin()
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|