통신/mqtt
SLAM map 정보 송수신(.pgm, .yaml)
우금붕
2024. 4. 26. 10:08
오늘은 MQTT로 SLAM 2D map을 송수신하는 pub/sub 파일을 제작해보려고 합니다.
제작 언어는 python이며, 혹시 틀린점이나 더 좋은 아이디어가 있다면 공유해주시면 감사하겠습니다

1. file_pubisher.py 만들기
1-1. 전체 코드
import rclpy
from rclpy.node import Node
import paho.mqtt.client as mqtt
import os
class FilePublisher(Node):
def __init__(self):
super().__init__('file_publisher')
self.client = mqtt.Client()
self.client.connect("broker_IP_address", port_num, 60)
self.client.loop_start()
self.yaml_file_path = 'your_yaml_path' # .yaml 파일 경로
self.pgm_file_path = 'your_pgm_path' # .pgm 파일 경로
self.yaml_file_topic = 'yaml_file_topic'
self.pgm_file_topic = 'pgm_file_topic'
self.publish_file(self.yaml_file_path, self.yaml_file_topic)
self.publish_file(self.pgm_file_path, self.pgm_file_topic)
def publish_file(self, file_path, topic):
with open(file_path, 'rb') as file:
file_data = file.read()
result = self.client.publish(topic, file_data, qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
self.get_logger().info(f"File published to {topic}")
else:
self.get_logger().warning(f"Failed to publish file to {topic}")
def main(args=None):
rclpy.init(args=args)
file_publisher = FilePublisher()
rclpy.spin(file_publisher)
file_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
1-2. 각 구문별 설명
- FilePublisher 클래스의 __init__ 메서드 :
class FilePublisher(Node):
def __init__(self):
super().__init__('file_publisher') # rclpy.Node의 __init__ 메서드 호출 > 노드 초기화
self.client = mqtt.Client() # Paho MQTT 클라이언트 객체 생성
self.client.connect("broker_IP_address", port_num, 60) # MQTT 브로커 연결
self.client.loop_start() # MQTT 클라이언트의 비동기 루프 시작
self.yaml_file_path = 'your_yaml_path' # 보낼 .yaml 파일 경로 지정
self.pgm_file_path = 'your_pgm_path' # 보낼 .pgm 파일 경로 지정
self.yaml_file_topic = 'yaml_file_topic' # .yaml 파일을 전송할 MQTT 토픽 이름 지정
self.pgm_file_topic = 'pgm_file_topic' # # .pgm 파일을 전송할 MQTT 토픽 이름 지정
self.publish_file(self.yaml_file_path, self.yaml_file_topic) # .yaml 파일을 전송하는 publish_file 메서드 호출
self.publish_file(self.pgm_file_path, self.pgm_file_topic) # .pgm 파일을 전송하는 publish_file 메서드 호출
- FilePublisher 클래스의 publish_file 메서드 :
def publish_file(self, file_path, topic): # file_path : 전송할 파일 경로 / topic : 전송할 topic명
with open(file_path, 'rb') as file:
file_data = file.read()
# file_data를 topic으로 발행, 발행 결과 확인하여 로그 출력
result = self.client.publish(topic, file_data, qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
self.get_logger().info(f"File published to {topic}")
else:
self.get_logger().warning(f"Failed to publish file to {topic}")
: 파일을 바이너리 모드로 읽어서 file_data에 저장
- main 함수
def main(args=None):
rclpy.init(args=args)
file_publisher = FilePublisher()
rclpy.spin(file_publisher)
file_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
: ROS 2 노드를 초기화하고 실행합니다.
1-3. 실행 순서
1. FIlePublisher 객체 생성
2. .yaml 파일 전송
3. .pgm 파일 전송
4. FilePublisher 노드 종료
2. file_subscriber.py 만들기
2-1. 전체 코드
import rclpy
from rclpy.node import Node
import paho.mqtt.client as mqtt
import os
class FileSubscriber(Node):
def __init__(self):
super().__init__('file_subscriber')
self.client = mqtt.Client()
self.client.connect("broker_IP_address", port_num, 60)
self.client.loop_start()
self.yaml_file_topic = 'yaml_file_topic'
self.pgm_file_topic = 'pgm_file_topic'
self.received_folder = 'save_path'
# 수신 폴더가 존재하지 않으면 생성
if not os.path.exists(self.received_folder):
os.makedirs(self.received_folder)
self.client.subscribe(self.yaml_file_topic)
self.client.subscribe(self.pgm_file_topic)
self.client.message_callback_add(self.yaml_file_topic, self.on_message)
self.client.message_callback_add(self.pgm_file_topic, self.on_message)
def on_message(self, client, userdata, msg):
file_name = os.path.basename(msg.topic)
received_file_path = os.path.join(self.received_folder, file_name)
with open(received_file_path, 'wb') as file:
file.write(msg.payload)
self.get_logger().info(f"Received file saved at: {received_file_path}")
def main(args=None):
rclpy.init(args=args)
file_subscriber = FileSubscriber()
rclpy.spin(file_subscriber)
file_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2-2. 각 구문별 설명
- FileSubscriber 클래스의 __init__ 메서드 :
class FileSubscriber(Node):
def __init__(self):
super().__init__('file_subscriber') # rclpy.Node의 __init__ 메서드 호출 > 노드 초기화
self.client = mqtt.Client() # Paho MQTT 클라이언트 객체 생성
self.client.connect("broker_IP_adrress", port_num, 60) # MQTT 브로커에 연결
self.client.loop_start() # MQTT 클라이언트의 비동기 루프 시작
self.yaml_file_topic = 'yaml_file_topic' # .yaml 파일 수신할 MQTT 토픽 이름 지정
self.pgm_file_topic = 'pgm_file_topic' # .pgm 파일 수신할 MQTT 토픽 이름 지정
self.received_folder = 'save_path' # 수신한 파일 저장할 폴더 경로 지정
# 수신 폴더가 존재하지 않으면 생성
if not os.path.exists(self.received_folder):
os.makedirs(self.received_folder)
self.client.subscribe(self.yaml_file_topic) # .yaml 파일을 위한 MQTT 토픽 구독
self.client.subscribe(self.pgm_file_topic) # .pgm 파일을 위한 MQTT 토픽 구독
self.client.message_callback_add(self.yaml_file_topic, self.on_message) # .yaml 파일 토픽에 대한 메시지 콜백 추가
self.client.message_callback_add(self.pgm_file_topic, self.on_message) # .pgm 파일 토픽에 대한 메시지 콜백 추가
- FileSubscriber 클래스의 on_message 메서드 :
def on_message(self, client, userdata, msg):
file_name = os.path.basename(msg.topic) # 메시지의 토픽에서 파일 이름 추출
received_file_path = os.path.join(self.received_folder, file_name) # 수신 폴더 경로와 파일 이름을 합쳐 수신 파일의 전체 경로를 생성
# 수신된 파일 데이터를 바이너리 모드로 열어 저장
with open(received_file_path, 'wb') as file:
file.write(msg.payload)
# 수신된 파일의 저장 경로를 로그로 출력
self.get_logger().info(f"Received file saved at: {received_file_path}")
- main 함수
def main(args=None):
rclpy.init(args=args)
file_subscriber = FileSubscriber()
rclpy.spin(file_subscriber)
file_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
: ROS 2 노드를 초기화하고 실행
2-3. 실행 순서
1. FileSubscriber 객체 생성
2. .yaml 파일 및 .pgm 파일 수신을 위한 토픽 구독
3. 메시지 수신 시 on_message 콜백 실행
4. 수신된 파일 저장
5. FileSubscriber 노드 종료
3. setup.py
from setuptools import find_packages, setup
from glob import glob
import os
package_name = 'mqtt_test'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py'))
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='hkyoo',
maintainer_email='hkyoo@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'file_publisher = mqtt_test.file_publisher:main',
'file_subscriber = mqtt_test.file_subscriber:main',
],
},
)
: entry_points에 file_publisher, file_subscriber 추가
4. 실행 결과
3-1. 터미널 출력
: 송/수신 완료 로그 출력됨
3-2. 수신된 파일
감사합니다. 빠른 시일 내에 github 에도 코드 공유할 예정입니다.
github upload 시에 블로그에 해당 링크 첨부하겠습니다.
이 글을 보신 모두들 좋은 하루 보내세요! 화이팅~~~!!!!