Send the measurements from the sensor device connected to the Raspberry Pi to the Kafka broker using SINETStream.
flowchart LR
subgraph C1[Raspberry Pi]
S1([sensor])
subgraph P["producer.py"]
P1(SINETStream)
end
end
subgraph Server
B[Kafka Broker]
end
S1---P
P1==>B
- Raspberry Pi
  - The steps shown here have been tested on Raspberry Pi OS.
- Python
  - Python 3.8 or later
The Kafka broker to which the sensor data will be sent must be available. Please build the Kafka broker in advance with one of the following configurations.
- NumericalSensorData/Server/Kafka-Grafana
- NumericalSensorData/Server/Kafka-Zabbix
- option/Server/Kafka
Create a Python script to send sensor measurements to the Kafka broker. Since the process of acquiring sensor readings differs for each sensor, this section first shows the steps for creating a send script, and then shows an example implementation for several sensors.
The send script needs to implement three main processes:
- calling the main loop process
- preparing the sensor for use
- reading measurements from the sensor
The main loop reads sensor data at regular intervals and sends the values to the Kafka broker. This functionality is provided by the simple producer in sinetstream-cmd.
The following is an example implementation that serves as a model for a sender program. In this example, the program does not acquire values from the actual sensor, but uses random numbers as the sensor's measurement values.
from random import seed, normalvariate
from sinetstream.cmd.producer import SimpleProducer


def get_sensor_data(device):
    """Read measurements from the sensor."""
    # Random walk: add Gaussian noise to the previous value.
    device["value"] += normalvariate(0, 5)
    return {"random": device["value"]}


def init():
    """Prepare the sensor for use."""
    seed()
    return {"value": 50.0}


def main():
    """Call the main loop."""
    device = init()
    producer = SimpleProducer(lambda: get_sensor_data(device))
    producer.run()


if __name__ == "__main__":
    main()
A file with the same contents as above can be found in template/producer.py. Use it as a template.
Descriptions of each process are given below.
The process of calling the main loop is as follows:
def main():
    device = init()
    producer = SimpleProducer(lambda: get_sensor_data(device))
    producer.run()
This process creates a SimpleProducer object provided by sinetstream-cmd and calls its run() method. The main loop is executed by calling the run() method.
In the main loop, the callback function specified in the constructor of SimpleProducer is called at regular intervals. Here, lambda: get_sensor_data(device) is specified as the callback function, so the sensor readings are read at regular intervals. The sensor readings returned by the callback function are sent to the Kafka broker by the main loop using the SINETStream functionality.
The main loop will continue unless interrupted by an error or signal notification. Therefore, run() is a method that never returns to the caller.
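To make the behavior of the main loop more concrete, the following is a minimal sketch of a loop that calls a callback at regular intervals and hands each result to a sender. It is not the SimpleProducer implementation: `run_loop`, `send`, `sleep`, and `max_iterations` are hypothetical names introduced only for this illustration, and the iteration limit exists only so that the sketch can terminate.

```python
import time


def run_loop(callback, send, interval=60.0, max_iterations=None, sleep=time.sleep):
    """Call callback() periodically and pass each result to send().

    Hypothetical stand-in for the main loop; not sinetstream-cmd code.
    """
    count = 0
    while max_iterations is None or count < max_iterations:
        reading = callback()  # e.g. {"random": 51.3}
        send(reading)         # the real main loop sends to the Kafka broker
        count += 1
        # Wait before the next measurement, except after the last one.
        if max_iterations is None or count < max_iterations:
            sleep(interval)
    return count
```

For example, `run_loop(lambda: {"random": 50.0}, print, interval=0, max_iterations=3)` prints three fake readings and returns.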
In this process, objects are created and initialized to use the sensor. In the previous example, we initialize the random number generator and set the initial values of the data.
def init():
    seed()
    return {"value": 50.0}
When using an actual sensor, this process has no fixed interface, because the preparation differs from sensor to sensor. Implement it according to your needs. In general, the return value of this function is the object that operates the sensor. For the specific processing, see the implementation of each sensor in [2.2. Implementation Examples](#22-implementation-examples).
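As an illustration of this pattern, the sketch below has init() return an object that operates the sensor, which the read function then uses. `FakeThermometer` and its `read()` method are hypothetical stand-ins for a real sensor driver; an actual sensor would use its own library.

```python
class FakeThermometer:
    """Hypothetical stand-in for a sensor driver object."""

    def __init__(self, start=20.0):
        self._value = start

    def read(self):
        """Return a fake temperature reading in degrees Celsius."""
        self._value += 0.5
        return self._value


def init():
    """Prepare the sensor for use and return the object that operates it."""
    return FakeThermometer()


def get_sensor_data(device):
    """Read one measurement through the object returned by init()."""
    return {"temperature": device.read()}
```

Because the device object is created once in init() and passed to every call, any state the driver keeps (an open bus handle, calibration data) is reused across measurements.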
This process implements a function that returns the measured value of the sensor. The return value of the function must conform to the following rules.
- The type of the return value should be a dictionary type (dict)
  - The key must be a sensor type
  - The value should be a measurement value of type float or int
In the template shown earlier, the function adds a random number to the current value and returns the result as the new measurement.
def get_sensor_data(device):
    device["value"] += normalvariate(0, 5)
    return {"random": device["value"]}
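When writing your own read function, it can help to check the return value against the rules above before wiring it into the main loop. The following is a minimal sketch of such a check. `validate_reading` is a hypothetical helper, not part of sinetstream-cmd, and it assumes that sensor types are given as strings and that bool values should be rejected.

```python
def validate_reading(reading):
    """Raise TypeError if reading breaks the return-value rules above."""
    if not isinstance(reading, dict):
        raise TypeError("reading must be a dict")
    for key, value in reading.items():
        # Assumption: the sensor type is expressed as a string key.
        if not isinstance(key, str):
            raise TypeError(f"sensor type must be a string: {key!r}")
        # bool is a subclass of int, so it is rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"value for {key!r} must be float or int: {value!r}")
    return reading
```

A reading such as `{"temperature": 21.5, "humidity": 40}` passes unchanged, while `{"temperature": "21.5"}` raises TypeError.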
The following are implementation examples for some sensors.
- DHT11: temperature/humidity sensor
  - Example implementation: dht11/producer-dht11.py
  - Procedure: dht11/README.en.md
- SHT3x: temperature/humidity sensor
  - Example implementation: sht3x/producer-sht3x.py
  - Procedure: sht3x/README.en.md
- SCD41: CO2 sensor
  - Example implementation: scd41/producer-scd41.py
  - Procedure: scd41/README.en.md
Install the Python libraries that the transmitter program will use.
pip install -U --user sinetstream-kafka sinetstream-cmd
If you get an error because of conflicts with libraries you have already installed, use venv or pipenv. Also, the pip command may be pip3 in some environments. Replace it as necessary.
In addition, a library for accessing the sensor is required to take measurements on the Raspberry Pi. The library differs for each sensor, so install the appropriate one. The installation procedure for each sensor is given in 2.2. Implementation Examples.
The sensor data transmission program uses the SINETStream library to send measurements to the Kafka broker. SINETStream requires parameters such as the addresses of the message brokers, the topic name, and the type to be described in the configuration file .sinetstream_config.yml. An example of the configuration file is shown below.
sensors:
  topic: sinetstream.sensor
  brokers: kafka.example.org:9092
  type: kafka
  consistency: AT_LEAST_ONCE
Modify the values of brokers and topic to match your environment. See SINETStream - Configuration File for details on how to write .sinetstream_config.yml, including other parameters. The configuration file should be placed in the same directory as the sending script.
A file example_sinetstream_config.yml with the same contents as the example is located in this directory. Use it as a template.
The data sent to the Kafka broker by the send program includes the sensor type and the hostname of the sender, so you can specify the same topic name for all sensor types and clients.
This section describes the command line arguments of the send script.
$ ./producer.py --help
usage: producer.py [-h] [-s SERVICE] [-n NODE_NAME] [-I INTERVAL] [-v] [-R MAX_RETRY]
SINETStream Producer
optional arguments:
-h, --help show this help message and exit
-s SERVICE, --service SERVICE
-n NODE_NAME, --name NODE_NAME
-I INTERVAL, --interval INTERVAL
-v, --verbose
-R MAX_RETRY, --retry MAX_RETRY
-n
- Host name of the data source
- Default value: the host name
-I
- Sensor measurement interval
- Default value: 60 (seconds)
-v
- Display transmitted data on console
-s
- Service names defined in the SINETStream configuration file
- Default value: sensors
-R
- Number of retries on error
- If a negative value is specified, retry indefinitely
- Default value: -1
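The options and defaults listed above can be mirrored with argparse, which is useful when experimenting with the argument handling outside the real script. This is only a sketch that reproduces the documented option names and defaults; it is not the sinetstream-cmd source. `build_parser` is a hypothetical name, and parsing the interval as an integer is an assumption.

```python
import argparse
import socket


def build_parser():
    """Build a parser that mirrors the options shown in the help output."""
    parser = argparse.ArgumentParser(
        prog="producer.py", description="SINETStream Producer")
    parser.add_argument("-s", "--service", default="sensors")
    parser.add_argument("-n", "--name", dest="node_name",
                        default=socket.gethostname())
    # Assumption: the interval is an integer number of seconds.
    parser.add_argument("-I", "--interval", type=int, default=60)
    parser.add_argument("-v", "--verbose", action="store_true")
    # A negative value means "retry indefinitely".
    parser.add_argument("-R", "--retry", dest="max_retry", type=int, default=-1)
    return parser
```

For example, `build_parser().parse_args(["-I", "10", "-v"])` yields an interval of 10 seconds with verbose output enabled and every other option at its default.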
If you want the sensor data transmission program to run constantly on the Raspberry Pi, registering it as a systemd service makes it easier to manage. This will make it possible to automatically run the sending program when the Raspberry Pi starts up, or to rerun the program when an error occurs.
The procedure for registering the transmission program as a service in systemd and starting it is shown below.
- Create a configuration file for the service to be registered.
- Instruct systemd to load the configuration file.
- Start the service.
- Configure automatic startup of the service.
All of the above steps require administrator privileges. Please use sudo or similar to obtain administrator privileges.
Create a systemd configuration file in /etc/systemd/system/. The file name should be suffixed with .service, for example sensor.service. An example configuration file is shown below.
[Unit]
Description=Send sensor data
[Service]
Type=simple
User=user01
WorkingDirectory=/home/user01/sensor
ExecStart=/home/user01/sensor/producer.py
Restart=always
[Install]
WantedBy=multi-user.target
Change Description, User, WorkingDirectory, and ExecStart to match your environment. Specify the path to the sending program in ExecStart, and the directory that contains the SINETStream configuration file .sinetstream_config.yml in WorkingDirectory. For User, specify the user name under which the program should run. Specifying User is necessary so that the sending program can use libraries installed under the user's $HOME/.local/lib/.
A file example_sensor.service with the same contents as the description example is located in this directory. Use it as a template.
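Because ExecStart in the example points directly at producer.py, the script itself has to be executable and begin with an interpreter line (shebang). The following sketch checks both conditions before the service is registered. `check_exec_start` is a hypothetical helper written for this illustration, not part of SINETStream or systemd.

```python
import os


def check_exec_start(path):
    """Return a list of problems that would stop path from being run directly."""
    problems = []
    if not os.path.isfile(path):
        problems.append("file does not exist")
        return problems
    if not os.access(path, os.X_OK):
        problems.append("file is not executable (try chmod +x)")
    # A directly executed script needs an interpreter line at the top.
    with open(path, "rb") as f:
        if f.read(2) != b"#!":
            problems.append("file has no shebang line such as #!/usr/bin/env python3")
    return problems
```

Calling `check_exec_start("/home/user01/sensor/producer.py")` returns an empty list when the script is ready to be used in ExecStart.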
To tell systemd to read the configuration file you created in /etc/systemd/system/, issue the following command.
sudo systemctl daemon-reload
Confirm that the service has been registered with the systemctl status command. The following is an example of execution when the service is registered with the service name sensor.
$ sudo systemctl status sensor
● sensor.service - Send sensor data
Loaded: loaded (/etc/systemd/system/sensor.service; disabled; vendor prese>
Active: inactive (dead)
Start the registered service with the systemctl start command.
The following is an example of execution when the service is registered with the service name sensor. After starting the service with systemctl start, check the status with systemctl status.
$ sudo systemctl start sensor
$ sudo systemctl status sensor
● sensor.service - Send sensor data
Loaded: loaded (/etc/systemd/system/sensor.service; disabled; vendor prese>
Active: active (running) since Wed 2022-02-09 02:57:05 GMT; 3s ago
Main PID: 732 (python)
Tasks: 1 (limit: 4915)
CPU: 2.395s
CGroup: /system.slice/sensor.service
└─732 python ./producer.py
Feb 09 02:57:05 raspberrypi systemd[1]: Started Send sensor data.
Configure the registered service to start automatically when the Raspberry Pi is started. Run the systemctl enable command with the service name.
The following is an example of execution when the service is registered with the service name sensor.
$ sudo systemctl enable sensor
Created symlink /etc/systemd/system/multi-user.target.wants/sensor.service → /etc/systemd/system/sensor.service.
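If you want to check the result of these steps from a script rather than by reading the systemctl status output, the `systemctl is-active` and `systemctl is-enabled` subcommands print the state as a single word. The sketch below wraps them. `service_state` is a hypothetical helper, and its `run` parameter exists only so that the function can be exercised without systemd.

```python
import subprocess


def service_state(name, run=subprocess.run):
    """Return (active, enabled) as reported by systemctl for the service."""
    def query(verb):
        # systemctl exits non-zero for inactive or disabled units, so check=False.
        result = run(["systemctl", verb, name],
                     capture_output=True, text=True, check=False)
        return result.stdout.strip()

    return query("is-active"), query("is-enabled")
```

After the steps above, `service_state("sensor")` would be expected to return `("active", "enabled")` on the Raspberry Pi.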
Use consumer.py to check the sensor data sent from the Raspberry Pi. For instructions on how to run consumer.py, please refer to the following link.
Specify the same values for the message broker addresses (brokers), topic name (topic), and type (type) in the .sinetstream_config.yml configuration file for consumer.py as for the sensor data transmission program.
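A quick way to catch a mismatch between the two configuration files is to compare the three parameters directly. The sketch below assumes that each service entry has already been loaded into a dict (for example with a YAML parser). `mismatched_keys` is a hypothetical helper, not a SINETStream function.

```python
def mismatched_keys(producer_cfg, consumer_cfg, keys=("brokers", "topic", "type")):
    """Return the parameter names whose values differ between the two configs."""
    return [k for k in keys if producer_cfg.get(k) != consumer_cfg.get(k)]


producer_cfg = {
    "topic": "sinetstream.sensor",
    "brokers": "kafka.example.org:9092",
    "type": "kafka",
    "consistency": "AT_LEAST_ONCE",
}
# Hypothetical consumer settings with a mistyped topic name.
consumer_cfg = {
    "topic": "sinetstream.sensors",
    "brokers": "kafka.example.org:9092",
    "type": "kafka",
}
print(mismatched_keys(producer_cfg, consumer_cfg))
```

An empty list means the three parameters agree; parameters outside `keys`, such as consistency, are not compared.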