Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix #17] avoid -352 by use new header. And add series features #18

Merged
merged 6 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,38 @@ bilifm season $uid $sid [OPTIONS]
- uid, sid 的获取:
打开视频合集网页, 从 URL 中获取

https://space.bilibili.com/23263470/channel/collectiondetail?sid=1855309
https://space.bilibili.com/23263470/channel/collectiondetail?sid=1855309

例如上面链接, uid 为 23263470, sid 为 1855309 (目前 uid 可以随意填写)

```bash
bilifm season 23263470 1855309
```

例如上面链接, uid 为 23263470, sid 为 1855309 (目前 uid 可以随意填写)
- Options:
- -o, --directory 选择音频保存地址

### series 模式

下载视频列表

```bash
bilifm series $uid $sid [OPTIONS]
```

- uid, sid 的获取:
打开用户空间中的合集和列表, 找到列表点击更多, 然后从URL中获取

https://space.bilibili.com/488978908/channel/seriesdetail?sid=888434

例如上面链接, uid 为 488978908, sid 为 888434. 使用下面命令

```bash
bilifm series 488978908 888434
```

- Options:
- -o, --directory 选择音频保存地址

## Features

Expand Down
32 changes: 27 additions & 5 deletions src/bilifm/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .audio import Audio
from .fav import Fav
from .season import Season
from .series import Series
from .user import User
from .util import Directory, Path

Expand Down Expand Up @@ -50,16 +51,37 @@ def fav(
@app.command()
def season(uid: str, sid: str, directory: Directory = None):
sea = Season(uid, sid)
ret = sea.get_videos()
if not ret:
audio_generator = sea.get_videos()
if not audio_generator:
typer.Exit(1)
return

if not os.path.isdir(sea.name):
os.makedirs(sea.name)
os.chdir(sea.name)

for id in sea.videos:
audio = Audio(id)
audio.download()
for audios in audio_generator:
for id in audios:
audio = Audio(id)
audio.download()
typer.echo("Download complete")


@app.command()
def series(uid: str, sid: str, directory: Directory = None):
"""Download bilibili video series
jingfelix marked this conversation as resolved.
Show resolved Hide resolved

The api of series lacks the series name, executing
this command will not create a folder for the series
"""
ser = Series(uid, sid)
audio_generator = ser.get_videos()
if not audio_generator:
typer.Exit(1)
return

for audios in audio_generator:
for id in audios:
audio = Audio(id)
audio.download()
typer.echo("Download complete")
71 changes: 47 additions & 24 deletions src/bilifm/season.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

import typer

from .util import request
from .util import Retry, request

headers: dict[str, str] = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
"Referer": "https://www.bilibili.com",
}


class Season:
# api refer https://github.com/SocialSisterYi/bilibili-API-collect/blob/f9ee5c3b99335af6bef0d9d902101c565b3bea00/docs/video/collection.md
season_url: str = (
"https://api.bilibili.com/x/polymer/web-space/seasons_archives_list"
)
retry = 3

def __init__(self, uid: str, sid: str, page_size=30) -> None:
self.uid = uid
Expand All @@ -26,33 +32,50 @@ def get_videos(self):
"page_size": self.page_size,
}

res = request(
method="get", url=self.season_url, params=params, wbi=True, dm=True
).json()

code = res.get("code", -404)
if code != 0:
# uid 错误好像无影响
if code == "-404":
typer.echo(f"Error: uid {self.uid} or sid {self.season_id} error.")
else:
type.echo("Error: unknown error")
typer.echo(f"code: {res['code']}")
if res.get("message", None):
typer.echo(f"msg: {res['message']}")
@Retry(self.__response_succeed, self.__handle_error_response)
def wrapped_request():
"""wrap request with retry"""
return request(
method="get", url=self.season_url, params=params, headers=headers
).json()

res = wrapped_request()
if res is None:
return False

self.total = res["data"]["meta"]["total"]
self.name = res["data"]["meta"]["name"]

max_pn = self.total // 50
for i in range(1, max_pn + 2):
params["page_num"] = i
def bvid_generator():
max_pn = self.total // self.page_size
for i in range(1, max_pn + 2):
params["page_num"] = i
res = wrapped_request()
if res:
bvids = [d["bvid"] for d in res["data"]["archives"]]
# self.videos.extend(bvids)
yield bvids
else:
typer.echo(
f"skip audios from {(i-1)* self.page_size} to {i * self.page_size}"
)

res = request(
method="get", url=self.season_url, params=params, wbi=True, dm=True
).json()
bvids = [d["bvid"] for d in res["data"]["archives"]]
self.videos.extend(bvids)
return bvid_generator()

def __handle_error_response(self, response):
code = response.get("code", -404)
if code == -404:
typer.echo(f"Error: uid {self.uid} or sid {self.season_id} error.")
elif code == -352:
typer.echo(
"Error: Authentication problem or too many requests, please try again later."
)
else:
typer.echo("Error: Unknown problem.")

typer.echo(f"code: {response['code']}")
if response.get("message", None):
typer.echo(f"msg: {response['message']}")

return True
def __response_succeed(self, response):
return response.get("code", -404) == 0
70 changes: 70 additions & 0 deletions src/bilifm/series.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""download bilibili video series, 视频列表"""

import typer

from .util import Retry, request


class Series:
series_url: str = "https://api.bilibili.com/x/series/archives"
retry: int = 3

def __init__(self, uid: str, series_id: str, page_size=30) -> None:
self.uid = uid
self.series_id = series_id
self.page_size = page_size
self.videos = []
self.total = 0

def get_videos(self):
"""return a generator that contain page_size videos"""
params = {
"mid": self.uid,
"series_id": self.series_id,
"pn": 1,
"ps": self.page_size,
"current_id": self.uid,
}

@Retry(self.__response_succeed, self.__handle_error_response)
def wrapped_request():
"""wrap request with retry"""
return request(method="get", url=self.series_url, params=params).json()

res = wrapped_request()
if res is None:
return 0

self.total = res["data"]["page"]["total"]

def bvid_generator():
for i in range(1, self.total // self.page_size + 2):
params["pn"] = i
res = wrapped_request()
if res:
bvids = [ar["bvid"] for ar in res["data"]["archives"]]
# self.videos.extend(bvids)
yield bvids
else:
typer.echo(
f"skip audios from {(i-1)* self.page_size} to {i * self.page_size}"
)

return bvid_generator()

def __handle_error_response(self, response):
try:
archives = response["data"]["archives"]
except KeyError:
archives = 0 # something null not none
if archives is None:
typer.echo(f"Error: uid {self.uid} or sid {self.series_id} error.")
else:
typer.echo("Error: Unknown problem.")
typer.echo(f"resp: {response}")

def __response_succeed(self, response) -> bool:
try:
return response["data"]["archives"] is not None
except KeyError:
return False
22 changes: 22 additions & 0 deletions src/bilifm/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import urllib.parse
from functools import reduce
from hashlib import md5
from typing import Callable

import requests
import typer
Expand Down Expand Up @@ -192,3 +193,24 @@ def check_path(path: str):

Directory = Annotated[str, typer.Option("-o", "--directory", callback=change_directory)]
Path = typer.Argument(callback=check_path)


class Retry:
"""Retry decorator"""

def __init__(self, response_succeed, handle_error_response, total=3) -> None:
self.total = total
self.__response_succeed = response_succeed
self.__handle_error_response = handle_error_response
pass

def __call__(self, request_func: Callable) -> Callable:
def wrapped_request(*args, **kwargs):
for _ in range(self.total):
res = request_func(*args, **kwargs)
if self.__response_succeed(res):
return res
self.__handle_error_response(res)
return None

return wrapped_request
Loading