• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python storage_utils.storage_is_remote函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mkt.site.storage_utils.storage_is_remote函数的典型用法代码示例。如果您正苦于以下问题:Python storage_is_remote函数的具体用法?Python storage_is_remote怎么用?Python storage_is_remote使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了storage_is_remote函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get_icon_url

def get_icon_url(base_url_format, obj, size,
                 default_format='default-{size}.png'):
    """
    Build the icon URL for `obj` at the requested `size`.

    Three cases are handled, in this order:

    * `obj.icon_type` is falsy: the default icon is used. Its URL is
      `static_url('ICONS_DEFAULT_URL')` joined with `default_format`
      rendered for `size`.
    * Remote storage: `base_url_format` is ignored and the URL comes from
      `public_storage.url()` with a `?modified=` query string appended.
    * Local storage: `base_url_format` is %-formatted with
      `(directory, pk, size, suffix)`, see ADDON_ICON_URL for an example.

    `obj` needs to provide `icon_type`, `icon_hash`, `pk` and
    `get_icon_dir()`.

    Note: `size` is not validated, so an invalid size produces a URL that
    may 404.
    """
    # No icon type stored: serve the default image.
    if not obj.icon_type:
        default_dir = static_url('ICONS_DEFAULT_URL')
        default_name = default_format.format(size=size)
        return '{path}/{name}'.format(path=default_dir, name=default_name)

    # Use the dummy string "never" while no icon_hash is set; the suffix
    # changes once the icon (and with it icon_hash) is updated.
    cache_buster = obj.icon_hash or 'never'

    if storage_is_remote():
        # We don't care about base_url_format, the storage provides the url
        # for a given path. We assume AWS_QUERYSTRING_AUTH is False atm.
        remote_path = '%s/%s-%s.png' % (obj.get_icon_dir(), obj.pk, size)
        remote_url = public_storage.url(remote_path)
        return '%s?modified=%s' % (remote_url, cache_buster)

    # group(1) is the whole ID, group(2) is the directory: the pk with its
    # last (up to three) digits removed, or 0 when nothing is left.
    pk_match = re.match(r'((\d*?)\d{1,3})$', str(obj.pk))
    directory = pk_match.group(2) or 0
    return base_url_format % (directory, obj.pk, size, cache_buster)
开发者ID:kolyaflash,项目名称:zamboni,代码行数:34,代码来源:utils.py


示例2: setUp

 def setUp(self):
     """Create `self.viewer`, a FileViewer over the dictionary-test.xpi file."""
     xpi_path = get_file('dictionary-test.xpi')
     if storage_is_remote():
         # Copy the fixture from local storage to the same path in private
         # storage.
         copy_stored_file(xpi_path, xpi_path,
                          src_storage=local_storage,
                          dst_storage=private_storage)
     self.viewer = FileViewer(make_file(1, xpi_path))
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:7,代码来源:test_helpers.py


示例3: preload_test_plan_url

 def preload_test_plan_url(self):
     """
     Return the URL of the preload test plan.

     With remote storage the URL comes from `private_storage`. Otherwise it
     is built from a mirror host (the private mirror when the webapp is
     disabled, the local mirror if not), the webapp id and `self.filename`.
     """
     if storage_is_remote():
         return private_storage.url(self.preload_test_plan_path)

     if self.webapp.is_disabled:
         mirror = settings.PRIVATE_MIRROR_URL
     else:
         mirror = settings.LOCAL_MIRROR_URL
     return os.path.join(mirror, str(self.webapp.id), self.filename)
开发者ID:shahbaz17,项目名称:zamboni,代码行数:7,代码来源:models.py


示例4: cleanup_extracted_file

def cleanup_extracted_file():
    """
    Delete file-viewer extractions whose manifest is more than an hour old.

    Every directory under `<TMP_PATH>/file_viewer` is aged by the
    modification time of its `manifest.webapp`. Stale directories have all
    their files deleted from private storage and, when the directory name is
    an integer, the matching 'file-viewer' memoize cache entry is dropped.
    """
    log.info('Removing extracted files for file viewer.')
    viewer_root = os.path.join(settings.TMP_PATH, 'file_viewer')
    # Local storage uses local time for file modification. S3 uses UTC time.
    if storage_is_remote():
        current_time = datetime.utcnow
    else:
        current_time = datetime.now

    extracted_dirs = private_storage.listdir(viewer_root)[0]
    for entry in extracted_dirs:
        entry_path = os.path.join(viewer_root, entry)
        manifest_path = os.path.join(entry_path, 'manifest.webapp')
        age = current_time() - private_storage.modified_time(manifest_path)
        if age.total_seconds() <= (60 * 60):
            # Still fresh, leave it alone.
            continue

        log.debug('Removing extracted files: %s, %dsecs old.' %
                  (entry_path, age.total_seconds()))
        for subroot, _dirs, filenames in walk_storage(entry_path):
            for filename in filenames:
                private_storage.delete(os.path.join(subroot, filename))

        # Nuke out the file and diff caches when the file gets removed.
        # Only directories named after an integer id have a cache entry.
        file_id = os.path.basename(entry)
        try:
            int(file_id)
        except ValueError:
            continue

        digest = hashlib.md5()
        digest.update(str(file_id))
        cache.delete('%s:memoize:%s:%s' % (settings.CACHE_PREFIX,
                                           'file-viewer', digest.hexdigest()))
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:26,代码来源:cron.py


示例5: test_get_icon_url_bigger_pk

 def test_get_icon_url_bigger_pk(self):
     """Check the icon URL of a website whose pk has more than three digits."""
     website = Website(pk=98765432, icon_type="image/png")
     if storage_is_remote():
         icon_path = "%s/%s-%s.png" % (website.get_icon_dir(), website.pk, 32)
         expected = "%s?modified=never" % storage.url(icon_path)
     else:
         # Locally the directory part is the pk without its last 3 digits.
         url_format = static_url("WEBSITE_ICON_URL")
         expected = url_format % (
             str(website.pk)[:-3], website.pk, 32, "never")
     assert website.get_icon_url(32).endswith(expected), (
         "Expected %s, got %s" % (expected, website.get_icon_url(32)))
开发者ID:jamesthechamp,项目名称:zamboni,代码行数:8,代码来源:test_models.py


示例6: test_get_icon_url_bigger_pk

 def test_get_icon_url_bigger_pk(self):
     """Check the icon URL of a website whose pk has more than three digits."""
     size = 32
     website = Website(pk=98765432, icon_type='image/png')
     if storage_is_remote():
         icon_path = '%s/%s-%s.png' % (
             website.get_icon_dir(), website.pk, size)
         expected = '%s?modified=never' % public_storage.url(icon_path)
     else:
         # Locally the directory part is the pk without its last 3 digits.
         url_format = static_url('WEBSITE_ICON_URL')
         expected = url_format % (
             str(website.pk)[:-3], website.pk, size, 'never')
     assert website.get_icon_url(size).endswith(expected), (
         'Expected %s, got %s' % (expected, website.get_icon_url(size)))
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:10,代码来源:test_models.py


示例7: test_no_manifest_at_root

 def test_no_manifest_at_root(self):
     """Parsing a package without a root manifest raises ValidationError."""
     zip_path = self.packaged_app_path('no-manifest-at-root.zip')
     if storage_is_remote():
         # Copy the fixture to the same path in private storage.
         copy_stored_file(zip_path, zip_path,
                          src_storage=local_storage,
                          dst_storage=private_storage)
     with self.assertRaises(forms.ValidationError) as raised:
         WebAppParser().parse(private_storage.open(zip_path))
     first_message = raised.exception.messages[0]
     assert first_message.startswith(
         'The file "manifest.webapp" was not found'), (
             'Unexpected: %s' % first_message)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:10,代码来源:test_views_validation.py


示例8: _get_files

    def _get_files(self):
        """
        Build an ordered mapping describing everything under `self.dest`.

        Keys are the entry paths with the `self.dest` prefix (plus one
        separator character) stripped; values are dicts of per-entry
        metadata. Entries appear in traversal order: at every level the
        sorted sub-directories come first, each followed immediately by its
        own contents, and then the sorted files.
        """
        found_paths = []
        listing = OrderedDict()

        # Not using os.path.walk so we get just the right order.
        def collect(base):
            child_dirs, child_files = private_storage.listdir(base)
            for name in sorted(child_dirs):
                child = os.path.join(base, name)
                found_paths.append(child)
                collect(child)
            found_paths.extend(
                os.path.join(base, name) for name in sorted(child_files))

        collect(self.dest)

        for path in found_paths:
            filename = smart_unicode(os.path.basename(path), errors='replace')
            short = smart_unicode(path[len(self.dest) + 1:], errors='replace')
            mime, _encoding = mimetypes.guess_type(filename)
            if filename == 'manifest.webapp' and not mime:
                mime = 'application/x-web-app-manifest+json'

            if storage_is_remote():
                # S3 doesn't have directories, so we check for names with this
                # prefix and call it a directory if there are some.
                nested_dirs, nested_files = private_storage.listdir(path)
                is_directory = bool(nested_dirs or nested_files)
            else:
                is_directory = os.path.isdir(path)

            # Directories get empty/zero placeholders for md5, modified time
            # and size.
            listing[short] = {
                'binary': self._is_binary(mime, path),
                'depth': short.count(os.sep),
                'directory': is_directory,
                'filename': filename,
                'full': path,
                'md5': '' if is_directory else get_md5(path),
                'mimetype': mime or 'application/octet-stream',
                'syntax': self.get_syntax(filename),
                'modified': (
                    0 if is_directory else
                    time.mktime(
                        private_storage.modified_time(path).timetuple())),
                'short': short,
                'size': 0 if is_directory else private_storage.size(path),
                'truncated': self.truncate(filename),
                'url': reverse('mkt.files.list',
                               args=[self.file.id, 'file', short]),
                'url_serve': reverse('mkt.files.redirect',
                                     args=[self.file.id, short]),
                'version': self.file.version.version,
            }

        return listing
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:55,代码来源:helpers.py


示例9: _remove_stale_files

def _remove_stale_files(path, max_age_seconds, msg, storage):
    """
    Delete files directly under `path` that are older than `max_age_seconds`.

    Only the files reported by `storage.listdir(path)` are considered;
    directories are ignored. Each deletion is logged first using `msg`,
    which is formatted with the file path.
    """
    # Local storage uses local time for file modification. S3 uses UTC time.
    if storage_is_remote():
        current_time = datetime.utcnow
    else:
        current_time = datetime.now

    file_names = storage.listdir(path)[1]
    for name in file_names:
        candidate = os.path.join(path, name)
        age = current_time() - storage.modified_time(candidate)
        if age.total_seconds() > max_age_seconds:
            log.info(msg.format(candidate))
            storage.delete(candidate)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:11,代码来源:cron.py


示例10: test_no_manifest_at_root

 def test_no_manifest_at_root(self):
     """Parsing a package without a root manifest raises ValidationError."""
     zip_path = self.packaged_app_path('no-manifest-at-root.zip')
     if storage_is_remote():
         # Copy the local fixture to the same path in the storage.
         with open(zip_path) as src, storage.open(zip_path, 'w') as dst:
             copyfileobj(src, dst)
     with self.assertRaises(forms.ValidationError) as raised:
         WebAppParser().parse(zip_path)
     first_message = raised.exception.messages[0]
     assert first_message.startswith(
         'The file "manifest.webapp" was not found'), (
             'Unexpected: %s' % first_message)
开发者ID:Jobava,项目名称:zamboni,代码行数:11,代码来源:test_views_validation.py


示例11: get_file_response

def get_file_response(request, path, content=None, status=None,
                      content_type='application/octet-stream', etag=None,
                      public=True):
    """
    Return a response that serves the file stored at `path`.

    With local storage the file is sent via `HttpResponseSendFile` using
    `content_type` and `etag`. With remote storage the response is a redirect
    to the storage URL; `public` selects the public or private storage, and
    `Http404` is raised when the path does not exist there.

    `content` and `status` are accepted but not used by this function.
    """
    if not storage_is_remote():
        return HttpResponseSendFile(request, path, content_type=content_type,
                                    etag=etag)

    if public:
        backend = public_storage
    else:
        backend = private_storage

    if backend.exists(path):
        # Note: The `content_type` and `etag` will have no effect here. It
        # should be set when saving the item to S3.
        return http.HttpResponseRedirect(backend.url(path))
    raise http.Http404
开发者ID:kolyaflash,项目名称:zamboni,代码行数:13,代码来源:utils.py


示例12: test_parse_packaged_BOM

 def test_parse_packaged_BOM(self):
     """Parse the mozBOM.zip package and check the extracted fields."""
     zip_path = self.packaged_app_path('mozBOM.zip')
     if storage_is_remote():
         # Copy the fixture to the same path in private storage.
         copy_stored_file(zip_path, zip_path,
                          src_storage=local_storage,
                          dst_storage=private_storage)
     parsed = WebAppParser().parse(private_storage.open(zip_path))
     eq_(parsed['guid'], None)
     eq_(parsed['name']['en-US'], u'Packaged MozBOM ょ')
     descriptions = parsed['description']
     eq_(descriptions['en-US'], u'Exciting BOM action!')
     eq_(descriptions['es'], u'¡Acción BOM!')
     eq_(descriptions['it'], u'Azione BOM!')
     eq_(parsed['version'], '1.0')
     eq_(parsed['default_locale'], 'en-US')
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:13,代码来源:test_views_validation.py


示例13: clean_old_signed

def clean_old_signed(seconds=60 * 60):
    """
    Clean out apps signed for reviewers.

    Walks `settings.SIGNED_APPS_REVIEWER_PATH` and deletes every file whose
    modification time is more than `seconds` ago (one hour by default).
    """
    log.info('Removing old apps signed for reviewers')
    signed_root = settings.SIGNED_APPS_REVIEWER_PATH
    # Local storage uses local time for file modification. S3 uses UTC time.
    if storage_is_remote():
        current_time = datetime.utcnow
    else:
        current_time = datetime.now

    for current_dir, _dirs, filenames in walk_storage(signed_root):
        for filename in filenames:
            signed_path = os.path.join(current_dir, filename)
            age = current_time() - storage.modified_time(signed_path)
            if age.total_seconds() > seconds:
                log.debug('Removing signed app: %s, %dsecs old.' % (
                    signed_path, age.total_seconds()))
                storage.delete(signed_path)
开发者ID:Jobava,项目名称:zamboni,代码行数:14,代码来源:cron.py


示例14: test_resize_transparency

def test_resize_transparency():
    """
    Resize transparent.png to 32x32 and compare it to the expected image.

    The resized output is read back through `public_storage` and must be
    byte-for-byte equal to the `-expected.png` file next to the source. The
    destination is removed from public storage afterwards, pass or fail.
    """
    import os

    src = get_image_path('transparent.png')
    expected = src.replace('.png', '-expected.png')
    # mkstemp() returns an already-open OS-level file descriptor along with
    # the path. Only the path is needed, so close the descriptor right away
    # instead of leaking it.
    fd, dest = tempfile.mkstemp(dir=settings.TMP_PATH)
    os.close(fd)
    try:
        if storage_is_remote():
            copy_to_storage(src, src, src_storage=local_storage)
        resize_image(src, dest, (32, 32), remove_src=False)
        with public_storage.open(dest) as dfh:
            # PNG data is binary: read the reference image in binary mode so
            # the comparison is bytes against bytes on every platform.
            with open(expected, 'rb') as efh:
                assert dfh.read() == efh.read()
    finally:
        if public_storage.exists(dest):
            public_storage.delete(dest)
开发者ID:ayushagrawal288,项目名称:zamboni,代码行数:14,代码来源:test_utils_.py


示例15: test_parse_packaged_BOM

 def test_parse_packaged_BOM(self):
     """Parse the mozBOM.zip package and check the extracted fields."""
     zip_path = self.packaged_app_path('mozBOM.zip')
     if storage_is_remote():
         # Copy the local fixture to the same path in the storage.
         with open(zip_path) as src, storage.open(zip_path, 'w') as dst:
             copyfileobj(src, dst)
     parsed = WebAppParser().parse(zip_path)
     eq_(parsed['guid'], None)
     eq_(parsed['name']['en-US'], u'Packaged MozBOM ょ')
     descriptions = parsed['description']
     eq_(descriptions['en-US'], u'Exciting BOM action!')
     eq_(descriptions['es'], u'¡Acción BOM!')
     eq_(descriptions['it'], u'Azione BOM!')
     eq_(parsed['version'], '1.0')
     eq_(parsed['default_locale'], 'en-US')
开发者ID:Jobava,项目名称:zamboni,代码行数:14,代码来源:test_views_validation.py


示例16: test_parse_packaged

 def test_parse_packaged(self):
     """Parse the mozball.zip package and check the extracted fields."""
     zip_path = self.packaged_app_path('mozball.zip')
     if storage_is_remote():
         # Copy the fixture to the same path in private storage.
         copy_stored_file(zip_path, zip_path,
                          src_storage=local_storage,
                          dst_storage=private_storage)
     parsed = WebAppParser().parse(private_storage.open(zip_path))
     eq_(parsed['guid'], None)
     eq_(parsed['name']['en-US'], u'Packaged MozillaBall ょ')
     descriptions = parsed['description']
     eq_(descriptions['en-US'], u'Exciting Open Web development action!')
     eq_(descriptions['es'],
         u'¡Acción abierta emocionante del desarrollo del Web!')
     eq_(descriptions['it'],
         u'Azione aperta emozionante di sviluppo di fotoricettore!')
     eq_(parsed['version'], '1.0')
     eq_(parsed['default_locale'], 'en-US')
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:16,代码来源:test_views_validation.py


示例17: test_admin_can_blocklist

 def test_admin_can_blocklist(self):
     """
     POSTing to the 'blocklist' dev URL blocks the app.

     The request must redirect to the 'versions' dev URL, add exactly one
     version, and leave the app and its latest file with STATUS_BLOCKED.
     """
     zip_path = os.path.join(settings.MEDIA_ROOT,
                             'packaged-apps', 'blocklisted.zip')
     if storage_is_remote():
         copy_to_storage(zip_path, zip_path)

     # NOTE(review): '[email protected]' looks like an e-mail address that
     # was masked where this snippet was published -- confirm the real
     # address against the upstream test before relying on it.
     user = UserProfile.objects.get(email='[email protected]')
     self.grant_permission(user, 'Apps:Configure')
     self.login('[email protected]')

     versions_before = self.app.versions.count()
     blocklist_url = self.app.get_dev_url('blocklist')
     response = self.client.post(blocklist_url)
     self.assert3xx(response, self.app.get_dev_url('versions'))

     reloaded = self.app.reload()
     eq_(reloaded.versions.count(), versions_before + 1)
     eq_(reloaded.status, mkt.STATUS_BLOCKED)
     eq_(reloaded.versions.latest().files.latest().status,
         mkt.STATUS_BLOCKED)
开发者ID:jamesthechamp,项目名称:zamboni,代码行数:17,代码来源:test_views_versions.py


示例18: tearDown

 def tearDown(self):
     """Run the helper cleanup and, on remote storage, delete signed.zip."""
     self.helper.cleanup()
     if not storage_is_remote():
         return
     signed_path = self.packaged_app_path('signed.zip')
     private_storage.delete(signed_path)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:4,代码来源:test_helpers.py


示例19: test_get_storage_local

 def test_get_storage_local(self):
     """Without remote storage both getters return a LocalFileStorage."""
     assert not storage_is_remote()
     private_class = get_private_storage().__class__
     eq_(private_class.__name__, 'LocalFileStorage')
     public_class = get_public_storage().__class__
     eq_(public_class.__name__, 'LocalFileStorage')
开发者ID:shahbaz17,项目名称:zamboni,代码行数:4,代码来源:test_storage_utils.py


示例20: test_get_storage_remote

 def test_get_storage_remote(self):
     """With remote storage the getters return the S3Boto storage classes."""
     assert storage_is_remote()
     private_class = get_private_storage().__class__
     eq_(private_class.__name__, 'S3BotoPrivateStorage')
     public_class = get_public_storage().__class__
     eq_(public_class.__name__, 'S3BotoPublicStorage')
开发者ID:shahbaz17,项目名称:zamboni,代码行数:4,代码来源:test_storage_utils.py



注:本文中的mkt.site.storage_utils.storage_is_remote函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python private_storage.delete函数代码示例发布时间:2022-05-27
下一篇:
Python storage_utils.copy_stored_file函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap