# weather-be/weather/views.py
# 74 lines, 2.6 KiB, Python

from django.core.cache import cache
from rest_framework.generics import CreateAPIView, ListAPIView
from rest_framework.views import APIView
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework import status
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiResponse
from .models import WeatherStats
from .serializers import WeatherStatSerializer
from .utils import IsMikrokontroller, PageNumberPagination
# Cache key under which the most recent reading posted by the
# microcontroller is stored (written on create, read by the "latest" view).
CACHE_KEY_WEATHER = "weather_latest_data"

# Timeout, in seconds, passed to the cache when storing that reading.
CACHE_TIMEOUT = 30
# Create your views here.
@extend_schema(
    tags=['Weather'],
    description="Call method by mikrocontroller to set new data (required MIKRO_SECRET_KEY with header X-Mikro-Key) with delay 30 seconds",
    parameters=[
        OpenApiParameter(
            name='X-Mikro-Key',
            type=str,
            location=OpenApiParameter.HEADER,
            description='Secret Key for microcontroller',
            required=True,
        )
    ],
)
class CreateStatAPI(CreateAPIView):
    # Endpoint the microcontroller POSTs its readings to.
    #
    # Every accepted reading refreshes the "latest reading" cache entry, but
    # a row is written to the database at most once per CACHE_TIMEOUT
    # seconds. Access is gated by the IsMikrokontroller permission class
    # (defined in .utils).
    serializer_class = WeatherStatSerializer
    permission_classes = [IsMikrokontroller]
    queryset = WeatherStats.objects.all()

    # Marker key used only to rate-limit database writes. It is kept apart
    # from CACHE_KEY_WEATHER on purpose: that entry has its timeout renewed on
    # every POST, so using it as the throttle would block DB writes forever
    # while the device keeps posting more often than CACHE_TIMEOUT.
    db_write_throttle_key = 'weather_db_write_throttle'

    def perform_create(self, serializer: WeatherStatSerializer) -> None:
        """Cache the incoming reading and persist it at most once per window.

        Args:
            serializer: The already-validated serializer for the request.

        Raises:
            Exception: Whatever ``serializer.save()`` raises is propagated
                after the throttle marker has been released.
        """
        new_data = serializer.validated_data

        # cache.add() stores the marker only when it is absent and reports
        # whether it did, so concurrent requests cannot both pass the check.
        if cache.add(self.db_write_throttle_key, True, CACHE_TIMEOUT):
            try:
                serializer.save()
            except Exception:
                # The write did not happen; release the marker so the next
                # reading is not suppressed for a whole window.
                cache.delete(self.db_write_throttle_key)
                raise

        # Always expose the freshest reading; the entry expires on its own
        # once the device stops posting for CACHE_TIMEOUT seconds.
        cache.set(CACHE_KEY_WEATHER, new_data, CACHE_TIMEOUT)
@extend_schema(
    tags=['Weather'],
    description="Get latest stat by microcontroller. Can be used to get actual data now",
    responses={
        200: WeatherStatSerializer,
        404: OpenApiResponse(
            description="No latest data available from microcontroller",
            # A plain dict given as `response` is treated as a raw OpenAPI
            # schema, so it has to describe the payload's shape rather than
            # be a sample payload itself.
            response={
                "type": "object",
                "properties": {
                    "message": {
                        "type": "string",
                        "example": "We don't have latest data from microcontroller. Maybe microcontroller is not connected",
                    },
                },
                "required": ["message"],
            },
        ),
    },
)
class LastStatAPI(APIView):
    # Read-only endpoint returning whatever is currently stored under
    # CACHE_KEY_WEATHER; it never touches the database.

    def get(self, request: Request, format=None) -> Response:
        # Returns the cached reading, or a 404 with an explanatory message
        # when the cache holds nothing under CACHE_KEY_WEATHER.
        latest_reading = cache.get(CACHE_KEY_WEATHER)
        if latest_reading is not None:
            return Response(latest_reading)

        return Response(
            {"message": "We don't have latest data from microcontroller. Maybe microcontroller is not connected"},
            status=status.HTTP_404_NOT_FOUND,
        )
@extend_schema(
    description="Get full history for graph",
    tags=['Weather'],
)
class StatsHistoryAPI(ListAPIView):
    # Paginated listing of stored WeatherStats rows, ordered by descending
    # `created_at`, serialized with WeatherStatSerializer.
    pagination_class = PageNumberPagination
    serializer_class = WeatherStatSerializer
    queryset = WeatherStats.objects.order_by('-created_at')