aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordeepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>2020-07-07 14:55:32 +0000
committerGitHub <noreply@github.com>2020-07-07 14:55:32 +0000
commit52694cd7ad9bbd5c514de448d46ac3c7ef0aef9e (patch)
tree1157294180ebe84486750fa39c54fa1a26023417
parent6624f8fb1aa7cf4d67c8b5256158515c06f55686 (diff)
Format code with Black
-rw-r--r--main.py126
-rw-r--r--src/VHSImage.py71
-rw-r--r--src/VHSVideo.py51
-rw-r--r--src/VaporSong.py342
4 files changed, 325 insertions, 265 deletions
diff --git a/main.py b/main.py
index 92821e9..a2c0d1f 100644
--- a/main.py
+++ b/main.py
@@ -16,15 +16,19 @@ import time
version = 2.0
style = False
-text = '| V A P O R W A V E || G E N E R A T O R |'
-
-parser = argparse.ArgumentParser(description = text)
-parser.add_argument("-M", "--music", help="generate v a p o r w a v e music", action="store_true")
-parser.add_argument("-P", "--picture", help="generate VHS Style image", action="store_true")
-parser.add_argument("-V","--video", help="VHS Style Video", action="store_true")
+text = "| V A P O R W A V E || G E N E R A T O R |"
+
+parser = argparse.ArgumentParser(description=text)
+parser.add_argument(
+ "-M", "--music", help="generate v a p o r w a v e music", action="store_true"
+)
+parser.add_argument(
+ "-P", "--picture", help="generate VHS Style image", action="store_true"
+)
+parser.add_argument("-V", "--video", help="VHS Style Video", action="store_true")
parser.add_argument("-v", "--version", help="show program version", action="store_true")
parser.add_argument("-i", "--input")
-parser.add_argument("-o","--output", help="Output for specifying output video")
+parser.add_argument("-o", "--output", help="Output for specifying output video")
args = parser.parse_args()
@@ -34,26 +38,34 @@ picture = False
video = False
if args.version:
- print("vaporwave generator 旺育栄", version)
- exit
+ print("vaporwave generator 旺育栄", version)
+ exit
if args.music:
- music = True
+ music = True
elif args.picture:
picture = True
elif args.video:
video = True
if args.input:
- query = args.input
+ query = args.input
if args.output:
outfile = args.output
else:
parser.print_help()
exit
-MAX_DURATION = 600 # In-case the program finds a compilation
-youtube_urls = ('youtube.com', 'https://www.youtube.com/', 'http://www.youtube.com/', 'http://youtu.be/', 'https://youtu.be/', 'youtu.be')
+MAX_DURATION = 600 # In-case the program finds a compilation
+youtube_urls = (
+ "youtube.com",
+ "https://www.youtube.com/",
+ "http://www.youtube.com/",
+ "http://youtu.be/",
+ "https://youtu.be/",
+ "youtu.be",
+)
-def download_file(query,request_id=1):
+
+def download_file(query, request_id=1):
"""Returns audio to the vapor command handler
Searches YouTube for 'query', finds first match that has
@@ -63,16 +75,18 @@ def download_file(query,request_id=1):
Query can be YouTube link.
"""
ydl_opts = {
- 'quiet': 'True',
- 'format': 'bestaudio/best',
- 'outtmpl': str(request_id) +'.%(ext)s',
- 'prefer_ffmpeg': 'True',
- 'noplaylist': 'True',
- 'postprocessors': [{
- 'key': 'FFmpegExtractAudio',
- 'preferredcodec': 'wav',
- 'preferredquality': '192',
- }],
+ "quiet": "True",
+ "format": "bestaudio/best",
+ "outtmpl": str(request_id) + ".%(ext)s",
+ "prefer_ffmpeg": "True",
+ "noplaylist": "True",
+ "postprocessors": [
+ {
+ "key": "FFmpegExtractAudio",
+ "preferredcodec": "wav",
+ "preferredquality": "192",
+ }
+ ],
}
original_path = str(request_id) + ".wav"
@@ -81,9 +95,13 @@ def download_file(query,request_id=1):
# check if query is youtube url
if not query.lower().startswith((youtube_urls)):
# search for youtube videos matching query
- query_string = urllib.parse.urlencode({"search_query" : query})
- html_content = urllib.request.urlopen("http://www.youtube.com/results?" + query_string)
- search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode())
+ query_string = urllib.parse.urlencode({"search_query": query})
+ html_content = urllib.request.urlopen(
+ "http://www.youtube.com/results?" + query_string
+ )
+ search_results = re.findall(
+ r"href=\"\/watch\?v=(.{11})", html_content.read().decode()
+ )
info = False
# find video that fits max duration
@@ -91,30 +109,30 @@ def download_file(query,request_id=1):
for url in search_results:
# check for video duration
try:
- info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url,download = False)
+ info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url, download=False)
except Exception as e:
logger.error(e)
- raise ValueError('Could not get information about video.')
- full_title = info['title']
- if (info['duration'] < MAX_DURATION and info['duration'] >= 5):
+ raise ValueError("Could not get information about video.")
+ full_title = info["title"]
+ if info["duration"] < MAX_DURATION and info["duration"] >= 5:
# get first video that fits the limit duration
logger.info("Got video: " + str(full_title))
- file_title = info['title']
+ file_title = info["title"]
break
-
+
# if we ran out of urls, return error
- if (not info):
- raise ValueError('Could not find a video.')
+ if not info:
+ raise ValueError("Could not find a video.")
# query was a youtube link
else:
logger.info("Query was a YouTube URL.")
url = query
- info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url,download = False)
- file_title = info['title']
+ info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url, download=False)
+ file_title = info["title"]
# check if video fits limit duration
- if (info['duration'] < 5 or info['duration'] > MAX_DURATION):
- raise ValueError('Video is too short. Need 5 seconds or more.')
+ if info["duration"] < 5 or info["duration"] > MAX_DURATION:
+ raise ValueError("Video is too short. Need 5 seconds or more.")
# download video and extract audio
logger.info("Downloading video...")
@@ -123,36 +141,35 @@ def download_file(query,request_id=1):
ydl.download([url])
except Exception as e:
logger.error(e)
- raise ValueError('Could not download ' + str(full_title) + '.')
+ raise ValueError("Could not download " + str(full_title) + ".")
return original_path, file_title
def gen_vapor(filePath, title):
- # Delete stuff if there is anything left over.
- os.system("rm -r download/")
- os.system("rm -r beats/")
+ # Delete stuff if there is anything left over.
+ os.system("rm -r download/")
+ os.system("rm -r beats/")
# Make the proper folders for intermediate steps
- os.system("mkdir download/")
- os.system("mkdir beats/")
+ os.system("mkdir download/")
+ os.system("mkdir beats/")
+ # Download the youtube query's first result. Might be wrong but YOLO
+ # YTDownloader.download_wav_to_samp2(query)
- # Download the youtube query's first result. Might be wrong but YOLO
- #YTDownloader.download_wav_to_samp2(query)
-
- # For every song in download folder(just one for now)
- """
+ # For every song in download folder(just one for now)
+ """
for fs in os.listdir("download/"):
# Slow down the song.
VaporSong.vaporize_song(query,"download/"+fs)
pass
# When we are finished, delete the old folders.
"""
- VaporSong.vaporize_song(filePath, title)
+ VaporSong.vaporize_song(filePath, title)
- os.system("rm -r download/")
- os.system("rm -r beats/")
+ os.system("rm -r download/")
+ os.system("rm -r beats/")
"""
@@ -167,7 +184,6 @@ if music:
name, title = download_file(query)
gen_vapor(name, title)
elif picture:
- generateVHSStyle(query,"out.jpg")
+ generateVHSStyle(query, "out.jpg")
elif video:
VHS_Vid(query, outfile)
- \ No newline at end of file
diff --git a/src/VHSImage.py b/src/VHSImage.py
index ce6411a..d80af92 100644
--- a/src/VHSImage.py
+++ b/src/VHSImage.py
@@ -2,7 +2,7 @@ import requests
import datetime
from PIL import Image
from PIL import ImageFont
-from PIL import ImageDraw
+from PIL import ImageDraw
import colorsys
import json
import numpy as np
@@ -21,7 +21,7 @@ def generate_offsets(array_size, max_offset):
periodicity = random.random() * periodicity
offsets = []
for i in range(array_size):
- offsets.append(floor(max_offset*np.sin(periodicity*(i*np.pi/180))))
+ offsets.append(floor(max_offset * np.sin(periodicity * (i * np.pi / 180))))
return offsets
@@ -35,11 +35,11 @@ def hueChange(img, offset):
g_data = []
b_data = []
- offset = offset/100.
+ offset = offset / 100.0
for rd, gr, bl in zip(r.getdata(), g.getdata(), b.getdata()):
- h, s, v = colorsys.rgb_to_hsv(rd/255.0, bl/255.0, gr/255.0)
- rgb = colorsys.hsv_to_rgb(h, s+offset, v)
- rd, bl, gr = [int(x*255.) for x in rgb]
+ h, s, v = colorsys.rgb_to_hsv(rd / 255.0, bl / 255.0, gr / 255.0)
+ rgb = colorsys.hsv_to_rgb(h, s + offset, v)
+ rd, bl, gr = [int(x * 255.0) for x in rgb]
r_data.append(rd)
g_data.append(gr)
b_data.append(bl)
@@ -47,12 +47,21 @@ def hueChange(img, offset):
r.putdata(r_data)
g.putdata(g_data)
b.putdata(b_data)
- return Image.merge('RGB',(r,g,b))
+ return Image.merge("RGB", (r, g, b))
+
def decision(probability):
return random.random() < probability
-def mod_image_repeat_rows(imgname, chance_of_row_repeat=0, max_row_repeats=0, min_row_repeats=0, save=True, out_name="image.jpg"):
+
+def mod_image_repeat_rows(
+ imgname,
+ chance_of_row_repeat=0,
+ max_row_repeats=0,
+ min_row_repeats=0,
+ save=True,
+ out_name="image.jpg",
+):
img = Image.open(imgname)
pixels = img.load()
width, height = img.size
@@ -80,7 +89,7 @@ def mod_image_repeat_rows(imgname, chance_of_row_repeat=0, max_row_repeats=0, mi
pixels[x, y] = row_to_repeat[x - offsets[num_repeats]]
else:
pixels[x, y] = (r, g, b)
-
+
if repeat:
num_repeats += 1
if num_repeats >= times_to_repeat:
@@ -102,53 +111,68 @@ def add_date(img_path, out_name="image.jpg", bottom_offset=0):
width, height = img.size
draw = ImageDraw.Draw(img)
font = ImageFont.truetype("src/VCR_OSD_MONO_1.001.ttf", 64)
- draw.text((corner_offset, (height-150-bottom_offset)), date_str_1, (255, 255, 255), font=font)
- draw.text((corner_offset, (height-75)-bottom_offset), date_str_2, (255, 255, 255), font=font)
+ draw.text(
+ (corner_offset, (height - 150 - bottom_offset)),
+ date_str_1,
+ (255, 255, 255),
+ font=font,
+ )
+ draw.text(
+ (corner_offset, (height - 75) - bottom_offset),
+ date_str_2,
+ (255, 255, 255),
+ font=font,
+ )
draw.text((corner_offset, 25), "|| PAUSE", (255, 255, 255), font=font)
img.save(out_name)
+
def add_img_noise(imgpath, intensity=1, out_name="image.jpg"):
- img = imageio.imread(imgpath, pilmode='RGB')
+ img = imageio.imread(imgpath, pilmode="RGB")
noise1 = img + intensity * img.std() * np.random.random(img.shape)
imageio.imwrite(out_name, noise1)
+
def offset_hue(image, out_name="image.jpg"):
if isinstance(image, str):
image = Image.open(image)
image = hueChange(image, 25)
image.save(out_name)
+
def build_background(out_name, taskbar_offset):
- #getImage(out_name="start.jpg")
+ # getImage(out_name="start.jpg")
offset_hue("start.jpg", out_name="saturated.jpg")
mod_image_repeat_rows("saturated.jpg", 0.012, 50, 10, out_name="shifted.jpg")
add_img_noise("shifted.jpg", out_name="noisy.jpg")
add_date("noisy.jpg", out_name=out_name, bottom_offset=taskbar_offset)
+
"""
if __name__ == "__main__":
build_background("bkg.jpg", 25)
-"""
+"""
+
def generateVHSStyle(infile, outfile, silence=False):
if silence:
cut_rows = bool(random.getrandbits(1))
- offset = random.choice([0,5,10])
- offset_hue(infile,"saturated.jpg")
+ offset = random.choice([0, 5, 10])
+ offset_hue(infile, "saturated.jpg")
if cut_rows:
mod_image_repeat_rows("saturated.jpg", 0.012, 50, 10, True, "shifted.jpg")
else:
mod_image_repeat_rows("saturated.jpg", 0, 0, 0, True, "shifted.jpg")
- add_date("shifted.jpg","noisy.jpg")
- add_date("noisy.jpg",outfile, bottom_offset=offset)
+ add_date("shifted.jpg", "noisy.jpg")
+ add_date("noisy.jpg", outfile, bottom_offset=offset)
os.remove("shifted.jpg")
os.remove("saturated.jpg")
os.remove("noisy.jpg")
else:
cut_rows = bool(random.getrandbits(1))
- offset = random.choice([0,5,10,15,20,25])
+ offset = random.choice([0, 5, 10, 15, 20, 25])
logger.info("Saturating the image")
- offset_hue(infile,"saturated.jpg")
+ offset_hue(infile, "saturated.jpg")
if cut_rows:
logger.info("Shifting the image")
mod_image_repeat_rows("saturated.jpg", 0.012, 50, 10, True, "shifted.jpg")
@@ -156,13 +180,14 @@ def generateVHSStyle(infile, outfile, silence=False):
logger.info("Not applying lines effect")
mod_image_repeat_rows("saturated.jpg", 0, 0, 0, True, "shifted.jpg")
logger.info("Adding noise")
- add_date("shifted.jpg","noisy.jpg")
+ add_date("shifted.jpg", "noisy.jpg")
logger.info("Adding text")
- add_date("noisy.jpg",outfile, bottom_offset=offset)
+ add_date("noisy.jpg", outfile, bottom_offset=offset)
logger.info("Generated Image: out.jpg")
logger.info("Removing residual files")
os.remove("shifted.jpg")
os.remove("saturated.jpg")
os.remove("noisy.jpg")
-#generateVHSStyle("s.jpg","o.jpg") \ No newline at end of file
+
+# generateVHSStyle("s.jpg","o.jpg")
diff --git a/src/VHSVideo.py b/src/VHSVideo.py
index e644074..17493b3 100644
--- a/src/VHSVideo.py
+++ b/src/VHSVideo.py
@@ -8,16 +8,18 @@ import logzero
from logzero import logger
from logzero import setup_logger
-def SaveVid(path):
- vidObj = cv2.VideoCapture(path)
- count = 0
- success = 1
- while success:
- success, image = vidObj.read()
- cv2.imwrite("frames/"+str(count)+".jpg", image)
- #os.rename("frames/"+str(count)+".jpg", os.path.splitext("frames/"+str(count)+".jpg")[0])
- count += 1
-
+
+def SaveVid(path):
+ vidObj = cv2.VideoCapture(path)
+ count = 0
+ success = 1
+ while success:
+ success, image = vidObj.read()
+ cv2.imwrite("frames/" + str(count) + ".jpg", image)
+ # os.rename("frames/"+str(count)+".jpg", os.path.splitext("frames/"+str(count)+".jpg")[0])
+ count += 1
+
+
def Style(pathToFrames):
files = [f for f in os.listdir(pathToFrames) if isfile(join(pathToFrames, f))]
count = 0
@@ -26,7 +28,7 @@ def Style(pathToFrames):
f = str(i)
fi = pathToFrames + f
out = fi + ".jpg"
-
+
generateVHSStyle(fi, out, silence=True)
os.rename(out, fi)
print("--------")
@@ -41,29 +43,42 @@ def Style(pathToFrames):
os.system(c)
os.chdir(cwd)
+
def generateVideo(outfile, path, infile):
frame_array = []
files = [int(f) for f in os.listdir(path) if isfile(join(path, f))]
files.sort()
- duration = subprocess.check_output(['ffprobe', '-i', infile, '-show_entries', 'format=duration', '-v', 'quiet', '-of', 'csv=%s' % ("p=0")])
- fps = len(files)/float(duration)
+ duration = subprocess.check_output(
+ [
+ "ffprobe",
+ "-i",
+ infile,
+ "-show_entries",
+ "format=duration",
+ "-v",
+ "quiet",
+ "-of",
+ "csv=%s" % ("p=0"),
+ ]
+ )
+ fps = len(files) / float(duration)
print("FPS", fps)
for i in range(len(files)):
- filename=path + str(files[i])
+ filename = path + str(files[i])
img = cv2.imread(filename)
height, width, layers = img.shape
- size = (width,height)
+ size = (width, height)
frame_array.append(img)
- out = cv2.VideoWriter(outfile,cv2.VideoWriter_fourcc(*'MP4V'), fps, size)
+ out = cv2.VideoWriter(outfile, cv2.VideoWriter_fourcc(*"MP4V"), fps, size)
for i in range(len(frame_array)):
out.write(frame_array[i])
out.release()
def VHS_Vid(infile, outfile):
- path = './frames/'
+ path = "./frames/"
os.system("rm frames/*")
os.system("mkdir frames")
logger.info("Exctracting Frames")
@@ -84,4 +99,4 @@ def VHS_Vid(infile, outfile):
os.remove("output-audio.aac")
-#VHS_Vid("video.mp4","video2.mp4") \ No newline at end of file
+# VHS_Vid("video.mp4","video2.mp4")
diff --git a/src/VaporSong.py b/src/VaporSong.py
index d2f2020..3531db7 100644
--- a/src/VaporSong.py
+++ b/src/VaporSong.py
@@ -11,174 +11,178 @@ from logzero import setup_logger
CONFIDENCE_THRESH = 0.02
+
class VaporSong:
- # Slows down Track
-
- def slow_down(src, rate, dest):
- cmd = "sox -G -D " + src + " " + dest + " speed " + str(rate)
- os.system(cmd)
- return dest
-
- # Adds Reverb
-
- def reverbize(src, dest):
- cmd = "sox -G -D " + src + " " + dest + " reverb 100 fade 5 -0 7" # idk what this does tbh, https://stackoverflow.com/a/57767238/8386344
- os.system(cmd)
- return dest
-
-
- # Crops "src" from "start" plus "start + dur" and return it in "dest"
- def crop(src,dest,start,dur):
- cmd = "sox " + src + " " + dest + " trim " + " " + str(start) + " " + str(dur)
- os.system(cmd)
-
-
- # Randomly crops a part of the song of at most max_sec_len.
- def random_crop(src, max_sec_len, dest):
- out = subprocess.check_output(["soxi","-D",src]).rstrip()
- f_len = int(float(out))
- if (f_len <= max_sec_len):
- os.system("cp " + src + " " + dest)
- return
- else:
- start_region = f_len - max_sec_len
- start = randint(0,start_region)
- VaporSong.crop(src,dest,start,max_sec_len)
-
-
- # Given a file, returns a list of [beats, confidence], executable based on audibo's test-beattracking.c
- # TODO: Move away from executable and use aubio's Python module
- def fetchbeats(src):
- beat_matrix = []
- if platform == 'darwin':
- beats = subprocess.check_output(["noah", "get-beats",src]).rstrip()
- else:
- beats = subprocess.check_output(["./get-beats",src]).rstrip()
- beats_ary = beats.splitlines()
- for i in beats_ary:
- record = i.split()
- record[0] = float(record[0])/1000.0
- record[1] = float(record[1])
- beat_matrix.append(record)
- return beat_matrix
-
- # Splits an audio file into beats according to beat_matrix list
-
- def split_beat(src,beat_matrix):
- split_files = []
- for i in range(0,len(beat_matrix)-1):
-
- if(beat_matrix[i][1] > CONFIDENCE_THRESH):
- dur = (beat_matrix[i+1][0] - beat_matrix[i][0])
- out = src.split(".")[0]+str(i)+".wav"
- VaporSong.crop(src,out,beat_matrix[i][0],dur)
- split_files.append(out)
- return split_files
-
- # Combines a list of sections
-
- def combine(sections,dest):
- tocomb = []
- tocomb.append("sox")
- tocomb.append("-G")
- for section in sections:
- for sample in section:
- tocomb.append(sample)
- tocomb.append(dest)
- tmpFileLimit = len(tocomb) + 256 # in case the program messes up, it does not actually frick up your system
- n = str(tmpFileLimit)
- #logger.info("Setting file limit to ", n)
- os.system("ulimit -n " + n)
- subprocess.check_output(tocomb)
- return dest
-
- # Arbitrarily groups beats into lists of 4, 6, 8, or 9, perfect for looping.
-
- def generate_sections(ary):
- sections = []
- beats = [4,6,8,9]
- index = 0
- while(index != len(ary)):
- current_beat = beats[randint(0,len(beats)-1)]
- new_section = []
- while((current_beat != 0) and (index != len(ary))):
- new_section.append(ary[index])
- current_beat -= 1
- index += 1
- sections.append(new_section)
- return sections
-
-
- # given a list of sections, selects some of them and duplicates them, perfect for that vaporwave looping effect
- def dup_sections(sections):
- new_section = []
- for section in sections:
- new_section.append(section)
- if(randint(0,1) == 0):
- new_section.append(section)
- return new_section
-
- # a passage is a list of sections. This takes some sections and groups them into passages.
-
- def make_passages(sections):
- passages = []
- index = 0
- while(index != len(sections)):
- passage_len = randint(1,4)
- passage = []
- while(index != len(sections) and passage_len > 0):
- passage.append(sections[index])
- index += 1
- passage_len -= 1
- passages.append(passage)
- return passages
-
- # Given all of our passages, picks some of them and inserts them into a list some number of times.
-
- def reorder_passages(passages):
- new_passages = []
- passage_count = randint(5,12)
- while(passage_count != 0):
- passage = passages[randint(0,len(passages)-1)]
- passage_count -= 1
- dup = randint(1,4)
- while(dup != 0):
- dup -= 1
- new_passages.append(passage)
- return new_passages
-
- # converts a list of passages to a list of sections.
-
- def flatten(passages):
- sections = []
- for passage in passages:
- for section in passage:
- sections.append(section)
- return sections
-
- # It's all coming together
-
- def vaporize_song(fname, title):
- logger.info("Slowing down the music")
- VaporSong.slow_down(fname, 0.7, "beats/out.wav")
- #logger.info("Cropping")
- #VaporSong.random_crop("beats/out.wav",150,"beats/outcrop.wav")
- logger.info("Doing Beat Analysis")
- bm = VaporSong.fetchbeats("beats/out.wav")
- logger.info("Split into beats")
- splitd = VaporSong.split_beat("beats/out.wav",bm)
- #group beats to sections
- logger.info("Divide into sections")
- sections = VaporSong.generate_sections(splitd)
- logger.info("Duping Sections")
- sdup = VaporSong.dup_sections(sections)
- # group sections into passages
- paslist = VaporSong.make_passages(sdup)
- # reorder packages
- pasloop = VaporSong.reorder_passages(paslist)
- sectionflat = VaporSong.flatten(pasloop)
- logger.info("Mastering & Reverbing")
- VaporSong.combine(sectionflat,"beats/out_norev.wav")
- VaporSong.reverbize("beats/out_norev.wav","./" + (re.sub(r"\W+|_", " ", title)).replace(" ","_") + ".wav")
- logger.info("Generated V A P O R W A V E")
+ # Slows down Track
+
+ def slow_down(src, rate, dest):
+ cmd = "sox -G -D " + src + " " + dest + " speed " + str(rate)
+ os.system(cmd)
+ return dest
+
+ # Adds Reverb
+
+ def reverbize(src, dest):
+ cmd = (
+ "sox -G -D " + src + " " + dest + " reverb 100 fade 5 -0 7"
+ ) # idk what this does tbh, https://stackoverflow.com/a/57767238/8386344
+ os.system(cmd)
+ return dest
+
+ # Crops "src" from "start" plus "start + dur" and return it in "dest"
+ def crop(src, dest, start, dur):
+ cmd = "sox " + src + " " + dest + " trim " + " " + str(start) + " " + str(dur)
+ os.system(cmd)
+
+ # Randomly crops a part of the song of at most max_sec_len.
+ def random_crop(src, max_sec_len, dest):
+ out = subprocess.check_output(["soxi", "-D", src]).rstrip()
+ f_len = int(float(out))
+ if f_len <= max_sec_len:
+ os.system("cp " + src + " " + dest)
+ return
+ else:
+ start_region = f_len - max_sec_len
+ start = randint(0, start_region)
+ VaporSong.crop(src, dest, start, max_sec_len)
+
+ # Given a file, returns a list of [beats, confidence], executable based on audibo's test-beattracking.c
+ # TODO: Move away from executable and use aubio's Python module
+ def fetchbeats(src):
+ beat_matrix = []
+ if platform == "darwin":
+ beats = subprocess.check_output(["noah", "get-beats", src]).rstrip()
+ else:
+ beats = subprocess.check_output(["./get-beats", src]).rstrip()
+ beats_ary = beats.splitlines()
+ for i in beats_ary:
+ record = i.split()
+ record[0] = float(record[0]) / 1000.0
+ record[1] = float(record[1])
+ beat_matrix.append(record)
+ return beat_matrix
+
+ # Splits an audio file into beats according to beat_matrix list
+
+ def split_beat(src, beat_matrix):
+ split_files = []
+ for i in range(0, len(beat_matrix) - 1):
+
+ if beat_matrix[i][1] > CONFIDENCE_THRESH:
+ dur = beat_matrix[i + 1][0] - beat_matrix[i][0]
+ out = src.split(".")[0] + str(i) + ".wav"
+ VaporSong.crop(src, out, beat_matrix[i][0], dur)
+ split_files.append(out)
+ return split_files
+
+ # Combines a list of sections
+
+ def combine(sections, dest):
+ tocomb = []
+ tocomb.append("sox")
+ tocomb.append("-G")
+ for section in sections:
+ for sample in section:
+ tocomb.append(sample)
+ tocomb.append(dest)
+ tmpFileLimit = (
+ len(tocomb) + 256
+ ) # in case the program messes up, it does not actually frick up your system
+ n = str(tmpFileLimit)
+ # logger.info("Setting file limit to ", n)
+ os.system("ulimit -n " + n)
+ subprocess.check_output(tocomb)
+ return dest
+
+ # Arbitrarily groups beats into lists of 4, 6, 8, or 9, perfect for looping.
+
+ def generate_sections(ary):
+ sections = []
+ beats = [4, 6, 8, 9]
+ index = 0
+ while index != len(ary):
+ current_beat = beats[randint(0, len(beats) - 1)]
+ new_section = []
+ while (current_beat != 0) and (index != len(ary)):
+ new_section.append(ary[index])
+ current_beat -= 1
+ index += 1
+ sections.append(new_section)
+ return sections
+
+ # given a list of sections, selects some of them and duplicates them, perfect for that vaporwave looping effect
+ def dup_sections(sections):
+ new_section = []
+ for section in sections:
+ new_section.append(section)
+ if randint(0, 1) == 0:
+ new_section.append(section)
+ return new_section
+
+ # a passage is a list of sections. This takes some sections and groups them into passages.
+
+ def make_passages(sections):
+ passages = []
+ index = 0
+ while index != len(sections):
+ passage_len = randint(1, 4)
+ passage = []
+ while index != len(sections) and passage_len > 0:
+ passage.append(sections[index])
+ index += 1
+ passage_len -= 1
+ passages.append(passage)
+ return passages
+
+ # Given all of our passages, picks some of them and inserts them into a list some number of times.
+
+ def reorder_passages(passages):
+ new_passages = []
+ passage_count = randint(5, 12)
+ while passage_count != 0:
+ passage = passages[randint(0, len(passages) - 1)]
+ passage_count -= 1
+ dup = randint(1, 4)
+ while dup != 0:
+ dup -= 1
+ new_passages.append(passage)
+ return new_passages
+
+ # converts a list of passages to a list of sections.
+
+ def flatten(passages):
+ sections = []
+ for passage in passages:
+ for section in passage:
+ sections.append(section)
+ return sections
+
+ # It's all coming together
+
+ def vaporize_song(fname, title):
+ logger.info("Slowing down the music")
+ VaporSong.slow_down(fname, 0.7, "beats/out.wav")
+ # logger.info("Cropping")
+ # VaporSong.random_crop("beats/out.wav",150,"beats/outcrop.wav")
+ logger.info("Doing Beat Analysis")
+ bm = VaporSong.fetchbeats("beats/out.wav")
+ logger.info("Split into beats")
+ splitd = VaporSong.split_beat("beats/out.wav", bm)
+ # group beats to sections
+ logger.info("Divide into sections")
+ sections = VaporSong.generate_sections(splitd)
+ logger.info("Duping Sections")
+ sdup = VaporSong.dup_sections(sections)
+ # group sections into passages
+ paslist = VaporSong.make_passages(sdup)
+ # reorder packages
+ pasloop = VaporSong.reorder_passages(paslist)
+ sectionflat = VaporSong.flatten(pasloop)
+ logger.info("Mastering & Reverbing")
+ VaporSong.combine(sectionflat, "beats/out_norev.wav")
+ VaporSong.reverbize(
+ "beats/out_norev.wav",
+ "./" + (re.sub(r"\W+|_", " ", title)).replace(" ", "_") + ".wav",
+ )
+ logger.info("Generated V A P O R W A V E")