Django文件存储 自己定制存储系统解析

Django文件存储 自己定制存储系统解析要自己写一个存储系统,可以依照以下步骤:1.写一个继承自django.core.files.storage.Storage的子类。fromdjango.core.files.storageimportStorageclassMyStorage(Storage):…2.Django必须可以在无任何参数的情况下实例化MyStorage,所以任何环境设置必须来自django.conf.settings。fromdjango.confimport

要自己写一个存储系统,可以依照以下步骤:

1.写一个继承自django.core.files.storage.Storage的子类。


    from django.core.files.storage import Storage
    class MyStorage(Storage):
      ...

2.Django必须可以在无任何参数的情况下实例化MyStorage,所以任何环境设置必须来自django.conf.settings。


    from django.conf import settings
    from django.core.files.storage import Storage
     
    class MyStorage(Storage):
      def __init__(self, option=None):
        if not option:
          option = settings.CUSTOM_STORAGE_OPTIONS
        ...

3.根据Storage的open和save方法源码:


    def open(self, name, mode='rb'):
      """ Retrieves the specified file from storage. """
      return self._open(name, mode)
     
     
    def save(self, name, content, max_length=None):
      """ Saves new content to the file specified by name. The content should be a proper File object or any python file-like object, ready to be read from the beginning. """
      # Get the proper name for the file, as it will actually be saved.
      if name is None:
        name = content.name
     
      if not hasattr(content, 'chunks'):
        content = File(content, name)
     
      name = self.get_available_name(name, max_length=max_length)
      return self._save(name, content)

MyStorage需要实现_open和_save方法。

如果写的是个本地存储系统,还要重写path方法。

4.使用django.utils.deconstruct.deconstructible装饰器,以便在migration可以序列化。

还有,Storage.delete()、Storage.exists()、Storage.listdir()、Storage.size()、Storage.url()方法都会报NotImplementedError,也需要重写。

Django Qiniu Storage

七牛云有自己的django storage系统,可以看下是怎么运作的,地址 https://github.com/glasslion/django-
qiniu-storage

先在环境变量或者settings中配置QINIU_ACCESS_KEY、QINIU_SECRET_KEY、QINIU_BUCKET_NAME、QINIU_BUCKET_DOMAIN、QINIU_SECURE_URL。

使用七牛云托管用户上传的文件,在 settings.py 里设置DEFAULT_FILE_STORAGE:


    DEFAULT_FILE_STORAGE = 'qiniustorage.backends.QiniuStorage'

使用七牛托管动态生成的文件以及站点自身的静态文件,设置:


    STATICFILES_STORAGE = 'qiniustorage.backends.QiniuStaticStorage'

运行python manage.py collectstatic,静态文件就会被统一上传到七牛。

QiniuStorage代码如下:


    @deconstructible
    class QiniuStorage(Storage):
      """ Qiniu Storage Service """
      location = ""
     
      def __init__(
          self,
          access_key=QINIU_ACCESS_KEY,
          secret_key=QINIU_SECRET_KEY,
          bucket_name=QINIU_BUCKET_NAME,
          bucket_domain=QINIU_BUCKET_DOMAIN,
          secure_url=QINIU_SECURE_URL):
     
        self.auth = Auth(access_key, secret_key)
        self.bucket_name = bucket_name
        self.bucket_domain = bucket_domain
        self.bucket_manager = BucketManager(self.auth)
        self.secure_url = secure_url
     
      def _clean_name(self, name):
        """ Cleans the name so that Windows style paths work """
        # Normalize Windows style paths
        clean_name = posixpath.normpath(name).replace('\\', '/')
     
        # os.path.normpath() can strip trailing slashes so we implement
        # a workaround here.
        if name.endswith('/') and not clean_name.endswith('/'):
          # Add a trailing slash as it was stripped.
          return clean_name + '/'
        else:
          return clean_name
     
      def _normalize_name(self, name):
        """ Normalizes the name so that paths like /path/to/ignored/../foo.txt work. We check to make sure that the path pointed to is not outside the directory specified by the LOCATION setting. """
     
        base_path = force_text(self.location)
        base_path = base_path.rstrip('/')
     
        final_path = urljoin(base_path.rstrip('/') + "/", name)
     
        base_path_len = len(base_path)
        if (not final_path.startswith(base_path) or
            final_path[base_path_len:base_path_len + 1] not in ('', '/')):
          raise SuspiciousOperation("Attempted access to '%s' denied." %
                       name)
        return final_path.lstrip('/')
     
      def _open(self, name, mode='rb'):
        return QiniuFile(name, self, mode)
     
      def _save(self, name, content):
        cleaned_name = self._clean_name(name)
        name = self._normalize_name(cleaned_name)
     
        if hasattr(content, 'chunks'):
          content_str = b''.join(chunk for chunk in content.chunks())
        else:
          content_str = content.read()
     
        self._put_file(name, content_str)
        return cleaned_name
     
      def _put_file(self, name, content):
        token = self.auth.upload_token(self.bucket_name)
        ret, info = put_data(token, name, content)
        if ret is None or ret['key'] != name:
          raise QiniuError(info)
     
      def _read(self, name):
        return requests.get(self.url(name)).content
     
      def delete(self, name):
        name = self._normalize_name(self._clean_name(name))
        if six.PY2:
          name = name.encode('utf-8')
        ret, info = self.bucket_manager.delete(self.bucket_name, name)
     
        if ret is None or info.status_code == 612:
          raise QiniuError(info)
     
      def _file_stat(self, name, silent=False):
        name = self._normalize_name(self._clean_name(name))
        if six.PY2:
          name = name.encode('utf-8')
        ret, info = self.bucket_manager.stat(self.bucket_name, name)
        if ret is None and not silent:
          raise QiniuError(info)
        return ret
     
      def exists(self, name):
        stats = self._file_stat(name, silent=True)
        return True if stats else False
     
      def size(self, name):
        stats = self._file_stat(name)
        return stats['fsize']
     
      def modified_time(self, name):
        stats = self._file_stat(name)
        time_stamp = float(stats['putTime']) / 10000000
        return datetime.datetime.fromtimestamp(time_stamp)
     
      def listdir(self, name):
        name = self._normalize_name(self._clean_name(name))
        if name and not name.endswith('/'):
          name += '/'
     
        dirlist = bucket_lister(self.bucket_manager, self.bucket_name,
                    prefix=name)
        files = []
        dirs = set()
        base_parts = name.split("/")[:-1]
        for item in dirlist:
          parts = item['key'].split("/")
          parts = parts[len(base_parts):]
          if len(parts) == 1:
            # File
            files.append(parts[0])
          elif len(parts) > 1:
            # Directory
            dirs.add(parts[0])
        return list(dirs), files
     
      def url(self, name):
        name = self._normalize_name(self._clean_name(name))
        name = filepath_to_uri(name)
        protocol = u'https://' if self.secure_url else u'http://'
        return urljoin(protocol + self.bucket_domain, name)

配置是从环境变量或者settings.py中获得的:


    def get_qiniu_config(name, default=None):
      """ Get configuration variable from environment variable or django setting.py """
      config = os.environ.get(name, getattr(settings, name, default))
      if config is not None:
        if isinstance(config, six.string_types):
          return config.strip()
        else:
          return config
      else:
        raise ImproperlyConfigured(
          "Can't find config for '%s' either in environment"
          "variable or in setting.py" % name) 
    QINIU_ACCESS_KEY = get_qiniu_config('QINIU_ACCESS_KEY')
    QINIU_SECRET_KEY = get_qiniu_config('QINIU_SECRET_KEY')
    QINIU_BUCKET_NAME = get_qiniu_config('QINIU_BUCKET_NAME')
    QINIU_BUCKET_DOMAIN = get_qiniu_config('QINIU_BUCKET_DOMAIN', '').rstrip('/')
    QINIU_SECURE_URL = get_qiniu_config('QINIU_SECURE_URL', 'False')

重写了_open和_save方法:


    def _open(self, name, mode='rb'):
      return QiniuFile(name, self, mode) 
    def _save(self, name, content):
      cleaned_name = self._clean_name(name)
      name = self._normalize_name(cleaned_name) 
      if hasattr(content, 'chunks'):
        content_str = b''.join(chunk for chunk in content.chunks())
      else:
        content_str = content.read() 
      self._put_file(name, content_str)
      return cleaned_name

使用的put_data方法上传文件,相关代码如下:


    def put_data(
        up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,
        fname=None):
      """上传二进制流到七牛 Args: up_token: 上传凭证 key: 上传文件名 data: 上传二进制流 params: 自定义变量,规格参考 http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar mime_type: 上传数据的mimeType check_crc: 是否校验crc32 progress_handler: 上传进度 Returns: 一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"} 一个ResponseInfo对象 """
      crc = crc32(data) if check_crc else None
      return _form_put(up_token, key, data, params, mime_type, crc, progress_handler, fname)
     
    def _form_put(up_token, key, data, params, mime_type, crc, progress_handler=None, file_name=None):
      fields = { 
   }
      if params:
        for k, v in params.items():
          fields[k] = str(v)
      if crc:
        fields['crc32'] = crc
      if key is not None:
        fields['key'] = key 
      fields['token'] = up_token
      url = config.get_default('default_zone').get_up_host_by_token(up_token) + '/'
      # name = key if key else file_name
     
      fname = file_name
      if not fname or not fname.strip():
        fname = 'file_name'
     
      r, info = http._post_file(url, data=fields, files={ 
   'file': (fname, data, mime_type)})
      if r is None and info.need_retry():
        if info.connect_failed:
          url = config.get_default('default_zone').get_up_host_backup_by_token(up_token) + '/'
        if hasattr(data, 'read') is False:
          pass
        elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
          data.seek(0)
        else:
          return r, info
        r, info = http._post_file(url, data=fields, files={ 
   'file': (fname, data, mime_type)})
     
      return r, info 
    def _post_file(url, data, files):
      return _post(url, data, files, None) 
    def _post(url, data, files, auth, headers=None):
      if _session is None:
        _init()
      try:
        post_headers = _headers.copy()
        if headers is not None:
          for k, v in headers.items():
            post_headers.update({ 
   k: v})
        r = _session.post(
          url, data=data, files=files, auth=auth, headers=post_headers,
          timeout=config.get_default('connection_timeout'))
      except Exception as e:
        return None, ResponseInfo(None, e)
      return __return_wrapper(r) 
    def _init():
      session = requests.Session()
      adapter = requests.adapters.HTTPAdapter(
        pool_connections=config.get_default('connection_pool'), pool_maxsize=config.get_default('connection_pool'),
        max_retries=config.get_default('connection_retries'))
      session.mount('http://', adapter)
      global _session
      _session = session

最终使用的是requests库上传文件的,统一适配了链接池个数、链接重试次数。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

在这里插入图片描述

今天的文章Django文件存储 自己定制存储系统解析分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/9822.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注