util.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. """Utility functions."""
  2. import sys
  3. import os
  4. import base64
  5. import json
  6. import hashlib
  7. try:
  8. from collections import OrderedDict
  9. except ImportError:
  10. OrderedDict = dict
# Names exported by ``from <module> import *``.  Other helpers defined in this
# file (e.g. ``native``, ``binary``, ``open_for_csv``, ``HashingFile``) are
# not listed here and therefore have to be imported by name.
__all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8',
           'to_json', 'from_json', 'matches_requirement']
  13. def urlsafe_b64encode(data):
  14. """urlsafe_b64encode without padding"""
  15. return base64.urlsafe_b64encode(data).rstrip(binary('='))
  16. def urlsafe_b64decode(data):
  17. """urlsafe_b64decode without padding"""
  18. pad = b'=' * (4 - (len(data) & 3))
  19. return base64.urlsafe_b64decode(data + pad)
  20. def to_json(o):
  21. '''Convert given data to JSON.'''
  22. return json.dumps(o, sort_keys=True)
  23. def from_json(j):
  24. '''Decode a JSON payload.'''
  25. return json.loads(j)
  26. def open_for_csv(name, mode):
  27. if sys.version_info[0] < 3:
  28. nl = {}
  29. bin = 'b'
  30. else:
  31. nl = { 'newline': '' }
  32. bin = ''
  33. return open(name, mode + bin, **nl)
  34. try:
  35. unicode
  36. def utf8(data):
  37. '''Utf-8 encode data.'''
  38. if isinstance(data, unicode):
  39. return data.encode('utf-8')
  40. return data
  41. except NameError:
  42. def utf8(data):
  43. '''Utf-8 encode data.'''
  44. if isinstance(data, str):
  45. return data.encode('utf-8')
  46. return data
  47. try:
  48. # For encoding ascii back and forth between bytestrings, as is repeatedly
  49. # necessary in JSON-based crypto under Python 3
  50. unicode
  51. def native(s):
  52. return s
  53. def binary(s):
  54. if isinstance(s, unicode):
  55. return s.encode('ascii')
  56. return s
  57. except NameError:
  58. def native(s):
  59. if isinstance(s, bytes):
  60. return s.decode('ascii')
  61. return s
  62. def binary(s):
  63. if isinstance(s, str):
  64. return s.encode('ascii')
  65. class HashingFile(object):
  66. def __init__(self, fd, hashtype='sha256'):
  67. self.fd = fd
  68. self.hashtype = hashtype
  69. self.hash = hashlib.new(hashtype)
  70. self.length = 0
  71. def write(self, data):
  72. self.hash.update(data)
  73. self.length += len(data)
  74. self.fd.write(data)
  75. def close(self):
  76. self.fd.close()
  77. def digest(self):
  78. if self.hashtype == 'md5':
  79. return self.hash.hexdigest()
  80. digest = self.hash.digest()
  81. return self.hashtype + '=' + native(urlsafe_b64encode(digest))
  82. class OrderedDefaultDict(OrderedDict):
  83. def __init__(self, *args, **kwargs):
  84. if not args:
  85. self.default_factory = None
  86. else:
  87. if not (args[0] is None or callable(args[0])):
  88. raise TypeError('first argument must be callable or None')
  89. self.default_factory = args[0]
  90. args = args[1:]
  91. super(OrderedDefaultDict, self).__init__(*args, **kwargs)
  92. def __missing__ (self, key):
  93. if self.default_factory is None:
  94. raise KeyError(key)
  95. self[key] = default = self.default_factory()
  96. return default
  97. if sys.platform == 'win32':
  98. import ctypes.wintypes
  99. # CSIDL_APPDATA for reference - not used here for compatibility with
  100. # dirspec, which uses LOCAL_APPDATA and COMMON_APPDATA in that order
  101. csidl = dict(CSIDL_APPDATA=26, CSIDL_LOCAL_APPDATA=28,
  102. CSIDL_COMMON_APPDATA=35)
  103. def get_path(name):
  104. SHGFP_TYPE_CURRENT = 0
  105. buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
  106. ctypes.windll.shell32.SHGetFolderPathW(0, csidl[name], 0, SHGFP_TYPE_CURRENT, buf)
  107. return buf.value
  108. def save_config_path(*resource):
  109. appdata = get_path("CSIDL_LOCAL_APPDATA")
  110. path = os.path.join(appdata, *resource)
  111. if not os.path.isdir(path):
  112. os.makedirs(path)
  113. return path
  114. def load_config_paths(*resource):
  115. ids = ["CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"]
  116. for id in ids:
  117. base = get_path(id)
  118. path = os.path.join(base, *resource)
  119. if os.path.exists(path):
  120. yield path
  121. else:
  122. def save_config_path(*resource):
  123. import xdg.BaseDirectory
  124. return xdg.BaseDirectory.save_config_path(*resource)
  125. def load_config_paths(*resource):
  126. import xdg.BaseDirectory
  127. return xdg.BaseDirectory.load_config_paths(*resource)
  128. def matches_requirement(req, wheels):
  129. """List of wheels matching a requirement.
  130. :param req: The requirement to satisfy
  131. :param wheels: List of wheels to search.
  132. """
  133. try:
  134. from pkg_resources import Distribution, Requirement
  135. except ImportError:
  136. raise RuntimeError("Cannot use requirements without pkg_resources")
  137. req = Requirement.parse(req)
  138. selected = []
  139. for wf in wheels:
  140. f = wf.parsed_filename
  141. dist = Distribution(project_name=f.group("name"), version=f.group("ver"))
  142. if dist in req:
  143. selected.append(wf)
  144. return selected