Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wip/jonas rem/implement send operation #75

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/django/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ djangorestframework==3.15.2
whitenoise==6.7.0
celery[redis]==5.4.0
gevent==24.2.1
python-dateutil==2.9.0

# Generation of Entity-Relationship Diagrams
graphviz==0.20.3
Expand Down
6 changes: 2 additions & 4 deletions server/django/sensordata/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ def get_model_perms(self, request):
class ResourceTypeAdmin(admin.ModelAdmin):
list_display = ('object_id', 'resource_id', 'name', 'data_type')
search_fields = ('object_id', 'resource_id', 'name')
readonly_fields = ('object_id', 'resource_id', 'name', 'data_type')

def get_model_perms(self, request):
return {
'add': False,
'add': True,
'change': False,
'delete': False,
'view': True,
Expand Down Expand Up @@ -138,8 +137,7 @@ def get_form(self, request, obj=None, **kwargs):
@admin.register(FirmwareUpdate)
class FirmwareUpdateAdmin(admin.ModelAdmin):
list_display = ('endpoint', 'firmware', 'state', 'result',
'timestamp_created', 'timestamp_updated',
'send_uri_operation', 'execute_operation')
'timestamp_created', 'timestamp_updated')
search_fields = ('endpoint__endpoint', 'firmware__version', 'state', 'result')
list_filter = ('state', 'result', 'timestamp_created', 'timestamp_updated')
readonly_fields = ('timestamp_created', 'timestamp_updated',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.1 on 2024-09-14 05:26

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('sensordata', '0002_alter_firmware_binary'),
]

operations = [
migrations.AlterField(
model_name='resource',
name='timestamp_created',
field=models.DateTimeField(blank=True, null=True),
),
]
8 changes: 7 additions & 1 deletion server/django/sensordata/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.db import models
from django.core.exceptions import ValidationError
from django.db import transaction
from django.utils import timezone
import os


Expand Down Expand Up @@ -59,7 +60,12 @@ class Resource(models.Model):
int_value = models.IntegerField(null=True, blank=True)
float_value = models.FloatField(null=True, blank=True)
str_value = models.CharField(max_length=512, null=True, blank=True)
timestamp_created = models.DateTimeField(auto_now_add=True, blank=True)
timestamp_created = models.DateTimeField(blank=True, null=True)

def save(self, *args, **kwargs):
if not self.timestamp_created:
self.timestamp_created = timezone.now()
super().save(*args, **kwargs)

def __str__(self):
return f"{self.endpoint} - {self.resource_type} - {self.timestamp_created}"
Expand Down
165 changes: 96 additions & 69 deletions server/django/sensordata/serializers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,34 @@ def __init__(self, *args, **kwargs):
self.event = None


def create_event(self, endpoint, event_type):
def create_event(self, ep, event_type):
"""
Create an Event instance for the given endpoint and event type. If no event
is created, the resource data won't be associated with any event.
"""
event_data = {
'endpoint': endpoint,
'endpoint': ep,
'event_type': event_type,
}
self.event = Event.objects.create(**event_data)


def handle_resource(self, endpoint, obj_id, res):
def handle_resource(self, ep, obj_id, res, ts=None):
# Some LwM2M Resources are currently unsupported, we can skip them for now.
if res['kind'] == 'multiResource':
logging.error(f"multiResource currently not supported, skipping...")
return

res_id = res['id']
# Fetch resource information from Database
resource_type = ResourceType.objects.get(object_id=obj_id, resource_id=res_id)
if not resource_type:
res_type = ResourceType.objects.get(object_id=obj_id, resource_id=res_id)
if not res_type:
err = f"Resource type {obj_id}/{res_id} not found"
raise serializers.ValidationError(err)

# Validate that datatype is matching the resource type
data_type = dict(ResourceType.TYPE_CHOICES).get(res['type'])
res_data_type = dict(ResourceType.TYPE_CHOICES).get(resource_type.data_type)
res_data_type = dict(ResourceType.TYPE_CHOICES).get(res_type.data_type)
if not data_type:
err = f"Unsupported data type '{res['type']}', skipping..."
logger.error(err)
Expand All @@ -86,84 +86,111 @@ def handle_resource(self, endpoint, obj_id, res):
raise serializers.ValidationError(err)

# Create the Resource instance based on value type
logger.debug(f"Adding resource_type: {resource_type}")
logger.debug(f"Adding resource_type: {res_type}")
resource_data = {
'endpoint': endpoint,
'resource_type': resource_type,
data_type: res['value']
'endpoint': ep,
'resource_type': res_type,
data_type: res['value'],
**({'timestamp_created': ts} if ts is not None else {})
}

created_res = Resource.objects.create(**resource_data)

# Create EventResource linking the event and the resource
if self.event is not None:
EventResource.objects.create(event=self.event, resource=created_res)
logger.debug(f"Added EventResource: {self.event} - {created_res}")
logger.debug(f"Added Resource to event: {self.event.event_type}")

# Update the registration status if the resource is a registration resource
if resource_type.name == 'ep_registered':
endpoint.registered = True
endpoint.save()
return
elif resource_type.name == 'ep_unregistered':
endpoint.registered = False
endpoint.save()
if res_type.name in ['ep_registered', 'ep_registration_update']:
ep.registered = True
ep.save()
process_pending_operations.delay(ep.endpoint)
return
elif resource_type.name == 'ep_registration_update':
process_pending_operations.delay(endpoint.endpoint)
elif res_type.name == 'ep_unregistered':
ep.registered = False
ep.save()
return

# Cond 1: Check for fota update after ep registration.
# "Firmware Version - 3/0/3" Resource.
#
# Cond 2: Handle FOTA Update
elif ((resource_type.object_id == 3 and resource_type.resource_id == 3) or
resource_type.object_id == 5):
# There must be exactly one FirmwareUpdate object with
# result = 0 (RESULT_DEFAULT).
fw_queryset = FirmwareUpdate.objects.filter(endpoint=endpoint,
result=FirmwareUpdate.Result.RESULT_DEFAULT)
cnt = fw_queryset.count()
if cnt == 1:
fw_obj = fw_queryset.get()
elif cnt == 0:
if resource_type.resource_id == 3:
# This case can happen if the result is updated before the
# state. Can be ignored, as a result will automatically set the
# state to IDLE.
return
else:
err = "No active FirmwareUpdate object found for endpoint"
raise serializers.ValidationError(err)
else:
err = "Multiple active FirmwareUpdate objects found for endpoint"
logger.info(fw_queryset)
raise serializers.ValidationError(err)

# Compare version with expected version and set download state/result
if resource_type.object_id == 3 and resource_type.resource_id == 3:
fw_obj.state = FirmwareUpdate.State.STATE_IDLE
expected_version = fw_obj.firmware.version
reported_version = res['value']
if expected_version == reported_version:
fw_obj.result = FirmwareUpdate.Result.RESULT_SUCCESS
else:
fw_obj.result = FirmwareUpdate.Result.RESULT_UPDATE_FAILED
fw_obj.save()
elif ((res_type.object_id == 3 and res_type.resource_id == 3) or
res_type.object_id == 5):
self.handle_fota(ep, res_type, res['value'])
return


def handle_fota(self, ep, res_type, value):
# There must be exactly one FirmwareUpdate object with
# result = 0 (RESULT_DEFAULT).
fw_query = FirmwareUpdate.objects.filter(endpoint=ep,
result=FirmwareUpdate.Result.RESULT_DEFAULT)

# Check for exactly one FirmwareUpdate object
if fw_query.count() == 0:
if res_type.object_id == 3 and res_type.resource_id == 3:
# Just a regular reboot, no FOTA update
return
err = "No FirmwareUpdate object found for endpoint"
raise serializers.ValidationError
if fw_query.count() != 1:
err = "Multiple active FirmwareUpdate objects found for endpoint"
logger.info(fw_query)
raise serializers.ValidationError(err)

# Exactly one FirmwareUpdate object found
fw_obj = fw_query.get()

if resource_type.resource_id == 3:
if int(res['value']) == FirmwareUpdate.State.STATE_DOWNLOADED:
# Create "Update" resource to execute the update, no payload
exec_resource = Resource.objects.create(endpoint = endpoint,
resource_type = ResourceType.objects.get(object_id = 5, resource_id = 2)
)
exec_operation = EndpointOperation.objects.create(resource=exec_resource)
fw_obj.state = res['value']
fw_obj.execute_operation = exec_operation
fw_obj.save()
process_pending_operations.delay(endpoint.endpoint)
elif resource_type.resource_id == 5:
fw_obj.result = res['value']
fw_obj.state = FirmwareUpdate.State.STATE_IDLE
fw_obj.save()
# Device Rebooted
if res_type.object_id == 3 and res_type.resource_id == 3:
if (fw_obj.state == FirmwareUpdate.State.STATE_IDLE and \
fw_obj.result == FirmwareUpdate.Result.RESULT_DEFAULT):
# Update hasn't been started yet
return
expected_version = fw_obj.firmware.version
reported_version = value
if expected_version == reported_version:
fw_obj.result = FirmwareUpdate.Result.RESULT_SUCCESS
else:
fw_obj.result = FirmwareUpdate.Result.RESULT_UPDATE_FAILED
fw_obj.state = FirmwareUpdate.State.STATE_IDLE
self.abort_pending_fota_comms(fw_obj)
fw_obj.save()
return

# Update state changed
if res_type.resource_id == 3:
fw_obj.state = value
if int(value) == FirmwareUpdate.State.STATE_DOWNLOADED:
# Create "Update" resource to execute the update, no payload
exec_update = ResourceType.objects.get(object_id = 5, resource_id = 2)
exec_res = Resource.objects.create( endpoint = ep, resource_type = exec_update)
exec_operation = EndpointOperation.objects.create(resource=exec_res)
fw_obj.execute_operation = exec_operation
process_pending_operations.delay(ep.endpoint)
# Update Result changed (Success/Failure)
elif res_type.resource_id == 5:
fw_obj.result = value
# In cases (success/failure), the update process is finished.
fw_obj.state = FirmwareUpdate.State.STATE_IDLE
self.abort_pending_fota_comms(fw_obj)
else:
return
fw_obj.save()


# In case an update is finished (success/failure), abort any pending
# operations (send URI, execute update). All communications should usually
# be closed already.
def abort_pending_fota_comms(self, fw_obj):
# Abort all open operations
if fw_obj.execute_operation:
status = fw_obj.execute_operation.status
if status in (EndpointOperation.Status.QUEUED, EndpointOperation.Status.SENDING):
fw_obj.execute_operation.status = EndpointOperation.Status.FAILED

if fw_obj.send_uri_operation:
status = fw_obj.send_uri_operation.status
if status in (EndpointOperation.Status.QUEUED, EndpointOperation.Status.SENDING):
fw_obj.send_uri_operation.status = EndpointOperation.Status.FAILED
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# Copyright (c) 2024 Jonas Remmert
#
# SPDX-License-Identifier: Apache-2.0
#

from rest_framework import serializers
from .base import HandleResourceMixin, ResourceDataSerializer
from ..models import Endpoint
import logging
from dateutil import parser

logger = logging.getLogger(__name__)


class NodeSerializer(serializers.Serializer):
nodes = serializers.DictField(
child=ResourceDataSerializer(),
help_text="Dictionary of resource paths (e.g., '/3303/0/5700') to resource data"
)


class TimestampedResourceSerializer(HandleResourceMixin, serializers.Serializer):
ep = serializers.CharField(max_length=255, help_text="Unique LwM2M Endpoint")
val = serializers.ListField(
child=serializers.DictField(
child=NodeSerializer(),
)
)


def create(self, validated_data):
ep = validated_data['ep']
val = validated_data['val']
# Create an Event only once for the given endpoint
event_created = False
endpoint, _ = Endpoint.objects.get_or_create(endpoint=ep)

for item in val:
for ts, data in item.items():
nodes = data.get('nodes', {})

for path, resource in nodes.items():
obj_id = int(path.split('/')[1])
if len(nodes) > 1 or len(val) > 1:
if not event_created:
self.create_event(endpoint, obj_id)
event_created = True

if ts == 'null':
self.handle_resource(endpoint, obj_id, resource)
else:
timestamp = parser.parse(ts)
self.handle_resource(endpoint, obj_id, resource, timestamp)

return endpoint
Loading
Loading