Added actions endpoint multiple clients can use to simultaneously control actions

This commit is contained in:
2026-03-20 11:23:51 +01:00
parent 28b120b88e
commit 2cc91c1d5d
15 changed files with 630 additions and 156 deletions

View File

@@ -1,13 +1,42 @@
import subprocess
import uuid
import os
import signal
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
import asyncio
import rclpy
import uvicorn
import threading
import logging
app = FastAPI()
import toid_cli.routers.startup as startup
import toid_cli.routers.action as action
from toid_cli.services.services import actionClient, ServerRunner, log
from toid_cli.services.runners import main_runner
logging.getLogger().setLevel(logging.INFO)
@asynccontextmanager
async def lifespan(app: FastAPI):
rclpy.init()
ServerRunner.setRunner(ServerRunner(app))
thread = threading.Thread(target=rclpy.spin, args=(ServerRunner.getRunner(),), daemon=True)
thread.start()
log.info("Started up rclpy")
if not hasattr(app.state, 'loop'):
app.state.loop = asyncio.get_running_loop()
yield
await main_runner.stop_robot()
log.info("Stopped robot")
ServerRunner.getRunner().destroy_node()
rclpy.shutdown()
thread.join()
log.info("Closed rclpy")
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=[
@@ -17,84 +46,17 @@ app.add_middleware(
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(startup.router, prefix="/startup")
app.include_router(action.router, prefix="/action")
# store running launch processes
launch_processes = {}
class LaunchRequest(BaseModel):
package: str
launch_file: str
args: dict = {}
@app.post("/launch/start")
def start_launch(req: LaunchRequest):
launch_id = str(uuid.uuid4())
cmd = ["ros2", "launch", req.package, req.launch_file]
for k, v in req.args.items():
cmd.append(f"{k}:={v}")
log_dir = "log"
os.makedirs(log_dir, exist_ok=True)
stdout_file = open(f"{log_dir}/{launch_id}_stdout.log", "w")
stderr_file = open(f"{log_dir}/{launch_id}_stderr.log", "w")
proc = subprocess.Popen(
cmd,
stdout=stdout_file,
stderr=stderr_file,
preexec_fn=os.setsid
)
launch_processes[launch_id] = proc
return {
"launch_id": launch_id,
"pid": proc.pid,
"command": " ".join(cmd)
}
@app.get("/launch/list")
def list_launches():
running = {}
for lid, proc in launch_processes.items():
running[lid] = {
"pid": proc.pid,
"running": proc.poll() is None
}
return running
@app.post("/launch/stop/{launch_id}")
def stop_launch(launch_id: str):
if launch_id not in launch_processes:
return {"error": "launch id not found"}
proc = launch_processes[launch_id]
if proc.poll() is None:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
return {"stopped": launch_id}
@app.post("/launch/stop_all")
def stop_all():
for proc in launch_processes.values():
if proc.poll() is None:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
return {"status": "all stopped"}
def main():
uvicorn.run(
"toid_cli.main:app",

View File

View File

@@ -0,0 +1,88 @@
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Request, status
from fastapi.responses import JSONResponse
from uvicorn.logging import ColourizedFormatter
from rosbridge_library.internal.message_conversion import populate_instance, NonexistentFieldException, FieldTypeMismatchException
from toid_msgs.action import SimpleRotate, SimpleTranslateX, SimpleMoveCoords
import logging
import json
from toid_cli.services.services import ServerRunner
router = APIRouter()
log = logging.getLogger("ActionRoute")
log.addHandler(logging.StreamHandler())
log.handlers[0].setFormatter(ColourizedFormatter('%(levelprefix)s %(message)s'))
log.setLevel(logging.INFO)
@router.post("/start_tree")
async def start_tree(req: Request):
tree = req.query_params.get("tree")
ServerRunner.getRunner().execute_tree(tree)
return JSONResponse("Works")
@router.post("/rotate")
async def rotate(req: Request):
try:
goal = await req.json()
except json.decoder.JSONDecodeError:
return JSONResponse("Bad Request", status_code=status.HTTP_400_BAD_REQUEST)
log.info(goal)
try:
goal = populate_instance(goal, SimpleRotate.Goal())
if not ServerRunner.getRunner().rotate_action(goal):
return JSONResponse("Bad Request", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
except (NonexistentFieldException, FieldTypeMismatchException) as e:
return JSONResponse("Bad Request", status_code=status.HTTP_400_BAD_REQUEST)
return JSONResponse("OK")
@router.post("/translate")
async def rotate(req: Request):
try:
goal = await req.json()
except json.decoder.JSONDecodeError:
return JSONResponse("Bad Request", status_code=status.HTTP_400_BAD_REQUEST)
log.info(goal)
try:
goal = populate_instance(goal, SimpleMoveCoords.Goal())
if not ServerRunner.getRunner().translate_coords(goal):
return JSONResponse("Server busy", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
except (NonexistentFieldException, FieldTypeMismatchException) as e:
return JSONResponse("Bad Request", status_code=status.HTTP_400_BAD_REQUEST)
return JSONResponse("OK")
@router.post("/translate_x")
async def rotate(req: Request):
try:
goal = await req.json()
except json.decoder.JSONDecodeError:
return JSONResponse("Bad Request", status_code=status.HTTP_400_BAD_REQUEST)
log.info(goal)
try:
goal = populate_instance(goal, SimpleTranslateX.Goal())
if not ServerRunner.getRunner().translate_x(goal):
return JSONResponse("Server busy", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
except (NonexistentFieldException, FieldTypeMismatchException) as e:
return JSONResponse("Bad Request", status_code=status.HTTP_400_BAD_REQUEST)
return JSONResponse("OK")
@router.post("/start_tf_pub")
async def startTFpub(req: Request):
if not ServerRunner.getRunner().start_tf_pub():
return JSONResponse("Server busy", status_code=status.HTTP_503_SERVICE_UNAVAILABLE)
return JSONResponse("OK")
@router.websocket("/ws")
async def websocketHandler(ws: WebSocket):
await ws.accept()
try:
ServerRunner.getRunner().add(ws)
while True:
data = await ws.receive_text()
log.info(data)
await ws.send_text("hello")
except WebSocketDisconnect:
ServerRunner.getRunner().remove(ws)
log.warning("User disconnected")

View File

@@ -0,0 +1,27 @@
from fastapi import APIRouter, Request, Response
from toid_cli.services.runners import main_runner
router = APIRouter()
@router.post("/run/robot")
async def run_main(req: Request):
status = False
if(req.query_params.get("use_mock", False).lower() == "true"):
status = await main_runner.run_robot(use_mock=True)
else:
status = await main_runner.run_robot(use_mock=False)
if status:
return Response(status_code=200)
return Response(status_code=400)
@router.post("/stop/robot")
async def run_main(req: Request):
status = await main_runner.stop_robot()
if status:
return Response(status_code=200)
return Response(status_code=400)
@router.get("/status/robot")
async def run_main(req: Request):
status = "OK" if main_runner.status() else "NOT RUNNING"
return Response(content=status, status_code=200)

View File

View File

@@ -0,0 +1,47 @@
import asyncio
import asyncio.subprocess as subprocess
import signal
import os
class Runner():
lock = asyncio.Lock()
running = False
running_type = ''
proc = None
async def run_robot(self, use_mock=False) -> bool:
async with self.lock:
if self.running:
return False
if use_mock == False:
cmd = ["ros2", "launch", "toid_navigation", "main.py", f"use_mock:={use_mock}"]
else:
cmd = ["ros2", "launch", "toid_navigation", "main.py", f"use_mock:={use_mock}"]
self.proc = await subprocess.create_subprocess_exec(
*cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
preexec_fn=os.setsid
)
self.running = True
return True
async def stop_robot(self):
async with self.lock:
if not self.running:
return False
os.killpg(os.getpgid(self.proc.pid), signal.SIGINT)
try:
await asyncio.wait_for(self.proc.wait(), timeout=8)
except asyncio.TimeoutError:
os.killpg(os.getpgid(self.proc.pid), signal.SIGKILL)
self.running = False
await self.proc.wait()
return True
def status(self):
return self.running
main_runner = Runner()

View File

@@ -0,0 +1,201 @@
from rclpy import Future
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.action.client import ClientGoalHandle
from rosbridge_library.internal.message_conversion import extract_values, populate_instance
from btcpp_ros2_interfaces.action import ExecuteTree
from toid_msgs.action import SimpleMoveCoords, SimpleRotate, SimpleTranslateX
from action_msgs.srv import CancelGoal
from tf2_web_republisher_interfaces.action import TFSubscription
from uvicorn.logging import ColourizedFormatter
from fastapi import WebSocket, FastAPI, WebSocketDisconnect
import asyncio
import logging
import threading
log = logging.getLogger("ActionClientHandler")
log.addHandler(logging.StreamHandler())
log.handlers[0].setFormatter(ColourizedFormatter('%(levelprefix)s %(message)s'))
log.setLevel(logging.INFO)
class ActionClientHandler:
action_client: ActionClient
action_name: str
running: bool
connections: set[WebSocket]
app: FastAPI = None
lock: threading.Lock
def __init__(self, app: FastAPI, action_client: ActionClient):
self.action_client = action_client
self.action_name = action_client._action_name
self.app = app
self.lock = threading.Lock()
self.running = False
self.connections = set()
def send_goal(self, goal):
with self.lock:
if not self.action_client.wait_for_server(1):
log.info("Server doesn't exist yet")
self.running = False
return False
if self.running:
if not self.cancel_goals():
return False
self.running = True
log.info("Sending goal")
future = self.action_client.send_goal_async(
goal=goal,
feedback_callback=self.feedback)
future.add_done_callback(self.response_callback)
return True
def feedback(self, feedback):
#log.info("Feedback recieved")
self.sendMessage(
{
'type': 'goal_feedback',
'name': self.action_name,
'message': extract_values(feedback.feedback),
})
def response_callback(self, future: Future):
log.info("Goal response")
goal_handle: ClientGoalHandle = future.result()
if not goal_handle.accepted:
log.info("Goal not accepeted")
with self.lock:
self.running = False
self.sendMessage(
{
'type': 'goal_accepted',
'name': self.action_name,
'accepted': False,
})
return
log.info("Goal accepeted")
self.sendMessage(
{
'type': 'goal_accepted',
'name': self.action_name,
'accepted': True,
})
f = goal_handle.get_result_async().add_done_callback(self.done)
def done(self, future: Future):
log.info("Goal done maybe")
result: ExecuteTree.Result = future.result().result
self.sendMessage(
{
'type': 'goal_done',
'name': self.action_name,
'message': extract_values(result),
})
with self.lock:
self.running = False
def sendMessage(self, msg: dict):
with self.lock:
connections = list(self.connections)
for conn in connections:
try:
asyncio.run_coroutine_threadsafe(
conn.send_json(msg),
self.app.state.loop
)
except WebSocketDisconnect:
pass
def cancel_goals(self):
node: Node = self.action_client._node
cli = node.create_client(CancelGoal, self.action_name + "/_action/cancel_goal")
return cli.call(CancelGoal.Request(), 1.0)
def status(self,):
with self.lock:
return self.running
def addConnection(self, ws: WebSocket):
with self.lock:
self.connections.add(ws)
def removeConnection(self, ws: WebSocket):
with self.lock:
self.connections.remove(ws)
actionClient = None
class ServerRunner(Node):
bt_action_client: ActionClientHandler
rotate: ActionClientHandler
move_coords: ActionClientHandler
move_x: ActionClientHandler
tf_web: ActionClientHandler
app: FastAPI = None
def __init__(self, app: FastAPI):
super().__init__("RestServerNode")
self.bt_action_client = ActionClientHandler(app, ActionClient(self, ExecuteTree, "/bt_run"))
self.rotate = ActionClientHandler(app, ActionClient(self, SimpleRotate, "/rotate"))
self.move_coords = ActionClientHandler(app, ActionClient(self, SimpleMoveCoords, "/moveCoords"))
self.move_x = ActionClientHandler(app, ActionClient(self, SimpleTranslateX, "/move_x"))
self.tf_web = ActionClientHandler(app, ActionClient(self, TFSubscription, "/tf2_web_republisher"))
self.app = None
def start_tf_pub(self):
goal = TFSubscription.Goal()
goal.target_frame = "map"
goal.source_frames = ["base_footprint"]
goal.angular_thres = 0.0
goal.trans_thres = 0.0
goal.rate = 10.0
return self.tf_web.send_goal(goal)
def execute_tree(self, tree_name,):
goal = ExecuteTree.Goal(target_tree=tree_name)
return self.bt_action_client.send_goal(goal)
def rotate_action(self, goal):
return self.rotate.send_goal(goal)
def translate_x(self, goal):
return self.move_x.send_goal(goal)
def translate_coords(self, goal):
return self.move_coords.send_goal(goal)
def add(self, ws: WebSocket):
self.bt_action_client.addConnection(ws)
self.rotate.addConnection(ws)
self.move_coords.addConnection(ws)
self.move_x.addConnection(ws)
self.tf_web.addConnection(ws)
def remove(self, ws: WebSocket):
self.bt_action_client.removeConnection(ws)
self.rotate.removeConnection(ws)
self.move_coords.removeConnection(ws)
self.tf_web.removeConnection(ws)
def setRunner(server):
global actionClient
actionClient = server
def getRunner():
global actionClient
return actionClient