Skip to content

Commit

Permalink
Denodo integration for multimodal trading bot (#19)
Browse files Browse the repository at this point in the history
Related to #18

Add Denodo integration to the repository.

* **Denodo Integration Module**
- Add `denodo_integration.py` to establish Denodo connection using
`jaydebeapi` or `pyodbc`.
- Implement utility functions for executing queries and retrieving
results.
  - Include error handling and logging for connection and query issues.

* **Configuration Updates**
- Update `config/api_config.py` to include Denodo-related configurations
and headers.
- Update `config/config.py` to include Denodo configurations in the main
configuration settings.
  - Add Denodo-related environment variables to `.env`.

* **Main Application Changes**
- Update `main.py` to initialize Denodo connection and utilize it for
data retrieval and processing.
- Add Denodo query execution in the asynchronous data processing
function.

* **Dependencies**
- Update `requirements.txt` to include `jaydebeapi` and `pyodbc`
libraries for Denodo integration.



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
	- Added Denodo database integration to the application
	- Introduced new configuration options for Denodo API connection
- Implemented methods to connect and execute queries with Denodo
database

- **Dependencies**
	- Added `jaydebeapi` and `pyodbc` libraries for database connectivity

- **Configuration**
	- Added new environment variables for Denodo API configuration
- Updated configuration management to support Denodo connection
parameters

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
vishwamartur authored Dec 29, 2024
2 parents 50985c2 + c86edc7 commit 4850195
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 5 deletions.
8 changes: 8 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ NEWS_API_ENDPOINT=your_news_api_endpoint
DEPLOYMENT_ENV=production
DEPLOYMENT_SERVER=your_deployment_server
DEPLOYMENT_PATH=/path/to/deployment

# Denodo API
DENODO_HOST=your_denodo_host
DENODO_PORT=your_denodo_port
DENODO_DATABASE=your_denodo_database
DENODO_USERNAME=your_denodo_username
DENODO_PASSWORD=your_denodo_password
DENODO_JDBC_DRIVER=your_denodo_jdbc_driver
23 changes: 23 additions & 0 deletions config/api_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ def __init__(self):
self.mutual_funds_api_key = os.getenv("MUTUAL_FUNDS_API_KEY")
self.mutual_funds_api_endpoint = os.getenv("MUTUAL_FUNDS_API_ENDPOINT")

# Denodo API configurations
self.denodo_host = os.getenv("DENODO_HOST")
self.denodo_port = os.getenv("DENODO_PORT")
self.denodo_database = os.getenv("DENODO_DATABASE")
self.denodo_username = os.getenv("DENODO_USERNAME")
self.denodo_password = os.getenv("DENODO_PASSWORD")
self.denodo_jdbc_driver = os.getenv("DENODO_JDBC_DRIVER")

# Additional API configurations can be added here
# Example for other market data or sentiment analysis APIs:
# self.other_api_key = os.getenv("OTHER_API_KEY")
Expand Down Expand Up @@ -84,6 +92,15 @@ def mutual_funds_headers(self):
"Content-Type": "application/json",
}

def denodo_headers(self):
"""
Returns headers required for making requests to the Denodo API.
"""
return {
"Authorization": f"Bearer {self.denodo_username}:{self.denodo_password}",
"Content-Type": "application/json",
}

def get_dhan_endpoint(self, path):
"""
Constructs and returns a full Dhan API endpoint for a given path.
Expand Down Expand Up @@ -113,3 +130,9 @@ def get_mutual_funds_endpoint(self):
Returns the Mutual Funds API endpoint.
"""
return self.mutual_funds_api_endpoint

def get_denodo_endpoint(self):
"""
Returns the Denodo API endpoint.
"""
return f"jdbc:denodo://{self.denodo_host}:{self.denodo_port}/{self.denodo_database}"
6 changes: 6 additions & 0 deletions config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ def load_config():
"chatgpt_api_key": os.getenv("CHATGPT_API_KEY"),
"weather_api_key": os.getenv("WEATHER_API_KEY"),
"weather_api_endpoint": os.getenv("WEATHER_API_ENDPOINT"),
"denodo_host": os.getenv("DENODO_HOST"),
"denodo_port": os.getenv("DENODO_PORT"),
"denodo_database": os.getenv("DENODO_DATABASE"),
"denodo_username": os.getenv("DENODO_USERNAME"),
"denodo_password": os.getenv("DENODO_PASSWORD"),
"denodo_jdbc_driver": os.getenv("DENODO_JDBC_DRIVER"),
},
"strategy": {
"futures": {
Expand Down
66 changes: 66 additions & 0 deletions denodo_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import jaydebeapi
import logging
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

class DenodoIntegration:
def __init__(self):
self.connection = None
self.cursor = None
self.connect()

def connect(self):
try:
denodo_host = os.getenv("DENODO_HOST")
denodo_port = os.getenv("DENODO_PORT")
denodo_database = os.getenv("DENODO_DATABASE")
denodo_username = os.getenv("DENODO_USERNAME")
denodo_password = os.getenv("DENODO_PASSWORD")
denodo_jdbc_driver = os.getenv("DENODO_JDBC_DRIVER")

url = f"jdbc:denodo://{denodo_host}:{denodo_port}/{denodo_database}"
self.connection = jaydebeapi.connect(
"com.denodo.vdp.jdbc.Driver",
url,
[denodo_username, denodo_password],
denodo_jdbc_driver
)
self.cursor = self.connection.cursor()
logger.info("Connected to Denodo successfully")
except Exception as e:
logger.error(f"Failed to connect to Denodo: {str(e)}")
raise

def execute_query(self, query, params=None):
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
result = self.cursor.fetchall()
return result
except Exception as e:
logger.error(f"Failed to execute query: {str(e)}")
raise

def close(self):
try:
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
logger.info("Closed Denodo connection")
except Exception as e:
logger.error(f"Failed to close Denodo connection: {str(e)}")
raise
26 changes: 21 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from data.weather_data_fetcher import WeatherDataFetcher
from data.news_data_fetcher import NewsDataFetcher
from data.mutual_funds_tracker import MutualFundsTracker
from denodo_integration import DenodoIntegration

# Load environment variables and validate required keys
load_dotenv()
Expand All @@ -34,7 +35,13 @@
"WEATHER_API_KEY",
"WEATHER_API_ENDPOINT",
"NEWS_API_KEY",
"NEWS_API_ENDPOINT"
"NEWS_API_ENDPOINT",
"DENODO_HOST",
"DENODO_PORT",
"DENODO_DATABASE",
"DENODO_USERNAME",
"DENODO_PASSWORD",
"DENODO_JDBC_DRIVER"
]
for var in required_env_vars:
if not os.getenv(var):
Expand All @@ -61,7 +68,8 @@ async def process_market_data(
investment_banking_tracker: InvestmentBankingTracker,
weather_fetcher: WeatherDataFetcher,
news_fetcher: NewsDataFetcher,
mutual_funds_tracker: MutualFundsTracker
mutual_funds_tracker: MutualFundsTracker,
denodo_integration: DenodoIntegration
) -> None:
"""
Process incoming market data asynchronously and execute trading strategies
Expand All @@ -78,9 +86,10 @@ async def process_market_data(
weather_task = asyncio.create_task(weather_fetcher.fetch_and_process_weather_data("New York"))
news_task = asyncio.create_task(news_fetcher.fetch_news_data("market"))
mutual_funds_task = asyncio.create_task(mutual_funds_tracker.process_mutual_funds_trades(data))
denodo_query_task = asyncio.create_task(denodo_integration.execute_query("SELECT * FROM your_table"))

futures_signal, options_signal, sentiment, _, weather_data, news_data, _ = await asyncio.gather(
futures_task, options_task, sentiment_task, investment_banking_task, weather_task, news_task, mutual_funds_task
futures_signal, options_signal, sentiment, _, weather_data, news_data, _, denodo_data = await asyncio.gather(
futures_task, options_task, sentiment_task, investment_banking_task, weather_task, news_task, mutual_funds_task, denodo_query_task
)

# Handle futures signals
Expand Down Expand Up @@ -110,6 +119,11 @@ async def process_market_data(
logger.info(f"News Data: {news_data}")
await notifier.send_async(f"News update: {news_data}")

# Process Denodo data
if denodo_data:
logger.info(f"Denodo Data: {denodo_data}")
await notifier.send_async(f"Denodo data update: {denodo_data}")

# Calculate profit and loss
executed_price = futures_signal.get("price") if futures_signal else options_signal.get("price")
current_price = processed_data.get("price")
Expand Down Expand Up @@ -158,6 +172,7 @@ async def main():
api_endpoint=config["api"]["news_api_endpoint"]
)
mutual_funds_tracker = MutualFundsTracker(config["investment_banking"])
denodo_integration = DenodoIntegration()

# Run backtesting with performance metrics
backtester = Backtester()
Expand All @@ -181,7 +196,8 @@ async def on_message(data: Dict) -> None:
investment_banking_tracker,
weather_fetcher,
news_fetcher,
mutual_funds_tracker
mutual_funds_tracker,
denodo_integration
)

# Start WebSocket connection
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ mypy>=1.4.1
black>=23.3.0
isort>=5.12.0
flake8>=6.0.0
jaydebeapi>=1.2.3
pyodbc>=4.0.32

0 comments on commit 4850195

Please sign in to comment.