• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python system.unique_resource_id函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test_utils.system.unique_resource_id函数的典型用法代码示例。如果您正苦于以下问题：Python unique_resource_id函数的具体用法？Python unique_resource_id怎么用？Python unique_resource_id使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了unique_resource_id函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_bigtable_delete_instance

def test_bigtable_delete_instance():
    """Create a scratch Bigtable instance, delete it, and check it is gone.

    The ``[START ...]`` / ``[END ...]`` comment pairs look like documentation
    region tags -- presumably the lines between them are published verbatim
    as a sample, so they should stay self-contained (TODO confirm).
    """
    # [START bigtable_delete_instance]
    from google.cloud.bigtable import Client

    client = Client(admin=True)
    instance_id_to_delete = "inst-my-" + unique_resource_id("-")
    # [END bigtable_delete_instance]

    # Set-up outside the tagged regions: the instance has to be created
    # before the sample can delete it.
    cluster_id = "clus-my-" + unique_resource_id("-")

    instance = client.instance(
        instance_id_to_delete, instance_type=PRODUCTION, labels=LABELS
    )
    cluster = instance.cluster(
        cluster_id,
        location_id=ALT_LOCATION_ID,
        serve_nodes=SERVER_NODES,
        default_storage_type=STORAGE_TYPE,
    )
    operation = instance.create(clusters=[cluster])
    # We want to make sure the operation completes.
    operation.result(timeout=100)

    # [START bigtable_delete_instance]
    instance_to_delete = client.instance(instance_id_to_delete)
    instance_to_delete.delete()
    # [END bigtable_delete_instance]

    # The delete is verified through a fresh handle built from the same ID.
    assert not instance_to_delete.exists()
开发者ID:dhermes,项目名称:gcloud-python,代码行数:29,代码来源:snippets.py


示例2: test_bigtable_create_instance

def test_bigtable_create_instance():
    """Create a production Bigtable instance with one cluster, then remove it.

    Everything between the ``[START ...]`` / ``[END ...]`` markers looks like
    a documentation region -- presumably published verbatim as a sample
    (TODO confirm), so it defines all of its own settings locally.
    """
    # [START bigtable_create_prod_instance]
    from google.cloud.bigtable import Client
    from google.cloud.bigtable import enums

    my_instance_id = "inst-my-" + unique_resource_id("-")
    my_cluster_id = "clus-my-" + unique_resource_id("-")
    location_id = "us-central1-f"
    serve_nodes = 3
    storage_type = enums.StorageType.SSD
    production = enums.Instance.Type.PRODUCTION
    labels = {"prod-label": "prod-label"}

    client = Client(admin=True)
    instance = client.instance(my_instance_id, instance_type=production, labels=labels)
    cluster = instance.cluster(
        my_cluster_id,
        location_id=location_id,
        serve_nodes=serve_nodes,
        default_storage_type=storage_type,
    )
    operation = instance.create(clusters=[cluster])
    # We want to make sure the operation completes.
    operation.result(timeout=100)
    # [END bigtable_create_prod_instance]
    # Verify the instance exists, then delete it so it does not outlive the
    # test.  NOTE(review): the delete is skipped if the assertion fails.
    assert instance.exists()
    instance.delete()
开发者ID:dhermes,项目名称:gcloud-python,代码行数:27,代码来源:snippets.py


示例3: test_create_bucket

 def test_create_bucket(self):
     """Create a bucket under a fresh name and check the returned name."""
     bucket_name = 'a-new-bucket' + unique_resource_id('-')

     # Looking the name up before creation must raise NotFound.
     with self.assertRaises(exceptions.NotFound):
         Config.CLIENT.get_bucket(bucket_name)

     create_bucket = retry_429(Config.CLIENT.create_bucket)
     bucket = create_bucket(bucket_name)
     # Record the name so the test case can delete the bucket afterwards.
     self.case_buckets_to_delete.append(bucket_name)
     self.assertEqual(bucket.name, bucket_name)
开发者ID:longfengpili,项目名称:google-cloud-python,代码行数:7,代码来源:system.py


示例4: test_document_set_merge

def test_document_set_merge(client, cleanup):
    """Check that ``set(..., merge=True)`` merges into an existing document.

    Args:
        client: client object used to build the document reference.
        cleanup: callable that registers a document for later clean-up.
    """
    doc_ref = client.document("i-did-it", "for-set" + unique_resource_id("-"))
    # Register for clean-up before any API request, in case one of them fails.
    cleanup(doc_ref)

    # 0. The document must not exist when the test starts.
    assert not doc_ref.get().exists

    # 1. ``create()`` the document and check the stored data and timestamps.
    initial = {"name": "Sam", "address": {"city": "SF", "state": "CA"}}
    created = doc_ref.create(initial)
    after_create = doc_ref.get()
    assert after_create.to_dict() == initial
    # Creation is the only write so far, so create and update times agree.
    assert after_create.create_time == after_create.update_time
    assert after_create.update_time == created.update_time

    # 2. ``set()`` with ``merge=True`` changes only the nested city.
    merged = doc_ref.set({"address": {"city": "LA"}}, merge=True)
    after_merge = doc_ref.get()
    expected = {"name": "Sam", "address": {"city": "LA", "state": "CA"}}
    assert after_merge.to_dict() == expected
    # The merge must leave the create time alone.
    assert after_merge.create_time == after_create.create_time
    assert after_merge.update_time == merged.update_time
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:30,代码来源:system.py


示例5: test_document_set

def test_document_set(client, cleanup):
    """Check that a plain ``set()`` overwrites an existing document.

    Args:
        client: client object used to build the document reference.
        cleanup: callable that registers a document for later clean-up.
    """
    doc_ref = client.document("i-did-it", "for-set" + unique_resource_id("-"))
    # Register for clean-up before any API request, in case one of them fails.
    cleanup(doc_ref)

    # 0. A snapshot of the not-yet-created document carries no data.
    assert doc_ref.get().to_dict() is None

    # 1. ``create()`` the document and check the stored data and timestamps.
    initial = {"foo": 88}
    created = doc_ref.create(initial)
    after_create = doc_ref.get()
    assert after_create.to_dict() == initial
    # Creation is the only write so far, so create and update times agree.
    assert after_create.create_time == after_create.update_time
    assert after_create.update_time == created.update_time

    # 2. ``set()`` without ``merge`` replaces the whole document.
    replacement = {"bar": None}
    overwritten = doc_ref.set(replacement)
    after_set = doc_ref.get()
    assert after_set.to_dict() == replacement
    # Overwriting must leave the create time alone.
    assert after_set.create_time == after_create.create_time
    assert after_set.update_time == overwritten.update_time
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:27,代码来源:system.py


示例6: test_update_type

    def test_update_type(self):
        """Switch a DEVELOPMENT instance to PRODUCTION and confirm via reload."""
        from google.cloud.bigtable.enums import Instance

        dev_type = Instance.Type.DEVELOPMENT
        prod_type = Instance.Type.PRODUCTION
        instance_id = "ndif" + unique_resource_id("-")
        instance = Config.CLIENT.instance(
            instance_id, instance_type=dev_type, labels=LABELS
        )
        create_op = instance.create(location_id=LOCATION_ID, serve_nodes=None)
        # Register for deletion before waiting, so a timeout cannot leak it.
        self.instances_to_delete.append(instance)
        # Block until the create operation has finished.
        create_op.result(timeout=10)

        # Clear the display name and change the type, then push the update.
        instance.display_name = None
        instance.type_ = prod_type
        update_op = instance.update()
        # Block until the update operation has finished.
        update_op.result(timeout=10)

        # A fresh handle has no type until it is reloaded from the service.
        reloaded = Config.CLIENT.instance(instance_id)
        self.assertIsNone(reloaded.type_)
        reloaded.reload()
        self.assertEqual(reloaded.type_, prod_type)
开发者ID:dhermes,项目名称:gcloud-python,代码行数:30,代码来源:system.py


示例7: test_create_instance_defaults

    def test_create_instance_defaults(self):
        """Create an instance without an explicit type and check the default.

        The local handle keeps ``type_`` as ``None``, while a reloaded handle
        is expected to report ``PRODUCTION``.
        """
        from google.cloud.bigtable import enums

        ALT_INSTANCE_ID = "ndef" + unique_resource_id("-")
        instance = Config.CLIENT.instance(ALT_INSTANCE_ID, labels=LABELS)
        ALT_CLUSTER_ID = ALT_INSTANCE_ID + "-cluster"
        cluster = instance.cluster(
            ALT_CLUSTER_ID, location_id=LOCATION_ID, serve_nodes=SERVE_NODES
        )
        operation = instance.create(clusters=[cluster])
        # Register for deletion *before* waiting on the operation: if the
        # wait below times out, the instance may still have been created and
        # would otherwise never be cleaned up.
        self.instances_to_delete.append(instance)

        # We want to make sure the operation completes.
        operation.result(timeout=10)

        # Build a second handle for the same ID and load its server state.
        instance_alt = Config.CLIENT.instance(ALT_INSTANCE_ID)
        instance_alt.reload()

        self.assertEqual(instance, instance_alt)
        self.assertEqual(instance.display_name, instance_alt.display_name)
        # Make sure that by default a PRODUCTION type instance is created
        self.assertIsNone(instance.type_)
        self.assertEqual(instance_alt.type_, enums.Instance.Type.PRODUCTION)
开发者ID:dhermes,项目名称:gcloud-python,代码行数:25,代码来源:system.py


示例8: test_watch_collection

def test_watch_collection(client, cleanup):
    """Check that a collection listener observes an update to one document.

    Args:
        client: client object used to reach the ``users`` collection.
        cleanup: clean-up registrar supplied by the caller (unused here).
    """
    users = client.collection(u"users")
    doc_ref = client.collection(u"users").document(
        u"alovelace" + unique_resource_id()
    )

    # Seed the document with its initial contents.
    doc_ref.set({u"first": u"Jane", u"last": u"Doe", u"born": 1900})

    # The callback keeps its state in attributes on itself.
    def on_snapshot(docs, changes, read_time):
        on_snapshot.called_count += 1
        for doc in docs:
            if doc.id == doc_ref.id:
                on_snapshot.born = doc.get("born")

    on_snapshot.called_count = 0
    on_snapshot.born = 0

    users.on_snapshot(on_snapshot)

    # Pause so the initial snapshot arrives separately from the update below.
    sleep(1)

    doc_ref.set({u"first": u"Ada", u"last": u"Lovelace", u"born": 1815})

    # Poll for the updated value, sleeping one second between checks
    # (at most ten sleeps).
    remaining = 10
    while remaining > 0 and on_snapshot.born != 1815:
        sleep(1)
        remaining -= 1

    if on_snapshot.born != 1815:
        raise AssertionError(
            "Expected the last document update to update born: " + str(on_snapshot.born)
        )
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:33,代码来源:system.py


示例9: test_collection_group_queries

def test_collection_group_queries(client, cleanup):
    """Check that a collection-group query returns only the matching documents.

    Documents are written under paths whose collection ID either equals the
    generated group ID or merely resembles it; only the former are expected.

    Args:
        client: client object used for the batch write and the query.
        cleanup: clean-up registrar supplied by the caller (unused here).
    """
    group_id = "b" + unique_resource_id("-")

    # ``{0}`` is replaced by the generated group ID.
    path_templates = (
        "abc/123/{0}/cg-doc1",
        "abc/123/{0}/cg-doc2",
        "{0}/cg-doc3",
        "{0}/cg-doc4",
        "def/456/{0}/cg-doc5",
        "{0}/virtual-doc/nested-coll/not-cg-doc",
        "x{0}/not-cg-doc",
        "{0}x/not-cg-doc",
        "abc/123/{0}x/not-cg-doc",
        "abc/123/x{0}/not-cg-doc",
        "abc/{0}",
    )

    # Write every document in a single batch.
    batch = client.batch()
    for template in path_templates:
        batch.set(client.document(template.format(group_id)), {"x": 1})
    batch.commit()

    results = client.collection_group(group_id).stream()
    found_ids = [snapshot.id for snapshot in results]
    assert found_ids == ["cg-doc1", "cg-doc2", "cg-doc3", "cg-doc4", "cg-doc5"]
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:29,代码来源:system.py


示例10: test_document_integer_field

def test_document_integer_field(client, cleanup):
    """Update nested fields whose names begin with a digit via dotted paths.

    Args:
        client: client object used to build the document and write option.
        cleanup: callable that registers a document for later clean-up.
    """
    doc_ref = client.document('i-did-it', 'for-set' + unique_resource_id('-'))
    # Register for clean-up before any API request, in case one of them fails.
    cleanup(doc_ref)

    doc_ref.create({
        '1a': {'2b': '3c', 'ab': '5e'},
        '6f': {'7g': '8h', 'cd': '0j'},
    })

    # Change one nested value under each top-level key; the document must
    # already exist for the update to apply.
    must_exist = client.write_option(exists=True)
    doc_ref.update({'1a.ab': '4d', '6f.7g': '9h'}, option=must_exist)

    stored = doc_ref.get().to_dict()
    assert stored == {
        '1a': {'2b': '3c', 'ab': '4d'},
        '6f': {'7g': '9h', 'cd': '0j'},
    }
开发者ID:supriyagarg,项目名称:gcloud-python,代码行数:29,代码来源:system.py


示例11: test_create_sink_pubsub_topic

    def test_create_sink_pubsub_topic(self):
        """Create a logging sink whose destination is a new Pub/Sub topic."""
        from google.cloud import pubsub_v1

        SINK_NAME = 'test-create-sink-topic%s' % (_RESOURCE_ID,)
        TOPIC_NAME = 'logging-systest{}'.format(unique_resource_id('-'))

        # Create the destination topic, and set up the IAM policy to allow
        # Stackdriver Logging to write into it.
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(Config.CLIENT.project, TOPIC_NAME)
        # Register the topic for deletion before creating it, so a failure
        # later in the test still cleans it up.
        self.to_delete.append(_DeleteWrapper(publisher, topic_path))
        publisher.create_topic(topic_path)

        policy = publisher.get_iam_policy(topic_path)
        # NOTE(review): this copy of the test had the address replaced by the
        # e-mail-obfuscation placeholder '[email protected]', which is not a
        # valid IAM member.  Restored to the Cloud Logging export group --
        # confirm against the upstream test before relying on it.
        policy.bindings.add(
            role='roles/owner',
            members=['group:cloud-logs@google.com']
        )
        publisher.set_iam_policy(topic_path, policy)

        TOPIC_URI = 'pubsub.googleapis.com/%s' % (topic_path,)

        sink = Config.CLIENT.sink(SINK_NAME, DEFAULT_FILTER, TOPIC_URI)
        self.assertFalse(sink.exists())
        sink.create()
        self.to_delete.append(sink)
        self.assertTrue(sink.exists())
开发者ID:longfengpili,项目名称:google-cloud-python,代码行数:27,代码来源:test_system.py


示例12: test_create_sink_pubsub_topic

    def test_create_sink_pubsub_topic(self):
        """Create a logging sink whose destination is a new Pub/Sub topic.

        Sink creation is wrapped in a retry for ``Conflict`` and
        ``ServiceUnavailable`` errors.
        """
        from google.cloud import pubsub_v1

        SINK_NAME = "test-create-sink-topic%s" % (_RESOURCE_ID,)
        TOPIC_NAME = "logging-systest{}".format(unique_resource_id("-"))

        # Create the destination topic, and set up the IAM policy to allow
        # Stackdriver Logging to write into it.
        publisher = pubsub_v1.PublisherClient()
        topic_path = publisher.topic_path(Config.CLIENT.project, TOPIC_NAME)
        # Register the topic for deletion before creating it, so a failure
        # later in the test still cleans it up.
        self.to_delete.append(_DeleteWrapper(publisher, topic_path))
        publisher.create_topic(topic_path)

        policy = publisher.get_iam_policy(topic_path)
        # NOTE(review): this copy of the test had the address replaced by the
        # e-mail-obfuscation placeholder "[email protected]", which is not a
        # valid IAM member.  Restored to the Cloud Logging export group --
        # confirm against the upstream test before relying on it.
        policy.bindings.add(role="roles/owner", members=["group:cloud-logs@google.com"])
        publisher.set_iam_policy(topic_path, policy)

        TOPIC_URI = "pubsub.googleapis.com/%s" % (topic_path,)

        retry = RetryErrors((Conflict, ServiceUnavailable), max_tries=10)
        sink = Config.CLIENT.sink(SINK_NAME, DEFAULT_FILTER, TOPIC_URI)
        self.assertFalse(sink.exists())

        retry(sink.create)()

        self.to_delete.append(sink)
        self.assertTrue(sink.exists())
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:27,代码来源:test_system.py


示例13: test_rewrite_rotate_with_user_project

    def test_rewrite_rotate_with_user_project(self):
        """Rewrite an encrypted blob under a new key via a user-project bucket."""
        blob_name = 'rotating-keys'
        simple_file = self.FILES['simple']
        bucket_name = 'rewrite-rotate-up' + unique_resource_id('-')
        bucket = Config.CLIENT.create_bucket(
            bucket_name, requester_pays=True)
        try:
            # All blob traffic goes through a handle carrying ``user_project``.
            billed_bucket = Config.CLIENT.bucket(
                bucket_name, user_project=USER_PROJECT)

            # Upload under the first randomly generated 32-byte key.
            old_key = os.urandom(32)
            original = billed_bucket.blob(blob_name, encryption_key=old_key)
            original.upload_from_filename(simple_file['path'])
            payload = original.download_as_string()

            # Rewrite the same blob name under a second key.
            new_key = os.urandom(32)
            rotated = billed_bucket.blob(blob_name, encryption_key=new_key)
            token, rewritten, total = rotated.rewrite(original)

            expected_size = len(payload)
            self.assertEqual(token, None)
            self.assertEqual(rewritten, expected_size)
            self.assertEqual(total, expected_size)

            self.assertEqual(rotated.download_as_string(), payload)
        finally:
            # Always remove the bucket, contents included.
            retry_429(bucket.delete)(force=True)
开发者ID:longfengpili,项目名称:google-cloud-python,代码行数:27,代码来源:system.py


示例14: setUpModule

def setUpModule():
    """Create the storage client and the test bucket stored on ``Config``."""
    storage_client = storage.Client()
    Config.CLIENT = storage_client
    # In the **very** rare case the bucket name is reserved, this
    # fails with a ConnectionError.
    test_bucket = storage_client.bucket('new' + unique_resource_id())
    Config.TEST_BUCKET = test_bucket
    retry_429(test_bucket.create)()
开发者ID:longfengpili,项目名称:google-cloud-python,代码行数:7,代码来源:system.py


示例15: test_copy_existing_file_with_user_project

    def test_copy_existing_file_with_user_project(self):
        """Copy a blob inside a requester-pays bucket via a user-project handle."""
        bucket_name = 'copy-w-requester-pays' + unique_resource_id('-')
        bucket = Config.CLIENT.create_bucket(
            bucket_name, requester_pays=True)
        # Record the name so the test case can delete the bucket afterwards.
        self.case_buckets_to_delete.append(bucket_name)
        self.assertEqual(bucket.name, bucket_name)
        self.assertTrue(bucket.requester_pays)

        source_blob = storage.Blob('simple', bucket=bucket)
        source_blob.upload_from_string(b'DEADBEEF')
        blobs_to_delete = [source_blob]
        try:
            billed_bucket = Config.CLIENT.bucket(
                bucket_name, user_project=USER_PROJECT)

            copy_blob = retry_bad_copy(billed_bucket.copy_blob)
            copied_blob = copy_blob(source_blob, billed_bucket, 'simple-copy')
            blobs_to_delete.append(copied_blob)

            # The copy must hold exactly the bytes of the source.
            source_bytes = source_blob.download_as_string()
            copied_bytes = copied_blob.download_as_string()
            self.assertEqual(source_bytes, copied_bytes)
        finally:
            # Delete every blob created so far, even if an assertion failed.
            for doomed in blobs_to_delete:
                retry_429(doomed.delete)()
开发者ID:longfengpili,项目名称:google-cloud-python,代码行数:26,代码来源:system.py


示例16: test_document_delete

def test_document_delete(client, cleanup):
    """Exercise ``delete()`` with mismatched and with absent preconditions.

    A ``last_update_time`` one hour before or one hour after the document's
    real update time must be rejected with ``FailedPrecondition``; an
    unconditional delete must succeed, also when repeated on the already
    deleted document.

    Args:
        client: client object used to build the document and write options.
        cleanup: callable that registers a document for later clean-up.
    """
    document_id = "deleted" + unique_resource_id("-")
    document = client.document("here-to-be", document_id)
    # Add to clean-up before API request (in case ``create()`` fails).
    cleanup(document)
    document.create({"not": "much"})

    snapshot1 = document.get()
    update_time = snapshot1.update_time

    # 1. Call ``delete()`` with invalid (in the past) "last timestamp" option.
    # The offset is applied to ``seconds``; deriving it from ``nanos`` would
    # give an unrelated instant rather than "one hour earlier".
    timestamp_pb = timestamp_pb2.Timestamp(
        seconds=update_time.seconds - 3600, nanos=update_time.nanos
    )
    option1 = client.write_option(last_update_time=timestamp_pb)
    with pytest.raises(FailedPrecondition):
        document.delete(option=option1)

    # 2. Call ``delete()`` with invalid (in future) "last timestamp" option.
    timestamp_pb = timestamp_pb2.Timestamp(
        seconds=update_time.seconds + 3600, nanos=update_time.nanos
    )
    option2 = client.write_option(last_update_time=timestamp_pb)
    with pytest.raises(FailedPrecondition):
        document.delete(option=option2)

    # 3. Actually ``delete()`` the document.
    delete_time3 = document.delete()

    # 4. ``delete()`` again, even though we know the document is gone.
    delete_time4 = document.delete()
    assert_timestamp_less(delete_time3, delete_time4)
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:30,代码来源:system.py


示例17: test_query_unary

def test_query_unary(client, cleanup):
    """Query a collection for a null field value and for a NaN field value.

    Args:
        client: client object used to reach the collection.
        cleanup: callable that registers a document for later clean-up.
    """
    field = "foo"
    nan = float("nan")
    coll = client.collection("unary" + unique_resource_id("-"))

    # One document holds null, the other NaN; both are registered for
    # clean-up as soon as they exist.
    _, null_doc = coll.add({field: None})
    cleanup(null_doc)

    _, nan_doc = coll.add({field: nan})
    cleanup(nan_doc)

    # 0. Query for null: exactly the null document comes back.
    null_hits = list(coll.where(field, "==", None).stream())
    assert len(null_hits) == 1
    null_snapshot = null_hits[0]
    assert null_snapshot.reference._path == null_doc._path
    assert null_snapshot.to_dict() == {field: None}

    # 1. Query for a NAN: exactly the NaN document comes back.
    nan_hits = list(coll.where(field, "==", nan).stream())
    assert len(nan_hits) == 1
    nan_snapshot = nan_hits[0]
    assert nan_snapshot.reference._path == nan_doc._path
    nan_data = nan_snapshot.to_dict()
    assert len(nan_data) == 1
    # NaN never compares equal to itself, so test it with ``math.isnan``.
    assert math.isnan(nan_data[field])
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:31,代码来源:system.py


示例18: test_document_set_merge

def test_document_set_merge(client, cleanup):
    """Check that ``set(..., merge=True)`` merges into an existing document.

    Args:
        client: client object used to build the document reference.
        cleanup: callable that registers a document for later clean-up.
    """
    doc_ref = client.document('i-did-it', 'for-set' + unique_resource_id('-'))
    # Register for clean-up before any API request, in case one of them fails.
    cleanup(doc_ref)

    # 0. The document must not exist when the test starts.
    assert not doc_ref.get().exists

    # 1. ``create()`` the document and check the stored data and timestamps.
    initial = {
        'name': 'Sam',
        'address': {'city': 'SF', 'state': 'CA'},
    }
    created = doc_ref.create(initial)
    after_create = doc_ref.get()
    assert after_create.to_dict() == initial
    # Creation is the only write so far, so create and update times agree.
    assert after_create.create_time == after_create.update_time
    assert after_create.update_time == created.update_time

    # 2. ``set()`` with ``merge=True`` changes only the nested city.
    merged = doc_ref.set({'address': {'city': 'LA'}}, merge=True)
    after_merge = doc_ref.get()
    expected = {
        'name': 'Sam',
        'address': {'city': 'LA', 'state': 'CA'},
    }
    assert after_merge.to_dict() == expected
    # The merge must leave the create time alone.
    assert after_merge.create_time == after_create.create_time
    assert after_merge.update_time == merged.update_time
开发者ID:supriyagarg,项目名称:gcloud-python,代码行数:31,代码来源:system.py


示例19: test_watch_document

def test_watch_document(client, cleanup):
    """Check that a document listener has fired exactly once after an update.

    Args:
        client: client object used to reach the ``users`` collection.
        cleanup: clean-up registrar supplied by the caller (unused here).
    """
    users = client.collection(u"users")
    doc_ref = users.document(u"alovelace" + unique_resource_id())

    # Seed the document, then pause before attaching the listener.
    doc_ref.set({u"first": u"Jane", u"last": u"Doe", u"born": 1900})
    sleep(1)

    # The callback counts its invocations in an attribute on itself.
    def on_snapshot(docs, changes, read_time):
        on_snapshot.called_count += 1

    on_snapshot.called_count = 0
    doc_ref.on_snapshot(on_snapshot)

    # Change the document and give the listener a moment to react.
    doc_ref.set({u"first": u"Ada", u"last": u"Lovelace", u"born": 1815})
    sleep(1)

    # Poll up to ten times, one second apart; succeed as soon as the
    # counter reads exactly one.
    attempts_left = 10
    while attempts_left:
        if on_snapshot.called_count == 1:
            return
        sleep(1)
        attempts_left -= 1

    if on_snapshot.called_count != 1:
        raise AssertionError(
            "Failed to get exactly one document change: count: "
            + str(on_snapshot.called_count)
        )
开发者ID:GoogleCloudPlatform,项目名称:gcloud-python,代码行数:32,代码来源:system.py


示例20: test_document_get

def test_document_get(client, cleanup):
    """Create a document with varied field values and read it back.

    The payload mixes strings, numbers, a document reference, a geo point,
    a list holding text and bytes, and a nested map with a timestamp.

    Args:
        client: client object used to build the document references.
        cleanup: callable that registers a document for later clean-up.
    """
    # Build the timezone-aware "now" directly instead of patching a naive
    # ``utcnow()`` value; ``utcnow()`` is deprecated since Python 3.12.
    now = datetime.datetime.now(tz=UTC)
    document_id = 'for-get' + unique_resource_id('-')
    document = client.document('created', document_id)
    # Add to clean-up before API request (in case ``create()`` fails).
    cleanup(document)

    # First make sure it doesn't exist.
    assert not document.get().exists

    ref_doc = client.document('top', 'middle1', 'middle2', 'bottom')
    data = {
        'turtle': 'power',
        'cheese': 19.5,
        'fire': 199099299,
        'referee': ref_doc,
        'gio': firestore.GeoPoint(45.5, 90.0),
        'deep': [
            u'some',
            b'\xde\xad\xbe\xef',
        ],
        'map': {
            'ice': True,
            'water': None,
            'vapor': {
                'deeper': now,
            },
        },
    }
    write_result = document.create(data)
    snapshot = document.get()
    check_snapshot(snapshot, document, data, write_result)
    # The document was created before this snapshot was read.
    assert_timestamp_less(snapshot.create_time, snapshot.read_time)
开发者ID:supriyagarg,项目名称:gcloud-python,代码行数:33,代码来源:system.py



注:本文中的test_utils.system.unique_resource_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testable.register函数代码示例发布时间:2022-05-27
下一篇:
Python test_utils.TestUtils类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap