Python Ros1 melodic

分享
程序员 2024-9-28 00:33:27 37 0 来自 中国
实验平台 ubuntu18.04
实验目标 把握python ros1 读取与发送
本次针对字符串消息举行实验
首先必要安装ubuntu 对应的ros1 版本
ubuntu18.04  对应的ros版本是melodic
安装

安装详细步调如下:
sources.list 添加 ros 源

sudo sh -c 'echo "deb http://packages.ros.org/ros/ubuntu $(lsb_release -sc) main" > /etc/apt/sources.list.d/ros-latest.list'
添加秘钥

sudo apt-key adv --keyserver 'hkp://keyserver.ubuntu.com:80' --recv-key C1CF6E31E6BADE8868B172B4F42ED6FBAB17C654
更新源

sudo apt update
安装ROS(melodic)

sudo apt install ros-melodic-desktop-full
安装依赖包

sudo apt install python-rosdep python-rosinstall python-rosinstall-generator python-wstool build-essential
添加到自启动文件里

echo "source /opt/ros/melodic/setup.bash" >> ~/.bashrc
source ~/.bashrc
ROS 使用的流程

启动roscore(服务端)
发送topic
吸取topic
ros根本指令:
启动主节点
roscore
查察话题名称
ros topic list
录制话题数据
rosbag record *topicname
录制全部话题数据
rosbag record -a
播放bag 包
rosbag play *.bag
Python 代码
实验目标:
学习python 对ros的使用
代码启动ros主节点服务端
代码完成消息的订阅与发送


实验步调:

python 进程启动 roscore
函数:
def start_ros_core():
    result_roscore = os.system('roscore')
使用代码
ros1bag_core_process = multiprocessing.Process(target=start_ros_core)
ros1bag_core_process.start()
发布消息

python 构建发布消息的节点

rospy.init_node("test_pub", anonymous=True))#初始化节点 名称:test_pub
python 构建发布话题的发布者
pub_parking = rospy.Publisher('mystring', String, queue_size=10)
循环每隔一秒发送消息
def sub_mutil():
    num = 0
    while True:
        my_msg = String()
        my_msg.data = f'{num}'
        pub_parking.publish(my_msg)
        time.sleep(1)
        print(f"发送消息:{num}")
        num+=1
sub_mutil()
订阅消息

python 构建吸取的节点

rospy.init_node("test_sub", anonymous=True)#初始化节点 名称:test_sub
python 订阅节点 回调函数的界说


def call_back_String(msg):
    print(f"吸取到消息: {msg.data}")
rospy.Subscriber('mystring',String, call_back_String)
rospy.spin()
展示:




全部代码

发布者代码:

import os
import sys
import time
import rospy
import multiprocessing
from std_msgs.msg import String
def start_ros_core():
    result_roscore = os.system('roscore')
ros1bag_core_process = multiprocessing.Process(target=start_ros_core)
ros1bag_core_process.start()
rospy.init_node("test_pub", anonymous=True)#初始化节点 名称:test
def sub_mutil():
    num = 0
    while True:
        my_msg = String()
        my_msg.data = f'{num}'
        pub_parking.publish(my_msg)
        time.sleep(1)
        print(f"发送消息:{num}")
        num+=1
发布话题

pub_parking = rospy.Publisher('mystring', String, queue_size=10)
sub_mutil()
订阅者代码
import rospy
from std_msgs.msg import String
rospy.init_node("test_sub", anonymous=True)#初始化节点 名称:test
def call_back_String(msg):
    print(f"吸取到消息: {msg.data}")
rospy.Subscriber('mystring',String, call_back_String)
rospy.spin()
您需要登录后才可以回帖 登录 | 立即注册

Powered by CangBaoKu v1.0 小黑屋藏宝库It社区( 冀ICP备14008649号 )

GMT+8, 2024-11-22 09:20, Processed in 0.156387 second(s), 32 queries.© 2003-2025 cbk Team.

快速回复 返回顶部 返回列表