本文整理汇总了Python中moto.compat.OrderedDict类的典型用法代码示例。如果您正苦于以下问题:Python OrderedDict类的具体用法?Python OrderedDict怎么用?Python OrderedDict使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了OrderedDict类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, region_name):
    """Set up an empty SNS backend for ``region_name``.

    Topics and subscriptions are held in insertion-ordered mappings;
    applications and platform endpoints live in plain dicts.
    """
    super(SNSBackend, self).__init__()
    self.region_name = region_name
    self.topics, self.subscriptions = OrderedDict(), OrderedDict()
    self.applications, self.platform_endpoints = {}, {}
开发者ID:mpappas86,项目名称:moto,代码行数:7,代码来源:models.py
示例2: __init__
def __init__(self, ec2_backend, elb_backend, elbv2_backend):
    """Create an empty backend that keeps references to three other backends.

    :param ec2_backend: stored unchanged as ``self.ec2_backend``.
    :param elb_backend: stored unchanged as ``self.elb_backend``.
    :param elbv2_backend: stored unchanged as ``self.elbv2_backend``.
    """
    # Collaborating backends are only stored here, never called.
    self.ec2_backend = ec2_backend
    self.elb_backend = elb_backend
    self.elbv2_backend = elbv2_backend

    # Own state: groups and launch configurations keep insertion order.
    self.policies = {}
    self.launch_configurations = OrderedDict()
    self.autoscaling_groups = OrderedDict()
开发者ID:copland,项目名称:moto,代码行数:7,代码来源:models.py
示例3: Shard
class Shard(BaseModel):
    """One shard of a stream, holding records keyed by sequence number.

    ``records`` is an insertion-ordered mapping.  ``put_record`` always
    assigns ``last key + 1``, so as long as records are only added through
    ``put_record`` the insertion order is also ascending sequence order.
    """

    def __init__(self, shard_id, starting_hash, ending_hash):
        self._shard_id = shard_id
        self.starting_hash = starting_hash
        self.ending_hash = ending_hash
        self.records = OrderedDict()

    @property
    def shard_id(self):
        """Return the id formatted as ``shardId-`` plus a 12-digit zero-padded number."""
        return "shardId-{0}".format(str(self._shard_id).zfill(12))

    def get_records(self, last_sequence_id, limit):
        """Collect records whose sequence number is greater than ``last_sequence_id``.

        :param last_sequence_id: anything ``int()`` accepts.
        :param limit: stop once this many records have been collected.
        :returns: ``(records, last_sequence_id)`` where the second item is the
            sequence number of the last record returned (or the input value,
            as an int, when nothing matched).
        """
        last_sequence_id = int(last_sequence_id)
        results = []
        for sequence_number, record in self.records.items():
            if sequence_number > last_sequence_id:
                results.append(record)
                last_sequence_id = sequence_number
            # Checked on every iteration, exactly as before.
            if len(results) == limit:
                break
        return results, last_sequence_id

    def put_record(self, partition_key, data, explicit_hash_key):
        """Append a new record and return its sequence number.

        Note: this function is not safe for concurrency.
        """
        # get_max_sequence_number() already yields 0 for an empty shard.
        sequence_number = self.get_max_sequence_number() + 1
        self.records[sequence_number] = Record(
            partition_key, data, sequence_number, explicit_hash_key)
        return sequence_number

    def get_min_sequence_number(self):
        """Return the first stored sequence number, or 0 when empty."""
        # O(1): look at the first key instead of copying every key to a list.
        return next(iter(self.records), 0)

    def get_max_sequence_number(self):
        """Return the last stored sequence number, or 0 when empty."""
        # O(1): OrderedDict supports reversed iteration over its keys.
        return next(reversed(self.records), 0)

    def to_json(self):
        """Return the shard description as a JSON-serialisable dict."""
        return {
            "HashKeyRange": {
                "EndingHashKey": str(self.ending_hash),
                "StartingHashKey": str(self.starting_hash)
            },
            "SequenceNumberRange": {
                "EndingSequenceNumber": self.get_max_sequence_number(),
                "StartingSequenceNumber": self.get_min_sequence_number(),
            },
            "ShardId": self.shard_id
        }
开发者ID:2rs2ts,项目名称:moto,代码行数:59,代码来源:models.py
示例4: FakeTargetGroup
class FakeTargetGroup(BaseModel):
    """In-memory model of a target group with tags and registered targets.

    Registered targets are kept in an insertion-ordered mapping keyed by the
    target's ``'id'``.
    """

    def __init__(self,
                 name,
                 arn,
                 vpc_id,
                 protocol,
                 port,
                 healthcheck_protocol,
                 healthcheck_port,
                 healthcheck_path,
                 healthcheck_interval_seconds,
                 healthcheck_timeout_seconds,
                 healthy_threshold_count,
                 unhealthy_threshold_count):
        # Identity and routing.
        self.name = name
        self.arn = arn
        self.vpc_id = vpc_id
        self.protocol = protocol
        self.port = port

        # Health-check configuration, stored as given.
        self.healthcheck_protocol = healthcheck_protocol
        self.healthcheck_port = healthcheck_port
        self.healthcheck_path = healthcheck_path
        self.healthcheck_interval_seconds = healthcheck_interval_seconds
        self.healthcheck_timeout_seconds = healthcheck_timeout_seconds
        self.healthy_threshold_count = healthy_threshold_count
        self.unhealthy_threshold_count = unhealthy_threshold_count

        # Mutable state, empty on creation.
        self.load_balancer_arns = []
        self.tags = {}
        self.attributes = {
            'deregistration_delay.timeout_seconds': 300,
            'stickiness.enabled': 'false',
        }
        self.targets = OrderedDict()

    def register(self, targets):
        """Add (or overwrite) each target; the port defaults to the group's port."""
        for spec in targets:
            target_id = spec['id']
            self.targets[target_id] = {
                'id': target_id,
                'port': spec.get('port', self.port),
            }

    def deregister(self, targets):
        """Remove each target in order.

        :raises InvalidTargetError: at the first target that is not
            registered; targets processed before it stay removed.
        """
        for spec in targets:
            removed = self.targets.pop(spec['id'], None)
            if not removed:
                raise InvalidTargetError()

    def add_tag(self, key, value):
        """Set a tag.

        :raises TooManyTagsError: when adding a new key to a group that
            already has 10 or more tags.
        """
        is_new_key = key not in self.tags
        if is_new_key and len(self.tags) >= 10:
            raise TooManyTagsError()
        self.tags[key] = value

    def health_for(self, target):
        """Return a 'healthy' ``FakeHealthStatus`` for a registered target.

        :raises InvalidTargetError: when the target id is not registered.
        """
        registered = self.targets.get(target['id'])
        if registered is None:
            raise InvalidTargetError()
        return FakeHealthStatus(
            registered['id'], registered['port'], self.healthcheck_port, 'healthy')
开发者ID:whummer,项目名称:moto,代码行数:59,代码来源:models.py
示例5: __init__
def __init__(self, region_name):
    """Set up an empty SNS backend for ``region_name``.

    Besides the empty containers, the backend starts with a fixed list of
    opted-out phone numbers.
    """
    super(SNSBackend, self).__init__()
    self.region_name = region_name

    # Insertion-ordered containers.
    self.topics, self.subscriptions = OrderedDict(), OrderedDict()

    # Plain dict containers.
    self.applications, self.platform_endpoints = {}, {}
    self.sms_attributes, self.permissions = {}, {}

    self.opt_out_numbers = [
        '+447420500600',
        '+447420505401',
        '+447632960543',
        '+447632960028',
        '+447700900149',
        '+447700900550',
        '+447700900545',
        '+447700900907',
    ]
开发者ID:botify-labs,项目名称:moto,代码行数:10,代码来源:models.py
示例6: SNSBackend
class SNSBackend(BaseBackend):
    """In-memory SNS backend storing topics and subscriptions by ARN."""

    def __init__(self):
        self.topics = OrderedDict()
        self.subscriptions = OrderedDict()

    def create_topic(self, name):
        """Create a topic, store it under its ARN and return it."""
        new_topic = Topic(name, self)
        self.topics[new_topic.arn] = new_topic
        return new_topic

    def _get_values_nexttoken(self, values_map, next_token=None):
        """Return one page of ``values_map`` values plus the following token.

        The token is the integer offset of the page start.  The returned
        token is ``None`` when the page came back shorter than
        ``DEFAULT_PAGE_SIZE``.
        """
        start = 0 if next_token is None else next_token
        start = int(start)
        stop = start + DEFAULT_PAGE_SIZE
        page = list(values_map.values())[start:stop]
        following = stop if len(page) == DEFAULT_PAGE_SIZE else None
        return page, following

    def list_topics(self, next_token=None):
        """Return a page of topics and the next token."""
        return self._get_values_nexttoken(self.topics, next_token)

    def delete_topic(self, arn):
        """Remove the topic stored under ``arn`` (``KeyError`` if absent)."""
        self.topics.pop(arn)

    def get_topic(self, arn):
        """Return the topic stored under ``arn`` (``KeyError`` if absent)."""
        return self.topics[arn]

    def set_topic_attribute(self, topic_arn, attribute_name, attribute_value):
        """Set an arbitrary attribute on the topic object."""
        setattr(self.get_topic(topic_arn), attribute_name, attribute_value)

    def subscribe(self, topic_arn, endpoint, protocol):
        """Create a subscription on the topic, store it by ARN and return it."""
        new_subscription = Subscription(
            self.get_topic(topic_arn), endpoint, protocol)
        self.subscriptions[new_subscription.arn] = new_subscription
        return new_subscription

    def unsubscribe(self, subscription_arn):
        """Remove the subscription (``KeyError`` if absent)."""
        self.subscriptions.pop(subscription_arn)

    def list_subscriptions(self, topic_arn=None, next_token=None):
        """Return a page of subscriptions, optionally limited to one topic."""
        if not topic_arn:
            return self._get_values_nexttoken(self.subscriptions, next_token)

        wanted_topic = self.get_topic(topic_arn)
        matching = OrderedDict(
            (arn, subscription)
            for arn, subscription in self.subscriptions.items()
            if subscription.topic == wanted_topic
        )
        return self._get_values_nexttoken(matching, next_token)

    def publish(self, topic_arn, message):
        """Publish ``message`` to the topic and return the message id."""
        return self.get_topic(topic_arn).publish(message)
开发者ID:DreadPirateShawn,项目名称:moto,代码行数:55,代码来源:models.py
示例7: Shard
class Shard(object):
    """One shard of a stream, holding records keyed by sequence number.

    ``records`` is an insertion-ordered mapping.  ``put_record`` always
    assigns ``last key + 1``, so as long as records are only added through
    ``put_record`` the insertion order is also ascending sequence order.
    """

    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.records = OrderedDict()

    def get_records(self, last_sequence_id, limit):
        """Collect records whose sequence number is greater than ``last_sequence_id``.

        :param last_sequence_id: anything ``int()`` accepts.
        :param limit: stop once this many records have been collected.
        :returns: ``(records, last_sequence_id)`` where the second item is the
            sequence number of the last record returned (or the input value,
            as an int, when nothing matched).
        """
        last_sequence_id = int(last_sequence_id)
        results = []
        for sequence_number, record in self.records.items():
            if sequence_number > last_sequence_id:
                results.append(record)
                last_sequence_id = sequence_number
            # Checked on every iteration, exactly as before.
            if len(results) == limit:
                break
        return results, last_sequence_id

    def put_record(self, partition_key, data):
        """Append a new record and return its sequence number.

        Note: this function is not safe for concurrency.
        """
        # get_max_sequence_number() already yields 0 for an empty shard.
        sequence_number = self.get_max_sequence_number() + 1
        self.records[sequence_number] = Record(partition_key, data, sequence_number)
        return sequence_number

    def get_min_sequence_number(self):
        """Return the first stored sequence number, or 0 when empty."""
        # O(1): look at the first key instead of copying every key to a list.
        return next(iter(self.records), 0)

    def get_max_sequence_number(self):
        """Return the last stored sequence number, or 0 when empty."""
        # O(1): OrderedDict supports reversed iteration over its keys.
        return next(reversed(self.records), 0)

    def to_json(self):
        """Return the shard description as a JSON-serialisable dict.

        The hash key range is a fixed constant in this version of the model.
        """
        return {
            "HashKeyRange": {
                "EndingHashKey": "113427455640312821154458202477256070484",
                "StartingHashKey": "0"
            },
            "SequenceNumberRange": {
                "EndingSequenceNumber": self.get_max_sequence_number(),
                "StartingSequenceNumber": self.get_min_sequence_number(),
            },
            "ShardId": self.shard_id
        }
开发者ID:iandees,项目名称:moto,代码行数:51,代码来源:models.py
示例8: __init__
def __init__(self, ec2_backend, region_name):
    """Create a backend pre-populated with default security and parameter groups.

    :param ec2_backend: stored unchanged as ``self.ec2_backend``.
    :param region_name: stored as ``self.region`` and passed to the
        default groups.
    """
    self.region = region_name

    self.clusters = {}
    self.subnet_groups = {}

    default_security_group = SecurityGroup(
        "Default", "Default Redshift Security Group", self.region)
    self.security_groups = {"Default": default_security_group}

    default_parameter_group = ParameterGroup(
        "default.redshift-1.0",
        "redshift-1.0",
        "Default Redshift parameter group",
        self.region
    )
    self.parameter_groups = {"default.redshift-1.0": default_parameter_group}

    self.ec2_backend = ec2_backend
    self.snapshots = OrderedDict()

    # Maps a resource-type name to the container that holds that type.
    # The values are the very same objects assigned above, not copies.
    self.RESOURCE_TYPE_MAP = {
        'cluster': self.clusters,
        'parametergroup': self.parameter_groups,
        'securitygroup': self.security_groups,
        'snapshot': self.snapshots,
        'subnetgroup': self.subnet_groups
    }
    self.snapshot_copy_grants = {}
开发者ID:spulec,项目名称:moto,代码行数:25,代码来源:models.py
示例9: __init__
def __init__(self,
             name,
             arn,
             vpc_id,
             protocol,
             port,
             healthcheck_protocol,
             healthcheck_port,
             healthcheck_path,
             healthcheck_interval_seconds,
             healthcheck_timeout_seconds,
             healthy_threshold_count,
             unhealthy_threshold_count):
    """Store the target group's configuration and start with empty state.

    Every argument is stored unchanged under an attribute of the same name.
    """
    # Identity and routing.
    self.name, self.arn, self.vpc_id = name, arn, vpc_id
    self.protocol, self.port = protocol, port

    # Health-check configuration.
    self.healthcheck_protocol = healthcheck_protocol
    self.healthcheck_port = healthcheck_port
    self.healthcheck_path = healthcheck_path
    self.healthcheck_interval_seconds = healthcheck_interval_seconds
    self.healthcheck_timeout_seconds = healthcheck_timeout_seconds
    self.healthy_threshold_count = healthy_threshold_count
    self.unhealthy_threshold_count = unhealthy_threshold_count

    # Mutable state, empty on creation.
    self.targets = OrderedDict()
    self.tags = {}
    self.load_balancer_arns = []
    self.attributes = {
        'deregistration_delay.timeout_seconds': 300,
        'stickiness.enabled': 'false',
    }
开发者ID:whummer,项目名称:moto,代码行数:34,代码来源:models.py
示例10: __init__
def __init__(self, region, name, extended_config):
    """Create an empty pool with a freshly generated id.

    :param region: stored as ``self.region`` and used as the id prefix.
    :param name: stored as ``self.name``.
    :param extended_config: stored as given, or ``{}`` when falsy.

    Also loads a JSON web key from ``resources/jwks-private.json`` located
    next to this module.
    """
    self.region = region
    self.name = name
    self.status = None
    self.extended_config = extended_config if extended_config else {}

    # Id format: "<region>_<32 hex characters>".
    random_hex = uuid.uuid4().hex
    self.id = "{}_{}".format(self.region, random_hex)

    # Two separate clock reads, as in the original.
    self.creation_date = datetime.datetime.utcnow()
    self.last_modified_date = datetime.datetime.utcnow()

    self.clients = OrderedDict()
    self.identity_providers = OrderedDict()
    self.users = OrderedDict()

    self.refresh_tokens = {}
    self.access_tokens = {}
    self.id_tokens = {}

    module_dir = os.path.dirname(__file__)
    jwks_path = os.path.join(module_dir, "resources/jwks-private.json")
    with open(jwks_path) as jwks_file:
        self.json_web_key = json.load(jwks_file)
开发者ID:nimbis,项目名称:moto,代码行数:18,代码来源:models.py
示例11: DataPipelineBackend
class DataPipelineBackend(BaseBackend):
    """In-memory store of pipelines keyed by ``pipeline_id``."""

    def __init__(self):
        self.pipelines = OrderedDict()

    def create_pipeline(self, name, unique_id, **kwargs):
        """Build a pipeline, register it under its ``pipeline_id`` and return it."""
        created = Pipeline(name, unique_id, **kwargs)
        self.pipelines[created.pipeline_id] = created
        return created

    def list_pipelines(self):
        """Return all stored pipelines, in insertion order."""
        return self.pipelines.values()

    def describe_pipelines(self, pipeline_ids):
        """Return the stored pipelines whose id is in ``pipeline_ids``."""
        selected = []
        for candidate in self.pipelines.values():
            if candidate.pipeline_id in pipeline_ids:
                selected.append(candidate)
        return selected

    def get_pipeline(self, pipeline_id):
        """Return the pipeline stored under ``pipeline_id`` (``KeyError`` if absent)."""
        return self.pipelines[pipeline_id]

    def delete_pipeline(self, pipeline_id):
        """Remove the pipeline; unknown ids are ignored."""
        self.pipelines.pop(pipeline_id, None)

    def put_pipeline_definition(self, pipeline_id, pipeline_objects):
        """Replace the pipeline's objects with ``pipeline_objects``."""
        self.get_pipeline(pipeline_id).set_pipeline_objects(pipeline_objects)

    def get_pipeline_definition(self, pipeline_id):
        """Return the pipeline's ``objects`` attribute."""
        return self.get_pipeline(pipeline_id).objects

    def describe_objects(self, object_ids, pipeline_id):
        """Return the pipeline's objects whose ``object_id`` is in ``object_ids``."""
        selected = []
        for candidate in self.get_pipeline(pipeline_id).objects:
            if candidate.object_id in object_ids:
                selected.append(candidate)
        return selected

    def activate_pipeline(self, pipeline_id):
        """Call ``activate()`` on the pipeline."""
        self.get_pipeline(pipeline_id).activate()
开发者ID:Affirm,项目名称:moto,代码行数:43,代码来源:models.py
示例12: KinesisBackend
class KinesisBackend(BaseBackend):
    """In-memory Kinesis backend storing streams by name."""

    def __init__(self):
        self.streams = OrderedDict()
        self.delivery_streams = {}

    def create_stream(self, stream_name, shard_count, region):
        """Create and register a stream.

        :raises ResourceInUseError: when the name is already taken.
        """
        if stream_name in self.streams:
            raise ResourceInUseError(stream_name)
        created = Stream(stream_name, shard_count, region)
        self.streams[stream_name] = created
        return created

    def describe_stream(self, stream_name):
        """Return the stream called ``stream_name``.

        :raises StreamNotFoundError: when no such stream exists.
        """
        if stream_name not in self.streams:
            raise StreamNotFoundError(stream_name)
        return self.streams[stream_name]

    def list_streams(self):
        """Return all stored streams, in insertion order."""
        return self.streams.values()

    def delete_stream(self, stream_name):
        """Remove and return the stream.

        :raises StreamNotFoundError: when no such stream exists.
        """
        if stream_name not in self.streams:
            raise StreamNotFoundError(stream_name)
        return self.streams.pop(stream_name)

    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type, starting_sequence_number,
                           at_timestamp):
        """Build a new shard iterator for the given stream and shard."""
        # Looking up the stream and shard doubles as parameter validation.
        shard = self.describe_stream(stream_name).get_shard(shard_id)
        return compose_new_shard_iterator(
            stream_name, shard, shard_iterator_type, starting_sequence_number, at_timestamp
        )

    def get_records(self, shard_iterator, limit):
        """Read up to ``limit`` records starting from ``shard_iterator``.

        :returns: ``(next_shard_iterator, records, millis_behind_latest)``.
        """
        stream_name, shard_id, last_sequence_id = decompose_shard_iterator(shard_iterator)
        shard = self.describe_stream(stream_name).get_shard(shard_id)
        records, last_sequence_id, millis_behind_latest = shard.get_records(
            last_sequence_id, limit)
        following_iterator = compose_shard_iterator(
            stream_name, shard, last_sequence_id)
        return following_iterator, records, millis_behind_latest

    def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
        """Store one record and return ``(sequence_number, shard_id)``."""
        target_stream = self.describe_stream(stream_name)
        sequence_number, shard_id = target_stream.put_record(
            partition_key, explicit_hash_key, sequence_number_for_ordering, data
        )
        return sequence_number, shard_id

    def put_records(self, stream_name, records):
        """Store several records.

        Each record's ``PartitionKey``, ``ExplicitHashKey`` and ``Data``
        entries are read with ``.get()``.  ``FailedRecordCount`` in the
        response is always 0.
        """
        target_stream = self.describe_stream(stream_name)
        stored = []
        for entry in records:
            sequence_number, shard_id = target_stream.put_record(
                entry.get("PartitionKey"),
                entry.get("ExplicitHashKey"),
                None,
                entry.get("Data")
            )
            stored.append({
                "SequenceNumber": sequence_number,
                "ShardId": shard_id
            })
        return {
            "FailedRecordCount": 0,
            "Records": stored
        }
def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
stream = self.describe_stream(stream_name)
if shard_to_split not in stream.shards:
raise ResourceNotFoundError(shard_to_split)
if not re.match(r'0|([1-9]\d{0,38})', new_starting_hash_key):
raise InvalidArgumentError(new_starting_hash_key)
new_starting_hash_key = int(new_starting_hash_key)
shard = stream.shards[shard_to_split]
last_id = sorted(stream.shards.values(),
key=attrgetter('_shard_id'))[-1]._shard_id
if shard.starting_hash < new_starting_hash_key < shard.ending_hash:
#.........这里部分代码省略.........
开发者ID:botify-labs,项目名称:moto,代码行数:101,代码来源:models.py
示例13: __init__
def __init__(self):
    """Start with no tables; the container preserves insertion order."""
    empty_tables = OrderedDict()
    self.tables = empty_tables
开发者ID:DreadPirateShawn,项目名称:moto,代码行数:2,代码来源:models.py
示例14: DynamoDBBackend
class DynamoDBBackend(BaseBackend):
    """In-memory DynamoDB backend storing tables by name.

    Most lookups return ``None`` (or a tuple of ``None``) for an unknown
    table instead of raising.
    """

    def __init__(self):
        self.tables = OrderedDict()

    def create_table(self, name, **params):
        """Create a table, store it under ``name`` (replacing any existing one) and return it."""
        created = Table(name, **params)
        self.tables[name] = created
        return created

    def delete_table(self, name):
        """Remove and return the table, or ``None`` when it does not exist."""
        return self.tables.pop(name, None)

    def update_table_throughput(self, name, throughput):
        """Set ``throughput`` on the table (``KeyError`` if absent) and return it."""
        target = self.tables[name]
        target.throughput = throughput
        return target

    def put_item(self, table_name, item_attrs):
        """Store an item, or return ``None`` when the table is missing."""
        target = self.tables.get(table_name)
        return target.put_item(item_attrs) if target else None

    def get_table_keys_name(self, table_name):
        """Return ``(hash_key_attr, range_key_attr)`` or ``(None, None)`` for a missing table."""
        target = self.tables.get(table_name)
        if target:
            return target.hash_key_attr, target.range_key_attr
        return None, None

    def get_keys_value(self, table, keys):
        """Extract the hash and range key values for ``table`` from ``keys``.

        :raises ValueError: when the hash key, or a required range key, is
            absent from ``keys``.
        """
        if table.hash_key_attr not in keys or (table.has_range_key and table.range_key_attr not in keys):
            raise ValueError("Table has a range key, but no range key was passed into get_item")
        hash_key = DynamoType(keys[table.hash_key_attr])
        if table.has_range_key:
            range_key = DynamoType(keys[table.range_key_attr])
        else:
            range_key = None
        return hash_key, range_key

    def get_table(self, table_name):
        """Return the table, or ``None`` when it does not exist."""
        return self.tables.get(table_name)

    def get_item(self, table_name, keys):
        """Fetch one item, or ``None`` when the table is missing."""
        target = self.get_table(table_name)
        if not target:
            return None
        hash_key, range_key = self.get_keys_value(target, keys)
        return target.get_item(hash_key, range_key)

    def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts):
        """Run a query, or return ``(None, None)`` when the table is missing."""
        target = self.tables.get(table_name)
        if not target:
            return None, None
        hash_key = DynamoType(hash_key_dict)
        range_values = []
        for value_dict in range_value_dicts:
            range_values.append(DynamoType(value_dict))
        return target.query(hash_key, range_comparison, range_values)

    def scan(self, table_name, filters):
        """Run a scan, or return ``(None, None, None)`` when the table is missing.

        ``filters`` maps a key to ``(comparison_operator, values)``; each
        value is wrapped in ``DynamoType`` before being handed to the table.
        """
        target = self.tables.get(table_name)
        if not target:
            return None, None, None
        scan_filters = dict(
            (key, (operator, [DynamoType(value) for value in values]))
            for key, (operator, values) in filters.items()
        )
        return target.scan(scan_filters)

    def update_item(self, table_name, key, update_expression):
        """Apply ``update_expression`` to the item with hash key ``key`` and return it.

        NOTE(review): unlike the other accessors there is no guard for a
        missing table or item here — confirm callers validate first.
        """
        target = self.get_table(table_name)
        item = target.get_item(DynamoType(key))
        item.update(update_expression)
        return item

    def delete_item(self, table_name, keys):
        """Delete one item, or return ``None`` when the table is missing."""
        target = self.tables.get(table_name)
        if not target:
            return None
        hash_key, range_key = self.get_keys_value(target, keys)
        return target.delete_item(hash_key, range_key)
开发者ID:DreadPirateShawn,项目名称:moto,代码行数:84,代码来源:models.py
示例15: __init__
def __init__(self):
    """Start with no stacks, exports or change sets.

    Live stacks, exports and change sets keep insertion order; deleted
    stacks are held in a plain dict.
    """
    self.deleted_stacks = {}
    self.stacks, self.exports, self.change_sets = (
        OrderedDict(), OrderedDict(), OrderedDict())
开发者ID:Affirm,项目名称:moto,代码行数:5,代码来源:models.py
示例16: CloudFormationBackend
class CloudFormationBackend(BaseBackend):
    """In-memory CloudFormation backend: stacks, exports and change sets."""

    def __init__(self):
        self.stacks = OrderedDict()
        self.deleted_stacks = {}
        self.exports = OrderedDict()
        self.change_sets = OrderedDict()

    def create_stack(self, name, template, parameters, region_name, notification_arns=None, tags=None, role_arn=None, create_change_set=False):
        """Create a stack, register it by stack id and record its exports."""
        stack_id = generate_stack_id(name)
        created = FakeStack(
            stack_id=stack_id,
            name=name,
            template=template,
            parameters=parameters,
            region_name=region_name,
            notification_arns=notification_arns,
            tags=tags,
            role_arn=role_arn,
            cross_stack_resources=self.exports,
            create_change_set=create_change_set,
        )
        self.stacks[stack_id] = created
        # The stack is registered before its exports are validated.
        self._validate_export_uniqueness(created)
        for exported in created.exports:
            self.exports[exported.name] = exported
        return created

    def create_change_set(self, stack_name, change_set_name, template, parameters, region_name, change_set_type, notification_arns=None, tags=None, role_arn=None):
        """Create a change set and return ``(change_set_id, stack_id)``.

        For ``'UPDATE'`` the target is an existing stack with that name;
        for any other type a new stack is created.

        :raises ValidationError: for ``'UPDATE'`` when no stack has that name.
        """
        if change_set_type == 'UPDATE':
            target = None
            # No early exit: when several stacks share the name, the last wins.
            for candidate in self.stacks.values():
                if candidate.name == stack_name:
                    target = candidate
            if target is None:
                raise ValidationError(stack_name)
        else:
            target = self.create_stack(stack_name, template, parameters,
                                       region_name, notification_arns, tags,
                                       role_arn, create_change_set=True)

        change_set_id = generate_changeset_id(change_set_name, region_name)
        # NOTE(review): this puts a plain dict into self.stacks alongside
        # stack objects; methods that read `.name` on every value of
        # self.stacks may hit it — confirm this is intended.
        self.stacks[change_set_name] = {'Id': change_set_id,
                                        'StackId': target.stack_id}
        self.change_sets[change_set_id] = target
        return change_set_id, target.stack_id

    def execute_change_set(self, change_set_name, stack_name=None):
        """Mark the change set's stack as created or updated; return ``True``.

        ``change_set_name`` may be a change set id (a key of
        ``self.change_sets``) or the ``name`` of a stored value.

        :raises ValidationError: when no change set matches.
        """
        target = None
        if change_set_name in self.change_sets:
            # This means arn was passed in
            target = self.change_sets[change_set_name]
        else:
            # No early exit: the last value with a matching name wins.
            for change_set_id in self.change_sets:
                if self.change_sets[change_set_id].name == change_set_name:
                    target = self.change_sets[change_set_id]
        if target is None:
            raise ValidationError(stack_name)

        if target.events[-1].resource_status == 'REVIEW_IN_PROGRESS':
            target._add_stack_event('CREATE_COMPLETE')
        else:
            target._add_stack_event('UPDATE_IN_PROGRESS')
            target._add_stack_event('UPDATE_COMPLETE')
        return True

    def describe_stacks(self, name_or_stack_id):
        """Return ``[stack]`` for a name or id, or every live stack when the argument is falsy.

        Live stacks match by name or id; deleted stacks match by id only.

        :raises ValidationError: when an identifier is given but nothing matches.
        """
        live_stacks = self.stacks.values()
        if not name_or_stack_id:
            return list(live_stacks)

        for candidate in live_stacks:
            if candidate.name == name_or_stack_id or candidate.stack_id == name_or_stack_id:
                return [candidate]
        if self.deleted_stacks:
            for candidate in self.deleted_stacks.values():
                if candidate.stack_id == name_or_stack_id:
                    return [candidate]
        raise ValidationError(name_or_stack_id)

    def list_stacks(self):
        """Return all values of ``self.stacks``, in insertion order."""
        return self.stacks.values()

    def get_stack(self, name_or_stack_id):
        """Return the stack with that id or name, or ``None`` when absent."""
        # Live entries override deleted ones that share a key.
        by_id = dict(self.deleted_stacks, **self.stacks)
        if name_or_stack_id in by_id:
            # Lookup by stack id - deleted stacks included
            return by_id[name_or_stack_id]
        # Lookup by stack name - undeleted stacks only
        for candidate in self.stacks.values():
            if candidate.name == name_or_stack_id:
                return candidate
        return None

    def update_stack(self, name, template, role_arn=None, parameters=None, tags=None):
        """Update the stack found by ``get_stack(name)`` and return it."""
        target = self.get_stack(name)
        target.update(template, role_arn, parameters=parameters, tags=tags)
        return target
#.........这里部分代码省略.........
开发者ID:Affirm,项目名称:moto,代码行数:101,代码来源:models.py
示例17: __init__
def __init__(self, region_name=None):
    """Start with no tables; ``region_name`` is stored as given."""
    self.tables = OrderedDict()
    self.region_name = region_name
开发者ID:dhepper,项目名称:moto,代码行数:3,代码来源:models.py
示例18: DynamoDBBackend
class DynamoDBBackend(BaseBackend):
    """In-memory DynamoDB backend storing tables by name."""

    def __init__(self):
        self.tables = OrderedDict()

    def create_table(self, name, **params):
        """Create and return a table, or return ``None`` if the name is taken."""
        if name in self.tables:
            return None
        table = Table(name, **params)
        self.tables[name] = table
        return table

    def delete_table(self, name):
        """Remove and return the table, or ``None`` when it does not exist."""
        return self.tables.pop(name, None)

    def update_table_throughput(self, name, throughput):
        """Set ``throughput`` on the table (``KeyError`` if absent) and return it."""
        table = self.tables[name]
        table.throughput = throughput
        return table

    def update_table_global_indexes(self, name, global_index_updates):
        """Apply Delete/Update/Create operations to a table's global indexes.

        Each update is a dict that may hold ``'Delete'``, ``'Update'`` and/or
        ``'Create'`` entries.  Within one update they are applied in that
        order.

        :returns: the updated table.
        :raises KeyError: when the table does not exist.
        :raises ValueError: when deleting or updating an index that does not
            exist, or creating one that already does.
        """
        table = self.tables[name]
        gsis_by_name = dict((i['IndexName'], i) for i in table.global_indexes)
        for gsi_update in global_index_updates:
            gsi_to_create = gsi_update.get('Create')
            gsi_to_update = gsi_update.get('Update')
            gsi_to_delete = gsi_update.get('Delete')

            if gsi_to_delete:
                index_name = gsi_to_delete['IndexName']
                if index_name not in gsis_by_name:
                    raise ValueError('Global Secondary Index does not exist, but tried to delete: %s' %
                                     gsi_to_delete['IndexName'])
                del gsis_by_name[index_name]

            if gsi_to_update:
                index_name = gsi_to_update['IndexName']
                if index_name not in gsis_by_name:
                    raise ValueError('Global Secondary Index does not exist, but tried to update: %s' %
                                     gsi_to_update['IndexName'])
                gsis_by_name[index_name].update(gsi_to_update)

            if gsi_to_create:
                if gsi_to_create['IndexName'] in gsis_by_name:
                    raise ValueError('Global Secondary Index already exists: %s' % gsi_to_create['IndexName'])
                gsis_by_name[gsi_to_create['IndexName']] = gsi_to_create

        # Store a real list.  On Python 3 ``dict.values()`` is a view, which
        # cannot be indexed or JSON-serialised.
        table.global_indexes = list(gsis_by_name.values())
        return table

    def put_item(self, table_name, item_attrs, expected=None, overwrite=False):
        """Store an item, or return ``None`` when the table is missing."""
        table = self.tables.get(table_name)
        if not table:
            return None
        return table.put_item(item_attrs, expected, overwrite)

    def get_table_keys_name(self, table_name, keys):
        """Given a set of keys, extract the hash key and range key names.

        :returns: ``(hash_key_name, range_key_name)``; either may be ``None``.
            ``(None, None)`` when the table is missing.
        """
        table = self.tables.get(table_name)
        if not table:
            return None, None

        # Fast path: a single key that is a known hash key.
        if len(keys) == 1:
            for key in keys:
                if key in table.hash_key_names:
                    return key, None

        potential_hash, potential_range = None, None
        for key in set(keys):
            if key in table.hash_key_names:
                potential_hash = key
            elif key in table.range_key_names:
                potential_range = key
        return potential_hash, potential_range

    def get_keys_value(self, table, keys):
        """Extract the hash and range key values for ``table`` from ``keys``.

        :raises ValueError: when the hash key, or a required range key, is
            absent from ``keys``.
        """
        if table.hash_key_attr not in keys or (table.has_range_key and table.range_key_attr not in keys):
            raise ValueError("Table has a range key, but no range key was passed into get_item")
        hash_key = DynamoType(keys[table.hash_key_attr])
        range_key = DynamoType(keys[table.range_key_attr]) if table.has_range_key else None
        return hash_key, range_key

    def get_table(self, table_name):
        """Return the table, or ``None`` when it does not exist."""
        return self.tables.get(table_name)

    def get_item(self, table_name, keys):
        """Fetch one item.

        :raises ValueError: when the table does not exist, or the required
            keys are missing from ``keys``.
        """
        table = self.get_table(table_name)
        if not table:
            raise ValueError("No table found")
        hash_key, range_key = self.get_keys_value(table, keys)
        return table.get_item(hash_key, range_key)
def query(self, table_name, hash_key_dict, range_comparison, range_value_dicts,
limit, exclusive_start_key, scan_index_forward, index_name=None, **filter_kwargs):
#.........这里部分代码省略.........
开发者ID:mpappas86,项目名称:moto,代码行数:101,代码来源:models.py
示例19: Shard
class Shard(BaseModel):
    """One shard of a stream, holding timestamped records keyed by sequence number.

    ``records`` is an insertion-ordered mapping.  ``put_record`` always
    assigns ``last key + 1``, so as long as records are only added through
    ``put_record`` the insertion order is also ascending sequence order.
    """

    def __init__(self, shard_id, starting_hash, ending_hash):
        self._shard_id = shard_id
        self.starting_hash = starting_hash
        self.ending_hash = ending_hash
        self.records = OrderedDict()

    @property
    def shard_id(self):
        """Return the id formatted as ``shardId-`` plus a 12-digit zero-padded number."""
        return "shardId-{0}".format(str(self._shard_id).zfill(12))

    def get_records(self, last_sequence_id, limit):
        """Collect records whose sequence number is greater than ``last_sequence_id``.

        :param last_sequence_id: anything ``int()`` accepts.
        :param limit: stop once this many records have been collected.
        :returns: ``(records, last_sequence_id, millis_behind_latest)``.  The
            last item is the ``created_at`` gap between the newest stored
            record and the last record returned, times 1000 and truncated to
            an int; it is 0 when nothing is returned.
        """
        last_sequence_id = int(last_sequence_id)
        results = []
        for sequence_number, record in self.records.items():
            if sequence_number > last_sequence_id:
                results.append(record)
                last_sequence_id = sequence_number
            # Checked on every iteration, exactly as before.
            if len(results) == limit:
                break

        secs_behind_latest = 0
        if results:
            # Only the last returned record decides the lag, so look the
            # newest record up once instead of once per returned record.
            very_last_record = self.records[next(reversed(self.records))]
            secs_behind_latest = very_last_record.created_at - results[-1].created_at
        millis_behind_latest = int(secs_behind_latest * 1000)
        return results, last_sequence_id, millis_behind_latest

    def put_record(self, partition_key, data, explicit_hash_key):
        """Append a new record and return its sequence number.

        Note: this function is not safe for concurrency.
        """
        # get_max_sequence_number() already yields 0 for an empty shard.
        sequence_number = self.get_max_sequence_number() + 1
        self.records[sequence_number] = Record(
            partition_key, data, sequence_number, explicit_hash_key)
        return sequence_number

    def get_min_sequence_number(self):
        """Return the first stored sequence number, or 0 when empty."""
        # O(1): look at the first key instead of copying every key to a list.
        return next(iter(self.records), 0)

    def get_max_sequence_number(self):
        """Return the last stored sequence number, or 0 when empty."""
        # O(1): OrderedDict supports reversed iteration over its keys.
        return next(reversed(self.records), 0)

    def get_sequence_number_at(self, at_timestamp):
        """Return the sequence number of the newest record created before ``at_timestamp``.

        Returns 0 when the shard is empty or when no record is strictly
        older than ``at_timestamp``.  Previously the case where
        ``at_timestamp`` equalled the first record's ``created_at`` raised
        ``AttributeError`` on ``None``.
        """
        if not self.records:
            return 0
        oldest = self.records[next(iter(self.records))]
        if at_timestamp < oldest.created_at:
            return 0
        # Walk newest-to-oldest and stop at the first record created before
        # at_timestamp.
        for sequence_number in reversed(self.records):
            record = self.records[sequence_number]
            if record.created_at < at_timestamp:
                return record.sequence_number
        return 0

    def to_json(self):
        """Return the shard description as a JSON-serialisable dict."""
        return {
            "HashKeyRange": {
                "EndingHashKey": str(self.ending_hash),
                "StartingHashKey": str(self.starting_hash)
            },
            "SequenceNumberRange": {
                "EndingSequenceNumber": self.get_max_sequence_number(),
                "StartingSequenceNumber": self.get_min_sequence_number(),
            },
            "ShardId": self.shard_id
        }
开发者ID:botify-labs,项目名称:moto,代码行数:73,代码来源:models.py
示例20: SNSBackend
class SNSBackend(BaseBackend):
    """In-memory SNS backend for one region, storing topics and subscriptions by ARN."""

    def __init__(self, region_name):
        super(SNSBackend, self).__init__()
        self.region_name = region_name
        self.topics = OrderedDict()
        self.subscriptions = OrderedDict()
        self.applications = {}
        self.platform_endpoints = {}
        self.sms_attributes = {}
        self.permissions = {}
        self.opt_out_numbers = [
            '+447420500600',
            '+447420505401',
            '+447632960543',
            '+447632960028',
            '+447700900149',
            '+447700900550',
            '+447700900545',
            '+447700900907',
        ]

    def reset(self):
        """Discard all state and re-initialise for the same region."""
        kept_region = self.region_name
        self.__dict__ = {}
        self.__init__(kept_region)

    def update_sms_attributes(self, attrs):
        """Merge ``attrs`` into the stored SMS attributes."""
        self.sms_attributes.update(attrs)

    def create_topic(self, name):
        """Return the topic called ``name``, creating it if needed.

        :raises InvalidParameterValue: when ``name`` is not 1-256 characters
            of ASCII letters, digits, underscores and hyphens.
        """
        if not re.match(r'^[a-zA-Z0-9_-]{1,256}$', name):
            raise InvalidParameterValue("Topic names must be made up of only uppercase and lowercase ASCII letters, numbers, underscores, and hyphens, and must be between 1 and 256 characters long.")
        candidate = Topic(name, self)
        # An existing topic with the same ARN wins over the new candidate.
        if candidate.arn in self.topics:
            return self.topics[candidate.arn]
        self.topics[candidate.arn] = candidate
        return candidate

    def _get_values_nexttoken(self, values_map, next_token=None):
        """Return one page of ``values_map`` values plus the following token.

        The token is the integer offset of the page start.  The returned
        token is ``None`` when the page came back shorter than
        ``DEFAULT_PAGE_SIZE``.
        """
        start = 0 if next_token is None else next_token
        start = int(start)
        stop = start + DEFAULT_PAGE_SIZE
        page = list(values_map.values())[start:stop]
        following = stop if len(page) == DEFAULT_PAGE_SIZE else None
        return page, following

    def _get_topic_subscriptions(self, topic):
        """Return the subscriptions whose topic equals ``topic``."""
        matching = []
        for subscription in self.subscriptions.values():
            if subscription.topic == topic:
                matching.append(subscription)
        return matching

    def list_topics(self, next_token=None):
        """Return a page of topics and the next token."""
        return self._get_values_nexttoken(self.topics, next_token)

    def delete_topic(self, arn):
        """Delete the topic and all of its subscriptions.

        :raises SNSNotFoundError: when the topic does not exist.
        """
        doomed = self.get_topic(arn)
        for subscription in self._get_topic_subscriptions(doomed):
            self.unsubscribe(subscription.arn)
        self.topics.pop(arn)

    def get_topic(self, arn):
        """Return the topic stored under ``arn``.

        :raises SNSNotFoundError: when the topic does not exist.
        """
        try:
            found = self.topics[arn]
        except KeyError:
            raise SNSNotFoundError("Topic with arn {0} not found".format(arn))
        return found

    def get_topic_from_phone_number(self, number):
        """Return the ARN of the first topic with an ``sms`` subscription for ``number``.

        :raises SNSNotFoundError: when no such subscription exists.
        """
        for subscription in self.subscriptions.values():
            if subscription.protocol != 'sms':
                continue
            if subscription.endpoint == number:
                return subscription.topic.arn
        raise SNSNotFoundError('Could not find valid subscription')

    def set_topic_attribute(self, topic_arn, attribute_name, attribute_value):
        """Set an arbitrary attribute on the topic object."""
        setattr(self.get_topic(topic_arn), attribute_name, attribute_value)

    def subscribe(self, topic_arn, endpoint, protocol):
        """Subscribe ``endpoint`` to the topic, reusing an identical subscription.

        :raises SNSNotFoundError: when there is no existing match and the
            topic does not exist.
        """
        # AWS doesn't create duplicates
        existing = self._find_subscription(topic_arn, endpoint, protocol)
        if existing:
            return existing
        created = Subscription(self.get_topic(topic_arn), endpoint, protocol)
        self.subscriptions[created.arn] = created
        return created

    def _find_subscription(self, topic_arn, endpoint, protocol):
        """Return the first subscription matching all three values, or ``None``."""
        for subscription in self.subscriptions.values():
            same_topic = subscription.topic.arn == topic_arn
            if same_topic and subscription.endpoint == endpoint and subscription.protocol == protocol:
                return subscription
        return None

    def unsubscribe(self, subscription_arn):
        """Remove the subscription (``KeyError`` if absent)."""
        self.subscriptions.pop(subscription_arn)

    def list_subscriptions(self, topic_arn=None, next_token=None):
        """Return a page of subscriptions, optionally limited to one topic."""
        if not topic_arn:
            return self._get_values_nexttoken(self.subscriptions, next_token)

        wanted_topic = self.get_topic(topic_arn)
        matching = OrderedDict(
            (subscription.arn, subscription)
            for subscription in self._get_topic_subscriptions(wanted_topic)
        )
        return self._get_values_nexttoken(matching, next_token)
#.........这里部分代码省略.........
开发者ID:botify-labs,项目名称:moto,代码行数:101,代码来源:models.py
注:本文中的moto.compat.OrderedDict类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论