3
0
Fork 0
forked from mirrors/nixpkgs

Python: improve update script

- remove dead code
- improve readability
- skip fetchFromGitHub (#26320)
This commit is contained in:
Frederik Rietdijk 2017-06-06 17:20:10 +02:00
parent 5d488af46b
commit f0002ea292

View file

@ -25,18 +25,33 @@ INDEX = "https://pypi.io/pypi"
EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl'] EXTENSIONS = ['tar.gz', 'tar.bz2', 'tar', 'zip', '.whl']
"""Permitted file extensions. These are evaluated from left to right and the first occurance is returned.""" """Permitted file extensions. These are evaluated from left to right and the first occurance is returned."""
def _get_value(attribute, text): import logging
"""Match attribute in text and return it.""" logging.basicConfig(level=logging.INFO)
def _get_values(attribute, text):
"""Match attribute in text and return all matches.
:returns: List of matches.
"""
regex = '{}\s+=\s+"(.*)";'.format(attribute) regex = '{}\s+=\s+"(.*)";'.format(attribute)
regex = re.compile(regex) regex = re.compile(regex)
value = regex.findall(text) values = regex.findall(text)
n = len(value) return values
def _get_unique_value(attribute, text):
"""Match attribute in text and return unique match.
:returns: Single match.
"""
values = _get_values(attribute, text)
n = len(values)
if n > 1: if n > 1:
raise ValueError("Found too many values for {}".format(attribute)) raise ValueError("found too many values for {}".format(attribute))
elif n == 1: elif n == 1:
return value[0] return values[0]
else: else:
raise ValueError("No value found for {}".format(attribute)) raise ValueError("no value found for {}".format(attribute))
def _get_line_and_value(attribute, text): def _get_line_and_value(attribute, text):
"""Match attribute in text. Return the line and the value of the attribute.""" """Match attribute in text. Return the line and the value of the attribute."""
@ -45,11 +60,11 @@ def _get_line_and_value(attribute, text):
value = regex.findall(text) value = regex.findall(text)
n = len(value) n = len(value)
if n > 1: if n > 1:
raise ValueError("Found too many values for {}".format(attribute)) raise ValueError("found too many values for {}".format(attribute))
elif n == 1: elif n == 1:
return value[0] return value[0]
else: else:
raise ValueError("No value found for {}".format(attribute)) raise ValueError("no value found for {}".format(attribute))
def _replace_value(attribute, value, text): def _replace_value(attribute, value, text):
@ -64,175 +79,151 @@ def _fetch_page(url):
if r.status_code == requests.codes.ok: if r.status_code == requests.codes.ok:
return r.json() return r.json()
else: else:
raise ValueError("Request for {} failed".format(url)) raise ValueError("request for {} failed".format(url))
def _get_latest_version(package, extension):
def _get_latest_version_pypi(package, extension):
"""Get latest version and hash from PyPI."""
url = "{}/{}/json".format(INDEX, package) url = "{}/{}/json".format(INDEX, package)
json = _fetch_page(url) json = _fetch_page(url)
data = extract_relevant_nix_data(json, extension)[1] version = json['info']['version']
for release in json['releases'][version]:
version = data['latest_version'] if release['filename'].endswith(extension):
if version in data['versions']: # TODO: In case of wheel we need to do further checks!
sha256 = data['versions'][version]['sha256'] sha256 = release['digests']['sha256']
else:
sha256 = None # Its possible that no file was uploaded to PyPI
return version, sha256 return version, sha256
def extract_relevant_nix_data(json, extension): def _get_latest_version_github(package, extension):
"""Extract relevant Nix data from the JSON of a package obtained from PyPI. raise ValueError("updating from GitHub is not yet supported.")
:param json: JSON obtained from PyPI
FETCHERS = {
'fetchFromGitHub' : _get_latest_version_github,
'fetchPypi' : _get_latest_version_pypi,
'fetchurl' : _get_latest_version_pypi,
}
DEFAULT_SETUPTOOLS_EXTENSION = 'tar.gz'
FORMATS = {
'setuptools' : DEFAULT_SETUPTOOLS_EXTENSION,
'wheel' : 'whl'
}
def _determine_fetcher(text):
# Count occurences of fetchers.
nfetchers = sum(text.count('src = {}'.format(fetcher)) for fetcher in FETCHERS.keys())
if nfetchers == 0:
raise ValueError("no fetcher.")
elif nfetchers > 1:
raise ValueError("multiple fetchers.")
else:
# Then we check which fetcher to use.
for fetcher in FETCHERS.keys():
if 'src = {}'.format(fetcher) in text:
return fetcher
def _determine_extension(text, fetcher):
"""Determine what extension is used in the expression.
If we use:
- fetchPypi, we check if format is specified.
- fetchurl, we determine the extension from the url.
- fetchFromGitHub we simply use `.tar.gz`.
""" """
def _extract_license(json): if fetcher == 'fetchPypi':
"""Extract license from JSON.""" try:
return json['info']['license'] format = _get_unique_value('format', text)
except ValueError as e:
format = None # format was not given
def _available_versions(json): try:
return json['releases'].keys() extension = _get_unique_value('extension', text)
except ValueError as e:
extension = None # extension was not given
def _extract_latest_version(json): if extension is None:
return json['info']['version'] if format is None:
format = 'setuptools'
extension = FORMATS[format]
def _get_src_and_hash(json, version, extensions): elif fetcher == 'fetchurl':
"""Obtain url and hash for a given version and list of allowable extensions.""" url = _get_unique_value('url', text)
if not json['releases']: extension = os.path.splitext(url)[1]
msg = "Package {}: No releases available.".format(json['info']['name']) if 'pypi' not in url:
raise ValueError(msg) raise ValueError('url does not point to PyPI.')
else:
# We use ['releases'] and not ['urls'] because we want to have the possibility for different version.
for possible_file in json['releases'][version]:
for extension in extensions:
if possible_file['filename'].endswith(extension):
src = {'url': str(possible_file['url']),
'sha256': str(possible_file['digests']['sha256']),
}
return src
else:
msg = "Package {}: No release with valid file extension available.".format(json['info']['name'])
logging.info(msg)
return None
#raise ValueError(msg)
def _get_sources(json, extensions): elif fetcher == 'fetchFromGitHub':
versions = _available_versions(json) raise ValueError('updating from GitHub is not yet implemented.')
releases = {version: _get_src_and_hash(json, version, extensions) for version in versions}
releases = toolz.itemfilter(lambda x: x[1] is not None, releases)
return releases
# Collect data) return extension
name = str(json['info']['name'])
latest_version = str(_extract_latest_version(json))
#src = _get_src_and_hash(json, latest_version, EXTENSIONS)
sources = _get_sources(json, [extension])
# Collect meta data
license = str(_extract_license(json))
license = license if license != "UNKNOWN" else None
summary = str(json['info'].get('summary')).strip('.')
summary = summary if summary != "UNKNOWN" else None
#description = str(json['info'].get('description'))
#description = description if description != "UNKNOWN" else None
homepage = json['info'].get('home_page')
data = {
'latest_version' : latest_version,
'versions' : sources,
#'src' : src,
'meta' : {
'description' : summary if summary else None,
#'longDescription' : description,
'license' : license,
'homepage' : homepage,
},
}
return name, data
def _update_package(path): def _update_package(path):
# Read the expression
with open(path, 'r') as f:
text = f.read()
# Determine pname.
pname = _get_unique_value('pname', text)
# Determine version.
version = _get_unique_value('version', text)
# First we check how many fetchers are mentioned.
fetcher = _determine_fetcher(text)
extension = _determine_extension(text, fetcher)
new_version, new_sha256 = _get_latest_version_pypi(pname, extension)
if new_version == version:
logging.info("Path {}: no update available for {}.".format(path, pname))
return False
if not new_sha256:
raise ValueError("no file available for {}.".format(pname))
text = _replace_value('version', new_version, text)
text = _replace_value('sha256', new_sha256, text)
with open(path, 'w') as f:
f.write(text)
logging.info("Path {}: updated {} from {} to {}".format(path, pname, version, new_version))
return True
def _update(path):
# We need to read and modify a Nix expression. # We need to read and modify a Nix expression.
if os.path.isdir(path): if os.path.isdir(path):
path = os.path.join(path, 'default.nix') path = os.path.join(path, 'default.nix')
# If a default.nix does not exist, we quit.
if not os.path.isfile(path): if not os.path.isfile(path):
logging.warning("Path does not exist: {}".format(path)) logging.info("Path {}: does not exist.".format(path))
return False return False
# If file is not a Nix expression, we quit.
if not path.endswith(".nix"): if not path.endswith(".nix"):
logging.warning("Path does not end with `.nix`, skipping: {}".format(path)) logging.info("Path {}: does not end with `.nix`.".format(path))
return False
with open(path, 'r') as f:
text = f.read()
try:
pname = _get_value('pname', text)
except ValueError as e:
logging.warning("Path {}: {}".format(path, str(e)))
return False return False
try: try:
version = _get_value('version', text) return _update_package(path)
except ValueError as e: except ValueError as e:
logging.warning("Path {}: {}".format(path, str(e))) logging.warning("Path {}: {}".format(path, e))
return False return False
# If we use a wheel, then we need to request a wheel as well
try:
format = _get_value('format', text)
except ValueError as e:
# No format mentioned, then we assume we have setuptools
# and use a .tar.gz
logging.info("Path {}: {}".format(path, str(e)))
extension = ".tar.gz"
else:
if format == 'wheel':
extension = ".whl"
else:
try:
url = _get_value('url', text)
extension = os.path.splitext(url)[1]
if 'pypi' not in url:
logging.warning("Path {}: uses non-PyPI url, not updating.".format(path))
return False
except ValueError as e:
logging.info("Path {}: {}".format(path, str(e)))
extension = ".tar.gz"
try:
new_version, new_sha256 = _get_latest_version(pname, extension)
except ValueError as e:
logging.warning("Path {}: {}".format(path, str(e)))
else:
if not new_sha256:
logging.warning("Path has no valid file available: {}".format(path))
return False
if new_version != version:
try:
text = _replace_value('version', new_version, text)
except ValueError as e:
logging.warning("Path {}: {}".format(path, str(e)))
try:
text = _replace_value('sha256', new_sha256, text)
except ValueError as e:
logging.warning("Path {}: {}".format(path, str(e)))
with open(path, 'w') as f:
f.write(text)
logging.info("Updated {} from {} to {}".format(pname, version, new_version))
else:
logging.info("No update available for {} at {}".format(pname, version))
return True
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -240,11 +231,11 @@ def main():
args = parser.parse_args() args = parser.parse_args()
packages = args.package packages = map(os.path.abspath, args.package)
count = list(map(_update_package, packages)) count = list(map(_update, packages))
#logging.info("{} package(s) updated".format(sum(count))) logging.info("{} package(s) updated".format(sum(count)))
if __name__ == '__main__': if __name__ == '__main__':
main() main()