-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
107 lines (85 loc) · 3.39 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from flask import Flask
from flask_cors import CORS
from flask_socketio import SocketIO, emit
from flask_mqtt import Mqtt
import numpy as np
import eventlet
from datetime import datetime
import json
import re
# Apply eventlet's monkey patching before any application object is created.
eventlet.monkey_patch()

# Flask application carrying the MQTT broker settings used by Flask-MQTT.
# NOTE(review): the broker credentials are hard-coded in source; consider
# loading them from the environment or a secrets store.
app = Flask(__name__)
app.config.update(
    MQTT_BROKER_URL='nf.enk.icu',
    MQTT_BROKER_PORT=1883,
    MQTT_USERNAME='inky',
    MQTT_PASSWORD='asdf',
)

# Socket.IO server bound to the app; '*' accepts connections from any origin.
socketio = SocketIO(app, cors_allowed_origins='*')

# MQTT client bound to the same app (configured via the MQTT_* keys above).
mqtt = Mqtt(app)
@app.route('/')
def index():
    """Serve a plain-text status message at the site root."""
    status_message = 'Socket.IO server is running.'
    return status_message
@socketio.on('connect')
def handle_connect():
    """Log to stdout that a Socket.IO client has connected."""
    print('Client connected')
@socketio.on('disconnect')
def handle_disconnect():
    """Log to stdout that a Socket.IO client has disconnected."""
    print('Client disconnected')
# MQTT topic the server subscribes to. The broker host, port and credentials
# are set on app.config rather than here. (An earlier draft also named a
# publish topic "beta"; nothing publishes to it at present.)
topic: str = "testTopic"

# FFT configuration
chunk_size: int = 256  # number of samples per FFT window
sampling_rate: int = 1000  # Hz; change to match the real sample rate
beta_freq_range: tuple = (12, 30)  # beta band limits in Hz

# Samples received so far that have not yet been consumed by an FFT window.
data_chunk: list = []
@mqtt.on_connect()
def handle_connect(client, userdata, flags, rc):
    """Subscribe to ``topic`` once the MQTT broker has accepted the connection.

    Args:
        client: MQTT client instance passed by the callback (unused).
        userdata: User data passed by the callback (unused).
        flags: Connection flags passed by the callback (unused).
        rc: Connection result code; 0 means the broker accepted the
            connection, any other value means it was refused.

    NOTE(review): this function reuses the name of the Socket.IO connect
    handler defined earlier in the file. Presumably both stay registered
    because each decorator records its handler when applied -- confirm
    before renaming either one.
    """
    if rc != 0:
        # A refused connection cannot carry a subscription; report it
        # instead of issuing a subscribe that would fail silently.
        print(f"MQTT connection failed (rc={rc}); not subscribing to {topic}")
        return
    mqtt.subscribe(topic)
# Callback function on message receive, plus its private helpers.

# Matches the first bracketed list in a payload, e.g. "[1, 2, 3]".
_SAMPLE_LIST_RE = re.compile(r'\[(.*?)\]')


def _parse_samples(payload):
    """Return the integers in the first ``[...]`` list found in *payload*.

    Args:
        payload: Raw message bytes; decoded as UTF-8.

    Raises:
        ValueError: If the payload is not valid UTF-8, contains no bracketed
            list, or a list item is not an integer.
    """
    match = _SAMPLE_LIST_RE.search(payload.decode("utf-8"))
    if match is None:
        raise ValueError("payload contains no [...] sample list")
    # int() ignores surrounding whitespace, so no explicit strip is needed.
    return [int(item) for item in match.group(1).split(',')]


def _band_amplitudes(samples, rate, band):
    """Return the FFT magnitudes of *samples* for bins inside *band*.

    Args:
        samples: Sequence of numeric samples forming one FFT window.
        rate: Sampling rate in Hz, used to map FFT bins to frequencies.
        band: ``(low, high)`` frequency limits in Hz, both inclusive.

    Returns:
        A NumPy array of magnitudes, one per FFT bin whose frequency lies in
        the band (possibly empty).
    """
    spectrum = np.fft.fft(samples)
    freqs = np.fft.fftfreq(len(samples), 1 / rate)
    low, high = band
    return np.abs(spectrum[(freqs >= low) & (freqs <= high)])


@mqtt.on_message()
def handle_mqtt_message(client, userdata, msg):
    """Buffer incoming samples and emit the mean beta-band FFT amplitude.

    The payload is expected to contain a bracketed, comma-separated list of
    integers. Parsed samples are appended to the module-level ``data_chunk``
    buffer. For every complete window of ``chunk_size`` samples, the mean FFT
    magnitude over ``beta_freq_range`` is computed and emitted to Socket.IO
    clients as a JSON string on the ``'mqtt'`` event.

    Args:
        client: MQTT client instance passed by the callback (unused).
        userdata: User data passed by the callback (unused).
        msg: Received message; only ``msg.payload`` is read.

    Errors are printed rather than raised so that a bad message or a failed
    emit does not propagate out of the MQTT callback.
    """
    global data_chunk

    try:
        # Log the message payload size for debugging
        print(f"Received message payload (length {len(msg.payload)})")
        items = _parse_samples(msg.payload)
    except ValueError as e:
        # Malformed payload: nothing was buffered, so just report and drop it.
        print(f"Error processing message: {e}")
        return

    print(f"Integer items: {items}")
    data_chunk.extend(items)
    print(f"Received data chunk size: {len(data_chunk)}")

    # Drain every complete window. A single message may carry more than one
    # window's worth of samples, and leaving the surplus behind would let the
    # buffer grow without bound.
    while len(data_chunk) >= chunk_size:
        # Remove the window from the buffer before processing it, so a
        # failure below cannot cause the same samples to be retried forever.
        window, data_chunk = data_chunk[:chunk_size], data_chunk[chunk_size:]
        try:
            print(datetime.now())
            print("Performing FFT...")
            beta_amplitudes = _band_amplitudes(
                window, sampling_rate, beta_freq_range
            )
            print(
                "Filtered size:", len(beta_amplitudes), beta_amplitudes.tolist()
            )
            if beta_amplitudes.size == 0:
                # The mean of nothing is NaN, which is not valid JSON.
                print("No FFT bins fall inside the beta range; skipping emit")
                continue

            avg_beta_amplitude = float(beta_amplitudes.mean())
            print("Average amplitude for beta range:", avg_beta_amplitude)

            # Emit the average beta amplitude; the timestamp is stringified
            # explicitly instead of relying on a catch-all JSON default.
            data = {
                'avg_beta_amplitude': avg_beta_amplitude,
                'time': str(datetime.now()),
            }
            socketio.emit('mqtt', data=json.dumps(data))
            print("Emitted avg_beta_amplitude ", avg_beta_amplitude)
        except Exception as e:
            # Callback boundary: report and move on to the next window.
            print(f"Error processing message: {e}")
if __name__ == '__main__':
    # Start the Socket.IO server on all interfaces, port 5000, with the
    # reloader disabled.
    # NOTE(review): debug=True together with host '0.0.0.0' may expose debug
    # tooling to the network -- confirm this is intended outside development.
    socketio.run(
        app,
        host='0.0.0.0',
        port=5000,
        use_reloader=False,
        debug=True,
    )