本文整理汇总了Python中udata.utils.get_by函数的典型用法代码示例。如果您正苦于以下问题:Python get_by函数的具体用法?Python get_by怎么用?Python get_by使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_by函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_resource
def get_resource(id):
'''Fetch a resource given its UUID'''
dataset = Dataset.objects(resources__id=id).first()
if dataset:
return get_by(dataset.resources, 'id', id)
else:
return CommunityResource.objects(id=id).first()
开发者ID:odtvince,项目名称:udata,代码行数:7,代码来源:models.py
示例2: delete
def delete(self, **kwargs):
self.kwargs = kwargs
user = User.objects.get_or_404(id=request.form.get('user_id'))
member = get_by(self.organization.members, 'user', user)
self.organization.members.remove(member)
self.organization.save()
return '', 204
开发者ID:grouan,项目名称:udata,代码行数:7,代码来源:views.py
示例3: remote_organizations
def remote_organizations(self):
response = self.get('organization_list')
for name in response['result']:
details = self.get('organization_show', {'id': name})['result']
organization = self.get_harvested(Organization, details['id'])
organization.name = details['title']
organization.slug = details['name']
organization.description = details['description']
organization.image_url = details['image_url'] or None
if self.config.get('users') is not None:
for member in details['users']:
user = self.get_harvested(User, member['id'], create=False)
if user and not get_by(organization.members, 'user', user):
role = 'admin' if member['capacity'] == 'admin' else 'editor'
organization.members.append(Member(role=role, user=user))
yield organization
if not organization.id:
continue
followers = self.get('group_follower_list', {'id': name})['result']
for follower in followers:
user = self.get_harvested(User, follower['id'], create=False)
if user:
follow, created = FollowOrg.objects.get_or_create(follower=user, following=organization)
开发者ID:remilejeune,项目名称:udata-harvest,代码行数:27,代码来源:ckan.py
示例4: delete
def delete(self, slug, rid):
dataset = Dataset.objects.get_or_404(slug=slug)
resource = get_by(dataset.resources, 'id', UUID(rid))
if not resource:
abort(404)
dataset.resources.remove(resource)
dataset.save()
return '', 204
开发者ID:rossjones,项目名称:udata,代码行数:8,代码来源:api.py
示例5: delete
def delete(self, **kwargs):
self.kwargs = kwargs
org = self.get_object()
user = User.objects.get_or_404(id=request.form.get('user_id'))
member = get_by(org.members, 'user', user)
org.members.remove(member)
org.save()
return '', 204
开发者ID:rossjones,项目名称:udata,代码行数:8,代码来源:views.py
示例6: put
def put(self, dataset):
'''Reorder resources'''
new_resources = []
for rid in request.json:
resource = get_by(dataset.resources, 'id', UUID(rid))
new_resources.append(resource)
dataset.resources = new_resources
dataset.save()
return dataset.resources, 200
开发者ID:guillo-w,项目名称:udata,代码行数:9,代码来源:api.py
示例7: on_form_valid
def on_form_valid(self, form):
user = User.objects.get_or_404(id=form.pk.data)
member = get_by(self.organization.members, 'user', user)
if member:
member.role = form.value.data
else:
member = Member(user=user, role=form.value.data or 'editor')
self.organization.members.append(member)
self.organization.save()
return '', 200
开发者ID:pombredanne,项目名称:udata,代码行数:10,代码来源:views.py
示例8: put
def put(self, slug, rid):
dataset = Dataset.objects.get_or_404(slug=slug)
resource = get_by(dataset.resources, 'id', UUID(rid))
if not resource:
abort(404)
form = ResourceForm(request.form, instance=resource, csrf_enabled=False)
if not form.validate():
return {'errors': form.errors}, 400
form.populate_obj(resource)
dataset.save()
return marshal(resource, resource_fields)
开发者ID:rossjones,项目名称:udata,代码行数:11,代码来源:api.py
示例9: resource_from_rdf
def resource_from_rdf(graph_or_distrib, dataset=None):
'''
Map a Resource domain model to a DCAT/RDF graph
'''
if isinstance(graph_or_distrib, RdfResource):
distrib = graph_or_distrib
else:
node = graph_or_distrib.value(predicate=RDF.type,
object=DCAT.Distribution)
distrib = graph_or_distrib.resource(node)
download_url = url_from_rdf(distrib, DCAT.downloadURL)
access_url = url_from_rdf(distrib, DCAT.accessURL)
url = safe_unicode(download_url or access_url)
if dataset:
resource = get_by(dataset.resources, 'url', url)
if not dataset or not resource:
resource = Resource()
if dataset:
dataset.resources.append(resource)
resource.title = title_from_rdf(distrib, url)
resource.url = url
resource.description = sanitize_html(distrib.value(DCT.description))
resource.filesize = rdf_value(distrib, DCAT.bytesSize)
resource.mime = rdf_value(distrib, DCAT.mediaType)
fmt = rdf_value(distrib, DCT.term('format'))
if fmt:
resource.format = fmt.lower()
checksum = distrib.value(SPDX.checksum)
if checksum:
algorithm = checksum.value(SPDX.algorithm).identifier
algorithm = CHECKSUM_ALGORITHMS.get(algorithm)
if algorithm:
resource.checksum = Checksum()
resource.checksum.value = rdf_value(checksum, SPDX.checksumValue)
resource.checksum.type = algorithm
resource.published = rdf_value(distrib, DCT.issued, resource.published)
resource.modified = rdf_value(distrib, DCT.modified, resource.modified)
identifier = rdf_value(distrib, DCT.identifier)
if identifier:
resource.extras['dct:identifier'] = identifier
if isinstance(distrib.identifier, URIRef):
resource.extras['uri'] = distrib.identifier.toPython()
return resource
开发者ID:opendatateam,项目名称:udata,代码行数:49,代码来源:rdf.py
示例10: test_coverage_for_level
def test_coverage_for_level(self):
GeoLevelFactory(id='top')
GeoLevelFactory(id='sub', parents=['top'])
GeoLevelFactory(id='child', parents=['sub'])
topzones, subzones, childzones = [], [], []
for _ in range(2):
zone = GeoZoneFactory(level='top')
topzones.append(zone)
for _ in range(2):
subzone = GeoZoneFactory(level='sub', parents=[zone.id])
subzones.append(subzone)
for _ in range(2):
childzone = GeoZoneFactory(
level='child', parents=[zone.id, subzone.id])
childzones.append(childzone)
for zone in topzones + subzones + childzones:
VisibleDatasetFactory(
spatial=SpatialCoverageFactory(zones=[zone.id]))
response = self.get(url_for('api.spatial_coverage', level='sub'))
self.assert200(response)
self.assertEqual(len(response.json['features']), len(subzones))
for feature in response.json['features']:
self.assertEqual(feature['type'], 'Feature')
zone = get_by(subzones, 'id', feature['id'])
self.assertIsNotNone(zone)
self.assertJsonEqual(feature['geometry'], zone.geom)
properties = feature['properties']
self.assertEqual(properties['name'], zone.name)
self.assertEqual(properties['code'], zone.code)
self.assertEqual(properties['level'], 'sub')
# Nested levels datasets should be counted
self.assertEqual(properties['datasets'], 3)
开发者ID:anukat2015,项目名称:udata,代码行数:38,代码来源:test_api.py
示例11: handle_downloads
def handle_downloads(self, row, day):
if 'url' in row:
try:
hashed_url = hash_url(row['url'])
data = (
Dataset.objects(resources__urlhash=hashed_url).first()
or
CommunityResource.objects(urlhash=hashed_url).first()
)
if isinstance(data, Dataset):
dataset = data
resource = get_by(dataset.resources, 'urlhash', hashed_url)
log.debug('Found resource download: %s', resource.url)
self.count(resource, day, row)
metric = ResourceViews(resource)
metric.compute()
# Use the MongoDB positionnal operator ($)
cmd = 'set__resources__S__metrics__{0}'.format(metric.name)
qs = Dataset.objects(id=dataset.id,
resources__id=resource.id)
qs.update(**{cmd: metric.value})
if dataset.organization:
OrgResourcesDownloads(dataset.organization).compute()
elif isinstance(data, CommunityResource):
resource = data
log.debug('Found community resource download: %s',
resource.url)
self.count(resource, day, row)
metric = CommunityResourceViews(resource)
metric.compute()
resource.metrics[metric.name] = metric.value
resource.save()
except:
log.exception('Unable to count download for %s', row['url'])
if 'subtable' in row:
for subrow in row['subtable']:
self.handle_downloads(subrow, day)
开发者ID:noirbizarre,项目名称:udata-piwik,代码行数:38,代码来源:counter.py
示例12: populate_obj
def populate_obj(self, obj, name):
if not self.has_data:
return
initial_values = getattr(obj, name, [])
new_values = []
class Holder(object):
pass
holder = Holder()
for idx, field in enumerate(self):
initial = None
if hasattr(self.nested_model, 'id') and 'id' in field.data:
id = self.nested_model.id.to_python(field.data['id'])
initial = get_by(initial_values, 'id', id)
holder.nested = initial or self.nested_model()
field.populate_obj(holder, 'nested')
new_values.append(holder.nested)
setattr(obj, name, new_values)
开发者ID:opendatateam,项目名称:udata,代码行数:23,代码来源:fields.py
示例13: test_coverage_for_level
def test_coverage_for_level(self):
register_level('country', 'included', 'Included level')
included = [TerritoryFactory(level='included') for _ in range(2)]
excluded = [TerritoryFactory(level='country') for _ in range(2)]
[VisibleDatasetFactory(spatial=SpatialCoverageFactory(territories=[t.reference()])) for t in included]
[VisibleDatasetFactory(spatial=SpatialCoverageFactory(territories=[t.reference()])) for t in excluded]
response = self.get(url_for('api.spatial_coverage', level='included'))
self.assert200(response)
self.assertEqual(len(response.json['features']), 2)
for feature in response.json['features']:
self.assertEqual(feature['type'], 'Feature')
territory = get_by(included, 'id', ObjectId(feature['id']))
self.assertIsNotNone(territory)
self.assertEqual(feature['geometry'], territory.geom)
properties = feature['properties']
self.assertEqual(properties['name'], territory.name)
self.assertEqual(properties['code'], territory.code)
self.assertEqual(properties['level'], 'included')
self.assertEqual(properties['datasets'], 1)
开发者ID:guillo-w,项目名称:udata,代码行数:23,代码来源:test_spatial_api.py
示例14: _add_entry
def _add_entry(self, formdata=None, data=unset_value, index=None):
'''
Fill the form with previous data if necessary to handle partial update
'''
if formdata:
prefix = '-'.join((self.name, str(index)))
basekey = '-'.join((prefix, '{0}'))
idkey = basekey.format('id')
if prefix in formdata:
formdata[idkey] = formdata.pop(prefix)
if hasattr(self.nested_model, 'id') and idkey in formdata:
id = self.nested_model.id.to_python(formdata[idkey])
data = get_by(self.initial_data, 'id', id)
initial = flatten_json(self.nested_form,
data.to_mongo(),
prefix)
for key, value in initial.items():
if key not in formdata:
formdata[key] = value
else:
data = None
return super(NestedModelList, self)._add_entry(formdata, data, index)
开发者ID:opendatateam,项目名称:udata,代码行数:24,代码来源:fields.py
示例15: remote_datasets
def remote_datasets(self):
response = self.get('package_list')
for name in response['result']:
details = self.get('package_show', {'id': name})['result']
dataset = self.get_harvested(Dataset, details['id'])
# Core attributes
dataset.slug = details['name']
dataset.title = details['title']
dataset.description = details.get('notes', 'No description')
dataset.license = License.objects(id=details['license_id']).first() or License.objects.get(id='notspecified')
dataset.tags = [tag['name'].lower() for tag in details['tags']]
dataset.frequency = self.map('frequency', details) or 'unknown'
dataset.created_at = parse(details['metadata_created'])
dataset.last_modified = parse(details['metadata_modified'])
if any_field(details, 'territorial_coverage', 'territorial_coverage_granularity'):
coverage = TerritorialCoverage(
codes=[code.strip() for code in details.get('territorial_coverage', '').split(',') if code.strip()],
granularity=self.map('territorial_coverage_granularity', details),
)
dataset.extras['territorial_coverage'] = coverage
try:
dataset.spatial = territorial_to_spatial(dataset)
except Exception as e:
print 'Error while processing spatial coverage for {0}:'.format(dataset.title), e
if all_field(details, 'temporal_coverage_from', 'temporal_coverage_to'):
try:
dataset.temporal_coverage = db.DateRange(
start=daterange_start(details.get('temporal_coverage_from')),
end=daterange_end(details.get('temporal_coverage_to')),
)
except:
log.error('Unable to parse temporal coverage for dataset %s', details['id'])
# Organization
if details.get('organization'):
dataset.organization = self.get_harvested(Organization, details['organization']['id'], False)
else:
# Need to fetch user from roles
roles = self.get('roles_show', {'domain_object': name})['result']['roles']
for role in roles:
if role['role'] == 'admin' and role['context'] == 'Package':
dataset.owner = self.get_harvested(User, role['user_id'])
break
# Supplier
if details.get('supplier_id'):
dataset.supplier = self.get_harvested(Organization, details['supplier_id'], False)
# Remote URL
if details.get('url'):
dataset.extras['remote_url'] = details['url']
# Extras
if 'extras' in details:
extra_mapping = self.harvester.mapping.get('from_extras', {})
for extra in details['extras']:
if extra['key'] in self.harvester.mapping:
value = self.harvester.mapping[extra['key']].get(extra['value'])
else:
value = extra['value']
if extra['key'] in extra_mapping:
setattr(dataset, extra_mapping[extra['key']], value)
else:
dataset.extras[extra['key']] = value
# Resources
for res in details['resources']:
try:
resource = get_by(dataset.resources, 'id', UUID(res['id']))
except:
log.error('Unable to parse resource %s', res['id'])
continue
if not resource:
resource = Resource(id=res['id'])
dataset.resources.append(resource)
resource.title = res.get('name', '') or ''
resource.url = res['url']
resource.description = res.get('description')
resource.format = res.get('format')
resource.hash = res.get('hash')
resource.created = parse(res['created'])
resource.modified = parse(res['revision_timestamp'])
resource.published = resource.published or resource.created
yield dataset
if dataset.id:
followers = self.get('dataset_follower_list', {'id': name})['result']
for follower in followers:
user = self.get_harvested(User, follower['id'], False)
if user:
follow, created = FollowDataset.objects.get_or_create(follower=user, following=dataset)
开发者ID:remilejeune,项目名称:udata-harvest,代码行数:95,代码来源:ckan.py
示例16: get_resource_or_404
def get_resource_or_404(self, dataset, id):
resource = get_by(dataset.resources, 'id', id)
if not resource:
api.abort(404, 'Ressource does not exists')
return resource
开发者ID:opendatateam,项目名称:udata,代码行数:5,代码来源:api.py
示例17: process
def process(self, item):
response = self.get_action('package_show', id=item.remote_id)
data = self.validate(response['result'], schema)
# Fix the remote_id: use real ID instead of not stable name
item.remote_id = data['id']
# Skip if no resource
if not len(data.get('resources', [])):
msg = 'Dataset {0} has no record'.format(item.remote_id)
raise HarvestSkipException(msg)
dataset = self.get_dataset(item.remote_id)
# Core attributes
if not dataset.slug:
dataset.slug = data['name']
dataset.title = data['title']
dataset.description = data['notes']
dataset.license = License.objects(id=data['license_id']).first()
# dataset.license = license or License.objects.get(id='notspecified')
dataset.tags = [t['name'] for t in data['tags']]
dataset.created_at = data['metadata_created']
dataset.last_modified = data['metadata_modified']
dataset.extras['ckan:name'] = data['name']
temporal_start, temporal_end = None, None
spatial_geom = None
for extra in data['extras']:
# GeoJSON representation (Polygon or Point)
if extra['key'] == 'spatial':
spatial_geom = json.loads(extra['value'])
# Textual representation of the extent / location
elif extra['key'] == 'spatial-text':
log.debug('spatial-text value not handled')
print 'spatial-text', extra['value']
# Linked Data URI representing the place name
elif extra['key'] == 'spatial-uri':
log.debug('spatial-uri value not handled')
print 'spatial-uri', extra['value']
# Update frequency
elif extra['key'] == 'frequency':
print 'frequency', extra['value']
# Temporal coverage start
elif extra['key'] == 'temporal_start':
print 'temporal_start', extra['value']
temporal_start = daterange_start(extra['value'])
continue
# Temporal coverage end
elif extra['key'] == 'temporal_end':
print 'temporal_end', extra['value']
temporal_end = daterange_end(extra['value'])
continue
# else:
# print extra['key'], extra['value']
dataset.extras[extra['key']] = extra['value']
if spatial_geom:
dataset.spatial = SpatialCoverage()
if spatial_geom['type'] == 'Polygon':
coordinates = [spatial_geom['coordinates']]
elif spatial_geom['type'] == 'MultiPolygon':
coordinates = spatial_geom['coordinates']
else:
HarvestException('Unsupported spatial geometry')
dataset.spatial.geom = {
'type': 'MultiPolygon',
'coordinates': coordinates
}
if temporal_start and temporal_end:
dataset.temporal_coverage = db.DateRange(
start=temporal_start,
end=temporal_end,
)
# Remote URL
if data.get('url'):
dataset.extras['remote_url'] = data['url']
# Resources
for res in data['resources']:
if res['resource_type'] not in ALLOWED_RESOURCE_TYPES:
continue
try:
resource = get_by(dataset.resources, 'id', UUID(res['id']))
except:
log.error('Unable to parse resource ID %s', res['id'])
continue
if not resource:
resource = Resource(id=res['id'])
dataset.resources.append(resource)
resource.title = res.get('name', '') or ''
resource.description = res.get('description')
resource.url = res['url']
resource.filetype = ('api' if res['resource_type'] == 'api'
else 'remote')
#.........这里部分代码省略.........
开发者ID:michelbl,项目名称:udata,代码行数:101,代码来源:ckan.py
示例18: test_attribute_not_found
def test_attribute_not_found(self):
'''get_by() should not fail if an object don't have the given attr'''
self.assertIsNone(get_by(TEST_LIST, 'inexistant', 'value'))
开发者ID:noirbizarre,项目名称:udata,代码行数:3,代码来源:test_utils.py
示例19: test_not_found
def test_not_found(self):
'''get_by() should return None if not found'''
self.assertIsNone(get_by(TEST_LIST, 'name', 'not-found'))
开发者ID:noirbizarre,项目名称:udata,代码行数:3,代码来源:test_utils.py
示例20: test_find_object
def test_find_object(self):
'''get_by() should find an object in list'''
obj_lst = [ObjDict(d) for d in TEST_LIST]
result = get_by(obj_lst, 'name', 'bbb')
self.assertEqual(result.name, 'bbb')
self.assertEqual(result.another, 'ddd')
开发者ID:noirbizarre,项目名称:udata,代码行数:6,代码来源:test_utils.py
注:本文中的udata.utils.get_by函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论