OriginBot Home Assistant Project - Baby Monitoring Assistant

This article was first published on Guyuehome

This blog describes how to use OriginBot to monitor a baby: when the baby's face leaves the camera's field of view, a message is sent to a DingTalk group so family members can check on the baby promptly.


Introduction

I had a baby last month. To take care of the baby more conveniently, I bought a camera with a baby monitoring function. However, the product didn't perform well: the most important feature, face occlusion detection, didn't work, so I returned it. After that, I came up with the idea of building a similar function with OriginBot myself, which led to this blog~


Function Flowchart (Architecture Diagram)

The specific process or architecture is as follows:

originbot face detection

In fact, the overall process is not complicated. OriginBot carries a MIPI camera, and Horizon TogetheROS.Bot's human body detection and tracking capability is used to detect in real time whether a face is in the camera view. If no face is found, a piece of data is sent to the backend, and the backend then sends a message to the DingTalk group to notify family members. A webhook needs to be created in the DingTalk group in advance.

The following will be recorded in three parts: human body detection, judgment of the presence of a face, and backend operations.


Human Body Detection

This part uses the ready-made capability of Horizon TogetheROS.Bot. After starting the OriginBot, run the following commands in the terminal:

# Configure the tros.b environment
source /opt/tros/setup.bash

# Copy the configuration files needed to run the example from the tros.b installation path.
cp -r /opt/tros/lib/mono2d_body_detection/config/ .

# Configure the MIPI camera
export CAM_TYPE=mipi

# Launch the launch file
ros2 launch mono2d_body_detection mono2d_body_detection.launch.py

At this point, you can view the detection results at http://IP:8000. This module outputs detection boxes for human bodies, heads, faces, and hands, along with box types, target tracking IDs, and human body key points. I only use the face part here; of course, functions based on body detection could be added in the future.

After running the above commands, execute ros2 topic list on the OriginBot; there should be a topic called hobot_mono2d_body_detection. This is what we need: we will subscribe to this topic and analyze the data published on it to determine whether there is a face.


Judging Whether There is a Face in the Camera

According to the documentation of TogetheROS.Bot, the message type of hobot_mono2d_body_detection is ai_msgs.msg.PerceptionTargets, specifically as follows:

# Perception results

# Message header
std_msgs/Header header

# Processing frame rate of perception results
# fps val is invalid if fps is less than 0
int16 fps

# Performance statistics, such as recording the time consumption of each model inference
Perf[] perfs

# Set of perception targets
Target[] targets

# Set of disappeared targets
Target[] disappeared_targets

disappeared_targets is what we focus on: if a "face" target appears in disappeared_targets, it means a face was present before but is not anymore. At that point, data needs to be sent to the backend for further processing.
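Before wiring this into ROS, the core check can be sketched with plain-Python stand-ins for the ai_msgs types. The dataclasses below are illustrative placeholders, not the real message classes; they keep only the fields the check needs:

```python
from dataclasses import dataclass, field
from typing import List


# Minimal stand-ins for the roi / target / perception-targets message types
# (illustrative only; the real classes come from ai_msgs.msg)
@dataclass
class Roi:
    type: str


@dataclass
class Target:
    rois: List[Roi] = field(default_factory=list)


@dataclass
class PerceptionTargetsStub:
    targets: List[Target] = field(default_factory=list)
    disappeared_targets: List[Target] = field(default_factory=list)


def face_just_disappeared(msg) -> bool:
    """Return True if a 'face' roi shows up among the disappeared targets."""
    return any(
        roi.type == "face"
        for target in msg.disappeared_targets
        for roi in target.rois
    )


# A frame where the face target has just left the view
msg = PerceptionTargetsStub(
    targets=[Target(rois=[Roi(type="body")])],
    disappeared_targets=[Target(rois=[Roi(type="face")])],
)
print(face_just_disappeared(msg))  # True
```

The real node below does essentially this, plus throttling and the call to the backend.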

To judge whether there is a face, I wrote a ROS2 Node. The code is as follows:

import rclpy
from rclpy.node import Node
from ai_msgs.msg import PerceptionTargets
from cv_bridge import CvBridge
import time

from api_connection import APIConnection

BabyMonitorMapping = {
    # The keys and values here must match the values defined in the backend Django app
    "face": "Can't see the face",
    "body": "Not within the camera range",
}


class FaceDetectionListener(Node):
    """
    Detect whether the baby's face is in the camera
    """

    def __init__(self):
        super().__init__("face_detection")
        self.bridge = CvBridge()
        self.subscription = self.create_subscription(
            PerceptionTargets, "hobot_mono2d_body_detection", self.listener_callback, 10
        )
        self.conn = APIConnection()
        self.timer = time.time()
        self.counter = 0

    def listener_callback(self, msg):
        targets = msg.targets
        disappeared_targets = msg.disappeared_targets
        targets_list = []
        disappeared_targets_list = []
        if disappeared_targets:
            for item in disappeared_targets:
                disappeared_targets_list.append(item.rois[0].type)
        if targets:
            for item in targets:
                targets_list.append(item.rois[0].type)
        print(f"Detected objects are as follows: {targets_list}")
        print(f"Disappeared objects are as follows: {disappeared_targets_list}")
        if disappeared_targets_list:
            self.sending_notification(disappeared_targets_list)

    def sending_notification(self, disappeared_targets_list):
        for item in disappeared_targets_list:
            if BabyMonitorMapping.get(item):
                event = BabyMonitorMapping.get(item)
                if self.counter == 0:
                    # The baby's ID here is simulated and should be retrieved from the database
                    data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
                    self.conn.post_data(item=data, api="api/monitor/face-detection/")
                    self.counter += 1
                    self.timer = time.time()
                else:
                    if time.time() - self.timer >= 60.0:
                        # Don't send repeated DingTalk messages within 60 seconds
                        data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
                        self.conn.post_data(item=data, api="api/monitor/face-detection/")
                        self.timer = time.time()
                        self.counter += 1


def main(args=None):
    rclpy.init(args=args)
    try:
        face_detection_listener = FaceDetectionListener()

        rclpy.spin(face_detection_listener)
    except KeyboardInterrupt:
        print("Terminating the operation")
    finally:
        face_detection_listener.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()
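The 60-second cooldown inside sending_notification can also be factored into a small helper. Here is a minimal, self-contained sketch of the same logic; the class name and the injectable clock are my own additions for testability, not part of the original node:

```python
import time


class NotificationThrottle:
    """Allow at most one notification per `interval` seconds
    (sketch of the cooldown used in sending_notification)."""

    def __init__(self, interval=60.0, clock=time.time):
        self.interval = interval
        self.clock = clock      # injectable for testing
        self.last_sent = None   # timestamp of the last sent notification

    def should_send(self):
        now = self.clock()
        if self.last_sent is None or now - self.last_sent >= self.interval:
            self.last_sent = now
            return True
        return False


# Simulated clock: first call passes, a call 10 s later is suppressed,
# a call 70 s later passes again
fake_now = [0.0]
throttle = NotificationThrottle(interval=60.0, clock=lambda: fake_now[0])
print(throttle.should_send())  # True
fake_now[0] = 10.0
print(throttle.should_send())  # False
fake_now[0] = 70.0
print(throttle.should_send())  # True
```

Injecting the clock keeps the cooldown logic testable without waiting a real minute.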

The code as a whole is not difficult, but a few points deserve explanation:
1. BabyMonitorMapping
This dictionary maps the target types reported by TogetheROS.Bot to the event values defined in the backend, so they can be used directly later.

2. API call
There are two lines of code as follows:

data = {"event": event, "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477"}
self.conn.post_data(item=data, api="api/monitor/face-detection/")

The format of the URI and data here is determined by the backend; readers interested in the details can refer to the backend code.
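As a concrete illustration, the request body the node sends is just this small JSON document. The baby ID is the simulated UUID from the listing above; in practice it would come from the database:

```python
import json
import uuid

# Payload shape sent to api/monitor/face-detection/ (as defined by the backend);
# the baby ID below is the simulated one used throughout this article
data = {
    "event": "Can't see the face",
    "baby": "6b56979a-b2b9-11ee-920d-f12e14f97477",
}

body = json.dumps(data)
print(body)

# The "baby" value must be a valid UUID string, since the backend
# model uses a UUID primary key; this parse would raise otherwise
uuid.UUID(data["baby"])
```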

3. APIConnection
APIConnection is a wrapper class for requesting the API. The code is as follows:

"""
API connection wrapper for the backend.
"""
import json
import logging

import requests

import envs


logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)


class APIConnection:
    """
    API connection
    """

    def __init__(self):
        self.api_url = envs.API_URL

        self.token = None
        self.headers = {
            "Content-Type": "application/json",
            "Cache-Control": "no-cache",
        }
        self.request_jwt()

    def request_jwt(self):
        """
        Request a JWT token.
        """
        logging.info("Requesting JWT..")

        api_url = f"{self.api_url}api/token/"
        data = {
            "username": envs.SCRIPT_USER,
            "password": envs.SCRIPT_PASSWORD,
        }

        res = requests.post(api_url, data=json.dumps(data), headers=self.headers)

        if res.status_code == 200:
            data = res.json()
            self.token = data["access"]
            self.headers["Authorization"] = f"Bearer {self.token}"
        else:
            logging.error(
                f"Failed to obtain JWT. Status code: {res.status_code}, Message: {res.text}"
            )

    def upload_video(self, api, file):
        """
        Upload a video file.
        :param api: path of the endpoint
        :param file: file object to be uploaded
        """
        api_url = f"{self.api_url}{api}"

        try:
            res = requests.post(api_url, files=file, headers=self.headers, timeout=1)

            if res.status_code == 401:
                # Token expired: refresh it and retry the upload
                self.request_jwt()
                return self.upload_video(api, file)

            elif res.status_code in [200, 201]:
                logging.info(f"{res.status_code} - video uploaded successfully")
                return res.status_code

            else:
                logging.error(
                    f"{res.status_code} - {res.json()} - unable to upload video"
                )
                return res.status_code

        except Exception as err:
            logging.error(err)
            return 500

    def post_data(self, item, api):
        """
        Create a new piece of data.
        :param item: payload to be posted, serialized as JSON
        :param api: path of the endpoint
        """
        api_url = f"{self.api_url}{api}"

        try:
            response = requests.post(
                api_url, data=json.dumps(item), headers=self.headers, timeout=1
            )

            if response.status_code == 403:
                # Token expired: refresh it and retry
                self.request_jwt()
                self.post_data(item, api)

            elif response.status_code not in [200, 201]:
                logging.error(
                    f"post data to backend failed, "
                    f"status code is {response.status_code}, "
                    f"error message is:\n{response.text}"
                )
        except Exception as err:
            logging.error(f"post data to backend failed, error message is:\n{err}")

Backend Operations

After the first two parts, once the face disappears from the OriginBot's camera, a record has already been sent to the backend. Now let's talk about the backend part.

The backend operations are actually quite simple. After receiving the data, it is first stored in the database, and then a message is sent to DingTalk to remind family members.

As mentioned in the OriginBot Home Assistant Project, the backend is built with Django and Django REST framework. The following content requires some basic knowledge of Django to understand.

First, two django models are created to store data.

import uuid

from django.db import models


class Baby(models.Model):
    """
    Recorded baby data
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid1, editable=False)
    name = models.CharField(max_length=256)
    birth_date = models.DateField()
    # max_length must be large enough to hold the choice values
    gender = models.CharField(
        max_length=6, choices=(("Male", "Male"), ("Female", "Female"))
    )

    def __str__(self):
        return self.name


class BabyMonitorData(models.Model):
    """
    Record data related to baby monitoring
    """

    event_choices = (
        ("Can't see the face", "Can't see the face"),
        ("Crying", "Crying"),
        ("Turning over", "Turning over"),
        ("Not within the camera range", "Not within the camera range"),
    )

    baby = models.ForeignKey(Baby, on_delete=models.PROTECT)
    event = models.CharField(max_length=128, choices=event_choices)
    timestamp = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return f"{self.baby.name} {self.event} {self.timestamp}"

    class Meta:
        ordering = ["-timestamp"]

It should be noted that there is a foreign key relationship between the Baby and BabyMonitorData classes.

The URI api/monitor/face-detection/ requested in FaceDetectionListener ultimately executes the following code in the backend:

class BabyMonitorView(viewsets.ModelViewSet):
    queryset = BabyMonitorData.objects.all().order_by("-timestamp")
    serializer_class = BabyMonitorSerializer

    def create(self, request, *args, **kwargs):
        response = super().create(request, *args, **kwargs)

        message = request.data
        try:
            event = message.get("event")
            baby = Baby.objects.filter(id=message.get("baby"))[0].name
        except IndexError:
            print("Corresponding baby data not found")
            return response

        send_msg_to_dingtalk(f"{baby} {event} !")

        # Return the original response
        return response

What is done here is to store a piece of data in BabyMonitorData and then send a message to the DingTalk group through send_msg_to_dingtalk.

The code of send_msg_to_dingtalk is as follows:

import json
from datetime import datetime

import requests

from utils import envs


def send_msg_to_dingtalk(msg):
    webhook = envs.DING_TALK_URL
    headers = {"Content-Type": "application/json;charset=utf-8"}
    data = {
        "msgtype": "text",
        "text": {
            "content": msg
            + f"[From originbot, {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}]"
        },
    }

    response = requests.post(webhook, headers=headers, data=json.dumps(data))
    return response.text


if __name__ == "__main__":
    message = "Hello from my Python script!"
    send_msg_to_dingtalk(message)

In the code, webhook = envs.DING_TALK_URL reads the DingTalk group robot webhook URL from an environment variable. As for how to create a DingTalk group robot, there are many tutorials online, so it will not be covered here.
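For reference, the body that send_msg_to_dingtalk posts follows DingTalk's custom-robot text message format. The snippet below only constructs and inspects that payload without sending anything, so it can be tried without a real webhook:

```python
import json


def build_dingtalk_text_payload(content: str) -> str:
    """Build the JSON body for a DingTalk custom-robot text message."""
    payload = {
        "msgtype": "text",
        "text": {"content": content},
    }
    return json.dumps(payload, ensure_ascii=False)


body = build_dingtalk_text_payload("Baby face not detected!")
print(body)
# To actually send it, POST this body to the group's webhook URL with the
# header "Content-Type: application/json;charset=utf-8", as the function above does.
```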

Up to this point, if everything goes well, you should be able to see the message in DingTalk. The effect is as follows:

OriginBotFaceDetectionDingTalk


Source Code

Source code address: https://github.com/yexia553/originbot_home_assistant


Areas for Optimization

Although a basic demo has been completed, there are still many areas that need to be optimized:

  1. The robot can only run on the ground, but babies are usually on a bed. The function above only works when the robot is placed in a specific position, which limits practical use and needs to be solved.
  2. Battery life. Currently, the robot's battery only lasts for about 2 hours.
  3. Currently, it only detects whether there is a face in the camera, not specifically the baby's face; this could be improved.
  4. Functions such as crying detection, turning-over recognition, and data analysis could be added.