本文整理汇总了Python中utilities.prompt_format.item函数的典型用法代码示例。如果您正苦于以下问题:Python item函数的具体用法?Python item怎么用?Python item使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了item函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: DownloadResource
def DownloadResource(url, default_dir='data', verbose=True):
    '''Download a resource from HDX into a local directory.

    Parameters:
        url: address of the resource; its basename is used as the local
            file name.
        default_dir: directory the file is written to.
        verbose: when truthy, progress messages and exception details
            are printed.

    Returns:
        False when the request or the write raises, or when the server
        answers with a status code other than 200. None after a
        successful download.
    '''
    #
    # Assemble file path. The target directory comes from `default_dir`
    # so that the parameter is actually honoured.
    #
    file_name = os.path.basename(url)
    file_path = os.path.join(default_dir, file_name)
    if verbose:
        print('%s Downloading resource %s' % (item('prompt_bullet'), file_name))

    #
    # Make request.
    #
    try:
        r = requests.get(url)
        if r.status_code != 200:
            # A non-200 answer is a failed download; report it instead of
            # returning silently.
            print('%s File %s failed to download.' % (item('prompt_error'), file_name))
            return False

        with open(file_path, 'wb') as f:
            for chunk in r:
                f.write(chunk)

        if verbose:
            print('%s File %s was downloaded successfully.' % (item('prompt_bullet'), file_name))

    except Exception as e:
        if verbose:
            print(e)
        print('%s File %s failed to download.' % (item('prompt_error'), file_name))
        return False
开发者ID:luiscape,项目名称:unosat-product-scraper-analysis,代码行数:30,代码来源:process.py
示例2: CollectAndStoreGaulData
def CollectAndStoreGaulData(csv_name, db_table='Gaul', verbose=True):
    '''Use a CSV file to store the WFP-modified GAUL on a local database.

    Parameters:
        csv_name: file name of the CSV, looked up inside the `config`
            directory.
        db_table: name of the table handed to `StoreRecords`.
        verbose: forwarded to `StoreRecords`; when truthy the exception
            is also printed on failure.

    Returns:
        False when reading or storing raises; None otherwise.
    '''
    print('%s Storing GAUL database in DB (~5 mins).' % item('prompt_bullet'))

    #
    # Data dir.
    # NOTE(review): `dir` is not defined inside this function; presumably
    # a module-level path that shadows the builtin -- confirm.
    #
    data_dir = os.path.split(dir)[0]
    gaul_location = os.path.join(data_dir, 'config', csv_name)

    #
    # Storing GAUL on database, one row at a time. Rows are streamed
    # straight from the reader instead of being collected in a list
    # that was never used.
    #
    try:
        with open(gaul_location) as csv_file:
            for row in csv.DictReader(csv_file):
                StoreRecords(row, db_table, verbose=verbose)

    except Exception as e:
        print("%s Failed to store GAUL database in DB." % item('prompt_error'))
        if verbose:
            print(e)
        return False
开发者ID:luiscape,项目名称:hdxscraper-wfp-vam-api,代码行数:29,代码来源:gaul.py
示例3: StoreRecords
def StoreRecords(data, table, verbose = False):
    '''Store records in a ScraperWiki database.

    Parameters:
        data: iterable of records; each one is passed to
            `scraperwiki.sqlite.save`.
        table: name of the target table; must be a key of the schemas
            declared below.
        verbose: when truthy, details are printed if no schema matches
            `table`.

    Returns:
        False when no schema exists for `table` or when saving raises;
        None otherwise.
    '''
    # Available schemas.
    schemas = {
        'FCS': ["ADM0_ID", "ADM5_ID", "Methodology", "LivelihoodZoneName", "ADM4_ID", "FCS_borderline", "FCS_month", "IndicatorTypeID", "FCS_dataSource", "methodologyID", "FCS_year", "TargetGroup", "ADM3_ID", "ADM2_ID", "Lz_ID", "mr_id", "FCS_lowerThreshold", "FCS_id", "FCS_poor", "targetGroupID", "ADM1_ID", "FCS_upperThreshold", "FCS_acceptable", "FCS_mean"],
        'CSI': ["CSI_rMediumCoping", "IndicatorTypeID", "ADM0_ID", "CSI_csHighCoping", "ADM5_ID", "LivelihoodZoneName", "ADM4_ID", "CSI_rDataSource", "CSI_csLowCoping", "MethodologyCs", "csMethodologyID", "CSI_rHighCoping", "CSI_id", "CSI_rMediumHighThreshold", "CSI_csMean", "CSI_rLowCoping", "CSI_rLowMediumThreshold", "rMethodologyID", "CSI_rMonth", "csTargetGroupID", "CSI_rNoCoping", "TargetGroupCs", "ADM3_ID", "CSI_csDataSource", "ADM2_ID", "TargetGroupR", "CSI_csLowMediumThreshold", "Lz_ID", "MethodologyR", "CSI_csMediumCoping", "mr_id", "CSI_csNoCoping", "CSI_rYear", "fdc", "CSI_csMediumHighThreshold", "rTargetGroupID", "CSI_csYear", "CSI_rMean", "ADM1_ID", "CSI_csMonth"],
        'Income': ["IncomeSubCategoryID", "IncomeID", "Adm4_ID", "Adm0_ID", "IncomeYear", "Adm3_ID", "IndicatorTypeID", "Adm2_ID", "IncomeCategoryID", "Adm5_ID", "IncomeSubCategory", "IncomeCategory", "IncomeMonth", "mr_id", "IncomeValue", "Adm1_ID"]
    }

    # Only a failed lookup is expected here: KeyError for an unknown
    # table name, TypeError for an unhashable one.
    try:
        schema = schemas[table]

    except (KeyError, TypeError) as e:
        if verbose:
            print("%s select one of the following tables: %s." % (item('prompt_error'), ", ".join(schemas.keys())))
            print(e)
            print('%s Could not find schema.' % item('prompt_error'))
        return False

    try:
        for record in data:
            scraperwiki.sqlite.save(schema, record, table_name=table)

    except Exception as e:
        print("%s Failed to store record in database." % item('prompt_error'))
        print(e)
        # Signal the failure the same way the schema lookup does.
        return False
开发者ID:luiscape,项目名称:hdx-undelete-user,代码行数:29,代码来源:store_records.py
示例4: FetchData
def FetchData(url=Config.LoadConfig()['url']):
    '''Fetch data from the UNOSAT API.

    Parameters:
        url: address to request. The default is read from the
            configuration once, when the function is defined.

    Returns:
        The decoded JSON body when the request succeeds; False when the
        connection raises or the status code is not OK.
    '''
    #
    # Making request.
    #
    try:
        r = requests.get(url)

    except Exception as e:
        print('%s Could not connect to url: %s' % (item('prompt_error'), url))
        print(e)
        return False

    #
    # Checking the status code.
    #
    if r.status_code != requests.codes.ok:
        # The key was misspelled as 'propmt_error' before.
        print('%s Request to UNOSAT servers failed to complete.' % item('prompt_error'))
        return False

    return r.json()
开发者ID:OCHA-DAP,项目名称:hdxscraper-unosat-flood-portal,代码行数:25,代码来源:collect.py
示例5: CreateTables
def CreateTables(config_path=Config.CONFIG_PATH, verbose=True):
    '''Create the tables of the new database.

    Parameters:
        config_path: path handed to `Config.LoadConfig`.
        verbose: when truthy, the exception raised while loading the
            configuration is printed as well.

    Returns:
        True when every table statement was executed; False when the
        configuration cannot be loaded or a statement raises.
    '''
    try:
        endpoints = Config.LoadConfig(config_path)

    except Exception as e:
        # Without a configuration there is nothing to create; stop here
        # instead of failing later on an undefined `endpoints`.
        print('%s Could not load configuration file.' % item('prompt_error'))
        if verbose:
            print(e)
        return False

    #
    # One CREATE statement per endpoint; every field is a TEXT column.
    #
    sql_statements = {}
    for endpoint in endpoints['endpoints']:
        table_name = endpoint['database']['name']
        columns = " TEXT, ".join(endpoint['database']['fields'])
        sql_statements[table_name] = 'CREATE TABLE IF NOT EXISTS %s(%s TEXT)' % (table_name, columns)

    for table in sql_statements:
        try:
            scraperwiki.sqlite.execute(sql_statements[table])
            print("%s table `%s` created." % (item('prompt_bullet'), str(table)))

        except Exception as e:
            print(e)
            return False

    print("%s Database created successfully." % item('prompt_success'))
    return True
开发者ID:luiscape,项目名称:hdxscraper-wfp-vam-api,代码行数:31,代码来源:setup.py
示例6: collect_previous_ga_data
def collect_previous_ga_data(verbose = False, test_data = False):
    '''Collect historical Google Analytics data, week by week, going backwards.

    Parameters:
        verbose: when truthy, the exception is printed on failure.
        test_data: when True, the records of the second week are
            returned and the collection stops there.

    Returns:
        True when all weeks were collected; the collected records when
        `test_data` is True; False as soon as one week fails.
    '''
    # Google Analytics only has data available
    # from 2014-05-25, not earlier.
    first_available = date(2014, 5, 25)

    counter = 0
    while True:
        period_date = date.today() - timedelta(weeks=counter)
        # Check the date before collecting, so no request is made for a
        # week that precedes the first available day.
        if period_date < first_available:
            break
        counter += 1

        try:
            iso_year, iso_week = period_date.isocalendar()[:2]
            print("%s collecting data for week %s of %s" % (I.item('prompt_bullet'), iso_week, iso_year))
            records = ga_collect.collect_ga_data(period_date)
            S.StoreRecords(data = records, table = "funnel")

            if test_data is True and counter > 1:
                return records

        except Exception as e:
            # Report before returning; the message used to sit after the
            # return statement and was never printed.
            print("%s Google Analytics failed to run." % I.item('prompt_error'))
            if verbose:
                print(e)
            return False

    print("%s Google Analytics collection ran successfully." % I.item('prompt_success'))
    return True
开发者ID:luiscape,项目名称:hdx-monitor-funnel-stats,代码行数:29,代码来源:setup.py
示例7: CreateTables
def CreateTables(config_path='dev.json', verbose=True):
    '''Create every table listed under `database` in the configuration.

    Parameters:
        config_path: path handed to `Config.LoadConfig`.
        verbose: when truthy, a failure to load the configuration is
            reported together with the exception.

    Returns:
        True after all statements ran; False when the configuration
        could not be loaded.
    '''
    # Configuration first: nothing can be built without it.
    try:
        config_data = Config.LoadConfig(config_path)['database']
    except Exception as e:
        if verbose:
            print('%s Could not load configuration file.' % item('prompt_error'))
            print(e)
        return False

    # Map each table name to its CREATE statement; all fields are TEXT.
    sql_statements = {
        entry['database']['table_name']: 'CREATE TABLE IF NOT EXISTS %s(%s TEXT)' % (
            entry['database']['table_name'],
            " TEXT, ".join(entry['database']['fields']),
        )
        for entry in config_data
    }

    # Run the statements and report each table.
    for name in sql_statements:
        scraperwiki.sqlite.execute(sql_statements[name])
        print("%s Table `%s` created." % (item('prompt_bullet'), str(name)))

    print("%s Database created successfully.\n" % item('prompt_success'))
    return True
开发者ID:luiscape,项目名称:hdxscraper-violation-documentation-center-syria,代码行数:34,代码来源:database.py
示例8: Main
def Main(verbose=False):
    '''Download a fixed list of SIDIH indicators and store them.

    Parameters:
        verbose: when truthy, the exception of a failed indicator is
            printed.

    Returns:
        True, always; failed indicators are listed in the summary.
    '''
    #
    # List of indicators to download.
    #
    indicators = [642, 653, 654, 593, 587, 3, 190, 504, 495, 343, 322, 337, 545, 384, 664, 645, 541, 540, 684, 588]

    #
    # Failures are accumulated across the whole run, so the list is
    # created once, before the loop.
    #
    errors = []
    for indicator in indicators:
        #
        # Error handler for the processing. Fetching is covered too, so
        # one bad indicator does not abort the remaining ones.
        #
        try:
            data = BuildQueryString(indicator)
            print('%s Processing data for `%s`' % (item('prompt_bullet'), data['metadato']['NOM_DATO'].encode('utf-8')))

            table_name = 'sidih_' + str(indicator)
            StoreRecords(data=data['valores'], table=table_name, schema='sidih_schema')
            StoreRecords(data=data['valores'], table="sidih_all_data", schema='sidih_schema')

        except Exception as e:
            errors.append(indicator)
            print('%s Indicator %s failed to process.' % (item('prompt_bullet'), str(indicator)))
            if verbose:
                print(e)

    #
    # Pretty printing summary.
    #
    n_success = len(indicators) - len(errors)
    print('%s Successfully collected %s indicators from SIDIH.' % (item('prompt_success'), str(n_success)))
    if errors:
        print('%s %s indicators failed to collect: %s.' % (item('prompt_warn'), str(len(errors)), errors))

    return True
开发者ID:luiscape,项目名称:hdxscraper-sidih-api,代码行数:35,代码来源:collect.py
示例9: QueryWFP
def QueryWFP(urls, db_table, endpoint, **kwargs):
    '''Query WFP's VAM API asynchronously and store the output.

    Parameters:
        urls: iterable of URLs to request.
        db_table: table name; also used as prefix of the output files.
        endpoint: dict providing `preferred_fields` and, when storing
            in the database, `database.fields`.
        **kwargs:
            data_dir: required.
            verbose: print queries and whether data was found.
            make_json: write each non-empty response to a JSON file.
            make_csv: write each non-empty response to a CSV file.
            store_db: store each flattened row in the database
                (defaults to True).
    '''
    # Required keyword: a missing value raises KeyError, as before.
    # NOTE(review): the output paths below use the module-level DATA_DIR,
    # not this value -- confirm which one is intended.
    data_dir = kwargs['data_dir']
    verbose = kwargs.get('verbose')
    make_json = kwargs.get('make_json')
    make_csv = kwargs.get('make_csv')
    store_db = kwargs.get('store_db', True)

    #
    # Load endpoint information.
    #
    preferred_fields = endpoint['preferred_fields']
    url_list = list(urls)
    if verbose:
        for url in url_list:
            print('%s query: %s' % (item('prompt_bullet'), url))

    #
    # Defining the asynchronous request.
    #
    request_list = (requests.get(url) for url in url_list)
    responses = requests.map(request_list, exception_handler=handler)

    for index, r in enumerate(responses, 1):
        # A falsy response is treated as "no data".
        data = r.json() if r else []
        length = len(data)

        #
        # Check if there is data available and store output.
        #
        if length and verbose:
            print("%s Data found." % item('prompt_bullet'))
        elif verbose:
            print('%s Data not found.' % item('prompt_warn'))

        # Store JSON.
        if length and make_json:
            j_path = p.join(DATA_DIR, 'data', '%s_%s_data.json' % (db_table, index))
            with open(j_path, 'w') as outfile:
                json.dump(data, outfile)

        # Store CSV. The file is opened in a `with` block so the handle
        # is closed (and flushed) once the rows are written.
        if length and make_csv:
            c_path = p.join(DATA_DIR, 'data', '%s_%s_data.csv' % (db_table, index))
            with open(c_path, "wb+") as csv_file:
                writer = csv.writer(csv_file)
                # NOTE(review): the header comes from the unflattened first
                # record while the rows are flattened -- confirm they line up.
                writer.writerow(data[0].keys())
                for row in data:
                    writer.writerow(flatten_row(row, preferred_fields).values())

        #
        # Storing results in DB.
        #
        if length and store_db:
            schema = endpoint['database']['fields']
            for row in data:
                flattened_row = flatten_row(row, preferred_fields)
                StoreRecords([flattened_row], schema, db_table)
开发者ID:luiscape,项目名称:hdxscraper-wfp-vam-api,代码行数:59,代码来源:collect.py
示例10: CalculateMetric
def CalculateMetric(json, test_data = False):
    '''Count the private datasets in a dataset list and store the result.

    Parameters:
        json: decoded response; `json['result']` is iterated and each
            entry is read through its `private` key.
        test_data: when True, a week record is always generated and the
            records are returned instead of True.

    Returns:
        The list of records when `test_data` is True; True otherwise.
    '''
    print("%s Calculating private datasets." % I.item('prompt_bullet'))

    today = str(time.strftime("%Y-%m-%d"))
    records = [{
        'metricid': 'ckan-number-of-private-dataset',
        'period': today,
        'period_start_date': today,
        'period_end_date': today,
        'period_type': 'd',
        'value': 0
    }]

    datasets = json['result']
    total = len(datasets)
    for i, dataset in enumerate(datasets, 1):
        if dataset['private']:
            records[0]['value'] += 1

        progress = round((float(i) / total), 3) * 100
        print("%s Progress: %s%%" % (I.item('prompt_bullet'), progress))

    # Create week-record
    current_day_date = datetime.strptime(time.strftime("%Y-%m-%d"), "%Y-%m-%d")
    current_week = time.strftime("%Y-W") + str(int(time.strftime('%U')) + 1)
    start = current_day_date - timedelta(days = current_day_date.weekday())
    end = start + timedelta(days = 6)
    first_day_of_current_week = start.strftime('%Y-%m-%d')
    last_day_of_current_week = end.strftime('%Y-%m-%d')

    # Compare like with like: the formatted current day against the
    # formatted last day of the week. Comparing the datetime object with
    # the string could never be true, so the week record was only ever
    # produced in test mode.
    is_last_day_of_week = current_day_date.strftime('%Y-%m-%d') == last_day_of_current_week

    ## Test mode always generates the week record.
    if test_data is True or is_last_day_of_week:
        print("%s Generating week record." % I.item('prompt_bullet'))
        record_week = {
            # Same metric as the day record; the id used to be the
            # unrelated 'ckan-number-of-orgs'.
            'metricid': 'ckan-number-of-private-dataset',
            'period': current_week,  # week starts at 01
            'period_start_date': first_day_of_current_week,
            'period_end_date': last_day_of_current_week,
            'period_type': 'w',
            'value': records[0]['value']
        }
        records.append(record_week)

    S.StoreRecords(data = records, table = 'funnel')

    if test_data is True:
        return records
    return True
开发者ID:luiscape,项目名称:hdx-monitor-funnel-stats,代码行数:58,代码来源:ckan_num_private_datasets.py
示例11: ProcessHDXUserList
def ProcessHDXUserList(json, test_data = False):
    '''Turn an HDX user list into day/week records and store them.

    Parameters:
        json: decoded response; `json["success"]` must be True and
            `json["result"]` is counted with `len`.
        test_data: when True, a week record is always generated and the
            records are returned instead of True.

    Returns:
        False when the response is not flagged as successful; the list
        of records when `test_data` is True; True otherwise.
    '''
    if json["success"] is not True:
        print("%s the resulting JSON is empty. Review your HDX query and try again." % I.item('prompt_error'))
        # Nothing to compute or store; stop before `records` is needed.
        return False

    # Calculating the record.
    print("%s Processing results" % I.item('prompt_bullet'))
    today = str(time.strftime("%Y-%m-%d"))
    user_count = len(json["result"])

    ## Create day-record.
    records = [{
        'metricid': 'ckan-number-of-users',
        'period': today,
        'period_start_date': today,
        'period_end_date': today,
        'period_type': 'd',
        'value': user_count
    }]

    ## Create week-record.
    current_day_date = datetime.strptime(time.strftime("%Y-%m-%d"), "%Y-%m-%d")
    current_week = time.strftime("%Y-W") + str(int(time.strftime('%U')) + 1)
    start = current_day_date - timedelta(days = current_day_date.weekday())
    end = start + timedelta(days = 6)
    first_day_of_current_week = start.strftime('%Y-%m-%d')
    last_day_of_current_week = end.strftime('%Y-%m-%d')

    # Compare formatted dates; a datetime object never equals a string,
    # so the week record was only ever produced in test mode.
    is_last_day_of_week = current_day_date.strftime('%Y-%m-%d') == last_day_of_current_week

    ## Test mode always generates the week record.
    if test_data is True or is_last_day_of_week:
        print("%s Generating week record." % I.item('prompt_bullet'))
        record_week = {
            'metricid': 'ckan-number-of-users',
            'period': current_week,  # week starts at 01
            'period_start_date': first_day_of_current_week,
            'period_end_date': last_day_of_current_week,
            'period_type': 'w',
            'value': user_count
        }
        records.append(record_week)

    # Store in database.
    S.StoreRecords(data = records, table = 'funnel')

    if test_data is True:
        return records
    return True
开发者ID:luiscape,项目名称:hdx-monitor-funnel-stats,代码行数:58,代码来源:ckan_num_reg_users.py
示例12: Main
def Main(patch=True, write_json=False):
    '''Download the data, store it, and optionally store a patched copy.

    Parameters:
        patch: when truthy, the data goes through the `Clean` steps and
            is stored in `processed_data`.
        write_json: when truthy, the raw data is also dumped to
            `data/test.json` (for testing purposes).

    Returns:
        False when any step raises; None otherwise.
    '''
    try:
        d = DownloadAndProcessData()

        # Optional dump of the raw data, for testing purposes.
        if write_json:
            import json
            dump_path = os.path.join('data', 'test.json')
            with open(dump_path, 'w') as outfile:
                json.dump(d, outfile)

        StoreData(data=d, table_name='unprocessed_data')

        if patch:
            try:
                # Dates -> country codes -> file type and name; the result
                # of this chain is what gets counted in the message below.
                with_dates = Clean.CleanDates(data=d)
                with_countries = Clean.IdentifyCountries(data=with_dates)
                export_data = Clean.IdentifyFileTypeAndFileName(data=with_countries)

                # Title clean-up happens last, right before storing.
                titled = Clean.CleanTitle(data=export_data)
                StoreData(data=titled, table_name='processed_data')

                print('%s Successfully patched %s records.' % (item('prompt_success'), len(export_data)))

            except Exception as e:
                print('%s Failed to patch data.' % item('prompt_error'))
                print(e)
                return False

        print('%s Successfully fetched %s records from the UNOSAT Flood Portal.\n' % (item('prompt_success'), len(d)))

    except Exception as e:
        print(e)
        return False
开发者ID:OCHA-DAP,项目名称:hdxscraper-unosat-flood-portal,代码行数:54,代码来源:collect.py
示例13: run_historical_calculations
def run_historical_calculations():
    '''Run the historical calculations and report the outcome.

    Calls `calc.get_initial_setup_data()`. An exception is printed and
    not re-raised; the success message is printed only when the call
    completed without raising.
    '''
    print("%s Making historical calculations." % I.item('prompt_bullet'))
    try:
        calc.get_initial_setup_data()

    except Exception as e:
        print(e)

    else:
        # Reached only when no exception was raised, so a failed run is
        # no longer reported as successful.
        print("%s successfully performed historical calculations.\n" % I.item('prompt_success'))
开发者ID:luiscape,项目名称:hdx-monitor-funnel-stats,代码行数:12,代码来源:setup.py
示例14: CreateDbAndTable
def CreateDbAndTable(config_file='dev.json', verbose=True):
    '''Create tables in a PostgreSQL database.

    Parameters:
        config_file: configuration handed to `LoadConfig`; its
            `database` entry lists the tables, each with `name`,
            `fields` (`field_name`, `type`) and `primary_key`.
        verbose: when truthy, the exception is printed when a table
            cannot be created.

    Returns:
        False as soon as one table cannot be created; None otherwise.
    '''
    #
    # Loading database information
    # from config file.
    #
    database = LoadConfig(config_file)['database']

    #
    # TODO: add environment variables
    # to these default values.
    #
    conn = psycopg2.connect(host=HOST_DATABASE, dbname='rolltime', user='rolltime', password='rolltime')
    try:
        cur = conn.cursor()
        try:
            #
            # Build each table.
            #
            for table in database:
                #
                # Construct SQL statement. Names and types come from the
                # configuration file and are interpolated as-is.
                #
                table_sql = "".join('%s %s, ' % (f['field_name'], f['type']) for f in table['fields'])
                statement = 'CREATE TABLE IF NOT EXISTS %s(%sPRIMARY KEY (%s))' % (table['name'], table_sql, ", ".join(table['primary_key']))

                #
                # Make statements to the database.
                #
                try:
                    cur.execute(statement)
                    conn.commit()
                    print("%s table `%s` created." % (item('prompt_bullet'), str(table['name'])))

                except Exception as e:
                    print('%s Table `%s` could not be created.' % (item('prompt_error'), table['name']))
                    if verbose:
                        print(e)
                    return False

        finally:
            cur.close()

    finally:
        #
        # Close communication, also on the early return above.
        #
        conn.close()
开发者ID:rolltime,项目名称:rolltime-collect,代码行数:50,代码来源:db.py
示例15: CreateTables
def CreateTables(config_path=Config.DEV_CONFIG_PATH, verbose=True):
    '''Build the database tables described in the configuration.

    Parameters:
        config_path: path handed to `Config.LoadConfig`; the `database`
            entry of the result is used.
        verbose: when truthy, a failure to load the configuration is
            reported together with the exception.

    Returns:
        True after all statements ran; False when the configuration
        could not be loaded.
    '''
    #
    # Load configuration data.
    #
    try:
        config_data = Config.LoadConfig(config_path)['database']
    except Exception as e:
        if verbose:
            print('%s Could not load configuration file.' % item('prompt_error'))
            print(e)
        return False

    #
    # Create SQL statements for every table; each field is a TEXT column.
    #
    sql_statements = {}
    for endpoint in config_data:
        db_info = endpoint['database']
        table_name = db_info['table_name']
        columns = " TEXT, ".join(db_info['fields'])
        sql_statements[table_name] = 'CREATE TABLE IF NOT EXISTS %s(%s TEXT)' % (table_name, columns)

    #
    # Execution is deliberately left without per-table error handling:
    # the original author could not test such a handler, so a failing
    # statement propagates to the caller.
    #
    for table in sql_statements:
        scraperwiki.sqlite.execute(sql_statements[table])
        print("%s Table `%s` created." % (item('prompt_bullet'), str(table)))

    print("%s Database created successfully.\n" % item('prompt_success'))
    return True
开发者ID:OCHA-DAP,项目名称:hdxscraper-unosat-flood-portal,代码行数:49,代码来源:database.py
示例16: FetchResourceInfo
def FetchResourceInfo(package_id, preferred_format='ZIPPED SHAPEFILE', verbose=False, **kwargs):
    '''Query HDX for one package and list its resources of a given format.

    Parameters:
        package_id: id passed to the `package_show` API action.
        preferred_format: only resources whose `format` equals this
            value are returned.
        verbose: when truthy, the exception of a failed connection is
            printed.
        **kwargs:
            config_file: configuration handed to `LoadConfig`; when
                absent, `LoadConfig()` is called without arguments.

    Returns:
        A list of dicts with `resource_id`, `dataset_id` and
        `resource_url`; False when the connection fails or the status
        code is not 200.
    '''
    #
    # Fetch configuration.
    #
    config_file = kwargs.get('config_file')
    if config_file is not None:
        config = LoadConfig(config_file)
    else:
        config = LoadConfig()  # default: dev.json

    header = { 'X-CKAN-API-Key': config['hdx_key'] , 'content-type': 'application/json' }
    u = config['hdx_site'] + '/api/action/package_show?id=' + package_id

    try:
        #
        # If not production, we need to
        # add simple HTTP authorization.
        # The API key header is sent in both cases (it used to be built
        # but never passed along). SSL certificates are verified.
        #
        if config['production']:
            r = requests.get(u, headers=header, verify=True)
        else:
            r = requests.get(u, headers=header, auth=(config['auth'][0], config['auth'][1]), verify=True)

    except Exception as e:
        print('%s There was a connection error. Host %s is not known.' % (item('prompt_error'), u))
        if verbose:
            print(e)
        return False

    if r.status_code != 200:
        # The error body may not be JSON or may lack the expected keys;
        # fall back to the status code rather than raising here.
        try:
            message = r.json()['error']['message']
        except (ValueError, KeyError, TypeError):
            message = 'HTTP status %s' % r.status_code
        print('%s HDX query returned an error: "%s"' % (item('prompt_error'), message))
        return False

    #
    # Fetching URL information
    # from the resources of the package.
    #
    data = r.json()
    return [
        { 'resource_id': resource['id'], 'dataset_id': package_id, 'resource_url': resource['url'] }
        for resource in data['result']['resources']
        if resource['format'] == preferred_format
    ]
开发者ID:luiscape,项目名称:unosat-product-scraper-analysis,代码行数:49,代码来源:download.py
示例17: GetHDXUserList
def GetHDXUserList():
    '''Request the user list from the CKAN API of HDX.

    Returns:
        The decoded JSON response; False when the request or the
        decoding raises.
    '''
    # This query takes a bit to complete ...
    endpoint = "https://data.hdx.rwlabs.org/api/action/user_list"

    try:
        print("%s Connecting to HDX" % (I.item('prompt_bullet')))
        return r.get(endpoint).json()

    except Exception:
        print("%s There was an error connecting to the CKAN API. Aborting." % I.item('prompt_error'))
        return False
开发者ID:luiscape,项目名称:hdx-monitor-funnel-stats,代码行数:15,代码来源:ckan_num_reg_users.py
示例18: GetDatasetList
def GetDatasetList():
    '''Request the list of datasets (with resources) from the CKAN API of HDX.

    The API key is read from the `dev` configuration before the request
    is attempted, so a configuration error is not caught here.

    Returns:
        The decoded JSON response; False when the request or the
        decoding raises.
    '''
    endpoint = "https://data.hdx.rwlabs.org/api/action/current_package_list_with_resources?limit=2000"
    auth_headers = { 'Authorization': L.LoadConfig('dev')['hdx_key'] }

    try:
        print("%s Fetching dataset list from HDX." % (I.item('prompt_bullet')))
        response = r.get(endpoint, headers=auth_headers)
        return response.json()

    except Exception:
        print("%s There was an error connecting to the CKAN API. Aborting." % I.item('prompt_error'))
        return False
开发者ID:luiscape,项目名称:hdx-monitor-funnel-stats,代码行数:15,代码来源:ckan_num_private_datasets.py
示例19: Main
def Main():
    '''Configure the database by creating its tables.

    Returns:
        False when table creation raises or reports failure; None
        otherwise.
    '''
    tables = ['station']

    try:
        # One call per listed table, as before; the call itself takes no
        # table argument.
        for _ in tables:
            # A False result is treated as failure (None is not), so a
            # failed creation is no longer reported as success.
            if Db.CreateDbAndTable() is False:
                print('%s Database configuration failed' % item('prompt_error'))
                return False

    except Exception as e:
        print('%s Database configuration failed' % item('prompt_error'))
        print(e)
        return False

    print('%s Database configured successfully.\n' % item('prompt_success'))
开发者ID:rolltime,项目名称:rolltime-collect,代码行数:15,代码来源:__main__.py
示例20: DownloadAndProcess
def DownloadAndProcess(delete_files=True, verbose=False, **kwargs):
    '''Download every UNOSAT package, analyze it, and collect the results.

    Parameters:
        delete_files: when truthy, each downloaded file is removed from
            the `data` directory after it was analyzed.
        verbose: accepted but not used by this function.
        **kwargs: forwarded to `AssemblePackageData`.

    Returns:
        The list of analysis results. When a step raises, the error is
        printed and the results gathered so far are returned.
    '''
    try:
        results = []
        for package in AssemblePackageData(**kwargs):
            # Download, then analyze.
            DownloadResource(package['resource_url'])
            zip_info = FetchZipInformation(package)
            results.append(zip_info)

            # Clean downloaded file.
            if delete_files:
                downloaded_name = os.path.basename(zip_info['resource_url'])
                os.remove(os.path.join('data', downloaded_name))

        return results

    except Exception as e:
        print('%s Failed to download and process files.' % item('prompt_error'))
        print(e)
        return results
开发者ID:luiscape,项目名称:unosat-product-scraper-analysis,代码行数:32,代码来源:process.py
注:本文中的utilities.prompt_format.item函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论