Skip to content

Commit

Permalink
Improve Scheduling [WIP]
Browse files Browse the repository at this point in the history
  • Loading branch information
math-a3k committed Aug 23, 2023
1 parent 7018a35 commit 62decca
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 47 deletions.
29 changes: 2 additions & 27 deletions base/management/commands/load_symbols.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from binance.spot import Spot
from django.conf import settings
from django.core.management.base import BaseCommand

from base.models import Symbol
Expand All @@ -9,28 +7,5 @@ class Command(BaseCommand):
help = "Loads symbols from Binance"

def handle(self, *args, **options):
client = Spot()
ei = client.exchange_info()
symbols_processed, symbols_created = 0, 0
for symbol in ei["symbols"]:
if symbol["symbol"].endswith(settings.QUOTE_ASSET):
s, c = Symbol.objects.update_or_create(
symbol=symbol["symbol"],
defaults={
"status": symbol["status"],
"base_asset": symbol["baseAsset"],
"quote_asset": symbol["quoteAsset"],
"info": symbol,
},
)
symbols_processed += 1
if c:
symbols_created += 1
self.stdout.write(
self.style.SUCCESS(
(
f"Successfully processed {symbols_processed} symbols "
f"({symbols_created} created)"
)
)
)
message = Symbol.load_symbols()
self.stdout.write(self.style.SUCCESS(message))
50 changes: 47 additions & 3 deletions base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ class Symbol(models.Model):
_outliers_model_class = None
_last_td = None
_serializer_class = None

client = Spot()
_client = None

symbol = models.CharField("Symbol", max_length=20)
status = models.CharField("Status", max_length=20)
Expand Down Expand Up @@ -289,6 +288,13 @@ def load_all_data_sync(cls): # pragma: no cover
for s in symbols:
s.load_data()

@classmethod
def get_client(cls):
if not cls._client:
cls._client = Spot()
cls._client.session.mount("https://", HTTPAdapter(pool_maxsize=36))
return cls._client

def load_data(self, start_time=None, end_time=None):
Kline.load_klines(self, start_time=start_time, end_time=end_time)
td = TrainingData.from_klines(self)
Expand Down Expand Up @@ -435,7 +441,9 @@ def update_indicators(
# Early bot notification if available
if bot_early_notification: # pragma: no cover
if getattr(self, "bots_prefetched", []):
price = self.client.ticker_price(symbol=self.symbol)["price"]
price = self.get_client().ticker_price(symbol=self.symbol)[
"price"
]
# Re-fetch to avoid errors
for bot in self.bots.enabled():
bot.price_current = Decimal(price)
Expand Down Expand Up @@ -619,6 +627,42 @@ def reset_symbols(
symbol.klines.all().delete()
logger.warning(f"{symbol}: RESET")

@classmethod
def load_symbols(cls):
cache_key = settings.SYMBOLS_UPDATE_ALL_INDICATORS_KEY
if not cache.get(cache_key, False) or "pytest" in sys.modules:
cache.set(cache_key, True, 2400)
client = cls.get_client()
ei = client.exchange_info()
symbols_processed, symbols_created = 0, 0
for symbol in ei["symbols"]:
if symbol["symbol"].endswith(settings.QUOTE_ASSET):
s, c = cls.objects.update_or_create(
symbol=symbol["symbol"],
defaults={
"status": symbol["status"],
"base_asset": symbol["baseAsset"],
"quote_asset": symbol["quoteAsset"],
"info": symbol,
},
)
symbols_processed += 1
if c:
symbols_created += 1
message = (
f"Successfully processed {symbols_processed} symbols "
f"({symbols_created} created)"
)

cache.set(cache_key, False)
else: # pragma: no cover
message = (
"Other process updating all indicators is running, please "
"wait."
)
logger.warning(message)
return message


class Kline(models.Model):
_client = None
Expand Down
11 changes: 9 additions & 2 deletions base/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ def retrieve_and_update_symbol(symbol): # pragma: no cover


@shared_task
def update_all_indicators_job(): # pragma: no cover
if Symbol.objects.available().filter(model_score__isnull=True).count() > 0:
def update_all_indicators_job(
all_symbols=False, load_symbols=False
): # pragma: no cover
if load_symbols:
Symbol.load_symbols()
if (
Symbol.objects.available().filter(model_score__isnull=True).count() > 0
or all_symbols
):
# Case of cold start
message = Symbol.update_all_indicators(push=False)
message = Symbol.update_all_indicators(only_top=True)
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ services:
links:
- db:db
- redis:redis
worker:
hostname: worker
build:
context: .
dockerfile: ./dockerfiles/Dockerfile-worker
env_file:
- ./dockerfiles/.env_file
volumes:
- static:/vol/tradero/static
depends_on:
- db
- redis
- instance
links:
- db:db
- redis:redis

volumes:
static:
Expand Down
35 changes: 35 additions & 0 deletions dockerfiles/Dockerfile-worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
FROM python:3.11

# RUN pacman -Syu python python-setuptools python-pipenv supervisor git postgresql-libs --noconfirm && yes | pacman -Scc

RUN useradd -mU tradero && \
mkdir -p /vol/tradero/{media,static,db,log} && \
chown -R tradero:tradero /vol/tradero && \
chmod -R 755 /vol/tradero && \
python -m pip install pipenv

WORKDIR /home/tradero

USER root

COPY ./tradero tradero
COPY ./base base
COPY dockerfiles/tradero_uwsgi.ini .
COPY dockerfiles/django_tasks.sh .
COPY dockerfiles/supervisor.conf .
COPY dockerfiles/.env_file tradero/.env
COPY Pipfile Pipfile.lock pytest.ini pyproject.toml .coveragerc manage.py .
# For development
# COPY ./django-ai django-ai
RUN chown -R tradero:tradero /home/tradero
RUN chmod a+x /home/tradero/django_tasks.sh
RUN mkdir /run/daphne && chown -R tradero:tradero /run/daphne

ENV PYTHONUNBUFFERED=1
ENV PIPENV_VENV_IN_PROJECT=1
ENV PATH="/home/tradero:/home/tradero/.venv/bin:${PATH}"

USER tradero
RUN pipenv install

CMD ["pipenv", "run", "celery", "-A", "tradero", "worker", "-l", "INFO", "-Q", "symbols,bots"]
15 changes: 0 additions & 15 deletions dockerfiles/supervisor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,6 @@ redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0


[program:worker]
command=celery -A tradero worker -l INFO
user=tradero
directory=/home/tradero/
environment=PATH="/home/tradero:/home/tradero/.venv/bin:%(ENV_PATH)s",PIPENV_VENV_IN_PROJECT=1
stopsignal=TERM
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0


[program:scheduler]
command=celery -A tradero beat
user=tradero
Expand All @@ -56,7 +42,6 @@ redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0


[fcgi-program:daphne]
# TCP socket used by Nginx backend upstream
socket=tcp://0.0.0.0:9000
Expand Down
15 changes: 15 additions & 0 deletions tradero/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,22 @@

app = Celery("tradero", include=["base.tasks"])

app.conf.task_routes = {
"base.tasks.retrieve_and_update_symbol": {"queue": "symbols"},
"base.tasks.update_all_indicators_job": {"queue": "symbols"},
"base.tasks.update_all_bots_job": {"queue": "bots"},
}

app.conf.beat_schedule = {
# Executes every day.
"update-symbols-and-indicators": {
"task": "base.tasks.update_all_indicators_job",
"schedule": crontab(
hour=23,
minute=57,
),
"kwargs": {"load_symbols": True, "all_symbols": True},
},
# Executes every 5 mins.
"update-indicators-every-5-mins": {
"task": "base.tasks.update_all_indicators_job",
Expand Down

0 comments on commit 62decca

Please sign in to comment.