本文整理汇总了Python中textutils.json_decode函数的典型用法代码示例。如果您正苦于以下问题:Python json_decode函数的具体用法?Python json_decode怎么用?Python json_decode使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了json_decode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __call__
def __call__(self, req, db, user):
from operation.typechecker import TypeCheckerContext
if user.isAnonymous() and not self.__accept_anonymous_user:
return OperationFailureMustLogin()
if req.method == "POST": data = req.read()
else: data = req.getParameter("data")
if not data: raise OperationError("no input")
try: value = json_decode(data)
except ValueError as error: raise OperationError("invalid input: %s" % str(error))
try:
self.__checker(value, TypeCheckerContext(req, db, user))
return self.process(db, user, **value)
except OperationError as error:
return error
except OperationFailure as failure:
return failure
except dbutils.NoSuchUser as error:
return OperationFailure(code="nosuchuser",
title="Who is '%s'?" % error.name,
message="There is no user in Critic's database named that.")
except dbutils.NoSuchReview as error:
return OperationFailure(code="nosuchreview",
title="Invalid review ID",
message="The review ID r/%d is not valid." % error.id)
except dbutils.TransactionRollbackError:
return OperationFailure(code="transactionrollback",
title="Transaction rolled back",
message="Your database transaction rolled back, probably due to a deadlock. Please try again.")
except:
# Decode value again since the type checkers might have modified it.
value = json_decode(data)
error_message = ("User: %s\nReferrer: %s\nData: %s\n\n%s"
% (user.name,
req.getReferrer(),
json_encode(self.sanitize(value), indent=2),
traceback.format_exc()))
db.rollback()
import mailutils
import configuration
if not user.hasRole(db, "developer"):
mailutils.sendExceptionMessage(db, "wsgi[%s]" % req.path, error_message)
if configuration.debug.IS_DEVELOPMENT or user.hasRole(db, "developer"):
return OperationError(error_message)
else:
return OperationError("An unexpected error occurred. " +
"A message has been sent to the system administrator(s) " +
"with details about the problem.")
开发者ID:andreastt,项目名称:critic,代码行数:57,代码来源:__init__.py
示例2: handle_input
def handle_input(self, data):
try:
result = json_decode(data)
except ValueError:
result = { "status": "error",
"error": ("invalid response:\n" +
background.utils.indent(data)) }
if result["status"] == "ok":
for item in result["info"]:
self.server.info(item)
if result["output"]:
self.__client.write(result["output"].strip() + "\n")
if result["accept"]:
self.__client.write("ok\n")
elif result["status"] == "reject":
self.server.warning(result["message"])
self.__client.write(result["message"].strip() + "\n")
else:
self.server.error(result["error"])
self.__client.write("""\
An exception was raised while processing the request. A message has
been sent to the system administrator(s).
""")
if configuration.debug.IS_DEVELOPMENT:
self.__client.write("\n" + result["error"].strip() + "\n")
self.__client.close()
开发者ID:Haster2004,项目名称:critic,代码行数:26,代码来源:githook.py
示例3: process
def process(self, db, user, service_name):
if not user.hasRole(db, "administrator"):
raise OperationFailure(
code="notallowed", title="Not allowed!", message="Only a system administrator can restart services."
)
if service_name == "wsgi":
for pid in os.listdir(configuration.paths.WSGI_PIDFILE_DIR):
try:
os.kill(int(pid), signal.SIGINT)
except:
pass
return OperationResult()
else:
connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
connection.connect(configuration.services.SERVICEMANAGER["address"])
connection.send(textutils.json_encode({"command": "restart", "service": service_name}))
connection.shutdown(socket.SHUT_WR)
data = ""
while True:
received = connection.recv(4096)
if not received:
break
data += received
result = textutils.json_decode(data)
if result["status"] == "ok":
return OperationResult()
else:
raise OperationError, result["error"]
开发者ID:kolorowestudio,项目名称:critic,代码行数:32,代码来源:servicemanager.py
示例4: handle_input
def handle_input(self, value):
try: result = json_decode(value)
except ValueError:
self.server.error("invalid response:\n" + indent(value))
result = self.request.copy()
result["error"] = value
for client in self.clients: client.add_result(result)
self.server.request_finished(self, self.request, result)
开发者ID:jherland,项目名称:critic,代码行数:8,代码来源:utils.py
示例5: perform_job
def perform_job():
import syntaxhighlight.generate
request = json_decode(sys.stdin.read())
request["highlighted"] = syntaxhighlight.generate.generateHighlight(
repository_path=request["repository_path"],
sha1=request["sha1"],
language=request["language"])
sys.stdout.write(json_encode(request))
开发者ID:ahockersten,项目名称:critic,代码行数:9,代码来源:highlight.py
示例6: handle_input
def handle_input(self, _file, data):
data = textutils.json_decode(data)
process = self.server.get_process(data["flavor"])
extension = ExtensionRunner.Extension(
self.server, self, process, data["timeout"])
extension.write(data["stdin"])
extension.close()
self.server.add_peer(extension)
开发者ID:jensl,项目名称:critic,代码行数:11,代码来源:extensionrunner.py
示例7: handle_input
def handle_input(self, data):
try: data = json_decode(data)
except ValueError:
self.server.error("invalid response from wait-for-update child: %r" % data)
self.client.close()
if data["status"] == "output":
self.client.write(data["output"])
self.server.debug(" hook output written to client")
elif data["status"] == "no-output":
self.server.debug(" update produced no hook output")
else:
self.server.debug(" timeout")
self.client.close()
开发者ID:Aessy,项目名称:critic,代码行数:15,代码来源:branchtrackerhook.py
示例8: __call__
def __call__(self, req, db, user):
if user.isAnonymous() and not self.__accept_anonymous_user:
return OperationFailureMustLogin()
if req.method == "POST": data = req.read()
else: data = req.getParameter("data")
if not data: raise OperationError, "no input"
try: value = json_decode(data)
except ValueError, error: raise OperationError, "invalid input: %s" % str(error)
self.__checker(value)
try: return self.process(db, user, **value)
except OperationError: raise
except OperationFailure, failure: return failure
except dbutils.NoSuchUser, error:
return OperationFailure(code="nosuchuser",
title="Who is '%s'?" % error.name,
message="There is no user in Critic's database named that.")
开发者ID:suquant,项目名称:critic,代码行数:21,代码来源:__init__.py
示例9: requestChangesets
def requestChangesets(requests):
try:
connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
connection.connect(configuration.services.CHANGESET["address"])
connection.send(json_encode(requests))
connection.shutdown(socket.SHUT_WR)
data = ""
while True:
received = connection.recv(4096)
if not received: break
data += received
connection.close()
except socket.error as error:
raise ChangesetBackgroundServiceError(error[1])
try:
results = json_decode(data)
except ValueError:
raise ChangesetBackgroundServiceError(
"returned an invalid response: %r" % data)
if type(results) != list:
# If not a list, the result is probably an error message.
raise ChangesetBackgroundServiceError(str(results))
if len(results) != len(requests):
raise ChangesetBackgroundServiceError("didn't process all requests")
errors = []
for result in results:
if "error" in result:
errors.append(result["error"])
if errors:
raise ChangesetBackgroundServiceError(
"one or more requests failed:\n%s" % "\n".join(map(indent, errors)))
开发者ID:Aessy,项目名称:critic,代码行数:40,代码来源:client.py
示例10: requestHighlights
def requestHighlights(repository, sha1s):
requests = [{ "repository_path": repository.path, "sha1": sha1, "path": path, "language": language }
for sha1, (path, language) in sha1s.items()
if not syntaxhighlight.isHighlighted(sha1, language)]
if not requests: return
try:
connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
connection.connect(configuration.services.HIGHLIGHT["address"])
connection.send(json_encode(requests))
connection.shutdown(socket.SHUT_WR)
data = ""
while True:
received = connection.recv(4096)
if not received: break
data += received
connection.close()
except socket.error as error:
raise HighlightBackgroundServiceError(error[1])
try:
results = json_decode(data)
except ValueError:
raise HighlightBackgroundServiceError(
"returned an invalid response (%r)" % data)
if type(results) != list:
# If not a list, the result is probably an error message.
raise HighlightBackgroundServiceError(str(results))
if len(results) != len(requests):
raise HighlightBackgroundServiceError("didn't process all requests")
开发者ID:Aessy,项目名称:critic,代码行数:36,代码来源:request.py
示例11: perform_job
def perform_job():
soft_limit, hard_limit = getrlimit(RLIMIT_RSS)
rss_limit = configuration.services.CHANGESET["rss_limit"]
if soft_limit < rss_limit:
setrlimit(RLIMIT_RSS, (rss_limit, hard_limit))
from changeset.create import createChangeset
request = json_decode(sys.stdin.read())
try:
db = dbutils.Database()
createChangeset(db, request)
db.close()
sys.stdout.write(json_encode(request))
except:
print "Request:"
print json_encode(request, indent=2)
print
print_exc(file=sys.stdout)
开发者ID:Aessy,项目名称:critic,代码行数:24,代码来源:changeset.py
示例12: str
except gitutils.GitReferenceError:
return "invalid commit id"
except Exception as exception:
return str(exception)
HANDLERS = { "propagate-comment": propagateComment }
try:
if len(sys.argv) > 1:
init()
for command in sys.argv[1:]:
pending_mails = None
if command == "generate-mails-for-batch":
data = json_decode(sys.stdin.readline())
batch_id = data["batch_id"]
was_accepted = data["was_accepted"]
is_accepted = data["is_accepted"]
pending_mails = reviewing.utils.generateMailsForBatch(db, batch_id, was_accepted, is_accepted)
elif command == "generate-mails-for-assignments-transaction":
data = json_decode(sys.stdin.readline())
transaction_id = data["transaction_id"]
pending_mails = reviewing.utils.generateMailsForAssignmentsTransaction(db, transaction_id)
elif command == "apply-filters":
data = json_decode(sys.stdin.readline())
filters = reviewing.filters.Filters()
user = dbutils.User.fromId(db, data["user_id"]) if "user_id" in data else None
if "review_id" in data:
review = dbutils.Review.fromId(db, data["review_id"])
filters.setFiles(db, review=review)
开发者ID:Aessy,项目名称:critic,代码行数:31,代码来源:cli.py
示例13: renderCreateReview
def renderCreateReview(req, db, user):
if user.isAnonymous(): raise page.utils.NeedLogin(req)
repository = req.getParameter("repository", filter=gitutils.Repository.FromParameter(db), default=None)
applyparentfilters = req.getParameter("applyparentfilters", "yes" if user.getPreference(db, 'review.applyUpstreamFilters') else "no") == "yes"
cursor = db.cursor()
if req.method == "POST":
data = json_decode(req.read())
summary = data.get("summary")
description = data.get("description")
review_branch_name = data.get("review_branch_name")
commit_ids = data.get("commit_ids")
commit_sha1s = data.get("commit_sha1s")
else:
summary = req.getParameter("summary", None)
description = req.getParameter("description", None)
review_branch_name = req.getParameter("reviewbranchname", None)
commit_ids = None
commit_sha1s = None
commits_arg = req.getParameter("commits", None)
remote = req.getParameter("remote", None)
upstream = req.getParameter("upstream", "master")
branch_name = req.getParameter("branch", None)
if commits_arg:
try: commit_ids = map(int, commits_arg.split(","))
except: commit_sha1s = [repository.revparse(ref) for ref in commits_arg.split(",")]
elif branch_name:
cursor.execute("""SELECT commit
FROM reachable
JOIN branches ON (branch=id)
WHERE repository=%s
AND name=%s""",
(repository.id, branch_name))
commit_ids = [commit_id for (commit_id,) in cursor]
if len(commit_ids) > configuration.limits.MAXIMUM_REVIEW_COMMITS:
raise page.utils.DisplayMessage(
"Too many commits!",
(("<p>The branch <code>%s</code> contains %d commits. Reviews can"
"be created from branches that contain at most %d commits.</p>"
"<p>This limit can be adjusted by modifying the system setting"
"<code>configuration.limits.MAXIMUM_REVIEW_COMMITS</code>.</p>")
% (htmlutils.htmlify(branch_name), len(commit_ids),
configuration.limits.MAXIMUM_REVIEW_COMMITS)),
html=True)
else:
return renderSelectSource(req, db, user)
req.content_type = "text/html; charset=utf-8"
if commit_ids:
commits = [gitutils.Commit.fromId(db, repository, commit_id) for commit_id in commit_ids]
elif commit_sha1s:
commits = [gitutils.Commit.fromSHA1(db, repository, commit_sha1) for commit_sha1 in commit_sha1s]
else:
commits = []
if not commit_ids:
commit_ids = [commit.getId(db) for commit in commits]
if not commit_sha1s:
commit_sha1s = [commit.sha1 for commit in commits]
if summary is None:
if len(commits) == 1:
summary = commits[0].summary()
else:
summary = ""
if review_branch_name:
invalid_branch_name = "false"
default_branch_name = review_branch_name
else:
invalid_branch_name = htmlutils.jsify(user.name + "/")
default_branch_name = user.name + "/"
match = re.search("(?:^|[Ff]ix(?:e[ds])?(?: +for)?(?: +bug)? +)([A-Z][A-Z0-9]+-[0-9]+)", summary)
if match:
invalid_branch_name = "false"
default_branch_name = htmlutils.htmlify(match.group(1))
changesets = []
changeset_utils.createChangesets(db, repository, commits)
for commit in commits:
changesets.extend(changeset_utils.createChangeset(db, None, repository, commit, do_highlight=False))
changeset_ids = [changeset.id for changeset in changesets]
all_reviewers, all_watchers = reviewing.utils.getReviewersAndWatchers(
db, repository, changesets=changesets, applyparentfilters=applyparentfilters)
document = htmlutils.Document(req)
html = document.html()
head = html.head()
document.addInternalScript(user.getJS(db))
#.........这里部分代码省略.........
开发者ID:ahockersten,项目名称:critic,代码行数:101,代码来源:createreview.py
示例14: render
delay += delay
else: raise
if not connected:
raise page.utils.DisplayMessage, "Service manager not responding!"
connection.send(textutils.json_encode({ "query": "status" }))
connection.shutdown(socket.SHUT_WR)
data = ""
while True:
received = connection.recv(4096)
if not received: break
data += received
result = textutils.json_decode(data)
if result["status"] == "error":
raise page.utils.DisplayMessage, result["error"]
paleyellow = page.utils.PaleYellowTable(body, "Services")
def render(target):
table = target.table("services", cellspacing=0, align="center")
headings = table.tr("headings")
headings.th("name").text("Name")
headings.th("module").text("Module")
headings.th("pid").text("PID")
headings.th("rss").text("RSS")
headings.th("cpu").text("CPU")
开发者ID:ryfow,项目名称:critic,代码行数:31,代码来源:services.py
示例15: getrlimit
from dbutils import Database
if "--json-job" in sys.argv[1:]:
from resource import getrlimit, setrlimit, RLIMIT_RSS
from traceback import print_exc
soft_limit, hard_limit = getrlimit(RLIMIT_RSS)
rss_limit = configuration.services.CHANGESET["rss_limit"]
if soft_limit < rss_limit:
setrlimit(RLIMIT_RSS, (rss_limit, hard_limit))
from changeset.create import createChangeset
from textutils import json_decode, json_encode
request = json_decode(sys.stdin.read())
try:
db = Database()
createChangeset(db, request)
db.close()
sys.stdout.write(json_encode(request))
except:
print "Request:"
print json_encode(request, indent=2)
print
print_exc(file=sys.stdout)
开发者ID:KurSh,项目名称:critic,代码行数:30,代码来源:changeset.py
示例16: executeProcess
def executeProcess(db, manifest, role_name, script, function, extension_id,
user_id, argv, timeout, stdin=None, rlimit_rss=256):
# If |user_id| is not the same as |db.user|, then one user's access of the
# system is triggering an extension on behalf of another user. This will
# for instance happen when one user is adding changes to a review,
# triggering an extension filter hook set up by another user.
#
# In this case, we need to check that the other user can access the
# extension.
#
# If |user_id| is the same as |db.user|, we need to use |db.profiles|, which
# may contain a profile associated with an access token that was used to
# authenticate the user.
if user_id != db.user.id:
user = dbutils.User.fromId(db, user_id)
authentication_labels = auth.DATABASE.getAuthenticationLabels(user)
profiles = [auth.AccessControlProfile.forUser(
db, user, authentication_labels)]
else:
authentication_labels = db.authentication_labels
profiles = db.profiles
extension = Extension.fromId(db, extension_id)
if not auth.AccessControlProfile.isAllowedExtension(
profiles, "execute", extension):
raise auth.AccessDenied("Access denied to extension: execute %s"
% extension.getKey())
flavor = manifest.flavor
if manifest.flavor not in configuration.extensions.FLAVORS:
flavor = configuration.extensions.DEFAULT_FLAVOR
stdin_data = "%s\n" % json_encode({
"library_path": configuration.extensions.FLAVORS[flavor]["library"],
"rlimit": { "rss": rlimit_rss },
"hostname": configuration.base.HOSTNAME,
"dbname": configuration.database.PARAMETERS["database"],
"dbuser": configuration.database.PARAMETERS["user"],
"git": configuration.executables.GIT,
"python": configuration.executables.PYTHON,
"python_path": "%s:%s" % (configuration.paths.CONFIG_DIR,
configuration.paths.INSTALL_DIR),
"repository_work_copy_path": configuration.extensions.WORKCOPY_DIR,
"changeset_address": configuration.services.CHANGESET["address"],
"branchtracker_pid_path": configuration.services.BRANCHTRACKER["pidfile_path"],
"maildelivery_pid_path": configuration.services.MAILDELIVERY["pidfile_path"],
"is_development": configuration.debug.IS_DEVELOPMENT,
"extension_path": manifest.path,
"extension_id": extension_id,
"user_id": user_id,
"authentication_labels": list(authentication_labels),
"role": role_name,
"script_path": script,
"fn": function,
"argv": argv })
if stdin is not None:
stdin_data += stdin
# Double the timeout. Timeouts are primarily handled by the extension runner
# service, which returns an error response on timeout. This deadline here is
# thus mostly to catch the extension runner service itself timing out.
deadline = time.time() + timeout * 2
try:
connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
connection.settimeout(max(0, deadline - time.time()))
connection.connect(configuration.services.EXTENSIONRUNNER["address"])
connection.sendall(json_encode({
"stdin": stdin_data,
"flavor": flavor,
"timeout": timeout
}))
connection.shutdown(socket.SHUT_WR)
data = ""
while True:
connection.settimeout(max(0, deadline - time.time()))
try:
received = connection.recv(4096)
except socket.error as error:
if error.errno == errno.EINTR:
continue
raise
if not received:
break
data += received
connection.close()
except socket.timeout as error:
raise ProcessTimeout(timeout)
except socket.error as error:
raise ProcessError("failed to read response: %s" % error)
try:
data = json_decode(data)
except ValueError as error:
raise ProcessError("failed to decode response: %s" % error)
#.........这里部分代码省略.........
开发者ID:jensl,项目名称:critic,代码行数:101,代码来源:execute.py
示例17: process
def process(value):
value = value.strip()
if value[0] == '"' == value[-1]:
return json_decode(value)
else:
return value
开发者ID:andreastt,项目名称:critic,代码行数:6,代码来源:manifest.py
示例18: main
def main():
parser = argparse.ArgumentParser()
parser.add_argument("-u", dest="user_id", type=int)
parser.add_argument("-l", dest="auth_labels", action="append", default=[])
parser.add_argument("command", nargs="*")
arguments = parser.parse_args()
try:
init(arguments.user_id, arguments.auth_labels)
for command in arguments.command:
pending_mails = None
if command == "generate-mails-for-batch":
data = json_decode(sys.stdin.readline())
batch_id = data["batch_id"]
was_accepted = data["was_accepted"]
is_accepted = data["is_accepted"]
pending_mails = reviewing.utils.generateMailsForBatch(db, batch_id, was_accepted, is_accepted)
elif command == "generate-mails-for-assignments-transaction":
data = json_decode(sys.stdin.readline())
transaction_id = data["transaction_id"]
pending_mails = reviewing.utils.generateMailsForAssignmentsTransaction(db, transaction_id)
elif command == "apply-filters":
data = json_decode(sys.stdin.readline())
filters = reviewing.filters.Filters()
user = dbutils.User.fromId(db, data["user_id"]) if "user_id" in data else None
if "review_id" in data:
review = dbutils.Review.fromId(db, data["review_id"])
filters.setFiles(db, review=review)
filters.load(db, review=review, user=user,
added_review_filters=data.get("added_review_filters", []),
removed_review_filters=data.get("removed_review_filters", []))
else:
repository = gitutils.Repository.fromId(db, data["repository_id"])
filters.setFiles(db, file_ids=data["file_ids"])
filters.load(db, repository=repository, recursive=data.get("recursive", False), user=user)
sys.stdout.write(json_encode(filters.data) + "\n")
elif command == "generate-custom-mails":
pending_mails = []
for data in json_decode(sys.stdin.readline()):
from_user = dbutils.User.fromId(db, data["sender"])
if data.get("recipients"):
recipients = [dbutils.User.fromId(db, user_id)
for user_id in data["recipients"]]
else:
recipients = None
subject = data["subject"]
headers = data.get("headers")
body = data["body"]
if "review_id" in data:
review = dbutils.Review.fromId(db, data["review_id"])
else:
review = None
pending_mails.extend(sendCustomMail(
from_user, recipients, subject, headers, body, review))
elif command == "set-review-state":
data = json_decode(sys.stdin.readline())
error = ""
try:
user = dbutils.User.fromId(db, data["user_id"])
review = dbutils.Review.fromId(db, data["review_id"])
if review.state != data["old_state"]:
error = "invalid old state"
elif data["new_state"] == "open":
review.reopen(db, user)
elif data["new_state"] == "closed":
review.close(db, user)
elif data["new_state"] == "dropped":
review.drop(db, user)
else:
error = "invalid new state"
except dbutils.NoSuchUser:
error = "invalid user id"
except dbutils.NoSuchReview:
error = "invalid review id"
except Exception as error:
error = str(error)
sys.stdout.write(error + "\n")
elif command in HANDLERS:
data_in = json_decode(sys.stdin.readline())
data_out = HANDLERS[command](data_in)
sys.stdout.write(json_encode(data_out) + "\n")
else:
sys.stdout.write(json_encode("unknown command: %s" % command) + "\n")
sys.exit(0)
if pending_mails is not None:
sys.stdout.write(json_encode(pending_mails) + "\n")
finish()
except Exception:
sys.stdout.write(json_encode(traceback.format_exc()) + "\n")
finally:
abort()
开发者ID:jensl,项目名称:critic,代码行数:97,代码来源:cli.py
示例19: processLine
def processLine(paths, line):
try:
command, value = line.split(" ", 1)
except ValueError:
raise InjectError("Invalid line in output: %r" % line)
if command not in ("link", "script", "stylesheet", "preference"):
raise InjectError("Invalid command: %r" % command)
value = value.strip()
try:
value = json_decode(value)
except ValueError:
raise InjectError("Invalid JSON: %r" % value)
def is_string(value):
return isinstance(value, basestring)
if command in ("script", "stylesheet") and not is_string(value):
raise InjectError("Invalid value for %r: %r (expected string)"
% (command, value))
elif command == "link":
if isinstance(value, dict):
if "label" not in value or not is_string(value["label"]):
raise InjectError("Invalid value for %r: %r (expected attribute 'label' of type string)"
% (command, value))
elif "url" not in value or not is_string(value["url"]) or value["url"] is None:
raise InjectError("Invalid value for %r: %r (expected attribute 'url' of type string or null)"
% (command, value))
# Alternatively support [label, url] (backwards compatibility).
elif not isinstance(value, list) or len(value) != 2:
raise InjectError("Invalid value for %r: %r (expected object { \"label\": LABEL, \"url\": URL })"
% (command, value))
elif not is_string(value[0]):
raise InjectError("Invalid value for %r: %r (expected string at array[0])"
% (command, value))
elif not (is_string(value[1]) or value[1] is None):
raise InjectError("Invalid value for %r: %r (expected string or null at array[1])"
% (command, value))
else:
value = { "label": value[0], "url": value[1] }
elif command == "preference":
if "config" not in paths:
raise InjectError("Invalid command: %r only valid on /config page"
% command)
elif not isinstance(value, dict):
raise InjectError("Invalid value for %r: %r (expected object)"
% (command, value))
for name in ("url", "name", "type", "value", "default", "description"):
if name not in value:
raise InjectError("Invalid value for %r: %r (missing attribute %r)"
% (command, value, name))
preference_url = value["url"]
preference_name = value["name"]
preference_type = value["type"]
preference_value = value["value"]
preference_default = value["default"]
preference_description = value["description"]
if not is_string(preference_url):
raise InjectError("Invalid value for %r: %r (expected attribute 'url' of type string)"
% (command, value))
elif not is_string(preference_name):
raise InjectError("Invalid value for %r: %r (expected attribute 'name' of type string)"
% (command, value))
elif not is_string(preference_description):
raise InjectError("Invalid value for %r: %r (expected attribute 'description' of type string)"
% (command, value))
if is_string(preference_type):
if preference_type not in ("boolean", "integer", "string"):
raise InjectError("Invalid value for %r: %r (unsupported preference type)"
% (command, value))
if preference_type == "boolean":
type_check = lambda value: isinstance(value, bool)
elif preference_type == "integer":
type_check = lambda value: isinstance(value, int)
else:
type_check = is_string
if not type_check(preference_value):
raise InjectError("Invalid value for %r: %r (type mismatch between 'value' and 'type')"
% (command, value))
if not type_check(preference_default):
raise InjectError("Invalid value for %r: %r (type mismatch between 'default' and 'type')"
% (command, value))
else:
if not isinstance(preference_type, list):
raise InjectError("Invalid value for %r: %r (invalid 'type', expected string or array)"
% (command, value))
for index, choice in enumerate(preference_type):
if not isinstance(choice, dict) \
or not isinstance(choice.get("value"), basestring) \
or not isinstance(choice.get("title"), basestring):
#.........这里部分代码省略.........
开发者ID:Aessy,项目名称:critic,代码行数:101,代码来源:inject.py
示例20: handle_input
def handle_input(self, value):
self.__requests = json_decode(value)
self.__results = []
self.server.add_requests(self, self.__requests)
开发者ID:KurSh,项目名称:critic,代码行数:4,代码来源:utils.py
注:本文中的textutils.json_decode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论