-
Notifications
You must be signed in to change notification settings - Fork 0
/
JAudioSync.py
179 lines (156 loc) · 7.17 KB
/
JAudioSync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import sys
import re
import os
import argparse
from functools import partial
from pygame import mixer
from urllib.parse import unquote
from datetime import timedelta, datetime
import time
from pydub import AudioSegment
from math import ceil
from apscheduler.schedulers.blocking import BlockingScheduler
import pandas as pd
def get_next_time(now=None):
    """Return the default playback start time as an ``HH:MM:SS`` string.

    The result is one minute after ``now`` with the seconds snapped to
    ``:00`` when the current second is below 30 and to ``:30`` otherwise,
    so the start is always more than 30 and at most 60 seconds away.

    Args:
        now: Reference ``datetime``; defaults to ``datetime.now()``.

    Returns:
        The start time formatted with ``'%H:%M:%S'``.
    """
    # Read the clock once so every field comes from the same instant.
    if now is None:
        now = datetime.now()
    # ``second == 30`` used to match neither branch and left the result
    # unbound; it now falls into the ":30" bucket.
    target_second = 0 if now.second < 30 else 30
    upcoming = (now + timedelta(minutes=1)).replace(second=target_second, microsecond=0)
    return upcoming.strftime('%H:%M:%S')
# Validate hh:mm:ss time format for start_time input
def validate_time_string(time_str):
    """Check that ``time_str`` is a 24-hour ``hh:mm:ss`` time (argparse ``type=``).

    Args:
        time_str: Raw command-line value.

    Returns:
        ``time_str`` unchanged when it is valid.

    Raises:
        argparse.ArgumentTypeError: If the value is not a valid ``hh:mm:ss`` time.
    """
    # fullmatch, not match + '$': '$' also matches just before a trailing
    # newline, which would let "12:00:00\n" through and fail later in strptime.
    if re.fullmatch(r'([01]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]', time_str) is None:
        raise argparse.ArgumentTypeError(f"Invalid time format: {time_str} , use valid hh:mm:ss")
    return time_str
def read_resume_position(pl_len):
    """Read the saved playlist position from ``./.resume``.

    Args:
        pl_len: Number of tracks in the playlist.

    Returns:
        The integer stored on the first line of ``./.resume`` when it is a
        valid track index (``0 <= index < pl_len``), otherwise ``0``.
    """
    try:
        with open("./.resume", "r") as file:
            resume_pos = int(file.readline().strip())
    except (FileNotFoundError, ValueError):
        print(f".resume not valid, 0 - {pl_len - 1}")
        return 0
    # The upper bound is exclusive: an index equal to pl_len points past the
    # last track and would leave nothing to schedule.
    if not 0 <= resume_pos < pl_len:
        return 0
    return resume_pos
# Try to convert pl_pos string to int and check if valid playlist position
def validate_pl_pos(pl_len, pos):
    """Turn the ``-p`` command-line value into a playlist index (argparse ``type=``).

    Args:
        pl_len: Number of tracks in the playlist.
        pos: Raw command-line value: an integer string, or ``"res"`` /
            ``"resume"`` (case-insensitive) to continue from ``./.resume``.

    Returns:
        The zero-based playlist index to start from.

    Raises:
        argparse.ArgumentTypeError: If ``pos`` is not an integer or lies
            outside ``0 .. pl_len - 1``.
    """
    if pos.lower() in ("res", "resume"):
        resume_pos = read_resume_position(pl_len)
        print(f"Resuming with track {resume_pos}.")
        return resume_pos
    try:
        index = int(pos)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid playlist position: {pos}.") from None
    # Checked outside the try block so the range hint reaches the user
    # instead of being replaced by the generic "invalid" message.
    if not 0 <= index < pl_len:
        raise argparse.ArgumentTypeError(
            f"Invalid playlist position: {pos}. Playlist position out of range. Use 0 - {pl_len - 1}"
        )
    return index
# Convert time string to a datetime object with date of today
def string_to_datetime(time_string):
    """Combine today's date with an ``%H:%M:%S`` time string.

    Args:
        time_string: Time of day in ``%H:%M:%S`` form.

    Returns:
        A naive ``datetime`` for today at the given time.
    """
    today = datetime.now().date()
    stamp = f"{today} {time_string}"
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")
# Get the rounded up playback length of a music file as timedelta (seconds)
def get_music_length(file_path):
    """Return the playback length of an MP3 file, rounded up to whole seconds.

    Args:
        file_path: Path of the MP3 file, opened with ``AudioSegment.from_mp3``.

    Returns:
        A ``timedelta`` holding the rounded-up duration.
    """
    duration_ms = len(AudioSegment.from_mp3(file_path))  # len() of a segment is in milliseconds
    whole_seconds = ceil(duration_ms / 1000)
    return timedelta(seconds=whole_seconds)
# Read .m3u8 playlist file and extract music file paths to "playlist"
def load_playlist(playlist_file):
    """Load the track paths listed in an ``.m3u8`` playlist.

    Blank lines and ``#`` comment/directive lines are skipped. Every
    remaining entry is URL-unquoted and prefixed with ``./Music``.

    Args:
        playlist_file: Path of the playlist file.

    Returns:
        A ``pandas.DataFrame`` with one ``'Path'`` column, or ``None`` (after
        printing the reason) when the file is missing, unreadable or has no
        track entries.
    """
    try:
        # utf-8-sig: .m3u8 is UTF-8, and this codec also drops a leading BOM
        # that would otherwise turn the first '#' line into a bogus path.
        with open(playlist_file, mode='r', encoding='utf-8-sig') as file:
            entries = [line.strip() for line in file]
    except FileNotFoundError:
        print(f'The playlist file {playlist_file} is not present.')
        return None
    except (OSError, UnicodeError) as e:
        print(f'An error occurred: {e}')
        return None

    # Filter out comments and empty lines, clean, add ./Music
    paths = [
        os.path.join("./Music", unquote(entry))
        for entry in entries
        if entry and not entry.startswith('#')
    ]
    if not paths:
        print(f'An error occurred: Playlist {playlist_file} is empty.')
        return None
    return pd.DataFrame({'Path': paths})
# Load a music file with pygame.mixer.music
def load_music(path):
    """Advance the global playlist position and load ``path`` into the mixer.

    Args:
        path: Music file handed to ``mixer.music.load``.
    """
    global pl_pos
    # The counter is bumped before loading, so it is already advanced even
    # if the load itself fails.
    pl_pos = pl_pos + 1
    mixer.music.load(path)
    print(f"Playing: {path}")
    # Optional volume tweak, currently disabled:
    # mixer.music.set_volume(0.8)
# Start playback of music from RAM memory
def play_music():
    """Start the track loaded into ``mixer.music`` and wait until it stops.

    Prints the wall-clock time right after playback is started, then polls
    the music stream until it is no longer busy.
    """
    mixer.music.play()
    print(f"At: {datetime.now()}")
    # mixer.get_busy() only reports Sound channels; the music stream has its
    # own mixer.music.get_busy(). The get_init() guard ends the wait cleanly
    # if the mixer is shut down (mixer.quit()) while this loop is running.
    while mixer.get_init() and mixer.music.get_busy():
        # Sleep between polls instead of spinning a CPU core.
        time.sleep(0.05)
def pl_fill_times(pl, start_time, pl_start, pl_len):
    """Fill the ``'LoadTime'`` and ``'StartTime'`` columns of the playlist.

    The first scheduled track starts at ``start_time`` and is loaded one
    second earlier. Each later track is loaded when the previous one ends
    (its start time plus ``get_music_length`` of its file) and starts one
    second after that.

    Args:
        pl: Playlist DataFrame with a ``'Path'`` column; modified in place.
        start_time: ``datetime`` at which track ``pl_start`` should start.
        pl_start: Index of the first track to schedule.
        pl_len: Number of tracks in the playlist (exclusive upper index).

    Returns:
        The same DataFrame ``pl``.
    """
    one_second = timedelta(seconds=1)
    for idx in range(pl_start, pl_len):
        if idx == pl_start:
            load_at = start_time - one_second
            play_at = start_time
        else:
            prev = idx - 1
            load_at = pl.at[prev, 'StartTime'] + get_music_length(pl.at[prev, 'Path'])
            play_at = load_at + one_second
        pl.at[idx, 'LoadTime'] = load_at
        pl.at[idx, 'StartTime'] = play_at
    return pl
def end():
    """Announce the end of the playlist, then stop the mixer and the scheduler.

    Relies on the module-level ``scheduler`` created by the script entry point.
    """
    print("Playlist finished playing.")
    mixer.quit()
    # wait=False: do not wait for running jobs to finish before shutting down.
    scheduler.shutdown(wait=False)
if __name__ == "__main__":
    playlist_file = "./Music/Playlist.m3u8"  # Location of .m3u8 playlist file
    pl = load_playlist(playlist_file)
    # Stop with a clear message instead of crashing on pl.shape further down
    # when the playlist could not be loaded or holds no tracks.
    if pl is None or pl.empty:
        sys.exit(f"No playable tracks found in {playlist_file}.")
    pl_len = pl.shape[0]
    next_time = get_next_time()

    # Create ArgumentParser object
    parser = argparse.ArgumentParser(description="""
    Play a (m3u8) playlist of music in perfect sync on multiple devices.
    Syncing NTP time over wireless network first and then start playback at exact choosen time (using apscheduler),
    which then doesn't need network anymore because it depends on system clock.
    Using pygame.mixer.music to play.
    usage: JAudioSync.py [-h] [-t 18:55:00] [-p 0 | res]
    """
    )

    # Add optional arguments
    parser.add_argument(
        '-t',
        type=validate_time_string,
        help='Time the playback should be scheduled today in the format hh:mm:ss, default: next full minute',
        nargs='?',
        default=next_time,
    )
    # The help text names "res", the keyword the position validator accepts,
    # and gives the real upper bound (the last valid index is one below the track count).
    parser.add_argument(
        '-p',
        type=partial(validate_pl_pos, pl_len),
        help='Start track number in playlist [0 - number of tracks minus 1], or "res" to resume from last played track, default: starting from 0',
        nargs='?',
        const=0,
        default=0,
    )

    args = parser.parse_args()  # Parse the command-line arguments
    start_time_str = args.t  # Access parsed start time argument
    start_time = string_to_datetime(start_time_str)  # Convert time string to a datetime object

    # pl_pos lives at module level; load_music() advances it through its own
    # `global pl_pos` declaration.
    pl_pos = args.p  # Access parsed playlist position, starting with 0
    pl_start = int(pl_pos)
    print(f"Starting with Track: {pl_start}")

    # fill playlist DataFrame with load_times and start_times
    pl = pl_fill_times(pl, start_time, pl_start, pl_len)
    print(f"Playlist:")
    print(pl.iloc[pl_start:])

    mixer.init()

    # Use the scheduler's default (local) time zone. time.tzname yields
    # abbreviations such as "CEST" that are not valid zone names, and the
    # naive run dates below are local wall-clock times anyway.
    scheduler = BlockingScheduler()  # Create a scheduler

    for i in range(pl_start, pl_len):
        path = pl.at[i, "Path"]
        load_time = pl.at[i, 'LoadTime']
        track_start = pl.at[i, 'StartTime']
        scheduler.add_job(load_music, 'date', run_date=load_time, args=[path])
        scheduler.add_job(play_music, 'date', run_date=track_start)

    # Scheduling shutdown after last played track
    end_time = pl.at[pl_len - 1, 'StartTime'] + get_music_length(pl.at[pl_len - 1, 'Path'])
    scheduler.add_job(end, 'date', run_date=end_time)

    try:
        scheduler.start()  # Blocks until end() shuts the scheduler down
    except (KeyboardInterrupt, SystemExit):
        with open("./.resume", 'w') as file:  # Write current pl_pos to .resume file
            file.write(str(pl_pos))
        print("Script interrupted by user.")
        mixer.quit()
        scheduler.shutdown(wait=False)