본 포스팅에서는 실제 데이터를 활용하여 음성 데이터를 전처리해보겠습니다.
특히 음성 데이터에서 침묵 구간과 비침묵 구간을 분리하는 부분을 살펴보겠습니다.
데이터
- 데이터: AIHub / '구음장애 음성인식 데이터' (https://www.aihub.or.kr/).
- 특징: 오디오 데이터인 .wav 파일과 데이터의 메타 정보를 담고 있는 .jsom 파일로 구성됨.
- wav 파일은 10분 ~ 30분 길이로 구성됩니다.
30초 가량 침묵 -> 한마디 -> 30초 가량 침묵 -> 한마디 패턴으로 이루어집니다. - json파일에는 오디오 파일에 대한 텍스트 대본을 나타내는 Transcript' 라벨링이 존재합니다.
- 목표: 한마디 음성 - 텍스트로 매칭되는 (오디오-텍스트) 데이터 쌍으로 재구성.
침묵-비침묵 구간 분리
먼저 필요한 라이브러리를 불러옵니다.
import pandas as pd
import numpy as np
from pydub.utils import db_to_float
import itertools
from pydub import AudioSegment
import IPython.display as ipd
from pydub import AudioSegment
import torch
import librosa # 음성데이터 분석 라이브러리
from IPython.display import Audio # 음성데이터 재생을 위해 사용하는 라이브러리
- librosa는 음성 데이터 분석을 위한 라이브러리입니다.
- IPython.display은 노트북 파일에서 음성데이터를 재생해보기 위해 불러옵니다.
이후 필요한 함수들을 불러옵니다.
먼저 detect_silence() 함수는 침묵 구간을 탐색하고, 침묵 구간에 해당하는 초(sec)를 [시작:끝] 형태로 반환합니다.
def detect_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1):
seg_len = len(audio_segment)
if seg_len < min_silence_len:
return []
silence_thresh = db_to_float(silence_thresh) * audio_segment.max_possible_amplitude
silence_starts = []
last_slice_start = seg_len - min_silence_len
slice_starts = range(0, last_slice_start + 1, seek_step)
if last_slice_start % seek_step:
slice_starts = itertools.chain(slice_starts, [last_slice_start])
for i in slice_starts:
audio_slice = audio_segment[i:i + min_silence_len]
if audio_slice.rms <= silence_thresh:
silence_starts.append(i)
if not silence_starts:
return []
silent_ranges = []
prev_i = silence_starts.pop(0)
current_range_start = prev_i
for silence_start_i in silence_starts:
continuous = (silence_start_i == prev_i + seek_step)
if not continuous and silence_has_gap:
silent_ranges.append([current_range_start,
prev_i + min_silence_len])
current_range_start = silence_start_i
prev_i = silence_start_i
silent_ranges.append([current_range_start,
prev_i + min_silence_len])
return silent_ranges
데이터 적용
마찬가지로, detect_nonsilent() 함수는 발화 구간을 탐색합니다.
def detect_nonsilent(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1):
silent_ranges = detect_silence(audio_segment, min_silence_len, silence_thresh, seek_step)
len_seg = len(audio_segment)
if not silent_ranges:
return [[0, len_seg]]
if silent_ranges[0][0] == 0 and silent_ranges[0][1] == len_seg:
return []
prev_end_i = 0
nonsilent_ranges = []
for start_i, end_i in silent_ranges:
nonsilent_ranges.append([prev_end_i, start_i])
prev_end_i = end_i
if end_i != len_seg:
nonsilent_ranges.append([prev_end_i, len_seg])
if nonsilent_ranges[0] == [0, 0]:
nonsilent_ranges.pop(0)
return nonsilent_ranges
위 두 함수를 이용하여 음성 데이터에서 최종 발화 구간과 침묵 구간을 분리하여 반환하기 위한 함수입니다.
def create_json(audio_file):
intervals_jsons = []
min_silence_length = 70
intervals = detect_nonsilent(audio_file,
min_silence_len=min_silence_length,
silence_thresh=-32.64)
if intervals[0][0] != 0:
intervals_jsons.append({'start':0,'end':intervals[0][0]/1000,'tag':'침묵'})
non_silence_start = intervals[0][0]
before_silence_start = intervals[0][1]
for interval in intervals:
interval_audio = audio_file[interval[0]:interval[1]]
if (interval[0] - before_silence_start) >= 20000:
intervals_jsons.append({'start':non_silence_start/1000,'end':(before_silence_start+200)/1000,'tag':'비침묵'})
non_silence_start = interval[0]-200
intervals_jsons.append({'start':before_silence_start/1000,'end':interval[0]/1000,'tag':'침묵'})
before_silence_start = interval[1]
if non_silence_start != len(audio_file):
intervals_jsons.append({'start':non_silence_start/1000,'end':len(audio_file)/1000,'tag':'비침묵'})
return intervals_jsons
#########################################################
def match_target_amplitude(sound, target_dBFS):
change_in_dBFS = target_dBFS - sound.dBFS
return sound.apply_gain(change_in_dBFS)
#########################################################
def split_on_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, keep_silence=100,
seek_step=1):
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
if isinstance(keep_silence, bool):
keep_silence = len(audio_segment) if keep_silence else 0
output_ranges = [
[ start - keep_silence, end + keep_silence ]
for (start,end)
in detect_nonsilent(audio_segment, min_silence_len, silence_thresh, seek_step)
]
for range_i, range_ii in pairwise(output_ranges):
last_end = range_i[1]
next_start = range_ii[0]
if next_start < last_end:
range_i[1] = (last_end+next_start)//2
range_ii[0] = range_i[1]
return [
audio_segment[ max(start,0) : min(end,len(audio_segment)) ]
for start,end in output_ranges
]
데이터 전처리 자동화
AIHub에서 제공하는 '구음장애 음성인식 데이터'에서 json / wav 파일 형식에 맞도록 데이터 전처리를 자동화 합니다.
import os
# 작업 폴더 설정
# 작업 폴더로 이동해주세요
%cd '/content/.../.../...'
# 폴더 변수화
PATH = os.getcwd()
# os.path.join(PATH, 'xxx') -> xxx 부분에 파일들이 들어있는 폴더를 입력해주세요.
LABEL = os.path.join(PATH, '13.뇌성마비_라벨')
AUDIO = os.path.join(PATH, '13.뇌성마비_음성')
# 결과 폴더는 직접 만들어주세요.
OUTPUT = os.path.join(PATH, '13.뇌성마비_결과')
데이터의 개수를 확인합니다.
print("라벨 폴더 파일 개수: ", len(os.listdir(LABEL)))
print("음성 폴더 파일 개수: ", len(os.listdir(AUDIO)))
### result ###
라벨 폴더 파일 개수: 119
음성 폴더 파일 개수: 119
아래 코드를 통해서 전체 데이터에 적용하면 두 가지 파일이 생성됩니다.
- output{i}_{j}.wav: i 번째 원본 파일의 j 번째 음성(한마디)
- output{i}_{j}.txt: i 번째 원본 파일의 j 번째 음성 텍스트(한마디)
for i in range(len(os.listdir(LABEL))):
# 스크립트 불러오기 (meta, DataFrame)
print(f'파일: {i}')
label_file = os.path.join(LABEL, sorted(os.listdir(LABEL))[i])
meta = pd.read_json(label_file, orient = 'columns')
meta = pd.DataFrame(meta['Transcript'][0].split("."))[:-1]
# 오디오 불러오기
audio_file = os.path.join(AUDIO, sorted(os.listdir(AUDIO))[i])
# 침묵-비침묵 분리
sound = AudioSegment.from_file(audio_file, "wav")
normalized_sound = match_target_amplitude(sound, -20.0)
json = create_json(normalized_sound)
# 발화구간/텍스트 불러오기 (df, DataFrame)
audio_df = pd.DataFrame(json)
df = pd.concat([audio_df[audio_df['tag'] == '비침묵'].reset_index(drop=True).drop('tag', axis=1), meta], axis=1)
df.columns=['start', 'end', 'text']
# 자를 오디오 불러오기
audio = AudioSegment.from_file(audio_file)
# 한 문장 별로 데이터 저장
for j in range(len(df)):
start_time = int(df['start'][j]) * 1000 # milliseconds
end_time = (int(df['end'][j]) + 1.5) * 1000 # milliseconds
# 텍스트
f = open(os.path.join(OUTPUT, f"output{i}_{j}.txt"), 'w')
if type(df['text'][j]) != str:
break
f.write(df['text'][j])
f.close()
# 오디오 파일 자르기
output = audio[start_time:end_time]
# 자른 오디오 파일 저장
output.export(os.path.join(OUTPUT, f"output{i}_{j}.wav"), format="wav")
# 결과 확인
os.listdir(OUTPUT)
반응형
'Data Analytics' 카테고리의 다른 글
[음성 인식 프로젝트] Whisper 파인튜닝 하기 (4) | 2024.03.14 |
---|---|
[음성 인식 모델 프로젝트] 음성 데이터 시각화 및 특성 추출 (0) | 2024.01.20 |
빅데이터분석기사 시험 난이도 (빅분기 7회 합격 후기) (10) | 2023.12.16 |