使用golang协程并行下载m3u8视频
背景
最近在追剧,网上的影视资源非常卡,看两秒卡七八秒,很多做盗版资源的网站都是接了一些乱七八糟的接口,导致观影体验极差。观察后发现是m3u8资源,索性一次性下载到本地离线看。
方案
- php+多进程
- golang+协程
方案一
将ts文件列表逐一写入redis列表,然后通过多进程来竞争这个列表数据,从而实现进程的数据共享和并行下载。这个方案只适用于linux环境。
安装依赖:
# 这是一个多进程模型,可以让你更方便地开启多进程
composer require icy8/process
代码实现:
<?php
include "./vendor/autoload.php";
$opt = getopt("", ["url:", "output:"]);
$output = "output/";
$url = $opt['url'] ?? '';
$queueKey = "m3u8_download/";
$redisHost = '192.168.1.3';
if ($opt['output'] ?? '') {
$output = $opt['output'];// 文件保存的目录
$output = trim($output, '/');
}
if (!$url) {
die("The parameter `url` cannot be null");
}
$response = file_get_contents($url);
if (!$response) {
die("Request failure");
}
$tsList = trim(preg_replace("!#.+\n*!", "", $response));
$redis = new \Redis();
$redis->connect($redisHost);
// 清空原有的数据
$redis->del($queueKey);
$lines = explode("\n", $tsList);
$redis->rPush($queueKey, ...$lines);
// 创建存储目录
!is_dir($output) && mkdir($output, 0777, true);
$process = new \icy8\process\Worker();
$process->total = 100; // 开n个进程同时下载
$process->run(function () use ($url, $redisHost, $output, $queueKey) {
$redis = new \Redis();
$redis->connect($redisHost);
$pid = getmypid();
// 一个进程一次只下载一个文件
while ($line = $redis->lPop($queueKey)) {
try {
$line = trim($line);
$file = $output . pathinfo($line, PATHINFO_BASENAME);
// 跳过同名文件
if (is_file($file)) {
continue;
}
$tsUrl = completeUrl($line, $url);
$data = file_get_contents($tsUrl);
if (!$data) {
throw new Exception('data error');
}
file_put_contents($file, $data);
echo "> [{$pid}] " . $tsUrl . PHP_EOL;
} catch (\Throwable $e) {
// 重新写入队列
$redis->lPush($queueKey, $line);
}
}
});
$code = '';
foreach ($lines as $line) {
$filename = pathinfo($line, PATHINFO_BASENAME);
$code .= "file '{$filename}'" . PHP_EOL;
}
file_put_contents($output . 'merge.txt', $code);
echo PHP_EOL;
echo "> Run the following command to generate an mp4 file:" . PHP_EOL;
echo "ffmpeg -f concat -i {$output}merge.txt -c copy output.mp4" . PHP_EOL;
/**
* url补全
* @param $tspath
* @param $host
* @return string
*/
function completeUrl($tspath, $uri)
{
if (preg_match("!http[s]*://!i", $tspath)) {
return $tspath;
}
$parsed_url = parse_url($uri);
$scheme = isset($parsed_url['scheme']) ? $parsed_url['scheme'] . '://' : '';
$host = isset($parsed_url['host']) ? $parsed_url['host'] : '';
$port = isset($parsed_url['port']) ? ':' . $parsed_url['port'] : '';
$user = isset($parsed_url['user']) ? $parsed_url['user'] : '';
$pass = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
$pass = ($user || $pass) ? "$pass@" : '';
$path = isset($parsed_url['path']) ? $parsed_url['path'] : '';
if ($tspath[0] === '/') {
$path = $tspath;
} else {
$path .= $tspath;
}
return "{$scheme}{$user}{$pass}{$host}{$port}{$path}";
}
运行脚本:
php m3u8.php --url "http://***.com/2000s/hls/index.m3u8" --output download
脚本执行完后会给出合并ts文件为mp4的命令,类似:
# 如果要执行这条命令,需要你的系统安装了ffmpeg软件命令
ffmpeg -f concat -i download/merge.txt -c copy output.mp4
windows用户可以在docker下载文件,然后在windows主系统执行ffmpeg。
方案二
通过实现一个类似队列的带有互斥锁的golang结构体,然后将ts文件列表写入这个队列,最后使用golang协程来竞争这个队列数据,即可实现并行下载。
这个方案适用于大部分系统,使用了golang自带的互斥锁,不需要借助redis。
代码实现:
package main
import (
"errors"
"flag"
"fmt"
"io"
"io/fs"
"math/rand"
"net/http"
"net/url"
"os"
"path"
"regexp"
"strconv"
"strings"
"sync"
)
var (
_url = flag.String("url", "", "m3u8 url.")
_file = flag.String("file", "", "m3u8 file path.")
_host = flag.String("host", "", "host prefix.")
_co = flag.String("co", "", "goroutine total.")
_output = flag.String("output", "", "output dir.")
dataDir = "output/"
wg = &sync.WaitGroup{}
ch chan int
urlQueue *queue
)
func main() {
flag.Parse()
var (
urlInfo *url.URL
tsList string
)
coTotal, err := strconv.Atoi(*_co)
if len(*_url) <= 0 && len(*_file) <= 0 {
panic("Parameters `url` and `file` cannot be both empty")
}
if err != nil || coTotal < 1 {
coTotal = 5
}
if coTotal > 1000 {
panic("The goroutine maximum is 1000")
}
// 创建数据目录
if len(*_output) > 0 {
dataDir = *_output
}
if !PathExists(dataDir) {
os.MkdirAll(dataDir, os.ModePerm)
}
if len(*_url) > 0 {
urlInfo, _ = url.Parse(*_url)
resp, err := http.Get(*_url)
if err != nil {
panic(err)
}
body, _ := io.ReadAll(resp.Body)
tsList = string(body)
} else if PathExists(*_file) {
urlInfo, _ = url.Parse(*_host)
if urlInfo.Host == "" {
panic("file host empty")
}
_bytes, err := os.ReadFile(*_file)
if err != nil {
panic(err)
}
tsList = string(_bytes)
} else {
panic("Invalid data")
}
flagMatch, _ := regexp.MatchString("^#EXTM3U", tsList);
if !flagMatch {
panic("Invalid data")
}
// 删除注释
commentLineRegexp, err := regexp.Compile("#.+\n*")
if err == nil {
tsList = commentLineRegexp.ReplaceAllString(tsList, "")
}
tsList = strings.TrimSpace(tsList)
// 每行一个文件
lines := strings.Split(tsList, "\n")
// 队列,让所有协程竞争这个队列数据
urlQueue = &queue{
items: lines,
}
// 创建协程
for i := 1; i <= coTotal; i++ {
wg.Add(1)
go saveProc(*urlInfo)
}
// 阻塞主线程,等所有协程执行完再往下执行
wg.Wait()
fmt.Println(">", "All goroutine done")
// 生成ffmpeg文件索引文件
mergeTxt := ""
for _, line := range lines {
filename := path.Base(line)
mergeTxt += "file '" + filename + "'\n"
}
os.WriteFile(dataDir+"/merge.txt", []byte(mergeTxt), fs.ModePerm)
fmt.Println("")
fmt.Println(">", "Run the following command to generate an mp4 file:")
fmt.Println("ffmpeg -f concat -i " + strings.TrimSuffix(dataDir, "/") + "/merge.txt -c copy output.mp4")
}
// 协程函数
func saveProc(urlInfo url.URL) {
id := rand.Intn(999999)
for urlQueue.length() > 0 {
tsName := strings.TrimSpace(urlQueue.Pop()) // 让每个协程每次下载一个ts文件
if tsName == "" {
return
}
tsUrl := urlUnparse(tsName, urlInfo)
result, _ := saveTsFile(tsUrl, tsName)
fmt.Println(">", id, tsUrl, result)
if !result {
urlQueue.Push(tsName)
}
}
//fmt.Println(id, "complete")
wg.Done()
}
// url合成
func urlUnparse(filename string, urlInfo url.URL) string {
match, _ := regexp.MatchString("^http[s]*://", filename)
if match {
return filename
}
userpart := ""
if len(urlInfo.User.Username()) > 0 {
pwd, _ := urlInfo.User.Password()
userpart = urlInfo.User.Username() + "@" + pwd
}
if string(filename[0]) == "/" {
return urlInfo.Scheme + "://" + userpart + "" + urlInfo.Host + filename
}
return urlInfo.Scheme + "://" + userpart + "" + urlInfo.Host + path.Dir(urlInfo.Path) + "/" + filename
}
// 保存文件
func saveTsFile(url string, filename string) (bool, error) {
resp, err := http.Get(url)
if err != nil {
return false, err
} else if resp.StatusCode != 200 {
return false, errors.New("HTTP CODE: " + fmt.Sprintf("%d", resp.StatusCode))
}
buffer, err := io.ReadAll(resp.Body)
if err != nil || len(buffer) <= 0 {
return false, errors.New("fail to read buffer ")
}
filename = path.Base(filename)
file, err := os.Create(dataDir + "/" + filename)
if err != nil {
return false, err
}
n, err := file.Write(buffer)
if err != nil || n <= 0 {
return false, errors.New("write fail")
}
return true, nil
}
// 判断路径是否存在
func PathExists(path string) bool {
_, err := os.Stat(path)
if err == nil {
return true
} else if os.IsNotExist(err) {
return false
}
return false
}
// 队列结构体
type queue struct {
// 互斥锁
sync.Mutex
items []string
}
// 右入队列
func (this *queue) Push(items ...string) bool {
this.Lock()
defer this.Unlock()
this.items = append(this.items, items...)
return true
}
// 左入队列
func (this *queue) Unshift(items ...string) bool {
this.Lock()
defer this.Unlock()
readyItems := make([]string, len(items))
this.items = append(readyItems, this.items...)
return true
}
// 右出队列
func (this *queue) Pop() string {
this.Lock()
defer this.Unlock()
if len(this.items) == 0 {
return ""
}
endIndex := len(this.items) - 1
rtn := this.items[endIndex]
this.items = this.items[:endIndex]
return rtn
}
// 左出队列
func (this *queue) Shift() string {
this.Lock()
defer this.Unlock()
if len(this.items) == 0 {
return ""
}
rtn := this.items[0]
if len(this.items) > 1 {
this.items = this.items[1:]
} else {
this.items = []string{}
}
return rtn
}
// 队列长度
func (this *queue) length() int {
this.Lock()
defer this.Unlock()
return len(this.items)
}
运行脚本:
# --co是指定的协程数,取值范围(0,1000],默认5
go run main.go --url "http://***.com/2000s/hls/index.m3u8" --output download --co 1000
脚本执行完后会给出合并ts文件为mp4的命令,类似:
# 如果要执行这条命令,需要你的系统安装了ffmpeg软件命令
ffmpeg -f concat -i download/merge.txt -c copy output.mp4
参数列表
url
通过m3u8的网络地址下载file
通过m3u8文件本地路径下载,url
和file
二选一host
如果配置了file
参数,host
将作为ts文件的服务器地址co
开启的协程数,默认5,取值范围(0,1000]
output
文件下载后的存储目录
结语
- 个人更倾向于方案二,因为协程的开销远低于进程,比如方案一如果 fork 1000 个进程的话,机器可能会变卡,其次协程之间的数据共享也非常方便。
- 两个方案下载过程都有失败重试的逻辑,所以需要保证输入的地址是可用的,避免死循环。
使用golang协程并行下载m3u8视频
http://blog.icy8.cn/posts/19346/