Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save give-you-favo/a4b83cbe5f2240bbedfe41b9528bf1ff to your computer and use it in GitHub Desktop.
Save give-you-favo/a4b83cbe5f2240bbedfe41b9528bf1ff to your computer and use it in GitHub Desktop.
録画をダウンロードするサンプル
import base64
import requests
import datetime
import time
import urllib
from bs4 import BeautifulSoup
import os
import subprocess
"""
説明
指定したscreen_idのユーザーの録画をダウンロードするスクリプト
from_date 以上 until_date 未満に作成された、公開中の録画をダウンロードして、
./{screen_id}/{movie_id}.mp4 形式で保存する。
準備
1. ffmpegのパスが通っていることが必要(実行フォルダ内でも可)
* ver4.2.2で動作確認、古いとm4sでエラー
2. ClientIDとClientSecretの設定
3. screen_id,from_date,until_dateの設定
4. 十分な空き容量
"""
# Target: the user whose recordings are fetched, and the date range to fetch.
screen_id = "nabo66"
from_date = "2020-01-01"
until_date = "2020-01-10"

# Credentials: fill in the API client ID and client secret before running.
# NOTE: "ClinetID" is misspelled; the name is kept because it is referenced below.
ClinetID = ""
ClientSecret = ""

# Constants
LIMIT = 50
GET_MOVIE_ERQUEST_MAX = 200  # LIMIT * GET_MOVIE_ERQUEST_MAX = 10,000 movies
API_VERSION = "2.0"
BASE_URL = "https://apiv2.twitcasting.tv"
# "<client id>:<client secret>", base64-encoded, for the Basic Authorization header.
BASE64_ENCODED_STRING = base64.b64encode(
    f"{ClinetID}:{ClientSecret}".encode("utf-8")
).decode("ascii")
def get_movies_by_user(screen_id, slice_id):
    """Fetch one page of a user's movies from the API.

    Sends ``GET {BASE_URL}/users/{screen_id}/movies`` with ``slice_id`` and
    ``limit`` (= LIMIT) as query parameters, using HTTP Basic authentication.
    If the response headers report that no requests remain in the current
    rate-limit window, the function sleeps until just after the reported
    reset time before returning.

    Args:
        screen_id: Screen ID of the user, inserted into the URL path.
        slice_id: Value sent as the ``slice_id`` query parameter (paging cursor).

    Returns:
        The value of the "movies" key of the JSON response, or None when the
        key is absent.
    """
    url = f"{BASE_URL}/users/{screen_id}/movies"
    params = {"slice_id": slice_id, "limit": LIMIT}
    headers = {
        'Accept': 'application/json',
        'X-Api-Version': API_VERSION,
        'Authorization': 'Basic ' + BASE64_ENCODED_STRING,
    }
    # Let requests build the query string instead of concatenating it by hand.
    response = requests.get(url, params=params, headers=headers)
    movies = response.json().get("movies")

    # Header values arrive as strings, so they must be converted to int before
    # being compared with 0; comparing the raw string never matches.
    remaining = response.headers.get("X-RateLimit-Remaining")
    reset = response.headers.get("X-RateLimit-Reset")
    if remaining is not None and reset is not None and int(remaining) <= 0:
        # The reset value is treated as a Unix timestamp; +1 s as a safety margin.
        wait_time_s = int(reset) - time.time() + 1
        if wait_time_s > 0:
            time.sleep(wait_time_s)
    return movies
def get_movies_by_timestamp(screen_id, from_timestamp, until_timestamp):
    """Collect a user's movies created within ``[from_timestamp, until_timestamp)``.

    Pages through ``get_movies_by_user`` starting from a very large
    ``slice_id`` and moving the cursor down to the smallest movie ID seen so
    far. At most GET_MOVIE_ERQUEST_MAX pages are requested.

    Paging stops when a movie older than ``from_timestamp`` is reached, or
    when a page contains no movie that has not been seen already (for example
    an empty page once the user's movies are exhausted).

    NOTE(review): like the original logic, this assumes each page is ordered
    newest first - confirm against the API.

    Args:
        screen_id: Screen ID of the user.
        from_timestamp: Inclusive lower bound on the movie's "created" value.
        until_timestamp: Exclusive upper bound on the movie's "created" value.

    Returns:
        A list of movie dicts, in the order they were received, without
        duplicates.
    """
    movies = []
    seen_ids = set()
    slice_id = 1000000000  # TODO: better to start from the user's latest movie ID
    for i in range(GET_MOVIE_ERQUEST_MAX):
        print('\rGet_Movies, %d' % i, end='')
        # A missing "movies" key yields None; treat it as an empty page.
        movies_chunk = get_movies_by_user(screen_id, slice_id) or []
        got_new_movie = False
        reached_start = False
        for movie in movies_chunk:
            movie_id = int(movie.get("id"))
            if movie_id in seen_ids:
                # The same movie may come back again at a page boundary.
                continue
            seen_ids.add(movie_id)
            got_new_movie = True
            created = int(movie.get("created"))
            if created < from_timestamp:
                # Older than the requested range: exclude it and stop paging.
                reached_start = True
                break
            if created < until_timestamp:
                movies.append(movie)
            if movie_id < slice_id:
                slice_id = movie_id
        if reached_start or not got_new_movie:
            break
    return movies
def date2timestamp(date):
    """Convert a "YYYY-MM-DD" date string to an integer Unix timestamp.

    The string is parsed into a naive datetime at midnight; ``timestamp()``
    interprets a naive datetime in the machine's local time zone.
    """
    parsed = datetime.datetime.strptime(date, "%Y-%m-%d")
    seconds = parsed.timestamp()
    return int(seconds)
def get_movies_by_date(screen_id, from_date, until_date):
    """Collect a user's movies for a range given as "YYYY-MM-DD" date strings.

    Both dates are converted with ``date2timestamp`` and the work is delegated
    to ``get_movies_by_timestamp``, whose result is returned unchanged.
    """
    return get_movies_by_timestamp(
        screen_id,
        date2timestamp(from_date),
        date2timestamp(until_date),
    )
def download_hls(m3u8url, screen_id, movie_id):
    """Download an HLS playlist with ffmpeg and save it as an MP4 file.

    The output is written to ``<current working dir>/<screen_id>/<movie_id>.mp4``;
    the directory is created when it does not exist yet.

    Args:
        m3u8url: URL of the playlist, passed to ffmpeg as its input.
        screen_id: Used as the name of the output directory.
        movie_id: Used as the output file name (without extension).

    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    dst_dir = os.path.join(os.getcwd(), screen_id)
    os.makedirs(dst_dir, exist_ok=True)
    # f-string so that both string and integer movie IDs work.
    dst_path = os.path.join(dst_dir, f"{movie_id}.mp4")
    # Pass the arguments as a list (no shell) so that characters such as
    # "&", "?" or spaces in the URL or path are not interpreted by a shell.
    cmd = [
        "ffmpeg",
        "-i", m3u8url,
        "-movflags", "faststart",
        "-c", "copy",
        "-bsf:a", "aac_adtstoasc",
        dst_path,
    ]
    subprocess.run(cmd)
def get_movie_m3u8url(link):
    """Return the playlist URL embedded in a movie page.

    Downloads the page at ``link`` and reads the ``data-movie-url`` attribute
    of the ``<video id="player">`` element (page layout as of 2020-01-30).

    Args:
        link: URL of the movie page.

    Returns:
        The value of the ``data-movie-url`` attribute, or None when the
        element has no such attribute.

    Raises:
        ValueError: If the page contains no ``<video id="player">`` element.
    """
    response = requests.get(link)
    # Hand the raw bytes to the parser and let it detect the encoding.
    # Re-encoding response.text with response.encoding fails when requests
    # could not determine an encoding (response.encoding is None).
    soup = BeautifulSoup(response.content, features="lxml")
    video = soup.find("video", id="player")
    if video is None:
        raise ValueError(f'no <video id="player"> element found in {link}')
    return video.get("data-movie-url")
def download_movies(movies, screen_id):
    """Download every movie in ``movies`` one after another.

    For each movie dict, the playlist URL is looked up from its "link" page
    with ``get_movie_m3u8url`` and then downloaded with ``download_hls``,
    using the movie's "id" as the file name. A progress counter is printed on
    a single line that is overwritten on every iteration.
    """
    total = len(movies)
    for index, movie in enumerate(movies):
        print('\rDownloading, %d / %d' % (index, total), end='')
        link, movie_id = movie.get("link"), movie.get("id")
        playlist_url = get_movie_m3u8url(link)
        download_hls(playlist_url, screen_id, movie_id)
def _main():
    """Fetch the configured user's movies and download the recorded ones.

    Uses the module-level ``screen_id``, ``from_date`` and ``until_date``
    settings, keeps only movies whose "is_recorded" value is truthy, and
    passes them to ``download_movies``.
    """
    all_movies = get_movies_by_date(screen_id, from_date, until_date)
    recorded_movies = [movie for movie in all_movies if movie.get("is_recorded")]
    download_movies(recorded_movies, screen_id)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment