+# V A P O R W A V E
+A vaporwave music (+art, +video soon, I promise) generator bodged together using code from various sources ( Hopefully I have credited all of them in the source code). Runs on Python3
+## Installation
+This was tested on macOS Catalina ( so should work on almost all macOS versions).
+Windows is unsupported at this time ( I need to find a way to use aubio's python module)
+### Dependencies
+#### Linux
+sudo apt install ffmpeg libavl1 sox
+pip install -r requirements.txt
+#### macOS
+Make sure you have brew installed
+brew install noah # I would have had to re-compile the executeable :(
+brew install sox
+pip install -r requirements.txt
+## Usage
+### YouTube URL
+python3 main.py <YOUTUBE_URL>
+### Song Title
+python3 main.py Song Title
+## Bugs
+This project is a result of bodging and therefore has tons of bugs which need to be ironed out
+## To-Do
+[ ] Move away from using os.system calls, and use Python modules instead ( Looking at you, Sox and aubio)
+[ ] Clean the Code
+[ ] Add Artwork Generator
+[ ] Add Video Generator
+import os
+import subprocess
+import re
+from random import randint
+import logzero
+from logzero import logger
+from logzero import setup_logger
+class VaporSong:
+ # Slows down Track
+ def slow_down(src, rate, dest):
+ cmd = "sox -G -D " + src + " " + dest + " speed " + str(rate)
+ os.system(cmd)
+ return dest
+ # Adds Reverb
+ def reverbize(src, dest):
+ cmd = "sox -G -D " + src + " " + dest + " reverb 100 fade 5 -0 7" # idk what this does tbh, https://stackoverflow.com/a/57767238/8386344
+ os.system(cmd)
+ return dest
+ # Crops "src" from "start" plus "start + dur" and return it in "dest"
+ def crop(src,dest,start,dur):
+ cmd = "sox " + src + " " + dest + " trim " + " " + str(start) + " " + str(dur)
+ os.system(cmd)
+ # Randomly crops a part of the song of at most max_sec_len.
+ def random_crop(src, max_sec_len, dest):
+ out = subprocess.check_output(["soxi","-D",src]).rstrip()
+ f_len = int(float(out))
+ if (f_len <= max_sec_len):
+ os.system("cp " + src + " " + dest)
+ return
+ else:
+ start_region = f_len - max_sec_len
+ start = randint(0,start_region)
+ VaporSong.crop(src,dest,start,max_sec_len)
+ # Given a file, returns a list of [beats, confidence], executable based on audibo's test-beattracking.c
+ # TODO: Move away from executable and use aubio's Python module
+ def fetchbeats(src):
+ beat_matrix = []
+ if os.name == 'posix':
+ beats = subprocess.check_output(["noah", "get-beats",src]).rstrip()
+ else:
+ beats = subprocess.check_output(["get-beats",src]).rstrip()
+ beats_ary = beats.splitlines()
+ for i in beats_ary:
+ record = i.split()
+ record[0] = float(record[0])/1000.0
+ record[1] = float(record[1])
+ beat_matrix.append(record)
+ return beat_matrix
+ # Splits an audio file into beats according to beat_matrix list
+ def split_beat(src,beat_matrix):
+ split_files = []
+ for i in range(0,len(beat_matrix)-1):
+ if(beat_matrix[i][1] > CONFIDENCE_THRESH):
+ dur = (beat_matrix[i+1][0] - beat_matrix[i][0])
+ out = src.split(".")[0]+str(i)+".wav"
+ VaporSong.crop(src,out,beat_matrix[i][0],dur)
+ split_files.append(out)
+ return split_files
+ # Combines a list of sections
+ def combine(sections,dest):
+ tocomb = []
+ tocomb.append("sox")
+ tocomb.append("-G")
+ for section in sections:
+ for sample in section:
+ tocomb.append(sample)
+ tocomb.append(dest)
+ tmpFileLimit = len(tocomb) + 256 # in case the program messes up, it does not actually frick up your system
+ os.system("ulimit -n " + str(tmpFileLimit))
+ subprocess.check_output(tocomb)
+ return dest
+ # Arbitrarily groups beats into lists of 4, 6, 8, or 9, perfect for looping.
+ def generate_sections(ary):
+ sections = []
+ beats = [4,6,8,9]
+ index = 0
+ while(index != len(ary)):
+ current_beat = beats[randint(0,len(beats)-1)]
+ new_section = []
+ while((current_beat != 0) and (index != len(ary))):
+ new_section.append(ary[index])
+ current_beat -= 1
+ index += 1
+ sections.append(new_section)
+ return sections
+ # given a list of sections, selects some of them and duplicates them, perfect for that vaporwave looping effect
+ def dup_sections(sections):
+ new_section = []
+ for section in sections:
+ new_section.append(section)
+ if(randint(0,1) == 0):
+ new_section.append(section)
+ return new_section
+ # a passage is a list of sections. This takes some sections and groups them into passages.
+ def make_passages(sections):
+ passages = []
+ index = 0
+ while(index != len(sections)):
+ passage_len = randint(1,4)
+ passage = []
+ while(index != len(sections) and passage_len > 0):
+ passage.append(sections[index])
+ index += 1
+ passage_len -= 1
+ passages.append(passage)
+ return passages
+ # Given all of our passages, picks some of them and inserts them into a list some number of times.
+ def reorder_passages(passages):
+ new_passages = []
+ passage_count = randint(5,12)
+ while(passage_count != 0):
+ passage = passages[randint(0,len(passages)-1)]
+ passage_count -= 1
+ dup = randint(1,4)
+ while(dup != 0):
+ dup -= 1
+ new_passages.append(passage)
+ return new_passages
+ # converts a list of passages to a list of sections.
+ def flatten(passages):
+ sections = []
+ for passage in passages:
+ for section in passage:
+ sections.append(section)
+ return sections
+ # It's all coming together
+ def vaporize_song(fname, title):
+ logger.info("Slowing down the music")
+ VaporSong.slow_down(fname, 0.7, "beats/out.wav")
+ #logger.info("Cropping")
+ #VaporSong.random_crop("beats/out.wav",150,"beats/outcrop.wav")
+ logger.info("Doing Beat Analysis")
+ bm = VaporSong.fetchbeats("beats/out.wav")
+ logger.info("Split into beats")
+ splitd = VaporSong.split_beat("beats/out.wav",bm)
+ #group beats to sections
+ logger.info("Divide into sections")
+ sections = VaporSong.generate_sections(splitd)
+ logger.info("Duping Sections")
+ sdup = VaporSong.dup_sections(sections)
+ # group sections into passages
+ paslist = VaporSong.make_passages(sdup)
+ # reorder packages
+ pasloop = VaporSong.reorder_passages(paslist)
+ sectionflat = VaporSong.flatten(pasloop)
+ logger.info("Mastering & Reverbing")
+ VaporSong.combine(sectionflat,"beats/out_norev.wav")
+ VaporSong.reverbize("beats/out_norev.wav","./" + (re.sub(r"\W+|_", " ", title)).replace(" ","_") + ".wav")
+ logger.info("Generated V A P O R W A V E")
+from VaporSong import VaporSong
+import os
+import sys
+import youtube_dl
+import logzero
+from logzero import logger
+from logzero import setup_logger
+import re
+import urllib.request
+import urllib.parse
+import time
+MAX_DURATION = 600 # In-case the program finds a compilation
+youtube_urls = ('youtube.com', 'https://www.youtube.com/', 'http://www.youtube.com/', 'http://youtu.be/', 'https://youtu.be/', 'youtu.be')
+def download_file(query,request_id=1):
+ """Returns audio to the vapor command handler
+ Searches YouTube for 'query', finds first match that has
+ duration under the limit, download video with youtube_dl
+ and extract .wav audio with ffmpeg.
+ Query can be YouTube link.
+ """
+ ydl_opts = {
+ 'quiet': 'True',
+ 'format': 'bestaudio/best',
+ 'outtmpl': str(request_id) +'.%(ext)s',
+ 'prefer_ffmpeg': 'True',
+ 'noplaylist': 'True',
+ 'postprocessors': [{
+ 'key': 'FFmpegExtractAudio',
+ 'preferredcodec': 'wav',
+ 'preferredquality': '192',
+ }],
+ }
+ original_path = str(request_id) + ".wav"
+ file_title = ""
+ # check if query is youtube url
+ if not query.lower().startswith((youtube_urls)):
+ # search for youtube videos matching query
+ query_string = urllib.parse.urlencode({"search_query" : query})
+ html_content = urllib.request.urlopen("http://www.youtube.com/results?" + query_string)
+ search_results = re.findall(r'href=\"\/watch\?v=(.{11})', html_content.read().decode())
+ info = False
+ # find video that fits max duration
+ logger.info("Get video information...")
+ for url in search_results:
+ # check for video duration
+ try:
+ info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url,download = False)
+ except Exception as e:
+ logger.error(e)
+ raise ValueError('Could not get information about video.')
+ full_title = info['title']
+ if (info['duration'] < MAX_DURATION and info['duration'] >= 5):
+ # get first video that fits the limit duration
+ logger.info("Got video: " + str(full_title))
+ file_title = info['title']
+ break
+ # if we ran out of urls, return error
+ if (not info):
+ raise ValueError('Could not find a video.')
+ # query was a youtube link
+ else:
+ logger.info("Query was a YouTube URL.")
+ url = query
+ info = youtube_dl.YoutubeDL(ydl_opts).extract_info(url,download = False)
+ file_title = info['title']
+ # check if video fits limit duration
+ if (info['duration'] < 5 or info['duration'] > MAX_DURATION):
+ raise ValueError('Video is too short. Need 5 seconds or more.')
+ # download video and extract audio
+ logger.info("Downloading video...")
+ with youtube_dl.YoutubeDL(ydl_opts) as ydl:
+ try:
+ ydl.download([url])
+ except Exception as e:
+ logger.error(e)
+ raise ValueError('Could not download ' + str(full_title) + '.')
+ return original_path, file_title
+def gen_vapor(filePath, title):
+ # Delete stuff if there is anything left over.
+ os.system("rm -r download/")
+ os.system("rm -r beats/")
+ # Make the proper folders for intermediate steps
+ os.system("mkdir download/")
+ os.system("mkdir beats/")
+ # Download the youtube query's first result. Might be wrong but YOLO
+ #YTDownloader.download_wav_to_samp2(query)
+ # For every song in download folder(just one for now)
+ """
+ for fs in os.listdir("download/"):
+ # Slow down the song.
+ VaporSong.vaporize_song(query,"download/"+fs)
+ pass
+ # When we are finished, delete the old folders.
+ """
+ VaporSong.vaporize_song(filePath, title)
+ os.system("rm -r download/")
+ os.system("rm -r beats/")
+## Makes this a command line tool: disable when we get the webserver going
+query = ""
+for s in sys.argv:
+ query = query + s
+name, title = download_file(query)
+gen_vapor(name, title)