통신/mqtt

SLAM map 정보 송수신(.pgm, .yaml)

우금붕 2024. 4. 26. 10:08

오늘은 MQTT로 SLAM 2D map을 송수신하는 pub/sub 파일을 제작해보려고 합니다.

제작 언어는 python이며, 혹시 틀린점이나 더 좋은 아이디어가 있다면 공유해주시면 감사하겠습니다

 

1. file_pubisher.py 만들기

 

1-1. 전체 코드

import rclpy
from rclpy.node import Node
import paho.mqtt.client as mqtt
import os

class FilePublisher(Node):

    def __init__(self):
        super().__init__('file_publisher')

        self.client = mqtt.Client()
        self.client.connect("broker_IP_address", port_num, 60)
        self.client.loop_start()

        self.yaml_file_path = 'your_yaml_path'  # .yaml 파일 경로
        self.pgm_file_path = 'your_pgm_path'  # .pgm 파일 경로
        self.yaml_file_topic = 'yaml_file_topic'
        self.pgm_file_topic = 'pgm_file_topic'
        
        self.publish_file(self.yaml_file_path, self.yaml_file_topic)
        self.publish_file(self.pgm_file_path, self.pgm_file_topic)

    def publish_file(self, file_path, topic):
        with open(file_path, 'rb') as file:
            file_data = file.read()
        
        result = self.client.publish(topic, file_data, qos=1)
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            self.get_logger().info(f"File published to {topic}")
        else:
            self.get_logger().warning(f"Failed to publish file to {topic}")

def main(args=None):
    rclpy.init(args=args)
    file_publisher = FilePublisher()
    rclpy.spin(file_publisher)
    file_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

 

 

 

 

1-2. 각 구문별 설명

  • FilePublisher 클래스의 __init__ 메서드 :
class FilePublisher(Node):

    def __init__(self):
        super().__init__('file_publisher')	# rclpy.Node의 __init__ 메서드 호출 > 노드 초기화

        self.client = mqtt.Client()	# Paho MQTT 클라이언트 객체 생성
        self.client.connect("broker_IP_address", port_num, 60)	# MQTT 브로커 연결
        self.client.loop_start() # MQTT 클라이언트의 비동기 루프 시작

        self.yaml_file_path = 'your_yaml_path'  # 보낼 .yaml 파일 경로 지정
        self.pgm_file_path = 'your_pgm_path'  # 보낼 .pgm 파일 경로 지정
        self.yaml_file_topic = 'yaml_file_topic'	# .yaml 파일을 전송할 MQTT 토픽 이름 지정
        self.pgm_file_topic = 'pgm_file_topic'	# # .pgm 파일을 전송할 MQTT 토픽 이름 지정
        
        self.publish_file(self.yaml_file_path, self.yaml_file_topic)	# .yaml 파일을 전송하는 publish_file 메서드 호출
        self.publish_file(self.pgm_file_path, self.pgm_file_topic)	# .pgm 파일을 전송하는 publish_file 메서드 호출

 

 

  • FilePublisher 클래스의 publish_file 메서드 :
    def publish_file(self, file_path, topic):	# file_path : 전송할 파일 경로 / topic : 전송할 topic명
        with open(file_path, 'rb') as file:	
            file_data = file.read()
        
        # file_data를 topic으로 발행, 발행 결과 확인하여 로그 출력
        result = self.client.publish(topic, file_data, qos=1)
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            self.get_logger().info(f"File published to {topic}")
        else:
            self.get_logger().warning(f"Failed to publish file to {topic}")

: 파일을 바이너리 모드로 읽어서 file_data에 저장

 

 

  • main 함수
def main(args=None):
    rclpy.init(args=args)
    file_publisher = FilePublisher()
    rclpy.spin(file_publisher)
    file_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

: ROS 2 노드를 초기화하고 실행합니다.

 

 

 

1-3. 실행 순서

1. FIlePublisher 객체 생성

2. .yaml 파일 전송

3. .pgm 파일 전송

4. FilePublisher 노드 종료

 

 

 

 

 


2. file_subscriber.py 만들기

 

2-1. 전체 코드

import rclpy
from rclpy.node import Node
import paho.mqtt.client as mqtt
import os

class FileSubscriber(Node):

    def __init__(self):
        super().__init__('file_subscriber')

        self.client = mqtt.Client()
        self.client.connect("broker_IP_address", port_num, 60)
        self.client.loop_start()

        self.yaml_file_topic = 'yaml_file_topic'
        self.pgm_file_topic = 'pgm_file_topic'
        self.received_folder = 'save_path'

        # 수신 폴더가 존재하지 않으면 생성
        if not os.path.exists(self.received_folder):
            os.makedirs(self.received_folder)

        self.client.subscribe(self.yaml_file_topic)
        self.client.subscribe(self.pgm_file_topic)
        
        self.client.message_callback_add(self.yaml_file_topic, self.on_message)
        self.client.message_callback_add(self.pgm_file_topic, self.on_message)

    def on_message(self, client, userdata, msg):
        file_name = os.path.basename(msg.topic)
        received_file_path = os.path.join(self.received_folder, file_name)
        
        with open(received_file_path, 'wb') as file:
            file.write(msg.payload)
        
        self.get_logger().info(f"Received file saved at: {received_file_path}")

def main(args=None):
    rclpy.init(args=args)
    file_subscriber = FileSubscriber()
    rclpy.spin(file_subscriber)
    file_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

 

 

 

 

2-2. 각 구문별 설명

  • FileSubscriber 클래스의 __init__ 메서드 :
class FileSubscriber(Node):

    def __init__(self):
        super().__init__('file_subscriber')	# rclpy.Node의 __init__ 메서드 호출 >  노드 초기화

        self.client = mqtt.Client()	# Paho MQTT 클라이언트 객체 생성
        self.client.connect("broker_IP_adrress", port_num, 60)	# MQTT 브로커에 연결
        self.client.loop_start()	# MQTT 클라이언트의 비동기 루프 시작

        self.yaml_file_topic = 'yaml_file_topic'	# .yaml 파일 수신할 MQTT 토픽 이름 지정
        self.pgm_file_topic = 'pgm_file_topic'	# .pgm 파일 수신할 MQTT 토픽 이름 지정
        self.received_folder = 'save_path'	# 수신한 파일 저장할 폴더 경로 지정

        # 수신 폴더가 존재하지 않으면 생성
        if not os.path.exists(self.received_folder):	
            os.makedirs(self.received_folder)

        self.client.subscribe(self.yaml_file_topic)	# .yaml 파일을 위한 MQTT 토픽 구독
        self.client.subscribe(self.pgm_file_topic)	# .pgm 파일을 위한 MQTT 토픽 구독
        
        self.client.message_callback_add(self.yaml_file_topic, self.on_message)	# .yaml 파일 토픽에 대한 메시지 콜백 추가
        self.client.message_callback_add(self.pgm_file_topic, self.on_message)	# .pgm 파일 토픽에 대한 메시지 콜백 추가

 

 

  • FileSubscriber 클래스의 on_message 메서드 :
    def on_message(self, client, userdata, msg):
        file_name = os.path.basename(msg.topic)	# 메시지의 토픽에서 파일 이름 추출
        received_file_path = os.path.join(self.received_folder, file_name)	# 수신 폴더 경로와 파일 이름을 합쳐 수신 파일의 전체 경로를 생성
        
        # 수신된 파일 데이터를 바이너리 모드로 열어 저장
        with open(received_file_path, 'wb') as file:
            file.write(msg.payload)
       	
        # 수신된 파일의 저장 경로를 로그로 출력
        self.get_logger().info(f"Received file saved at: {received_file_path}")

 

 

  • main 함수
def main(args=None):
    rclpy.init(args=args)
    file_subscriber = FileSubscriber()
    rclpy.spin(file_subscriber)
    file_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

: ROS 2 노드를 초기화하고 실행

 

 

 

2-3. 실행 순서

1. FileSubscriber 객체 생성

2. .yaml 파일 및 .pgm 파일 수신을 위한 토픽 구독
3. 메시지 수신 시 on_message 콜백 실행
4. 수신된 파일 저장
5. FileSubscriber 노드 종료

 

 

 

 

 

 

 


3. setup.py

from setuptools import find_packages, setup
from glob import glob
import os

package_name = 'mqtt_test'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='hkyoo',
    maintainer_email='hkyoo@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'file_publisher = mqtt_test.file_publisher:main',
            'file_subscriber = mqtt_test.file_subscriber:main',

        ],
    },
)

: entry_points에 file_publisher, file_subscriber 추가

 

 

 

 

 


4. 실행 결과

 

3-1. 터미널 출력

: 송/수신 완료 로그 출력됨

 

 

3-2. 수신된 파일

 

 

 

 

 

감사합니다. 빠른 시일 내에 github 에도 코드 공유할 예정입니다.

github upload 시에 블로그에 해당 링크 첨부하겠습니다.

 

이 글을 보신 모두들 좋은 하루 보내세요! 화이팅~~~!!!!