• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python models.Broadcast类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中temba.msgs.models.Broadcast的典型用法代码示例。如果您正苦于以下问题:Python Broadcast类的具体用法?Python Broadcast怎么用?Python Broadcast使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Broadcast类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: send_spam

def send_spam(user_id, contact_id):  # pragma: no cover
    """
    Processses a single incoming message through our queue.
    """
    from django.contrib.auth.models import User
    from temba.contacts.models import Contact, TEL_SCHEME
    from temba.msgs.models import Broadcast

    contact = Contact.all().get(pk=contact_id)
    user = User.objects.get(pk=user_id)
    channel = contact.org.get_send_channel(TEL_SCHEME)

    if not channel:  # pragma: no cover
        print("Sorry, no channel to be all spammy with")
        return

    long_text = (
        "Test Message #%d. The path of the righteous man is beset on all sides by the iniquities of the "
        "selfish and the tyranny of evil men. Blessed is your face."
    )

    # only trigger sync on the last one
    for idx in range(10):
        broadcast = Broadcast.create(contact.org, user, long_text % (idx + 1), contacts=[contact])
        broadcast.send(trigger_send=(idx == 149))
开发者ID:teehamaral,项目名称:rapidpro,代码行数:25,代码来源:tasks.py


示例2: restore_object

    def restore_object(self, attrs, instance=None):
        """
        Create a new broadcast to send out
        """
        if instance: # pragma: no cover
            raise ValidationError("Invalid operation")

        user = self.user
        org = self.org

        if 'urn' in attrs and attrs['urn']:
            urns = attrs.get('urn', [])
        else:
            urns = attrs.get('phone', [])

        channel = attrs['channel']
        contacts = list()
        for urn in urns:
            # treat each urn as a separate contact
            contacts.append(Contact.get_or_create(user, channel.org, urns=[urn], channel=channel))

        # add any contacts specified by uuids
        uuid_contacts = attrs.get('contact', [])
        for contact in uuid_contacts:
            contacts.append(contact)

        # create the broadcast
        broadcast = Broadcast.create(org, user, attrs['text'], recipients=contacts)

        # send it
        broadcast.send()
        return broadcast
开发者ID:AxisOfEval,项目名称:rapidpro,代码行数:32,代码来源:serializers.py


示例3: save

    def save(self):
        """
        Create a new broadcast to send out
        """
        contact_urns = []
        for urn in self.validated_data.get("urns", []):
            # create contacts for URNs if necessary
            __, contact_urn = Contact.get_or_create(self.context["org"], urn, user=self.context["user"])
            contact_urns.append(contact_urn)

        text, base_language = self.validated_data["text"]

        # create the broadcast
        broadcast = Broadcast.create(
            self.context["org"],
            self.context["user"],
            text=text,
            base_language=base_language,
            groups=self.validated_data.get("groups", []),
            contacts=self.validated_data.get("contacts", []),
            urns=contact_urns,
            channel=self.validated_data.get("channel"),
        )

        # send in task
        on_transaction_commit(lambda: send_broadcast_task.delay(broadcast.id))

        return broadcast
开发者ID:teehamaral,项目名称:rapidpro,代码行数:28,代码来源:serializers.py


示例4: test_calculating_next_fire

    def test_calculating_next_fire(self):

        self.org.timezone = 'US/Eastern'
        self.org.save()

        tz = self.org.get_tzinfo()
        eleven_pm_est = datetime(2013, 1, 3, hour=23, minute=0, second=0, microsecond=0).replace(tzinfo=tz)

        # Test date is 10am on a Thursday, Jan 3rd
        schedule = self.create_schedule('D', start_date=eleven_pm_est)
        schedule.save()

        Broadcast.create(self.org, self.admin, 'Message', [], schedule=schedule)
        schedule = Schedule.objects.get(pk=schedule.pk)

        # when is the next fire once our first one passes
        sched_date = datetime(2013, 1, 3, hour=23, minute=30, second=0, microsecond=0).replace(tzinfo=tz)
        schedule.update_schedule(sched_date)
        self.assertEquals('2013-01-04 23:00:00-05:00', unicode(schedule.next_fire))
开发者ID:Maximus325,项目名称:rapidpro,代码行数:19,代码来源:tests.py


示例5: test_update_near_day_boundary

    def test_update_near_day_boundary(self):

        self.org.timezone = 'US/Eastern'
        self.org.save()
        tz = pytz.timezone(self.org.timezone)

        sched = self.create_schedule('D')
        Broadcast.create(self.org, self.admin, 'Message', [], schedule=sched)
        sched = Schedule.objects.get(pk=sched.pk)

        update_url = reverse('schedules.schedule_update', args=[sched.pk])

        self.login(self.admin)

        # way off into the future
        start_date = datetime(2050, 1, 3, 23, 0, 0, 0)
        start_date = tz.localize(start_date)
        start_date = pytz.utc.normalize(start_date.astimezone(pytz.utc))

        post_data = dict()
        post_data['repeat_period'] = 'D'
        post_data['start'] = 'later'
        post_data['start_datetime_value'] = "%d" % time.mktime(start_date.timetuple())
        self.client.post(update_url, post_data)
        sched = Schedule.objects.get(pk=sched.pk)

        # 11pm in NY should be 4am UTC the next day
        self.assertEquals('2050-01-04 04:00:00+00:00', unicode(sched.next_fire))

        # a time in the past
        start_date = datetime(2010, 1, 3, 23, 45, 0, 0)
        start_date = tz.localize(start_date)
        start_date = pytz.utc.normalize(start_date.astimezone(pytz.utc))

        post_data = dict()
        post_data['repeat_period'] = 'D'
        post_data['start'] = 'later'
        post_data['start_datetime_value'] = "%d" % time.mktime(start_date.timetuple())
        self.client.post(update_url, post_data)
        sched = Schedule.objects.get(pk=sched.pk)

        # next fire should fall at the right hour and minute
        self.assertIn('04:45:00+00:00', unicode(sched.next_fire))
开发者ID:CliffordOwino,项目名称:rapidpro,代码行数:43,代码来源:tests.py


示例6: test_update_near_day_boundary

    def test_update_near_day_boundary(self):

        self.org.timezone = pytz.timezone("US/Eastern")
        self.org.save()
        tz = self.org.timezone

        sched = self.create_schedule("D")
        Broadcast.create(self.org, self.admin, "Message", schedule=sched, contacts=[self.joe])
        sched = Schedule.objects.get(pk=sched.pk)

        update_url = reverse("schedules.schedule_update", args=[sched.pk])

        self.login(self.admin)

        # way off into the future
        start_date = datetime(2050, 1, 3, 23, 0, 0, 0)
        start_date = tz.localize(start_date)
        start_date = pytz.utc.normalize(start_date.astimezone(pytz.utc))

        post_data = dict()
        post_data["repeat_period"] = "D"
        post_data["start"] = "later"
        post_data["start_datetime_value"] = "%d" % time.mktime(start_date.timetuple())
        self.client.post(update_url, post_data)
        sched = Schedule.objects.get(pk=sched.pk)

        # 11pm in NY should be 4am UTC the next day
        self.assertEqual("2050-01-04 04:00:00+00:00", str(sched.next_fire))

        # a time in the past
        start_date = datetime(2010, 1, 3, 23, 45, 0, 0)
        start_date = tz.localize(start_date)
        start_date = pytz.utc.normalize(start_date.astimezone(pytz.utc))

        post_data = dict()
        post_data["repeat_period"] = "D"
        post_data["start"] = "later"
        post_data["start_datetime_value"] = "%d" % time.mktime(start_date.timetuple())
        self.client.post(update_url, post_data)
        sched = Schedule.objects.get(pk=sched.pk)

        # next fire should fall at the right hour and minute
        self.assertIn("04:45:00+00:00", str(sched.next_fire))
开发者ID:mxabierto,项目名称:rapidpro,代码行数:43,代码来源:tests.py


示例7: test_calculating_next_fire

    def test_calculating_next_fire(self):

        self.org.timezone = pytz.timezone("US/Eastern")
        self.org.save()

        tz = self.org.timezone
        eleven_fifteen_est = tz.localize(datetime(2013, 1, 3, hour=23, minute=15, second=0, microsecond=0))

        # Test date is 10:15am on a Thursday, Jan 3rd
        schedule = self.create_schedule("D", start_date=eleven_fifteen_est)
        schedule.save()

        Broadcast.create(self.org, self.admin, "Message", schedule=schedule, contacts=[self.joe])
        schedule = Schedule.objects.get(pk=schedule.pk)

        # when is the next fire once our first one passes
        sched_date = tz.localize(datetime(2013, 1, 3, hour=23, minute=30, second=0, microsecond=0))

        schedule.update_schedule(sched_date)
        self.assertEqual("2013-01-04 23:15:00-05:00", str(schedule.next_fire))
开发者ID:mxabierto,项目名称:rapidpro,代码行数:20,代码来源:tests.py


示例8: test_broadcasts

    def test_broadcasts(self):
        url = reverse('api.v2.broadcasts')

        self.assertEndpointAccess(url)

        reporters = self.create_group("Reporters", [self.joe, self.frank])

        bcast1 = Broadcast.create(self.org, self.admin, "Hello 1", [self.frank.get_urn('twitter')])
        bcast2 = Broadcast.create(self.org, self.admin, "Hello 2", [self.joe])
        bcast3 = Broadcast.create(self.org, self.admin, "Hello 3", [self.frank], status='S')
        bcast4 = Broadcast.create(self.org, self.admin, "Hello 4",
                                  [self.frank.get_urn('twitter'), self.joe, reporters], status='F')
        Broadcast.create(self.org2, self.admin2, "Different org...", [self.hans])

        # no filtering
        with self.assertNumQueries(NUM_BASE_REQUEST_QUERIES + 5):
            response = self.fetchJSON(url)

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json['next'], None)
        self.assertResultsById(response, [bcast4, bcast3, bcast2, bcast1])
        self.assertEqual(response.json['results'][0], {
            'id': bcast4.pk,
            'urns': ["twitter:franky"],
            'contacts': [{'uuid': self.joe.uuid, 'name': self.joe.name}],
            'groups': [{'uuid': reporters.uuid, 'name': reporters.name}],
            'text': "Hello 4",
            'created_on': format_datetime(bcast4.created_on)
        })

        # filter by id
        response = self.fetchJSON(url, 'id=%d' % bcast3.pk)
        self.assertResultsById(response, [bcast3])

        # filter by after
        response = self.fetchJSON(url, 'after=%s' % format_datetime(bcast3.created_on))
        self.assertResultsById(response, [bcast4, bcast3])

        # filter by before
        response = self.fetchJSON(url, 'before=%s' % format_datetime(bcast2.created_on))
        self.assertResultsById(response, [bcast2, bcast1])

        with AnonymousOrg(self.org):
            # URNs shouldn't be included
            response = self.fetchJSON(url, 'id=%d' % bcast1.pk)
            self.assertEqual(response.json['results'][0]['urns'], None)
开发者ID:JediKoder,项目名称:rapidpro,代码行数:46,代码来源:test_v2.py


示例9: restore_object

    def restore_object(self, attrs, instance=None):
        """
        Create a new broadcast to send out
        """
        from temba.msgs.tasks import send_broadcast_task

        if instance:  # pragma: no cover
            raise ValidationError("Invalid operation")

        recipients = attrs.get('contacts') + attrs.get('groups')

        for urn in attrs.get('urns'):
            # create contacts for URNs if necessary
            contact = Contact.get_or_create(self.org, self.user, urns=[urn])
            contact_urn = contact.urn_objects[urn]
            recipients.append(contact_urn)

        # create the broadcast
        broadcast = Broadcast.create(self.org, self.user, attrs['text'],
                                     recipients=recipients, channel=attrs['channel'])

        # send in task
        send_broadcast_task.delay(broadcast.id)
        return broadcast
开发者ID:mdheyab,项目名称:rapidpro,代码行数:24,代码来源:serializers.py


示例10: _create_broadcast

 def _create_broadcast(self, text, recipients):
     """
     Creates the a single broadcast to the given recipients (which can groups, contacts, URNs)
     """
     return Broadcast.create(self.org, self.user, text, recipients)
开发者ID:Ilhasoft,项目名称:rapidpro,代码行数:5,代码来源:perf_tests.py


示例11: test_messages


#.........这里部分代码省略.........

        # add message in other org
        self.create_msg(direction='I', msg_type='I', text="Guten tag!", contact=self.hans, org=self.org2)

        # label some of the messages, this will change our modified on as well for our `incoming` view
        label = Label.get_or_create(self.org, self.admin, "Spam")

        # we do this in two calls so that we can predict ordering later
        label.toggle_label([frank_msg3], add=True)
        label.toggle_label([frank_msg1], add=True)
        label.toggle_label([joe_msg3], add=True)

        frank_msg1.refresh_from_db(fields=('modified_on',))
        joe_msg3.refresh_from_db(fields=('modified_on',))

        # filter by inbox
        with self.assertNumQueries(NUM_BASE_REQUEST_QUERIES + 7):
            response = self.fetchJSON(url, 'folder=INBOX')

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json['next'], None)
        self.assertResultsById(response, [frank_msg1])
        self.assertMsgEqual(response.json['results'][0], frank_msg1, msg_type='inbox', msg_status='queued', msg_visibility='visible')

        # filter by incoming, should get deleted messages too
        with self.assertNumQueries(NUM_BASE_REQUEST_QUERIES + 7):
            response = self.fetchJSON(url, 'folder=incoming')

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json['next'], None)
        self.assertResultsById(response, [joe_msg3, frank_msg1, frank_msg3, deleted_msg, joe_msg1])
        self.assertMsgEqual(response.json['results'][0], joe_msg3, msg_type='flow', msg_status='queued', msg_visibility='visible')

        # filter by folder (flow)
        response = self.fetchJSON(url, 'folder=flows')
        self.assertResultsById(response, [joe_msg3, joe_msg1])

        # filter by folder (archived)
        response = self.fetchJSON(url, 'folder=archived')
        self.assertResultsById(response, [frank_msg3])

        # filter by folder (outbox)
        response = self.fetchJSON(url, 'folder=outbox')
        self.assertResultsById(response, [joe_msg2])

        # filter by folder (sent)
        response = self.fetchJSON(url, 'folder=sent')
        self.assertResultsById(response, [joe_msg4, frank_msg2])

        # filter by invalid view
        response = self.fetchJSON(url, 'folder=invalid')
        self.assertResultsById(response, [])

        # filter by id
        response = self.fetchJSON(url, 'id=%d' % joe_msg3.pk)
        self.assertResultsById(response, [joe_msg3])

        # filter by contact
        response = self.fetchJSON(url, 'contact=%s' % self.joe.uuid)
        self.assertResultsById(response, [joe_msg4, joe_msg3, joe_msg2, joe_msg1])

        # filter by invalid contact
        response = self.fetchJSON(url, 'contact=invalid')
        self.assertResultsById(response, [])

        # filter by label name
        response = self.fetchJSON(url, 'label=Spam')
        self.assertResultsById(response, [joe_msg3, frank_msg1])

        # filter by label UUID
        response = self.fetchJSON(url, 'label=%s' % label.uuid)
        self.assertResultsById(response, [joe_msg3, frank_msg1])

        # filter by invalid label
        response = self.fetchJSON(url, 'label=invalid')
        self.assertResultsById(response, [])

        # filter by before (inclusive)
        response = self.fetchJSON(url, 'folder=incoming&before=%s' % format_datetime(frank_msg1.modified_on))
        self.assertResultsById(response, [frank_msg1, frank_msg3, deleted_msg, joe_msg1])

        # filter by after (inclusive)
        response = self.fetchJSON(url, 'folder=incoming&after=%s' % format_datetime(frank_msg1.modified_on))
        self.assertResultsById(response, [joe_msg3, frank_msg1])

        # filter by broadcast
        broadcast = Broadcast.create(self.org, self.user, "A beautiful broadcast", [self.joe, self.frank])
        broadcast.send()
        response = self.fetchJSON(url, 'broadcast=%s' % broadcast.pk)

        expected = {m.pk for m in broadcast.msgs.all()}
        results = {m['id'] for m in response.json['results']}
        self.assertEqual(expected, results)

        # can't filter by more than one of contact, folder, label or broadcast together
        for query in ('contact=%s&label=Spam' % self.joe.uuid, 'label=Spam&folder=inbox',
                      'broadcast=12345&folder=inbox', 'broadcast=12345&label=Spam'):
            response = self.fetchJSON(url, query)
            self.assertResponseError(response, None,
                                     "You may only specify one of the contact, folder, label, broadcast parameters")
开发者ID:churcho,项目名称:rapidpro,代码行数:101,代码来源:test_v2.py



注:本文中的temba.msgs.models.Broadcast类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python models.Msg类代码示例发布时间:2022-05-27
下一篇:
Python models.Flow类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap