本文整理汇总了Python中temba.channels.models.Channel类的典型用法代码示例。如果您正苦于以下问题:Python Channel类的具体用法?Python Channel怎么用?Python Channel使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Channel类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """
    Prepares the performance test fixture.

    Creates two users who both administer a new org, four channels (two "A"
    type phone channels, a Nexmo channel parented to the Tigo one and a
    Twitter channel), the URN generator callables and two contact fields.
    """
    self.clear_cache()

    self.user = self.create_user("tito")
    self.admin = self.create_user("Administrator")

    self.org = Org.objects.create(
        name="Nyaruka Ltd.", timezone="Africa/Kigali", created_by=self.user, modified_by=self.user
    )
    self.org.initialize()

    # both users administer the org
    for member in (self.admin, self.user):
        self.org.administrators.add(member)
        member.set_org(self.org)

    self.tel_mtn = Channel.create(
        self.org, self.user, "RW", "A", name="MTN", address="+250780000000", secret="12345", gcm_id="123"
    )
    self.tel_tigo = Channel.create(
        self.org, self.user, "RW", "A", name="Tigo", address="+250720000000", secret="23456", gcm_id="234"
    )
    self.tel_bulk = Channel.create(self.org, self.user, "RW", "NX", name="Nexmo", parent=self.tel_tigo)
    self.twitter = Channel.create(self.org, self.user, None, "TT", name="Twitter", address="billy_bob")

    # each generator maps a number to a (scheme, path, channel) tuple
    def generate_tel_mtn(num):
        path = "+25078%07d" % (num + 1)
        return TEL_SCHEME, path, self.tel_mtn

    def generate_tel_tigo(num):
        path = "+25072%07d" % (num + 1)
        return TEL_SCHEME, path, self.tel_tigo

    def generate_twitter(num):
        path = "tweep_%d" % (num + 1)
        return TWITTER_SCHEME, path, self.twitter

    self.urn_generators = (generate_tel_mtn, generate_tel_tigo, generate_twitter)

    self.field_nick = ContactField.get_or_create(
        self.org, self.admin, "nick", "Nickname", show_in_table=True, value_type=Value.TYPE_TEXT
    )
    self.field_age = ContactField.get_or_create(
        self.org, self.admin, "age", "Age", show_in_table=True, value_type=Value.TYPE_DECIMAL
    )
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:35,代码来源:perf_tests.py
示例2: __init__
def __init__(self, user, *args, **kwargs):
    """
    Restricts the form to USSD: the flows handed to the parent form are the
    active, non-archived USSD flows of the user's org, and the channel field
    only offers that org's channels in the USSD category.
    """
    ussd_flows = Flow.objects.filter(
        org=user.get_org(),
        is_active=True,
        is_archived=False,
        flow_type__in=[Flow.TYPE_USSD],
    )
    super().__init__(user, ussd_flows, *args, **kwargs)

    ussd_channels = Channel.get_by_category(self.user.get_org(), ChannelType.Category.USSD)
    self.fields["channel"].queryset = ussd_channels
开发者ID:mxabierto,项目名称:rapidpro,代码行数:7,代码来源:views.py
示例3: setUp
def setUp(self):
    """
    Replaces the inherited channel with a "JNU" channel in the USSD role and
    makes sure a USSD pull trigger for the "*113#" star code exists for the
    "ussd_example" flow.
    """
    super().setUp()

    flow = self.get_flow("ussd_example")
    self.starcode = "*113#"

    # swap the channel created by the parent fixture for a USSD one
    self.channel.delete()
    junebug_config = {
        "username": "junebug-user",
        "password": "junebug-pass",
        "send_url": "http://example.org/",
    }
    self.channel = Channel.create(
        self.org,
        self.user,
        "RW",
        "JNU",
        None,
        "1234",
        config=junebug_config,
        uuid="00000000-0000-0000-0000-000000001234",
        role=Channel.ROLE_USSD,
    )

    trigger_fields = dict(
        channel=self.channel,
        keyword=self.starcode,
        flow=flow,
        created_by=self.user,
        modified_by=self.user,
        org=self.org,
        trigger_type=Trigger.TYPE_USSD_PULL,
    )
    # only the trigger itself is kept, the "created" flag is not needed
    self.trigger, _ = Trigger.objects.get_or_create(**trigger_fields)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:28,代码来源:tests.py
示例4: setUp
def setUp(self):
    """
    Builds the shared test fixture.

    Creates a superuser, one user per org role, the Rwanda admin boundary
    hierarchy (country, states, districts, wards), the "Temba" org with its
    welcome topup and a single Android channel, then switches contact
    simulation off.
    """
    # if we are super verbose, turn on debug for sql queries
    if self.get_verbosity() > 2:
        settings.DEBUG = True

    self.clear_cache()

    self.superuser = User.objects.create_superuser(username="super", email="super@user.com", password="super")

    # create different user types
    self.non_org_user = self.create_user("NonOrg")
    self.user = self.create_user("User")
    self.editor = self.create_user("Editor")
    self.admin = self.create_user("Administrator")
    self.surveyor = self.create_user("Surveyor")

    # setup admin boundaries for Rwanda
    self.country = AdminBoundary.objects.create(osm_id="171496", name="Rwanda", level=0)
    self.state1 = AdminBoundary.objects.create(osm_id="1708283", name="Kigali City", level=1, parent=self.country)
    self.state2 = AdminBoundary.objects.create(
        osm_id="171591", name="Eastern Province", level=1, parent=self.country
    )
    self.district1 = AdminBoundary.objects.create(osm_id="1711131", name="Gatsibo", level=2, parent=self.state2)
    self.district2 = AdminBoundary.objects.create(osm_id="1711163", name="Kayônza", level=2, parent=self.state2)
    self.district3 = AdminBoundary.objects.create(osm_id="3963734", name="Nyarugenge", level=2, parent=self.state1)
    self.district4 = AdminBoundary.objects.create(osm_id="1711142", name="Rwamagana", level=2, parent=self.state2)
    self.ward1 = AdminBoundary.objects.create(osm_id="171113181", name="Kageyo", level=3, parent=self.district1)
    self.ward2 = AdminBoundary.objects.create(osm_id="171116381", name="Kabare", level=3, parent=self.district2)
    self.ward3 = AdminBoundary.objects.create(osm_id="171114281", name="Bukure", level=3, parent=self.district4)

    self.org = Org.objects.create(
        name="Temba",
        timezone="Africa/Kigali",
        country=self.country,
        brand=settings.DEFAULT_BRAND,
        created_by=self.user,
        modified_by=self.user,
    )
    self.org.initialize(topup_size=1000)

    # add users to the org
    self.user.set_org(self.org)
    self.org.viewers.add(self.user)

    self.editor.set_org(self.org)
    self.org.editors.add(self.editor)

    self.admin.set_org(self.org)
    self.org.administrators.add(self.admin)

    self.surveyor.set_org(self.org)
    self.org.surveyors.add(self.surveyor)

    self.superuser.set_org(self.org)

    # welcome topup with 1000 credits
    self.welcome_topup = self.org.topups.all()[0]

    # a single Android channel
    self.channel = Channel.create(
        self.org,
        self.user,
        "RW",
        "A",
        name="Test Channel",
        address="+250785551212",
        device="Nexus 5X",
        secret="12345",
        gcm_id="123",
    )

    # reset our simulation to False
    Contact.set_simulation(False)
开发者ID:eHealthAfrica,项目名称:rapidpro,代码行数:58,代码来源:tests.py
示例5: register_active_event
def register_active_event(self):
    """
    Helper function for registering active events on a throttled channel.

    Increments the Redis counter stored under the active-events key of this
    object's channel.
    """
    connection = get_redis_connection()
    connection.incr(Channel.redis_active_events_key(self.channel_id))
开发者ID:teehamaral,项目名称:rapidpro,代码行数:9,代码来源:models.py
示例6: browser
def browser(self):
    """
    Drives the browser through the main onboarding path: signing up, claiming
    an Android channel by claim code and importing a contacts spreadsheet,
    then checks that the imported contact and group are listed.
    """
    self.driver.set_window_size(1024, 2000)

    # view the homepage
    self.fetch_page()

    # go directly to our signup
    self.fetch_page(reverse("orgs.org_signup"))

    # create account
    self.keys("email", "code@temba.com")
    self.keys("password", "SuperSafe1")
    self.keys("first_name", "Joe")
    self.keys("last_name", "Blow")
    self.click("#form-one-submit")
    self.keys("name", "Temba")
    self.click("#form-two-submit")

    # set up our channel for claiming
    channel = Channel.create(
        None,
        get_anonymous_user(),
        "RW",
        "A",
        name="Test Channel",
        address="0785551212",
        claim_code="AAABBBCCC",
        secret="12345",
        gcm_id="123",
    )

    # and claim it
    self.fetch_page(reverse("channels.channel_claim_android"))
    self.keys("#id_claim_code", "AAABBBCCC")
    self.keys("#id_phone_number", "0785551212")
    self.submit(".claim-form")

    # get our freshly claimed channel (the lookup raises if it no longer exists)
    channel = Channel.objects.get(pk=channel.pk)

    # now go to the contacts page
    self.click("#menu-right .icon-contact")
    self.click("#id_import_contacts")

    # upload some contacts
    directory = os.path.dirname(os.path.realpath(__file__))
    self.keys("#csv_file", "%s/../media/test_imports/sample_contacts.xls" % directory)
    self.submit(".smartmin-form")

    # make sure they are there
    self.click("#menu-right .icon-contact")
    self.assertInElements(".value-phone", "+250788382382")
    self.assertInElements(".value-text", "Eric Newcomer")
    self.assertInElements(".value-text", "Sample Contacts")
开发者ID:mxabierto,项目名称:rapidpro,代码行数:55,代码来源:base.py
示例7: __init__
def __init__(self, user, *args, **kwargs):
    """
    Offers the non-archived regular and voice flows of the user's org, and
    limits the channel field to the org's active channels whose type supports
    one of the URN schemes that allow follow triggers.
    """
    flows = Flow.objects.filter(is_archived=False, org=user.get_org(), flow_type__in=[Flow.FLOW, Flow.VOICE])
    super(FollowTriggerForm, self).__init__(user, flows, *args, **kwargs)

    # all channel types that support follow triggers
    follow_types = {
        channel_type
        for scheme in URN_SCHEMES_SUPPORTING_FOLLOW
        for channel_type in Channel.types_for_scheme(scheme)
    }

    followable_channels = Channel.objects.filter(
        is_active=True, org=self.user.get_org(), channel_type__in=follow_types
    )
    self.fields["channel"].queryset = followable_channels
开发者ID:TextoCMR,项目名称:TexTo.cm,代码行数:11,代码来源:views.py
示例8: form_valid
def form_valid(self, form):
    """
    Creates a "CT" external channel for the requesting user's org from the
    submitted country, number and API key, then defers to the parent view.

    Raises an Exception when the user has no org.
    """
    org = self.request.user.get_org()

    if not org:  # pragma: no cover
        raise Exception(_("No org for this user, cannot claim"))

    cleaned = form.cleaned_data
    country = cleaned["country"]
    number = cleaned["number"]
    channel_config = {Channel.CONFIG_API_KEY: cleaned["api_key"]}

    self.object = Channel.add_config_external_channel(
        org, self.request.user, country, number, "CT", channel_config
    )

    return super(AuthenticatedExternalClaimView, self).form_valid(form)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:12,代码来源:views.py
示例9: test_channels
def test_channels(self):
    """
    Exercises the v2 channels endpoint: access rules, the unfiltered listing
    (including the exact JSON of the Android channel) and filtering by UUID
    and by address.
    """
    endpoint = reverse("api.v2.channels")

    self.assertEndpointAccess(endpoint)

    # create channel for other org, it is not expected in any result below
    Channel.create(
        self.org2,
        self.admin2,
        None,
        "TT",
        name="Twitter Channel",
        address="nyaruka",
        role="SR",
        scheme="twitter",
    )

    # no filtering
    with self.assertNumQueries(NUM_BASE_REQUEST_QUERIES + 2):
        response = self.fetchJSON(endpoint)

    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.json["next"], None)
    self.assertResultsByUUID(response, [self.twitter, self.channel])

    expected_device = {
        "name": "Nexus 5X",
        "network_type": None,
        "power_level": -1,
        "power_source": None,
        "power_status": None,
    }
    expected_channel = {
        "uuid": self.channel.uuid,
        "name": "Test Channel",
        "address": "+250785551212",
        "country": "RW",
        "device": expected_device,
        "last_seen": format_datetime(self.channel.last_seen),
        "created_on": format_datetime(self.channel.created_on),
    }
    self.assertEqual(response.json["results"][1], expected_channel)

    # filter by UUID
    response = self.fetchJSON(endpoint, "uuid=%s" % self.twitter.uuid)
    self.assertResultsByUUID(response, [self.twitter])

    # filter by address
    response = self.fetchJSON(endpoint, "address=billy_bob")
    self.assertResultsByUUID(response, [self.twitter])
开发者ID:JediKoder,项目名称:rapidpro,代码行数:39,代码来源:test_v2.py
示例10: update_status
def update_status(self, status: str, duration: float, channel_type: str):
    """
    Updates our status from a provided call status string.

    The new status is derived by the method matching the IVR protocol of
    ``channel_type`` (TwiML or NCCO). Depending on the transition this also
    stamps ``started_on`` / ``ended_on``, logs the end of a test contact's
    call, schedules a call retry, stores ``duration`` (when not None) and
    updates the expiration of active runs that have none.

    Raises ValueError when ``status`` is empty or the protocol is unhandled.
    """
    if not status:
        raise ValueError(f"IVR Call status must be defined, got: '{status}'")

    old_status = self.status

    from temba.flows.models import FlowRun, ActionLog

    protocol = Channel.get_type_from_code(channel_type).ivr_protocol

    if protocol == ChannelType.IVRProtocol.IVR_PROTOCOL_TWIML:
        self.status = self.derive_ivr_status_twiml(status, old_status)
    elif protocol == ChannelType.IVRProtocol.IVR_PROTOCOL_NCCO:
        self.status = self.derive_ivr_status_nexmo(status, old_status)
    else:  # pragma: no cover
        raise ValueError(f"Unhandled IVR protocol: {protocol}")

    # if we are in progress, mark our start time
    just_started = self.status == self.IN_PROGRESS and old_status != self.IN_PROGRESS
    if just_started:
        self.started_on = timezone.now()

    # if we are done, mark our ended time
    if self.status in ChannelSession.DONE:
        self.ended_on = timezone.now()

        # test contacts get an entry in the action log of their first run
        if self.contact.is_test:
            call_runs = FlowRun.objects.filter(connection=self)
            if call_runs:
                ActionLog.create(call_runs[0], _("Call ended."))

    # only schedule a retry when we newly entered a retry status
    entered_retry = self.status in ChannelSession.RETRY_CALL and old_status not in ChannelSession.RETRY_CALL
    if entered_retry:
        retry_minutes = self.get_flow().metadata.get("ivr_retry", IVRCall.RETRY_BACKOFF_MINUTES)
        self.schedule_call_retry(retry_minutes)

    if duration is not None:
        self.duration = duration

    # if we are moving into IN_PROGRESS, make sure our runs have proper expirations
    was_waiting = old_status in (self.PENDING, self.QUEUED, self.WIRED)
    if was_waiting and self.status in (self.IN_PROGRESS, self.RINGING):
        for active_run in FlowRun.objects.filter(connection=self, is_active=True, expires_on=None):
            active_run.update_expiration()
开发者ID:mxabierto,项目名称:rapidpro,代码行数:50,代码来源:models.py
示例11: setUp
def setUp(self):
    """
    Builds the shared test fixture.

    Creates a superuser, one user per org role, the Rwanda admin boundaries
    (country, states, districts), the "Temba" org with its welcome topup and
    a single Android channel, then switches contact simulation off.
    """
    self.clear_cache()

    self.superuser = User.objects.create_superuser(username="super", email="super@user.com", password="super")

    # create different user types
    self.non_org_user = self.create_user("NonOrg")
    self.user = self.create_user("User")
    self.editor = self.create_user("Editor")
    self.admin = self.create_user("Administrator")
    self.surveyor = self.create_user("Surveyor")

    # setup admin boundaries for Rwanda
    self.country = AdminBoundary.objects.create(osm_id="171496", name="Rwanda", level=0)
    self.state1 = AdminBoundary.objects.create(osm_id="1708283", name="Kigali City", level=1, parent=self.country)
    self.state2 = AdminBoundary.objects.create(
        osm_id="171591", name="Eastern Province", level=1, parent=self.country
    )
    self.district1 = AdminBoundary.objects.create(osm_id="1711131", name="Gatsibo", level=2, parent=self.state2)
    self.district2 = AdminBoundary.objects.create(osm_id="1711163", name="Kayonza", level=2, parent=self.state2)
    self.district3 = AdminBoundary.objects.create(osm_id="60485579", name="Kigali", level=2, parent=self.state1)
    self.district4 = AdminBoundary.objects.create(osm_id="1711142", name="Rwamagana", level=2, parent=self.state2)

    self.org = Org.objects.create(
        name="Temba",
        timezone="Africa/Kigali",
        country=self.country,
        created_by=self.user,
        modified_by=self.user,
    )
    self.org.initialize()

    # add users to the org
    self.user.set_org(self.org)
    self.org.viewers.add(self.user)

    self.editor.set_org(self.org)
    self.org.editors.add(self.editor)

    self.admin.set_org(self.org)
    self.org.administrators.add(self.admin)

    self.surveyor.set_org(self.org)
    self.org.surveyors.add(self.surveyor)

    self.superuser.set_org(self.org)

    # welcome topup with 1000 credits
    self.welcome_topup = self.org.topups.all()[0]

    # a single Android channel
    self.channel = Channel.create(
        self.org,
        self.user,
        "RW",
        "A",
        name="Test Channel",
        address="+250785551212",
        secret="12345",
        gcm_id="123",
    )

    # reset our simulation to False
    Contact.set_simulation(False)
开发者ID:thierhost,项目名称:rapidpro,代码行数:49,代码来源:tests.py
示例12: setUp
def setUp(self):
    """
    Prepares the performance test fixture.

    Creates two users who both administer a new org, four channels (two "A"
    type phone channels, a Nexmo channel parented to the Tigo one and a
    Twitter channel), the URN generator callables and two contact fields.
    """
    self.clear_cache()

    self.user = self.create_user("tito")
    self.admin = self.create_user("Administrator")
    self.org = Org.objects.create(
        name="Nyaruka Ltd.", timezone="Africa/Kigali", created_by=self.user, modified_by=self.user
    )
    self.org.initialize()

    self.org.administrators.add(self.admin)
    self.admin.set_org(self.org)
    self.org.administrators.add(self.user)
    self.user.set_org(self.org)

    self.tel_mtn = Channel.create(
        self.org, self.user, "RW", "A", name="MTN", address="+250780000000", secret="12345", gcm_id="123"
    )
    self.tel_tigo = Channel.create(
        self.org, self.user, "RW", "A", name="Tigo", address="+250720000000", secret="23456", gcm_id="234"
    )
    self.tel_bulk = Channel.create(self.org, self.user, "RW", "NX", name="Nexmo", parent=self.tel_tigo)
    self.twitter = Channel.create(self.org, self.user, None, "TT", name="Twitter", address="billy_bob")

    # for generating tuples of scheme, path and channel; written as named
    # functions rather than lambdas bound to names so they carry a real name
    def generate_tel_mtn(num):
        return TEL_SCHEME, "+25078%07d" % (num + 1), self.tel_mtn

    def generate_tel_tigo(num):
        return TEL_SCHEME, "+25072%07d" % (num + 1), self.tel_tigo

    def generate_twitter(num):
        return TWITTER_SCHEME, "tweep_%d" % (num + 1), self.twitter

    self.urn_generators = (generate_tel_mtn, generate_tel_tigo, generate_twitter)

    self.field_nick = ContactField.get_or_create(
        self.org, self.admin, "nick", "Nickname", show_in_table=True, value_type=TEXT
    )
    self.field_age = ContactField.get_or_create(
        self.org, self.admin, "age", "Age", show_in_table=True, value_type=DECIMAL
    )
开发者ID:reyrodrigues,项目名称:EU-SMS,代码行数:36,代码来源:perf_tests.py
示例13: browser
def browser(self):
    """
    Drives the browser through the main onboarding path: signing up, claiming
    an Android channel by claim code and importing a contacts spreadsheet,
    then checks that the imported contact and group are listed.
    """
    self.driver.set_window_size(1024, 2000)

    # view the homepage
    self.fetch_page()

    # go directly to our signup
    self.fetch_page(reverse('orgs.org_signup'))

    # create account
    self.keys('email', 'code@temba.com')
    self.keys('password', 'SuperSafe1')
    self.keys('first_name', 'Joe')
    self.keys('last_name', 'Blow')
    self.click('#form-one-submit')
    self.keys('name', 'Temba')
    self.click('#form-two-submit')

    # set up our channel for claiming
    anon = User.objects.get(pk=settings.ANONYMOUS_USER_ID)
    channel = Channel.create(
        None,
        anon,
        'RW',
        'A',
        name="Test Channel",
        address="0785551212",
        claim_code='AAABBBCCC',
        secret="12345",
        gcm_id="123",
    )

    # and claim it
    self.fetch_page(reverse('channels.channel_claim_android'))
    self.keys('#id_claim_code', 'AAABBBCCC')
    self.keys('#id_phone_number', '0785551212')
    self.submit('.claim-form')

    # get our freshly claimed channel (the lookup raises if it no longer exists)
    channel = Channel.objects.get(pk=channel.pk)

    # now go to the contacts page
    self.click('#menu-right .icon-contact')
    self.click('#id_import_contacts')

    # upload some contacts
    directory = os.path.dirname(os.path.realpath(__file__))
    self.keys('#csv_file', '%s/../media/test_imports/sample_contacts.xls' % directory)
    self.submit('.smartmin-form')

    # make sure they are there
    self.click('#menu-right .icon-contact')
    self.assertInElements('.value-phone', '+250788382382')
    self.assertInElements('.value-text', 'Eric Newcomer')
    self.assertInElements('.value-text', 'Sample Contacts')
开发者ID:MOconcepts,项目名称:rapidpro,代码行数:47,代码来源:tests.py
示例14: unregister_active_event
def unregister_active_event(self):
    """
    Helper function for unregistering active events on a throttled channel.

    Decrements the Redis counter stored under the active-events key of this
    object's channel. Nothing happens when the counter is missing or falsy;
    a counter that is already zero or negative raises ValueError.
    """
    connection = get_redis_connection()
    key = Channel.redis_active_events_key(self.channel_id)

    # are we on a throttled channel?
    tracked = connection.get(key)
    if not tracked:
        return

    if int(tracked) <= 0:  # pragma: no cover
        raise ValueError("When this happens I'll quit my job and start producing moonshine/poitin/brlja !")

    connection.decr(key)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:17,代码来源:models.py
示例15: form_valid
def form_valid(self, form):
    """
    Creates an "MT" channel for the requesting user's org, using the submitted
    service id as both name and address and the submitted username/password
    as config, then defers to the parent view.
    """
    org = self.request.user.get_org()
    cleaned = form.cleaned_data

    credentials = {
        Channel.CONFIG_USERNAME: cleaned["username"],
        Channel.CONFIG_PASSWORD: cleaned["password"],
    }
    country = cleaned["country"]
    service_id = cleaned["service_id"]

    self.object = Channel.create(
        org=org,
        user=self.request.user,
        country=country,
        channel_type="MT",
        name=service_id,
        address=service_id,
        config=credentials,
        schemes=[TEL_SCHEME],
    )

    return super().form_valid(form)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:17,代码来源:views.py
示例16: setUp
def setUp(self):
    """
    Extends the parent fixture with three contacts, the test contact, a
    Twitter channel and a secondary org holding one more contact.
    """
    super(APITest, self).setUp()

    self.joe = self.create_contact("Joe Blow", "0788123123")
    self.frank = self.create_contact("Frank", twitter="franky")
    self.test_contact = Contact.get_test_contact(self.user)

    self.twitter = Channel.create(
        self.org,
        self.user,
        None,
        "TT",
        name="Twitter Channel",
        address="billy_bob",
        role="SR",
        scheme="twitter",
    )

    self.create_secondary_org()
    self.hans = self.create_contact("Hans Gruber", "+4921551511", org=self.org2)

    # show full diffs when assertions fail
    self.maxDiff = None

    # this is needed to prevent REST framework from rolling back transaction created around each unit test
    connection.settings_dict["ATOMIC_REQUESTS"] = False
开发者ID:churcho,项目名称:rapidpro,代码行数:17,代码来源:test_v2.py
示例17: handle_simulate
def handle_simulate(self, num_runs, org_id, flow_name, seed):
    """
    Prepares to resume simulating flow activity on an existing database.

    Selects all orgs (or only ``org_id``), attaches to each a cache of its
    users, channels, groups, flows and contact ids, then starts the
    simulation for ``num_runs``. Raises CommandError when no org matches.
    """
    self._log("Resuming flow activity simulation on existing database...\n")

    orgs = Org.objects.order_by("id")
    if org_id:
        orgs = orgs.filter(id=org_id)

    if not orgs:
        raise CommandError("Can't simulate activity on an empty database")

    self.configure_random(len(orgs), seed)

    # in real life Nexmo messages are throttled, but that's not necessary for this simulation
    Channel.get_type_from_code("NX").max_tps = None

    templates_by_flow = {spec["name"]: spec["templates"] for spec in FLOWS}

    self._log("Preparing existing orgs... ")

    for org in orgs:
        flow_query = org.flows.order_by("id").exclude(is_system=True)
        if flow_name:
            flow_query = flow_query.filter(name=flow_name)
        org_flows = list(flow_query)

        # every flow needs the input templates defined for its name
        for flow in org_flows:
            flow.input_templates = templates_by_flow[flow.name]

        org.cache = {
            "users": list(org.get_org_users().order_by("id")),
            "channels": list(org.channels.order_by("id")),
            "groups": list(ContactGroup.user_groups.filter(org=org).order_by("id")),
            "flows": org_flows,
            "contacts": list(org.org_contacts.values_list("id", flat=True)),  # only ids to save memory
            "activity": None,
        }

    self._log(self.style.SUCCESS("OK") + "\n")

    self.simulate_activity(orgs, num_runs)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:44,代码来源:test_db.py
示例18: task_enqueue_call_events
def task_enqueue_call_events():
    """
    Queues up to 1000 pending outgoing IVR calls, oldest modification first.

    Calls on a channel that configures a maximum of concurrent events are
    skipped while that channel's active-event counter is at the limit;
    otherwise the call registers itself as an active event. Each queued call
    is logged, marked QUEUED and handed to ``start_call_task``.
    """
    from .models import IVRCall

    redis = get_redis_connection()

    oldest_allowed = timezone.now() - timedelta(days=IVRCall.IGNORE_PENDING_CALLS_OLDER_THAN_DAYS)

    pending_calls = IVRCall.objects.filter(status=IVRCall.PENDING)
    pending_calls = pending_calls.filter(direction=IVRCall.OUTGOING, is_active=True)
    pending_calls = pending_calls.filter(channel__is_active=True)
    pending_calls = pending_calls.filter(modified_on__gt=oldest_allowed)
    pending_calls = pending_calls.select_related("channel").order_by("modified_on")[:1000]

    for call in pending_calls:
        # are we handling a call on a throttled channel ?
        limit = call.channel.config.get(Channel.CONFIG_MAX_CONCURRENT_EVENTS)

        if limit:
            active = redis.get(Channel.redis_active_events_key(call.channel_id))

            # skip this call if are on the limit
            if active and int(active) >= limit:
                continue

            # we can start a new call event
            call.register_active_event()

        # enqueue the call
        queued_event = HttpEvent(method="INTERNAL", url=None)
        ChannelLog.log_ivr_interaction(call, "Call queued internally", queued_event)

        call.status = IVRCall.QUEUED
        call.save(update_fields=("status",))

        start_call_task.apply_async(kwargs={"call_pk": call.id}, queue=Queue.HANDLER)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:37,代码来源:tasks.py
示例19: send
def send(self, channel, msg, text):
    """
    Sends ``text`` for ``msg`` by POSTing a JSON payload to the send URL in
    the channel's config, and marks the message as wired on success.

    The payload carries the event callback URL, the content, an optional
    auth token, either ``reply_to`` (external id of a recent message being
    answered) or ``to`` (the message URN path), and a ``continue_session``
    flag derived from the message's USSD session.

    Raises SendException when the request fails, when a non 2xx status is
    returned, or when a KeyError is raised while reading ``result.message_id``
    from the response and reporting success.
    """
    # if the channel config has specified and override hostname use that, otherwise use settings
    callback_domain = channel.config.get(Channel.CONFIG_RP_HOSTNAME_OVERRIDE, None)
    if not callback_domain:
        callback_domain = channel.callback_domain

    # the event url Junebug will relay events to
    event_url = "http://%s%s" % (
        callback_domain,
        reverse("handlers.junebug_handler", args=["event", channel.uuid]),
    )

    is_ussd = Channel.get_type_from_code(channel.channel_type).category == ChannelType.Category.USSD

    # build our payload
    payload = {"event_url": event_url, "content": text}

    secret = channel.config.get(Channel.CONFIG_SECRET)
    if secret is not None:
        payload["event_auth_token"] = secret

    connection = USSDSession.objects.get_with_status_only(msg.connection_id)

    # make sure USSD responses are only valid for a short window
    response_expiration = timezone.now() - timedelta(seconds=180)
    external_id = None
    if msg.response_to_id and msg.created_on > response_expiration:
        external_id = Msg.objects.values_list("external_id", flat=True).filter(pk=msg.response_to_id).first()

    # NOTE: Only one of `to` or `reply_to` may be specified, use external_id if we have it.
    if external_id:
        payload["reply_to"] = external_id
    else:
        payload["to"] = msg.urn_path

    # the session continues only when we have a connection that should not end
    payload["channel_data"] = {"continue_session": bool(connection and not connection.should_end)}

    log_url = channel.config[Channel.CONFIG_SEND_URL]
    start = time.time()

    event = HttpEvent("POST", log_url, json.dumps(payload))
    headers = http_headers(extra={"Content-Type": "application/json"})

    try:
        response = requests.post(
            log_url,
            verify=True,
            json=payload,
            timeout=15,
            headers=headers,
            auth=(channel.config[Channel.CONFIG_USERNAME], channel.config[Channel.CONFIG_PASSWORD]),
        )

        event.status_code = response.status_code
        event.response_body = response.text

    except Exception as e:
        raise SendException(str(e), event=event, start=start) from e

    if not (200 <= response.status_code < 300):
        raise SendException(
            "Received a non 200 response %d from Junebug" % response.status_code, event=event, start=start
        )

    data = response.json()

    if is_ussd and connection and connection.should_end:
        connection.close()

    try:
        message_id = data["result"]["message_id"]
        Channel.success(channel, msg, WIRED, start, event=event, external_id=message_id)
    except KeyError as e:
        # log the request body exactly as it was sent (encoded once, like `event` above)
        raise SendException(
            "Unable to read external message_id: %r" % (e,),
            event=HttpEvent("POST", log_url, request_body=json.dumps(payload), response_body=json.dumps(data)),
            start=start,
        ) from e
开发者ID:teehamaral,项目名称:rapidpro,代码行数:80,代码来源:type.py
示例20: test_new_conversation_trigger
def test_new_conversation_trigger(self):
    """
    Covers the new conversation trigger end to end: it is only offered once a
    Facebook channel exists, can be created once per channel, starts the flow
    when a "get_started" postback arrives and can be archived.
    """
    self.login(self.admin)
    flow = self.create_flow()
    flow2 = self.create_flow()

    # see if we list new conversation triggers on the trigger page
    create_trigger_url = reverse('triggers.trigger_create', args=[])
    response = self.client.get(create_trigger_url)
    self.assertNotContains(response, "conversation is started")

    # create a facebook channel
    fb_channel = Channel.add_facebook_channel(self.org, self.user, 'Temba', 1001, 'fb_token')

    # should now be able to create one
    response = self.client.get(create_trigger_url)
    self.assertContains(response, "conversation is started")

    # go create it
    with patch('requests.post') as mock_post:
        mock_post.return_value = MockResponse(200, '{"message": "Success"}')

        response = self.client.post(reverse('triggers.trigger_new_conversation', args=[]),
                                    data=dict(channel=fb_channel.id, flow=flow.id))
        self.assertEqual(response.status_code, 200)
        self.assertEqual(mock_post.call_count, 1)

    # check that it is right
    trigger = Trigger.objects.get(trigger_type=Trigger.TYPE_NEW_CONVERSATION, is_active=True, is_archived=False)
    self.assertEqual(trigger.channel, fb_channel)
    self.assertEqual(trigger.flow, flow)

    # try to create another one, fails as we already have a trigger for that channel
    response = self.client.post(reverse('triggers.trigger_new_conversation', args=[]),
                                data=dict(channel=fb_channel.id, flow=flow2.id))
    self.assertEqual(response.status_code, 200)
    self.assertFormError(response, 'form', 'channel', 'Trigger with this Channel already exists.')

    # ok, trigger a facebook event
    data = json.loads("""{
"object": "page",
"entry": [
{
"id": "620308107999975",
"time": 1467841778778,
"messaging": [
{
"sender":{
"id":"1001"
},
"recipient":{
"id":"%s"
},
"timestamp":1458692752478,
"postback":{
"payload":"get_started"
}
}
]
}
]
}
""" % fb_channel.address)

    with patch('requests.get') as mock_get:
        mock_get.return_value = MockResponse(200, '{"first_name": "Ben","last_name": "Haggerty"}')

        callback_url = reverse('handlers.facebook_handler', args=[fb_channel.uuid])
        response = self.client.post(callback_url, json.dumps(data), content_type="application/json")
        self.assertEqual(response.status_code, 200)

        # should have a new flow run for Ben; compare the name itself, since
        # assertTrue would take the expected value as the failure message
        contact = Contact.from_urn(self.org, 'facebook:1001')
        self.assertEqual(contact.name, "Ben Haggerty")

        run = FlowRun.objects.get(contact=contact)
        self.assertEqual(run.flow, flow)

    # archive our trigger, should unregister our callback
    with patch('requests.post') as mock_post:
        mock_post.return_value = MockResponse(200, '{"message": "Success"}')

        Trigger.apply_action_archive(self.admin, Trigger.objects.filter(pk=trigger.pk))
        # NOTE(review): `response` is still the handler response from above,
        # archiving does not produce a new one - confirm this check is wanted
        self.assertEqual(response.status_code, 200)
        self.assertEqual(mock_post.call_count, 1)

        trigger.refresh_from_db()
        self.assertTrue(trigger.is_archived)
开发者ID:ewheeler,项目名称:rapidpro,代码行数:86,代码来源:tests.py
注:本文中的temba.channels.models.Channel类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论