本文整理汇总了Python中utils.http.json_resp函数的典型用法代码示例。如果您正苦于以下问题:Python json_resp函数的具体用法?Python json_resp怎么用?Python json_resp使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了json_resp函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: search_bangumi
def search_bangumi(self, type, term, offset, count):
    """
    Search bangumi.tv for subjects and flag the hits that already exist in the
    local database. Cookies must be handled properly by the request helper to
    get past the anti-bot mechanism.

    :param type: forwarded as the ``type`` query parameter
    :param term: a urlencoded word of the search term
    :param offset: forwarded as the ``start`` query parameter
    :param count: forwarded as the ``max_result`` query parameter
    :return: a json response built from {"data": [...], "total": n}
    """
    payload = {"data": [], "total": 0}
    url_template = 'http://api.bgm.tv/search/subject/{0}?responseGroup=large&max_result={1}&start={2}&type={3}'
    r = bangumi_request.get(url_template.format(term.encode('utf-8'), count, offset, type))
    if r.status_code >= 400:
        r.raise_for_status()

    try:
        remote = r.json()
    except Exception as error:
        logger.warn(error)
        payload['message'] = 'fail to query bangumi'
        return json_resp(payload, 500)

    # a body carrying code 404 is answered with the empty result
    if 'code' in remote and remote['code'] == 404:
        return json_resp(payload, 200)

    hits = remote['list']
    hit_total = remote['results']
    if len(hits) == 0:
        return json_resp(payload)

    known = self.get_bangumi_from_bgm_id_list([hit['id'] for hit in hits])
    for hit in hits:
        hit['bgm_id'] = hit.get('id')
        # 'id' becomes the local database id when the subject is already
        # stored, otherwise None; that is how callers can tell it exists
        hit['id'] = next((local.id for local in known if hit['bgm_id'] == local.bgm_id), None)

        images = hit.get('images')
        if images:
            hit['image'] = images.get('large')

        # strip the keys nobody downstream needs
        for useless_key in ('images', 'collection', 'url', 'type'):
            hit.pop(useless_key, None)

    payload['data'] = hits
    payload['total'] = hit_total
    return json_resp(payload)
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:53,代码来源:admin.py
示例2: add_web_hook_token
def add_web_hook_token(self, token_id, web_hook_id, user):
    """
    Store a token for a user on a web hook and announce it through rpc.

    :param token_id: token id saved on the new WebHookToken row
    :param web_hook_id: id of the web hook the token belongs to
    :param user: object exposing id, email and email_confirmed
    :return: a json response built from {'message': 'ok'}
    :raise ClientError: when no web hook matches web_hook_id
    """
    session = SessionManager.Session()
    try:
        hook = session.query(WebHook).filter(WebHook.id == web_hook_id).one()
        session.add(WebHookToken(web_hook_id=web_hook_id,
                                 user_id=user.id,
                                 token_id=token_id))
        session.commit()

        rpc_payload = {
            'web_hook_id': web_hook_id,
            'token_id': token_id,
            'user_id': user.id,
            'email': None
        }
        # the address is only handed over when the hook has the email
        # permission and the user has a confirmed email
        email_allowed = hook.has_permission(WebHook.PERMISSION_EMAIL)
        if email_allowed and user.email is not None and user.email_confirmed:
            rpc_payload['email'] = user.email

        rpc_request.send('token_add', rpc_payload)
        return json_resp({'message': 'ok'})
    except NoResultFound:
        raise ClientError('web hook not existed')
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:25,代码来源:web_hook.py
示例3: update_web_hook
def update_web_hook(self, web_hook_id, web_hook_dict):
    """
    Overwrite the editable fields of an existing web hook.

    :param web_hook_id: id of the web hook to change
    :param web_hook_dict: dict holding the new values; shared_secret is only
        replaced when a non-None value is supplied
    :return: a json response built from {'message': 'ok'}
    """
    session = SessionManager.Session()
    try:
        hook = session.query(WebHook).filter(WebHook.id == web_hook_id).one()

        hook.name = web_hook_dict.get('name')
        # the description is passed through bleach before it is stored
        raw_description = web_hook_dict.get('description')
        hook.description = bleach.clean(raw_description, tags=self.ALLOWED_TAGS)
        for field in ('url', 'status', 'consecutive_failure_count', 'permissions'):
            setattr(hook, field, web_hook_dict.get(field))

        new_secret = web_hook_dict.get('shared_secret')
        if new_secret is not None:
            hook.shared_secret = new_secret

        session.commit()
        return json_resp({'message': 'ok'})
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:26,代码来源:web_hook.py
示例4: get_bangumi
def get_bangumi(self, id):
    """
    Load one bangumi together with its episodes.

    :param id: primary key of the bangumi
    :return: a json response built from {'data': bangumi_dict}; bangumi_dict
        additionally carries 'episodes' (each with a 'thumbnail') and 'cover'
    :raise ClientError: ClientError(ClientError.NOT_FOUND, 404) when no row matches
    """
    try:
        session = SessionManager.Session()
        bangumi = session.query(Bangumi).\
            options(joinedload(Bangumi.episodes)).\
            filter(Bangumi.id == id).\
            one()

        episodes = []
        for episode in bangumi.episodes:
            eps = row2dict(episode)
            eps['thumbnail'] = utils.generate_thumbnail_link(episode, bangumi)
            episodes.append(eps)

        bangumi_dict = row2dict(bangumi)
        bangumi_dict['episodes'] = episodes
        bangumi_dict['cover'] = utils.generate_cover_link(bangumi)
        return json_resp({'data': bangumi_dict})
    except NoResultFound:
        raise ClientError(ClientError.NOT_FOUND, 404)
    # other exceptions propagate untouched; catching them only to re-raise
    # would do nothing except rewrite the traceback
    finally:
        SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:26,代码来源:admin.py
示例5: update_bangumi
def update_bangumi(self, bangumi_id, bangumi_dict):
    """
    Overwrite the editable fields of a bangumi and stamp its update_time.

    :param bangumi_id: primary key of the bangumi to update
    :param bangumi_dict: dict that must contain name, name_cn, summary, eps,
        eps_regex, image, air_date ('%Y-%m-%d'), air_weekday and rss
    :return: a json response built from {'msg': 'ok'}
    :raise ClientError: ClientError(ClientError.NOT_FOUND, 404) when no row matches
    """
    try:
        session = SessionManager.Session()
        bangumi = session.query(Bangumi).filter(Bangumi.id == bangumi_id).one()

        bangumi.name = bangumi_dict['name']
        bangumi.name_cn = bangumi_dict['name_cn']
        bangumi.summary = bangumi_dict['summary']
        bangumi.eps = bangumi_dict['eps']
        bangumi.eps_regex = bangumi_dict['eps_regex']
        bangumi.image = bangumi_dict['image']
        bangumi.air_date = datetime.strptime(bangumi_dict['air_date'], '%Y-%m-%d')
        bangumi.air_weekday = bangumi_dict['air_weekday']
        bangumi.rss = bangumi_dict['rss']
        bangumi.update_time = datetime.now()

        session.commit()
        return json_resp({'msg': 'ok'})
    except NoResultFound:
        # pass 404 explicitly, the same way the other NOT_FOUND raises in
        # these services do
        raise ClientError(ClientError.NOT_FOUND, 404)
    # other exceptions propagate untouched; catching them only to re-raise
    # would do nothing except rewrite the traceback
    finally:
        SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:25,代码来源:admin.py
示例6: register_web_hook
def register_web_hook(self, web_hook_dict, add_by_uid):
    """
    Create a web hook and ask the rpc side to initialize it.

    :param web_hook_dict: dict providing name, description, url, shared_secret
        and permissions
    :param add_by_uid: stored as created_by_uid on the new web hook
    :return: a json response built from {'data': <web hook id as string>}
    """
    session = SessionManager.Session()
    try:
        # the description is passed through bleach before it is stored
        safe_description = bleach.clean(web_hook_dict.get('description'), tags=self.ALLOWED_TAGS)
        hook = WebHook(name=web_hook_dict.get('name'),
                       description=safe_description,
                       url=web_hook_dict.get('url'),
                       shared_secret=web_hook_dict.get('shared_secret'),
                       created_by_uid=add_by_uid,
                       permissions=web_hook_dict.get('permissions'))
        session.add(hook)
        session.commit()

        hook_id = str(hook.id)
        # hand the new hook over to the rpc side
        init_args = {
            'web_hook_id': hook_id,
            'web_hook_url': hook.url,
            'shared_secret': hook.shared_secret
        }
        rpc_request.send('initialize_web_hook', init_args)
        return json_resp({'data': hook_id})
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:29,代码来源:web_hook.py
示例7: request_reset_pass
def request_reset_pass():
    """
    Trigger a password reset mail for the email given in the request body.

    :return: a json response built from {'message': 'ok'}
    :raise ClientError: ClientError.INVALID_REQUEST when the body has no 'email'
    """
    body = json.loads(request.get_data(True, as_text=True))
    if 'email' not in body:
        raise ClientError(ClientError.INVALID_REQUEST)

    UserCredential.send_pass_reset_email(body['email'])
    return json_resp({'message': 'ok'})
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:7,代码来源:user.py
示例8: register
def register():
    """
    Register a new user with an invite code, log the user in and send the
    confirmation mail. A freshly registered user is not an administrator; an
    existing admin has to promote it.

    :return: a json response built from {'message': 'ok'} with status 201
    :raise ClientError: INVALID_REQUEST when a required field is missing,
        PASSWORD_MISMATCH when password and password_repeat differ
    """
    register_data = json.loads(request.get_data(True, as_text=True))

    required_fields = ('name', 'password', 'password_repeat', 'invite_code', 'email')
    if not all(field in register_data for field in required_fields):
        raise ClientError(ClientError.INVALID_REQUEST)

    name = register_data['name']
    password = register_data['password']
    password_repeat = register_data['password_repeat']
    email = register_data['email']
    invite_code = register_data['invite_code']

    if password != password_repeat:
        raise ClientError(ClientError.PASSWORD_MISMATCH)

    # NOTE(review): when register_user returns a falsy value nothing is
    # returned from this function; confirm register_user raises on failure
    if UserCredential.register_user(name=name, password=password, email=email, invite_code=invite_code):
        # sign the new user in right away
        credential = UserCredential.login_user(name, password)
        login_user(credential, remember=False)
        # then send the confirmation mail
        credential.send_confirm_email()
        return json_resp({'message': 'ok'}, 201)
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:25,代码来源:user.py
示例9: update_episode
def update_episode(self, episode_id, episode_dict):
    """
    Overwrite the editable fields of an episode and stamp its update_time.

    :param episode_id: primary key of the episode to update
    :param episode_dict: dict with the new values; airdate ('%Y-%m-%d') and
        status are only applied when their keys are present
    :return: a json response built from {'msg': 'ok'}
    :raise ClientError: ClientError(ClientError.NOT_FOUND, 404) when no row matches
    """
    try:
        session = SessionManager.Session()
        target = session.query(Episode).filter(Episode.id == episode_id).one()

        for field in ('episode_no', 'bgm_eps_id', 'name', 'name_cn'):
            setattr(target, field, episode_dict.get(field))

        if 'airdate' in episode_dict:
            target.airdate = datetime.strptime(episode_dict.get('airdate'), '%Y-%m-%d')

        target.duration = episode_dict.get('duration')
        target.update_time = datetime.utcnow()

        if 'status' in episode_dict:
            target.status = episode_dict['status']

        session.commit()
        return json_resp({'msg': 'ok'})
    except NoResultFound:
        raise ClientError(ClientError.NOT_FOUND, 404)
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:26,代码来源:admin.py
示例10: list_episode
def list_episode(self, page, count, sort_field, sort_order, status):
    """
    Return one page of episodes whose delete_mark is not set, optionally
    restricted to a status.

    :param page: 1-based page number
    :param count: page size
    :param sort_field: name of the Episode attribute to sort by
    :param sort_order: 'desc' for descending, anything else sorts ascending
    :param status: only episodes with this status are listed; None disables the filter
    :return: a json response built from {'data': [...], 'total': n}, where
        total counts the rows matching the same filters as the page
    """
    try:
        session = SessionManager.Session()

        # One filter list drives both the page query and the total, so the
        # total can no longer include rows the page query excludes.
        # SQLAlchemy needs `== None` (not `is None`) to build the IS NULL test.
        filters = [Episode.delete_mark == None]
        if status is not None:
            filters.append(Episode.status == status)

        total = session.query(func.count(Episode.id)).filter(*filters).scalar()

        direction = desc if sort_order == 'desc' else asc
        offset = (page - 1) * count
        episode_list = session.query(Episode).\
            filter(*filters).\
            order_by(direction(getattr(Episode, sort_field))).\
            offset(offset).\
            limit(count).\
            all()

        episode_dict_list = [row2dict(episode, Episode) for episode in episode_list]
        return json_resp({'data': episode_dict_list, 'total': total})
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:33,代码来源:admin.py
示例11: on_air_bangumi
def on_air_bangumi(self):
    """
    List the bangumi that have an episode with an airdate in the current month.

    :return: a json response built from {'data': [...]}; every entry is the
        bangumi row as a dict plus its 'cover' link
    """
    session = SessionManager.Session()
    current_day = datetime.today()
    start_time = datetime(current_day.year, current_day.month, 1)
    # first day of the following month, rolling over into January
    if current_day.month == 12:
        next_year = current_day.year + 1
        next_month = 1
    else:
        next_year = current_day.year
        next_month = current_day.month + 1
    end_time = datetime(next_year, next_month, 1)

    try:
        # NOTE(review): the upper bound is inclusive, so an airdate equal to
        # the first day of next month also matches; confirm this is intended
        result = session.query(distinct(Episode.bangumi_id), Bangumi).\
            join(Bangumi).\
            filter(Episode.airdate >= start_time).\
            filter(Episode.airdate <= end_time)

        bangumi_list = []
        for bangumi_id, bangumi in result:
            bangumi_dict = row2dict(bangumi)
            bangumi_dict['cover'] = utils.generate_cover_link(bangumi)
            bangumi_list.append(bangumi_dict)

        return json_resp({'data': bangumi_list})
    # exceptions propagate untouched; catching them only to re-raise would
    # do nothing except rewrite the traceback
    finally:
        SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:29,代码来源:bangumi.py
示例12: get_all_announce
def get_all_announce(self, position, offset, count, content):
    """
    List announces, either by exact content or by position with paging.

    :param position: position to filter on when content is empty; bangumi
        info is added to the result when it equals Announce.POSITION_BANGUMI
    :param offset: row offset, only used for the position query
    :param count: row limit, only used for the position query
    :param content: when truthy, announces with exactly this content are
        returned and position/offset/count are not applied to the query
    :return: a json response built from {'data': [...], 'total': n}, where
        total is the count of all Announce rows
    """
    session = SessionManager.Session()
    try:
        base_query = session.query(Announce)
        if content:
            announce_list = base_query.filter(Announce.content == content).all()
        else:
            paged_query = base_query.filter(Announce.position == position)
            announce_list = paged_query.offset(offset).limit(count).all()

        total = session.query(func.count(Announce.id)).scalar()

        announce_dict_list = [row2dict(announce, Announce) for announce in announce_list]

        if position == Announce.POSITION_BANGUMI:
            self.__add_bangumi_info(session, announce_dict_list)

        return json_resp({'data': announce_dict_list, 'total': total})
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:29,代码来源:announce.py
示例13: recent_update
def recent_update(self, days):
    """
    List episodes with status STATUS_DOWNLOADED whose update_time lies within
    the last ``days`` days, ordered by update_time descending.

    :param days: size of the look-back window in days
    :return: a json response built from {'data': [...]}; every entry is the
        episode as a dict plus 'thumbnail' and a nested 'bangumi' dict with 'cover'
    """
    current = datetime.now()
    # the window starts `days` days before now
    start_time = current - timedelta(days=days)

    session = SessionManager.Session()
    try:
        result = session.query(Episode, Bangumi).\
            join(Bangumi).\
            filter(Episode.status == Episode.STATUS_DOWNLOADED).\
            filter(Episode.update_time >= start_time).\
            filter(Episode.update_time <= current).\
            order_by(desc(Episode.update_time))

        episode_list = []
        for eps, bgm in result:
            episode = row2dict(eps)
            episode['thumbnail'] = utils.generate_thumbnail_link(eps, bgm)
            episode['bangumi'] = row2dict(bgm)
            episode['bangumi']['cover'] = utils.generate_cover_link(bgm)
            episode_list.append(episode)

        return json_resp({'data': episode_list})
    # exceptions propagate untouched; catching them only to re-raise would
    # do nothing except rewrite the traceback
    finally:
        SessionManager.Session.remove()
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:30,代码来源:bangumi.py
示例14: logout
def logout():
    """
    End the session of the current user.

    :return: a json response built from {'msg': 'ok'}
    """
    logout_user()
    response = json_resp({'msg': 'ok'})
    return response
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:7,代码来源:user.py
示例15: on_air_bangumi
def on_air_bangumi(self, user_id, type):
    """
    List the bangumi of a type that have an episode with an airdate in the
    current month, ordered by air_date descending, together with the
    favorite status of the given user.

    :param user_id: user whose favorites are looked up
    :param type: value compared against Bangumi.type
    :return: a json response built from {'data': [...]}; entries carry
        'cover' and, when the user has a favorite for them, 'favorite_status'
    """
    session = SessionManager.Session()
    current_day = datetime.today()
    start_time = datetime(current_day.year, current_day.month, 1)
    # first day of the following month, rolling over into January
    if current_day.month == 12:
        next_year = current_day.year + 1
        next_month = 1
    else:
        next_year = current_day.year
        next_month = current_day.month + 1
    end_time = datetime(next_year, next_month, 1)

    try:
        # Materialize the rows once: the result is needed twice below and
        # iterating the Query object itself would run the SQL a second time.
        # SQLAlchemy needs `== None` (not `is None`) to build the IS NULL test.
        rows = session.query(distinct(Episode.bangumi_id), Bangumi).\
            join(Bangumi).\
            options(joinedload(Bangumi.cover_image)).\
            filter(Bangumi.delete_mark == None).\
            filter(Bangumi.type == type).\
            filter(Episode.airdate >= start_time).\
            filter(Episode.airdate <= end_time).\
            order_by(desc(getattr(Bangumi, 'air_date'))).\
            all()

        bangumi_id_list = [bangumi_id for bangumi_id, bangumi in rows]
        if len(bangumi_id_list) == 0:
            return json_resp({'data': []})

        favorites = session.query(Favorites).\
            filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
            filter(Favorites.user_id == user_id).\
            all()

        # index favorites by bangumi id; setdefault keeps the first favorite
        # per bangumi, like a linear scan that stops at the first match
        favorite_status = {}
        for fav in favorites:
            favorite_status.setdefault(fav.bangumi_id, fav.status)

        bangumi_list = []
        for bangumi_id, bangumi in rows:
            bangumi_dict = row2dict(bangumi, Bangumi)
            bangumi_dict['cover'] = utils.generate_cover_link(bangumi)
            utils.process_bangumi_dict(bangumi, bangumi_dict)
            if bangumi_id in favorite_status:
                bangumi_dict['favorite_status'] = favorite_status[bangumi_id]
            bangumi_list.append(bangumi_dict)

        return json_resp({'data': bangumi_list})
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:46,代码来源:bangumi.py
示例16: list_task
def list_task(self):
    """
    Return every Task row.

    :return: a json response built from {'data': [...]}
    """
    try:
        session = SessionManager.Session()
        task_list = []
        for task in session.query(Task).all():
            task_list.append(row2dict(task, Task))
        return json_resp({'data': task_list})
    finally:
        SessionManager.Session.remove()
开发者ID:KTachibanaM,项目名称:Albireo,代码行数:8,代码来源:task.py
示例17: get_user_info
def get_user_info():
    """
    Report the name and level of the current user.

    :return: a json response built from {'data': {'name': ..., 'level': ...}}
    """
    user_info = {
        'name': current_user.name,
        'level': current_user.level
    }
    return json_resp({'data': user_info})
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:9,代码来源:user.py
示例18: list_bangumi
def list_bangumi(self, page, count, sort_field, sort_order, name, user_id, bangumi_type):
    """
    Return bangumi whose delete_mark is not set, optionally filtered by type
    and name, with the favorite status of the given user attached.

    :param page: 1-based page number, ignored when count is -1
    :param count: page size; -1 returns all matching rows
    :param sort_field: name of the Bangumi attribute to sort by
    :param sort_order: 'desc' for descending, anything else sorts ascending
    :param name: when not None, only rows whose name or name_cn contains it
        (case-insensitive) are returned
    :param user_id: user whose favorites are looked up
    :param bangumi_type: value compared against Bangumi.type; -1 disables the filter
    :return: a json response built from {'data': [...], 'total': n}, where
        total counts the rows matching the same filters as the listing
    """
    try:
        session = SessionManager.Session()

        # One filter list drives both the listing and the total, so the total
        # can no longer include rows the listing excludes.
        # SQLAlchemy needs `== None` (not `is None`) to build the IS NULL test.
        filters = [Bangumi.delete_mark == None]
        if bangumi_type != -1:
            filters.append(Bangumi.type == bangumi_type)
        if name is not None:
            name_pattern = '%{0}%'.format(name.encode('utf-8'),)
            logger.debug(name_pattern)
            filters.append(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))

        total = session.query(func.count(Bangumi.id)).filter(*filters).scalar()

        direction = desc if sort_order == 'desc' else asc
        query_object = session.query(Bangumi).\
            options(joinedload(Bangumi.cover_image)).\
            filter(*filters).\
            order_by(direction(getattr(Bangumi, sort_field)))

        if count == -1:
            bangumi_list = query_object.all()
        else:
            offset = (page - 1) * count
            bangumi_list = query_object.offset(offset).limit(count).all()

        bangumi_id_list = [bgm.id for bgm in bangumi_list]

        # skip the favorites query when nothing matched, which avoids an
        # IN () with an empty list
        if bangumi_id_list:
            favorites = session.query(Favorites).\
                filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
                filter(Favorites.user_id == user_id).\
                all()
        else:
            favorites = []

        # index favorites by bangumi id; a later favorite for the same
        # bangumi overrides an earlier one, like a scan without a break
        favorite_status = {}
        for fav in favorites:
            favorite_status[fav.bangumi_id] = fav.status

        bangumi_dict_list = []
        for bgm in bangumi_list:
            bangumi = row2dict(bgm, Bangumi)
            bangumi['cover'] = utils.generate_cover_link(bgm)
            utils.process_bangumi_dict(bgm, bangumi)
            if bgm.id in favorite_status:
                bangumi['favorite_status'] = favorite_status[bgm.id]
            bangumi_dict_list.append(bangumi)

        return json_resp({'data': bangumi_dict_list, 'total': total})
    finally:
        SessionManager.Session.remove()
示例19: query_one_bangumi
def query_one_bangumi(bgm_id):
    """
    Fetch the large response group of one subject from api.bgm.tv.

    :param bgm_id: subject id appended to the request URL
    :return: the raw body returned by the remote api, or a json response
        built from {} when bgm_id is None
    """
    if bgm_id is None:
        return json_resp({})

    subject_url = 'http://api.bgm.tv/subject/' + bgm_id + '?responseGroup=large'
    client = httplib2.Http('.cache')
    (_, body) = client.request(subject_url, 'GET')
    return body
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:10,代码来源:admin.py
示例20: search_bangumi
def search_bangumi():
    """
    Search api.bgm.tv by the 'name' query argument, keep only the subjects
    whose type is 2 and flag the ones that already exist in the local database.

    :return: a json response built from {"data": [...]}; a 'msg' key is added
        when the remote api answers with status 502
    """
    bangumi_tv_url_base = 'http://api.bgm.tv/search/subject/'
    bangumi_tv_url_param = '?responseGroup=simple&max_result=10&start=0'

    name = request.args.get('name', None)
    result = {"data": []}

    if name is not None and len(name) > 0:
        # NOTE(review): name goes into the URL path as-is; confirm whether it
        # needs to be url-quoted before the request is made
        bangumi_tv_url = bangumi_tv_url_base + name + bangumi_tv_url_param
        h = httplib2.Http('.cache')
        (resp, content) = h.request(bangumi_tv_url, 'GET')
        if resp.status == 200:
            try:
                bgm_content = json.loads(content)
            except ValueError:
                return json_resp(result)

            # a payload without a 'list' entry is treated as "no hits"
            # instead of failing with a KeyError
            bgm_list = [bgm for bgm in bgm_content.get('list') or [] if bgm['type'] == 2]
            if len(bgm_list) == 0:
                return json_resp(result)

            bgm_id_list = [bgm['id'] for bgm in bgm_list]
            bangumi_list = admin_service.get_bangumi_from_bgm_id_list(bgm_id_list)

            for bgm in bgm_list:
                bgm['bgm_id'] = bgm['id']
                bgm['id'] = None
                # when the bgm_id is found in the database, expose the
                # database id as bgm['id'] so callers know the bangumi exists
                for bangumi in bangumi_list:
                    if bgm['bgm_id'] == bangumi.bgm_id:
                        bgm['id'] = bangumi.id
                        break
                # subjects without images simply get no 'image' key
                bgm_images = bgm.get('images')
                if bgm_images:
                    bgm['image'] = bgm_images.get('large')
                # remove useless keys
                bgm.pop('images', None)
                bgm.pop('collection', None)
                bgm.pop('url', None)
                bgm.pop('type', None)

            result['data'] = bgm_list
        elif resp.status == 502:
            # when bangumi.tv is down
            result['msg'] = 'bangumi is down'

    return json_resp(result)
开发者ID:HerringtonDarkholme,项目名称:Albireo,代码行数:43,代码来源:admin.py
注:本文中的utils.http.json_resp函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论