diff --git a/.gitignore b/.gitignore
index d7a9887..b716404 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,4 +33,6 @@ MANIFEST
 /data/
 /tests/.pytest_cache/
+/strategies/data/
+/strategies/results/
diff --git a/.travis.yml b/.travis.yml
index ec5c0b4..7a50e15 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,14 +1,21 @@
+env:
+  global:
+    - CC_TEST_REPORTER_ID=9a990fea3fb57063b45010735b989c837fbf4b5da5d4bdfeafb89539e2b61d19
 language: python
 python:
   - "3.6"
+before_script:
+  - curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 > ./cc-test-reporter
+  - chmod +x ./cc-test-reporter
+  - ./cc-test-reporter before-build
 # command to install dependencies
 install:
   - pip install -r requirements.txt
-  - pip install codecov
+  - pip install coverage
   - pip install pytest pytest-cov
 # command to run tests
 script:
   - pytest --cov=./
-after_success:
-  - codecov
\ No newline at end of file
+after_script:
+  - ./cc-test-reporter after-build -t coverage.py --debug --exit-code $TRAVIS_TEST_RESULT
\ No newline at end of file
diff --git a/README.md b/README.md
index 959b4aa..dedc834 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 [![Downloads](https://pepy.tech/badge/optopsy)](https://pepy.tech/project/optopsy)
-[![Codacy Badge](https://api.codacy.com/project/badge/Grade/2de8f5b3fa2742de93fb60b3a1ae5683)](https://app.codacy.com/app/michaelchu/optopsy?utm_source=github.com&utm_medium=referral&utm_content=michaelchu/optopsy&utm_campaign=badger)
+[![Maintainability](https://api.codeclimate.com/v1/badges/37b11e992a6900d30310/maintainability)](https://codeclimate.com/github/michaelchu/optopsy/maintainability)
 [![Build Status](https://travis-ci.org/michaelchu/optopsy.svg?branch=master)](https://travis-ci.org/michaelchu/optopsy)
-[![codecov](https://codecov.io/gh/michaelchu/optopsy/branch/master/graph/badge.svg)](https://codecov.io/gh/michaelchu/optopsy)
+[![Test Coverage](https://api.codeclimate.com/v1/badges/37b11e992a6900d30310/test_coverage)](https://codeclimate.com/github/michaelchu/optopsy/test_coverage)
 
 # Optopsy
 
@@ -35,23 +35,20 @@ the rapid development of complex options trading strategies.
 * Spread delta
 * Spread price
 
-### Planned Features
-* Indicator Support - Create entry and exit rules based on indicators
-* Optimizer - Allows users to run multiple backtests with different combinations of parameters
-* Option strategy support:
-    * Single Calls/Puts
-    * Vertical Spreads
-    * Iron Condors (Iron Butterflies)
-    * Covered Stock
-    * Combos (Synthetics/Collars)
-    * Diagonal Spreads
-    * Calendar Spreads
-    * Custom Spreads
-    * Strangles
-    * Straddles
+### Option strategy support
+* Single Calls/Puts
+* Vertical Spreads
+* (Coming Soon) Iron Condors (Iron Butterflies)
+* (Coming Soon) Covered Stock
+* (Coming Soon) Combos (Synthetics/Collars)
+* (Coming Soon) Diagonal Spreads
+* (Coming Soon) Calendar Spreads
+* (Coming Soon) Custom Spreads
+* (Coming Soon) Strangles
+* (Coming Soon) Straddles
 
 ### Dependencies
-You will need Python 3.6.x. It is recommended to install [Miniconda3](https://conda.io/miniconda.html). See [requirements.txt](https://github.com/michaelchu/optopsy/blob/master/requirements.txt) for full details.
+You will need Python 3.6.x and Pandas 0.23.1 or newer. It is recommended to install [Miniconda3](https://conda.io/miniconda.html). See [requirements.txt](https://github.com/michaelchu/optopsy/blob/master/requirements.txt) for full details.
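A quick way to sanity-check that an environment satisfies these floors before installing is sketched below; this snippet is illustrative only and not part of the patch, and the version numbers simply restate the Dependencies note above.

```python
# Illustrative environment check (not part of this change).
# Version floors restate the Dependencies note: Python 3.6.x, Pandas >= 0.23.1.
import sys
import pandas as pd
from distutils.version import LooseVersion

assert sys.version_info >= (3, 6), "optopsy requires Python 3.6+"
assert LooseVersion(pd.__version__) >= LooseVersion("0.23.1"), "Pandas 0.23.1 or newer is required"
```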
### Installation ``` @@ -69,7 +66,6 @@ In order to use it, you will need to define the struct variable to map the colum First we import the library and other nessesary libaries: ```python import optopsy as op -import os from datetime import datetime ``` @@ -112,13 +108,9 @@ def run_strategy(): start = datetime(2016, 1, 1) end = datetime(2016, 12, 31) - # create the option spread that matches the entry filters + # create the option spreads that matches the entry filters trades = op.strategies.short_call_spread(data, start, end, filters) - # we get a dataframe of our orders based on the entry filter rules, let's export - # this to csv file for reference - trades.to_csv('./strategies/results/trades.csv') - # call the run method with our data, option spreads and filters to run the backtest backtest = op.run(data, trades, filters) diff --git a/optopsy/__init__.py b/optopsy/__init__.py index e74879d..d47bc6c 100644 --- a/optopsy/__init__.py +++ b/optopsy/__init__.py @@ -1,5 +1,4 @@ from .data import get as get -from .data import gets as gets from .enums import * from .backtest import run import optopsy.option_strategies as strategies diff --git a/optopsy/backtest.py b/optopsy/backtest.py index 47e4d43..5c58ebd 100644 --- a/optopsy/backtest.py +++ b/optopsy/backtest.py @@ -22,73 +22,60 @@ from .option_queries import opt_type from .statistics import * -pd.set_option('display.expand_frame_repr', False) - -sort_by = [ - 'underlying_symbol', - 'quote_date', - 'option_type', - 'expiration', - 'strike' -] +pd.set_option("display.expand_frame_repr", False) -on = [ - 'underlying_symbol', - 'option_type', - 'expiration', - 'strike' -] + +on = ["underlying_symbol", "option_type", "expiration", "strike"] default_entry_filters = { "std_expr": False, "contract_size": 10, "entry_dte": (27, 30, 31), - "exit_dte": None + "exit_dte": None, } output_cols = { - 'quote_date_entry': 'entry_date', - 'quote_date_exit': 'exit_date', - 'delta_entry': 'entry_delta', - 'underlying_price_entry': 'entry_stk_price', - 'underlying_price_exit': 'exit_stk_price', - 'dte_entry': 'DTE' + "quote_date_entry": "entry_date", + "quote_date_exit": "exit_date", + "delta_entry": "entry_delta", + "underlying_price_entry": "entry_stk_price", + "underlying_price_exit": "exit_stk_price", + "dte_entry": "dte", } output_format = [ - 'entry_date', - 'exit_date', - 'expiration', - 'DTE', - 'ratio', - 'contracts', - 'option_type', - 'strike', - 'entry_delta', - 'entry_stk_price', - 'exit_stk_price', - 'entry_opt_price', - 'exit_opt_price', - 'entry_price', - 'exit_price', - 'profit' + "entry_date", + "exit_date", + "expiration", + "underlying_symbol", + "dte", + "ratio", + "contracts", + "option_type", + "strike", + "entry_delta", + "entry_stk_price", + "exit_stk_price", + "entry_opt_price", + "exit_opt_price", + "entry_price", + "exit_price", + "profit", ] def _create_legs(data, leg): - return ( - data - .pipe(opt_type, option_type=leg[0]) - .assign(ratio=leg[1]) - ) + return data.pipe(opt_type, option_type=leg[0]).assign(ratio=leg[1]) def _apply_filters(legs, filters): if not filters: return legs else: - return [reduce(lambda l, f: getattr(fil, f)(l, filters[f], idx), filters, leg) - for idx, leg in enumerate(legs)] + return [ + reduce(lambda l, f: getattr(fil, f)(l, filters[f], idx), filters, leg) + for idx, leg in enumerate(legs) + ] def _filter_data(data, filters): @@ -101,37 +88,32 @@ def create_spread(data, leg_structs, filters): # merge and apply leg filters to create spread filters = {**default_entry_filters, **filters} - entry_filters = 
{f: filters[f] - for f in filters if (not f.startswith('entry_spread') and - not f.startswith('exit_'))} + entry_filters = { + f: filters[f] + for f in filters + if (not f.startswith("entry_spread") and not f.startswith("exit_")) + } spread = _filter_data(legs, entry_filters) # apply spread level filters to spread - spread_filters = {f: filters[f] - for f in filters if f.startswith('entry_spread')} - return _filter_data(spread, spread_filters).sort_values(sort_by) + spread_filters = {f: filters[f] for f in filters if f.startswith("entry_spread")} + return _filter_data(spread, spread_filters) # this is the main function that runs the backtest engine -def run(data, trades, filters, init_balance=10000, mode='midpoint'): - trades = trades if isinstance(trades, list) else [trades] - - # merge trades from multiple underlying symbols if applicable - all_trades = pd.concat(trades).sort_values(sort_by) - +def run(data, trades, filters, init_balance=10000, mode="midpoint"): # for each option to be traded, determine the historical price action filters = {**default_entry_filters, **filters} - exit_filters = {f: filters[f] for f in filters if f.startswith('exit_')} + exit_filters = {f: filters[f] for f in filters if f.startswith("exit_")} res = ( - pd - .merge(all_trades, data, on=on, suffixes=('_entry', '_exit')) + pd.merge(trades, data, on=on, suffixes=("_entry", "_exit")) .pipe(_filter_data, exit_filters) .pipe(calc_entry_px, mode) .pipe(calc_exit_px, mode) .pipe(calc_pnl) - # .pipe(calc_running_balance, init_balance) .rename(columns=output_cols) - .sort_values(['entry_date', 'expiration', 'underlying_symbol', 'strike']) + .sort_values(["entry_date", "expiration", "underlying_symbol", "strike"]) + .pipe(assign_trade_num) ) return calc_total_profit(res), res[output_format] diff --git a/optopsy/data.py b/optopsy/data.py index 03e8759..216b0cb 100644 --- a/optopsy/data.py +++ b/optopsy/data.py @@ -29,114 +29,77 @@ # The fourth item of each tuple defines if the field is affected by ratios fields = ( - ('option_symbol', False, 'text', 'common'), - ('underlying_symbol', True, 'text', 'common'), - ('quote_date', True, 'date', 'common'), - ('expiration', True, 'date', 'common'), - ('strike', True, 'numeric', 'common'), - ('option_type', True, 'text', 'common'), - ('bid', True, 'numeric', 'leg'), - ('ask', True, 'numeric', 'leg'), - ('underlying_price', True, 'numeric', 'common'), - ('implied_vol', False, 'numeric', 'common'), - ('delta', True, 'numeric', 'leg'), - ('gamma', True, 'numeric', 'leg'), - ('theta', True, 'numeric', 'leg'), - ('vega', True, 'numeric', 'leg'), - ('rho', False, 'numeric', 'leg') + ("option_symbol", False), + ("underlying_symbol", True), + ("quote_date", True), + ("expiration", True), + ("strike", True), + ("option_type", True), + ("bid", True), + ("ask", True), + ("underlying_price", True), + ("implied_vol", False), + ("delta", True), + ("gamma", True), + ("theta", True), + ("vega", True), + ("rho", False), ) -def _read_file(path, names, usecols, date_cols, skiprow, nrows=None): - return pd.read_csv( - path, - names=names, - usecols=usecols, - parse_dates=date_cols, - skiprows=skiprow, - nrows=nrows, - infer_datetime_format=True) - - def _import_file(path, names, usecols, date_cols, skiprow): - if _check_file_exists(path): - return _read_file(path, names, usecols, date_cols, - skiprow).pipe(format_option_df) - - -def _import_dir_files(path, names, usecols, date_cols, skiprow): - if _check_file_path_exists(path): + if not os.path.isdir(path): + data = pd.read_csv( + path, + 
names=names, + usecols=usecols, + parse_dates=date_cols, + skiprows=skiprow, + infer_datetime_format=True, + ) + elif os.path.isdir(path): fls = sorted(glob.glob(os.path.join(path, "*.csv"))) - return pd.concat(_read_file(f, names, usecols, date_cols, skiprow) - for f in fls).pipe(format_option_df) - - -def _check_file_exists(path): - if os.path.isdir(path): + data = pd.concat( + pd.read_csv( + f, names=names, usecols=usecols, parse_dates=date_cols, skiprows=skiprow + ) + for f in fls + ) + else: raise ValueError("Invalid path, please provide a valid path to a file") - return True + return data.pipe(format_option_df) -def _check_file_path_exists(path): - if not os.path.isdir(path): - raise ValueError("Invalid path, please provide a valid directory path") - return True - -def _do_preview(path, names, usecols, date_cols, skiprow): - print(_read_file(path, names, usecols, date_cols, skiprow, nrows=5) - .pipe(format_option_df).head() - ) +def _do_preview(data): + print(data.head()) return _user_prompt("Does this look correct?") def get(file_path, struct, skiprow=1, prompt=True): - return _do_import(file_path, struct, skiprow, prompt, bulk=False) - + return _do_import(file_path, struct, skiprow, prompt) -def gets(dir_path, struct, skiprow=1, prompt=True): - return _do_import(dir_path, struct, skiprow, prompt, bulk=True) - -def _do_import(path, struct, skiprow, prompt, bulk): +def _do_import(path, struct, skiprow, prompt): cols = list(zip(*struct)) - quote_date_idx = cols[0].index('quote_date') - expiration_idx = cols[0].index('expiration') - date_cols = [quote_date_idx, expiration_idx] + date_cols = [cols[0].index("quote_date"), cols[0].index("expiration")] if _check_structs(struct, cols): - names = cols[0] - usecols = cols[1] - if not prompt or (prompt & _do_preview( - path, names, usecols, date_cols, skiprow)): - if bulk: - return _import_dir_files( - path, names, usecols, date_cols, skiprow) - else: - return _import_file(path, names, usecols, date_cols, skiprow) + data = _import_file(path, cols[0], cols[1], date_cols, skiprow) + if not prompt or (prompt & _do_preview(data)): + return data else: sys.exit() def format_option_df(df): - """ - Format the data frame to a standard format - :param df: dataframe to format - :return: formatted dataframe - """ - return ( - df - .assign( - expiration=lambda r: pd.to_datetime( - r['expiration'], - format='%Y-%m-%d'), - quote_date=lambda r: pd.to_datetime( - r['quote_date'], - format='%Y-%m-%d'), - option_type=lambda r: r['option_type'].str.lower().str[:1] + df.assign( + expiration=lambda r: pd.to_datetime(r["expiration"], format="%Y-%m-%d"), + quote_date=lambda r: pd.to_datetime(r["quote_date"], format="%Y-%m-%d"), + option_type=lambda r: r["option_type"].str.lower().str[:1], ) - .assign(dte=lambda r: (r['expiration'] - r['quote_date']).dt.days) + .assign(dte=lambda r: (r["expiration"] - r["quote_date"]).dt.days) .round(2) ) @@ -167,16 +130,14 @@ def _check_fields_contains_required(cols): def _check_structs(struct, cols): - return (_check_field_is_standard(struct) and - _check_field_is_duplicated(cols) and - _check_fields_contains_required(cols)) + return ( + _check_field_is_standard(struct) + and _check_field_is_duplicated(cols) + and _check_fields_contains_required(cols) + ) def _user_prompt(question): - """ - Prompts a Yes/No questions. 
- :param question: The question to ask the user - """ while True: sys.stdout.write(question + " [y/n]: ") user_input = input().lower() diff --git a/optopsy/enums.py b/optopsy/enums.py index e061b9d..ad2b61e 100644 --- a/optopsy/enums.py +++ b/optopsy/enums.py @@ -37,34 +37,16 @@ class Period(Enum): SEVEN_WEEKS = 49 -class Struct(Enum): - CBOE = ( - ('symbol', 0), - ('quote_date', 1), - ('root', 2), - ('expiration', 3), - ('strike', 4), - ('option_type', 5), - ('bid', 12), - ('ask', 14), - ('underlying_price', 17), - ('delta', 19), - ('gamma', 20), - ('theta', 21), - ('vega', 22) - ) - - class OptionType(Enum): - CALL = ('c', 1) - PUT = ('p', -1) + CALL = ("c", 1) + PUT = ("p", -1) class OrderAction(Enum): - BTO = (1, 'BUY', 'BOT') - BTC = (1, 'BUY', 'BOT') - STO = (-1, 'SELL', 'SLD') - STC = (-1, 'SELL', 'SLD') + BTO = (1, "BUY", "BOT") + BTC = (1, "BUY", "BOT") + STO = (-1, "SELL", "SLD") + STC = (-1, "SELL", "SLD") class DayOfWeek(Enum): diff --git a/optopsy/filters.py b/optopsy/filters.py index 9e2db8b..f63c076 100644 --- a/optopsy/filters.py +++ b/optopsy/filters.py @@ -19,47 +19,45 @@ import pandas as pd -def _process_values(data, col, value, groupby=None, - valid_types=(int, float, tuple)): +def _process_tuples(data, col, groupby, value): + if len(set(value)) == 1: + return eq(data, col, value[1]) + else: + return data.pipe(nearest, col, value[1], groupby=groupby).pipe( + between, col, value[0], value[2], absolute=True + ) + + +def _process_values(data, col, value, groupby=None, valid_types=(int, float, tuple)): if not isinstance(value, valid_types): raise ValueError("Invalid value passed to filter") elif isinstance(value, tuple): - if len(set(value)) == 1: - return eq(data, col, value[1]) - else: - return ( - data - .pipe(nearest, col, value[1], groupby=groupby) - .pipe(between, col, value[0], value[2], absolute=True) - ) + return _process_tuples(data, col, groupby, value) else: return nearest(data, col, value, groupby=groupby) def _calc_strike_pct(data, value, n, idx): if not isinstance(value, (int, float, tuple)): - raise ValueError( - f"Invalid value passed for leg {n+1} entry strike percentage") + raise ValueError(f"Invalid value passed for leg {n+1} entry strike percentage") elif idx == n: - return ( - data - .assign(strike_pct=lambda r: (r['strike'] / r['underlying_price']).round(2)) - .pipe(_process_values, 'strike_pct', value) - ) + return data.assign( + strike_pct=lambda r: (r["strike"] / r["underlying_price"]).round(2) + ).pipe(_process_values, "strike_pct", value) else: return data def start_date(data, value, _idx): if isinstance(value, datetime): - return data[data['expiration'] >= value] + return data[data["expiration"] >= value] else: raise ValueError("Start Dates must of Date type") def end_date(data, value, _idx): if isinstance(value, datetime): - return data[data['expiration'] <= value] + return data[data["expiration"] <= value] else: raise ValueError("End Dates must of Date type") @@ -93,8 +91,8 @@ def entry_dte(data, value, _idx): For example, it will search options that have days to expiration between and including 20 to 55. """ - groupby = ['option_type', 'expiration', 'underlying_symbol'] - return _process_values(data, 'dte', value, groupby=groupby) + groupby = ["option_type", "expiration", "underlying_symbol"] + return _process_values(data, "dte", value, groupby=groupby) def entry_days(data, value, _idx): @@ -110,28 +108,28 @@ def leg1_delta(data, value, idx): """ Absolute value of a delta of an option. 
""" - return _process_values(data, 'delta', value) if idx == 0 else data + return _process_values(data, "delta", value) if idx == 0 else data def leg2_delta(data, value, idx): """ Absolute value of a delta of an option. """ - return _process_values(data, 'delta', value) if idx == 1 else data + return _process_values(data, "delta", value) if idx == 1 else data def leg3_delta(data, value, idx): """ Absolute value of a delta of an option. """ - return _process_values(data, 'delta', value) if idx == 2 else data + return _process_values(data, "delta", value) if idx == 2 else data def leg4_delta(data, value, idx): """ Absolute value of a delta of an option. """ - return _process_values(data, 'delta', value) if idx == 3 else data + return _process_values(data, "delta", value) if idx == 3 else data def leg1_strike_pct(data, value, idx): @@ -199,10 +197,10 @@ def exit_dte(data, value, _idx): For example, it would exit a trade with 10 days to expiration. """ if value is None: - return data[data['quote_date_exit'] == data['expiration']] + return data[data["quote_date_exit"] == data["expiration"]] else: - groupby = ['option_type', 'expiration', 'underlying_symbol'] - return _process_values(data, 'dte_exit', value, groupby=groupby) + groupby = ["option_type", "expiration", "underlying_symbol"] + return _process_values(data, "dte_exit", value, groupby=groupby) def exit_hold_days(data, value, _idx): diff --git a/optopsy/option_queries.py b/optopsy/option_queries.py index 12968e5..8f7a3bb 100644 --- a/optopsy/option_queries.py +++ b/optopsy/option_queries.py @@ -28,23 +28,23 @@ def _calc_abs_distance(row, column, val, absolute): def calls(df): - return df[df.option_type.str.lower().str.startswith('c')] + return df[df.option_type.str.lower().str.startswith("c")] def puts(df): - return df[df.option_type.str.lower().str.startswith('p')] + return df[df.option_type.str.lower().str.startswith("p")] def opt_type(df, option_type): if isinstance(option_type, OptionType): - return df[df['option_type'] == option_type.value[0]] + return df[df["option_type"] == option_type.value[0]] else: raise ValueError("option_type must be of type OptionType") def underlying_price(df): - if 'underlying_price' in df: - dates = df['underlying_price'].unique() + if "underlying_price" in df: + dates = df["underlying_price"].unique() return dates.mean() else: raise ValueError("Underlying Price column undefined!") @@ -55,22 +55,18 @@ def nearest(df, column, val, groupby=None, absolute=True, tie="roundup"): # getting the min abs dist over multiple sets of option groups # instead of the absolute min of the entire data set. 
if groupby is None: - groupby = ['quote_date', 'option_type', - 'expiration', 'underlying_symbol'] + groupby = ["quote_date", "option_type", "expiration", "underlying_symbol"] on = groupby + ["abs_dist"] - data = df.assign( - abs_dist=lambda r: - _calc_abs_distance(r, column, val, absolute) - ) + data = df.assign(abs_dist=lambda r: _calc_abs_distance(r, column, val, absolute)) return ( - data - .groupby(groupby)['abs_dist'].min() + data.groupby(groupby)["abs_dist"] + .min() .to_frame() .merge(data, on=on) - .drop('abs_dist', axis=1) + .drop("abs_dist", axis=1) ) @@ -105,6 +101,7 @@ def between(df, column, start, end, inclusive=True, absolute=False): else: temp_col = column - result = df[df[temp_col].between( - _convert(start), _convert(end), inclusive=inclusive)] + result = df[ + df[temp_col].between(_convert(start), _convert(end), inclusive=inclusive) + ] return result.drop(temp_col, axis=1) if absolute else result diff --git a/optopsy/option_strategies.py b/optopsy/option_strategies.py index f085530..a457ed9 100644 --- a/optopsy/option_strategies.py +++ b/optopsy/option_strategies.py @@ -16,93 +16,76 @@ from .enums import OptionType, OrderAction from .backtest import create_spread +from datetime import datetime def _add_date_range(s, e, f): - f['start_date'] = s - f['end_date'] = e + f["start_date"] = s + f["end_date"] = e return f def _dedup_legs(spreads): - groupby = ['quote_date', 'option_type', - 'expiration', 'underlying_symbol', 'ratio'] - on = groupby + ['delta'] + sort_by = ["quote_date", "expiration", "underlying_symbol", "strike"] + groupby = ["quote_date", "expiration", "underlying_symbol", "ratio", "option_type"] + on = groupby + ["delta"] return ( - spreads - .groupby(groupby)['delta'] + spreads.groupby(groupby)["delta"] .max() .to_frame() .merge(spreads, on=on) + .sort_values(sort_by) ) +def _filter_check(filters): + return True + + +def _date_checks(start, end): + return isinstance(start, datetime) and isinstance(end, datetime) + + +def _process_legs(data, start, end, legs, filters): + filters = _add_date_range(start, end, filters) + if _filter_check(filters) and _date_checks(start, end): + return _dedup_legs(create_spread(data, legs, filters)) + else: + raise ValueError("Invalid filters, or date types provided!") + + def long_call(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - return _dedup_legs(create_spread( - data, [(OptionType.CALL, 1)], filters)) + return _process_legs(data, start_date, end_date, [(OptionType.CALL, 1)], filters) def short_call(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - return _dedup_legs(create_spread( - data, [(OptionType.CALL, 1)], filters)) + return _process_legs(data, start_date, end_date, [(OptionType.CALL, -1)], filters) def long_put(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - return _dedup_legs(create_spread(data, [(OptionType.PUT, 1)], filters)) + return _process_legs(data, start_date, end_date, [(OptionType.PUT, 1)], filters) def short_put(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - return _dedup_legs(create_spread(data, [(OptionType.PUT, 1)], filters)) + return _process_legs(data, start_date, end_date, [(OptionType.PUT, -1)], filters) def long_call_spread(data, start_date, end_date, filters): - filters = 
_add_date_range(start_date, end_date, filters) - if _filter_check(filters): - legs = [(OptionType.CALL, -1), (OptionType.CALL, 1)] - return _dedup_legs(create_spread(data, legs, filters)) - else: - raise ValueError( - "Long delta must be less than short delta for long call spreads!") + legs = [(OptionType.CALL, -1), (OptionType.CALL, 1)] + return _process_legs(data, start_date, end_date, legs, filters) def short_call_spread(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - legs = [(OptionType.CALL, 1), (OptionType.CALL, -1)] - return _dedup_legs(create_spread(data, legs, filters)) - else: - raise ValueError( - "Short delta must be less than long delta for short call spreads!") + legs = [(OptionType.CALL, 1), (OptionType.CALL, -1)] + return _process_legs(data, start_date, end_date, legs, filters) def long_put_spread(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - legs = [(OptionType.PUT, 1), (OptionType.PUT, -1)] - return _dedup_legs(create_spread(data, legs, filters)) - else: - raise ValueError( - "Short delta must be less than long delta for long put spreads!") + legs = [(OptionType.PUT, -1), (OptionType.PUT, 1)] + return _process_legs(data, start_date, end_date, legs, filters) def short_put_spread(data, start_date, end_date, filters): - filters = _add_date_range(start_date, end_date, filters) - if _filter_check(filters): - legs = [(OptionType.PUT, -1), (OptionType.PUT, 1)] - return _dedup_legs(create_spread(data, legs, filters)) - else: - raise ValueError( - "Long delta must be less than short delta for short put spreads!") - - -def _filter_check(filters): - return True + legs = [(OptionType.PUT, 1), (OptionType.PUT, -1)] + return _process_legs(data, start_date, end_date, legs, filters) diff --git a/optopsy/statistics.py b/optopsy/statistics.py index 5ca3533..312e377 100644 --- a/optopsy/statistics.py +++ b/optopsy/statistics.py @@ -14,31 +14,37 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . 
-def calc_entry_px(data, mode='midpoint'): - if mode == 'midpoint': - return data.assign( - entry_opt_price=data[['bid_entry', 'ask_entry']].mean(axis=1)) - elif mode == 'market': - return data.assign(entry_opt_price=data['ask_entry']) + +def _assign_opt_px(data, mode, action): + if mode == "midpoint": + data[f"{action}_opt_price"] = data[[f"bid_{action}", f"ask_{action}"]].mean(axis=1) + elif mode == "market": + data[f"{action}_opt_price"] = data[f"ask_{action}"] + return data + + +def assign_trade_num(data): + groupby = ["entry_date", "expiration", "underlying_symbol"] + data["trade_num"] = data.groupby(groupby).ngroup() + data.set_index("trade_num", inplace=True) + return data + + +def calc_entry_px(data, mode="midpoint"): + return _assign_opt_px(data, mode,'entry') -def calc_exit_px(data, mode='midpoint'): - if mode == 'midpoint': - return data.assign( - exit_opt_price=data[['bid_exit', 'ask_exit']].mean(axis=1)) - elif mode == 'market': - return data.assign(exit_opt_price=data['ask_exit']) +def calc_exit_px(data, mode="midpoint"): + return _assign_opt_px(data, mode, 'exit') def calc_pnl(data): # calculate the p/l for the trades - data['entry_price'] = data['entry_opt_price'] * \ - data['ratio'] * data['contracts'] - data['exit_price'] = data['exit_opt_price'] * \ - data['ratio'] * data['contracts'] - data['profit'] = data['exit_price'] - data['entry_price'] + data["entry_price"] = data["entry_opt_price"] * data["ratio"] * data["contracts"] + data["exit_price"] = data["exit_opt_price"] * data["ratio"] * data["contracts"] + data["profit"] = data["exit_price"] - data["entry_price"] return data def calc_total_profit(data): - return data['profit'].sum().round(2) + return data["profit"].sum().round(2) diff --git a/strategies/sample_strategy.py b/strategies/sample_strategy.py index fa65d6e..4c6e7b3 100755 --- a/strategies/sample_strategy.py +++ b/strategies/sample_strategy.py @@ -4,25 +4,25 @@ # absolute file path to our input file CURRENT_FILE = os.path.abspath(os.path.dirname(__file__)) -FILE = os.path.join(CURRENT_FILE, 'data', 'SPX_2016.csv') +FILE = os.path.join(CURRENT_FILE, "data", "SPX_2016.csv") # Here we define the struct to match the format of our csv file # the struct indices are 0-indexed where first column of the csv file # is mapped to 0 SPX_FILE_STRUCT = ( - ('underlying_symbol', 0), - ('underlying_price', 1), - ('option_symbol', 3), - ('option_type', 5), - ('expiration', 6), - ('quote_date', 7), - ('strike', 8), - ('bid', 10), - ('ask', 11), - ('delta', 15), - ('gamma', 16), - ('theta', 17), - ('vega', 18) + ("underlying_symbol", 0), + ("underlying_price", 1), + ("option_symbol", 3), + ("option_type", 5), + ("expiration", 6), + ("quote_date", 7), + ("strike", 8), + ("bid", 10), + ("ask", 11), + ("delta", 15), + ("gamma", 16), + ("theta", 17), + ("vega", 18), ) @@ -32,30 +32,30 @@ def run_strategy(): data = op.get(FILE, SPX_FILE_STRUCT, prompt=False) # define the entry and exit filters to use for this strategy, full list of - # filters is listed in the documentation (WIP). + # filters will be listed in the documentation (WIP). 
filters = { - 'entry_dte': (27, 30, 31), - 'leg1_delta': 0.30, - 'leg2_delta': 0.50, - 'contract_size': 10 + "entry_dte": (27, 30, 31), + "leg1_delta": 0.30, + "leg2_delta": 0.50, + "contract_size": 10, } # set the start and end dates for the backtest, the dates are inclusive + # start and end dates are python datetime objects start = datetime(2016, 1, 1) end = datetime(2016, 12, 31) # create the option spread that matches the entry filters trades = op.strategies.short_call_spread(data, start, end, filters) - trades.to_csv('./strategies/results/trades.csv') # call the run method with our data, option spreads and filters to run the backtest backtest = op.run(data, trades, filters) # backtest will return a tuple with the profit amount and a dataframe # containing the backtest results(the return format may be subject to change) - backtest[1].to_csv('./strategies/results/results.csv') + backtest[1].to_csv("./strategies/results/results.csv") print("Total Profit: %s" % backtest[0]) -if __name__ == '__main__': +if __name__ == "__main__": run_strategy() diff --git a/tests/integration/test_integration_singles.py b/tests/integration/test_integration_singles.py new file mode 100644 index 0000000..7efa5ef --- /dev/null +++ b/tests/integration/test_integration_singles.py @@ -0,0 +1,76 @@ +from optopsy.backtest import run +from optopsy.option_strategies import long_call, short_call, long_put, short_put +from optopsy.data import get +from datetime import datetime +import os +import pytest + +CURRENT_FILE = os.path.abspath(os.path.dirname(__file__)) +TEST_FILE_PATH_FULL = os.path.join( + CURRENT_FILE, "../test_data/test_options_data_full.csv" +) + + +def test_long_call_integration(hod_struct): + data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) + + filters = {"entry_dte": 31, "leg1_delta": 0.30, "exit_dte": 7} + + start = datetime(2018, 1, 1) + end = datetime(2018, 2, 28) + + trades = long_call(data, start, end, filters) + backtest = run(data, trades, filters) + assert backtest[0] == 963.0 + + +def test_long_call_no_exit_dte_integration(hod_struct): + data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) + + filters = {"entry_dte": 31, "leg1_delta": 0.30} + + start = datetime(2018, 1, 1) + end = datetime(2018, 2, 28) + + trades = long_call(data, start, end, filters) + backtest = run(data, trades, filters) + assert backtest[0] == 818.75 + + +def test_short_call_integration(hod_struct): + data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) + + filters = {"entry_dte": 31, "leg1_delta": 0.30, "exit_dte": 7} + + start = datetime(2018, 1, 1) + end = datetime(2018, 2, 28) + + trades = short_call(data, start, end, filters) + backtest = run(data, trades, filters) + assert backtest[0] == -963.0 + + +def test_long_put_spread_integration(hod_struct): + data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) + + filters = {"entry_dte": 31, "leg1_delta": 0.30, "exit_dte": 7} + + start = datetime(2018, 1, 1) + end = datetime(2018, 2, 28) + + trades = long_put(data, start, end, filters) + backtest = run(data, trades, filters) + assert backtest[0] == 476.5 + + +def test_short_put_spread_integration(hod_struct): + data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) + + filters = {"entry_dte": 31, "leg1_delta": 0.30, "exit_dte": 7} + + start = datetime(2018, 1, 1) + end = datetime(2018, 2, 28) + + trades = short_put(data, start, end, filters) + backtest = run(data, trades, filters) + assert backtest[0] == -476.5 diff --git a/tests/integration/test_integration_verticals.py 
b/tests/integration/test_integration_verticals.py index 195466c..83e36c8 100644 --- a/tests/integration/test_integration_verticals.py +++ b/tests/integration/test_integration_verticals.py @@ -1,24 +1,25 @@ from optopsy.backtest import run -from optopsy.option_strategies import long_call_spread, short_call_spread, long_put_spread, short_put_spread +from optopsy.option_strategies import ( + long_call_spread, + short_call_spread, + long_put_spread, + short_put_spread, +) from optopsy.data import get from datetime import datetime import os import pytest CURRENT_FILE = os.path.abspath(os.path.dirname(__file__)) -TEST_FILE_PATH_FULL = os.path.join(CURRENT_FILE, - '../test_data/test_options_data_full.csv') +TEST_FILE_PATH_FULL = os.path.join( + CURRENT_FILE, "../test_data/test_options_data_full.csv" +) def test_long_call_spread_integration(hod_struct): data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) - filters = { - 'entry_dte': 31, - 'leg1_delta': 0.30, - 'leg2_delta': 0.50, - 'exit_dte': 7 - } + filters = {"entry_dte": 31, "leg1_delta": 0.30, "leg2_delta": 0.50, "exit_dte": 7} start = datetime(2018, 1, 1) end = datetime(2018, 2, 28) @@ -26,35 +27,33 @@ def test_long_call_spread_integration(hod_struct): trades = long_call_spread(data, start, end, filters) backtest = run(data, trades, filters) assert backtest[0] == -80.25 + assert backtest[1].iat[0, 5] == 1 + assert backtest[1].iat[1, 5] == -1 + assert backtest[1].iat[2, 5] == 1 + assert backtest[1].iat[3, 5] == -1 def test_long_call_spread_no_exit_dte_integration(hod_struct): data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) - filters = { - 'entry_dte': 31, - 'leg1_delta': 0.30, - 'leg2_delta': 0.50 - } + filters = {"entry_dte": 31, "leg1_delta": 0.30, "leg2_delta": 0.50} start = datetime(2018, 1, 1) end = datetime(2018, 2, 28) trades = long_call_spread(data, start, end, filters) backtest = run(data, trades, filters) - print(backtest[1]) assert backtest[0] == -72.00 + assert backtest[1].iat[0, 5] == 1 + assert backtest[1].iat[1, 5] == -1 + assert backtest[1].iat[2, 5] == 1 + assert backtest[1].iat[3, 5] == -1 def test_short_call_spread_integration(hod_struct): data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) - filters = { - 'entry_dte': 31, - 'leg1_delta': 0.30, - 'leg2_delta': 0.50, - 'exit_dte': 7 - } + filters = {"entry_dte": 31, "leg1_delta": 0.30, "leg2_delta": 0.50, "exit_dte": 7} start = datetime(2018, 1, 1) end = datetime(2018, 2, 28) @@ -62,39 +61,41 @@ def test_short_call_spread_integration(hod_struct): trades = short_call_spread(data, start, end, filters) backtest = run(data, trades, filters) assert backtest[0] == 80.25 + assert backtest[1].iat[0, 5] == -1 + assert backtest[1].iat[1, 5] == 1 + assert backtest[1].iat[2, 5] == -1 + assert backtest[1].iat[3, 5] == 1 def test_long_put_spread_integration(hod_struct): data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) - filters = { - 'entry_dte': 31, - 'leg1_delta': 0.30, - 'leg2_delta': 0.50, - 'exit_dte': 7 - } + filters = {"entry_dte": 31, "leg1_delta": 0.30, "leg2_delta": 0.50, "exit_dte": 7} start = datetime(2018, 1, 1) end = datetime(2018, 2, 28) trades = long_put_spread(data, start, end, filters) backtest = run(data, trades, filters) - assert backtest[0] == -227.50 + assert backtest[0] == 227.50 + assert backtest[1].iat[0, 5] == -1 + assert backtest[1].iat[1, 5] == 1 + assert backtest[1].iat[2, 5] == -1 + assert backtest[1].iat[3, 5] == 1 def test_short_put_spread_integration(hod_struct): data = get(TEST_FILE_PATH_FULL, hod_struct, prompt=False) - 
filters = { - 'entry_dte': 31, - 'leg1_delta': 0.30, - 'leg2_delta': 0.50, - 'exit_dte': 7 - } + filters = {"entry_dte": 31, "leg1_delta": 0.30, "leg2_delta": 0.50, "exit_dte": 7} start = datetime(2018, 1, 1) end = datetime(2018, 2, 28) trades = short_put_spread(data, start, end, filters) backtest = run(data, trades, filters) - assert backtest[0] == 227.50 + assert backtest[0] == -227.50 + assert backtest[1].iat[0, 5] == 1 + assert backtest[1].iat[1, 5] == -1 + assert backtest[1].iat[2, 5] == 1 + assert backtest[1].iat[3, 5] == -1 diff --git a/tests/support/data_fixtures.py b/tests/support/data_fixtures.py index 3cbd4ca..20e3cf8 100644 --- a/tests/support/data_fixtures.py +++ b/tests/support/data_fixtures.py @@ -5,30 +5,30 @@ CURRENT_FILE = os.path.abspath(os.path.dirname(__file__)) -TEST_FILE_PATH = os.path.join(CURRENT_FILE, - '../test_data/test_options_data.csv') -TEST_FILE_PATH_FULL = os.path.join(CURRENT_FILE, - '../test_data/test_options_data_full.csv') +TEST_FILE_PATH = os.path.join(CURRENT_FILE, "../test_data/test_options_data.csv") +TEST_FILE_PATH_FULL = os.path.join( + CURRENT_FILE, "../test_data/test_options_data_full.csv" +) TEST_STRUCT = ( - ('underlying_symbol', 0), - ('underlying_price', 1), - ('option_type', 5), - ('expiration', 6), - ('quote_date', 7), - ('strike', 8), - ('bid', 10), - ('ask', 11), - ('delta', 15), - ('gamma', 16), - ('theta', 17), - ('vega', 18) + ("underlying_symbol", 0), + ("underlying_price", 1), + ("option_type", 5), + ("expiration", 6), + ("quote_date", 7), + ("strike", 8), + ("bid", 10), + ("ask", 11), + ("delta", 15), + ("gamma", 16), + ("theta", 17), + ("vega", 18), ) def _parse_date_cols(columns): - quote_date_idx = columns[0].index('quote_date') - expiration_idx = columns[0].index('expiration') + quote_date_idx = columns[0].index("quote_date") + expiration_idx = columns[0].index("expiration") return [quote_date_idx, expiration_idx] @@ -37,12 +37,11 @@ def _parse_date_cols(columns): def options_data(hod_struct): cols = list(zip(*hod_struct)) date_cols = _parse_date_cols(cols) - return ( - pd.read_csv( - TEST_FILE_PATH, - parse_dates=date_cols, - names=cols[0], - usecols=cols[1], - skiprows=1, - nrows=None - ).pipe(format_option_df)) + return pd.read_csv( + TEST_FILE_PATH, + parse_dates=date_cols, + names=cols[0], + usecols=cols[1], + skiprows=1, + nrows=None, + ).pipe(format_option_df) diff --git a/tests/support/struct_fixtures.py b/tests/support/struct_fixtures.py index 63e0833..876a690 100644 --- a/tests/support/struct_fixtures.py +++ b/tests/support/struct_fixtures.py @@ -5,84 +5,78 @@ @pytest.fixture def valid_struct(): return ( - ('underlying_symbol', 0), - ('underlying_price', 1), - ('root', 2), - ('option_type', 4), - ('expiration', 5), - ('quote_date', 6), - ('strike', 7), - ('bid', 9), - ('ask', 10), - ('volume', 11), - ('open_interest', 12), - ('implied_vol', 14), - ('delta', 17), - ('gamma', 18), - ('theta', 19), - ('vega', 20) + ("underlying_symbol", 0), + ("underlying_price", 1), + ("root", 2), + ("option_type", 4), + ("expiration", 5), + ("quote_date", 6), + ("strike", 7), + ("bid", 9), + ("ask", 10), + ("volume", 11), + ("open_interest", 12), + ("implied_vol", 14), + ("delta", 17), + ("gamma", 18), + ("theta", 19), + ("vega", 20), ) @pytest.fixture def invalid_idx(): - return ( - ('symbol', -1), - ('quote_date', -2) - ) + return (("symbol", -1), ("quote_date", -2)) @pytest.fixture def invalid_fields(): - return ( - ('symbol', 0), - ('invalid', 1) - ) + return (("symbol", 0), ("invalid", 1)) @pytest.fixture def invalid_struct(): 
return ( - ('option_type', 4), - ('expiration', 5), - ('quote_date', 5), - ('strike', 7), - ('bid', 9), - ('ask', 10) + ("option_type", 4), + ("expiration", 5), + ("quote_date", 5), + ("strike", 7), + ("bid", 9), + ("ask", 10), ) @pytest.fixture def cboe_struct(): return ( - ('underlying_symbol', 0), - ('quote_date', 1), - ('expiration', 3), - ('strike', 4), - ('option_type', 5), - ('bid', 12), - ('ask', 14), - ('underlying_price', 17), - ('delta', 19), - ('gamma', 20), - ('theta', 21), - ('vega', 22) + ("underlying_symbol", 0), + ("quote_date", 1), + ("expiration", 3), + ("strike", 4), + ("option_type", 5), + ("bid", 12), + ("ask", 14), + ("underlying_price", 17), + ("delta", 19), + ("gamma", 20), + ("theta", 21), + ("vega", 22), ) @pytest.fixture def hod_struct(): return ( - ('underlying_symbol', 0), - ('underlying_price', 1), - ('option_type', 5), - ('expiration', 6), - ('quote_date', 7), - ('strike', 8), - ('bid', 10), - ('ask', 11), - ('delta', 15), - ('gamma', 16), - ('theta', 17), - ('vega', 18) + ("underlying_symbol", 0), + ("underlying_price", 1), + ("option_type", 5), + ("expiration", 6), + ("quote_date", 7), + ("strike", 8), + ("bid", 10), + ("ask", 11), + ("delta", 15), + ("gamma", 16), + ("theta", 17), + ("vega", 18), ) diff --git a/tests/test_data_import.py b/tests/test_data_import.py index 6e1d733..e6255f8 100644 --- a/tests/test_data_import.py +++ b/tests/test_data_import.py @@ -6,13 +6,14 @@ @pytest.fixture def mock_daily_dir(): - return os.path.join(os.path.dirname(__file__), 'test_data_dir') + return os.path.join(os.path.dirname(__file__), "test_data_dir") @pytest.fixture def mock_daily_file(): - return os.path.join(os.path.dirname(__file__), - 'test_data_dir', 'test_cboe_20160104.csv') + return os.path.join( + os.path.dirname(__file__), "test_data_dir", "test_cboe_20160104.csv" + ) @pytest.fixture @@ -29,7 +30,7 @@ def test_valid_fields(mock_daily_file, cboe_struct): try: op.get(mock_daily_file, struct=cboe_struct, prompt=False) except ValueError: - pytest.fail('ValueError raised') + pytest.fail("ValueError raised") def test_invalid_idx(mock_file_dir, invalid_idx): @@ -42,21 +43,11 @@ def test_duplicate_idx_in_struct(mock_file_dir, invalid_struct): op.get(mock_file_dir, struct=invalid_struct, prompt=False) -def test_invalid_path_data_import(mock_daily_dir, cboe_struct): - with pytest.raises(ValueError): - op.get(mock_daily_dir, struct=cboe_struct, prompt=False) - - -def test_invalid_path_data_import_bulk(mock_daily_file, cboe_struct): - with pytest.raises(ValueError): - op.gets(mock_daily_file, struct=cboe_struct, prompt=False) - - def test_data_import(mock_daily_file, cboe_struct): data = op.get(mock_daily_file, struct=cboe_struct, prompt=False) assert data.shape == (2, 13) def test_data_import_bulk(mock_daily_dir, cboe_struct): - data = op.gets(mock_daily_dir, struct=cboe_struct, prompt=False) + data = op.get(mock_daily_dir, struct=cboe_struct, prompt=False) assert data.shape == (6, 13) diff --git a/tests/test_filters.py b/tests/test_filters.py index b413b9e..adb392e 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -8,7 +8,7 @@ def test_start_date(options_data): start = datetime(1990, 1, 1) df = start_date(options_data, start, 0) assert not df.empty - assert all(v >= start for v in df['expiration']) + assert all(v >= start for v in df["expiration"]) def test_start_date_ouf_of_bound(options_data): @@ -21,7 +21,7 @@ def test_end_date(options_data): end = datetime(1990, 1, 21) df = end_date(options_data, end, 0) assert not df.empty - assert all(v <= end 
for v in df['expiration']) + assert all(v <= end for v in df["expiration"]) def test_end_date_ouf_of_bound(options_data): @@ -32,19 +32,19 @@ def test_end_date_ouf_of_bound(options_data): def test_invalid_start_date(options_data): with pytest.raises(ValueError): - start_date(options_data, '123', 0) + start_date(options_data, "123", 0) def test_invalid_end_date(options_data): with pytest.raises(ValueError): - end_date(options_data, '123', 0) + end_date(options_data, "123", 0) def test_contract_size(options_data): df = contract_size(options_data, 10, 0) assert not df.empty - assert 'contracts' in df.columns - assert all(v == 10 for v in df['contracts']) + assert "contracts" in df.columns + assert all(v == 10 for v in df["contracts"]) def test_invalid_contract_size(options_data): @@ -55,104 +55,96 @@ def test_invalid_contract_size(options_data): def test_dte(options_data): df = entry_dte(options_data, 18, 0) assert not df.empty - assert 'dte' in df.columns - assert all(v in [17, 18] for v in df['dte']) + assert "dte" in df.columns + assert all(v in [17, 18] for v in df["dte"]) def test_dte_exact(options_data): df = entry_dte(options_data, (18, 18, 18), 0) assert not df.empty - assert 'dte' in df.columns - assert all(v in [18] for v in df['dte']) + assert "dte" in df.columns + assert all(v in [18] for v in df["dte"]) def test_dte_tuple(options_data): df = entry_dte(options_data, (17, 18, 19), 0) assert not df.empty - assert 'dte' in df.columns - assert all(v in [17, 18] for v in df['dte']) + assert "dte" in df.columns + assert all(v in [17, 18] for v in df["dte"]) def test_dte_float(options_data): df = entry_dte(options_data, 18.25, 0) assert not df.empty - assert 'dte' in df.columns - assert all(v in [17, 18] for v in df['dte']) + assert "dte" in df.columns + assert all(v in [17, 18] for v in df["dte"]) def test_dte_float_tuple(options_data): df = entry_dte(options_data, (17.05, 18.05, 190.5), 0) assert not df.empty - assert 'dte' in df.columns - assert all(v in [17, 18] for v in df['dte']) + assert "dte" in df.columns + assert all(v in [17, 18] for v in df["dte"]) def test_dte_float_value(options_data): df = entry_dte(options_data, (18, 18, 18), 0) assert not df.empty - assert 'dte' in df.columns - assert all(v in [18] for v in df['dte']) + assert "dte" in df.columns + assert all(v in [18] for v in df["dte"]) def test_leg1_delta_tuple(options_data): df = leg1_delta(options_data, (0.45, 0.50, 0.55), 0) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg1_delta_value(options_data): df = leg1_delta(options_data, 0.50, 0) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg2_delta_tuple(options_data): df = leg2_delta(options_data, (0.45, 0.50, 0.55), 1) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg2_delta_value(options_data): df = leg2_delta(options_data, 0.50, 1) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg3_delta_tuple(options_data): df = leg3_delta(options_data, (0.45, 0.50, 
0.55), 2) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg3_delta_value(options_data): df = leg3_delta(options_data, 0.50, 2) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg4_delta_tuple(options_data): df = leg4_delta(options_data, (0.45, 0.50, 0.55), 3) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_leg4_delta_value(options_data): df = leg4_delta(options_data, 0.50, 3) assert not df.empty - assert all(v in [0.55, 0.51, -0.46, -0.49] - for v in df['delta'].unique().tolist()) + assert all(v in [0.55, 0.51, -0.46, -0.49] for v in df["delta"].unique().tolist()) def test_invalid_leg1_delta(options_data): with pytest.raises(ValueError): - leg1_delta(options_data, 'invalid', 0) + leg1_delta(options_data, "invalid", 0) def test_wrong_leg_leg1_delta(options_data): @@ -182,54 +174,54 @@ def test_wrong_leg_leg4_delta(options_data): def test_leg1_strike_pct_tuple(options_data): df = leg1_strike_pct(options_data, (0.75, 0.80, 0.90), 0) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg1_strike_pct_value(options_data): df = leg1_strike_pct(options_data, 0.80, 0) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg2_strike_pct_tuple(options_data): df = leg2_strike_pct(options_data, (0.75, 0.80, 0.90), 1) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg2_strike_pct_value(options_data): df = leg2_strike_pct(options_data, 0.80, 1) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg3_strike_pct_tuple(options_data): df = leg3_strike_pct(options_data, (0.75, 0.80, 0.90), 2) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg3_strike_pct_value(options_data): df = leg3_strike_pct(options_data, 0.80, 2) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg4_strike_pct_tuple(options_data): df = leg4_strike_pct(options_data, (0.75, 0.80, 0.90), 3) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_leg4_strike_pct_value(options_data): df = leg4_strike_pct(options_data, 0.80, 3) assert not df.empty - assert all(v in [0.89] for v in df['strike_pct'].unique().tolist()) + assert all(v in [0.89] for v in df["strike_pct"].unique().tolist()) def test_invalid_leg1_strike_pct(options_data): with pytest.raises(ValueError): - leg1_strike_pct(options_data, 'invalid', 0) + leg1_strike_pct(options_data, "invalid", 0) def 
test_wrong_leg_leg1_strike_pct(options_data): diff --git a/tests/test_option_query.py b/tests/test_option_query.py index 5b701be..9efae5c 100644 --- a/tests/test_option_query.py +++ b/tests/test_option_query.py @@ -5,16 +5,16 @@ def test_calls(options_data): c = calls(options_data).option_type.unique() assert len(c) == 1 - assert c[0] == 'c' + assert c[0] == "c" def test_puts(options_data): p = puts(options_data).option_type.unique() assert len(p) == 1 - assert p[0] == 'p' + assert p[0] == "p" -@pytest.mark.parametrize('option_type', [2, 'x', 'invalid', (3, 4)]) +@pytest.mark.parametrize("option_type", [2, "x", "invalid", (3, 4)]) def test_invalid_option_type(options_data, option_type): with pytest.raises(ValueError): opt_type(options_data, option_type) @@ -33,9 +33,12 @@ def test_underlying_price(options_data): @pytest.mark.parametrize( - "value", [('strike', 357.5, [355, 360]), - ('delta', 0.50, [0.55, -0.46, 0.51, -0.49]), - ('delta', 0.34, [0.35, -0.31, 0.32, -0.33])] + "value", + [ + ("strike", 357.5, [355, 360]), + ("delta", 0.50, [0.55, -0.46, 0.51, -0.49]), + ("delta", 0.34, [0.35, -0.31, 0.32, -0.33]), + ], ) def test_nearest_column(options_data, value): # here we test for mid-point, values returned should round up. @@ -43,119 +46,147 @@ def test_nearest_column(options_data, value): assert all(v in value[2] for v in chain[value[0]].unique().tolist()) -@pytest.mark.parametrize( - "value", [('test', 1), (1234, 1), ('option_symbol', 'test')]) +@pytest.mark.parametrize("value", [("test", 1), (1234, 1), ("option_symbol", "test")]) def test_invalid_column_values(options_data, value): with pytest.raises(KeyError): nearest(options_data, value[0], value[1]) -@pytest.mark.parametrize("value", [('strike', 350), - ('delta', 0.50), - ('gamma', 0.02), - ('expiration', '1990-01-21'), - ('quote_date', '01-01-1990'), - ('dte', Period.SEVEN_WEEKS.value), - ('dte', Period.ONE_DAY.value)]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350), + ("delta", 0.50), + ("gamma", 0.02), + ("expiration", "1990-01-21"), + ("quote_date", "01-01-1990"), + ("dte", Period.SEVEN_WEEKS.value), + ("dte", Period.ONE_DAY.value), + ], +) def test_lte(options_data, value): values = lte(options_data, column=value[0], val=value[1])[value[0]] assert all(values <= value[1]) -@pytest.mark.parametrize("value", [('strike', 350), - ('delta', 0.50), - ('gamma', 0.02), - ('expiration', '1990-01-21'), - ('quote_date', '01-01-1990'), - ('dte', Period.SEVEN_WEEKS.value), - ('dte', Period.ONE_DAY.value)]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350), + ("delta", 0.50), + ("gamma", 0.02), + ("expiration", "1990-01-21"), + ("quote_date", "01-01-1990"), + ("dte", Period.SEVEN_WEEKS.value), + ("dte", Period.ONE_DAY.value), + ], +) def test_gte(options_data, value): values = gte(options_data, column=value[0], val=value[1])[value[0]] assert all(values >= value[1]) -@pytest.mark.parametrize("value", [('strike', 350), - ('delta', 0.50), - ('gamma', 0.02), - ('expiration', '1990-01-21'), - ('quote_date', '01-01-1990'), - ('dte', Period.SEVEN_WEEKS.value), - ('dte', Period.ONE_DAY.value)]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350), + ("delta", 0.50), + ("gamma", 0.02), + ("expiration", "1990-01-21"), + ("quote_date", "01-01-1990"), + ("dte", Period.SEVEN_WEEKS.value), + ("dte", Period.ONE_DAY.value), + ], +) def test_ge(options_data, value): values = gt(options_data, column=value[0], val=value[1])[value[0]] assert all(values > value[1]) -@pytest.mark.parametrize("value", [('strike', 350), - ('delta', 
0.50), - ('gamma', 0.02), - ('expiration', '1990-01-20'), - ('quote_date', '01-01-1990'), - ('dte', 18), - ('dte', Period.ONE_DAY.value)]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350), + ("delta", 0.50), + ("gamma", 0.02), + ("expiration", "1990-01-20"), + ("quote_date", "01-01-1990"), + ("dte", 18), + ("dte", Period.ONE_DAY.value), + ], +) def test_eq(options_data, value): values = eq(options_data, column=value[0], val=value[1])[value[0]] assert all(values == value[1]) -@pytest.mark.parametrize("value", [('strike', 350), - ('delta', 0.50), - ('gamma', 0.02), - ('expiration', '1990-01-21'), - ('quote_date', '01-01-1990'), - ('dte', Period.SEVEN_WEEKS.value), - ('dte', Period.ONE_DAY.value)]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350), + ("delta", 0.50), + ("gamma", 0.02), + ("expiration", "1990-01-21"), + ("quote_date", "01-01-1990"), + ("dte", Period.SEVEN_WEEKS.value), + ("dte", Period.ONE_DAY.value), + ], +) def test_lt(options_data, value): values = lt(options_data, column=value[0], val=value[1])[value[0]] assert all(values < value[1]) -@pytest.mark.parametrize("value", [('strike', 350), - ('delta', 0.50), - ('gamma', 0.02), - ('expiration', '1990-01-21'), - ('quote_date', '01-01-1990'), - ('dte', Period.SEVEN_WEEKS.value), - ('dte', Period.ONE_DAY.value)]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350), + ("delta", 0.50), + ("gamma", 0.02), + ("expiration", "1990-01-21"), + ("quote_date", "01-01-1990"), + ("dte", Period.SEVEN_WEEKS.value), + ("dte", Period.ONE_DAY.value), + ], +) def test_ne(options_data, value): values = ne(options_data, column=value[0], val=value[1])[value[0]] assert all(values != value[1]) -@pytest.mark.parametrize("value", [('strike', 350, 370), - ('delta', 0.5, -0.5), - ('gamma', 0.04, 0.01), - ('expiration', '1990-01-20', '1990-01-21'), - ('quote_date', '01-01-1990', '01-04-1990'), - ('dte', 1, 1.10), - ('dte', Period.ONE_DAY.value, - Period.ONE_WEEK.value) - ]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350, 370), + ("delta", 0.5, -0.5), + ("gamma", 0.04, 0.01), + ("expiration", "1990-01-20", "1990-01-21"), + ("quote_date", "01-01-1990", "01-04-1990"), + ("dte", 1, 1.10), + ("dte", Period.ONE_DAY.value, Period.ONE_WEEK.value), + ], +) def test_between_inclusive(options_data, value): - values = between( - options_data, - column=value[0], - start=value[1], - end=value[2])[ - value[0]] + values = between(options_data, column=value[0], start=value[1], end=value[2])[ + value[0] + ] assert all(values.between(value[1], value[2])) -@pytest.mark.parametrize("value", [('strike', 350, 370), - ('delta', 0.5, -0.5), - ('gamma', 0.04, 0.01), - ('expiration', '1990-01-20', '1990-01-21'), - ('quote_date', '01-01-1990', '01-04-1990'), - ('dte', 1, 1.10), - ('dte', Period.ONE_DAY.value, - Period.ONE_WEEK.value) - ]) +@pytest.mark.parametrize( + "value", + [ + ("strike", 350, 370), + ("delta", 0.5, -0.5), + ("gamma", 0.04, 0.01), + ("expiration", "1990-01-20", "1990-01-21"), + ("quote_date", "01-01-1990", "01-04-1990"), + ("dte", 1, 1.10), + ("dte", Period.ONE_DAY.value, Period.ONE_WEEK.value), + ], +) def test_between(options_data, value): values = between( - options_data, - column=value[0], - start=value[1], - end=value[2], - inclusive=False)[ - value[0]] + options_data, column=value[0], start=value[1], end=value[2], inclusive=False + )[value[0]] assert all(values.between(value[1], value[2], inclusive=False)) diff --git a/tests/test_single_strategy.py b/tests/test_single_strategy.py index 6e7dba2..5ecddb1 100644 --- 
a/tests/test_single_strategy.py +++ b/tests/test_single_strategy.py @@ -3,7 +3,7 @@ from optopsy.enums import OrderAction from optopsy.option_strategies import long_call, short_call, long_put, short_put -pd.set_option('display.expand_frame_repr', False) +pd.set_option("display.expand_frame_repr", False) start = datetime(1990, 1, 20) end = datetime(1990, 1, 20) @@ -14,14 +14,11 @@ def test_long_call(options_data): options_data, start, end, - { - 'leg1_delta': (0.45, 0.50, 0.55), - 'entry_dte': (18, 18, 18) - } + {"leg1_delta": (0.45, 0.50, 0.55), "entry_dte": (18, 18, 18)}, ) results = actual_spread - assert all(results['option_type'] == 'c') - assert all(v in [0.55, 0.51] for v in results['delta'].unique().tolist()) + assert all(results["option_type"] == "c") + assert all(v in [0.55, 0.51] for v in results["delta"].unique().tolist()) assert results.shape == (1, 15) @@ -30,14 +27,11 @@ def test_short_call(options_data): options_data, start, end, - { - 'leg1_delta': (0.45, 0.50, 0.55), - 'entry_dte': (18, 18, 18) - } + {"leg1_delta": (0.45, 0.50, 0.55), "entry_dte": (18, 18, 18)}, ) results = actual_spread - assert all(results['option_type'] == 'c') - assert all(v in [0.55, 0.51] for v in results['delta'].unique().tolist()) + assert all(results["option_type"] == "c") + assert all(v in [0.55, 0.51] for v in results["delta"].unique().tolist()) def test_long_put(options_data): @@ -45,14 +39,11 @@ def test_long_put(options_data): options_data, start, end, - { - 'leg1_delta': (0.45, 0.50, 0.55), - 'entry_dte': (17, 17, 17) - } + {"leg1_delta": (0.45, 0.50, 0.55), "entry_dte": (17, 17, 17)}, ) results = actual_spread - assert all(results['option_type'] == 'p') - assert all(v in [-0.49] for v in results['delta'].unique().tolist()) + assert all(results["option_type"] == "p") + assert all(v in [-0.49] for v in results["delta"].unique().tolist()) def test_short_put(options_data): @@ -60,11 +51,8 @@ def test_short_put(options_data): options_data, start, end, - { - 'leg1_delta': (0.45, 0.50, 0.55), - 'entry_dte': (17, 17, 17) - } + {"leg1_delta": (0.45, 0.50, 0.55), "entry_dte": (17, 17, 17)}, ) results = actual_spread - assert all(results['option_type'] == 'p') - assert all(v in [-0.49] for v in results['delta'].unique().tolist()) + assert all(results["option_type"] == "p") + assert all(v in [-0.49] for v in results["delta"].unique().tolist()) diff --git a/tests/test_vertical_strategy.py b/tests/test_vertical_strategy.py index 32746c9..9d79f17 100644 --- a/tests/test_vertical_strategy.py +++ b/tests/test_vertical_strategy.py @@ -1,6 +1,10 @@ from optopsy.enums import OrderAction -from optopsy.option_strategies import long_call_spread, short_call_spread, long_put_spread, \ - short_put_spread +from optopsy.option_strategies import ( + long_call_spread, + short_call_spread, + long_put_spread, + short_put_spread, +) from datetime import datetime @@ -8,21 +12,21 @@ end = datetime(1990, 1, 20) params = { - 'leg1_delta': (0.25, 0.30, 0.45), - 'leg2_delta': (0.45, 0.50, 0.55), - 'entry_dte': (18, 18, 18) + "leg1_delta": (0.25, 0.30, 0.45), + "leg2_delta": (0.45, 0.50, 0.55), + "entry_dte": (18, 18, 18), } def _test_call_results(result): - assert all(result['option_type'] == 'c') - assert all(v in [0.35, 0.55] for v in result['delta'].unique().tolist()) + assert all(result["option_type"] == "c") + assert all(v in [0.35, 0.55] for v in result["delta"].unique().tolist()) assert result.shape == (2, 15) def _test_put_results(result): - assert all(result['option_type'] == 'p') - assert all(v in [-0.31, -0.46] for v 
in result['delta'].unique().tolist()) + assert all(result["option_type"] == "p") + assert all(v in [-0.31, -0.46] for v in result["delta"].unique().tolist()) assert result.shape == (2, 15)
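As a closing reference, the sketch below strings together the API surface these tests exercise: `op.get` (which now handles both a single CSV and a directory of CSVs), a strategy builder such as `op.strategies.short_call_spread` taking explicit start/end datetimes plus a filters dict, and `op.run`, which returns a `(total_profit, results)` tuple with `results` indexed by the new `trade_num` grouping. This is an illustrative sketch only, not part of the patch; the data path and the `SPX_FILE_STRUCT` column mapping are assumed to match `strategies/sample_strategy.py` above.

```python
# Illustrative end-to-end sketch (not part of the patch).
# Assumes a CSV at DATA_FILE laid out like the SPX sample in strategies/sample_strategy.py.
import optopsy as op
from datetime import datetime

DATA_FILE = "./strategies/data/SPX_2016.csv"  # assumed path, as in sample_strategy.py

SPX_FILE_STRUCT = (  # column mapping copied from sample_strategy.py
    ("underlying_symbol", 0), ("underlying_price", 1), ("option_symbol", 3),
    ("option_type", 5), ("expiration", 6), ("quote_date", 7), ("strike", 8),
    ("bid", 10), ("ask", 11), ("delta", 15), ("gamma", 16), ("theta", 17), ("vega", 18),
)

filters = {
    "entry_dte": (27, 30, 31),  # enter nearest to 30 DTE, accepting 27-31
    "leg1_delta": 0.30,         # first leg nearest 0.30 delta
    "leg2_delta": 0.50,         # second leg nearest 0.50 delta
    "contract_size": 10,
}

data = op.get(DATA_FILE, SPX_FILE_STRUCT, prompt=False)
trades = op.strategies.short_call_spread(
    data, datetime(2016, 1, 1), datetime(2016, 12, 31), filters
)
total_profit, results = op.run(data, trades, filters)

print("Total profit:", total_profit)
print(results.groupby(level="trade_num").size())  # legs per spread, via the new trade_num index
```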