使用golang协程并行下载m3u8视频

背景

最近在追剧,网上的影视资源非常卡,看两秒卡七八秒,很多做盗版资源的网站都是接了一些乱七八糟的接口,导致观影体验极差。观察后发现是m3u8资源,索性一次性下载到本地离线看。

方案

  1. php+多进程
  2. golang+协程

方案一

将ts文件列表逐一写入redis列表,然后通过多进程来竞争这个列表数据,从而实现进程的数据共享和并行下载。这个方案只适用于linux环境。

安装依赖:

# 这是一个多进程模型,可以让你更方便地开启多进程
composer require icy8/process

代码实现:

<?php
include "./vendor/autoload.php";
$opt       = getopt("", ["url:", "output:"]);
$output    = "output/";
$url       = $opt['url'] ?? '';
$queueKey  = "m3u8_download/";
$redisHost = '192.168.1.3';
if ($opt['output'] ?? '') {
    $output = $opt['output'];// 文件保存的目录
    $output = trim($output, '/');
}
if (!$url) {
    die("The parameter `url` cannot be null");
}
$response = file_get_contents($url);
if (!$response) {
    die("Request failure");
}
$tsList = trim(preg_replace("!#.+\n*!", "", $response));
$redis  = new \Redis();
$redis->connect($redisHost);
// 清空原有的数据
$redis->del($queueKey);
$lines = explode("\n", $tsList);
$redis->rPush($queueKey, ...$lines);
// 创建存储目录
!is_dir($output) && mkdir($output, 0777, true);
$process        = new \icy8\process\Worker();
$process->total = 100; // 开n个进程同时下载
$process->run(function () use ($url, $redisHost, $output, $queueKey) {
    $redis = new \Redis();
    $redis->connect($redisHost);
    $pid = getmypid();
    // 一个进程一次只下载一个文件
    while ($line = $redis->lPop($queueKey)) {
        try {
            $line = trim($line);
            $file = $output . pathinfo($line, PATHINFO_BASENAME);
            // 跳过同名文件
            if (is_file($file)) {
                continue;
            }
            $tsUrl = completeUrl($line, $url);
            $data  = file_get_contents($tsUrl);
            if (!$data) {
                throw new Exception('data error');
            }
            file_put_contents($file, $data);
            echo "> [{$pid}] " . $tsUrl . PHP_EOL;
        } catch (\Throwable $e) {
            // 重新写入队列
            $redis->lPush($queueKey, $line);
        }
    }
});
$code = '';
foreach ($lines as $line) {
    $filename = pathinfo($line, PATHINFO_BASENAME);
    $code     .= "file '{$filename}'" . PHP_EOL;
}
file_put_contents($output . 'merge.txt', $code);
echo PHP_EOL;
echo "> Run the following command to generate an mp4 file:" . PHP_EOL;
echo "ffmpeg -f concat -i {$output}merge.txt -c copy output.mp4" . PHP_EOL;
/**
 * url补全
 * @param $tspath
 * @param $host
 * @return string
 */
function completeUrl($tspath, $uri)
{
    if (preg_match("!http[s]*://!i", $tspath)) {
        return $tspath;
    }

    $parsed_url = parse_url($uri);
    $scheme     = isset($parsed_url['scheme']) ? $parsed_url['scheme'] . '://' : '';
    $host       = isset($parsed_url['host']) ? $parsed_url['host'] : '';
    $port       = isset($parsed_url['port']) ? ':' . $parsed_url['port'] : '';
    $user       = isset($parsed_url['user']) ? $parsed_url['user'] : '';
    $pass       = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
    $pass       = ($user || $pass) ? "$pass@" : '';
    $path       = isset($parsed_url['path']) ? $parsed_url['path'] : '';
    if ($tspath[0] === '/') {
        $path = $tspath;
    } else {
        $path .= $tspath;
    }
    return "{$scheme}{$user}{$pass}{$host}{$port}{$path}";
}

运行脚本:

php m3u8.php --url "http://***.com/2000s/hls/index.m3u8" --output download

脚本执行完后会给出合并ts文件为mp4的命令,类似:

# 如果要执行这条命令,需要你的系统安装了ffmpeg软件命令
ffmpeg -f concat -i download/merge.txt -c copy output.mp4

windows用户可以在docker下载文件,然后在windows主系统执行ffmpeg。

方案二

通过实现一个类似队列的带有互斥锁的golang结构体,然后将ts文件列表写入这个队列,最后使用golang协程来竞争这个队列数据,即可实现并行下载。

这个方案适用于大部分系统,使用了golang自带的互斥锁,不需要借助redis。

代码实现:

package main

import (
	"errors"
	"flag"
	"fmt"
	"io"
	"io/fs"
	"math/rand"
	"net/http"
	"net/url"
	"os"
	"path"
	"regexp"
	"strconv"
	"strings"
	"sync"
)

var (
	_url     = flag.String("url", "", "m3u8 url.")
	_file    = flag.String("file", "", "m3u8 file path.")
	_host    = flag.String("host", "", "host prefix.")
	_co      = flag.String("co", "", "goroutine total.")
	_output  = flag.String("output", "", "output dir.")
	dataDir  = "output/"
	wg       = &sync.WaitGroup{}
	ch       chan int
	urlQueue *queue
)

func main() {
	flag.Parse()
	var (
		urlInfo *url.URL
		tsList  string
	)

	coTotal, err := strconv.Atoi(*_co)
	if len(*_url) <= 0 && len(*_file) <= 0 {
		panic("Parameters `url` and `file` cannot be both empty")
	}

	if err != nil || coTotal < 1 {
		coTotal = 5
	}

	if coTotal > 1000 {
		panic("The goroutine maximum is 1000")
	}
	// 创建数据目录
	if len(*_output) > 0 {
		dataDir = *_output
	}

	if !PathExists(dataDir) {
		os.MkdirAll(dataDir, os.ModePerm)
	}

	if len(*_url) > 0 {
		urlInfo, _ = url.Parse(*_url)
		resp, err := http.Get(*_url)
		if err != nil {
			panic(err)
		}
		body, _ := io.ReadAll(resp.Body)
		tsList = string(body)
	} else if PathExists(*_file) {
		urlInfo, _ = url.Parse(*_host)
		if urlInfo.Host == "" {
			panic("file host empty")
		}
		_bytes, err := os.ReadFile(*_file)
		if err != nil {
			panic(err)
		}
		tsList = string(_bytes)
	} else {
		panic("Invalid data")
	}
	flagMatch, _ := regexp.MatchString("^#EXTM3U", tsList);
	if !flagMatch {
		panic("Invalid data")
	}
	// 删除注释
	commentLineRegexp, err := regexp.Compile("#.+\n*")
	if err == nil {
		tsList = commentLineRegexp.ReplaceAllString(tsList, "")
	}
	tsList = strings.TrimSpace(tsList)
	// 每行一个文件
	lines := strings.Split(tsList, "\n")
	// 队列,让所有协程竞争这个队列数据
	urlQueue = &queue{
		items: lines,
	}
	// 创建协程
	for i := 1; i <= coTotal; i++ {
		wg.Add(1)
		go saveProc(*urlInfo)
	}
	// 阻塞主线程,等所有协程执行完再往下执行
	wg.Wait()
	fmt.Println(">", "All goroutine done")
	// 生成ffmpeg文件索引文件
	mergeTxt := ""
	for _, line := range lines {
		filename := path.Base(line)
		mergeTxt += "file '" + filename + "'\n"
	}
	os.WriteFile(dataDir+"/merge.txt", []byte(mergeTxt), fs.ModePerm)
	fmt.Println("")
	fmt.Println(">", "Run the following command to generate an mp4 file:")
	fmt.Println("ffmpeg -f concat -i " + strings.TrimSuffix(dataDir, "/") + "/merge.txt -c copy output.mp4")
}

// 协程函数
func saveProc(urlInfo url.URL) {
	id := rand.Intn(999999)
	for urlQueue.length() > 0 {
		tsName := strings.TrimSpace(urlQueue.Pop()) // 让每个协程每次下载一个ts文件
		if tsName == "" {
			return
		}
		tsUrl := urlUnparse(tsName, urlInfo)
		result, _ := saveTsFile(tsUrl, tsName)
		fmt.Println(">", id, tsUrl, result)
		if !result {
			urlQueue.Push(tsName)
		}
	}
	//fmt.Println(id, "complete")
	wg.Done()
}

// url合成
func urlUnparse(filename string, urlInfo url.URL) string {
	match, _ := regexp.MatchString("^http[s]*://", filename)
	if match {
		return filename
	}
	userpart := ""
	if len(urlInfo.User.Username()) > 0 {
		pwd, _ := urlInfo.User.Password()
		userpart = urlInfo.User.Username() + "@" + pwd
	}
	if string(filename[0]) == "/" {
		return urlInfo.Scheme + "://" + userpart + "" + urlInfo.Host + filename
	}
	return urlInfo.Scheme + "://" + userpart + "" + urlInfo.Host + path.Dir(urlInfo.Path) + "/" + filename
}

// 保存文件
func saveTsFile(url string, filename string) (bool, error) {
	resp, err := http.Get(url)
	if err != nil {
		return false, err
	} else if resp.StatusCode != 200 {
		return false, errors.New("HTTP CODE: " + fmt.Sprintf("%d", resp.StatusCode))
	}
	buffer, err := io.ReadAll(resp.Body)
	if err != nil || len(buffer) <= 0 {
		return false, errors.New("fail to read buffer ")
	}
	filename = path.Base(filename)
	file, err := os.Create(dataDir + "/" + filename)
	if err != nil {
		return false, err
	}
	n, err := file.Write(buffer)
	if err != nil || n <= 0 {
		return false, errors.New("write fail")
	}
	return true, nil
}

// 判断路径是否存在
func PathExists(path string) bool {
	_, err := os.Stat(path)
	if err == nil {
		return true
	} else if os.IsNotExist(err) {
		return false
	}
	return false
}

// 队列结构体
type queue struct {
	// 互斥锁
	sync.Mutex
	items []string
}

// 右入队列
func (this *queue) Push(items ...string) bool {
	this.Lock()
	defer this.Unlock()
	this.items = append(this.items, items...)
	return true
}

// 左入队列
func (this *queue) Unshift(items ...string) bool {
	this.Lock()
	defer this.Unlock()
	readyItems := make([]string, len(items))
	this.items = append(readyItems, this.items...)
	return true
}

// 右出队列
func (this *queue) Pop() string {
	this.Lock()
	defer this.Unlock()
	if len(this.items) == 0 {
		return ""
	}
	endIndex := len(this.items) - 1
	rtn := this.items[endIndex]
	this.items = this.items[:endIndex]
	return rtn
}

// 左出队列
func (this *queue) Shift() string {
	this.Lock()
	defer this.Unlock()
	if len(this.items) == 0 {
		return ""
	}
	rtn := this.items[0]
	if len(this.items) > 1 {
		this.items = this.items[1:]
	} else {
		this.items = []string{}
	}
	return rtn
}

// 队列长度
func (this *queue) length() int {
	this.Lock()
	defer this.Unlock()
	return len(this.items)
}

运行脚本:

# --co是指定的协程数,取值范围(0,1000],默认5
go run main.go --url "http://***.com/2000s/hls/index.m3u8" --output download --co 1000

脚本执行完后会给出合并ts文件为mp4的命令,类似:

# 如果要执行这条命令,需要你的系统安装了ffmpeg软件命令
ffmpeg -f concat -i download/merge.txt -c copy output.mp4

参数列表

  1. url 通过m3u8的网络地址下载
  2. file 通过m3u8文件本地路径下载,urlfile二选一
  3. host 如果配置了file参数,host将作为ts文件的服务器地址
  4. co 开启的协程数,默认5,取值范围(0,1000]
  5. output 文件下载后的存储目录

结语

  1. 个人更倾向于方案二,因为协程的开销远低于进程,比如方案一如果 fork 1000 个进程的话,机器可能会变卡,其次协程之间的数据共享也非常方便。
  2. 两个方案下载过程都有失败重试的逻辑,所以需要保证输入的地址是可用的,避免死循环。

使用golang协程并行下载m3u8视频
http://blog.icy8.cn/posts/19346/
作者
icy8
发布于
2023年2月7日
许可协议