123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- """Utility functions."""
- import sys
- import os
- import base64
- import json
- import hashlib
- try:
- from collections import OrderedDict
- except ImportError:
- OrderedDict = dict
- __all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8',
- 'to_json', 'from_json', 'matches_requirement']
- def urlsafe_b64encode(data):
- """urlsafe_b64encode without padding"""
- return base64.urlsafe_b64encode(data).rstrip(binary('='))
- def urlsafe_b64decode(data):
- """urlsafe_b64decode without padding"""
- pad = b'=' * (4 - (len(data) & 3))
- return base64.urlsafe_b64decode(data + pad)
- def to_json(o):
- '''Convert given data to JSON.'''
- return json.dumps(o, sort_keys=True)
- def from_json(j):
- '''Decode a JSON payload.'''
- return json.loads(j)
- def open_for_csv(name, mode):
- if sys.version_info[0] < 3:
- nl = {}
- bin = 'b'
- else:
- nl = { 'newline': '' }
- bin = ''
- return open(name, mode + bin, **nl)
- try:
- unicode
- def utf8(data):
- '''Utf-8 encode data.'''
- if isinstance(data, unicode):
- return data.encode('utf-8')
- return data
- except NameError:
- def utf8(data):
- '''Utf-8 encode data.'''
- if isinstance(data, str):
- return data.encode('utf-8')
- return data
- try:
- # For encoding ascii back and forth between bytestrings, as is repeatedly
- # necessary in JSON-based crypto under Python 3
- unicode
- def native(s):
- return s
- def binary(s):
- if isinstance(s, unicode):
- return s.encode('ascii')
- return s
- except NameError:
- def native(s):
- if isinstance(s, bytes):
- return s.decode('ascii')
- return s
- def binary(s):
- if isinstance(s, str):
- return s.encode('ascii')
- class HashingFile(object):
- def __init__(self, fd, hashtype='sha256'):
- self.fd = fd
- self.hashtype = hashtype
- self.hash = hashlib.new(hashtype)
- self.length = 0
- def write(self, data):
- self.hash.update(data)
- self.length += len(data)
- self.fd.write(data)
- def close(self):
- self.fd.close()
- def digest(self):
- if self.hashtype == 'md5':
- return self.hash.hexdigest()
- digest = self.hash.digest()
- return self.hashtype + '=' + native(urlsafe_b64encode(digest))
- class OrderedDefaultDict(OrderedDict):
- def __init__(self, *args, **kwargs):
- if not args:
- self.default_factory = None
- else:
- if not (args[0] is None or callable(args[0])):
- raise TypeError('first argument must be callable or None')
- self.default_factory = args[0]
- args = args[1:]
- super(OrderedDefaultDict, self).__init__(*args, **kwargs)
- def __missing__ (self, key):
- if self.default_factory is None:
- raise KeyError(key)
- self[key] = default = self.default_factory()
- return default
- if sys.platform == 'win32':
- import ctypes.wintypes
- # CSIDL_APPDATA for reference - not used here for compatibility with
- # dirspec, which uses LOCAL_APPDATA and COMMON_APPDATA in that order
- csidl = dict(CSIDL_APPDATA=26, CSIDL_LOCAL_APPDATA=28,
- CSIDL_COMMON_APPDATA=35)
- def get_path(name):
- SHGFP_TYPE_CURRENT = 0
- buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
- ctypes.windll.shell32.SHGetFolderPathW(0, csidl[name], 0, SHGFP_TYPE_CURRENT, buf)
- return buf.value
- def save_config_path(*resource):
- appdata = get_path("CSIDL_LOCAL_APPDATA")
- path = os.path.join(appdata, *resource)
- if not os.path.isdir(path):
- os.makedirs(path)
- return path
- def load_config_paths(*resource):
- ids = ["CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"]
- for id in ids:
- base = get_path(id)
- path = os.path.join(base, *resource)
- if os.path.exists(path):
- yield path
- else:
- def save_config_path(*resource):
- import xdg.BaseDirectory
- return xdg.BaseDirectory.save_config_path(*resource)
- def load_config_paths(*resource):
- import xdg.BaseDirectory
- return xdg.BaseDirectory.load_config_paths(*resource)
- def matches_requirement(req, wheels):
- """List of wheels matching a requirement.
- :param req: The requirement to satisfy
- :param wheels: List of wheels to search.
- """
- try:
- from pkg_resources import Distribution, Requirement
- except ImportError:
- raise RuntimeError("Cannot use requirements without pkg_resources")
- req = Requirement.parse(req)
- selected = []
- for wf in wheels:
- f = wf.parsed_filename
- dist = Distribution(project_name=f.group("name"), version=f.group("ver"))
- if dist in req:
- selected.append(wf)
- return selected
|