From e2d9c62976b0359a4faa68ee9d5af5e891a8d247 Mon Sep 17 00:00:00 2001 From: Yongho Kim Date: Wed, 2 Oct 2024 11:36:48 -0500 Subject: [PATCH] Added multi stream support; Added camera name in the meta information --- README.md | 5 +- app.py | 72 ++++++++++++++++++----------- ecr-meta/ecr-science-description.md | 42 ++++++++++++++++- requirements.txt | 2 +- sage.yaml | 4 +- 5 files changed, 93 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index fea8090..2dc7215 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ # Image Sampler Plugin -This plugin utilizes [PyWaggle](https://github.com/waggle-sensor/pywaggle) library to capture frames from a stream. Captured frames are stored in a local storage as a jpeg image. +This plugin utilizes [PyWaggle](https://github.com/waggle-sensor/pywaggle) library to capture frames from a stream. Captured frames are stored in a local storage as a jpeg image. + +## Usage +Please refer to the [description](ecr-meta/ecr-science-description.md#how-to-use). ## Developer Notes diff --git a/app.py b/app.py index 1a49c62..0eb898a 100644 --- a/app.py +++ b/app.py @@ -10,6 +10,7 @@ import time import os import argparse +from multiprocessing import Process from waggle.plugin import Plugin from waggle.data.vision import Camera @@ -21,64 +22,81 @@ datefmt='%Y/%m/%d %H:%M:%S') -def capture(plugin, cam, args): - sample_file_name = "sample.jpg" - sample = cam.snapshot() - if args.out_dir == "": +def capture(plugin, stream, out_dir=""): + sample_file_name = f'{stream}.jpg' + with Camera(stream) as cam: + sample = cam.snapshot() + if out_dir == "": sample.save(sample_file_name) - plugin.upload_file(sample_file_name) + # The camera name is added in the meta + meta = { + "camera": stream, + } + plugin.upload_file(sample_file_name, meta=meta) else: dt = datetime.fromtimestamp(sample.timestamp / 1e9) - base_dir = os.path.join(args.out_dir, dt.astimezone(timezone.utc).strftime('%Y/%m/%d/%H')) + base_dir = os.path.join(out_dir, stream, dt.astimezone(timezone.utc).strftime('%Y/%m/%d/%H')) os.makedirs(base_dir, exist_ok=True) sample_path = os.path.join(base_dir, dt.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S%z.jpg')) sample.save(sample_path) -def run(args): - logging.info("starting image sampler.") - if args.cronjob == "": - logging.info("capturing...") - with Plugin() as plugin, Camera(args.stream) as cam: - capture(plugin, cam, args) +def run(stream, cronjob, out_dir=""): + logger_header = f'Stream {stream}: ' + logging.info(f'{logger_header}starting image sampler.') + if cronjob == "": + logging.info(f'{logger_header}capturing...') + with Plugin() as plugin: + capture(plugin, stream, out_dir) return 0 - logging.info("cronjob style sampling triggered") - if not croniter.is_valid(args.cronjob): - logging.error(f'cronjob format {args.cronjob} is not valid') + logging.info(f'{logger_header}cronjob style sampling triggered') + if not croniter.is_valid(cronjob): + logging.error(f'{logger_header}cronjob format {cronjob} is not valid') return 1 now = datetime.now(timezone.utc) - cron = croniter(args.cronjob, now) + cron = croniter(cronjob, now) with Plugin() as plugin: while True: n = cron.get_next(datetime).replace(tzinfo=timezone.utc) now = datetime.now(timezone.utc) next_in_seconds = (n - now).total_seconds() if next_in_seconds > 0: - logging.info(f'sleeping for {next_in_seconds} seconds') + logging.info(f'{logger_header}sleeping for {next_in_seconds} seconds') time.sleep(next_in_seconds) - logging.info("capturing...") - with Camera(args.stream) as cam: - capture(plugin, cam, args) + logging.info(f'{logger_header}capturing...') + capture(plugin, stream, out_dir) + return 0 + + +def main(args): + workers = [] + for stream in args.stream: + worker = Process(target=run, args=(stream, args.cronjob, args.out_dir)) + workers.append(worker) + worker.start() + + for worker in workers: + worker.join() return 0 if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( - '-stream', dest='stream', - action='store', default="camera", type=str, - help='ID or name of a stream, e.g. sample') + '--stream', dest='stream', + action='append', + help='ID or name of a stream. Multiple streams can be specified, each stream with the --stream option.') parser.add_argument( - '-out-dir', dest='out_dir', + '--out-dir', dest='out_dir', action='store', default="", type=str, - help='Path to save images locally in %Y-%m-%dT%H:%M:%S%z.jpg format') + help='Path to save images locally in %%Y-%%m-%%dT%%H:%%M:%%S%%z.jpg format') parser.add_argument( - '-cronjob', dest='cronjob', + '--cronjob', dest='cronjob', action='store', default="", type=str, help='Time interval expressed in cronjob style') args = parser.parse_args() if args.out_dir != "": os.makedirs(args.out_dir, exist_ok=True) - exit(run(args)) + exit(main(args)) diff --git a/ecr-meta/ecr-science-description.md b/ecr-meta/ecr-science-description.md index 4ac1ad4..a587aa6 100644 --- a/ecr-meta/ecr-science-description.md +++ b/ecr-meta/ecr-science-description.md @@ -1,3 +1,43 @@ # Image Sampling -Image sampling samples still images from a camera stream. This is one of the fundamental ways for collecting dataset that will later be used in training machine learning models. This also gives a guidance on how an inferencing was performed -- an image taken approximately at the same time when the inference was performed (on the same scene) visually shows what the context was. \ No newline at end of file +Image sampling samples still images from a camera stream. This is one of the fundamental ways for collecting data that will later be used in training machine learning models. This also gives a guidance on how an inferencing was performed; an image taken approximately at the same time when the inference was performed (on the same scene) visually shows the context. + +# How to Use +To run the program, + +```bash +# Captures and publishes an image from the camera stream +python3 app.py --stream bottom_camera +``` + +### Capturing an Image from Multiple Streams + +```bash +python3 app.py \ + --stream bottom_camera \ + --stream top_camera +``` + +### Capturing and Saving Images Locally + +```bash +# This does not publish images to the cloud, +# instead they are saved locally +python3 app.py \ + --stream bottom_camera \ + --out-dir /path/to/local/storage +``` + +The directory will have a directory for each stream and be structured with subdirectories helping to organize the images. + +> NOTE: The directory structure recognizes those slashes (/) when creating subdirectories if the stream is a URL like rtsp://IP:PORT/stream. It will be /OUTDIR/RTSP:/IP:PORT/... + +### Capturing Images using Cronjob + +```bash +# Capturing an image from the stream every hour. +# Note that the program runs forever. +python3 app.py \ + --stream bottom_camera \ + --cronjob "0 * * * *" +``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 9de56cb..7cae040 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ pip croniter -pywaggle[vision] == 0.55.* \ No newline at end of file +pywaggle[vision] == 0.56.* \ No newline at end of file diff --git a/sage.yaml b/sage.yaml index 218804c..99d9cfa 100644 --- a/sage.yaml +++ b/sage.yaml @@ -1,6 +1,6 @@ name: "imagesampler" description: "Periodical/Trigger-based Image sampler" -version : "0.3.4" +version : "0.3.5" namespace: "waggle" authors: "Yongho Kim " collaborators: "Waggle Team " @@ -16,7 +16,7 @@ source: branch: "main" inputs: - id: "stream" - type: "string" + type: "list" - id: "out-dir" type: "string" - id: "cronjob"