本文整理汇总了Python中swapper.load_model函数的典型用法代码示例。如果您正苦于以下问题:Python load_model函数的具体用法?Python load_model怎么用?Python load_model使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了load_model函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_imagestore_permissions
def setup_imagestore_permissions(instance, created, **kwargs):
if not created:
return
try:
Album = swapper.load_model('imagestore', 'Album')
Image = swapper.load_model('imagestore', 'Image')
image_applabel, image_classname = Image._meta.app_label, Image.__name__.lower()
album_applabel, album_classname = Album._meta.app_label, Album.__name__.lower()
image_ctype = ContentType.objects.get(app_label=image_applabel, model=image_classname)
album_ctype = ContentType.objects.get(app_label=album_applabel, model=album_classname)
add_image_permission = Permission.objects.get(codename='add_%s' % image_classname, content_type=image_ctype)
add_album_permission = Permission.objects.get(codename='add_%s' % album_classname, content_type=album_ctype)
change_image_permission = Permission.objects.get(codename='change_%s' % image_classname, content_type=image_ctype)
change_album_permission = Permission.objects.get(codename='change_%s' % album_classname, content_type=album_ctype)
delete_image_permission = Permission.objects.get(codename='delete_%s' % image_classname, content_type=image_ctype)
delete_album_permission = Permission.objects.get(codename='delete_%s' % album_classname, content_type=album_ctype)
instance.user_permissions.add(add_image_permission, add_album_permission,)
instance.user_permissions.add(change_image_permission, change_album_permission,)
instance.user_permissions.add(delete_image_permission, delete_album_permission,)
except ObjectDoesNotExist:
# Permissions are not yet installed or content does not created yet
# probaly this is first
pass
开发者ID:SaadBinShahid,项目名称:basic-django-imagestore,代码行数:26,代码来源:image.py
示例2: process_zipfile
def process_zipfile(uploaded_album):
Image = swapper.load_model('imagestore', 'Image')
Album = swapper.load_model('imagestore', 'Album')
if default_storage.exists(uploaded_album.zip_file.name):
# TODO: implement try-except here
zip = zipfile.ZipFile(uploaded_album.zip_file)
bad_file = zip.testzip()
if bad_file:
raise Exception('"%s" in the .zip archive is corrupt.' % bad_file)
if not uploaded_album.album:
uploaded_album.album = \
Album.objects.create(name=uploaded_album.new_album_name)
for filename in sorted(zip.namelist()):
try:
encoding = chardet.detect(filename)['encoding']
unicode_filename = filename.decode(encoding)
except ValueError: # if detect takes unicode string
unicode_filename = filename
# do not process meta files
if unicode_filename.startswith('__'):
continue
logger.info('Processing file {}.'.format(unicode_filename))
data = zip.read(filename)
if len(data):
try:
# the following is taken from django.forms.fields.ImageField:
# load() could spot a truncated JPEG, but it loads the entire
# image in memory, which is a DoS vector. See #3848 and #18520.
# verify() must be called immediately after the constructor.
PILImage.open(BytesIO(data)).verify()
except Exception as e:
# if a "bad" file is found we just skip it.
logger.info('Error verify image: %s' % e)
continue
if hasattr(data, 'seek') and \
isinstance(data.seek, collections.Callable):
logger.info('Seeked')
data.seek(0)
try:
img = Image(album=uploaded_album.album)
img.image.save(filename, ContentFile(data))
img.save()
except Exception as e:
logger.info('Error create Image: %s' % e)
continue
zip.close()
uploaded_album.delete()
开发者ID:holms,项目名称:imagestore,代码行数:55,代码来源:upload.py
示例3: test_create
def test_create(self):
Type = swapper.load_model('default_app', 'Type')
Item = swapper.load_model('default_app', 'Item')
Item.objects.create(
type=Type.objects.create(name="Type 1"),
name="Item 1",
)
self.assertEqual(Item.objects.count(), 1)
item = Item.objects.all()[0]
self.assertEqual(item.type.name, "Type 1")
开发者ID:wq,项目名称:django-swappable-models,代码行数:13,代码来源:test_swapper.py
示例4: set_for_events
def set_for_events(self, delete=True, **event_filter):
"""
Update EventResult cache for the given events. The event query should
be specified as query keyword arguments, rather than a queryset, so
that a JOIN can be used instead of retrieving the results for each
event separately.
"""
# Delete existing EventResults (using denormalized Event fields)
if delete:
er_filter = {
'event_' + key: val for key, val in event_filter.items()
}
self.filter(**er_filter).delete()
# Filter results (using JOIN through Report to Event)
Result = swapper.load_model('vera', 'Result')
result_filter = {
'report__event__' + key: val for key, val in event_filter.items()
}
ers = []
results = Result.objects.valid_results(
**result_filter
).select_related('report__event')
for result in results:
er = self.model(
event=result.report.event,
result=result
)
er.denormalize()
ers.append(er)
self.bulk_create(ers)
开发者ID:TipOfit,项目名称:vera,代码行数:32,代码来源:models.py
示例5: test_swap_fields
def test_swap_fields(self):
Type = swapper.load_model('default_app', 'Type')
fields = dict(
(field.name, field)
for field in Type._meta.fields
)
self.assertIn('code', fields)
开发者ID:wq,项目名称:django-swappable-models,代码行数:7,代码来源:test_swapper.py
示例6: test_fields
def test_fields(self):
Type = swapper.load_model('default_app', 'Type')
fields = {
field.name: field
for field in Type._meta.fields
}
self.assertIn('name', fields)
开发者ID:ebugfix,项目名称:django-swappable-models,代码行数:7,代码来源:test_swapper.py
示例7: test_page_fails_with_nonexistant_template
def test_page_fails_with_nonexistant_template(client):
Page = swapper.load_model('varlet', 'Page')
homepage = Page(url='', template='home.html')
with pytest.raises(ValidationError) as e:
homepage.full_clean()
homepage.save()
with pytest.raises(TemplateDoesNotExist):
client.get(homepage.get_absolute_url())
开发者ID:kezabelle,项目名称:django-varlet,代码行数:8,代码来源:test_views.py
示例8: process_tweet_queue
def process_tweet_queue(self):
"""
Inserts any queued tweets into the database.
It is ok for this to be called on a thread other than the streaming thread.
"""
# this is for calculating the tps rate
now = time.time()
diff = now - self.time
self.time = now
try:
batch = self.queue.get_all_nowait()
except Queue.Empty:
return 0
if len(batch) == 0:
return 0
Tweet = load_model("twitter_stream", "Tweet")
tweets = []
for status in batch:
if settings.CAPTURE_EMBEDDED and 'retweeted_status' in status:
if self.to_file:
tweets.append(json.dumps(status['retweeted_status']))
else:
tweets.append(Tweet.create_from_json(status['retweeted_status']))
if self.to_file:
if 'retweeted_status' in status:
del status['retweeted_status']
tweets.append(json.dumps(status))
else:
tweets.append(Tweet.create_from_json(status))
if tweets:
if self.to_file:
if not self._output_file or self._output_file.closed:
self._output_file = open(self.to_file, 'ab')
self._output_file.write("\n".join(tweets) + "\n")
self._output_file.flush()
logger.info("Dumped %s tweets at %s tps to %s" % (len(tweets), len(tweets) / diff, self.to_file))
else:
Tweet.objects.bulk_create(tweets, settings.INSERT_BATCH_SIZE)
logger.info("Inserted %s tweets at %s tps" % (len(tweets), len(tweets) / diff))
else:
logger.info("Saved 0 tweets")
if settings.DEBUG:
# Prevent apparent memory leaks
# https://docs.djangoproject.com/en/dev/faq/models/#why-is-django-leaking-memory
from django import db
db.reset_queries()
return len(tweets) / diff
开发者ID:acquayefrank,项目名称:django-twitter-stream,代码行数:58,代码来源:streaming.py
示例9: test_page_renders_html_ok
def test_page_renders_html_ok(client):
Page = swapper.load_model('varlet', 'Page')
homepage = Page(url='', template='varlet/pages/layouts/test_template.html')
homepage.full_clean()
homepage.save()
response = client.get(homepage.get_absolute_url())
assert response.status_code == 200
response.render()
assert '<b>Test template</b>' in force_text(response.content)
开发者ID:kezabelle,项目名称:django-varlet,代码行数:9,代码来源:test_views.py
示例10: test_page_renders_json_detailview_ok
def test_page_renders_json_detailview_ok(client):
Page = swapper.load_model('varlet', 'Page')
page = Page(url='/test/', template='varlet/pages/layouts/test_template.html')
page.full_clean()
page.save()
response = client.get(page.get_absolute_url(), HTTP_ACCEPT="application/json")
assert response.status_code == 200
data = json.loads(force_text(response.content))
assert data['get_absolute_url'] == '/test/'
开发者ID:kezabelle,项目名称:django-varlet,代码行数:9,代码来源:test_views.py
示例11: test_rendering_sitemaps
def test_rendering_sitemaps(rf):
Page = swapper.load_model('varlet', 'Page')
pages = [Page.objects.create(url=str(x), template="varlet/pages/layouts/test_template.html") for x in range(1, 5)]
sm = PageSitemap()
response = sitemap(rf.get('/'), sitemaps={'pages': sm})
response.render()
assert '<loc>http://example.com/3/</loc>' in force_text(response.content)
assert '<loc>http://example.com/2/</loc>' in force_text(response.content)
assert '<loc>http://example.com/1/</loc>' in force_text(response.content)
开发者ID:kezabelle,项目名称:django-varlet,代码行数:9,代码来源:test_sitemaps.py
示例12: get_admin_url
def get_admin_url(self):
model = swapper.load_model('varlet', 'Page')
if not admin.site.is_registered(model):
return None
try:
url = reverse('admin:{}_{}_autocomplete'.format(
model._meta.app_label, model._meta.model_name))
except NoReverseMatch:
return None
return url
开发者ID:kezabelle,项目名称:django-varlet,代码行数:10,代码来源:admin.py
示例13: get_page
def get_page(path):
Page = swapper.load_model('varlet', 'Page')
field = Page._meta.get_field('url')
try:
cleaned_path = field.to_python(path)
except ValidationError:
return None
try:
return Page.objects.get(url=cleaned_path)
except (Page.DoesNotExist, MultipleObjectsReturned):
return None
开发者ID:kezabelle,项目名称:django-varlet,代码行数:11,代码来源:page_tags.py
示例14: process_zipfile
def process_zipfile(uploaded_album):
Image = swapper.load_model('imagestore', 'Image')
Album = swapper.load_model('imagestore', 'Album')
if default_storage.exists(uploaded_album.zip_file.name):
# TODO: implement try-except here
zip = zipfile.ZipFile(uploaded_album.zip_file)
bad_file = zip.testzip()
if bad_file:
raise Exception('"%s" in the .zip archive is corrupt.' % bad_file)
if not uploaded_album.album:
uploaded_album.album = Album.objects.create(name=uploaded_album.new_album_name)
for filename in sorted(zip.namelist()):
if filename.startswith('__'): # do not process meta files
continue
print(filename.encode('ascii', errors='replace'))
data = zip.read(filename)
if len(data):
try:
# the following is taken from django.forms.fields.ImageField:
# load() could spot a truncated JPEG, but it loads the entire
# image in memory, which is a DoS vector. See #3848 and #18520.
# verify() must be called immediately after the constructor.
PILImage.open(six.moves.cStringIO(data)).verify()
except Exception as ex:
# if a "bad" file is found we just skip it.
print('Error verify image: %s' % ex.message)
continue
if hasattr(data, 'seek') and isinstance(data.seek, collections.Callable):
print('seeked')
data.seek(0)
try:
img = Image(album=uploaded_album.album)
img.image.save(filename, ContentFile(data))
img.save()
except Exception as ex:
print('error create Image: %s' % ex.message)
zip.close()
uploaded_album.delete()
开发者ID:SaadBinShahid,项目名称:basic-django-imagestore,代码行数:41,代码来源:upload.py
示例15: setup_imagestore_permissions
def setup_imagestore_permissions(instance, created, **kwargs):
if not created:
return
try:
Album = swapper.load_model('imagestore', 'Album')
Image = swapper.load_model('imagestore', 'Image')
perms = []
for model_class in [Album, Image]:
for perm_name in ['add', 'change', 'delete']:
app_label, model_name = model_class._meta.app_label, model_class.__name__.lower()
perm = Permission.objects.get_by_natural_key('%s_%s' % (perm_name, model_name), app_label, model_name)
perms.append(perm)
instance.user_permissions.add(*perms)
except ObjectDoesNotExist:
# Permissions are not yet installed or content does not created yet
# probaly this is first
pass
开发者ID:grengojbo,项目名称:imagestore,代码行数:21,代码来源:image.py
示例16: stream_status
def stream_status():
terms = FilterTerm.objects.filter(enabled=True)
processes = StreamProcess.get_current_stream_processes()
running = False
for p in processes:
if p.status == StreamProcess.STREAM_STATUS_RUNNING:
running = True
break
Tweet = load_model("twitter_stream", "Tweet")
tweet_count = Tweet.count_approx()
earliest_time = Tweet.get_earliest_created_at()
latest_time = Tweet.get_latest_created_at()
avg_rate = None
if earliest_time is not None and latest_time is not None:
avg_rate = float(tweet_count) / (latest_time - earliest_time).total_seconds()
# Get the tweets / minute over the past 10 minutes
tweet_counts = []
if latest_time is not None:
latest_time_minute = latest_time.replace(second=0, microsecond=0)
if settings.DATABASES['default']['ENGINE'].endswith('mysql'):
drop_seconds = "created_at - INTERVAL SECOND(created_at) SECOND"
elif settings.DATABASES['default']['ENGINE'].endswith('postgresql_psycopg2'):
drop_seconds = "date_trunc('minute', created_at)"
else:
drop_seconds = "created_at"
tweet_counts = Tweet.objects.extra(select={
'time': drop_seconds
}) \
.filter(created_at__gt=latest_time_minute - timedelta(minutes=20)) \
.values('time') \
.order_by('time') \
.annotate(tweets=models.Count('id'))
tweet_counts = list(tweet_counts)
for row in tweet_counts:
row['time'] = row['time'].isoformat()
return {
'running': running,
'terms': [t.term for t in terms],
'processes': processes,
'tweet_count': tweet_count,
'earliest': earliest_time,
'latest': latest_time,
'avg_rate': avg_rate,
'timeline': tweet_counts
}
开发者ID:michaelbrooks,项目名称:django-twitter-stream,代码行数:53,代码来源:views.py
示例17: test_page_renders_json_listview_ok
def test_page_renders_json_listview_ok(client):
Page = swapper.load_model('varlet', 'Page')
homepage = Page(url='', template='varlet/pages/layouts/test_template.html')
homepage.full_clean()
homepage.save()
for x in range(1, 3):
y = Page.objects.create(url=str(x), template='varlet/pages/layouts/test_template.html')
response = client.get(homepage.get_absolute_url(), HTTP_ACCEPT="application/json")
assert response.status_code == 200
data = json.loads(force_text(response.content))
assert len(data) == 3
assert {x['get_absolute_url'] for x in data} == {"/", "/1/", "/2/"}
开发者ID:kezabelle,项目名称:django-varlet,代码行数:12,代码来源:test_views.py
示例18: dictionary_get
def dictionary_get(request):
if request.method != "GET":
return False
# print('start view')
model_name = request.GET["model_name"]
# print(model_name)
my_model = swapper.load_model("chemical", model_name)
model_dict = {}
for mdl in my_model.objects.all():
model_dict[mdl.pk] = mdl.name
xml_bytes = json.dumps(model_dict)
# print(xml_bytes)
# print('end eiw')
return HttpResponse(xml_bytes, "application/json")
开发者ID:KateGL,项目名称:chemcloudsite,代码行数:16,代码来源:views.py
示例19: vals
def vals(self, vals):
Parameter = swapper.load_model('vera', 'Parameter')
keys = [(key,) for key in vals.keys()]
params, success = Parameter.objects.resolve_keys(keys)
if not success:
missing = [
name for name, params in params.items() if params is None
]
raise TypeError(
"Could not identify one or more parameters: %s!"
% missing
)
for key, param in params.items():
result, is_new = self.results.get_or_create(type=param)
result.value = vals[key[0]]
result.save()
开发者ID:TipOfit,项目名称:vera,代码行数:17,代码来源:models.py
示例20: valid_results
def valid_results(self, **filter):
Parameter = swapper.load_model('vera', 'Parameter')
filter['report__status__is_valid'] = True
filter['empty'] = False
# DISTINCT ON event, then parameter, collapsing results from different
# reports into one
distinct = ['report__event__id'] + nest_ordering(
'type', Parameter._meta.ordering
) + ['type__id']
# ORDER BY distinct fields, then valid report order
order = distinct + nest_ordering('report', VALID_REPORT_ORDER)
return self.filter(
**filter
).order_by(*order).distinct(*distinct).select_related('report')
开发者ID:TipOfit,项目名称:vera,代码行数:18,代码来源:models.py
注:本文中的swapper.load_model函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论