• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python models.Channel类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中temba.channels.models.Channel的典型用法代码示例。如果您正苦于以下问题:Python Channel类的具体用法?Python Channel怎么用?Python Channel使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Channel类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUp

    def setUp(self):
        self.clear_cache()

        self.user = self.create_user("tito")
        self.admin = self.create_user("Administrator")
        self.org = Org.objects.create(name="Nyaruka Ltd.", timezone="Africa/Kigali",
                                      created_by=self.user, modified_by=self.user)
        self.org.initialize()

        self.org.administrators.add(self.admin)
        self.admin.set_org(self.org)
        self.org.administrators.add(self.user)
        self.user.set_org(self.org)

        self.tel_mtn = Channel.create(self.org, self.user, 'RW', 'A', name="MTN", address="+250780000000",
                                      secret="12345", gcm_id="123")
        self.tel_tigo = Channel.create(self.org, self.user, 'RW', 'A', name="Tigo", address="+250720000000",
                                       secret="23456", gcm_id="234")
        self.tel_bulk = Channel.create(self.org, self.user, 'RW', 'NX', name="Nexmo", parent=self.tel_tigo)
        self.twitter = Channel.create(self.org, self.user, None, 'TT', name="Twitter", address="billy_bob")

        # for generating tuples of scheme, path and channel
        def generate_tel_mtn(num):
            return TEL_SCHEME, "+25078%07d" % (num + 1), self.tel_mtn

        def generate_tel_tigo(num):
            return TEL_SCHEME, "+25072%07d" % (num + 1), self.tel_tigo

        def generate_twitter(num):
            return TWITTER_SCHEME, "tweep_%d" % (num + 1), self.twitter

        self.urn_generators = (generate_tel_mtn, generate_tel_tigo, generate_twitter)

        self.field_nick = ContactField.get_or_create(self.org, self.admin, 'nick', 'Nickname', show_in_table=True, value_type=Value.TYPE_TEXT)
        self.field_age = ContactField.get_or_create(self.org, self.admin, 'age', 'Age', show_in_table=True, value_type=Value.TYPE_DECIMAL)
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:35,代码来源:perf_tests.py


示例2: __init__

    def __init__(self, user, *args, **kwargs):
        flows = Flow.objects.filter(
            org=user.get_org(), is_active=True, is_archived=False, flow_type__in=[Flow.TYPE_USSD]
        )
        super().__init__(user, flows, *args, **kwargs)

        self.fields["channel"].queryset = Channel.get_by_category(self.user.get_org(), ChannelType.Category.USSD)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:7,代码来源:views.py


示例3: setUp

    def setUp(self):
        super().setUp()

        flow = self.get_flow("ussd_example")
        self.starcode = "*113#"

        self.channel.delete()
        self.channel = Channel.create(
            self.org,
            self.user,
            "RW",
            "JNU",
            None,
            "1234",
            config=dict(username="junebug-user", password="junebug-pass", send_url="http://example.org/"),
            uuid="00000000-0000-0000-0000-000000001234",
            role=Channel.ROLE_USSD,
        )

        self.trigger, _ = Trigger.objects.get_or_create(
            channel=self.channel,
            keyword=self.starcode,
            flow=flow,
            created_by=self.user,
            modified_by=self.user,
            org=self.org,
            trigger_type=Trigger.TYPE_USSD_PULL,
        )
开发者ID:mxabierto,项目名称:rapidpro,代码行数:28,代码来源:tests.py


示例4: setUp

    def setUp(self):

        # if we are super verbose, turn on debug for sql queries
        if self.get_verbosity() > 2:
            settings.DEBUG = True

        self.clear_cache()

        self.superuser = User.objects.create_superuser(username="super", email="[email protected]", password="super")

        # create different user types
        self.non_org_user = self.create_user("NonOrg")
        self.user = self.create_user("User")
        self.editor = self.create_user("Editor")
        self.admin = self.create_user("Administrator")
        self.surveyor = self.create_user("Surveyor")

        # setup admin boundaries for Rwanda
        self.country = AdminBoundary.objects.create(osm_id='171496', name='Rwanda', level=0)
        self.state1 = AdminBoundary.objects.create(osm_id='1708283', name='Kigali City', level=1, parent=self.country)
        self.state2 = AdminBoundary.objects.create(osm_id='171591', name='Eastern Province', level=1, parent=self.country)
        self.district1 = AdminBoundary.objects.create(osm_id='1711131', name='Gatsibo', level=2, parent=self.state2)
        self.district2 = AdminBoundary.objects.create(osm_id='1711163', name='Kayônza', level=2, parent=self.state2)
        self.district3 = AdminBoundary.objects.create(osm_id='3963734', name='Nyarugenge', level=2, parent=self.state1)
        self.district4 = AdminBoundary.objects.create(osm_id='1711142', name='Rwamagana', level=2, parent=self.state2)
        self.ward1 = AdminBoundary.objects.create(osm_id='171113181', name='Kageyo', level=3, parent=self.district1)
        self.ward2 = AdminBoundary.objects.create(osm_id='171116381', name='Kabare', level=3, parent=self.district2)
        self.ward3 = AdminBoundary.objects.create(osm_id='171114281', name='Bukure', level=3, parent=self.district4)

        self.org = Org.objects.create(name="Temba", timezone="Africa/Kigali", country=self.country, brand=settings.DEFAULT_BRAND,
                                      created_by=self.user, modified_by=self.user)

        self.org.initialize(topup_size=1000)

        # add users to the org
        self.user.set_org(self.org)
        self.org.viewers.add(self.user)

        self.editor.set_org(self.org)
        self.org.editors.add(self.editor)

        self.admin.set_org(self.org)
        self.org.administrators.add(self.admin)

        self.surveyor.set_org(self.org)
        self.org.surveyors.add(self.surveyor)

        self.superuser.set_org(self.org)

        # welcome topup with 1000 credits
        self.welcome_topup = self.org.topups.all()[0]

        # a single Android channel
        self.channel = Channel.create(self.org, self.user, 'RW', 'A', name="Test Channel", address="+250785551212",
                                      device="Nexus 5X", secret="12345", gcm_id="123")

        # reset our simulation to False
        Contact.set_simulation(False)
开发者ID:eHealthAfrica,项目名称:rapidpro,代码行数:58,代码来源:tests.py


示例5: register_active_event

    def register_active_event(self):
        """
        Helper function for registering active events on a throttled channel
        """
        r = get_redis_connection()

        channel_key = Channel.redis_active_events_key(self.channel_id)

        r.incr(channel_key)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:9,代码来源:models.py


示例6: browser

    def browser(self):

        self.driver.set_window_size(1024, 2000)

        # view the homepage
        self.fetch_page()

        # go directly to our signup
        self.fetch_page(reverse("orgs.org_signup"))

        # create account
        self.keys("email", "[email protected]")
        self.keys("password", "SuperSafe1")
        self.keys("first_name", "Joe")
        self.keys("last_name", "Blow")
        self.click("#form-one-submit")
        self.keys("name", "Temba")
        self.click("#form-two-submit")

        # set up our channel for claiming
        channel = Channel.create(
            None,
            get_anonymous_user(),
            "RW",
            "A",
            name="Test Channel",
            address="0785551212",
            claim_code="AAABBBCCC",
            secret="12345",
            gcm_id="123",
        )

        # and claim it
        self.fetch_page(reverse("channels.channel_claim_android"))
        self.keys("#id_claim_code", "AAABBBCCC")
        self.keys("#id_phone_number", "0785551212")
        self.submit(".claim-form")

        # get our freshly claimed channel
        channel = Channel.objects.get(pk=channel.pk)

        # now go to the contacts page
        self.click("#menu-right .icon-contact")
        self.click("#id_import_contacts")

        # upload some contacts
        directory = os.path.dirname(os.path.realpath(__file__))
        self.keys("#csv_file", "%s/../media/test_imports/sample_contacts.xls" % directory)
        self.submit(".smartmin-form")

        # make sure they are there
        self.click("#menu-right .icon-contact")
        self.assertInElements(".value-phone", "+250788382382")
        self.assertInElements(".value-text", "Eric Newcomer")
        self.assertInElements(".value-text", "Sample Contacts")
开发者ID:mxabierto,项目名称:rapidpro,代码行数:55,代码来源:base.py


示例7: __init__

    def __init__(self, user, *args, **kwargs):
        flows = Flow.objects.filter(is_archived=False, org=user.get_org(), flow_type__in=[Flow.FLOW, Flow.VOICE])
        super(FollowTriggerForm, self).__init__(user, flows, *args, **kwargs)

        # all channel types that support follow triggers
        types_for_follow = set()
        for scheme in URN_SCHEMES_SUPPORTING_FOLLOW:
            types_for_follow.update(Channel.types_for_scheme(scheme))

        self.fields['channel'].queryset = Channel.objects.filter(is_active=True, org=self.user.get_org(),
                                                                 channel_type__in=types_for_follow)
开发者ID:TextoCMR,项目名称:TexTo.cm,代码行数:11,代码来源:views.py


示例8: form_valid

    def form_valid(self, form):
        org = self.request.user.get_org()

        if not org:  # pragma: no cover
            raise Exception(_("No org for this user, cannot claim"))

        data = form.cleaned_data
        self.object = Channel.add_config_external_channel(
            org, self.request.user, data["country"], data["number"], "CT", {Channel.CONFIG_API_KEY: data["api_key"]}
        )

        return super(AuthenticatedExternalClaimView, self).form_valid(form)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:12,代码来源:views.py


示例9: test_channels

    def test_channels(self):
        url = reverse('api.v2.channels')

        self.assertEndpointAccess(url)

        # create channel for other org
        Channel.create(self.org2, self.admin2, None, 'TT', name="Twitter Channel",
                       address="nyaruka", role="SR", scheme='twitter')

        # no filtering
        with self.assertNumQueries(NUM_BASE_REQUEST_QUERIES + 2):
            response = self.fetchJSON(url)

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json['next'], None)
        self.assertResultsByUUID(response, [self.twitter, self.channel])
        self.assertEqual(response.json['results'][1], {
            'uuid': self.channel.uuid,
            'name': "Test Channel",
            'address': "+250785551212",
            'country': "RW",
            'device': {
                'name': "Nexus 5X",
                'network_type': None,
                'power_level': -1,
                'power_source': None,
                'power_status': None
            },
            'last_seen': format_datetime(self.channel.last_seen),
            'created_on': format_datetime(self.channel.created_on)
        })

        # filter by UUID
        response = self.fetchJSON(url, 'uuid=%s' % self.twitter.uuid)
        self.assertResultsByUUID(response, [self.twitter])

        # filter by address
        response = self.fetchJSON(url, 'address=billy_bob')
        self.assertResultsByUUID(response, [self.twitter])
开发者ID:JediKoder,项目名称:rapidpro,代码行数:39,代码来源:test_v2.py


示例10: update_status

    def update_status(self, status: str, duration: float, channel_type: str):
        """
        Updates our status from a provide call status string

        """
        if not status:
            raise ValueError(f"IVR Call status must be defined, got: '{status}'")

        previous_status = self.status

        from temba.flows.models import FlowRun, ActionLog

        ivr_protocol = Channel.get_type_from_code(channel_type).ivr_protocol
        if ivr_protocol == ChannelType.IVRProtocol.IVR_PROTOCOL_TWIML:
            self.status = self.derive_ivr_status_twiml(status, previous_status)
        elif ivr_protocol == ChannelType.IVRProtocol.IVR_PROTOCOL_NCCO:
            self.status = self.derive_ivr_status_nexmo(status, previous_status)
        else:  # pragma: no cover
            raise ValueError(f"Unhandled IVR protocol: {ivr_protocol}")

        # if we are in progress, mark our start time
        if self.status == self.IN_PROGRESS and previous_status != self.IN_PROGRESS:
            self.started_on = timezone.now()

        # if we are done, mark our ended time
        if self.status in ChannelSession.DONE:
            self.ended_on = timezone.now()

            if self.contact.is_test:
                run = FlowRun.objects.filter(connection=self)
                if run:
                    ActionLog.create(run[0], _("Call ended."))

        if self.status in ChannelSession.RETRY_CALL and previous_status not in ChannelSession.RETRY_CALL:
            flow = self.get_flow()
            backoff_minutes = flow.metadata.get("ivr_retry", IVRCall.RETRY_BACKOFF_MINUTES)

            self.schedule_call_retry(backoff_minutes)

        if duration is not None:
            self.duration = duration

        # if we are moving into IN_PROGRESS, make sure our runs have proper expirations
        if previous_status in (self.PENDING, self.QUEUED, self.WIRED) and self.status in (
            self.IN_PROGRESS,
            self.RINGING,
        ):
            runs = FlowRun.objects.filter(connection=self, is_active=True, expires_on=None)
            for run in runs:
                run.update_expiration()
开发者ID:mxabierto,项目名称:rapidpro,代码行数:50,代码来源:models.py


示例11: setUp

    def setUp(self):
        self.clear_cache()

        self.superuser = User.objects.create_superuser(username="super", email="[email protected]", password="super")

        # create different user types
        self.non_org_user = self.create_user("NonOrg")
        self.user = self.create_user("User")
        self.editor = self.create_user("Editor")
        self.admin = self.create_user("Administrator")
        self.surveyor = self.create_user("Surveyor")

        # setup admin boundaries for Rwanda
        self.country = AdminBoundary.objects.create(osm_id='171496', name='Rwanda', level=0)
        self.state1 = AdminBoundary.objects.create(osm_id='1708283', name='Kigali City', level=1, parent=self.country)
        self.state2 = AdminBoundary.objects.create(osm_id='171591', name='Eastern Province', level=1, parent=self.country)
        self.district1 = AdminBoundary.objects.create(osm_id='1711131', name='Gatsibo', level=2, parent=self.state2)
        self.district2 = AdminBoundary.objects.create(osm_id='1711163', name='Kayonza', level=2, parent=self.state2)
        self.district3 = AdminBoundary.objects.create(osm_id='60485579', name='Kigali', level=2, parent=self.state1)
        self.district4 = AdminBoundary.objects.create(osm_id='1711142', name='Rwamagana', level=2, parent=self.state2)

        self.org = Org.objects.create(name="Temba", timezone="Africa/Kigali", country=self.country,
                                      created_by=self.user, modified_by=self.user)
        self.org.initialize()

        # add users to the org
        self.user.set_org(self.org)
        self.org.viewers.add(self.user)

        self.editor.set_org(self.org)
        self.org.editors.add(self.editor)

        self.admin.set_org(self.org)
        self.org.administrators.add(self.admin)

        self.surveyor.set_org(self.org)
        self.org.surveyors.add(self.surveyor)

        self.superuser.set_org(self.org)

        # welcome topup with 1000 credits
        self.welcome_topup = self.org.topups.all()[0]

        # a single Android channel
        self.channel = Channel.create(self.org, self.user, 'RW', 'A', name="Test Channel", address="+250785551212",
                                      secret="12345", gcm_id="123")

        # reset our simulation to False
        Contact.set_simulation(False)
开发者ID:thierhost,项目名称:rapidpro,代码行数:49,代码来源:tests.py


示例12: setUp

    def setUp(self):
        self.clear_cache()

        self.user = self.create_user("tito")
        self.admin = self.create_user("Administrator")
        self.org = Org.objects.create(
            name="Nyaruka Ltd.", timezone="Africa/Kigali", created_by=self.user, modified_by=self.user
        )
        self.org.initialize()

        self.org.administrators.add(self.admin)
        self.admin.set_org(self.org)
        self.org.administrators.add(self.user)
        self.user.set_org(self.org)

        self.tel_mtn = Channel.create(
            self.org, self.user, "RW", "A", name="MTN", address="+250780000000", secret="12345", gcm_id="123"
        )
        self.tel_tigo = Channel.create(
            self.org, self.user, "RW", "A", name="Tigo", address="+250720000000", secret="23456", gcm_id="234"
        )
        self.tel_bulk = Channel.create(self.org, self.user, "RW", "NX", name="Nexmo", parent=self.tel_tigo)
        self.twitter = Channel.create(self.org, self.user, None, "TT", name="Twitter", address="billy_bob")

        # for generating tuples of scheme, path and channel
        generate_tel_mtn = lambda num: (TEL_SCHEME, "+25078%07d" % (num + 1), self.tel_mtn)
        generate_tel_tigo = lambda num: (TEL_SCHEME, "+25072%07d" % (num + 1), self.tel_tigo)
        generate_twitter = lambda num: (TWITTER_SCHEME, "tweep_%d" % (num + 1), self.twitter)
        self.urn_generators = (generate_tel_mtn, generate_tel_tigo, generate_twitter)

        self.field_nick = ContactField.get_or_create(
            self.org, self.admin, "nick", "Nickname", show_in_table=True, value_type=TEXT
        )
        self.field_age = ContactField.get_or_create(
            self.org, self.admin, "age", "Age", show_in_table=True, value_type=DECIMAL
        )
开发者ID:reyrodrigues,项目名称:EU-SMS,代码行数:36,代码来源:perf_tests.py


示例13: browser

    def browser(self):

        self.driver.set_window_size(1024, 2000)

        # view the homepage
        self.fetch_page()

        # go directly to our signup
        self.fetch_page(reverse('orgs.org_signup'))

        # create account
        self.keys('email', '[email protected]')
        self.keys('password', 'SuperSafe1')
        self.keys('first_name', 'Joe')
        self.keys('last_name', 'Blow')
        self.click('#form-one-submit')
        self.keys('name', 'Temba')
        self.click('#form-two-submit')

        # set up our channel for claiming
        anon = User.objects.get(pk=settings.ANONYMOUS_USER_ID)
        channel = Channel.create(None, anon, 'RW', 'A', name="Test Channel", address="0785551212",
                                 claim_code='AAABBBCCC', secret="12345", gcm_id="123")

        # and claim it
        self.fetch_page(reverse('channels.channel_claim_android'))
        self.keys('#id_claim_code', 'AAABBBCCC')
        self.keys('#id_phone_number', '0785551212')
        self.submit('.claim-form')

        # get our freshly claimed channel
        channel = Channel.objects.get(pk=channel.pk)

        # now go to the contacts page
        self.click('#menu-right .icon-contact')
        self.click('#id_import_contacts')

        # upload some contacts
        directory = os.path.dirname(os.path.realpath(__file__))
        self.keys('#csv_file', '%s/../media/test_imports/sample_contacts.xls' % directory)
        self.submit('.smartmin-form')

        # make sure they are there
        self.click('#menu-right .icon-contact')
        self.assertInElements('.value-phone', '+250788382382')
        self.assertInElements('.value-text', 'Eric Newcomer')
        self.assertInElements('.value-text', 'Sample Contacts')
开发者ID:MOconcepts,项目名称:rapidpro,代码行数:47,代码来源:tests.py


示例14: unregister_active_event

    def unregister_active_event(self):
        """
        Helper function for unregistering active events on a throttled channel
        """
        r = get_redis_connection()

        channel_key = Channel.redis_active_events_key(self.channel_id)
        # are we on a throttled channel?
        current_tracked_events = r.get(channel_key)

        if current_tracked_events:

            value = int(current_tracked_events)
            if value <= 0:  # pragma: no cover
                raise ValueError("When this happens I'll quit my job and start producing moonshine/poitin/brlja !")

            r.decr(channel_key)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:17,代码来源:models.py


示例15: form_valid

    def form_valid(self, form):
        org = self.request.user.get_org()
        data = form.cleaned_data
        config = {Channel.CONFIG_USERNAME: data["username"], Channel.CONFIG_PASSWORD: data["password"]}

        self.object = Channel.create(
            org=org,
            user=self.request.user,
            country=data["country"],
            channel_type="MT",
            name=data["service_id"],
            address=data["service_id"],
            config=config,
            schemes=[TEL_SCHEME],
        )

        return super().form_valid(form)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:17,代码来源:views.py


示例16: setUp

    def setUp(self):
        super(APITest, self).setUp()

        self.joe = self.create_contact("Joe Blow", "0788123123")
        self.frank = self.create_contact("Frank", twitter="franky")
        self.test_contact = Contact.get_test_contact(self.user)

        self.twitter = Channel.create(self.org, self.user, None, 'TT', name="Twitter Channel",
                                      address="billy_bob", role="SR", scheme='twitter')

        self.create_secondary_org()
        self.hans = self.create_contact("Hans Gruber", "+4921551511", org=self.org2)

        self.maxDiff = None

        # this is needed to prevent REST framework from rolling back transaction created around each unit test
        connection.settings_dict['ATOMIC_REQUESTS'] = False
开发者ID:churcho,项目名称:rapidpro,代码行数:17,代码来源:test_v2.py


示例17: handle_simulate

    def handle_simulate(self, num_runs, org_id, flow_name, seed):
        """
        Prepares to resume simulating flow activity on an existing database
        """
        self._log("Resuming flow activity simulation on existing database...\n")

        orgs = Org.objects.order_by("id")
        if org_id:
            orgs = orgs.filter(id=org_id)

        if not orgs:
            raise CommandError("Can't simulate activity on an empty database")

        self.configure_random(len(orgs), seed)

        # in real life Nexmo messages are throttled, but that's not necessary for this simulation
        Channel.get_type_from_code("NX").max_tps = None

        inputs_by_flow_name = {f["name"]: f["templates"] for f in FLOWS}

        self._log("Preparing existing orgs... ")

        for org in orgs:
            flows = org.flows.order_by("id").exclude(is_system=True)

            if flow_name:
                flows = flows.filter(name=flow_name)
            flows = list(flows)

            for flow in flows:
                flow.input_templates = inputs_by_flow_name[flow.name]

            org.cache = {
                "users": list(org.get_org_users().order_by("id")),
                "channels": list(org.channels.order_by("id")),
                "groups": list(ContactGroup.user_groups.filter(org=org).order_by("id")),
                "flows": flows,
                "contacts": list(org.org_contacts.values_list("id", flat=True)),  # only ids to save memory
                "activity": None,
            }

        self._log(self.style.SUCCESS("OK") + "\n")

        self.simulate_activity(orgs, num_runs)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:44,代码来源:test_db.py


示例18: task_enqueue_call_events

def task_enqueue_call_events():
    from .models import IVRCall

    r = get_redis_connection()

    pending_call_events = (
        IVRCall.objects.filter(status=IVRCall.PENDING)
        .filter(direction=IVRCall.OUTGOING, is_active=True)
        .filter(channel__is_active=True)
        .filter(modified_on__gt=timezone.now() - timedelta(days=IVRCall.IGNORE_PENDING_CALLS_OLDER_THAN_DAYS))
        .select_related("channel")
        .order_by("modified_on")[:1000]
    )

    for call in pending_call_events:

        # are we handling a call on a throttled channel ?
        max_concurrent_events = call.channel.config.get(Channel.CONFIG_MAX_CONCURRENT_EVENTS)

        if max_concurrent_events:
            channel_key = Channel.redis_active_events_key(call.channel_id)
            current_active_events = r.get(channel_key)

            # skip this call if are on the limit
            if current_active_events and int(current_active_events) >= max_concurrent_events:
                continue
            else:
                # we can start a new call event
                call.register_active_event()

        # enqueue the call
        ChannelLog.log_ivr_interaction(call, "Call queued internally", HttpEvent(method="INTERNAL", url=None))

        call.status = IVRCall.QUEUED
        call.save(update_fields=("status",))

        start_call_task.apply_async(kwargs={"call_pk": call.id}, queue=Queue.HANDLER)
开发者ID:teehamaral,项目名称:rapidpro,代码行数:37,代码来源:tasks.py


示例19: send

    def send(self, channel, msg, text):
        connection = None

        # if the channel config has specified and override hostname use that, otherwise use settings
        callback_domain = channel.config.get(Channel.CONFIG_RP_HOSTNAME_OVERRIDE, None)
        if not callback_domain:
            callback_domain = channel.callback_domain

        # the event url Junebug will relay events to
        event_url = "http://%s%s" % (
            callback_domain,
            reverse("handlers.junebug_handler", args=["event", channel.uuid]),
        )

        is_ussd = Channel.get_type_from_code(channel.channel_type).category == ChannelType.Category.USSD

        # build our payload
        payload = {"event_url": event_url, "content": text}

        secret = channel.config.get(Channel.CONFIG_SECRET)
        if secret is not None:
            payload["event_auth_token"] = secret

        connection = USSDSession.objects.get_with_status_only(msg.connection_id)

        # make sure USSD responses are only valid for a short window
        response_expiration = timezone.now() - timedelta(seconds=180)
        external_id = None
        if msg.response_to_id and msg.created_on > response_expiration:
            external_id = Msg.objects.values_list("external_id", flat=True).filter(pk=msg.response_to_id).first()
        # NOTE: Only one of `to` or `reply_to` may be specified, use external_id if we have it.
        if external_id:
            payload["reply_to"] = external_id
        else:
            payload["to"] = msg.urn_path
        payload["channel_data"] = {"continue_session": connection and not connection.should_end or False}

        log_url = channel.config[Channel.CONFIG_SEND_URL]
        start = time.time()

        event = HttpEvent("POST", log_url, json.dumps(payload))
        headers = http_headers(extra={"Content-Type": "application/json"})

        try:
            response = requests.post(
                channel.config[Channel.CONFIG_SEND_URL],
                verify=True,
                json=payload,
                timeout=15,
                headers=headers,
                auth=(channel.config[Channel.CONFIG_USERNAME], channel.config[Channel.CONFIG_PASSWORD]),
            )

            event.status_code = response.status_code
            event.response_body = response.text

        except Exception as e:
            raise SendException(str(e), event=event, start=start)

        if not (200 <= response.status_code < 300):
            raise SendException(
                "Received a non 200 response %d from Junebug" % response.status_code, event=event, start=start
            )

        data = response.json()

        if is_ussd and connection and connection.should_end:
            connection.close()

        try:
            message_id = data["result"]["message_id"]
            Channel.success(channel, msg, WIRED, start, event=event, external_id=message_id)
        except KeyError as e:
            raise SendException(
                "Unable to read external message_id: %r" % (e,),
                event=HttpEvent(
                    "POST", log_url, request_body=json.dumps(json.dumps(payload)), response_body=json.dumps(data)
                ),
                start=start,
            )
开发者ID:teehamaral,项目名称:rapidpro,代码行数:80,代码来源:type.py


示例20: test_new_conversation_trigger

    def test_new_conversation_trigger(self):
        self.login(self.admin)
        flow = self.create_flow()
        flow2 = self.create_flow()

        # see if we list new conversation triggers on the trigger page
        create_trigger_url = reverse('triggers.trigger_create', args=[])
        response = self.client.get(create_trigger_url)
        self.assertNotContains(response, "conversation is started")

        # create a facebook channel
        fb_channel = Channel.add_facebook_channel(self.org, self.user, 'Temba', 1001, 'fb_token')

        # should now be able to create one
        response = self.client.get(create_trigger_url)
        self.assertContains(response, "conversation is started")

        # go create it
        with patch('requests.post') as mock_post:
            mock_post.return_value = MockResponse(200, '{"message": "Success"}')

            response = self.client.post(reverse('triggers.trigger_new_conversation', args=[]),
                                        data=dict(channel=fb_channel.id, flow=flow.id))
            self.assertEqual(response.status_code, 200)
            self.assertEqual(mock_post.call_count, 1)

        # check that it is right
        trigger = Trigger.objects.get(trigger_type=Trigger.TYPE_NEW_CONVERSATION, is_active=True, is_archived=False)
        self.assertEqual(trigger.channel, fb_channel)
        self.assertEqual(trigger.flow, flow)

        # try to create another one, fails as we already have a trigger for that channel
        response = self.client.post(reverse('triggers.trigger_new_conversation', args=[]), data=dict(channel=fb_channel.id, flow=flow2.id))
        self.assertEqual(response.status_code, 200)
        self.assertFormError(response, 'form', 'channel', 'Trigger with this Channel already exists.')

        # ok, trigger a facebook event
        data = json.loads("""{
        "object": "page",
          "entry": [
            {
              "id": "620308107999975",
              "time": 1467841778778,
              "messaging": [
                {
                  "sender":{
                    "id":"1001"
                  },
                  "recipient":{
                    "id":"%s"
                  },
                  "timestamp":1458692752478,
                  "postback":{
                    "payload":"get_started"
                  }
                }
              ]
            }
          ]
        }
        """ % fb_channel.address)

        with patch('requests.get') as mock_get:
            mock_get.return_value = MockResponse(200, '{"first_name": "Ben","last_name": "Haggerty"}')

            callback_url = reverse('handlers.facebook_handler', args=[fb_channel.uuid])
            response = self.client.post(callback_url, json.dumps(data), content_type="application/json")
            self.assertEqual(response.status_code, 200)

            # should have a new flow run for Ben
            contact = Contact.from_urn(self.org, 'facebook:1001')
            self.assertTrue(contact.name, "Ben Haggerty")

            run = FlowRun.objects.get(contact=contact)
            self.assertEqual(run.flow, flow)

        # archive our trigger, should unregister our callback
        with patch('requests.post') as mock_post:
            mock_post.return_value = MockResponse(200, '{"message": "Success"}')

            Trigger.apply_action_archive(self.admin, Trigger.objects.filter(pk=trigger.pk))
            self.assertEqual(response.status_code, 200)
            self.assertEqual(mock_post.call_count, 1)

            trigger.refresh_from_db()
            self.assertTrue(trigger.is_archived)
开发者ID:ewheeler,项目名称:rapidpro,代码行数:86,代码来源:tests.py



注:本文中的temba.channels.models.Channel类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python models.Contact类代码示例发布时间:2022-05-27
下一篇:
Python telnetlib.Telnet类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap