视频处理

需求

视频下载下来后,需要对每个视频提取12个关键帧,提取每个视频的音频并等分为6段,获取每个视频的text。
三者分别保存在以视频id命名的文件夹下的images、audios、texts三个子文件夹中。

准备工作

扫描存放所有视频的文件夹,将所有视频名存储为list_videos_name并写入文件list_videos.txt

get_videos_list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Collect the names of all videos to process into list_videos_name and write them to list_videos.txt
import os

def file_name(file_dir):
    """Return the names of the files located directly inside *file_dir*.

    Only the top-level directory is inspected; files inside sub-directories
    are ignored. When *file_dir* does not exist or cannot be listed, an empty
    list is returned instead of failing on an unbound variable.

    Args:
        file_dir: Path of the directory that holds the videos.

    Returns:
        list[str]: Bare file names (without the directory part).
    """
    # os.walk yields the top directory first. Taking only that entry avoids
    # returning the files of whichever sub-directory happened to be walked last.
    _root, _dirs, files = next(os.walk(file_dir), (file_dir, [], []))
    return files

videos_path = 'D:/videos/videos_20171207'  # set this to the folder that holds the videos
list_videos_name = file_name(videos_path)

# Persist the list as its Python repr in list_videos.txt. The context manager
# closes (and flushes) the file even if the write fails.
with open("list_videos.txt", 'w', encoding='utf8', errors='ignore') as f_list_videos_name:
    f_list_videos_name.write(str(list_videos_name))

为了获取视频text,须对之前处理获得的final_texts.txt读取获得dict_id_text并存储为dict_id_text.txt

get_dict_id_text.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 读取final_texts.txt,生成字典dict_id_text
import re

# Input records: the patterns below expect one `id::::text` entry per line.
# This handle is read line by line and closed by the main loop further down.
f_id_texts = open("final_texts.txt", encoding='utf8', errors='ignore')

def get_text(template):
    """Return the text part(s) that follow a ``::::`` separator in *template*.

    Each match runs from just after ``::::`` up to the end of that line. The
    end of the line is either a newline or the end of the string, so the final
    record of a file is still matched when it has no trailing newline.

    Args:
        template: One or more ``id::::text`` records.

    Returns:
        list[str]: The captured texts; empty when no separator is present.
    """
    rule = r'::::(.*?)(?:\n|$)'
    slot_list = re.findall(rule, template)
    return slot_list

def get_id(template):
    """Return every substring of *template* that directly precedes ``::::``.

    Args:
        template: One or more ``id::::text`` records.

    Returns:
        list[str]: The captured ids; empty when no separator is present.
    """
    id_pattern = re.compile(r'(.*?)::::')
    return id_pattern.findall(template)

# Build the {video id: text} mapping from the already opened final_texts.txt.
dict_id_text = {}
try:
    for line_id_text in f_id_texts:
        # The last line of a file may lack the trailing newline that the text
        # pattern anchors on, so normalise every record first.
        if not line_id_text.endswith('\n'):
            line_id_text += '\n'
        ids = get_id(line_id_text)
        texts = get_text(line_id_text)
        if not ids or not texts:
            # Blank or malformed line (no `::::` separator): skip it instead
            # of aborting the whole run with an IndexError.
            continue
        dict_id_text[str(ids[0])] = str(texts[0])
finally:
    f_id_texts.close()

# Store the mapping as its Python repr. The context manager guarantees the
# output is flushed and closed, which the bare open()/write() did not.
with open("dict_id_text.txt", 'w', encoding='utf8', errors='ignore') as f_id_texts:
    f_id_texts.write(str(dict_id_text))
细节
  • 因为要处理的数据量较大,要考虑到程序可随时终止再继续运行。故加入四个日志文件
    • extract_ok.txt:记录所有处理成功的视频
    • extract_ok_this_time.txt:记录每次任务处理成功的视频
    • video_error.txt:记录所有处理失败的视频
    • video_error_this_time.txt:记录每次任务处理失败的视频
    • 在每次任务开始处理视频前将日志文件整合并将处理成功的视频从list_videos中删除即可
  • 视频帧提取用的cv2包,视频的音频提取和音频的分割用的第三方工具ffmpeg
  • 音频分割后发现每段音频间有重叠部分,故应采用精准分割
    1
    ffmpeg -y -vn -ss start_time -t 持续时间(00:00:00.900) -i 原文件路径 -acodec copy 生成文件路径
get_final_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# 配置video_path、mkpath_images和mkpath_audios内容
import ast
import json
import os
import subprocess

import cv2

def mkdir(path):
    """Create directory *path* (including parents) unless it already exists.

    Surrounding whitespace and trailing "/" characters are stripped from
    *path* before it is checked. A status line is printed in both cases.

    Args:
        path: Directory path to create.

    Returns:
        bool: True when the directory was created, False when it already existed.
    """
    target = path.strip().rstrip("/")

    # Guard clause: nothing to do when the path is already there.
    if os.path.exists(target):
        print(str(target) + ' 目录已存在')
        return False

    os.makedirs(target)
    print(str(target) + ' 创建成功')
    return True

def get_lines(file_name):
    """Count the newline characters in a text file.

    The file is read in large chunks so that big log files are not loaded
    into memory at once. A final line without a trailing newline is not
    counted, because only ``'\\n'`` characters are tallied.

    Args:
        file_name: Path of the file to inspect.

    Returns:
        int: Number of ``'\\n'`` characters found.
    """
    count = 0
    # The context manager closes the handle even if a read fails.
    with open(file_name, encoding='utf8', errors='ignore') as thefile:
        for buffer in iter(lambda: thefile.read(1024 * 8192), ''):
            count += buffer.count('\n')
    return count


def get_time(filename):
    """Return the duration of a video or audio file as reported by ffprobe.

    Args:
        filename: Path of the media file to inspect.

    Returns:
        The ``format.duration`` value from ffprobe's JSON output, in seconds.
        It is returned exactly as parsed from the JSON; the caller converts
        it with ``float()``.

    Raises:
        OSError: ``ffprobe.exe`` could not be started.
        json.JSONDecodeError: ffprobe produced no parsable JSON, for example
            because the file is unreadable.
        KeyError: The JSON has no ``format``/``duration`` entry.
    """
    command = ["ffprobe.exe", "-loglevel", "quiet", "-print_format", "json", "-show_format", "-show_streams", "-i",
               filename]
    # Run the argument list directly, without a shell: a list combined with
    # shell=True is mishandled on POSIX, and skipping the shell keeps special
    # characters in the path from being interpreted. stderr is discarded so
    # it can never be mixed into the JSON on stdout. run() also waits for the
    # process to finish.
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    temp = result.stdout.decode('utf-8')
    data = json.loads(temp)["format"]['duration']
    return data


def get_flag_list(flag):
    """Return the decimal digits of *flag*, least-significant digit first.

    For example ``1234`` gives ``[4, 3, 2, 1]`` and ``0`` gives ``[]``.
    Non-integer values are truncated with ``int()`` first.

    Args:
        flag: Non-negative number whose digits are wanted.

    Returns:
        list[int]: The digits, units digit at index 0.

    Raises:
        ValueError: *flag* is negative (the previous loop never terminated
            for negative input).
    """
    remaining = int(flag)
    if remaining < 0:
        raise ValueError('flag must be non-negative, got %r' % (flag,))

    list_flag = []
    # Pure integer arithmetic: float division would lose digits for large
    # values.
    while remaining:
        remaining, digit = divmod(remaining, 10)
        list_flag.append(digit)
    return list_flag

# list_videos.txt holds the repr() of a Python list of video file names.
# ast.literal_eval parses that literal without executing arbitrary code,
# which eval() would do if the file were ever tampered with.
with open("list_videos.txt", encoding='utf8', errors='ignore') as f_list_videos:
    list_videos = ast.literal_eval(f_list_videos.read())
print('list_videos.txt读取成功')

# Fold the previous run's successes into the cumulative log and drop them from
# the pending list, so an interrupted job resumes where it stopped.
done_last_run = []
if os.path.exists("extract_ok_this_time.txt"):  # absent on the very first run
    with open("extract_ok_this_time.txt", encoding='utf8', errors='ignore') as f_extract_ok_this_time:
        for line_extract_ok_this_time in f_extract_ok_this_time:
            video_id = line_extract_ok_this_time.replace('\n', '')
            if video_id:  # ignore blank lines
                done_last_run.append(video_id)

with open("extract_ok.txt", 'a', encoding='utf8', errors='ignore') as f_extract_ok:
    f_extract_ok.writelines(video_id + '\n' for video_id in done_last_run)  # update the extracted videos

# Filter through a set: a single pass over the list, and ids that are no longer
# pending (e.g. after a crash between the log updates) are tolerated instead of
# raising ValueError as list.remove() did.
done_files = {video_id + '.mp4' for video_id in done_last_run}
list_videos = [name for name in list_videos if name not in done_files]  # update the pending videos

with open("list_videos.txt", 'w', encoding='utf8', errors='ignore') as f_list_videos:
    f_list_videos.write(str(list_videos))
print("list_videos清理已抽取视频id成功")
print("extract_ok添加已抽取视频id成功")

print("共剩余" + str(len(list_videos)) + '条未抽取')
print("总共已抽取" + str(get_lines('extract_ok.txt')) + '条视频')

# Truncate the per-run log now that its content has been merged.
with open("extract_ok_this_time.txt", 'w', encoding='utf8', errors='ignore'):
    pass

# Fold the previous run's failures into the cumulative error log.
failed_last_run = []
if os.path.exists("video_error_this_time.txt"):  # absent on the very first run
    with open("video_error_this_time.txt", encoding='utf8', errors='ignore') as f_video_error_this_time:
        for line_video_error_this_time in f_video_error_this_time:
            video_id = line_video_error_this_time.replace('\n', '')
            if video_id:  # ignore blank lines
                failed_last_run.append(video_id)

with open("video_error.txt", 'a', encoding='utf8', errors='ignore') as f_video_error:
    # Write exactly one newline per entry. The lines read from the per-run log
    # already ended in '\n', so appending another one produced blank lines and
    # doubled the count reported below.
    f_video_error.writelines(video_id + '\n' for video_id in failed_last_run)  # update the failed videos

print("video_error增加错误videos成功")
print("共有" + str(get_lines('video_error.txt')) + '条视频抽取错误')

# Truncate the per-run log now that its content has been merged.
with open("video_error_this_time.txt", 'w', encoding='utf8', errors='ignore'):
    pass

# dict_id_text.txt holds the repr() of the {video id: text} dict.
# ast.literal_eval parses that literal without executing arbitrary code,
# which eval() would do if the file were ever tampered with.
with open("dict_id_text.txt", encoding='utf8', errors='ignore') as f:
    dict_id_text = ast.literal_eval(f.read())
print("dict_id_text读取成功\n")

print('---开始抽取---')

VIDEO_DIR = 'D:/videos/videos_20171207/'  # folder that holds the source videos
OUTPUT_DIR = 'I:/images/'  # output root; per the original note the path must be all-English
KEY_FRAMES = 12  # images saved per video
AUDIO_SEGMENTS = 6  # equal audio pieces per video


def _ffmpeg_seconds(milliseconds):
    """Format integer milliseconds as ffmpeg's plain ``S.mmm`` seconds syntax."""
    # Zero-pad the fraction: 5050 ms must become "5.050", not "5.50".
    return '%d.%03d' % divmod(milliseconds, 1000)


def _read_all_frames(video_path):
    """Decode every frame of the video into a list, releasing the capture afterwards."""
    video_cap = cv2.VideoCapture(video_path)
    try:
        all_frames = []
        while True:
            ret, frame = video_cap.read()
            if not ret:
                break
            all_frames.append(frame)
        return all_frames
    finally:
        video_cap.release()


def _split_audio(video_path, audio_dir, video_id):
    """Extract the audio track as <id>.mp3 and cut it into 1.mp3 .. 6.mp3."""
    audio_path = audio_dir + '/' + video_id + '.mp3'
    # Argument lists (no shell) keep paths containing spaces intact. Exit
    # codes are not checked, matching the previous best-effort os.system calls.
    subprocess.run(['ffmpeg', '-i', video_path, '-f', 'mp3', '-vn', audio_path,
                    '-loglevel', 'quiet', '-y'])

    duration_ms = int(float(get_time(video_path)) * 1000)
    segment_ms = duration_ms // AUDIO_SEGMENTS
    for number in range(1, AUDIO_SEGMENTS + 1):
        start_ms = (number - 1) * segment_ms
        subprocess.run(['ffmpeg', '-y', '-vn',
                        '-ss', _ffmpeg_seconds(start_ms),
                        '-t', _ffmpeg_seconds(segment_ms),
                        '-i', audio_path, '-codec', 'copy',
                        audio_dir + '/' + str(number) + '.mp3',
                        '-loglevel', 'quiet'])


def _process_video(video_name, video_id):
    """Save the key frames, audio segments and text of one video under OUTPUT_DIR/<id>/."""
    video_path = VIDEO_DIR + str(video_name)

    all_frames = _read_all_frames(video_path)
    stride = len(all_frames) // KEY_FRAMES
    if stride == 0:
        # VideoCapture does not raise on an unreadable file; it just yields no
        # frames. Report that (and too-short clips) as a failure of this video.
        raise RuntimeError('only %d frame(s) decoded from %s' % (len(all_frames), video_path))

    mkpath_images = OUTPUT_DIR + video_id + '/images'  # folder for the 12 frame images
    mkdir(mkpath_images)
    mkpath_audios = OUTPUT_DIR + video_id + '/audios'  # folder for the audio files
    mkdir(mkpath_audios)
    mkpath_texts = OUTPUT_DIR + video_id + '/texts'  # folder for the text file
    mkdir(mkpath_texts)

    # Frames stride, 2*stride, ..., 12*stride (1-based) become 1.jpg .. 12.jpg.
    for number in range(1, KEY_FRAMES + 1):
        cv2.imwrite(mkpath_images + '/' + str(number) + '.jpg', all_frames[number * stride - 1])
    del all_frames  # free the decoded frames before the audio work

    _split_audio(video_path, mkpath_audios, video_id)

    text = dict_id_text[video_id]  # look up first so a missing id leaves no empty file
    with open(mkpath_texts + '/text.txt', 'w', encoding='utf8', errors='ignore') as f_video_text:
        f_video_text.write(str(text) + '\n')
    print('。。。。。')


extracted_this_time = 0  # dedicated progress counter (no longer shares a variable with the stride)
for video_name in list_videos:
    video_id = video_name.replace(".mp4", "")
    try:
        _process_video(video_name, video_id)
    except Exception as exc:  # per-video boundary: log the failure and carry on with the next one
        print('----video打开失败----')
        print(repr(exc))
        with open("video_error_this_time.txt", 'a', encoding='utf8', errors='ignore') as f_video_error_this_time:
            f_video_error_this_time.write(str(video_id) + '\n')
        print('本次已抽取' + str(get_lines('extract_ok_this_time.txt')) + '条,本次抽取videos错误' + str(
            get_lines('video_error_this_time.txt')) + '条')
        print('已保存并跳过此条video_id,抽取继续-->')
        continue

    with open("extract_ok_this_time.txt", 'a', encoding='utf8', errors='ignore') as f_extract_ok_this_time:
        f_extract_ok_this_time.write(str(video_id) + '\n')
    extracted_this_time += 1
    if extracted_this_time % 1000 == 0:
        print('本次已抽取' + str(extracted_this_time) + '条视频')
print("****下载抽取完成****")
小结

到目前为止,已读取所有视频,并保存了每个视频的关键帧、音频和文本数据。
数据集与代码,发布在github.