Skip to content

Commit

Permalink
Merge pull request #26 from Netzvamp/main
Browse files Browse the repository at this point in the history
Event body handling
  • Loading branch information
Otoru authored Nov 21, 2024
2 parents ff9b968 + 0e0fc90 commit ca60847
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 46 deletions.
84 changes: 71 additions & 13 deletions genesis/logger.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,77 @@
import logging

import os
from rich.logging import RichHandler

logger = logging.getLogger(__name__)
# Define TRACE level
TRACE_LEVEL_NUM = 5
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")


def trace(self, message, *args, **kws):
if self.isEnabledFor(TRACE_LEVEL_NUM):
self._log(TRACE_LEVEL_NUM, message, args, **kws)


logging.Logger.trace = trace


def get_log_level() -> int:
"""
Get log level from environment variable or return default (INFO).
Valid values for LOG_LEVEL are: DEBUG, INFO, WARNING, ERROR, CRITICAL, TRACE
Returns logging level constant.
"""
level_map = {
'TRACE': TRACE_LEVEL_NUM,
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}

env_level = os.getenv('GENESIS_LOG_LEVEL', 'INFO').upper()
return level_map.get(env_level, logging.INFO)


def setup_logger(name: str = __name__) -> logging.Logger:
"""Configure a logger with rich handler and conventional formatting.
Args:
name: The name for the logger instance
Returns:
A configured logger instance
"""
logger = logging.getLogger(name)

# Prevent duplicate handlers
if logger.handlers:
return logger

# Configure Rich Handler according to conventions
handler = RichHandler(
rich_tracebacks=True,
tracebacks_show_locals=True,
markup=True,
show_path=False,
show_time=True,
omit_repeated_times=False
)

formatter = logging.Formatter("%(message)s")
handler.setFormatter(formatter)

# Get log level from environment variable
log_level = get_log_level()
logger.setLevel(log_level)
logger.addHandler(handler)
logger.propagate = False

handler = RichHandler(
show_time=False,
rich_tracebacks=True,
tracebacks_show_locals=True,
markup=True,
show_path=False,
)
logger.debug(f"Logger initialized with level: {logging.getLevelName(log_level)}")
return logger

handler.setFormatter(logging.Formatter("%(message)s"))

logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False
# Create default logger
logger = setup_logger(__name__)
6 changes: 3 additions & 3 deletions genesis/outbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def _awaitable_complete_command(self, application: str) -> Event:
semaphore = Event()

async def handler(session: Session, event: ESLEvent):
logger.debug(f"Recived channel execute complete event: {event}")
logger.debug(f"Received channel execute complete event: {event}")

if "variable_current_application" in event:
if event["variable_current_application"] == application:
Expand Down Expand Up @@ -144,7 +144,7 @@ async def say(
await command_is_complete.wait()

event = await self.fifo.get()
logger.debug(f"Execute complete event recived: {event}")
logger.debug(f"Execute complete event received: {event}")

return event

Expand Down Expand Up @@ -197,7 +197,7 @@ async def play_and_get_digits(
await command_is_complete.wait()

event = await self.fifo.get()
logger.debug(f"Execute complete event recived: {event}")
logger.debug(f"Execute complete event received: {event}")

return event

Expand Down
107 changes: 81 additions & 26 deletions genesis/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
from typing import List, Awaitable, Dict, NoReturn, Optional, Callable, Coroutine, Any, Union
from inspect import isawaitable
from abc import ABC
import logging

from genesis.exceptions import UnconnectedError, ConnectionError
from genesis.parser import parse_headers, ESLEvent
from genesis.logger import logger
from genesis.logger import logger, TRACE_LEVEL_NUM


class Protocol(ABC):
Expand Down Expand Up @@ -74,14 +75,14 @@ async def handler(self) -> None:
try:
content = await self.reader.readline()
buffer += content.decode("utf-8")

except:
except Exception as e:
logger.error(f"Error reading from stream. {str(e)}")
self.is_connected = False
break

if buffer[-2:] == "\n\n" or buffer[-4:] == "\r\n\r\n":
request = buffer
logger.debug(f"<<< Complete message received: {repr(request)}")
logger.trace(f"<<< Complete message received: {repr(request)}")
break

if not request or not self.is_connected:
Expand All @@ -90,43 +91,97 @@ async def handler(self) -> None:
event = parse_headers(request)

if "Content-Length" in event:
length = int(event["Content-Length"])
logger.debug(f"Read more {length} bytes.")
# Get the total length from the first Content-Length header
length = int(event["Content-Length"].split('\n')[0])
logger.trace(f"Total content length: {length} bytes")

# Read the complete data
data = await self.reader.readexactly(length)
logger.debug(f"Received data: {data}")
result = data.decode("utf-8")
logger.trace(f"Received complete data: {data}")
complete_content = data.decode("utf-8")

if "Content-Type" in event:
content = event["Content-Type"]
logger.debug(f"Check content type of event: {event}")
logger.trace(f"Check content type of event: {event}")

if content not in ["api/response", "text/rude-rejection", "log/data"]:
headers = parse_headers(result)
logger.debug(f"Received headers: {headers}")

if "Content-Length" in headers:
length = int(headers["Content-Length"])
logger.debug(f"Read more {length} bytes.")
data = await self.reader.readexactly(length)
result = data.decode("utf-8")
# Try to split headers and body
if '\n\n' in complete_content:
headers_part, body = complete_content.split('\n\n', 1)

# Here we check for multiple events in one message (can happen if event-lock is set)
event_parts = []
if "event-lock: true" in headers_part.lower():
# Split the string on "Event-Name: "
parts = headers_part.split("\nEvent-Name: ")
if len(parts) > 1:
# reconstruct the event parts
event_parts = [parts[0]] # First part don't need to be modified
for part in parts[1:]:
event_parts.append(f"Event-Name: {part}")
logger.debug(f"Split locked event into {len(event_parts)} separate events")
else:
event_parts = [headers_part]

# Process each event part
for idx, event_str in enumerate(event_parts):
if idx == 0:
# First event is the original event
additional_headers = parse_headers(event_str)
event.update(additional_headers)
event.body = body
await self.events.put(event)
else:
# More events are new events
new_event = parse_headers(event_str)
# Copy some headers from the original event
for key in ['Content-Length', 'Content-Type']:
if key in event:
new_event[key] = event[key]
new_event.body = body
await self.events.put(new_event)
continue # Skip the final event.put

logger.debug(f"Received body: {result}")
event.body = result

event.update(headers)
else:
# If no clear header/body separation, treat everything as body
event.body = complete_content
else:
event.body = result

event.body = complete_content
else:
event.body = result
event.body = complete_content

await self.events.put(event)

async def consume(self) -> None:
"""Arm all event processors."""
while self.is_connected:
event = await self.events.get()
logger.debug(f"Received an event: '{event}'.")

try:
if logger.isEnabledFor(TRACE_LEVEL_NUM):
logger.trace(f"Received an event: '{event}'.")
else:
if logger.isEnabledFor(logging.DEBUG):
if "Unique-ID" in event:
logtext = f"Received an event: '{event['Event-Name']}' for call '{event['Unique-ID']}'. "
if event["Event-Name"] == "CHANNEL_EXECUTE_COMPLETE":
logtext += f"Application: '{event['Application']}' - "
logtext += f"Response: '{event['Application-Response']}'"
logger.debug(logtext)
else:
if "Event-Name" in event:
logger.debug(f"Received an event: '{event['Event-Name']}'.")
elif "Content-Type" in event and event["Content-Type"] in ["command/reply", "auth/request"]:
if event["Content-Type"] == "command/reply":
if "Reply-Text" in event:
logger.debug(f"Received an command reply: '{event['Reply-Text']}'.")
if event["Content-Type"] == "auth/request":
if "Reply-Text" in event:
logger.debug(f"Received an authentication reply: '{event}'.")
else:
logger.debug(f"Received an event: '{event}'.")
except Exception as e:
logger.error(f"Error logging event: {str(e)} - Event: {event}")

if "Content-Type" in event and event["Content-Type"] == "auth/request":
self.authentication_event.set()
Expand Down Expand Up @@ -155,7 +210,7 @@ async def consume(self) -> None:
name = identifier

if name:
logger.debug(f"Get all handlers for '{name}'.")
logger.trace(f"Get all handlers for '{name}'.")
specific = self.handlers.get(name, [])
generic = self.handlers.get("*", list())
handlers = specific + generic
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ def channel() -> Dict[str, str]:
def background_job() -> str:
event = dedent(
"""\
Content-Length: 542
Content-Length: 582
Content-Type: text/event-plain
Job-UUID: 7f4db78a-17d7-11dd-b7a0-db4edd065621
Expand Down
4 changes: 2 additions & 2 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def test_consumer_wait_method_behavior(host, port, password, monkeypatch):
assert spider.call_count == 2, message


async def test_recive_background_job_event(freeswitch, background_job):
async def test_receive_background_job_event(freeswitch, background_job):
async with freeswitch as server:
server.events.append(background_job)
server.oncommand(
Expand Down Expand Up @@ -199,7 +199,7 @@ async def handler(event):
future.cancel()


async def test_recive_mod_audio_stream_play(freeswitch, mod_audio_stream_play):
async def test_receive_mod_audio_stream_play(freeswitch, mod_audio_stream_play):
async with freeswitch as server:
server.events.append(mod_audio_stream_play)

Expand Down
2 changes: 1 addition & 1 deletion tests/test_inbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def test_send_command_without_connection():


async def test_connect_without_freeswitch():
with pytest.raises(ConnectionRefusedError):
with pytest.raises((ConnectionRefusedError, OSError)):
async with Inbound("0.0.0.0", 8021, "ClueCon"):
await asyncio.sleep(1)

Expand Down

0 comments on commit ca60847

Please sign in to comment.