123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480 |
- """
- Operations on existing wheel files, including basic installation.
- """
- # XXX see patched pip to install
- import sys
- import warnings
- import os.path
- import re
- import zipfile
- import hashlib
- import csv
- import shutil
- try:
- _big_number = sys.maxsize
- except NameError:
- _big_number = sys.maxint
- from wheel.decorator import reify
- from wheel.util import (urlsafe_b64encode, from_json, urlsafe_b64decode,
- native, binary, HashingFile)
- from wheel import signatures
- from wheel.pkginfo import read_pkg_info_bytes
- from wheel.util import open_for_csv
- from .pep425tags import get_supported
- from .paths import get_install_paths
- # The next major version after this version of the 'wheel' tool:
- # Non-greedy matching of an optional build number may be too clever (more
- # invalid wheel filenames will match). Separate regex for .dist-info?
- WHEEL_INFO_RE = re.compile(
- r"""^(?P<namever>(?P<name>.+?)(-(?P<ver>\d.+?))?)
- ((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
- \.whl|\.dist-info)$""",
- re.VERBOSE).match
- def parse_version(version):
- """Use parse_version from pkg_resources or distutils as available."""
- global parse_version
- try:
- from pkg_resources import parse_version
- except ImportError:
- from distutils.version import LooseVersion as parse_version
- return parse_version(version)
- class BadWheelFile(ValueError):
- pass
- class WheelFile(object):
- """Parse wheel-specific attributes from a wheel (.whl) file and offer
- basic installation and verification support.
- WheelFile can be used to simply parse a wheel filename by avoiding the
- methods that require the actual file contents."""
- def __init__(self,
- filename,
- fp=None,
- append=False,
- context=get_supported):
- """
- :param fp: A seekable file-like object or None to open(filename).
- :param append: Open archive in append mode.
- :param context: Function returning list of supported tags. Wheels
- must have the same context to be sortable.
- """
- self.filename = filename
- self.fp = fp
- self.append = append
- self.context = context
- basename = os.path.basename(filename)
- self.parsed_filename = WHEEL_INFO_RE(basename)
- if not basename.endswith('.whl') or self.parsed_filename is None:
- raise BadWheelFile("Bad filename '%s'" % filename)
- def __repr__(self):
- return self.filename
- @property
- def distinfo_name(self):
- return "%s.dist-info" % self.parsed_filename.group('namever')
- @property
- def datadir_name(self):
- return "%s.data" % self.parsed_filename.group('namever')
- @property
- def record_name(self):
- return "%s/%s" % (self.distinfo_name, self.RECORD)
- @property
- def wheelinfo_name(self):
- return "%s/%s" % (self.distinfo_name, self.WHEEL_INFO)
- @property
- def tags(self):
- """A wheel file is compatible with the Cartesian product of the
- period-delimited tags in its filename.
- To choose a wheel file among several candidates having the same
- distribution version 'ver', an installer ranks each triple of
- (pyver, abi, plat) that its Python installation can run, sorting
- the wheels by the best-ranked tag it supports and then by their
- arity which is just len(list(compatibility_tags)).
- """
- tags = self.parsed_filename.groupdict()
- for pyver in tags['pyver'].split('.'):
- for abi in tags['abi'].split('.'):
- for plat in tags['plat'].split('.'):
- yield (pyver, abi, plat)
- compatibility_tags = tags
- @property
- def arity(self):
- """The number of compatibility tags the wheel declares."""
- return len(list(self.compatibility_tags))
- @property
- def rank(self):
- """
- Lowest index of any of this wheel's tags in self.context(), and the
- arity e.g. (0, 1)
- """
- return self.compatibility_rank(self.context())
- @property
- def compatible(self):
- return self.rank[0] != _big_number # bad API!
- # deprecated:
- def compatibility_rank(self, supported):
- """Rank the wheel against the supported tags. Smaller ranks are more
- compatible!
- :param supported: A list of compatibility tags that the current
- Python implemenation can run.
- """
- preferences = []
- for tag in self.compatibility_tags:
- try:
- preferences.append(supported.index(tag))
- # Tag not present
- except ValueError:
- pass
- if len(preferences):
- return (min(preferences), self.arity)
- return (_big_number, 0)
- # deprecated
- def supports_current_python(self, x):
- assert self.context == x, 'context mismatch'
- return self.compatible
- # Comparability.
- # Wheels are equal if they refer to the same file.
- # If two wheels are not equal, compare based on (in this order):
- # 1. Name
- # 2. Version
- # 3. Compatibility rank
- # 4. Filename (as a tiebreaker)
- @property
- def _sort_key(self):
- return (self.parsed_filename.group('name'),
- parse_version(self.parsed_filename.group('ver')),
- tuple(-x for x in self.rank),
- self.filename)
- def __eq__(self, other):
- return self.filename == other.filename
- def __ne__(self, other):
- return self.filename != other.filename
- def __lt__(self, other):
- if self.context != other.context:
- raise TypeError("{0}.context != {1}.context".format(self, other))
- return self._sort_key < other._sort_key
- # XXX prune
- sn = self.parsed_filename.group('name')
- on = other.parsed_filename.group('name')
- if sn != on:
- return sn < on
- sv = parse_version(self.parsed_filename.group('ver'))
- ov = parse_version(other.parsed_filename.group('ver'))
- if sv != ov:
- return sv < ov
- # Compatibility
- if self.context != other.context:
- raise TypeError("{0}.context != {1}.context".format(self, other))
- sc = self.rank
- oc = other.rank
- if sc != None and oc != None and sc != oc:
- # Smaller compatibility ranks are "better" than larger ones,
- # so we have to reverse the sense of the comparison here!
- return sc > oc
- elif sc == None and oc != None:
- return False
- return self.filename < other.filename
- def __gt__(self, other):
- return other < self
- def __le__(self, other):
- return self == other or self < other
- def __ge__(self, other):
- return self == other or other < self
- #
- # Methods using the file's contents:
- #
- @reify
- def zipfile(self):
- mode = "r"
- if self.append:
- mode = "a"
- vzf = VerifyingZipFile(self.fp if self.fp else self.filename, mode)
- if not self.append:
- self.verify(vzf)
- return vzf
- @reify
- def parsed_wheel_info(self):
- """Parse wheel metadata (the .data/WHEEL file)"""
- return read_pkg_info_bytes(self.zipfile.read(self.wheelinfo_name))
- def check_version(self):
- version = self.parsed_wheel_info['Wheel-Version']
- if tuple(map(int, version.split('.'))) >= VERSION_TOO_HIGH:
- raise ValueError("Wheel version is too high")
- @reify
- def install_paths(self):
- """
- Consult distutils to get the install paths for our dist. A dict with
- ('purelib', 'platlib', 'headers', 'scripts', 'data').
- We use the name from our filename as the dist name, which means headers
- could be installed in the wrong place if the filesystem-escaped name
- is different than the Name. Who cares?
- """
- name = self.parsed_filename.group('name')
- return get_install_paths(name)
- def install(self, force=False, overrides={}):
- """
- Install the wheel into site-packages.
- """
- # Utility to get the target directory for a particular key
- def get_path(key):
- return overrides.get(key) or self.install_paths[key]
- # The base target location is either purelib or platlib
- if self.parsed_wheel_info['Root-Is-Purelib'] == 'true':
- root = get_path('purelib')
- else:
- root = get_path('platlib')
- # Parse all the names in the archive
- name_trans = {}
- for info in self.zipfile.infolist():
- name = info.filename
- # Zip files can contain entries representing directories.
- # These end in a '/'.
- # We ignore these, as we create directories on demand.
- if name.endswith('/'):
- continue
- # Pathnames in a zipfile namelist are always /-separated.
- # In theory, paths could start with ./ or have other oddities
- # but this won't happen in practical cases of well-formed wheels.
- # We'll cover the simple case of an initial './' as it's both easy
- # to do and more common than most other oddities.
- if name.startswith('./'):
- name = name[2:]
- # Split off the base directory to identify files that are to be
- # installed in non-root locations
- basedir, sep, filename = name.partition('/')
- if sep and basedir == self.datadir_name:
- # Data file. Target destination is elsewhere
- key, sep, filename = filename.partition('/')
- if not sep:
- raise ValueError("Invalid filename in wheel: {0}".format(name))
- target = get_path(key)
- else:
- # Normal file. Target destination is root
- key = ''
- target = root
- filename = name
- # Map the actual filename from the zipfile to its intended target
- # directory and the pathname relative to that directory.
- dest = os.path.normpath(os.path.join(target, filename))
- name_trans[info] = (key, target, filename, dest)
- # We're now ready to start processing the actual install. The process
- # is as follows:
- # 1. Prechecks - is the wheel valid, is its declared architecture
- # OK, etc. [[Responsibility of the caller]]
- # 2. Overwrite check - do any of the files to be installed already
- # exist?
- # 3. Actual install - put the files in their target locations.
- # 4. Update RECORD - write a suitably modified RECORD file to
- # reflect the actual installed paths.
- if not force:
- for info, v in name_trans.items():
- k = info.filename
- key, target, filename, dest = v
- if os.path.exists(dest):
- raise ValueError("Wheel file {0} would overwrite {1}. Use force if this is intended".format(k, dest))
- # Get the name of our executable, for use when replacing script
- # wrapper hashbang lines.
- # We encode it using getfilesystemencoding, as that is "the name of
- # the encoding used to convert Unicode filenames into system file
- # names".
- exename = sys.executable.encode(sys.getfilesystemencoding())
- record_data = []
- record_name = self.distinfo_name + '/RECORD'
- for info, (key, target, filename, dest) in name_trans.items():
- name = info.filename
- source = self.zipfile.open(info)
- # Skip the RECORD file
- if name == record_name:
- continue
- ddir = os.path.dirname(dest)
- if not os.path.isdir(ddir):
- os.makedirs(ddir)
- destination = HashingFile(open(dest, 'wb'))
- if key == 'scripts':
- hashbang = source.readline()
- if hashbang.startswith(b'#!python'):
- hashbang = b'#!' + exename + binary(os.linesep)
- destination.write(hashbang)
- shutil.copyfileobj(source, destination)
- reldest = os.path.relpath(dest, root)
- reldest.replace(os.sep, '/')
- record_data.append((reldest, destination.digest(), destination.length))
- destination.close()
- source.close()
- # preserve attributes (especially +x bit for scripts)
- attrs = info.external_attr >> 16
- if attrs: # tends to be 0 if Windows.
- os.chmod(dest, info.external_attr >> 16)
- record_name = os.path.join(root, self.record_name)
- writer = csv.writer(open_for_csv(record_name, 'w+'))
- for reldest, digest, length in sorted(record_data):
- writer.writerow((reldest, digest, length))
- writer.writerow((self.record_name, '', ''))
- def verify(self, zipfile=None):
- """Configure the VerifyingZipFile `zipfile` by verifying its signature
- and setting expected hashes for every hash in RECORD.
- Caller must complete the verification process by completely reading
- every file in the archive (e.g. with extractall)."""
- sig = None
- if zipfile is None:
- zipfile = self.zipfile
- zipfile.strict = True
- record_name = '/'.join((self.distinfo_name, 'RECORD'))
- sig_name = '/'.join((self.distinfo_name, 'RECORD.jws'))
- # tolerate s/mime signatures:
- smime_sig_name = '/'.join((self.distinfo_name, 'RECORD.p7s'))
- zipfile.set_expected_hash(record_name, None)
- zipfile.set_expected_hash(sig_name, None)
- zipfile.set_expected_hash(smime_sig_name, None)
- record = zipfile.read(record_name)
- record_digest = urlsafe_b64encode(hashlib.sha256(record).digest())
- try:
- sig = from_json(native(zipfile.read(sig_name)))
- except KeyError: # no signature
- pass
- if sig:
- headers, payload = signatures.verify(sig)
- if payload['hash'] != "sha256=" + native(record_digest):
- msg = "RECORD.sig claimed RECORD hash {0} != computed hash {1}."
- raise BadWheelFile(msg.format(payload['hash'],
- native(record_digest)))
- reader = csv.reader((native(r) for r in record.splitlines()))
- for row in reader:
- filename = row[0]
- hash = row[1]
- if not hash:
- if filename not in (record_name, sig_name):
- sys.stderr.write("%s has no hash!\n" % filename)
- continue
- algo, data = row[1].split('=', 1)
- assert algo == "sha256", "Unsupported hash algorithm"
- zipfile.set_expected_hash(filename, urlsafe_b64decode(binary(data)))
- class VerifyingZipFile(zipfile.ZipFile):
- """ZipFile that can assert that each of its extracted contents matches
- an expected sha256 hash. Note that each file must be completly read in
- order for its hash to be checked."""
- def __init__(self, file, mode="r",
- compression=zipfile.ZIP_STORED,
- allowZip64=False):
- zipfile.ZipFile.__init__(self, file, mode, compression, allowZip64)
- self.strict = False
- self._expected_hashes = {}
- self._hash_algorithm = hashlib.sha256
- def set_expected_hash(self, name, hash):
- """
- :param name: name of zip entry
- :param hash: bytes of hash (or None for "don't care")
- """
- self._expected_hashes[name] = hash
- def open(self, name_or_info, mode="r", pwd=None):
- """Return file-like object for 'name'."""
- # A non-monkey-patched version would contain most of zipfile.py
- ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
- if isinstance(name_or_info, zipfile.ZipInfo):
- name = name_or_info.filename
- else:
- name = name_or_info
- if (name in self._expected_hashes
- and self._expected_hashes[name] != None):
- expected_hash = self._expected_hashes[name]
- try:
- _update_crc_orig = ef._update_crc
- except AttributeError:
- warnings.warn('Need ZipExtFile._update_crc to implement '
- 'file hash verification (in Python >= 2.7)')
- return ef
- running_hash = self._hash_algorithm()
- if hasattr(ef, '_eof'): # py33
- def _update_crc(data):
- _update_crc_orig(data)
- running_hash.update(data)
- if ef._eof and running_hash.digest() != expected_hash:
- raise BadWheelFile("Bad hash for file %r" % ef.name)
- else:
- def _update_crc(data, eof=None):
- _update_crc_orig(data, eof=eof)
- running_hash.update(data)
- if eof and running_hash.digest() != expected_hash:
- raise BadWheelFile("Bad hash for file %r" % ef.name)
- ef._update_crc = _update_crc
- elif self.strict and name not in self._expected_hashes:
- raise BadWheelFile("No expected hash for file %r" % ef.name)
- return ef
- def pop(self):
- """Truncate the last file off this zipfile.
- Assumes infolist() is in the same order as the files (true for
- ordinary zip files created by Python)"""
- if not self.fp:
- raise RuntimeError(
- "Attempt to pop from ZIP archive that was already closed")
- last = self.infolist().pop()
- del self.NameToInfo[last.filename]
- self.fp.seek(last.header_offset, os.SEEK_SET)
- self.fp.truncate()
- self._didModify = True