本文整理汇总了Python中temba.msgs.models.Msg类的典型用法代码示例。如果您正苦于以下问题：Python Msg类的具体用法？Python Msg怎么用？Python Msg使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Msg类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_resume_with_message_in_subflow
def test_resume_with_message_in_subflow(self, mock_report_success, mock_report_failure):
    """
    Starts the parent flow, enters the child flow with an incoming message, checks the
    session reconstructed for the child run, and then resumes with a second message.
    """
    sender = "tel:+12065552020"

    self.get_flow("subflow")
    parent_flow = Flow.objects.get(org=self.org, name="Parent Flow")
    child_flow = Flow.objects.get(org=self.org, name="Child Flow")

    # kick off the parent flow, then pick the option which triggers the subflow
    parent_flow.start([], [self.contact])
    Msg.create_incoming(self.channel, sender, "color")

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)

    # oldest run first, so the parent comes before the child
    runs_by_age = list(FlowRun.objects.order_by("created_on"))
    parent_run, child_run = runs_by_age

    # the session rebuilt from the child run should hold both runs, parent first
    session = resumes.reconstruct_session(child_run)
    session_flow_uuids = [session_run["flow"]["uuid"] for session_run in session["runs"]]
    self.assertEqual(session_flow_uuids, [str(parent_flow.uuid), str(child_flow.uuid)])
    self.assertEqual(session["contact"]["uuid"], str(self.contact.uuid))
    self.assertEqual(session["trigger"]["type"], "manual")
    for absent_key in ("results", "events"):
        self.assertNotIn(absent_key, session)

    # reply, which resumes the flow
    Msg.create_incoming(self.channel, sender, "I like red")
    child_run.refresh_from_db()
    parent_run.refresh_from_db()

    # the child run has exited while the parent run has not
    self.assertIsNotNone(child_run.exited_on)
    self.assertIsNone(parent_run.exited_on)

    self.assertEqual(mock_report_success.call_count, 2)
    self.assertEqual(mock_report_failure.call_count, 0)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:35,代码来源:tests.py
示例2: send_message
def send_message(
    self,
    flow,
    message,
    restart_participants=False,
    contact=None,
    initiate_flow=False,
    assert_reply=True,
    assert_handle=True,
):
    """
    Starts the flow, sends the message, returns the reply

    Returns the text of the single reply, a list of reply texts when there are several,
    or None when assert_reply is false. Falls back to self.contact when no contact is
    given, and always switches simulation mode off again before returning.
    """
    contact = contact or self.contact

    try:
        if contact.is_test:
            Contact.set_simulation(True)

        incoming = self.create_msg(
            direction=INCOMING, contact=contact, contact_urn=contact.get_urn(), text=message
        )

        start_kwargs = dict(groups=[], contacts=[contact], restart_participants=restart_participants)

        if initiate_flow:
            # the incoming message itself starts the flow
            flow.start(start_msg=incoming, **start_kwargs)
        else:
            # start the flow first, then let it handle the message
            flow.start(**start_kwargs)
            handled, _ = Flow.find_and_handle(incoming)
            Msg.mark_handled(incoming)

            # `handled` only exists on this branch, so the handling check lives here
            if assert_handle:
                self.assertTrue(handled, "'%s' did not handle message as expected" % flow.name)
            else:
                self.assertFalse(handled, "'%s' handled message, was supposed to ignore" % flow.name)

        replies = Msg.objects.filter(response_to=incoming).order_by("pk")

        if not assert_reply:
            # assert we got no reply
            self.assertFalse(replies)
            return None

        # our message should have gotten a reply
        self.assertGreaterEqual(len(replies), 1)

        if len(replies) == 1:
            only_reply = replies.first()
            self.assertEqual(contact, only_reply.contact)
            return only_reply.text

        # if it's more than one, send back a list of replies
        return [reply.text for reply in replies]
    finally:
        Contact.set_simulation(False)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:60,代码来源:base.py
示例3: test_msg_events_with_attachments
def test_msg_events_with_attachments(self, mock_report_success, mock_report_failure):
    """
    Adds media to one of the color flow's actions, runs the flow with an incoming message
    and checks that one success and no failures were reported.
    """
    flow = self.get_flow("color")

    # attach media to the first action of the third action set
    definition = flow.as_json()
    target_action = definition["action_sets"][2]["actions"][0]
    target_action["media"] = {"base": "image/jpg:files/blue.jpg"}
    flow.update(definition)

    flow.start([], [self.contact])
    Msg.create_incoming(self.channel, "tel:+12065552020", "blue")

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:12,代码来源:tests.py
示例4: handle_async
def handle_async(self, urn, content, date, message_id):
    """
    Records an incoming USSD message for this connection.

    `content` may be empty or None, in which case an empty string is stored as the text.
    """
    # imported locally, as in the original block
    from temba.msgs.models import Msg, USSD

    text = content or ""

    Msg.create_incoming(
        channel=self.channel,
        org=self.org,
        urn=urn,
        text=text,
        sent_on=date,
        connection=self,
        msg_type=USSD,
        external_id=message_id,
    )
开发者ID:teehamaral,项目名称:rapidpro,代码行数:13,代码来源:models.py
示例5: test_webhook_mocking
def test_webhook_mocking(self, mock_report_success, mock_report_failure):
    """
    Runs the dual_webhook flow against two mocked requests and checks that both were
    made and that one success and no failures were reported.
    """
    flow = self.get_flow("dual_webhook")

    # register mocks for the two webhook calls in this flow
    self.mockRequest("POST", "/code", '{"code": "ABABUUDDLRS"}', content_type="application/json")
    self.mockRequest("GET", "/success", "Success")

    flow.start([], [self.contact])
    Msg.create_incoming(self.channel, "tel:+12065552020", "Bob")

    self.assertAllRequestsMade()

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:14,代码来源:tests.py
示例6: test_trial_throttling
def test_trial_throttling(self, mock_report_success, mock_report_failure):
    """
    Sends two messages in quick succession and checks that the reported success count
    stays at one after the second message.
    """
    sender = "tel:+12065552020"

    favorites = self.get_flow("favorites")
    favorites.start([], [self.contact], interrupt=True)

    # first resume in a suitable flow will be trialled
    Msg.create_incoming(self.channel, sender, "red")

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)

    # second won't because it's too soon, so the counts are unchanged
    Msg.create_incoming(self.channel, sender, "primus")

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:14,代码来源:tests.py
示例7: create_unsolicited_incoming
def create_unsolicited_incoming(self, org):
    """
    Creates an incoming message from a randomly chosen contact of the org that is not in
    the org's cached "started" activity list. Does nothing when the org has no cached
    contacts, no such contact is available, or the chosen contact has no URN.
    """
    if not org.cache["contacts"]:
        return

    self._log(" > Receiving unsolicited incoming message in org %s\n" % org.name)

    # contacts which are not part of the current flow activity
    candidate_ids = list(set(org.cache["contacts"]) - set(org.cache["activity"]["started"]))
    if not candidate_ids:
        return

    contact = Contact.objects.get(id=self.random_choice(candidate_ids))
    channel = self.random_choice(org.cache["channels"])

    urn = contact.urns.first()
    if not urn:
        return

    # build the text by picking one entry from each group in INBOX_MESSAGES
    text = " ".join([self.random_choice(options) for options in INBOX_MESSAGES])
    Msg.create_incoming(channel, str(urn), text)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:14,代码来源:test_db.py
示例8: default
def default(self, line):
    """
    Sends a message as the current contact's highest priority URN

    The incoming message is processed and printed, followed by every outgoing message of
    the org that has a higher primary key than the incoming one.
    """
    urn = self.contact.get_urn()

    incoming = Msg.create_incoming(None, (urn.scheme, urn.path), line, date=timezone.now(), org=self.org)
    Msg.process_message(incoming)

    # same colour layout for both directions, only the arrow differs
    incoming_format = Fore.GREEN + "[%s] " + Fore.YELLOW + ">>" + Fore.MAGENTA + " %s" + Fore.WHITE
    outgoing_format = Fore.GREEN + "[%s] " + Fore.YELLOW + "<<" + Fore.MAGENTA + " %s" + Fore.WHITE

    print(incoming_format % (urn.urn, incoming.text))

    # look up any message responses
    for response in Msg.objects.filter(org=self.org, pk__gt=incoming.pk, direction=OUTGOING):
        print(outgoing_format % (urn.urn, response.text))
开发者ID:AxisOfEval,项目名称:rapidpro,代码行数:15,代码来源:msg_console.py
示例9: test_channellog
def test_channellog(self):
    """
    Profiles 10,000 ChannelLog.log_success calls for one outgoing message.
    """
    contact = self.create_contact("Test", "+250788383383")

    outgoing = Msg.create_outgoing(self.org, self.admin, contact, "This is a test message")
    # the logger is given a struct built from the message's task JSON
    msg = dict_to_struct('MockMsg', outgoing.as_task_json())

    with SegmentProfiler("Channel Log inserts (10,000)", self, force_profile=True):
        for _ in range(10000):
            ChannelLog.log_success(
                msg,
                "Sent Message",
                method="GET",
                url="http://foo",
                request="GET http://foo",
                response="Ok",
                response_status="201",
            )
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:9,代码来源:perf_tests.py
示例10: test_message_incoming
def test_message_incoming(self):
    """
    Profiles creating incoming messages, first from new contacts and then from the same
    (now existing) contacts, and checks that each of the three channels received a third
    of all incoming messages.
    """
    num_contacts = 300

    with SegmentProfiler("Creating incoming messages from new contacts", self, False, force_profile=True):
        for c in range(num_contacts):
            # rotate through the URN generators so the messages are spread over the channels
            scheme, path, channel = self.urn_generators[c % len(self.urn_generators)](c)
            Msg.create_incoming(channel, (scheme, path), "Thanks #1", self.user)

    with SegmentProfiler("Creating incoming messages from existing contacts", self, False, force_profile=True):
        for c in range(num_contacts):
            scheme, path, channel = self.urn_generators[c % len(self.urn_generators)](c)
            Msg.create_incoming(channel, (scheme, path), "Thanks #2", self.user)

    # check messages for each channel
    incoming_total = 2 * num_contacts
    # floor division keeps the expectation an int, like the count() it is compared with
    # (plain "/" yields a float on Python 3)
    expected_per_channel = incoming_total // 3

    for channel in (self.tel_mtn, self.tel_tigo, self.twitter):
        self.assertEqual(expected_per_channel, Msg.objects.filter(direction=INCOMING, channel=channel).count())
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:18,代码来源:perf_tests.py
示例11: test_resume_with_expiration_in_subflow
def test_resume_with_expiration_in_subflow(self, mock_report_success, mock_report_failure):
    """
    Enters the subflow, expires the child run and checks that both the child and the
    parent run have exited, with two successes and no failures reported.
    """
    self.get_flow("subflow")
    parent_flow = Flow.objects.get(org=self.org, name="Parent Flow")

    # kick off the parent flow, then pick the option which triggers the subflow
    parent_flow.start([], [self.contact])
    Msg.create_incoming(self.channel, "tel:+12065552020", "color")

    # oldest run first, so the parent comes before the child
    runs_by_age = list(FlowRun.objects.order_by("created_on"))
    parent_run, child_run = runs_by_age

    # resume by expiring the child run
    child_run.expire()

    child_run.refresh_from_db()
    parent_run.refresh_from_db()

    # which should end both our runs
    for finished_run in (child_run, parent_run):
        self.assertIsNotNone(finished_run.exited_on)

    self.assertEqual(mock_report_success.call_count, 2)
    self.assertEqual(mock_report_failure.call_count, 0)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:21,代码来源:tests.py
示例12: test_resume_with_message
def test_resume_with_message(self, mock_report_success, mock_report_failure):
    """
    Starts the favorites flow, checks the reconstructed session, resumes twice with
    incoming messages, and finally checks that resumes.compare picks up a missing run
    and tampered path, results and events.
    """
    sender = "tel:+12065552020"

    favorites = self.get_flow("favorites")
    [run] = favorites.start([], [self.contact])

    # the session rebuilt from this run should hold just the one run
    session = resumes.reconstruct_session(run)
    self.assertEqual(len(session["runs"]), 1)
    self.assertEqual(session["runs"][0]["flow"]["uuid"], str(favorites.uuid))
    self.assertEqual(session["contact"]["uuid"], str(self.contact.uuid))
    for absent_key in ("results", "events"):
        self.assertNotIn(absent_key, session)

    # first reply, carrying an attachment
    Msg.create_incoming(
        self.channel, sender, "I like red", attachments=["image/jpeg:http://example.com/red.jpg"]
    )
    run.refresh_from_db()

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)

    # second reply
    Msg.create_incoming(self.channel, sender, "ooh Primus")
    run.refresh_from_db()

    self.assertEqual(mock_report_success.call_count, 2)
    self.assertEqual(mock_report_failure.call_count, 0)

    # simulate session not containing this run
    missing_run_result = resumes.compare(run, {"runs": []})
    self.assertEqual(set(missing_run_result.keys()), {"session"})

    # simulate differences in the path, results and events
    session = resumes.reconstruct_session(run)
    tampered_run = session["runs"][0]
    tampered_run["path"][0]["node_uuid"] = "wrong node"
    tampered_run["results"]["color"]["value"] = "wrong value"
    tampered_run["events"][0]["msg"]["text"] = "wrong text"

    self.assertTrue(resumes.compare(run, session)["diffs"])
开发者ID:mxabierto,项目名称:rapidpro,代码行数:39,代码来源:tests.py
示例13: save
def save(self):
    """
    Applies the validated bulk action to the validated contacts.

    ADD / REMOVE update the validated group, INTERRUPT exits the contacts' flow runs,
    ARCHIVE archives their messages, and BLOCK / UNBLOCK / DELETE are applied to each
    contact in turn. Any other action leaves the contacts untouched.
    """
    data = self.validated_data
    user = self.context['user']
    contacts = data['contacts']
    action = data['action']
    group = data.get('group')

    # actions which are applied to the whole set of contacts at once
    if action == self.ADD:
        group.update_contacts(user, contacts, add=True)
        return
    if action == self.REMOVE:
        group.update_contacts(user, contacts, add=False)
        return
    if action == self.INTERRUPT:
        FlowRun.exit_all_for_contacts(contacts, FlowRun.EXIT_TYPE_INTERRUPTED)
        return
    if action == self.ARCHIVE:
        Msg.archive_all_for_contacts(contacts)
        return

    # everything else is applied contact by contact
    for contact in contacts:
        if action == self.BLOCK:
            contact.block(user)
        elif action == self.UNBLOCK:
            contact.unblock(user)
        elif action == self.DELETE:
            contact.release(user)
开发者ID:,项目名称:,代码行数:22,代码来源:
示例14: create_flow_run
def create_flow_run(self, org):
    """
    Picks a random contact that has not yet responded to the org's cached flow activity,
    removes it from the unresponded list and, if it has a URN, creates incoming messages
    for one randomly chosen set of the flow's input templates. Afterwards ends the flow
    activity once enough contacts have responded.
    """
    activity = org.cache["activity"]
    flow = activity["flow"]
    unresponded = activity["unresponded"]

    if unresponded:
        contact_id = self.random_choice(unresponded)
        unresponded.remove(contact_id)

        urn = Contact.objects.get(id=contact_id).urns.first()

        if urn:
            self._log(" > Receiving flow responses for flow %s in org %s\n" % (flow.name, flow.org.name))

            for text in self.random_choice(flow.input_templates):
                # responses always arrive on the org's first cached channel
                channel = flow.org.cache["channels"][0]
                Msg.create_incoming(channel, str(urn), text)

    # if more than 10% of contacts have responded, consider flow activity over
    response_threshold = len(activity["started"]) * 0.9
    if len(activity["unresponded"]) <= response_threshold:
        self.end_flow_activity(flow.org)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:23,代码来源:test_db.py
示例15: default
def default(self, line):
    """
    Sends a message as the current contact's highest priority URN

    The incoming message is echoed, followed by every outgoing message of the org that
    has a higher primary key than the incoming one, ordered by sent_on.
    """
    urn = self.contact.get_urn()

    incoming = Msg.create_incoming(
        None,
        URN.from_parts(urn.scheme, urn.path),
        line,
        date=timezone.now(),
        org=self.org,
    )

    # same colour layout for both directions, only the arrow differs
    incoming_format = Fore.GREEN + "[%s] " + Fore.YELLOW + ">>" + Fore.MAGENTA + " %s" + Fore.WHITE
    outgoing_format = Fore.GREEN + "[%s] " + Fore.YELLOW + "<<" + Fore.MAGENTA + " %s" + Fore.WHITE

    self.echo(incoming_format % (urn.urn, incoming.text))

    # look up any message responses
    responses = Msg.all_messages.filter(org=self.org, pk__gt=incoming.pk, direction=OUTGOING)
    for response in responses.order_by('sent_on'):
        self.echo(outgoing_format % (urn.urn, response.text))
开发者ID:Ebaneck,项目名称:rapidpro,代码行数:15,代码来源:msg_console.py
示例16: test_trial_fault_tolerance
def test_trial_fault_tolerance(self, mock_report_failure):
    """
    Patches different parts of the resume trial to raise or to report differences, and
    checks each time that the run still reaches a path of four steps. A single failure
    is expected to have been reported by the end.
    """
    favorites = self.get_flow("favorites")

    def start_and_reply(**start_kwargs):
        # start the flow, reply once and check the run progressed as normal
        run, = favorites.start([], [self.contact], **start_kwargs)
        Msg.create_incoming(self.channel, "tel:+12065552020", "I like red")
        run.refresh_from_db()
        self.assertEqual(len(run.path), 4)

    # an exception in maybe_start shouldn't prevent normal flow execution
    with patch("temba.flows.server.trial.resumes.reconstruct_session") as mock_reconstruct_session:
        mock_reconstruct_session.side_effect = ValueError("BOOM")
        start_and_reply()

    # a flow server exception in end also shouldn't prevent normal flow execution
    with patch("temba.flows.server.trial.resumes.resume") as mock_resume:
        mock_resume.side_effect = FlowServerException("resume", {}, {"errors": ["Boom!"]})
        start_and_reply(restart_participants=True)

    # any other exception in end_resume also shouldn't prevent normal flow execution
    with patch("temba.flows.server.trial.resumes.resume") as mock_resume:
        mock_resume.side_effect = ValueError("BOOM")
        start_and_reply(restart_participants=True)

    # detected differences should be reported but shouldn't affect normal flow execution
    with patch("temba.flows.server.trial.resumes.compare") as mock_compare:
        mock_compare.return_value = {"diffs": ["a", "b"]}
        start_and_reply(restart_participants=True)

    self.assertEqual(mock_report_failure.call_count, 1)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:40,代码来源:tests.py
示例17: create_messages_for_ivr_actions
def create_messages_for_ivr_actions(apps, schema_editor):
    """
    Data migration step: for every org with a call channel, creates one handled incoming
    IVR message per IVR action whose flow step has a rule value, and attaches the message
    to that step.
    """
    from django.contrib.auth.models import User

    IVRAction = apps.get_model("ivr", "IVRAction")

    # create a one-to-one mapping for any ivr actions as ivr messages
    for org in Org.objects.all():
        channel = org.get_call_channel()
        if not channel:
            continue

        for ivr in IVRAction.objects.filter(org=org):
            step = FlowStep.objects.get(pk=ivr.step.pk)
            if not step.rule_value:
                continue

            urn = ivr.call.contact_urn

            # a rule value starting with "http" is also passed on as the recording URL
            extra_fields = {}
            if step.rule_value[:4] == 'http':
                extra_fields['recording_url'] = step.rule_value

            user = User.objects.get(pk=ivr.call.created_by_id)

            msg = Msg.create_incoming(
                channel,
                (urn.scheme, urn.path),
                step.rule_value,
                user=user,
                topup=ivr.topup,
                status=HANDLED,
                msg_type=IVR,
                date=ivr.created_on,
                org=org,
                **extra_fields
            )
            step.add_message(msg)
开发者ID:AbrahamKiggundu,项目名称:rapidpro,代码行数:23,代码来源:0006_auto_20150203_0558.py
示例18: test_resume_in_triggered_session
def test_resume_in_triggered_session(self, mock_report_success, mock_report_failure):
    """
    Runs the action_packed flow until a child run exists for another contact, checks the
    session reconstructed for that child run, and then resumes the child run with a
    message while FLOW_SERVER_TRIAL is overridden to "always".
    """
    parent_flow = self.get_flow("action_packed")
    child_flow = Flow.objects.get(org=self.org, name="Favorite Color")

    parent_flow.start([], [self.contact], restart_participants=True)

    for reply_text in ("Trey Anastasio", "Male"):
        Msg.create_incoming(self.channel, "tel:+12065552020", reply_text)

    # oldest run first, so the parent comes before the child
    runs_by_age = list(FlowRun.objects.order_by("created_on"))
    parent_run, child_run = runs_by_age
    child_contact = Contact.objects.get(name="Oprah Winfrey")

    self.assertEqual(parent_run.flow, parent_flow)
    self.assertEqual(parent_run.contact, self.contact)
    self.assertEqual(child_run.flow, child_flow)
    self.assertEqual(child_run.contact, child_contact)

    # check that the run which triggered the child run isn't part of its session, but is part of the trigger
    session = resumes.reconstruct_session(child_run)
    session_runs = session["runs"]
    self.assertEqual(len(session_runs), 1)
    self.assertEqual(session_runs[0]["flow"]["uuid"], str(child_flow.uuid))
    self.assertEqual(session["contact"]["uuid"], str(child_contact.uuid))
    self.assertEqual(session["trigger"]["type"], "flow_action")
    for absent_key in ("results", "events"):
        self.assertNotIn(absent_key, session)

    with override_settings(FLOW_SERVER_TRIAL="always"):
        # resume child run with a message
        Msg.create_incoming(self.channel, "tel:+12065552121", "red")
        child_run.refresh_from_db()

    # and it should now be complete
    self.assertIsNotNone(child_run.exited_on)

    self.assertEqual(mock_report_success.call_count, 1)
    self.assertEqual(mock_report_failure.call_count, 0)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:36,代码来源:tests.py
示例19: post
def post(self, request, *args, **kwargs):
from temba.msgs.models import Msg
request_body = request.body
request_method = request.method
request_path = request.get_full_path()
def log_channel(channel, description, event, is_error=False):
return ChannelLog.objects.create(
channel_id=channel.pk,
is_error=is_error,
request=event.request_body,
response=event.response_body,
url=event.url,
method=event.method,
response_status=event.status_code,
description=description,
)
action = kwargs["action"].lower()
request_uuid = kwargs["uuid"]
data = json.loads(force_text(request_body))
is_ussd = self.is_ussd_message(data)
channel_data = data.get("channel_data", {})
channel_types = ("JNU", "JN")
# look up the channel
channel = Channel.objects.filter(uuid=request_uuid, is_active=True, channel_type__in=channel_types).first()
if not channel:
return HttpResponse("Channel not found for id: %s" % request_uuid, status=400)
auth = request.META.get("HTTP_AUTHORIZATION", "").split(" ")
secret = channel.config.get(Channel.CONFIG_SECRET)
if secret is not None and (len(auth) != 2 or auth[0] != "Token" or auth[1] != secret):
return JsonResponse(dict(error="Incorrect authentication token"), status=401)
# Junebug is sending an event
if action == "event":
expected_keys = ["event_type", "message_id", "timestamp"]
if not set(expected_keys).issubset(data.keys()):
status = 400
response_body = "Missing one of %s in request parameters." % (", ".join(expected_keys))
event = HttpEvent(request_method, request_path, request_body, status, response_body)
log_channel(channel, "Failed to handle event.", event, is_error=True)
return HttpResponse(response_body, status=status)
message_id = data["message_id"]
event_type = data["event_type"]
# look up the message
message = Msg.objects.filter(channel=channel, external_id=message_id).select_related("channel")
if not message:
status = 400
response_body = "Message with external id of '%s' not found" % (message_id,)
event = HttpEvent(request_method, request_path, request_body, status, response_body)
log_channel(channel, "Failed to handle %s event_type." % (event_type), event)
return HttpResponse(response_body, status=status)
if event_type == "submitted":
for message_obj in message:
message_obj.status_sent()
if event_type == "delivery_succeeded":
for message_obj in message:
message_obj.status_delivered()
elif event_type in ["delivery_failed", "rejected"]:
for message_obj in message:
message_obj.status_fail()
response_body = {"status": self.ACK, "message_ids": [message_obj.pk for message_obj in message]}
event = HttpEvent(request_method, request_path, request_body, 200, json.dumps(response_body))
log_channel(channel, "Handled %s event_type." % (event_type), event)
# Let Junebug know we're happy
return JsonResponse(response_body)
# Handle an inbound message
elif action == "inbound":
expected_keys = [
"channel_data",
"from",
"channel_id",
"timestamp",
"content",
"to",
"reply_to",
"message_id",
]
if not set(expected_keys).issubset(data.keys()):
status = 400
response_body = "Missing one of %s in request parameters." % (", ".join(expected_keys))
event = HttpEvent(request_method, request_path, request_body, status, response_body)
log_channel(channel, "Failed to handle message.", event, is_error=True)
return HttpResponse(response_body, status=status)
if is_ussd:
status = {"close": USSDSession.INTERRUPTED, "new": USSDSession.TRIGGERED}.get(
channel_data.get("session_event"), USSDSession.IN_PROGRESS
)
#.........这里部分代码省略.........
开发者ID:teehamaral,项目名称:rapidpro,代码行数:101,代码来源:handlers.py
示例20: test_catch_all_trigger
def test_catch_all_trigger(self):
    """
    Exercises the catch-all trigger views: creating a trigger, having it start a flow for
    an incoming message, pointing it at another flow, and checking that only one
    catch-all trigger is unarchived at a time, also after restoring archived ones.
    """
    self.login(self.admin)
    catch_all_trigger = Trigger.get_triggers_of_type(self.org, CATCH_ALL_TRIGGER).first()
    flow = self.create_flow()

    contact = self.create_contact("Ali", "250788739305")

    # make our first message echo back the original message
    action_set = ActionSet.objects.get(uuid=flow.entry_uuid)
    actions = action_set.as_json()['actions']
    actions[0]['msg'] = 'Echo: @step.value'
    action_set.set_actions_dict(actions)
    action_set.save()

    # no catch-all trigger yet, so an incoming message doesn't start the flow
    self.assertFalse(catch_all_trigger)

    Msg.create_incoming(self.channel, (TEL_SCHEME, contact.get_urn().path), "Hi")
    self.assertEqual(1, Msg.objects.all().count())
    self.assertEqual(0, flow.runs.all().count())

    trigger_url = reverse("triggers.trigger_catchall")

    response = self.client.get(trigger_url)
    self.assertEqual(response.status_code, 200)

    post_data = dict(flow=flow.pk)
    self.client.post(trigger_url, post_data)

    trigger = Trigger.objects.all().order_by('-pk')[0]
    self.assertEqual(trigger.trigger_type, CATCH_ALL_TRIGGER)
    self.assertEqual(trigger.flow.pk, flow.pk)

    catch_all_trigger = Trigger.get_triggers_of_type(self.org, CATCH_ALL_TRIGGER).first()
    self.assertEqual(catch_all_trigger.pk, trigger.pk)

    # with the trigger in place the same message starts the flow and gets the echo reply
    incoming = Msg.create_incoming(self.channel, (TEL_SCHEME, contact.get_urn().path), "Hi")
    self.assertEqual(1, flow.runs.all().count())
    self.assertEqual(flow.runs.all()[0].contact.pk, contact.pk)
    reply = Msg.objects.get(response_to=incoming)
    self.assertEqual('Echo: Hi', reply.text)

    # point the trigger at a copy of the flow
    other_flow = Flow.copy(flow, self.admin)
    post_data = dict(flow=other_flow.pk)

    self.client.post(reverse("triggers.trigger_update", args=[trigger.pk]), post_data)
    trigger = Trigger.objects.get(pk=trigger.pk)
    self.assertEqual(trigger.flow.pk, other_flow.pk)

    # create a bunch of catch all triggers
    for i in range(3):
        response = self.client.get(trigger_url)
        self.assertEqual(response.status_code, 200)

        post_data = dict(flow=flow.pk)
        self.client.post(trigger_url, post_data)

        # every post adds a trigger, but only one stays unarchived
        self.assertEqual(i + 2, Trigger.objects.all().count())
        self.assertEqual(1, Trigger.objects.filter(is_archived=False, trigger_type=CATCH_ALL_TRIGGER).count())

    # even unarchiving we only have one active trigger at a time
    triggers = Trigger.objects.filter(trigger_type=CATCH_ALL_TRIGGER, is_archived=True)
    active_trigger = Trigger.objects.get(trigger_type=CATCH_ALL_TRIGGER, is_archived=False)

    post_data = dict()
    post_data['action'] = 'restore'
    post_data['objects'] = [archived.pk for archived in triggers]

    self.client.post(reverse("triggers.trigger_archived"), post_data)
    self.assertEqual(1, Trigger.objects.filter(is_archived=False, trigger_type=CATCH_ALL_TRIGGER).count())

    # the unarchived trigger is now a different one than before the restore
    self.assertNotEqual(
        active_trigger.pk, Trigger.objects.filter(is_archived=False, trigger_type=CATCH_ALL_TRIGGER)[0].pk
    )
开发者ID:austiine04,项目名称:rapidpro,代码行数:71,代码来源:tests.py
注：本文中的temba.msgs.models.Msg类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论