Merge pull request #5 from nonnontrivial/feature/comparison-workflow
comparison workflow
nonnontrivial authored Oct 28, 2024
2 parents a96cbbf + 58ecc7a commit f05585c
Showing 23 changed files with 374 additions and 299 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/python-app.yml
@@ -14,12 +14,12 @@ permissions:

jobs:
test:
runs-on: macos-latest
runs-on: ubuntu-latest

strategy:
matrix:
python-version: [3.12]
package-dir:
package-dir:
- api
- pp

@@ -29,7 +29,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-verion }}
python-version: 3.12

- name: Install dependencies
run: |
@@ -41,4 +41,3 @@
run: |
cd ${{ matrix.package-dir }}
python -m pytest
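
For reference, a rough local equivalent of the CI matrix above — a sketch, not part of the repository, assuming the two package directories shown (`api` and `pp`) and that pytest is installed in the active environment:

```python
import subprocess

# run each package's test suite in turn, mirroring the matrix over package-dir
for package_dir in ("api", "pp"):
    subprocess.run(["python", "-m", "pytest"], cwd=package_dir, check=True)
```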
61 changes: 30 additions & 31 deletions README.md
@@ -7,34 +7,34 @@ earth, without a sensor.

## features

* gRPC api for sky brightness "readings" (at the current time across H3 cells at resolution 6 in north america)
* gRPC api for predicting sky brightness

* gRPC api for light pollution values (in RGBA, from a 2022 map)

* publisher component that repeatedly generates & stores readings for coordinates of H3 cells
* publisher component that repeatedly generates readings for coordinates of H3 cells

* consumer component that stores the readings and computes the reading with highest `mpsas` during the last cycle of observation

## todos

- [ ] support for continents other than north america
- [ ] less noisy container startup
- [x] support for continents other than north america
- [x] less noisy container startup
- [ ] live updates to open meteo data while app is running
- [ ] REST apis in addition to the gRPC ones
- [ ] better storage of predictions in order to facilitate grouping/sorting
- [x] better storage of predictions in order to facilitate grouping/sorting

## about

This project is motivated by the desire for synoptic knowledge of "where are the stars good".

It would be infeasible to have [sensors](http://unihedron.com/projects/darksky/TSL237-E32.pdf)
everywhere that a brightness measurement is desired, so it would make sense to have a way of
everywhere you would want a brightness measurement, so it would make sense to have a way of
inferring this value.


The approach this project takes is to use pytorch to capture the relationships in the [Globe At Night
dataset](https://globeatnight.org/maps-data/) and use that to predict sky brightness for H3
cells at resolution 6.

> note: currently limited to north america
cells at a configured H3 resolution (default `0`).
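
For illustration only (not part of the repository), a minimal sketch of how a coordinate maps to a cell at that default resolution, assuming the `h3` v4 Python package:

```python
import h3

# coordinate taken from the sample producer log further below
lat, lon = 43.42281493904898, -97.42465926125905

# resolution 0 is the coarsest H3 resolution: 122 cells covering the globe
cell = h3.latlng_to_cell(lat, lon, 0)
print(cell)  # expected to match the h3_id paired with this coordinate in the log

# the reverse mapping gives the cell's center coordinate
center_lat, center_lon = h3.cell_to_latlng(cell)
```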

## running with docker

@@ -49,38 +49,37 @@ docker volume create --name open-meteo-data
# get latest data into the above volume
./update-open-meteo.sh

# run the containers (`build` flag only necessary for first run)
# run the containers (optionally pass the `--build` flag)
docker compose up --build
```

> Rabbitmq will take time to start up, during which the `producer` and `consumer` containers will attempt to restart until the channel becomes available.
> Once rabbitmq does start, there should be output like this:
```log
producer-1 | 2024-10-03 23:59:12,266 [INFO] brightness observation response for 8649a36a7ffffff is uuid: "f275a795-8af7-491b-9645-3ce2e14fe3cd"
producer-1 | lat: 18.575429951519293
producer-1 | lon: -101.86020792493713
producer-1 | utc_iso: "2024-10-03T23:59:12.266163+00:00"
producer-1 | mpsas: 12.7519617
consumer-1 | 2024-10-03 23:59:12,269 [INFO] received message b'{"uuid": "f275a795-8af7-491b-9645-3ce2e14fe3cd", "lat": 18.575429951519293, "lon": -101.86020792493713, "h3_id": "8649a36a7ffffff", "utc_iso": "2024-10-03T23:59:12.266163+00:00", "mpsas": 12.751961708068848}'
producer-1 |
producer-1 | 2024-10-03 23:59:12,267 [INFO] 260 distinct cells have had observations published
consumer-1 | 2024-10-03 23:59:12,276 [INFO] saved BrightnessObservation(#8649a36a7ffffff,12.751961708068848,2024-10-03T23:59:12.266163+00:00)
After rabbitmq starts, the producer and consumer containers come up, at which
point you should see output like this:

```sh
producer-1 | 2024-10-28 13:16:27,502 [INFO] publishing {'uuid': 'e6c22004-9180-4599-9e87-36b86f68a5e7', 'lat': 43.42281493904898, 'lon': -97.42465926125905, 'h3_id': '8027fffffffffff', 'mpsas': 9.942784309387207, 'timestamp_utc': '2024-10-28T13:16:27.501192+00:00'} to brightness.prediction
producer-1 | 2024-10-28 13:16:27,580 [INFO] publishing {'uuid': 'a548be90-89f7-4995-9239-197523c3afd0', 'lat': 19.093680683484372, 'lon': 43.638818828910864, 'h3_id': '8053fffffffffff', 'mpsas': 9.202325820922852, 'timestamp_utc': '2024-10-28T13:16:27.579755+00:00'} to brightness.prediction
producer-1 | 2024-10-28 13:16:27,656 [INFO] publishing {'uuid': '2759f0e8-2f94-4efd-bf19-4b765947d983', 'lat': 60.432795263055546, 'lon': -77.20705748560815, 'h3_id': '800ffffffffffff', 'mpsas': 11.305692672729492, 'timestamp_utc': '2024-10-28T13:16:27.655087+00:00'} to brightness.prediction
producer-1 | 2024-10-28 13:16:27,736 [INFO] publishing {'uuid': 'd53872da-2505-41a9-84f1-d9336b0aff83', 'lat': -30.01574044171678, 'lon': 129.95847216046155, 'h3_id': '80b9fffffffffff', 'mpsas': 12.414505004882812, 'timestamp_utc': '2024-10-28T13:16:27.735392+00:00'} to brightness.prediction
producer-1 | 2024-10-28 13:16:27,737 [INFO] publishing {'start_time_utc': '2024-10-28T13:16:24.874541+00:00', 'end_time_utc': '2024-10-28T13:16:27.737791+00:00', 'duration_s': 2} to brightness.cycle
consumer-1 | 2024-10-28 13:16:27,744 [INFO] {'uuid': 'aa89439d-9a22-41e1-b8d2-674bea5263ee', 'lat': -74.92843438917433, 'lon': -34.64375807722018, 'h3_id': '80effffffffffff', 'mpsas': 23.74591636657715, 'timestamp_utc': datetime.datetime(2024, 10, 28, 13, 16, 25, 624186, tzinfo=datetime.timezone.utc)}
```

This output indicates that the producer service is successfully getting sky brightness predictions
for H3 cells and that the consumer service is storing them in the postgres table `brightnessobservation`.
The above output means:

1. the producer container is publishing the brightness readings it gets from
the api container

2. the consumer container has determined which reading made during the last cycle
through the H3 cells had the highest `mpsas` (magnitudes per square arcsecond, a
measure of sky brightness in which higher values mean a darker sky with more
stars visible)

`mpsas` in the response stands for 'magnitudes per square arcsecond', and it is the predicted brightness
value for that location.
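
As a sketch of tapping into this flow yourself (not project code — it reuses the `aio_pika` calls and the `brightness.prediction` queue name that appear in the `pc` diff below, and assumes rabbitmq is reachable on `localhost` with the default `guest` credentials), a standalone consumer could look like this:

```python
import asyncio
import json
import logging

import aio_pika
from aio_pika.abc import AbstractIncomingMessage

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

AMQP_URL = "amqp://guest:guest@localhost"  # mirrors the compose defaults

async def on_prediction(message: AbstractIncomingMessage) -> None:
    # each message body is the JSON payload shown in the producer log above
    payload = json.loads(message.body.decode())
    log.info("cell %s predicted at %.2f mpsas", payload["h3_id"], payload["mpsas"])

async def main() -> None:
    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("brightness.prediction")
        await queue.consume(on_prediction, no_ack=True)
        await asyncio.Future()  # keep consuming until interrupted

asyncio.run(main())
```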

## documentation

- [api client usage](./api/README.md)
- [how to write your own client for the sky brightness gRPC api](./api/README.md)

## licensing

This project is licensed under the AGPL-3.0 license.

Note: The GeoJSON file located at `./pp/pp/cells/north-america.geojson` is licensed under the Apache License, Version 2.0, and retains its original copyright (Copyright 2018 Esri).

36 changes: 17 additions & 19 deletions docker-compose.yml
@@ -12,18 +12,18 @@ services:

rabbitmq:
image: "rabbitmq:alpine"
environment:
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
ports:
- "5672:5672"
- "15672:15672"
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:15672" ]
interval: 30s
timeout: 10s
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
environment:
RABBITMQ_DEFAULT_USER: "guest"
RABBITMQ_DEFAULT_PASS: "guest"
start_period: 20s

openmeteo:
image: "ghcr.io/open-meteo/open-meteo"
@@ -41,21 +41,20 @@
LOG_LEVEL: 30
restart: on-failure
depends_on:
- openmeteo
openmeteo:
condition: service_started

producer:
build: ./pp
environment:
API_VERSION: "v1"
API_HOST: "api"
MODEL_VERSION: "0.1.0"
RABBITMQ_HOST: "rabbitmq"
restart: on-failure
depends_on:
- rabbitmq
- api
links:
- rabbitmq
api:
condition: service_started
rabbitmq:
condition: service_healthy

consumer:
build: ./pc
@@ -66,13 +65,12 @@
PG_DATABASE: "postgres"
restart: on-failure
depends_on:
- postgres
- rabbitmq
links:
- rabbitmq
postgres:
condition: service_started
rabbitmq:
condition: service_healthy

volumes:
postgres-data:
open-meteo-data:
external: true

6 changes: 2 additions & 4 deletions pc/pc/config.py
@@ -11,9 +11,7 @@
rabbitmq_user = os.getenv("AMQP_USER", "guest")
rabbitmq_password = os.getenv("AMQP_PASSWORD", "guest")
rabbitmq_host = os.getenv("AMQP_HOST", "localhost")
prediction_queue = os.getenv("AMQP_PREDICTION_QUEUE", "prediction")
prediction_queue = os.getenv("AMQP_PREDICTION_QUEUE", "brightness.prediction")
cycle_queue = os.getenv("AMQP_CYCLE_QUEUE", "brightness.cycle")

amqp_url = f"amqp://{rabbitmq_user}:{rabbitmq_password}@{rabbitmq_host}"

ws_host = os.getenv("WS_HOST", "0.0.0.0")
ws_port = int(os.getenv("WS_PORT", 8765))
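
A quick way to see how these defaults compose, mirroring the pattern above (a sketch, not repository code):

```python
import os

# with nothing overridden this prints amqp://guest:guest@localhost;
# exporting AMQP_HOST=rabbitmq (the service name in docker-compose.yml) changes the host part
user = os.getenv("AMQP_USER", "guest")
password = os.getenv("AMQP_PASSWORD", "guest")
host = os.getenv("AMQP_HOST", "localhost")
print(f"amqp://{user}:{password}@{host}")
```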
63 changes: 44 additions & 19 deletions pc/pc/consumer/consumer.py
@@ -3,28 +3,32 @@
import asyncio
import typing

import asyncpg
import aio_pika
from aio_pika.abc import AbstractIncomingMessage, AbstractRobustChannel

from pc.persistence.models import BrightnessObservation
from pc.consumer.websocket_handler import WebsocketHandler
from pc.persistence.models import BrightnessObservation, CellCycle
from pc.persistence.db import insert_brightness_observation, select_max_brightness_record_in_range

log = logging.getLogger(__name__)


class Consumer:
def __init__(self, url: str, queue_name: str, websocket_handler: typing.Optional[WebsocketHandler] = None):
def __init__(self, url: str, prediction_queue: str, cycle_queue: str, connection_pool: asyncpg.Pool, on_cycle_completion: typing.Callable[[BrightnessObservation],None]):
self._amqp_url = url
self._queue_name = queue_name
self._websocket_handler = websocket_handler
self._prediction_queue = prediction_queue
self._cycle_queue = cycle_queue
self._pool = connection_pool
self._on_cycle_completion = on_cycle_completion

async def start(self):
try:
log.info(f"connecting to {self._amqp_url}")
connection = await aio_pika.connect_robust(self._amqp_url)
except Exception as e:
import sys

log.error(f"could not form amqp connection because {e}; has rabbitmq started?")
log.error(f"could not form amqp connection because {e}; is rabbitmq running?")
log.warning("exiting")
sys.exit(1)
else:
@@ -33,24 +37,45 @@ async def start(self):
await self.consume(channel) # type: ignore

async def consume(self, channel: AbstractRobustChannel):
"""begin consuming from queue on a given channel"""
queue = await channel.declare_queue(self._queue_name)
await queue.consume(self._on_message, no_ack=True)
log.info(f"consuming from {self._prediction_queue}")
prediction_queue = await channel.declare_queue(self._prediction_queue)
await prediction_queue.consume(self._on_prediction_message, no_ack=True)

log.info(f"consuming from {self._cycle_queue}")
cycle_queue = await channel.declare_queue(self._cycle_queue)
await cycle_queue.consume(self._on_cycle_message, no_ack=True)

await asyncio.Future()

async def _on_message(self, message: AbstractIncomingMessage):
"""handle incoming message by storing in postgres"""
async def _on_cycle_message(self, message: AbstractIncomingMessage):
"""handle incoming message by retrieving max reading from postgres within
the range in the mesage"""
log.debug(f"received message {message.body}")
try:
log.info(f"received message {message.body}")
message_dict: typing.Dict = json.loads(message.body.decode())
cell_cycle = CellCycle(**message_dict)

brightness_observation_json = json.loads(message.body.decode())
brightness_observation = BrightnessObservation(**brightness_observation_json)
record = await select_max_brightness_record_in_range(self._pool, cell_cycle)
if record is not None:
record = dict(record)
uuid = str(record["uuid"])
del record["uuid"]
max_brightness_observation_in_cycle = BrightnessObservation(**record, uuid=uuid)
self._on_cycle_completion(max_brightness_observation_in_cycle)
except Exception as e:
log.error(f"could not process cycle message: {e}")
else:
pass

await brightness_observation.save()

if self._websocket_handler is not None:
await self._websocket_handler.broadcast(brightness_observation_json)
async def _on_prediction_message(self, message: AbstractIncomingMessage):
"""handle incoming message by storing in postgres"""
log.debug(f"received message {message.body}")
try:
message_dict: typing.Dict = json.loads(message.body.decode())
brightness_observation = BrightnessObservation(**message_dict)
await insert_brightness_observation(self._pool, brightness_observation)
except Exception as e:
log.error(f"could not save brightness observation {e}")
log.error(f"could not save brightness observation: {e}")
else:
log.info(f"saved {brightness_observation}")
log.debug(f"saved brightness of {brightness_observation.h3_id}")
46 changes: 0 additions & 46 deletions pc/pc/consumer/websocket_handler.py

This file was deleted.
