MSC ROS Unit Test Quick Start Guide
: 본 페이지는 MORAI Simulator Control(MSC) Quick Start 사용 방법을 소개한다.
MSC는 UI를 직접 이용하지않고 통신 API를 이용하여 시뮬레이터를 제어 할 수 있는 기능이고,
이를 이용한 Unit Test Example Guide 이다.
환경설정
Example code 다운로드
다운로드 받은 파일 구성은 다음과 같다.
K-City : K-City Scenraio 파일 (Morai_Launcher_Data / SaveFile / Scenario 안에 넣어주고 사용)
msc_ros : msc_ros_example 파일 (workspace / src 안에 넣어주고 사용)
Python version
Example code는 python 3.7.5에서 작성 및 테스트 함.
Morai Launcher 실행
MSC를 사용하기 위해 Morai Launcher를 실행 후 좌상단의 MSC is Connect를 확인한다.
* MSC in DisConnect 일땐 다른 런쳐가 실행 중 인지 확인 필요.

Code 실행
시뮬레이터 및 Code 실행이 다른 환경(IP 가 다른 환경)에서 진행 되면 params.txt , params.json 의 ip를 수정해주어야 한다.
다운로드 받은 파일 중 params.txt를 열어 사용할 id, pw, version ,차량, 맵을 대소문자 구분 하여 채워준다.
e.g)
receive_user_ip : 127.0.0.1
receive_user_port : 10329
request_dst_ip : 127.0.0.1
request_dst_port : 10509
user_id : id
user_pw : pw
version : v.3.9.210226.3
map/vehicle : K-City/Nirounit_test_ros.py를 실행하면 parmas.txt에 입력 한 id, pw ,version, map/vehicle에 맞춰 시뮬레이터를 실행 하고 알고리즘 테스트를 확인할 수 있다.
Simulator Network setting
Network Setting은 Scenario를 저장할때 할때의 네트워크 정보를 적용.
Example 네트워크 setting은 시뮬레이터 상단 메뉴 Network → Network Settings에서 다음과 되어있다.
Bridge IP : 127.0.0.1


코드
class sim_ctrl
parmas.txt에 입력한 정보를 읽어 MSC를 통해 시뮬레이션을 시작해주는 classclass gen_planner
unit test를 할 알고리즘.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy,rospkg
from math import cos,sin,sqrt,pow,atan2,pi
import sys,os,time
import json
from morai_msgs.msg import EgoVehicleStatus,ObjectInfo,CtrlCmd,ScenarioLoad
from lib.ros_utils import pathReader,findLocalPath,purePursuit,Point,cruiseControl,vaildObject,velocityPlanning,pidController
from lib.udp_parser import udp_parser,udp_sender
#read params.txt
file = open('params.txt', 'r')
line=file.read()
params=line.split('\n')
for i in range(0,len(params)):
params[i]=(params[i].split(':')[1].replace(" ","")).replace('\r','')
recive_user_ip = params[0]
recive_user_port = int(params[1])
request_dst_ip = params[2]
request_dst_port = int(params[3])
class sim_ctrl :
def __init__(self):
self.get_status = udp_parser(recive_user_ip, recive_user_port,'get_sim_status')
self.set_status = udp_sender(request_dst_ip, request_dst_port,'set_sim_status')
self.main_loop()
def main_loop(self):
while True:
self.status_data=self.get_status.get_data()
print("data : ",self.status_data)
if not len(self.status_data)==0:
self.data_platform = self.status_data[0]
self.data_stage = self.status_data[1]
self.data_status = self.status_data[2]
self.command_platform = self.status_data[3]
self.command_cmd = self.status_data[4]
self.command_option = self.status_data[5]
self.command_result = self.status_data[6]
if self.data_platform == "0x01" and self.data_stage=="0x01": #Launcher platform에서 로그인 전 상태
cmd_platform="0x01"
cmd_command="0x0001"
cmd_option=params[4]+"/"+params[5]#(ID/PW)
self.send_data([cmd_platform,cmd_command,cmd_option])#Login명령
if self.data_platform=="0x01" and self.data_stage=="0x02": #Launcher platform에서 로그인 후 상태
cmd_platform = "0x01"
cmd_command = "0x0002"
cmd_option = params[6]#simulator version
self.send_data([cmd_platform,cmd_command,cmd_option]) #version 선택 명령
if self.data_status=="0x0001": #Result = 성공
cmd_platform="0x01"
cmd_command="0x0004"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 실행 명령
time.sleep(3)
elif self.data_status=="0x0012": #Result = Simulator 설치 안됨
cmd_platform="0x01"
cmd_command="0x0003"
cmd_option=""
self.send_data([cmd_platform,cmd_command,cmd_option]) #Simulator 설치 명령
if self.data_platform=="0x02" and self.data_stage=="0x01" and self.data_status=="0x0001": #Simulator platform에서 로비 Stage의 대기상태
cmd_platform="0x02"
cmd_command="0x0001"
cmd_option=params[7]#(MAP/Vehicle)
self.send_data([cmd_platform,cmd_command,cmd_option]) #시뮬레이션/옵션 변경 실행 명령
time.sleep(3)
while True: #로딩이 정상적으로 완료될때까지 대기.
_,self.data_stage,_,_,_,_,_=self.get_status.get_data()
print(self.data_stage)
if self.data_stage== "0x02":
if self.data_platform=="0x02" and self.data_stage=="0x02" and self.data_status=="0x0001": #Simulator platform에서 Play상태
break
time.sleep(1)
break
time.sleep(1)
else :
print("[NO Simulator Control Data]")
time.sleep(0.5)
gen_planner() ##test알고리즘
def send_data(self, data):
try:
print("send",data)
cmd_platform=int(data[0],0)
cmd_command=int(data[1],0)
cmd_option=data[2]
self.set_status.send_data([cmd_platform,cmd_command,cmd_option])
except ValueError:
print("Invalid input")
class gen_planner():
def __init__(self):
rospy.init_node('gen_planner', anonymous=True)
#publisher
ctrl_pub = rospy.Publisher('/ctrl_cmd',CtrlCmd, queue_size=1) ## Vehicl Control
scenario_pub = rospy.Publisher('/ScenarioLoad',ScenarioLoad, queue_size=1) ## Vehicl Control
ctrl_msg= CtrlCmd()
scenarioLoad_msgs = ScenarioLoad()
scenarioLoad_msgs.file_name = "scenario_example_ros"
scenarioLoad_msgs.load_ego_vehicle_data=True
scenarioLoad_msgs.load_pedestrian_data=True
scenarioLoad_msgs.load_surrounding_vehicle_data=True
scenarioLoad_msgs.load_object_data=True
#subscriber
rospy.Subscriber("/Ego_topic", EgoVehicleStatus, self.statusCB) ## Vehicl Status Subscriber
rospy.Subscriber("/Object_topic", ObjectInfo, self.objectInfoCB) ## Object information Subscriber
#def
self.is_status=False ## 차량 상태 점검
self.is_obj=False ## 장애물 상태 점검
#class
path_reader=pathReader('gen_ros') ## 경로 파일의 위치
pure_pursuit=purePursuit() ## purePursuit import
self.cc=cruiseControl(0.5,1) ## cruiseControl import (object_vel_gain, object_dis_gain)
self.vo=vaildObject()
pid=pidController() ## pidController import
#read path
self.global_path=path_reader.read_txt("scenario_example.txt") ## 출력할 경로의 이름
vel_planner=velocityPlanning(40/3.6,1.5) ## 속도 계획
vel_profile=vel_planner.curveBasedVelocity(self.global_path,50)
#time var
count=0
rate = rospy.Rate(30) # 30hz
while not rospy.is_shutdown():
if self.is_status==True and self.is_obj==True: ## 차량의 상태, 장애물 상태 점검
if count==0:
time.sleep(5)
scenario_pub.publish(scenarioLoad_msgs)
## global_path와 차량의 status_msg를 이용해 현제 waypoint와 local_path를 생성
local_path,self.current_waypoint=findLocalPath(self.global_path,self.status_msg,0)
## 장애물의 숫자와 Type 위치 속도 (object_num, object type, object pose_x, object pose_y, object velocity)
self.vo.get_object(self.object_info_msg.num_of_objects,self.object_info_msg.object_type,self.object_info_msg.pose_x,self.object_info_msg.pose_y,self.object_info_msg.velocity)
global_obj,local_obj=self.vo.calc_vaild_obj([self.status_msg.pose_x,self.status_msg.pose_y,(self.status_msg.heading+90)/180*pi])
self.cc.checkObject(local_path,global_obj,local_obj)
pure_pursuit.getPath(local_path) ## pure_pursuit 알고리즘에 Local path 적용
pure_pursuit.getEgoStatus(self.status_msg) ## pure_pursuit 알고리즘에 차량의 status 적용
ctrl_msg.steering=-pure_pursuit.steering_angle()/180*pi
cc_vel = self.cc.acc(local_obj,self.status_msg.velocity,vel_profile[self.current_waypoint],self.status_msg) ## advanced cruise control 적용한 속도 계획
target_velocity = cc_vel
control_input=pid.pid(target_velocity,self.status_msg.velocity) ## 속도 제어를 위한 PID 적용 (target Velocity, Status Velocity)
if control_input > 0 :
ctrl_msg.accel= control_input
ctrl_msg.brake= 0
else :
ctrl_msg.accel= 0
ctrl_msg.brake= -control_input
if self.status_msg.velocity < 3.0 and target_velocity<=0.0:
ctrl_msg.accel=0
ctrl_msg.brake=1
print(self.current_waypoint,len(self.global_path.poses)-1)
print(ctrl_msg)
ctrl_pub.publish(ctrl_msg) ## Vehicl Control 출력
if self.current_waypoint == (len(self.global_path.poses)-1) :
print("??????????")
count=0
self.current_waypoint=0
scenario_pub.publish(scenarioLoad_msgs)
time.sleep(3)
count+=1
rate.sleep()
def statusCB(self,data): ## Vehicl Status Subscriber
self.status_msg=data
self.is_status=True
def objectInfoCB(self,data): ## Object information Subscriber
self.object_info_msg=data
self.is_obj=True
if __name__ == "__main__":
ctrl=sim_ctrl()