This article collects typical usage examples of the moto.mock_s3 function in Python. If you are unsure what exactly mock_s3 does, how to call it, or what real-world usage looks like, the hand-picked code examples below should help.
The following 20 code examples of mock_s3 are shown, sorted by popularity by default. You can upvote the examples you like or find useful; your votes help the system recommend better Python code examples.
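
Before the project-specific examples, here is a minimal sketch of the three ways mock_s3 is typically activated, all of which appear in the examples below: as a decorator, as a context manager, and via explicit start()/stop() calls. The bucket name my-bucket and the test function names are illustrative only, not taken from any of the projects cited here.

import boto3
from moto import mock_s3

# 1) Decorator: the S3 mock is active for the whole test function.
@mock_s3
def test_with_decorator():
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket="my-bucket")  # illustrative bucket name
    names = [b["Name"] for b in client.list_buckets()["Buckets"]]
    assert "my-bucket" in names

# 2) Context manager: the mock is active only inside the "with" block.
def test_with_context_manager():
    with mock_s3():
        client = boto3.client("s3", region_name="us-east-1")
        client.create_bucket(Bucket="my-bucket")

# 3) Explicit start()/stop(): convenient in setUp()/tearDown() or pytest fixtures.
def test_with_start_stop():
    mock = mock_s3()
    mock.start()
    try:
        client = boto3.client("s3", region_name="us-east-1")
        client.create_bucket(Bucket="my-bucket")
    finally:
        mock.stop()

All three forms have the same effect; the examples that follow mostly use the context-manager and start()/stop() styles.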
Example 1: test_get_canonical_ids

def test_get_canonical_ids(self):
    accounts = Account.query.all()
    get_canonical_ids(accounts)

    for account in accounts:
        assert len(account.custom_fields) == 1
        assert account.custom_fields[0].name == "canonical_id"
        assert account.custom_fields[0].value == "bcaf1ffd86f41161ca5fb16fd081034f"  # Default from moto.

        # Make it something else to test overrides:
        account.custom_fields[0].value = "replaceme"
        db.session.add(account)

    db.session.commit()

    # Test without override (nothing should be changed):
    get_canonical_ids(accounts)

    for account in accounts:
        assert len(account.custom_fields) == 1
        assert account.custom_fields[0].name == "canonical_id"
        assert account.custom_fields[0].value == "replaceme"

    # Test override:
    get_canonical_ids(accounts, override=True)

    for account in accounts:
        assert len(account.custom_fields) == 1
        assert account.custom_fields[0].name == "canonical_id"
        assert account.custom_fields[0].value == "bcaf1ffd86f41161ca5fb16fd081034f"  # Default from moto.

    mock_sts().stop()
    mock_s3().stop()

Author: DataDog, Project: security_monkey, Lines: 31, Source: test_s3_canonical.py
Example 2: s3_resource

def s3_resource(tips_file):
    pytest.importorskip('s3fs')
    moto.mock_s3().start()

    test_s3_files = [
        ('tips.csv', tips_file),
        ('tips.csv.gz', tips_file + '.gz'),
        ('tips.csv.bz2', tips_file + '.bz2'),
    ]

    def add_tips_files(bucket_name):
        for s3_key, file_name in test_s3_files:
            with open(file_name, 'rb') as f:
                conn.Bucket(bucket_name).put_object(
                    Key=s3_key,
                    Body=f)

    boto3 = pytest.importorskip('boto3')
    # see gh-16135
    bucket = 'pandas-test'

    conn = boto3.resource("s3", region_name="us-east-1")
    conn.create_bucket(Bucket=bucket)
    add_tips_files(bucket)

    conn.create_bucket(Bucket='cant_get_it', ACL='private')
    add_tips_files('cant_get_it')
    yield conn
    moto.mock_s3().stop()

Author: Axik, Project: pandas, Lines: 31, Source: test_network.py
Example 3: pre_test_setup

def pre_test_setup(self):
    account_type_result = AccountType.query.filter(AccountType.name == 'AWS').first()
    if not account_type_result:
        account_type_result = AccountType(name='AWS')
        db.session.add(account_type_result)
        db.session.commit()

    self.account = Account(identifier="012345678910", name="testing",
                           account_type_id=account_type_result.id)
    self.technology = Technology(name="s3")
    self.item = Item(region="us-west-2", name="somebucket",
                     arn="arn:aws:s3:::somebucket", technology=self.technology,
                     account=self.account)

    db.session.add(self.account)
    db.session.add(self.technology)
    db.session.add(self.item)
    db.session.commit()

    mock_s3().start()
    client = boto3.client("s3")
    client.create_bucket(Bucket="somebucket")
    client.create_bucket(Bucket="someotherbucket")
    client.create_bucket(Bucket="someotherbucket2")

Author: crruthe, Project: security_monkey, Lines: 25, Source: test_s3.py
Example 4: setUp

def setUp(self):
    mock_s3().start()

    patchers = [
        "autopush.main.task",
        "autopush.main.reactor",
        "autopush.settings.TwistedMetrics",
    ]

    self.mocks = {}
    for name in patchers:
        patcher = patch(name)
        self.mocks[name] = patcher.start()

Author: martinthomson, Project: autopush, Lines: 11, Source: test_main.py
Example 5: setUp

def setUp(self):
    self.mock = mock_s3()
    self.mock.start()
    #
    # Populate the data in mock S3
    #
    # s3+file first
    conn = boto.connect_s3()
    b = conn.create_bucket(self.bucket_name)
    k = Key(b)
    k.name = self.key_name
    with open(test_file(self.key_name), 'rb') as f:
        k.set_contents_from_file(f)

    # s3+dir
    b = conn.create_bucket(self.dir_bucket_name)
    for fname in ('index.json', '1', '2', '3', '4', '5', '6'):
        k = Key(b)
        k.name = posixpath.join(self.dir_list_name, fname)
        with open(test_file(posixpath.join('delta_dir_source', fname)),
                  'rb') as f:
            k.set_contents_from_file(f)

    # initialize the internal list data structure via the normal method
    super(S3SourceListsTest, self).setUp()

Author: rbillings, Project: shavar, Lines: 26, Source: test_lists.py
Example 6: setUp

def setUp(self):
    self.s3 = mock_s3()
    self.s3.start()
    boto = connect_s3()
    boto.create_bucket(self._bucket)
    super(MPConnectionTest, self).setUp()

Author: V-Lamp, Project: longaccess-client, Lines: 7, Source: test_mpconnection.py
Example 7: test_unpublish_cmd

def test_unpublish_cmd(self):
    with mock_s3():
        conn = boto.connect_s3()
        bucket = conn.create_bucket(settings.AWS_BUCKET_NAME)
        call_command("build")
        call_command("unpublish", no_pooling=True, verbosity=3)
        self.assertFalse(list(key for key in bucket.list()))

Author: ramonakira, Project: django-bakery, Lines: 7, Source: __init__.py
Example 8: test_cache_control

def test_cache_control(self):
    if not sys.version_info[:2] == (3, 4):
        from moto import mock_s3
        with mock_s3():
            # Set random max-age for various content types
            with self.settings(BAKERY_CACHE_CONTROL={
                "application/javascript": random.randint(0, 100000),
                "text/css": random.randint(0, 100000),
                "text/html": random.randint(0, 100000),
            }):
                conn = boto.connect_s3()
                bucket = conn.create_bucket(settings.AWS_BUCKET_NAME)
                call_command("build")
                call_command("publish", no_pooling=True, verbosity=3)
                for key in bucket:
                    key = bucket.get_key(key.name)
                    if key.content_type in settings.BAKERY_CACHE_CONTROL:
                        # key.cache_control returns string
                        # with "max-age=" prefix
                        self.assertIn(
                            str(settings.BAKERY_CACHE_CONTROL.get(
                                key.content_type)),
                            key.cache_control
                        )
    else:
        self.skipTest("Moto doesn't work in Python 3.4")

Author: lexieheinle, Project: django-bakery, Lines: 26, Source: __init__.py
Example 9: setUp

def setUp(self):
    self.mock_s3 = moto.mock_s3()
    self.mock_s3.start()
    self.s3_conn = boto.connect_s3()
    self.s3_conn.create_bucket('last_bucket')
    bucket = self.s3_conn.get_bucket('last_bucket')
    key = bucket.new_key('test_list/LAST')
    self.pointers = ['pointer1', 'pointer2', 'pointer3', '']
    key.set_contents_from_string('\r\n'.join(self.pointers))
    key.close()

    for key_name in POINTER_KEYS:
        key = bucket.new_key(key_name)
        out = StringIO.StringIO()
        with gzip.GzipFile(fileobj=out, mode='w') as f:
            f.write(json.dumps({'name': key_name}))
        key.set_contents_from_string(out.getvalue())
        key.close()

    self.options_prefix_pointer = {
        'bucket': 'last_bucket',
        'aws_access_key_id': 'KEY',
        'aws_secret_access_key': 'SECRET',
        'prefix_pointer': 'test_list/LAST'
    }

Author: eliasdorneles, Project: exporters, Lines: 25, Source: test_readers_s3.py
Example 10: test_gzip_and_send_s3

def test_gzip_and_send_s3(self):
    """
    Tests that a gzip is made and sent to S3 and everything cleaned after
    """
    # First create some dummy content to work with
    output_path = '{0}/test_out/'.format(os.getcwd())
    helper_extract_all(cluster=self.cluster, output_path=output_path)

    with mock_s3():
        s3_resource = boto3.resource('s3')
        s3_resource.create_bucket(Bucket=self.s3_details['bucket'])

        # Run the gzip and send
        dashboard.push_to_s3(
            input_directory=output_path,
            s3_details=self.s3_details
        )

        # Check there is a gzip in the bucket
        s3_object = s3_resource.Object(
            self.s3_details['bucket'],
            'dashboard.tar.gz'
        )
        keys = s3_object.get().keys()
        self.assertTrue(
            len(keys) > 0
        )

    # Clean up files
    shutil.rmtree(output_path)

Author: adsabs, Project: kibtools, Lines: 32, Source: test_dashboard.py
Example 11: test_publish_cmd

def test_publish_cmd(self):
    if not sys.version_info[:2] == (3, 4):
        from moto import mock_s3
        with mock_s3():
            conn = boto.connect_s3()
            bucket = conn.create_bucket(settings.AWS_BUCKET_NAME)
            call_command("build")
            call_command("publish", no_pooling=True, verbosity=3)
            local_file_list = []
            for (dirpath, dirnames, filenames) in os.walk(
                    settings.BUILD_DIR):
                for fname in filenames:
                    local_key = os.path.join(
                        os.path.relpath(dirpath, settings.BUILD_DIR),
                        fname
                    )
                    if local_key.startswith('./'):
                        local_key = local_key[2:]
                    local_file_list.append(local_key)
            for key in bucket.list():
                self.assertIn(key.name, local_file_list)
            call_command("unbuild")
            os.makedirs(settings.BUILD_DIR)
            call_command("publish", no_pooling=True, verbosity=3)
    else:
        self.skipTest("Moto doesn't work in Python 3.4")

Author: lexieheinle, Project: django-bakery, Lines: 26, Source: __init__.py
Example 12: mock_s3_resource

def mock_s3_resource(self):
    # Fixture-style usage: keep the S3 mock active while the test runs,
    # then stop it once the test is done.
    mock = mock_s3()
    mock.start()
    yield mock
    mock.stop()

Author: drgarcia1986, Project: simple-settings, Lines: 7, Source: test_s3.py
Example 13: setUp

def setUp(self):
    self.mock = moto.mock_s3()
    self.mock.start()

    self.conn = boto.connect_s3()
    self.conn.create_bucket(TEST_BUCKET_NAME)

    pyramid = Pyramid(stride=8)
    grid_image = os.path.join(DATA_DIRECTORY, 'grid_crop', 'grid.png')

    metatile = MetaTile(MetaTileIndex(19, 453824, 212288, 8),
                        data=open(grid_image, 'rb').read(),
                        mimetype='image/png')
    format = FormatBundle(MapType('image'), TileFormat('PNG'))
    storage = S3MetaTileStorage(levels=pyramid.levels,
                                stride=pyramid.stride,
                                bucket=TEST_BUCKET_NAME,
                                prefix='testlayer',
                                format=format)
    storage.put(metatile)

    self.node = S3StorageNode('s3', maptype='image',
                              tileformat=dict(format='PNG'),
                              levels=pyramid.levels,
                              stride=pyramid.stride,
                              bucket=TEST_BUCKET_NAME,
                              prefix='testlayer')
    self.expected = grid_image

Author: Kotaimen, Project: stonemason, Lines: 27, Source: test_storage.py
Example 14: test_cache_control

def test_cache_control(self):
    s3 = boto3.resource('s3')
    with mock_s3():
        # Set random max-age for various content types
        with self.settings(BAKERY_CACHE_CONTROL={
            "application/javascript": random.randint(0, 100000),
            "text/css": random.randint(0, 100000),
            "text/html": random.randint(0, 100000),
        }):
            self._create_bucket()
            call_command("build")
            call_command("publish", verbosity=3)
            for obj in self._get_bucket_objects():
                s3_obj = s3.Object(
                    settings.AWS_BUCKET_NAME, obj.get('Key'))
                if s3_obj.content_type in settings.BAKERY_CACHE_CONTROL:
                    # key.cache_control returns string
                    # with "max-age=" prefix
                    self.assertIn(
                        str(settings.BAKERY_CACHE_CONTROL.get(
                            s3_obj.content_type)),
                        s3_obj.cache_control
                    )

Author: datadesk, Project: django-bakery, Lines: 25, Source: __init__.py
Example 15: pre_test_setup

def pre_test_setup(self):
    self.account_type = AccountType(name='AWS')
    db.session.add(self.account_type)
    db.session.commit()

    for x in range(0, 9):
        db.session.add(Account(name="account{}".format(x), account_type_id=self.account_type.id,
                               identifier="01234567891{}".format(x), active=True))
    db.session.commit()

    mock_sts().start()
    mock_s3().start()

    self.s3_client = boto3.client("s3")
    self.s3_client.create_bucket(Bucket="testBucket")

Author: DataDog, Project: security_monkey, Lines: 16, Source: test_s3_canonical.py
Example 16: s3

def s3():
    # writable local S3 system
    m = moto.mock_s3()
    m.start()
    import boto3
    client = boto3.client('s3')
    client.create_bucket(Bucket=test_bucket_name, ACL='public-read')
    for k in [a, b, c, d]:
        try:
            client.delete_object(Bucket=test_bucket_name, Key=k)
        except:
            pass
    for flist in [files, csv_files, text_files]:
        for f, data in flist.items():
            client.put_object(Bucket=test_bucket_name, Key=f, Body=data)

    yield S3FileSystem(anon=False)

    # Teardown: remove the test objects and stop the mock.
    for flist in [files, csv_files, text_files]:
        for f, data in flist.items():
            try:
                client.delete_object(Bucket=test_bucket_name, Key=f)
            except:
                pass
    for k in [a, b, c, d]:
        try:
            client.delete_object(Bucket=test_bucket_name, Key=k)
        except:
            pass
    m.stop()

Author: mheilman, Project: s3fs, Lines: 28, Source: test_s3fs.py
Example 17: setUp

def setUp(self):
    mock_s3 = moto.mock_s3()
    mock_s3.start()
    # Stop the mock automatically when the test finishes.
    self.addCleanup(mock_s3.stop)

    self.s3 = boto.connect_s3()
    self.s3.create_bucket('test_s3_bucket')

Author: m3brown, Project: cfgov-refresh, Lines: 7, Source: test_s3utils.py
Example 18: setUp

def setUp(self):
    mock_s3 = moto.mock_s3()
    mock_s3.start()
    # Stop the mock automatically when the test finishes.
    self.addCleanup(mock_s3.stop)

    self.s3 = boto3.client('s3')
    self.s3.create_bucket(Bucket='test_s3_bucket')

Author: contolini, Project: cfgov-refresh, Lines: 7, Source: test_s3utils.py
Example 19: mock_aws_services

def mock_aws_services():
    # Fixture-style usage: the S3 mock stays active while the test runs.
    mock = moto.mock_s3()
    mock.start()
    yield
    mock.stop()

Author: okomestudio, Project: pyu2, Lines: 7, Source: conftest.py
Example 20: setUp

def setUp(self):
    f = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    self.tempFilePath = f.name
    f.write(b"I'm a temporary file for testing\n")
    f.close()
    self.mock_s3 = mock_s3()
    self.mock_s3.start()

Author: 17zuoye, Project: luigi, Lines: 7, Source: s3_test.py
Note: the moto.mock_s3 examples in this article were compiled by 纯净天空 from source code and documentation hosted on GitHub, MSDocs, and other platforms. The snippets are selected from open source projects contributed by various authors; copyright in the code remains with the original authors, and any distribution or use should follow the license of the corresponding project. Do not reproduce this compilation without permission.