Source code for dlmanager.manager

import os
import requests
import six
import sys
import tempfile
import threading

from contextlib import closing
from six.moves.urllib.parse import urlparse

from dlmanager import fs
from dlmanager.persist_limit import PersistLimit


class DownloadInterrupt(Exception):
    """Raised when a download is interrupted."""
class Download(object):
    """
    Download is responsible for downloading one file in the background.

    Example of use: ::

      dl = Download(url, dest)
      dl.start()
      dl.wait()  # this will block until completion / cancel / error

    If a download fails or is canceled, the temporary dest is removed from
    the disk.

    Usually, Downloads are created by using :meth:`DownloadManager.download`.

    :param url: the url of the file to download
    :param dest: the local file path destination
    :param finished_callback: a callback that will be called in the thread
                              when the thread work is done. Takes the
                              download instance as a parameter.
    :param chunk_size: size of the chunk that will be read. The thread can
                       not be stopped while we are reading that chunk size.
    :param session: a requests.Session instance that will do the real
                    downloading work. If None, the `requests` module is used.
    :param progress: A callable to report the progress (defaults to None).
                     See :meth:`set_progress`.
    """

    def __init__(self, url, dest, finished_callback=None,
                 chunk_size=16 * 1024, session=None, progress=None):
        self.thread = threading.Thread(
            target=self._download,
            args=(url, dest, finished_callback, chunk_size,
                  session or requests)
        )
        # protects self.__progress, which can be replaced from any thread
        self._lock = threading.Lock()
        self.__url = url
        self.__dest = dest
        self.__progress = progress
        self.__canceled = False
        self.__error = None

    def start(self):
        """
        Start the thread that will do the download.
        """
        self.thread.start()

    def cancel(self):
        """
        Cancel a previously started download.

        This only sets a flag; the download thread checks it between two
        chunks.
        """
        self.__canceled = True

    def is_canceled(self):
        """
        Returns True if we canceled this download.
        """
        return self.__canceled

    def is_running(self):
        """
        Returns True if the downloading thread is running.
        """
        return self.thread.is_alive()

    def wait(self, raise_if_error=True):
        """
        Block until the downloading thread is finished.

        :param raise_if_error: if True (the default), :meth:`raise_if_error`
                               will be called and raise an error if any.
        """
        while self.thread.is_alive():
            try:
                # join with a short timeout so that an exception raised in
                # the waiting thread (like KeyboardInterrupt) is noticed;
                # in that case cancel the task and let it propagate.
                self.thread.join(0.02)
            except BaseException:
                self.cancel()
                raise
        # this will raise exception that may happen inside the thread.
        if raise_if_error:
            self.raise_if_error()

    def error(self):
        """
        Returns None or a tuple of three values (type, value, traceback)
        that give information about the exception.
        """
        return self.__error

    def raise_if_error(self):
        """
        Raise an error if any. If the download was canceled, raise
        :class:`DownloadInterrupt`.
        """
        if self.__error:
            six.reraise(*self.__error)
        if self.__canceled:
            raise DownloadInterrupt()

    def set_progress(self, progress):
        """
        Set a callable to report the progress of the download, or None to
        disable any report.

        The callable must take three parameters (download, current, total).
        Note that this method is thread safe, you can call it during a
        download. It may also be called from inside the progress callable
        itself.
        """
        with self._lock:
            self.__progress = progress

    def get_dest(self):
        """
        Returns the dest.
        """
        return self.__dest

    def get_url(self):
        """
        Returns the url.
        """
        return self.__url

    def _update_progress(self, current, total):
        """
        Report the progress to the registered callable, if any.

        :param current: number of bytes downloaded so far
        :param total: expected total size, or None when unknown
        """
        # Only read the callable under the lock and call it afterwards:
        # the lock is not reentrant, so a callback calling set_progress()
        # would otherwise block forever.
        with self._lock:
            progress = self.__progress
        if progress:
            progress(self, current, total)

    def _download(self, url, dest, finished_callback, chunk_size, session):
        """
        Thread target: fetch *url* and store it as *dest*.

        Any exception is recorded (see :meth:`error`) instead of being
        propagated. *finished_callback*, if given, is always called last
        with this instance.
        """
        # save the file under a temporary name
        # this allow to not use a broken file in case things went really bad
        # while downloading the file (ie the python interpreter is killed
        # abruptly)
        temp = None
        bytes_so_far = 0
        try:
            with closing(session.get(url, stream=True)) as response:
                # fail before reading the body, so that an HTTP error page
                # is neither downloaded nor written to disk.
                response.raise_for_status()
                total_size = response.headers.get('Content-length',
                                                  '').strip()
                total_size = int(total_size) if total_size else None
                self._update_progress(bytes_so_far, total_size)
                # we use NamedTemporaryFile as raw open() call was causing
                # issues on windows - see:
                # https://bugzilla.mozilla.org/show_bug.cgi?id=1185756
                with tempfile.NamedTemporaryFile(
                        delete=False,
                        suffix='.tmp',
                        dir=os.path.dirname(dest)) as temp:
                    for chunk in response.iter_content(chunk_size):
                        if self.is_canceled():
                            break
                        if chunk:
                            temp.write(chunk)
                        bytes_so_far += len(chunk)
                        self._update_progress(bytes_so_far, total_size)
        except BaseException:
            # keep everything, the waiting thread will re-raise it
            self.__error = sys.exc_info()
        try:
            if temp is None:
                pass  # not even opened the temp file, nothing to do
            elif self.is_canceled() or self.__error:
                fs.remove(temp.name)
            else:
                # if all goes well, then rename the file to the real dest
                fs.remove(dest)  # just in case it already existed
                fs.move(temp.name, dest)
        finally:
            if finished_callback:
                finished_callback(self)
class DownloadManager(object):
    """
    DownloadManager is responsible for starting and managing downloads
    inside a given directory. It will download a file only if a given
    filename is not already there.

    Note that background downloads need to be stopped. For example, if you
    have an exception while a download is occurring, python will only exit
    when the download finishes. To get rid of that, there is a possible
    idiom: ::

      def download_things(manager):
          # do things with the manager
          manager.download(url1, f1)
          manager.download(url2, f2)
          ...

      manager = DownloadManager(destdir)
      try:
          download_things(manager)
      finally:
          # ensure we cancel all background downloads to ask the end
          # of possible remaining threads
          manager.cancel()

    :param destdir: a directory where files are downloaded. It will be
                    created if it does not exist.
    :param session: a requests session. If None, one will be created for
                    you.
    :param persist_limit: an instance of :class:`PersistLimit`, to allow
                          limiting the size of the download dir. Defaults
                          to None, meaning no limit.
    """

    def __init__(self, destdir, session=None, persist_limit=None):
        self.destdir = destdir
        self.session = session or requests.Session()
        # running downloads, keyed by destination path
        self._downloads = {}
        # protects self._downloads, which download threads also modify
        self._lock = threading.Lock()

        # if persist folder does not exist, create it. This is done before
        # registering its content so that the registration always runs
        # against an existing directory.
        if not os.path.isdir(destdir):
            os.makedirs(destdir)

        self.persist_limit = persist_limit or PersistLimit(0)
        self.persist_limit.register_dir_content(self.destdir)

    def get_dest(self, fname):
        """
        Returns the path of *fname* inside the download directory.
        """
        return os.path.join(self.destdir, fname)

    def cancel(self, cancel_if=None):
        """
        Cancel downloads, if any.

        If cancel_if is given, it must be a callable that takes the download
        instance as parameter, and returns True if the download needs to be
        canceled.

        Note that download threads won't be stopped directly.
        """
        with self._lock:
            for download in list(self._downloads.values()):
                if cancel_if is None or cancel_if(download):
                    if download.is_running():
                        download.cancel()

    def wait(self, raise_if_error=True):
        """
        Wait for all downloads to be finished.

        :param raise_if_error: passed to :meth:`Download.wait` for each
                               download.
        """
        # Work on a snapshot: finished downloads remove themselves from
        # self._downloads (see _download_finished), which would break an
        # iteration over the live dict. The lock is released before waiting
        # because _download_finished needs it to complete.
        with self._lock:
            downloads = list(self._downloads.values())
        for download in downloads:
            download.wait(raise_if_error=raise_if_error)

    def download(self, url, fname=None, progress=None):
        """
        Returns a started :class:`Download` instance, or None if fname is
        already present in destdir.

        If a download is already running for the given fname, it is just
        returned. Else the download is created, started and returned.

        :param url: url of the file to download.
        :param fname: name to give for the downloaded file. If None, it will
                      be the name extracted in the url.
        :param progress: a callable to report the download progress, or
                         None. See :meth:`Download.set_progress`.
        """
        if fname is None:
            fname = urlparse(url).path.split('/')[-1]
        dest = self.get_dest(fname)

        # The checks and the registration share one critical section, so
        # two concurrent calls for the same dest cannot both create a
        # download.
        with self._lock:
            # if we are downloading, returns the instance
            if dest in self._downloads:
                dl = self._downloads[dest]
                if progress:
                    dl.set_progress(progress)
                return dl

            if os.path.exists(dest):
                return None

            # else create the download (will be automatically removed of
            # the list on completion) start it, and returns that.
            download = Download(url, dest,
                                session=self.session,
                                finished_callback=self._download_finished,
                                progress=progress)
            self._downloads[dest] = download

        # started outside of the lock: the finished callback takes the same
        # lock from the download thread.
        download.start()
        self._download_started(download)
        return download

    def _download_started(self, dl):
        """
        Useful when sub-classing. Report the start event of a download.

        :param dl: The :class:`Download` instance.
        """
        pass

    def _download_finished(self, dl):
        """
        Useful when sub-classing. Report the end of a download.

        Note that this is executed in the download thread. Also, you should
        make sure to call the base implementation.

        :param dl: The :class:`Download` instance.
        """
        with self._lock:
            dest = dl.get_dest()
            del self._downloads[dest]
            self.persist_limit.register_file(dest)
            self.persist_limit.remove_old_files()