本文整理汇总了Python中temba.utils.json.dumps函数的典型用法代码示例。如果您正苦于以下问题：Python dumps函数的具体用法？Python dumps怎么用？Python dumps使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了dumps函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_claim
def test_claim(self, mock_post):
    """Walk through the Viber Public claim page: link on the claim list, a rejected token, then a successful claim."""
    claim_url = reverse("channels.types.viber_public.claim")

    self.login(self.admin)

    # the claim list page must link to this channel type's claim page
    claim_list = self.client.get(reverse("channels.channel_claim"))
    self.assertContains(claim_list, claim_url)

    # a 400 from the mocked API re-renders the form with a validation error
    rejected_body = json.dumps({"status": 3, "status_message": "Invalid token"})
    mock_post.return_value = MockResponse(400, rejected_body)
    response = self.client.post(claim_url, {"auth_token": "invalid"})
    self.assertEqual(response.status_code, 200)
    self.assertContains(response, "Error validating authentication token")

    # queue up three successful API responses for the real claim
    account_payload = {"status": 0, "status_message": "ok", "id": "viberId", "uri": "viberName"}
    mock_post.side_effect = [
        MockResponse(200, json.dumps(account_payload)),
        MockResponse(200, json.dumps(account_payload)),
        MockResponse(200, json.dumps({"status": 0, "status_message": "ok"})),
    ]
    self.client.post(claim_url, {"auth_token": "123456"}, follow=True)

    # the channel should now exist, configured from the mocked account details
    created = Channel.objects.get(address="viberId")
    self.assertEqual(created.config["auth_token"], "123456")
    self.assertEqual(created.name, "viberName")
    self.assertTrue(created.get_type().has_attachment_support(created))

    # the last API call should have targeted the webhook endpoint
    last_call_url = mock_post.call_args[0][0]
    self.assertEqual(last_call_url, "https://chatapi.viber.com/pa/set_webhook")
开发者ID:teehamaral,项目名称:rapidpro,代码行数:33,代码来源:tests.py
示例2: refresh_access_token
def refresh_access_token(self, channel_id):
    """
    Request a fresh access token from TOKEN_URL and store it in Redis.

    Nothing happens (None is returned) when the refresh lock key is already set. Otherwise the
    lock is taken, the client credentials are sent through self._request, the exchange is
    logged against channel_id, and the token is stored under TOKEN_STORE_KEY with ``ex`` set
    from the response's "expires_in" (7200 when absent).

    Returns the new access token, or None if no refresh ran or the response was not a 200.
    """
    redis_conn = get_redis_connection()
    lock_key = self.TOKEN_REFRESH_LOCK % self.channel_uuid

    # lock key already present: skip this refresh
    if redis_conn.get(lock_key):
        return None

    with redis_conn.lock(lock_key, timeout=30):
        store_key = self.TOKEN_STORE_KEY % self.channel_uuid
        credentials = dict(grant_type="client_credentials", client_id=self.app_id, client_secret=self.app_secret)
        token_url = self.TOKEN_URL

        event = HttpEvent("POST", token_url, json.dumps(credentials))
        started = time.time()

        response = self._request(token_url, credentials, access_token=None)
        event.status_code = response.status_code

        if response.status_code != 200:
            event.response_body = response.content
            failure = "Got non-200 response from %s" % self.API_NAME
            ChannelLog.log_channel_request(channel_id, failure, event, started, True)
            return None

        body = response.json()
        event.response_body = json.dumps(body)
        success = "Successfully fetched access token from %s" % self.API_NAME
        ChannelLog.log_channel_request(channel_id, success, event, started)

        token = body["access_token"]
        lifetime = body.get("expires_in", 7200)
        redis_conn.set(store_key, token, ex=int(lifetime))
        return token
开发者ID:teehamaral,项目名称:rapidpro,代码行数:34,代码来源:access_token.py
示例3: form_valid
def form_valid(self, form):
# Handles a valid send-message form. Either hands the text to send_to_flow_node (when the
# form carries a "step_node") or creates a Broadcast, then answers with a JSON body when
# "_format=json" is in the query string and with a redirect otherwise.
# NOTE(review): indentation was lost in this copy of the source, so block nesting cannot be
# read off the text below - confirm against the original file before restructuring.
self.form = form
user = self.request.user
org = user.get_org()
# simulation mode is switched on with ?simulation=true
simulation = self.request.GET.get("simulation", "false") == "true"
omnibox = self.form.cleaned_data["omnibox"]
has_schedule = self.form.cleaned_data["schedule"]
step_uuid = self.form.cleaned_data.get("step_node", None)
text = self.form.cleaned_data["text"]
# materialize the recipients selected in the omnibox
groups = list(omnibox["groups"])
contacts = list(omnibox["contacts"])
urns = list(omnibox["urns"])
# sending to a flow node: pass the query parameters plus the node uuid (as "s") to the task and return
if step_uuid:
from .tasks import send_to_flow_node
get_params = {k: v for k, v in self.request.GET.items()}
get_params.update({"s": step_uuid})
send_to_flow_node.delay(org.pk, user.pk, text, **get_params)
if "_format" in self.request.GET and self.request.GET["_format"] == "json":
return HttpResponse(json.dumps(dict(status="success")), content_type="application/json")
else:
return HttpResponseRedirect(self.get_success_url())
# if simulating only use the test contact
if simulation:
groups = []
urns = []
# keep only the first contact flagged is_test, if there is one
for contact in contacts:
if contact.is_test:
contacts = [contact]
break
# a Schedule row is only created when the form asked for one
schedule = Schedule.objects.create(created_by=user, modified_by=user) if has_schedule else None
broadcast = Broadcast.create(
org, user, text, groups=groups, contacts=contacts, urns=urns, schedule=schedule, status=QUEUED
)
# NOTE(review): presumably both post_save() and super().form_valid() sit under this condition - confirm
if not has_schedule:
self.post_save(broadcast)
super().form_valid(form)
# report recipient counts to analytics
analytics.track(
self.request.user.username,
"temba.broadcast_created",
dict(contacts=len(contacts), groups=len(groups), urns=len(urns)),
)
# JSON callers get the schedule page URL in the body; others are redirected
if "_format" in self.request.GET and self.request.GET["_format"] == "json":
data = dict(status="success", redirect=reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
return HttpResponse(json.dumps(data), content_type="application/json")
else:
if self.form.cleaned_data["schedule"]:
return HttpResponseRedirect(reverse("msgs.broadcast_schedule_read", args=[broadcast.pk]))
return HttpResponseRedirect(self.get_success_url())
开发者ID:teehamaral,项目名称:rapidpro,代码行数:57,代码来源:views.py
示例4: start_call
def start_call(self, call, to, from_, status_callback):
    """
    Start an outgoing call through self.api and record the outcome on ``call``.

    Args:
        call: the call record; its external_id and status are updated and saved here.
        to: the number to dial.
        from_: accepted but not used - the caller number sent is ``call.channel.address``.
        status_callback: URL passed to the API as both ``url`` and ``status_callback``.

    Raises:
        ValueError: if settings.SEND_CALLS is falsy.
        IVRException: if the API raises TwilioRestException. The failure is logged and the
            call is saved as FAILED first; the original error is chained as the cause.
    """
    if not settings.SEND_CALLS:
        raise ValueError("SEND_CALLS set to False, skipping call start")

    params = dict(to=to, from_=call.channel.address, url=status_callback, status_callback=status_callback)

    try:
        twilio_call = self.api.calls.create(**params)
        call.external_id = str(twilio_call.sid)

        # the call was successfully sent to the IVR provider
        call.status = IVRCall.WIRED
        call.save()

        for event in self.events:
            ChannelLog.log_ivr_interaction(call, "Started call", event)

    except TwilioRestException as twilio_error:
        message = "Twilio Error: %s" % twilio_error.msg
        if twilio_error.code == 20003:
            message = _("Could not authenticate with your Twilio account. Check your token and try again.")

        # NOTE(review): the logged URL names the Nexmo API although this is the Twilio error
        # path - looks like a copy/paste leftover; confirm the intended endpoint.
        event = HttpEvent("POST", "https://api.nexmo.com/v1/calls", json.dumps(params), response_body=str(message))
        ChannelLog.log_ivr_interaction(call, "Call start failed", event, is_error=True)

        call.status = IVRCall.FAILED
        call.save()

        # chain the provider error so the root cause is kept in the traceback
        raise IVRException(message) from twilio_error
开发者ID:teehamaral,项目名称:rapidpro,代码行数:29,代码来源:clients.py
示例5: form_invalid
def form_invalid(self, form):
    """Return the form errors as a 400 JSON response when ``_format=json`` was requested, else defer to the parent."""
    query = self.request.GET
    wants_json = "_format" in query and query["_format"] == "json"

    if wants_json:  # pragma: needs cover
        body = json.dumps(dict(status="error", errors=form.errors))
        return HttpResponse(body, content_type="application/json", status=400)

    return super().form_invalid(form)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:7,代码来源:views.py
示例6: test_release
def test_release(self, mock_delete):
    """Releasing the channel should call the mocked delete exactly once, on the subscribed_apps endpoint."""
    success_body = json.dumps({"success": True})
    mock_delete.return_value = MockResponse(200, success_body)

    self.channel.release()

    expected_url = "https://graph.facebook.com/v2.12/me/subscribed_apps"
    mock_delete.assert_called_once_with(expected_url, params={"access_token": "09876543"})
开发者ID:mxabierto,项目名称:rapidpro,代码行数:7,代码来源:tests.py
示例7: push_task
def push_task(org, queue, task_name, args, priority=DEFAULT_PRIORITY):
    """
    Queue ``args`` for ``task_name`` under the given org in Redis, then optionally fire the task.

    ``org`` may be an org id (int) or an object with an ``id`` attribute. When ``queue`` is
    truthy the task is either run inline (CELERY_ALWAYS_EAGER) or sent to that queue.

    Ex: push_task(nyaruka, 'flows', 'start_flow', [1,2,3,4,5,6,7,8,9,10])
    """
    redis_conn = get_redis_connection("default")

    # the score combines the current time with the priority; this could get us in trouble if
    # things are queued for more than ~100 days, but otherwise orders by priority first and
    # insertion order second
    score = time.time() + priority

    # add the task to the org's queue and mark the org as active in a single pipeline
    with redis_conn.pipeline() as pipe:
        org_id = org if isinstance(org, int) else org.id
        org_queue_key = "%s:%d" % (task_name, org_id)
        pipe.zadd(org_queue_key, score, json.dumps(args))

        # make sure this org is in the list of active queues so the job gets worked on
        active_key = "%s:active" % task_name
        pipe.zincrby(active_key, org_id, 0)
        pipe.execute()

    # without a queue there is nothing to schedule
    if not queue:
        return

    # the fired task takes no arguments: it should use pop_task with the task name to find
    # out what to work on
    run_inline = getattr(settings, "CELERY_ALWAYS_EAGER", False)
    if run_inline:
        lookup_task_function(task_name)()
    else:  # pragma: needs cover
        current_app.send_task(task_name, args=[], kwargs={}, queue=queue)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:32,代码来源:queues.py
示例8: serialize_run
def serialize_run(run):
    """
    Build a plain-dict representation of a flow run.

    Always includes uuid, status ("completed" once exited_on is set, else "active"), the
    ISO-formatted timestamps, the flow reference and the path. Results, events, parent uuid,
    last input and a synthetic webhook entry are added only when the run has them.
    """
    exited_on = run.exited_on
    expires_on = run.expires_on
    flow = run.flow

    serialized = {
        "uuid": str(run.uuid),
        "status": "active" if not exited_on else "completed",
        "created_on": run.created_on.isoformat(),
        "exited_on": exited_on.isoformat() if exited_on else None,
        "expires_on": expires_on.isoformat() if expires_on else None,
        "flow": {"uuid": str(flow.uuid), "name": flow.name},
        "path": run.path,
    }

    if run.results:
        serialized["results"] = run.results

    if run.events:
        serialized["events"] = run.events

    # the parent is only referenced when it belongs to the same contact
    if run.parent_id and run.parent.contact == run.contact:
        serialized["parent_uuid"] = str(run.parent.uuid)

    last_msg = run.get_last_msg()
    if last_msg:
        serialized["input"] = serialize_input(last_msg)

    # things in @extra might not have come from a webhook call but at least they'll be accessible if we make one
    if run.fields:
        response_head = "HTTP/1.1 200 OK\r\nContent-Type: application/json; charset=utf-8\r\n\r\n"
        serialized["webhook"] = {
            "request": "GET / HTTP/1.1\r\nHost: fakewebhooks.com\r\nUser-Agent: goflow-trials\r\n\r\n",
            "response": response_head + json.dumps(run.fields),
            "status": "success",
            "status_code": 200,
            "url": "http://fakewebhooks.com/",
        }

    return serialized
开发者ID:mxabierto,项目名称:rapidpro,代码行数:34,代码来源:resumes.py
示例9: assertStatus
def assertStatus(sms, event_type, assert_status):
    """Post a Junebug event of the given type and check the message ends up with the expected status."""
    # `data` and `self` are taken from the enclosing scope; the shared payload is updated in place
    data["event_type"] = event_type

    event_url = reverse("handlers.junebug_handler", args=["event", self.channel.uuid])
    response = self.client.post(event_url, data=json.dumps(data), content_type="application/json")
    self.assertEqual(200, response.status_code)

    # reload the message to see the status written by the handler
    reloaded = Msg.objects.get(pk=sms.id)
    self.assertEqual(assert_status, reloaded.status)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:10,代码来源:tests.py
示例10: put_jsonl
def put_jsonl(self, bucket, key, records):
    """
    Store ``records`` as gzipped JSON Lines under ``(bucket, key)`` in self.objects.

    Each record is serialized with json.dumps, UTF-8 encoded and followed by a newline. The
    stored value is the io.BytesIO holding the compressed bytes; its position is left at the
    end of the written data.
    """
    stream = io.BytesIO()

    # the context manager closes the gzip writer even if a record fails to serialize;
    # closing a GzipFile does not close the underlying stream
    with gzip.GzipFile(fileobj=stream, mode="wb") as gz:
        for record in records:
            gz.write(json.dumps(record).encode("utf-8"))
            gz.write(b"\n")

    self.objects[(bucket, key)] = stream
开发者ID:mxabierto,项目名称:rapidpro,代码行数:10,代码来源:s3.py
示例11: _request
def _request(self, endpoint, payload):
    """
    POST ``payload`` as JSON to "<base_url>/mr/<endpoint>" and return the decoded JSON reply.

    Raises MailroomException for 4xx statuses; any other error status is raised by
    response.raise_for_status().
    NOTE(review): the body is decoded before the status is checked, so a non-JSON error body
    fails inside response.json() first - confirm that is intended.
    """
    if logger.isEnabledFor(logging.DEBUG):  # pragma: no cover
        request_dump = json.dumps(payload, indent=2)
        logger.debug("=============== %s request ===============" % endpoint)
        logger.debug(request_dump)
        logger.debug("=============== /%s request ===============" % endpoint)

    url = "%s/mr/%s" % (self.base_url, endpoint)
    response = requests.post(url, json=payload, headers=self.headers)
    decoded = response.json()

    if logger.isEnabledFor(logging.DEBUG):  # pragma: no cover
        response_dump = json.dumps(decoded, indent=2)
        logger.debug("=============== %s response ===============" % endpoint)
        logger.debug(response_dump)
        logger.debug("=============== /%s response ===============" % endpoint)

    status = response.status_code
    if status >= 400 and status < 500:
        raise MailroomException(endpoint, payload, decoded)

    response.raise_for_status()
    return decoded
开发者ID:teehamaral,项目名称:rapidpro,代码行数:20,代码来源:client.py
示例12: setUp
def setUp(self):
    """Register mocked GitHub responses for the posm-extracts repository and create a scratch directory."""
    master_tree = json.dumps({"tree": [{"path": "geojson", "sha": "the-sha"}]})
    geojson_tree = json.dumps({"tree": [{"path": "R12345_simplified.json"}, {"path": "R45678_simplified.json"}]})

    # (url, body) pairs, registered in this order as JSON GET responses
    mocked_gets = (
        ("https://api.github.com/repos/nyaruka/posm-extracts/git/trees/master", master_tree),
        ("https://api.github.com/repos/nyaruka/posm-extracts/git/trees/the-sha", geojson_tree),
        (
            "https://raw.githubusercontent.com/nyaruka/posm-extracts/master/geojson/R12345_simplified.json",
            "the-relation-json",
        ),
    )
    for url, body in mocked_gets:
        responses.add(responses.GET, url, body=body, content_type="application/json")

    self.testdir = tempfile.mkdtemp()
开发者ID:teehamaral,项目名称:rapidpro,代码行数:20,代码来源:tests.py
示例13: _request
def _request(self, endpoint, payload):
    """
    POST ``payload`` as JSON to "<base_url>/flow/<endpoint>" and return the decoded JSON reply.

    When self.debug is set, the request and response bodies are printed. Raises
    FlowServerException for 4xx statuses; any other error status is raised by
    response.raise_for_status().
    NOTE(review): the body is decoded before the status is checked, so a non-JSON error body
    fails inside response.json() first - confirm that is intended.
    """
    if self.debug:
        request_dump = json.dumps(payload, indent=2)
        print("[GOFLOW]=============== %s request ===============" % endpoint)
        print(request_dump)
        print("[GOFLOW]=============== /%s request ===============" % endpoint)

    url = "%s/flow/%s" % (self.base_url, endpoint)
    response = requests.post(url, json=payload, headers=self.headers)
    decoded = response.json()

    if self.debug:
        response_dump = json.dumps(decoded, indent=2)
        print("[GOFLOW]=============== %s response ===============" % endpoint)
        print(response_dump)
        print("[GOFLOW]=============== /%s response ===============" % endpoint)

    status = response.status_code
    if status >= 400 and status < 500:
        raise FlowServerException(endpoint, payload, decoded)

    response.raise_for_status()
    return decoded
开发者ID:mxabierto,项目名称:rapidpro,代码行数:20,代码来源:client.py
示例14: test_receive_with_session_id
def test_receive_with_session_id(self):
    """An inbound USSD message with a session id should leave two messages on a triggered session with that id."""
    from temba.ussd.models import USSDSession

    payload = self.mk_ussd_msg(content="événement", session_id="session-id", to=self.starcode)
    inbound_url = reverse("handlers.junebug_handler", args=["inbound", self.channel.uuid])
    self.client.post(inbound_url, json.dumps(payload), content_type="application/json")

    # exactly two messages are expected, ordered by primary key
    messages = Msg.objects.all().order_by("pk")
    inbound_msg, outbound_msg = messages

    outbound_connection = outbound_msg.connection
    self.assertEqual(outbound_connection.status, USSDSession.TRIGGERED)
    self.assertEqual(outbound_connection.external_id, "session-id")
    self.assertEqual(inbound_msg.connection.external_id, "session-id")
开发者ID:mxabierto,项目名称:rapidpro,代码行数:12,代码来源:tests.py
示例15: test_receive_ussd_no_session
def test_receive_ussd_no_session(self):
    """Without a trigger, an inbound USSD message should be answered with a 400 and a NACK status."""
    from temba.channels.handlers import JunebugUSSDHandler

    # remove the trigger so that no session gets created
    self.trigger.delete()

    payload = self.mk_ussd_msg(content="événement", to=self.starcode)
    inbound_url = reverse("handlers.junebug_handler", args=["inbound", self.channel.uuid])
    response = self.client.post(inbound_url, json.dumps(payload), content_type="application/json")

    self.assertEqual(response.status_code, 400)
    reported_status = response.json()["status"]
    self.assertEqual(reported_status, JunebugUSSDHandler.NACK)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:12,代码来源:tests.py
示例16: get_db_prep_value
def get_db_prep_value(self, value, *args, **kwargs):
    """
    Convert a list or dict into the JSON text that is written to the database.

    Returns None for None. On a nullable field (self.null) an empty dict or list is also
    stored as None, unless a truthy ``force`` keyword argument is passed.

    Raises:
        ValueError: if value is neither a list nor a dict (subclasses of either are accepted).
    """
    if value is None:
        return None

    # nullable fields save empty containers as null unless the caller forces a real value
    if self.null and value in ({}, []) and not kwargs.get("force"):
        return None

    # isinstance rather than an exact type match, so dict/list subclasses are serialized too
    if not isinstance(value, (list, dict)):
        raise ValueError("JSONAsTextField should be a dict or a list, got %s => %s" % (type(value), value))

    return json.dumps(value)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:12,代码来源:models.py
示例17: post
def post(self, request, *args, **kwargs):
    """Execute the submitted message action; answer with a 400 JSON body if it reports an error, else render via get()."""
    user = self.request.user
    org = user.get_org()
    action_form = MsgActionForm(self.request.POST, org=org, user=user)

    # an invalid form is simply re-rendered
    if not action_form.is_valid():
        return self.get(request, *args, **kwargs)

    result = action_form.execute()

    # shouldn't get in here in normal operation
    if result and "error" in result:  # pragma: no cover
        return HttpResponse(json.dumps(result), content_type="application/json", status=400)

    return self.get(request, *args, **kwargs)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:14,代码来源:views.py
示例18: send_message_auto_complete_processor
def send_message_auto_complete_processor(request):
    """
    Adds completions for the expression auto-completion to the request context

    Returns a dict holding two JSON strings: ``completions`` (contact and date expressions,
    plus one entry per sendable non-tel URN scheme and per active user contact field of the
    user's org) and ``function_completions`` (the result of get_function_listing()). When
    the user has no org, ``completions`` is an empty list.
    """
    completions = []
    user = request.user
    org = user.get_org() if hasattr(user, "get_org") else None

    if org:
        fixed_completions = (
            ("contact", _("Contact Name")),
            ("contact.first_name", _("Contact First Name")),
            ("contact.groups", _("Contact Groups")),
            ("contact.language", _("Contact Language")),
            ("contact.name", _("Contact Name")),
            ("contact.tel", _("Contact Phone")),
            ("contact.tel_e164", _("Contact Phone - E164")),
            ("contact.uuid", _("Contact UUID")),
            ("date", _("Current Date and Time")),
            ("date.now", _("Current Date and Time")),
            ("date.today", _("Current Date")),
            ("date.tomorrow", _("Tomorrow's Date")),
            ("date.yesterday", _("Yesterday's Date")),
        )
        completions.extend(dict(name=name, display=str(label)) for name, label in fixed_completions)

        # the org's sendable schemes don't change while we loop, so look them up once
        sendable_schemes = org.get_schemes(Channel.ROLE_SEND)
        for scheme, label in ContactURN.SCHEME_CHOICES:
            if scheme != TEL_SCHEME and scheme in sendable_schemes:
                # translate the template first and interpolate afterwards, so the lookup uses
                # the literal "Contact %s" rather than an already-formatted string
                display = str(_("Contact %s")) % label
                completions.append(dict(name="contact.%s" % scheme, display=display))

        for field in org.contactfields(manager="user_fields").filter(is_active=True).order_by("label"):
            display = str(_("Contact Field: %(label)s")) % {"label": field.label}
            completions.append(dict(name="contact.%s" % str(field.key), display=display))

    function_completions = get_function_listing()
    return dict(completions=json.dumps(completions), function_completions=json.dumps(function_completions))
开发者ID:teehamaral,项目名称:rapidpro,代码行数:37,代码来源:views.py
示例19: __str__
def __str__(self):
    """
    Return self.document serialized as JSON.

    Before serializing, every "talk" or "stream" action that is last in the document, or
    whose next action is not "input", gets "bargeIn" set to False. This modifies
    self.document in place.
    """
    actions = self.document
    last_idx = len(actions) - 1

    for idx, action in enumerate(actions):
        if action["action"] not in ("talk", "stream"):
            continue

        # only look at the following action when there is one
        followed_by_input = idx != last_idx and actions[idx + 1]["action"] == "input"
        if not followed_by_input:
            action["bargeIn"] = False

    return json.dumps(actions)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:15,代码来源:nexmo.py
示例20: push_courier_msgs
def push_courier_msgs(channel, msgs, high_priority=False):
    """
    Push ``msgs`` onto the courier queue of ``channel``.

    Each message is converted with msg_as_task and the batch is passed as JSON to the script
    returned by get_script, together with the channel's uuid, its tps (COURIER_DEFAULT_TPS
    when unset) and the chosen priority.
    """
    redis_conn = get_redis_connection("default")

    priority = COURIER_LOW_PRIORITY if not high_priority else COURIER_HIGH_PRIORITY
    tps = channel.tps or COURIER_DEFAULT_TPS

    # one task payload per message
    payload = [msg_as_task(msg) for msg in msgs]

    # call our lua script
    queue_script = get_script(redis_conn)
    queue_script(keys=(time.time(), "msgs", channel.uuid, tps, priority, json.dumps(payload)), client=redis_conn)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:15,代码来源:courier.py
注：本文中的temba.utils.json.dumps函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论