123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- import time
- try:
- from boto import s3
- from boto.s3.prefix import Prefix
- from boto.s3.key import Key
- except ImportError:
- s3 = None
- from flask import redirect
- from flask_admin.babel import gettext
- from . import BaseFileAdmin
class S3Storage(object):
    """
    Storage object representing files on an Amazon S3 bucket.

    Directories are represented by keys whose name ends with the
    separator (``/``); every other key is treated as a file.

    Usage::

        from flask_admin.contrib.fileadmin import BaseFileAdmin
        from flask_admin.contrib.fileadmin.s3 import S3Storage

        class MyS3Admin(BaseFileAdmin):
            # Configure your class however you like
            pass

        fileadmin_view = MyS3Admin(storage=S3Storage(...))
    """

    def __init__(self, bucket_name, region, aws_access_key_id,
                 aws_secret_access_key):
        """
        Connect to S3 and open the bucket.

        :param bucket_name:
            Name of the bucket that the files are on.
        :param region:
            Region that the bucket is located
        :param aws_access_key_id:
            AWS Access Key ID
        :param aws_secret_access_key:
            AWS Secret Access Key
        :raises ValueError:
            If boto could not be imported.

        Make sure the credentials have the correct permissions set up on
        Amazon or else S3 will return a 403 FORBIDDEN error.
        """
        # ``s3`` is set to None by the guarded import at module level when
        # boto is not installed.
        if not s3:
            raise ValueError('Could not import boto. You can install boto by '
                             'using pip install boto')

        connection = s3.connect_to_region(
            region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        self.bucket = connection.get_bucket(bucket_name)
        self.separator = '/'

    @staticmethod
    def _strip_path(name, path):
        """Return ``name`` without its leading ``path`` prefix, if present."""
        if name.startswith(path):
            return name[len(path):]
        return name

    @staticmethod
    def _iso_to_epoch(timestamp):
        """
        Convert an ISO-8601 timestamp string to integer epoch seconds.

        The fractional part and a trailing ``Z`` are ignored. The timestamp
        is interpreted as UTC (S3 listings report ``...Z`` timestamps), so
        the result does not depend on the server's local timezone.
        """
        # Imported locally so the module-level import block stays untouched.
        import calendar

        whole_seconds = timestamp.split(".")[0].rstrip("Z")
        parsed = time.strptime(whole_seconds, "%Y-%m-%dT%H:%M:%S")
        # timegm treats the struct as UTC; time.mktime would treat it as
        # local time and shift the result by the local UTC offset.
        return calendar.timegm(parsed)

    def get_files(self, path, directory):
        """
        List the entries directly under ``path``.

        :param path:
            Key prefix to list. A trailing separator is added if missing.
        :param directory:
            Unused by this storage; kept for interface compatibility.
        :return:
            Directories first, then files, as tuples of
            ``(name, key_name, is_dir, size, last_modified)``. Directories
            report ``0`` for size and last_modified.
        """
        files = []
        directories = []
        if path and not path.endswith(self.separator):
            path += self.separator
        for key in self.bucket.list(path, self.separator):
            # The listing includes the directory marker itself; skip it.
            if key.name == path:
                continue
            if isinstance(key, Prefix):
                # Prefix names end with the separator; drop it for display
                # and for the stored key name.
                name = self._strip_path(key.name, path)[:-1]
                key_name = key.name[:-1]
                directories.append((name, key_name, True, 0, 0))
            else:
                last_modified = self._iso_to_epoch(key.last_modified)
                name = self._strip_path(key.name, path)
                files.append((name, key.name, False, key.size, last_modified))
        return directories + files

    def _get_bucket_list_prefix(self, path):
        """
        Return the listing prefix of the parent of ``path``.

        This is everything up to and including the last separator, or ``''``
        when ``path`` contains no separator.
        """
        parts = path.split(self.separator)
        if len(parts) == 1:
            return ''
        return self.separator.join(parts[:-1]) + self.separator

    def _get_path_keys(self, path):
        """Return the set of key names listed alongside ``path``."""
        search = self._get_bucket_list_prefix(path)
        return {key.name for key in self.bucket.list(search, self.separator)}

    def is_dir(self, path):
        """Return True if ``path`` plus the separator is a listed key."""
        keys = self._get_path_keys(path)
        return path + self.separator in keys

    def path_exists(self, path):
        """
        Return True if ``path`` is listed as a file or as a directory.

        The empty path (bucket root) always exists.
        """
        if path == '':
            return True
        keys = self._get_path_keys(path)
        return path in keys or (path + self.separator) in keys

    def get_base_path(self):
        """Return the base path of the storage, which is always ``''``."""
        return ''

    def get_breadcrumbs(self, path):
        """
        Split ``path`` into ``(segment, accumulated_path)`` pairs.

        For example ``'a/b'`` yields ``[('a', 'a'), ('b', 'a/b')]``.
        """
        accumulator = []
        breadcrumbs = []
        for n in path.split(self.separator):
            accumulator.append(n)
            breadcrumbs.append((n, self.separator.join(accumulator)))
        return breadcrumbs

    def send_file(self, file_path):
        """
        Redirect the client to a URL generated for the key.

        :raises ValueError:
            If the bucket has no key named ``file_path``.
        """
        key = self.bucket.get_key(file_path)
        if key is None:
            raise ValueError()
        # 3600 is passed to generate_url as the expiry argument.
        return redirect(key.generate_url(3600))

    def save_file(self, path, file_data):
        """Store the contents of ``file_data.stream`` under the key ``path``."""
        key = Key(self.bucket, path)
        key.set_contents_from_file(file_data.stream)

    def delete_tree(self, directory):
        """
        Delete the marker key of an empty directory.

        :raises ValueError:
            If the directory is not empty.
        """
        self._check_empty_directory(directory)
        self.bucket.delete_key(directory + self.separator)

    def delete_file(self, file_path):
        """Delete the key named ``file_path``."""
        self.bucket.delete_key(file_path)

    def _dir_key_name(self, path, directory):
        """
        Build the marker key name for ``directory`` inside ``path``.

        An empty ``path`` (bucket root) yields ``directory + separator``
        rather than a name with a leading separator, which would fall
        outside the root listing.
        """
        marker = directory + self.separator
        if not path:
            return marker
        return self.separator.join([path, marker])

    def make_dir(self, path, directory):
        """Create an empty marker key for ``directory`` under ``path``."""
        key = Key(self.bucket, self._dir_key_name(path, directory))
        key.set_contents_from_string('')

    def _check_empty_directory(self, path):
        """
        Return True if the directory is empty.

        :raises ValueError:
            If the directory is not empty.
        """
        if not self._is_directory_empty(path):
            raise ValueError(gettext('Cannot operate on non empty '
                                     'directories'))
        return True

    def rename_path(self, src, dst):
        """
        Rename a key by copying it to ``dst`` and deleting ``src``.

        Directories must be empty; only their marker key is moved.

        :raises ValueError:
            If ``src`` is a non-empty directory.
        """
        if self.is_dir(src):
            self._check_empty_directory(src)
            src += self.separator
            dst += self.separator
        self.bucket.copy_key(dst, self.bucket.name, src)
        self.delete_file(src)

    def _is_directory_empty(self, path):
        """Return True if exactly one key is listed under ``path``."""
        # Presumably the single key is the directory's own marker - verify
        # for prefixes that exist without a marker key.
        keys = self._get_path_keys(path + self.separator)
        return len(keys) == 1
class S3FileAdmin(BaseFileAdmin):
    """
    File-management view backed by an Amazon S3 bucket.

    The constructor builds an ``S3Storage`` from the connection arguments
    and passes it to ``BaseFileAdmin`` as ``storage``; remaining positional
    and keyword arguments are forwarded unchanged.

    :param bucket_name:
        Name of the bucket that the files are on.
    :param region:
        Region that the bucket is located
    :param aws_access_key_id:
        AWS Access Key ID
    :param aws_secret_access_key:
        AWS Secret Access Key

    Sample usage::

        from flask_admin import Admin
        from flask_admin.contrib.fileadmin.s3 import S3FileAdmin

        admin = Admin()

        admin.add_view(S3FileAdmin('files_bucket', 'us-east-1', 'key_id', 'secret_key'))
    """

    def __init__(self, bucket_name, region, aws_access_key_id,
                 aws_secret_access_key, *args, **kwargs):
        credentials = (aws_access_key_id, aws_secret_access_key)
        s3_storage = S3Storage(bucket_name, region, *credentials)
        super(S3FileAdmin, self).__init__(
            *args, storage=s3_storage, **kwargs
        )
|