From f0002ea292effc5ef5899769080d1fb0f3e4663a Mon Sep 17 00:00:00 2001 From: Frederik Rietdijk Date: Tue, 6 Jun 2017 17:20:10 +0200 Subject: [PATCH] Python: improve update script - remove dead code - improve readability - skip fetchFromGitHub (#26320) --- maintainers/scripts/update-python-libraries | 287 ++++++++++---------- 1 file changed, 139 insertions(+), 148 deletions(-) diff --git a/maintainers/scripts/update-python-libraries b/maintainers/scripts/update-python-libraries index 90f6c94233ac..278c467b0542 100755 --- a/maintainers/scripts/update-python-libraries +++ b/maintainers/scripts/update-python-libraries @@ -25,18 +25,33 @@ INDEX = "https://pypi.io/pypi" EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl'] """Permitted file extensions. These are evaluated from left to right and the first occurance is returned.""" -def _get_value(attribute, text): - """Match attribute in text and return it.""" +import logging +logging.basicConfig(level=logging.INFO) + + +def _get_values(attribute, text): + """Match attribute in text and return all matches. + + :returns: List of matches. + """ regex = '{}\s+=\s+"(.*)";'.format(attribute) regex = re.compile(regex) - value = regex.findall(text) - n = len(value) + values = regex.findall(text) + return values + +def _get_unique_value(attribute, text): + """Match attribute in text and return unique match. + + :returns: Single match. + """ + values = _get_values(attribute, text) + n = len(values) if n > 1: - raise ValueError("Found too many values for {}".format(attribute)) + raise ValueError("found too many values for {}".format(attribute)) elif n == 1: - return value[0] + return values[0] else: - raise ValueError("No value found for {}".format(attribute)) + raise ValueError("no value found for {}".format(attribute)) def _get_line_and_value(attribute, text): """Match attribute in text. Return the line and the value of the attribute.""" @@ -45,11 +60,11 @@ def _get_line_and_value(attribute, text): value = regex.findall(text) n = len(value) if n > 1: - raise ValueError("Found too many values for {}".format(attribute)) + raise ValueError("found too many values for {}".format(attribute)) elif n == 1: return value[0] else: - raise ValueError("No value found for {}".format(attribute)) + raise ValueError("no value found for {}".format(attribute)) def _replace_value(attribute, value, text): @@ -64,175 +79,151 @@ def _fetch_page(url): if r.status_code == requests.codes.ok: return r.json() else: - raise ValueError("Request for {} failed".format(url)) - -def _get_latest_version(package, extension): - + raise ValueError("request for {} failed".format(url)) +def _get_latest_version_pypi(package, extension): + """Get latest version and hash from PyPI.""" url = "{}/{}/json".format(INDEX, package) json = _fetch_page(url) - data = extract_relevant_nix_data(json, extension)[1] - - version = data['latest_version'] - if version in data['versions']: - sha256 = data['versions'][version]['sha256'] - else: - sha256 = None # Its possible that no file was uploaded to PyPI + version = json['info']['version'] + for release in json['releases'][version]: + if release['filename'].endswith(extension): + # TODO: In case of wheel we need to do further checks! + sha256 = release['digests']['sha256'] return version, sha256 -def extract_relevant_nix_data(json, extension): - """Extract relevant Nix data from the JSON of a package obtained from PyPI. +def _get_latest_version_github(package, extension): + raise ValueError("updating from GitHub is not yet supported.") - :param json: JSON obtained from PyPI + +FETCHERS = { + 'fetchFromGitHub' : _get_latest_version_github, + 'fetchPypi' : _get_latest_version_pypi, + 'fetchurl' : _get_latest_version_pypi, +} + + +DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz' + + +FORMATS = { + 'setuptools' : DEFAULT_SETUPTOOLS_EXTENSION, + 'wheel' : 'whl' +} + +def _determine_fetcher(text): + # Count occurences of fetchers. + nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS.keys()) + if nfetchers == 0: + raise ValueError("no fetcher.") + elif nfetchers > 1: + raise ValueError("multiple fetchers.") + else: + # Then we check which fetcher to use. + for fetcher in FETCHERS.keys(): + if 'src = {}'.format(fetcher) in text: + return fetcher + + +def _determine_extension(text, fetcher): + """Determine what extension is used in the expression. + + If we use: + - fetchPypi, we check if format is specified. + - fetchurl, we determine the extension from the url. + - fetchFromGitHub we simply use `.tar.gz`. """ - def _extract_license(json): - """Extract license from JSON.""" - return json['info']['license'] + if fetcher == 'fetchPypi': + try: + format = _get_unique_value('format', text) + except ValueError as e: + format = None # format was not given - def _available_versions(json): - return json['releases'].keys() + try: + extension = _get_unique_value('extension', text) + except ValueError as e: + extension = None # extension was not given - def _extract_latest_version(json): - return json['info']['version'] + if extension is None: + if format is None: + format = 'setuptools' + extension = FORMATS[format] - def _get_src_and_hash(json, version, extensions): - """Obtain url and hash for a given version and list of allowable extensions.""" - if not json['releases']: - msg = "Package {}: No releases available.".format(json['info']['name']) - raise ValueError(msg) - else: - # We use ['releases'] and not ['urls'] because we want to have the possibility for different version. - for possible_file in json['releases'][version]: - for extension in extensions: - if possible_file['filename'].endswith(extension): - src = {'url': str(possible_file['url']), - 'sha256': str(possible_file['digests']['sha256']), - } - return src - else: - msg = "Package {}: No release with valid file extension available.".format(json['info']['name']) - logging.info(msg) - return None - #raise ValueError(msg) + elif fetcher == 'fetchurl': + url = _get_unique_value('url', text) + extension = os.path.splitext(url)[1] + if 'pypi' not in url: + raise ValueError('url does not point to PyPI.') - def _get_sources(json, extensions): - versions = _available_versions(json) - releases = {version: _get_src_and_hash(json, version, extensions) for version in versions} - releases = toolz.itemfilter(lambda x: x[1] is not None, releases) - return releases + elif fetcher == 'fetchFromGitHub': + raise ValueError('updating from GitHub is not yet implemented.') - # Collect data) - name = str(json['info']['name']) - latest_version = str(_extract_latest_version(json)) - #src = _get_src_and_hash(json, latest_version, EXTENSIONS) - sources = _get_sources(json, [extension]) - - # Collect meta data - license = str(_extract_license(json)) - license = license if license != "UNKNOWN" else None - summary = str(json['info'].get('summary')).strip('.') - summary = summary if summary != "UNKNOWN" else None - #description = str(json['info'].get('description')) - #description = description if description != "UNKNOWN" else None - homepage = json['info'].get('home_page') - - data = { - 'latest_version' : latest_version, - 'versions' : sources, - #'src' : src, - 'meta' : { - 'description' : summary if summary else None, - #'longDescription' : description, - 'license' : license, - 'homepage' : homepage, - }, - } - return name, data + return extension def _update_package(path): + + + # Read the expression + with open(path, 'r') as f: + text = f.read() + + # Determine pname. + pname = _get_unique_value('pname', text) + + # Determine version. + version = _get_unique_value('version', text) + + # First we check how many fetchers are mentioned. + fetcher = _determine_fetcher(text) + + extension = _determine_extension(text, fetcher) + + new_version, new_sha256 = _get_latest_version_pypi(pname, extension) + + if new_version == version: + logging.info("Path {}: no update available for {}.".format(path, pname)) + return False + if not new_sha256: + raise ValueError("no file available for {}.".format(pname)) + + text = _replace_value('version', new_version, text) + text = _replace_value('sha256', new_sha256, text) + + with open(path, 'w') as f: + f.write(text) + + logging.info("Path {}: updated {} from {} to {}".format(path, pname, version, new_version)) + + return True + + +def _update(path): + # We need to read and modify a Nix expression. if os.path.isdir(path): path = os.path.join(path, 'default.nix') + # If a default.nix does not exist, we quit. if not os.path.isfile(path): - logging.warning("Path does not exist: {}".format(path)) + logging.info("Path {}: does not exist.".format(path)) return False + # If file is not a Nix expression, we quit. if not path.endswith(".nix"): - logging.warning("Path does not end with `.nix`, skipping: {}".format(path)) - return False - - with open(path, 'r') as f: - text = f.read() - - try: - pname = _get_value('pname', text) - except ValueError as e: - logging.warning("Path {}: {}".format(path, str(e))) + logging.info("Path {}: does not end with `.nix`.".format(path)) return False try: - version = _get_value('version', text) + return _update_package(path) except ValueError as e: - logging.warning("Path {}: {}".format(path, str(e))) + logging.warning("Path {}: {}".format(path, e)) return False - # If we use a wheel, then we need to request a wheel as well - try: - format = _get_value('format', text) - except ValueError as e: - # No format mentioned, then we assume we have setuptools - # and use a .tar.gz - logging.info("Path {}: {}".format(path, str(e))) - extension = ".tar.gz" - else: - if format == 'wheel': - extension = ".whl" - else: - try: - url = _get_value('url', text) - extension = os.path.splitext(url)[1] - if 'pypi' not in url: - logging.warning("Path {}: uses non-PyPI url, not updating.".format(path)) - return False - except ValueError as e: - logging.info("Path {}: {}".format(path, str(e))) - extension = ".tar.gz" - - try: - new_version, new_sha256 = _get_latest_version(pname, extension) - except ValueError as e: - logging.warning("Path {}: {}".format(path, str(e))) - else: - if not new_sha256: - logging.warning("Path has no valid file available: {}".format(path)) - return False - if new_version != version: - try: - text = _replace_value('version', new_version, text) - except ValueError as e: - logging.warning("Path {}: {}".format(path, str(e))) - try: - text = _replace_value('sha256', new_sha256, text) - except ValueError as e: - logging.warning("Path {}: {}".format(path, str(e))) - - with open(path, 'w') as f: - f.write(text) - - logging.info("Updated {} from {} to {}".format(pname, version, new_version)) - - else: - logging.info("No update available for {} at {}".format(pname, version)) - - return True - - def main(): parser = argparse.ArgumentParser() @@ -240,11 +231,11 @@ def main(): args = parser.parse_args() - packages = args.package + packages = map(os.path.abspath, args.package) - count = list(map(_update_package, packages)) + count = list(map(_update, packages)) - #logging.info("{} package(s) updated".format(sum(count))) + logging.info("{} package(s) updated".format(sum(count))) if __name__ == '__main__': main() \ No newline at end of file