Welcome, guest | Sign In | My Account | Store | Cart
#!/usr/bin/env python
#-*- coding:utf-8 -*-

#---------
# IMPORT
#---------
import mimetypes, argparse, os, subprocess, re, urllib, sys

#---------
# DEFINE
#---------
# Module metadata: one-line description, author contact, date (written as
# "DD MM YYYY") and version string of this script.
__doc__ = "Generate an m3u playlist recursively from multimedia files (video or audio)"
__author__ = "Xian Jacobs <l.oluyede@gmail.com>"
__date__ = "24 11 2013"
__version__ = "0.0"

# Command-line interface description.  Each entry is turned into one
# ``add_argument`` call: the "options" value supplies the positional
# name/flag strings and every other key is passed through as a keyword
# argument.
options = [
    {
        # Positional argument: root of the tree that is scanned.
        "options": ["directory"],
        "help"  : "Directory to search multimedia files"
    },
    {
        "options": ["-p", "--play"],
        "action" : "store_true",
        "help"   : "Open playlist after generating"
    },
    {
        "options": ["-r", "--relative"],
        "action" : "store_true",
        "help"   : "Create with local pathnames relative to the M3U file location"
    },
    {
        # argparse derives the attribute name from the first long flag,
        # so this one is read back as ``arguments.just_print``.
        "options": ["-n", "--just-print", "--dry-run", "--recon"],
        "action" : "store_true",
        "help"   : "Print the files that would be added to the playlist, but do not generate it."
    }
]

# First line of the generated playlist file.
MARKER_FORMAT = "#EXTM3U"
# Prefix of the information line written for every playlist entry.
MARKER_RECORD = "#EXTINF"

# Templates for the playlist file name, the whole file and one info line.
FORMAT_FILENAME = "{name}.m3u"
FORMAT_M3U      = "{marker}\n{playlist}"
FORMAT_INFO     = "{marker}:{duration},{artist} - {title}"

# Extracts "HH:MM:SS.cc" (group 2) from text containing "Duration: ...".
# DOTALL lets the leading ".*" run across the lines that precede the
# "Duration:" label.  The dot before the centiseconds is escaped so that
# only a literal "." is accepted there.
REGEX_DURATION = re.compile(
    r"(.*Duration: )([0-9][0-9]:[0-9][0-9]:[0-9][0-9]\.[0-9][0-9])",
    re.MULTILINE|re.DOTALL
)

def stripnulls(data):
    """Return *data* without NUL characters and surrounding whitespace."""
    return data.replace("\00", "").strip()


def filetitle(filePath):
    """Return the file name of *filePath* without directory and extension."""
    return os.path.splitext(os.path.basename(filePath))[0]


def alphatonum(text):
    """Return *text* as an int when it consists only of digits, else unchanged."""
    return int(text) if text.isdigit() else text


def alphanumkey(key):
    """Split *key* into text and number chunks for natural-order sorting.

    Digit runs become ints, so "track10" sorts after "track2".
    """
    return [alphatonum(chunk) for chunk in re.split("([0-9]+)", key)]


def sortalphanum(items, index=0):
    """Return *items* sorted naturally by the string found at ``item[index]``."""
    return sorted(items, key=lambda item: alphanumkey(item[index]))


def timeseconds(time):
    """Convert a colon-separated time such as "HH:MM:SS.cc" to whole seconds.

    Fields are weighted from the right with 60**0, 60**1, ...; the
    fractional part of the total is truncated.
    """
    return int(sum(
        float(field) * 60 ** position
        for position, field in enumerate(reversed(time.split(":")))
    ))

class Playlist(object):
    """Build an extended M3U playlist from the media files below a directory.

    The directory tree is walked recursively.  Every file whose extension
    is known as audio or video gets one "#EXTINF" line (duration, artist,
    title) followed by its path.  Depending on the parsed command-line
    arguments the result is printed, written to "<directory name>.m3u"
    inside the directory, and optionally opened with the platform's
    default application.
    """

    # Extensions accepted in addition to those the ``mimetypes`` module
    # maps to an audio/* or video/* type.
    extensionsInclude = [".f4v"]
    # Extensions that are never added to the playlist.
    extensionsExclude = [".m3u"]

    # tag name -> (start offset, end offset, parser) for slicing the
    # 128-byte block read from the end of an audio file.  The offsets
    # match the ID3v1 tag layout.
    fieldsTag  = {
        "title"   : (  3,  33, stripnulls),
        "artist"  : ( 33,  63, stripnulls),
        "album"   : ( 63,  93, stripnulls),
        "year"    : ( 93,  97, stripnulls),
        "comment" : ( 97, 126, stripnulls),
        "genre"   : (127, 128, ord)
    }

    def __init__(self, arguments):
        """Store the parsed arguments and collect the accepted extensions.

        arguments -- namespace providing ``play``, ``relative`` and
                     ``just_print``.
        """
        self.rootDirectory = None
        self.arguments     = arguments

        mimetypes.init()

        self.extensions = self.extensionsInclude + [
            extension
            for extension, mimeType in mimetypes.types_map.items()
            if mimeType.split("/")[0] in ("video", "audio")
        ]

    def generate(self, pathDirectory):
        """Scan *pathDirectory*, then print or write the playlist.

        With ``just_print`` the entries are only printed; otherwise the
        playlist file is written.  With ``play`` the playlist file is
        opened afterwards if it exists.
        """
        self.rootDirectory = pathDirectory

        playlistItems = self._getPlaylistItems(pathDirectory)

        if self.arguments.just_print:
            self.printPlaylistItems(playlistItems)
            playlistFilePath = self.getFileOutput(pathDirectory)
        else:
            playlistFilePath = self.writePlaylistItems(pathDirectory, playlistItems)

        # isfile() is already False for a path that does not exist.
        if self.arguments.play and os.path.isfile(playlistFilePath):
            self.openPlaylist(playlistFilePath)

    def openPlaylist(self, playlistFilePath):
        """Open *playlistFilePath* with the platform's default application."""
        if sys.platform.startswith('darwin'):
            subprocess.Popen(('open', playlistFilePath))

        elif os.name == 'nt':
            # Must be the method's own parameter; an undefined name here
            # would raise NameError on Windows.
            os.startfile(playlistFilePath)

        elif os.name == 'posix':
            subprocess.Popen(('xdg-open', playlistFilePath))

    def printPlaylistItems(self, playlistItems):
        """Print the path of every playlist entry in sorted order."""
        for playlistInfo in self.sortPlaylist(playlistItems["playlist"]):
            print(playlistInfo[1])

    def _getPlaylistItems(self, pathDirectory):
        """Return the dict with the keys used by ``FORMAT_M3U``."""
        return {
            "marker"   : MARKER_FORMAT,
            "playlist" : self.getPlaylistInfo(pathDirectory)
        }

    def writePlaylistItems(self, pathDirectory, playlistItems):
        """Write the formatted playlist and return the path of the file."""
        filePathPlaylist = self.getFileOutput(pathDirectory)

        with open(filePathPlaylist, "w") as fileOutput:
            fileOutput.write(self.formatPlaylistItems(playlistItems))

        return filePathPlaylist

    def sortPlaylist(self, playlist):
        """Return the (info line, path) pairs sorted naturally by path."""
        return sortalphanum(playlist, 1)

    def formatPlaylistItems(self, playlistItems):
        """Return the complete text of the playlist file.

        *playlistItems* is left untouched; the joined entries are placed
        in a copy so the caller can keep using the original list.
        """
        playlistText = "\n".join(
            "\n".join(tags)
            for tags in playlistItems["playlist"]
        )

        return FORMAT_M3U.format(**dict(playlistItems, playlist=playlistText))

    def flattenPlaylist(self, playlist):
        """Concatenate the per-directory entry lists into one list."""
        return [entry for entries in playlist for entry in entries]

    def getPlaylistInfo(self, pathDirectory):
        """Return all entries below *pathDirectory* as one sorted list."""
        playlist     = self.getPlaylistsInfos(pathDirectory)
        playlistFlat = self.flattenPlaylist(playlist)

        return self.sortPlaylist(playlistFlat)

    def getPlaylistsInfos(self, pathDirectory):
        """Return one list of (info line, path) pairs per walked directory."""
        return [
            self._formatTags(tags)
            for tags in self._getInfoTags(pathDirectory)
        ]

    def _getDirectoryName(self, pathDirectory):
        """Return the last component of *pathDirectory*.

        The path is made absolute first so that a trailing separator
        ("music/") or "." still yields the directory's real name instead
        of an empty string or a dot.
        """
        return os.path.basename(os.path.abspath(pathDirectory))

    def getFileOutput(self, pathDirectory):
        """Return the playlist path: "<directory name>.m3u" in the directory."""
        fileName = FORMAT_FILENAME.format(name=self._getDirectoryName(pathDirectory))

        return os.path.join(pathDirectory, fileName)

    def _formatTags(self, tags):
        """Turn tag dicts into (info line, path) pairs."""
        return [
            (FORMAT_INFO.format(**fileTags), fileTags["path"])
            for fileTags in tags
        ]

    def _getInfoTags(self, pathDirectory):
        """Return, for each directory of the walk, the tags of its media files."""
        return [
            self._getTagsFromFileNames(directoryPath, fileNames)
            for directoryPath, directoryNames, fileNames in os.walk(pathDirectory)
        ]

    def _getTagsFromFileNames(self, directoryPath, fileNames):
        """Return the tag dicts of the accepted files only."""
        return [
            fileTag
            for fileTag in self.getTagsFromFileNames(directoryPath, fileNames)
            if fileTag is not None
        ]

    def getTagsFromFileNames(self, directoryPath, fileNames):
        """Return one tag dict, or None for a skipped file, per file name."""
        return [
            self._getTagsFromFileName(directoryPath, fileName)
            for fileName in fileNames
        ]

    def _getTagsFromFileName(self, directoryPath, fileName):
        """Return the tags of *fileName*, or None if its extension is not accepted."""
        # The known extensions are lower case; compare case-insensitively
        # so that e.g. "SONG.MP3" is not skipped.
        fileExtension = os.path.splitext(fileName)[1].lower()

        if fileExtension in self.extensionsExclude:
            return None

        if fileExtension not in self.extensions:
            return None

        return self.getTagsFromFileName(os.path.join(directoryPath, fileName))

    def getMimeType(self, filePath):
        """Return the major MIME type of *filePath* ("audio", "video", ...).

        Returns False when the type cannot be guessed.
        """
        fileUrl      = urllib.pathname2url(filePath)
        fileMimeType = mimetypes.guess_type(fileUrl)[0]

        if fileMimeType is None:
            return False

        return fileMimeType.split("/")[0]

    def getTagData(self, filePath):
        """Return the last 128 bytes of an audio file, else None.

        None is returned for non-audio files and for files shorter than
        128 bytes, which cannot hold the tag block.
        """
        if self.getMimeType(filePath) != "audio":
            return None

        with open(filePath, "rb", 0) as fileInput:
            # Seeking 128 bytes back from the end fails on shorter files,
            # so check the size first.
            fileInput.seek(0, os.SEEK_END)

            if fileInput.tell() < 128:
                return None

            fileInput.seek(-128, os.SEEK_END)

            return fileInput.read(128)

    def getTagsFromFileName(self, filePath):
        """Return the complete tag dict for the file at *filePath*."""
        tagData = self.getTagData(filePath)

        tagsFromFilePath = dict(
            self._validateTagData(tag, start, end, parseFunc, tagData)
            if tagData is not None
            else (tag, "")
            for tag, (start, end, parseFunc) in self.fieldsTag.items()
        )

        return self._formatTagsFromFilePath(filePath, tagsFromFilePath)

    def _validateTagData(self, tag, start, end, parseFunc, tagData):
        """Return (tag, value) parsed from ``tagData[start:end]``.

        The value is the empty string when the tag block does not pass
        validateTagData(); only validated data is parsed.  Newlines and
        tabs in the value are replaced by spaces so that the info line
        stays on one line.
        """
        if not self.validateTagData(tagData):
            return (tag, "")

        return (tag, re.sub("[\n\t]", " ", str(parseFunc(tagData[start:end]))))

    def validateTagData(self, tagData):
        """Return True when *tagData* looks usable.

        The block is rejected when it contains "Reference&#32" or when
        the block as a whole (genre byte included) is not valid UTF-8.
        """
        if "Reference&#32" in tagData:
            return False

        try:
            tagData.decode("utf-8")

        except UnicodeDecodeError:
            return False

        return True

    def getDuration(self, filePath):
        """Return the duration in whole seconds as a string.

        The duration is read from the output of ``ffmpeg -i``; "0" is
        returned when that output contains no duration.
        """
        process = subprocess.Popen(
            ["ffmpeg", "-i", filePath],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )

        # stderr is merged into stdout, so only the first item carries data.
        output = process.communicate()[0]
        found  = REGEX_DURATION.match(output)

        if found is None:
            # Same type as the success path, so callers always get a string.
            return "0"

        return str(timeseconds(found.group(2)))

    def checkTagTitle(self, tagTitle, filePath):
        """Return *tagTitle*, or the file's title when the tag is unusable."""
        if self.checkTagEncoding(tagTitle) != "":
            return tagTitle

        return filetitle(filePath)

    def checkTagArtist(self, tagArtist):
        """Return *tagArtist*, or the root directory's name when the tag is unusable."""
        if self.checkTagEncoding(tagArtist) != "":
            return tagArtist

        return self._getDirectoryName(self.rootDirectory).strip()

    def checkTagEncoding(self, tag):
        """Return *tag* stripped, or "" when it is not valid UTF-8."""
        try:
            tag.decode("utf-8")

        except UnicodeDecodeError:
            return ""

        return tag.strip()

    def getTagPath(self, filePath):
        """Return the path written to the playlist for *filePath*.

        With ``relative`` the root directory prefix and the following
        separators are removed; otherwise the path is returned as is.
        """
        if not self.arguments.relative:
            return filePath

        withoutRoot = re.sub(r"^"+re.escape(self.rootDirectory), "", filePath)

        return withoutRoot.lstrip(os.path.sep)

    def _formatTagsFromFilePath(self, filePath, tagsFromFilePath):
        """Add marker, path and duration and fix up title and artist.

        *tagsFromFilePath* is updated in place and returned.
        """
        tags = {
            "marker"   : MARKER_RECORD,
            "path"     : self.getTagPath(filePath),
            "duration" : self.getDuration(filePath),
            "title"    : self.checkTagTitle(tagsFromFilePath["title"], filePath),
            "artist"   : self.checkTagArtist(tagsFromFilePath["artist"])
        }

        tagsFromFilePath.update(tags)

        return tagsFromFilePath

class Parser(argparse.ArgumentParser):
    """ArgumentParser whose arguments are registered from a list of dicts."""

    def __init__(self, *args, **kwargs):
        super(Parser, self).__init__(*args, **kwargs)

    def addOption(self, option):
        """Register one argument described by *option*.

        option -- dict whose "options" entry holds the positional name or
                  flag strings; all other entries are passed to
                  ``add_argument`` as keyword arguments.

        The dict is copied first so the caller's description is not
        modified and can be reused for another parser.
        """
        keywords = dict(option)
        flags    = keywords.pop("options")

        self.add_argument(*flags, **keywords)

    def parseOptions(self, options):
        """Register every entry of *options*, then parse the command line."""
        # An explicit loop: map() would be lazy on Python 3 and is meant
        # for building values, not for side effects.
        for option in options:
            self.addOption(option)

        return super(Parser, self).parse_args()
    
#---------
# MAIN
#---------
if __name__ == "__main__":
    parser    = Parser(description="Generate an m3u playlist")
    arguments = parser.parseOptions(options)

    # isdir() is already False for a path that does not exist.  Report the
    # problem instead of exiting silently; error() prints the usage text
    # and the message, then exits with status 2.
    if not os.path.isdir(arguments.directory):
        parser.error("not a directory: {0}".format(arguments.directory))

    playlist = Playlist(arguments)
    playlist.generate(arguments.directory)

Diff to Previous Revision

--- revision 1 2013-11-24 11:40:22
+++ revision 2 2013-11-24 11:56:57
@@ -4,7 +4,7 @@
 #---------
 # IMPORT
 #---------
-import mimetypes, argparse, os, subprocess, re, urllib
+import mimetypes, argparse, os, subprocess, re, urllib, sys
 
 #---------
 # DEFINE
@@ -101,7 +101,17 @@
             os.path.exists(playlistFilePath),
             os.path.isfile(playlistFilePath)
         ]):
-            subprocess.Popen(["xdg-open", playlistFilePath])
+            self.openPlaylist(playlistFilePath)
+
+    def openPlaylist(self, playlistFilePath):
+        if sys.platform.startswith('darwin'):
+            subprocess.Popen(('open', playlistFilePath))
+
+        elif os.name == 'nt':
+            os.startfile(filepath)
+
+        elif os.name == 'posix':
+            subprocess.Popen(('xdg-open', playlistFilePath))
 
     def printPlaylistItems(self, playlistItems):
         playlistSorted = self.sortPlaylist(playlistItems["playlist"])

History