本文首发于古月居
这篇博客主要讲述了如何通过OriginBot来看护宝宝,当宝宝的脸不在摄像头的范围之内时,发送消息到钉钉群组,通知家人及时查看。
前言 我在上个月有了宝宝,为了方便照看宝宝,就买了一个带有宝宝看护功能的摄像头,但是产品做的不怎么样,最重要的脸部遮挡检测功能用不了,后来就退货了。退货后就萌生了自己用OriginBot做一个类似功能的想法,于是就有了这篇博客~
功能流程图(架构图) 具体的流程或者说架构如下:
其实整体也不复杂,OriginBot上有一个MIPI摄像头,然后利用地平线TogetheROS.Bot的人体检测和跟踪 来实时检测摄像头中有没有脸部,如果没有脸部,就向后端发送一条数据,然后后端会向钉钉群组发消息告诉家人。钉钉群组里面需要事先创建一个webhook。
下面会分为人体检测、判断有无脸部、后端操作三个部分来记录。
人体检测 这一部分用的是地平线TogetheROS.Bot现成的功能 ,启动OriginBot之后,在命令行中运行如下命令:
1 2 3 4 5 6 7 8 9 10 11 source /opt/tros/setup.bashcp -r /opt/tros/lib/mono2d_body_detection/config/ .export CAM_TYPE=mipiros2 launch mono2d_body_detection mono2d_body_detection.launch.py
此时可以通过 http://IP:8000 查看检测效果,这个模块检测了人体、人头、人脸、人手检测框、检测框类型和目标跟踪ID以及人体关键点等,我只取其中的人脸部分,当然了,以后也可以增加如人体检测等。
上面的命令运行之后,在OriginBot上执行ros2 topic list
,应该会有一个topic叫做hobot_mono2d_body_detection
, 这个就是我们需要的,我们会订阅这个话题,然后分析其中发送数据来判断有没有脸部
判断摄像头中有没有脸部 按照TogetheROS.Bot的文档说明,hobot_mono2d_body_detection
的消息类型是ai_msgs.msg.PerceptionTargets
, 具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 # 感知结果 # 消息头 std_msgs/Header header # 感知结果的处理帧率 # fps val is invalid if fps is less than 0 int16 fps # 性能统计信息,比如记录每个模型推理的耗时 Perf[] perfs # 感知目标集合 Target[] targets # 消失目标集合 Target[] disappeared_targets
其中的disappeared_targets
是我们关注的重点,如果face出现在disappeared_targets
中,就说明之前是有脸部的,但是现在没有了,这个时候就要向后端发出数据进一步处理。
我为了判断有无脸部,写了一个ROS2 Node,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 import rclpyfrom rclpy.node import Nodefrom ai_msgs.msg import PerceptionTargetsfrom cv_bridge import CvBridgeimport timefrom api_connection import APIConnectionBabyMonitorMapping = { "face" : "看不到脸" , "body" : "不在摄像头范围内" , } class FaceDetectionListener (Node ): """ 检测宝宝的脸是不是在摄像头中 """ def __init__ (self ): super ().__init__("face_detection" ) self.bridge = CvBridge() self.subscription = self.create_subscription( PerceptionTargets, "hobot_mono2d_body_detection" , self.listener_callback, 10 ) self.conn = APIConnection() self.timer = time.time() self.counter = 0 def listener_callback (self, msg ): targets = msg.targets disappeared_targets = msg.disappeared_targets targets_list = [] disappeared_targets_list = [] if disappeared_targets: for item in disappeared_targets: disappeared_targets_list.append(item.rois[0 ].type ) if targets: for item in targets: targets_list.append(item.rois[0 ].type ) print (f"检测到的对象如下:{targets_list} " ) print (f"消失的对象如下:{disappeared_targets_list} " ) if disappeared_targets_list: self.sending_notification(disappeared_targets_list) def sending_notification (self, disappeared_targets_list ): for item in disappeared_targets_list: if BabyMonitorMapping.get(item): event = BabyMonitorMapping.get(item) if self.counter == 0 : data = {"event" : event, "baby" : "6b56979a-b2b9-11ee-920d-f12e14f97477" } self.conn.post_data(item=data, api="api/monitor/face-detection/" ) self.counter += 1 self.timer = time.time() else : if time.time() - self.timer >= 60.0 : data = {"event" : event, "baby" : "6b56979a-b2b9-11ee-920d-f12e14f97477" } self.conn.post_data(item=data, api="api/monitor/face-detection/" ) self.timer = time.time() self.counter += 1 def main (args=None ): rclpy.init(args=args) try : face_detection_listener = FaceDetectionListener() rclpy.spin(face_detection_listener) except KeyboardInterrupt: print ("终止运行" ) finally : face_detection_listener.destroy_node() rclpy.shutdown() if __name__ == "__main__" : main()
代码整体不难,但还是做几点必要的说明:1. BabyMonitorMapping 这个字典的作用是把TogetheROS.Bot里面的字段跟后端的字段做一个映射关系,方便后面使用
2. API调用 代码中有两行如下:
1 2 data = {"event" : event, "baby" : "6b56979a-b2b9-11ee-920d-f12e14f97477" } self.conn.post_data(item=data, api="api/monitor/face-detection/" )
这里的uri和data的格式由后端决定,不必细究,想了解细节的可以看后端代码。
3.APIConnection APIConnection是用于请求的API的一个封装类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 """ API CONNECTION FOR IMPORTING WRAPPER """ import jsonimport loggingimport requestsimport envslogging.basicConfig( format ="%(asctime)s %(levelname)-8s %(message)s" , level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S" , ) class APIConnection : """ Api Connection """ def __init__ (self ): self.api_url = envs.API_URL self.token = None self.headers = { "Content-Type" : "application/json" , "Cache-Control" : "no-cache" , } self.request_jwt() def request_jwt (self ): """ Request JWT token. """ logging.info("Requesting JWT.." ) api_url = f"{self.api_url} api/token/" data = { "username" : envs.SCRIPT_USER, "password" : envs.SCRIPT_PASSWORD, } res = requests.post(api_url, data=json.dumps(data), headers=self.headers) if res.status_code == 200 : data = res.json() self.token = data["access" ] self.headers["Authorization" ] = f"Bearer {self.token} " else : logging.error( f"Failed to obtain JWT. Status code: {res.status_code} , Message: {res.text} " ) def upload_video (self, api, file ): """ post data :param item: items to be posted in json format :param api: path of endpoint """ api_url = f"{self.api_url} {api} " try : res = requests.post(api_url, files=file, headers=self.headers, timeout=1 ) if res.status_code == 401 : self.request_jwt() self.post_data(api, file) elif res.status_code in [200 , 201 ]: logging.info(f"{res.status_code} - video uploaded successfully" ) return res.status_code else : logging.error( f"{res.status_code} - {res.json()} - unable to upload video" ) return res.status_code except Exception as err: logging.error(err) return 500 def post_data (self, item, api ): """ 新建一条数据 """ api_url = f"{self.api_url} {api} " try : response = requests.post( api_url, data=json.dumps(item), headers=self.headers, timeout=1 ) if response.status_code == 403 : self.request_jwt() self.post_data(item, api) elif response.status_code not in [200 , 201 ]: logging.error( f"post data to backend failed, \ status code is {response.status_code} , \ error message is:\n \ {response.text} " ) except Exception as err: logging.error( f"post data to backend failed, \ error message is:\n \ {err} " )
后端操作 经过前面两个部分,如果发现了脸部数据从OriginBot的摄像头中消失的话,已经会把记录发送到后端了,现在来讲一下后端的部分。
后端的操作说起来其实也很简单,就是接收到数据后,现在数据库中存一份,然后再向钉钉发消息提醒家人。
在OriginBot家庭助理计划 里面提到过,后端是基于Django + Django rest framework 开发的,下面的内容需要一点django的基础才能看懂。
首先是创建了两个django model来存储数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class Baby (models.Model): """ 记录的宝宝的数据 """ id = models.UUIDField(primary_key=True , default=uuid.uuid1, editable=False ) name = models.CharField(max_length=256 ) birth_date = models.DateField() gender = models.CharField(max_length=1 , choices=(("男" , "男" ), ("女" , "女" ))) def __str__ (self ): return self.name class BabyMonitorData (models.Model): """ 记录宝宝监控相关的数据 """ event_choices = ( ("看不到脸" , "看不到脸" ), ("哭" , "哭" ), ("翻身" , "翻身" ), ("不在摄像头范围内" , "不在摄像头范围内" ), ) baby = models.ForeignKey(Baby, on_delete=models.PROTECT) event = models.CharField(max_length=128 , choices=event_choices) timestamp = models.DateTimeField(auto_now_add=True ) def __str__ (self ): return f"{self.baby.name} {self.event} {self.timestamp} " class Meta : ordering = ["-timestamp" ]
需要注意的是,Baby和BabyMonitorData两个类之间存在外键 关系。
在FaceDetectionListener中请求的api/monitor/face-detection/
这个uri,最终在后端执行的代码是下面这一段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class BabyMonitorView (viewsets.ModelViewSet): queryset = BabyMonitorData.objects.all ().order_by("-timestamp" ) serializer_class = BabyMonitorSerializer def create (self, request, *args, **kwargs ): response = super ().create(request, *args, **kwargs) message = request.data try : event = message.get("event" ) baby = Baby.objects.filter (id =message.get("baby" ))[0 ].name except IndexError: print ("找不到对应的婴儿数据" ) return response send_msg_to_dingtalk(f"{baby} {event} 啦!" ) return response
这里面做的事情就是向BabyMonitorData中存一条数据,然后通过send_msg_to_dingtalksend_msg_to_dingtalk
给钉钉群组发消息。
send_msg_to_dingtalk
的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import jsonimport requestsfrom utils import envsfrom datetime import datetimedef send_msg_to_dingtalk (msg ): webhook = envs.DING_TALK_URL headers = {"Content-Type" : "application/json;charset=utf-8" } data = { "msgtype" : "text" , "text" : { "content" : msg + f"[来自originbot,{datetime.now().strftime('%Y-%m-%d %H:%M:%S' )} ]" }, } response = requests.post(webhook, headers=headers, data=json.dumps(data)) return response.text if __name__ == "__main__" : message = "Hello from my Python script!" send_msg_to_dingtalk(message)
代码中的webhook = envs.DING_TALK_URL
其实是获取环境变量中钉钉群组机器人链接,至于如何创建钉钉群组机器人网上有很多教程,不再赘述。
到这里,如果一切顺利,应该就可以在钉钉里面看到消息啦,效果如下:
源代码 源码地址:https://github.com/yexia553/originbot_home_assistant
待优化 虽然一个最基础的demo完成了,但是还有很多需要优化的地方:
小车只能在地上跑,但是宝宝一般在床上,只有把小车放在特定的位置,才能实现上述功能,这限制了实际应用,需要解决。
续航问题,目前小车的电池只能使用2个小时左右
目前只是检测摄像头中有没有脸,而不是检测宝宝的脸 ,可以考虑优化
可以增加类似哭声检测、翻身等动作识别,并加上数据分析