Skip to content
This repository has been archived by the owner on Aug 1, 2021. It is now read-only.

Commit

Permalink
Voice Send Support (#17)
Browse files Browse the repository at this point in the history
* First pass at voice sending

* more voice

* Refactor playables a bit, general fixes n stuff

* Cleanup

* Voice encryption, dep version bump, etc fixes

* Remove debugging, don't open a pipe for stderr

* Refactor playables

This is still a very lose concept, need to think about what the actual
differences between encoders and playables are. Also some rough edges in
general with the frame/sample calculations.

However, this still feels miles ahead of the previous iteration.

* Properly reset state when resuming from a pause

* rework playables/encoding/etc a bit

* Add a proxy, allow for more pipin'

* Cleanup, etc

* Fix resuming from a pause lerping music timestamp

* Fix some incorrect bounds checks, add MemoryBufferedPlayable
  • Loading branch information
b1naryth1ef authored Apr 11, 2017
1 parent 642542d commit 03dab6d
Show file tree
Hide file tree
Showing 10 changed files with 775 additions and 49 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ dist/
disco*.egg-info/
docs/_build
storage.db
*.dca
6 changes: 3 additions & 3 deletions disco/bot/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,10 @@ def on_message_create(self, event):
if event.message.author.id == self.client.state.me.id:
return

if self.config.commands_allow_edit:
self.last_message_cache[event.message.channel_id] = (event.message, False)
result = self.handle_message(event.message)

self.handle_message(event.message)
if self.config.commands_allow_edit:
self.last_message_cache[event.message.channel_id] = (event.message, result)

def on_message_update(self, event):
if self.config.commands_allow_edit:
Expand Down
2 changes: 2 additions & 0 deletions disco/gateway/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ def create(cls, obj, client):
"""
Create this GatewayEvent class from data and the client.
"""
cls.raw_data = obj

# If this event is wrapping a model, pull its fields
if hasattr(cls, '_wraps_model'):
alias, model = cls._wraps_model
Expand Down
3 changes: 3 additions & 0 deletions disco/voice/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from disco.voice.client import *
from disco.voice.player import *
from disco.voice.playable import *
135 changes: 92 additions & 43 deletions disco/voice/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import struct
import time

import nacl.secret

from holster.enum import Enum
from holster.emitter import Emitter

Expand All @@ -22,11 +24,6 @@
VOICE_CONNECTED=6,
)

# TODO:
# - player implementation
# - encryption
# - cleanup


class VoiceException(Exception):
def __init__(self, msg, client):
Expand All @@ -38,12 +35,40 @@ class UDPVoiceClient(LoggingClass):
def __init__(self, vc):
super(UDPVoiceClient, self).__init__()
self.vc = vc

# The underlying UDP socket
self.conn = None

# Connection information
self.ip = None
self.port = None

self.run_task = None
self.connected = False

def send_frame(self, frame, sequence=None, timestamp=None):
# Convert the frame to a bytearray
frame = bytearray(frame)

# First, pack the header (TODO: reuse bytearray?)
header = bytearray(24)
header[0] = 0x80
header[1] = 0x78
struct.pack_into('>H', header, 2, sequence or self.vc.sequence)
struct.pack_into('>I', header, 4, timestamp or self.vc.timestamp)
struct.pack_into('>i', header, 8, self.vc.ssrc)

# Now encrypt the payload with the nonce as a header
raw = self.vc.secret_box.encrypt(bytes(frame), bytes(header)).ciphertext

# Send the header (sans nonce padding) plus the payload
self.send(header[:12] + raw)

# Increment our sequence counter
self.vc.sequence += 1
if self.vc.sequence >= 65535:
self.vc.sequence = 0

def run(self):
while True:
self.conn.recvfrom(4096)
Expand All @@ -54,26 +79,29 @@ def send(self, data):
def disconnect(self):
self.run_task.kill()

def connect(self, host, port, timeout=10):
def connect(self, host, port, timeout=10, addrinfo=None):
self.ip = socket.gethostbyname(host)
self.port = port

self.conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Send discovery packet
packet = bytearray(70)
struct.pack_into('>I', packet, 0, self.vc.ssrc)
self.send(packet)
if addrinfo:
ip, port = addrinfo
else:
# Send discovery packet
packet = bytearray(70)
struct.pack_into('>I', packet, 0, self.vc.ssrc)
self.send(packet)

# Wait for a response
try:
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout)
except gevent.Timeout:
return (None, None)
# Wait for a response
try:
data, addr = gevent.spawn(lambda: self.conn.recvfrom(70)).get(timeout=timeout)
except gevent.Timeout:
return (None, None)

# Read IP and port
ip = str(data[4:]).split('\x00', 1)[0]
port = struct.unpack('<H', data[-2:])[0]
# Read IP and port
ip = str(data[4:]).split('\x00', 1)[0]
port = struct.unpack('<H', data[-2:])[0]

# Spawn read thread so we don't max buffers
self.connected = True
Expand All @@ -86,30 +114,45 @@ class VoiceClient(LoggingClass):
def __init__(self, channel, encoder=None):
super(VoiceClient, self).__init__()

assert channel.is_voice, 'Cannot spawn a VoiceClient for a non-voice channel'
if not channel.is_voice:
raise ValueError('Cannot spawn a VoiceClient for a non-voice channel')

self.channel = channel
self.client = self.channel.client
self.encoder = encoder or JSONEncoder

# Bind to some WS packets
self.packets = Emitter(gevent.spawn)
self.packets.on(VoiceOPCode.READY, self.on_voice_ready)
self.packets.on(VoiceOPCode.SESSION_DESCRIPTION, self.on_voice_sdp)

# State
# State + state change emitter
self.state = VoiceState.DISCONNECTED
self.connected = gevent.event.Event()
self.state_emitter = Emitter(gevent.spawn)

# Connection metadata
self.token = None
self.endpoint = None
self.ssrc = None
self.port = None
self.secret_box = None
self.udp = None

# Voice data state
self.sequence = 0
self.timestamp = 0

self.update_listener = None

# Websocket connection
self.ws = None
self.heartbeat_task = None

def set_state(self, state):
prev_state = self.state
self.state = state
self.state_emitter.emit(state, prev_state)

def heartbeat(self, interval):
while True:
self.send(VoiceOPCode.HEARTBEAT, time.time() * 1000)
Expand All @@ -128,7 +171,7 @@ def send(self, op, data):
}), self.encoder.OPCODE)

def on_voice_ready(self, data):
self.state = VoiceState.CONNECTING
self.set_state(VoiceState.CONNECTING)
self.ssrc = data['ssrc']
self.port = data['port']

Expand All @@ -146,18 +189,20 @@ def on_voice_ready(self, data):
'data': {
'port': port,
'address': ip,
'mode': 'plain'
'mode': 'xsalsa20_poly1305'
}
})

def on_voice_sdp(self, _):
def on_voice_sdp(self, sdp):
# Create a secret box for encryption/decryption
self.secret_box = nacl.secret.SecretBox(bytes(bytearray(sdp['secret_key'])))

# Toggle speaking state so clients learn of our SSRC
self.set_speaking(True)
self.set_speaking(False)
gevent.sleep(0.25)

self.state = VoiceState.CONNECTED
self.connected.set()
self.set_state(VoiceState.CONNECTED)

def on_voice_server_update(self, data):
if self.channel.guild_id != data.guild_id or not data.token:
Expand All @@ -167,43 +212,44 @@ def on_voice_server_update(self, data):
return

self.token = data.token
self.state = VoiceState.AUTHENTICATING
self.set_state(VoiceState.AUTHENTICATING)

self.endpoint = data.endpoint.split(':', 1)[0]
self.ws = Websocket(
'wss://' + self.endpoint,
on_message=self.on_message,
on_error=self.on_error,
on_open=self.on_open,
on_close=self.on_close,
)
self.ws = Websocket('wss://' + self.endpoint)
self.ws.emitter.on('on_open', self.on_open)
self.ws.emitter.on('on_error', self.on_error)
self.ws.emitter.on('on_close', self.on_close)
self.ws.emitter.on('on_message', self.on_message)
self.ws.run_forever()

def on_message(self, _, msg):
def on_message(self, msg):
try:
data = self.encoder.decode(msg)
self.packets.emit(VoiceOPCode[data['op']], data['d'])
except:
self.log.exception('Failed to parse voice gateway message: ')

def on_error(self, _, err):
# TODO
def on_error(self, err):
# TODO: raise an exception here
self.log.warning('Voice websocket error: {}'.format(err))

def on_open(self, _):
def on_open(self):
self.send(VoiceOPCode.IDENTIFY, {
'server_id': self.channel.guild_id,
'user_id': self.client.state.me.id,
'session_id': self.client.gw.session_id,
'token': self.token
})

def on_close(self, _, code, error):
# TODO
def on_close(self, code, error):
self.log.warning('Voice websocket disconnected (%s, %s)', code, error)

if self.state == VoiceState.CONNECTED:
self.log.info('Attempting voice reconnection')
self.connect()

def connect(self, timeout=5, mute=False, deaf=False):
self.state = VoiceState.AWAITING_ENDPOINT
self.set_state(VoiceState.AWAITING_ENDPOINT)

self.update_listener = self.client.events.on('VoiceServerUpdate', self.on_voice_server_update)

Expand All @@ -214,11 +260,11 @@ def connect(self, timeout=5, mute=False, deaf=False):
'channel_id': int(self.channel.id),
})

if not self.connected.wait(timeout) or self.state != VoiceState.CONNECTED:
if not self.state_emitter.once(VoiceState.CONNECTED, timeout=timeout):
raise VoiceException('Failed to connect to voice', self)

def disconnect(self):
self.state = VoiceState.DISCONNECTED
self.set_state(VoiceState.DISCONNECTED)

if self.heartbeat_task:
self.heartbeat_task.kill()
Expand All @@ -236,3 +282,6 @@ def disconnect(self):
'guild_id': int(self.channel.guild_id),
'channel_id': None,
})

def send_frame(self, *args, **kwargs):
self.udp.send_frame(*args, **kwargs)
Loading

0 comments on commit 03dab6d

Please sign in to comment.