本文整理汇总了Python中mkt.site.storage_utils.private_storage.delete函数的典型用法代码示例。如果您正苦于以下问题:Python delete函数的具体用法?Python delete怎么用?Python delete使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了delete函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_delete_mid_read
def test_delete_mid_read(self):
self.viewer.extract()
self.viewer.select('install.js')
private_storage.delete(os.path.join(self.viewer.dest, 'install.js'))
res = self.viewer.read_file()
eq_(res, '')
assert self.viewer.selected['msg'].startswith('That file no')
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:7,代码来源:test_helpers.py
示例2: cleanup_extracted_file
def cleanup_extracted_file():
log.info('Removing extracted files for file viewer.')
root = os.path.join(settings.TMP_PATH, 'file_viewer')
# Local storage uses local time for file modification. S3 uses UTC time.
now = datetime.utcnow if storage_is_remote() else datetime.now
for path in private_storage.listdir(root)[0]:
full = os.path.join(root, path)
age = now() - private_storage.modified_time(
os.path.join(full, 'manifest.webapp'))
if age.total_seconds() > (60 * 60):
log.debug('Removing extracted files: %s, %dsecs old.' %
(full, age.total_seconds()))
for subroot, dirs, files in walk_storage(full):
for f in files:
private_storage.delete(os.path.join(subroot, f))
# Nuke out the file and diff caches when the file gets removed.
id = os.path.basename(path)
try:
int(id)
except ValueError:
continue
key = hashlib.md5()
key.update(str(id))
cache.delete('%s:memoize:%s:%s' % (settings.CACHE_PREFIX,
'file-viewer', key.hexdigest()))
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:26,代码来源:cron.py
示例3: resize_preview
def resize_preview(src, pk, **kw):
"""Resizes preview images and stores the sizes on the preview."""
instance = Preview.objects.get(pk=pk)
thumb_dst, full_dst = instance.thumbnail_path, instance.image_path
sizes = instance.sizes or {}
log.info("[[email protected]] Resizing preview and storing size: %s" % thumb_dst)
try:
thumbnail_size = APP_PREVIEW_SIZES[0][:2]
image_size = APP_PREVIEW_SIZES[1][:2]
with private_storage.open(src, "rb") as fp:
size = Image.open(fp).size
if size[0] > size[1]:
# If the image is wider than tall, then reverse the wanted size
# to keep the original aspect ratio while still resizing to
# the correct dimensions.
thumbnail_size = thumbnail_size[::-1]
image_size = image_size[::-1]
if kw.get("generate_thumbnail", True):
sizes["thumbnail"] = resize_image(src, thumb_dst, thumbnail_size, remove_src=False)
if kw.get("generate_image", True):
sizes["image"] = resize_image(src, full_dst, image_size, remove_src=False)
instance.sizes = sizes
instance.save()
log.info("Preview resized to: %s" % thumb_dst)
# Remove src file now that it has been processed.
private_storage.delete(src)
return True
except Exception, e:
log.error("Error saving preview: %s; %s" % (e, thumb_dst))
开发者ID:ujdhesa,项目名称:zamboni,代码行数:33,代码来源:tasks.py
示例4: delete
def delete(self):
log.info(u'Version deleted: %r (%s)' % (self, self.id))
mkt.log(mkt.LOG.DELETE_VERSION, self.webapp, str(self.version))
models.signals.pre_delete.send(sender=Version, instance=self)
was_current = False
if self == self.webapp.current_version:
was_current = True
self.update(deleted=True)
# Set file status to disabled.
f = self.all_files[0]
f.update(status=mkt.STATUS_DISABLED, _signal=False)
f.hide_disabled_file()
# If version deleted was the current version and there now exists
# another current_version, we need to call some extra methods to update
# various bits for packaged apps.
if was_current and self.webapp.current_version:
self.webapp.update_name_from_package_manifest()
self.webapp.update_supported_locales()
if self.webapp.is_packaged:
# Unlink signed packages if packaged app.
public_storage.delete(f.signed_file_path)
log.info(u'Unlinked file: %s' % f.signed_file_path)
private_storage.delete(f.signed_reviewer_file_path)
log.info(u'Unlinked file: %s' % f.signed_reviewer_file_path)
models.signals.post_delete.send(sender=Version, instance=self)
开发者ID:shahbaz17,项目名称:zamboni,代码行数:32,代码来源:models.py
示例5: reviewer_sign_file
def reviewer_sign_file(self):
"""Sign the original file (`file_path`) with reviewer certs, then move
the signed file to the reviewers-specific signed path
(`reviewer_signed_file_path`) on private storage."""
if not self.extension.uuid:
raise SigningError('Need uuid to be set to sign')
if not self.pk:
raise SigningError('Need version pk to be set to sign')
ids = json.dumps({
'id': self.review_id,
'version': self.pk
})
with statsd.timer('extensions.sign_reviewer'):
try:
# This will read the file from self.file_path, generate a
# reviewer signature and write the signed file to
# self.reviewer_signed_file_path.
sign_app(private_storage.open(self.file_path),
self.reviewer_signed_file_path, ids, reviewer=True)
except SigningError:
log.info(
'[ExtensionVersion:%s] Reviewer Signing failed' % self.pk)
if private_storage.exists(self.reviewer_signed_file_path):
private_storage.delete(self.reviewer_signed_file_path)
raise
开发者ID:Witia1,项目名称:zamboni,代码行数:25,代码来源:models.py
示例6: rmtree
def rmtree(prefix):
dirs, files = private_storage.listdir(prefix)
for fname in files:
private_storage.delete(os.path.join(prefix, fname))
for d in dirs:
rmtree(os.path.join(prefix, d))
private_storage.delete(prefix)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:7,代码来源:helpers.py
示例7: dump_user_installs_cron
def dump_user_installs_cron():
"""
Sets up tasks to do user install dumps.
"""
chunk_size = 100
# Get valid users to dump.
user_ids = set(Installed.objects.filter(user__enable_recommendations=True)
.values_list('user', flat=True))
# Clean up the path where we'll store the individual json files from each
# user installs dump (which are in users/ in DUMPED_USERS_PATH).
path_to_cleanup = os.path.join(settings.DUMPED_USERS_PATH, 'users')
task_log.info('Cleaning up path {0}'.format(path_to_cleanup))
try:
for dirpath, dirnames, filenames in walk_storage(
path_to_cleanup, storage=private_storage):
for filename in filenames:
private_storage.delete(os.path.join(dirpath, filename))
except OSError:
# Ignore if the directory does not exist.
pass
grouping = []
for chunk in chunked(user_ids, chunk_size):
grouping.append(dump_user_installs.subtask(args=[chunk]))
post = zip_users.subtask(immutable=True)
ts = chord(grouping, post)
ts.apply_async()
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:29,代码来源:cron.py
示例8: reviewer_sign_file
def reviewer_sign_file(self):
"""Sign the original file (`file_path`) with reviewer certs, then move
the signed file to the reviewers-specific signed path
(`reviewer_signed_file_path`) on private storage."""
if not self.extension.uuid:
raise SigningError('Need uuid to be set to sign')
if not self.pk:
raise SigningError('Need version pk to be set to sign')
ids = json.dumps({
# Reviewers get a unique 'id' so the reviewer installed add-on
# won't conflict with the public add-on, and also so even multiple
# versions of the same add-on can be installed side by side with
# other versions.
'id': 'reviewer-{guid}-{version_id}'.format(
guid=self.extension.uuid, version_id=self.pk),
'version': self.pk
})
with statsd.timer('extensions.sign_reviewer'):
try:
# This will read the file from self.file_path, generate a
# reviewer signature and write the signed file to
# self.reviewer_signed_file_path.
sign_app(private_storage.open(self.file_path),
self.reviewer_signed_file_path, ids, reviewer=True)
except SigningError:
log.info(
'[ExtensionVersion:%s] Reviewer Signing failed' % self.pk)
if private_storage.exists(self.reviewer_signed_file_path):
private_storage.delete(self.reviewer_signed_file_path)
raise
开发者ID:1Smert1,项目名称:zamboni,代码行数:30,代码来源:models.py
示例9: test_different_tree
def test_different_tree(self):
self.file_viewer.extract()
private_storage.delete(os.path.join(self.file_viewer.left.dest,
not_binary))
res = self.client.get(self.file_url(not_binary))
doc = pq(res.content)
eq_(doc('h4:last').text(), 'Deleted files:')
eq_(len(doc('ul.root')), 2)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:8,代码来源:test_views.py
示例10: test_view_one_missing
def test_view_one_missing(self):
self.file_viewer.extract()
private_storage.delete(os.path.join(self.file_viewer.right.dest,
'script.js'))
res = self.client.get(self.file_url(not_binary))
doc = pq(res.content)
eq_(len(doc('pre')), 3)
eq_(len(doc('#content-wrapper p')), 2)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:8,代码来源:test_views.py
示例11: test_bom
def test_bom(self):
dest = os.path.join(settings.TMP_PATH, 'test_bom')
with private_storage.open(dest, 'w') as f:
f.write('foo'.encode('utf-16'))
self.viewer.select('foo')
self.viewer.selected = {'full': dest, 'size': 1}
eq_(self.viewer.read_file(), u'foo')
private_storage.delete(dest)
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:8,代码来源:test_helpers.py
示例12: setup_files
def setup_files(self):
# Clean out any left over stuff.
private_storage.delete(self.file.signed_file_path)
private_storage.delete(self.file.signed_reviewer_file_path)
# Make sure the source file is there.
if not private_storage.exists(self.file.file_path):
copy_to_storage(self.packaged_app_path('mozball.zip'),
self.file.file_path)
开发者ID:jamesthechamp,项目名称:zamboni,代码行数:9,代码来源:tests.py
示例13: cleanup
def cleanup(self):
try:
for root, dirs, files in walk_storage(
self.dest, storage=private_storage):
for fname in files:
private_storage.delete(os.path.join(root, fname))
except OSError as e:
if e.errno == 2:
# Directory doesn't exist, nothing to clean up.
return
raise
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:11,代码来源:helpers.py
示例14: generate_previews
def generate_previews(app, n=1):
gen = pydenticon.Generator(8, 12, foreground=foreground, digest=hashlib.sha512)
for i in range(n):
img = gen.generate(unicode(app.name) + unichr(i), 320, 480, output_format="png")
p = Preview.objects.create(addon=app, filetype="image/png", caption="screenshot " + str(i), position=i)
fn = tempfile.mktemp()
try:
f = private_storage.open(fn, "w")
f.write(img)
f.close()
resize_preview(fn, p.pk)
finally:
private_storage.delete(fn)
开发者ID:jostw,项目名称:zamboni,代码行数:13,代码来源:fakedata.py
示例15: test_delete_with_file
def test_delete_with_file(self):
"""Test that when a Extension instance is deleted, the corresponding
file on the filesystem is also deleted."""
extension = Extension.objects.create(version='0.1')
file_path = extension.file_path
with private_storage.open(file_path, 'w') as f:
f.write('sample data\n')
assert private_storage.exists(file_path)
try:
extension.delete()
assert not private_storage.exists(file_path)
finally:
if private_storage.exists(file_path):
private_storage.delete(file_path)
开发者ID:shahbaz17,项目名称:zamboni,代码行数:14,代码来源:test_models.py
示例16: clean_old_signed
def clean_old_signed(seconds=60 * 60):
"""Clean out apps signed for reviewers."""
log.info('Removing old apps signed for reviewers')
root = settings.SIGNED_APPS_REVIEWER_PATH
# Local storage uses local time for file modification. S3 uses UTC time.
now = datetime.utcnow if storage_is_remote() else datetime.now
for nextroot, dirs, files in walk_storage(root):
for fn in files:
full = os.path.join(nextroot, fn)
age = now() - private_storage.modified_time(full)
if age.total_seconds() > seconds:
log.debug('Removing signed app: %s, %dsecs old.' % (
full, age.total_seconds()))
private_storage.delete(full)
开发者ID:jamesthechamp,项目名称:zamboni,代码行数:14,代码来源:cron.py
示例17: mkt_gc
def mkt_gc(**kw):
"""Site-wide garbage collections."""
log.info('Collecting data to delete')
logs = (ActivityLog.objects.filter(created__lt=days_ago(90))
.exclude(action__in=mkt.LOG_KEEP).values_list('id', flat=True))
for chunk in chunked(logs, 100):
chunk.sort()
log.info('Deleting log entries: %s' % str(chunk))
delete_logs.delay(chunk)
# Clear oauth nonce rows. These expire after 10 minutes but we're just
# clearing those that are more than 1 day old.
Nonce.objects.filter(created__lt=days_ago(1)).delete()
# Delete the dump apps over 30 days.
_remove_stale_files(os.path.join(settings.DUMPED_APPS_PATH, 'tarballs'),
settings.DUMPED_APPS_DAYS_DELETE,
'Deleting old tarball: {0}',
storage=public_storage)
# Delete the dumped user installs over 30 days. Those are using private
# storage.
_remove_stale_files(os.path.join(settings.DUMPED_USERS_PATH, 'tarballs'),
settings.DUMPED_USERS_DAYS_DELETE,
'Deleting old tarball: {0}',
storage=private_storage)
# Delete old files in select directories under TMP_PATH.
_remove_stale_files(os.path.join(settings.TMP_PATH, 'preview'),
settings.TMP_PATH_DAYS_DELETE,
'Deleting TMP_PATH file: {0}',
storage=private_storage)
_remove_stale_files(os.path.join(settings.TMP_PATH, 'icon'),
settings.TMP_PATH_DAYS_DELETE,
'Deleting TMP_PATH file: {0}',
storage=private_storage)
# Delete stale FileUploads.
for fu in FileUpload.objects.filter(created__lte=days_ago(90)):
log.debug(u'[FileUpload:{uuid}] Removing file: {path}'
.format(uuid=fu.uuid, path=fu.path))
if fu.path:
try:
private_storage.delete(fu.path)
except OSError:
pass
fu.delete()
开发者ID:Fjoerfoks,项目名称:zamboni,代码行数:48,代码来源:cron.py
示例18: export_data
def export_data(name=None):
today = datetime.datetime.today().strftime('%Y-%m-%d')
if name is None:
name = today
# Clean up the path where we'll store the individual json files from each
# app dump.
for dirpath, dirnames, filenames in walk_storage(
settings.DUMPED_APPS_PATH, storage=private_storage):
for filename in filenames:
private_storage.delete(os.path.join(dirpath, filename))
task_log.info('Cleaning up path {0}'.format(settings.DUMPED_APPS_PATH))
# Run all dump_apps task in parallel, and once it's done, add extra files
# and run compression.
chord(dump_all_apps_tasks(),
compress_export.si(tarball_name=name, date=today)).apply_async()
开发者ID:ayushagrawal288,项目名称:zamboni,代码行数:17,代码来源:tasks.py
示例19: resize_promo_imgs
def resize_promo_imgs(src, dst, sizes, **kw):
"""Resizes webapp/website promo imgs."""
log.info('[[email protected]] Resizing promo imgs: %s' % dst)
try:
for s in sizes:
size_dst = '%s-%s.png' % (dst, s)
# Crop only to the width, keeping the aspect ratio.
resize_image(src, size_dst, (s, 0), remove_src=False)
pngcrush_image.delay(size_dst, **kw)
with private_storage.open(src) as fd:
promo_img_hash = _hash_file(fd)
private_storage.delete(src)
log.info('Promo img hash resizing completed for: %s' % dst)
return {'promo_img_hash': promo_img_hash}
except Exception, e:
log.error("Error resizing promo img hash: %s; %s" % (e, dst))
开发者ID:kolyaflash,项目名称:zamboni,代码行数:18,代码来源:tasks.py
示例20: test_delete_with_file
def test_delete_with_file(self):
"""Test that when a Extension instance is deleted, the ExtensionVersion
referencing it are also deleted, as well as the attached files."""
extension = Extension.objects.create()
version = ExtensionVersion.objects.create(
extension=extension, version='0.1')
file_path = version.file_path
with private_storage.open(file_path, 'w') as f:
f.write('sample data\n')
assert private_storage.exists(file_path)
try:
extension.delete()
assert not Extension.objects.count()
assert not ExtensionVersion.objects.count()
assert not private_storage.exists(file_path)
finally:
if private_storage.exists(file_path):
private_storage.delete(file_path)
开发者ID:demagu-sr,项目名称:zamboni,代码行数:18,代码来源:test_models.py
注:本文中的mkt.site.storage_utils.private_storage.delete函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论