本文整理汇总了Python中swift.common.utils.normalize_timestamp函数的典型用法代码示例。如果您正苦于以下问题：Python normalize_timestamp函数的具体用法？Python normalize_timestamp怎么用？Python normalize_timestamp使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了normalize_timestamp函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_unicode
def test_unicode(self):
    """Prepare a container DB with non-ASCII names for the updater.

    NOTE(review): this excerpt ends right after the ``accept`` helper is
    defined; the code that would actually use ``cu`` and ``accept`` is not
    visible here.
    """
    cu = container_updater.ContainerUpdater({
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'interval': '1',
        'concurrency': '1',
        'node_timeout': '15',
    })
    containers_dir = os.path.join(self.sda1, container_server.DATADIR)
    os.mkdir(containers_dir)
    subdir = os.path.join(containers_dir, 'subdir')
    os.mkdir(subdir)
    # '\xce\xa9' is the byte pair 0xCE 0xA9 (the UTF-8 encoding of U+03A9)
    # when this literal is a Python 2 byte string.
    cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
                         container='\xce\xa9')
    cb.initialize(normalize_timestamp(1))
    cb.put_object('\xce\xa9', normalize_timestamp(2), 3, 'text/plain',
                  '68b329da9893e34099c7d8ad5cb9c940')

    def accept(sock, addr):
        """Reply 201 on ``sock``; return the raised exception, or None."""
        try:
            with Timeout(3):
                inc = sock.makefile('rb')
                out = sock.makefile('wb')
                out.write('HTTP/1.1 201 OK\r\nContent-Length: 0\r\n\r\n')
                out.flush()
                inc.read()
        # ``as`` form works on Python 2.6+ and 3.x; the comma form is
        # Python 2 only.
        except BaseException as err:
            import traceback
            traceback.print_exc()
            return err
        return None
开发者ID:Awingu,项目名称:swift,代码行数:31,代码来源:test_updater.py
示例2: test_PUT_overwrite
def test_PUT_overwrite(self):
    """PUT the same object twice and check the second PUT wins.

    After the second PUT the on-disk ``.data`` file named after the second
    timestamp must hold the new body and the new pickled metadata.
    """
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'X-Timestamp': normalize_timestamp(time()),
                                 'Content-Length': '6',
                                 'Content-Type': 'application/octet-stream'})
    req.body = 'VERIFY'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)
    # Short pause before taking the second time() reading.
    sleep(.00001)
    timestamp = normalize_timestamp(time())
    req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
                        headers={'X-Timestamp': timestamp,
                                 'Content-Type': 'text/plain',
                                 'Content-Encoding': 'gzip'})
    req.body = 'VERIFY TWO'
    resp = self.object_controller.PUT(req)
    self.assertEquals(resp.status_int, 201)
    objfile = os.path.join(self.testdir, 'sda1',
                           storage_directory(object_server.DATADIR, 'p',
                                             hash_path('a', 'c', 'o')),
                           timestamp + '.data')
    self.assert_(os.path.isfile(objfile))
    # Close the handle deterministically instead of leaking it.
    with open(objfile) as fp:
        self.assertEquals(fp.read(), 'VERIFY TWO')
    # NOTE(review): pickle.loads is applied to xattr data written by the
    # server under test; never do this with untrusted input.
    self.assertEquals(pickle.loads(getxattr(objfile,
                                            object_server.METADATA_KEY)),
                      {'X-Timestamp': timestamp,
                       'Content-Length': '10',
                       'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
                       'Content-Type': 'text/plain',
                       'name': '/a/c/o',
                       'Content-Encoding': 'gzip'})
开发者ID:edwardt,项目名称:swift,代码行数:31,代码来源:test_server.py
示例3: test_initialize
def test_initialize(self):
    """Exercise DatabaseBroker.initialize on in-memory and on-disk DBs.

    A recording stub replaces ``_initialize`` so the test can check the
    arguments it receives, and that the sync tables can be queried
    afterwards. Initializing an existing on-disk DB must raise
    DatabaseAlreadyExists.
    """
    # A broker with no _initialize set is expected to raise AttributeError.
    self.assertRaises(AttributeError,
                      DatabaseBroker(':memory:').initialize,
                      normalize_timestamp('1'))
    stub_dict = {}

    def stub(*args, **kwargs):
        """Record the latest call's positional and keyword arguments."""
        # clear() avoids deleting keys while iterating .keys(), which
        # raises RuntimeError wherever keys() is a live view.
        stub_dict.clear()
        stub_dict['args'] = args
        stub_dict.update(kwargs)

    broker = DatabaseBroker(':memory:')
    broker._initialize = stub
    broker.initialize(normalize_timestamp('1'))
    self.assert_(hasattr(stub_dict['args'][0], 'execute'))
    self.assertEquals(stub_dict['args'][1], '0000000001.00000')
    with broker.get() as conn:
        conn.execute('SELECT * FROM outgoing_sync')
        conn.execute('SELECT * FROM incoming_sync')
    broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
    broker._initialize = stub
    broker.initialize(normalize_timestamp('1'))
    self.assert_(hasattr(stub_dict['args'][0], 'execute'))
    self.assertEquals(stub_dict['args'][1], '0000000001.00000')
    with broker.get() as conn:
        conn.execute('SELECT * FROM outgoing_sync')
        conn.execute('SELECT * FROM incoming_sync')
    # Same path a second time: the DB file already exists.
    broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
    broker._initialize = stub
    self.assertRaises(DatabaseAlreadyExists,
                      broker.initialize, normalize_timestamp('1'))
开发者ID:BlueSkyChina,项目名称:swift,代码行数:32,代码来源:test_db.py
示例4: test_POST_update_meta
def test_POST_update_meta(self):
    """Test swift.object_server.ObjectController.POST

    PUT an object with two metadata items, POST two different ones, then
    GET and check that the POSTed metadata replaced the original.
    """
    obj_path = '/sda1/p/a/c/o'

    put_stamp = normalize_timestamp(time())
    put_req = Request.blank(
        obj_path,
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'X-Timestamp': put_stamp,
                 'Content-Type': 'application/x-test',
                 'X-Object-Meta-1': 'One',
                 'X-Object-Meta-Two': 'Two'})
    put_req.body = 'VERIFY'
    put_resp = self.object_controller.PUT(put_req)
    self.assertEquals(put_resp.status_int, 201)

    post_stamp = normalize_timestamp(time())
    post_req = Request.blank(
        obj_path,
        environ={'REQUEST_METHOD': 'POST'},
        headers={'X-Timestamp': post_stamp,
                 'X-Object-Meta-3': 'Three',
                 'X-Object-Meta-4': 'Four',
                 'Content-Type': 'application/x-test'})
    post_resp = self.object_controller.POST(post_req)
    self.assertEquals(post_resp.status_int, 202)

    get_resp = self.object_controller.GET(Request.blank(obj_path))
    # Metadata from the PUT must be gone; metadata from the POST present.
    old_meta_gone = "X-Object-Meta-1" not in get_resp.headers
    new_meta_present = "X-Object-Meta-3" in get_resp.headers
    self.assert_(old_meta_gone and new_meta_present)
    self.assertEquals(get_resp.headers['Content-Type'],
                      'application/x-test')
开发者ID:edwardt,项目名称:swift,代码行数:27,代码来源:test_server.py
示例5: test_max_upload_time
def test_max_upload_time(self):
    """A slow upload succeeds by default, and gets 408 once
    ``max_upload_time`` is lowered to 0.1.
    """
    class SlowBody():
        # Request body that hands out four single-space chunks, sleeping
        # 0.1 before each, and then signals end of input with ''.
        def __init__(self):
            self.sent = 0

        def read(self, size=-1):
            if self.sent >= 4:
                return ''
            sleep(0.1)
            self.sent += 1
            return ' '

    def slow_put():
        # Issue one PUT whose body is a fresh SlowBody.
        request = Request.blank(
            '/sda1/p/a/c/o',
            environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': SlowBody()},
            headers={'X-Timestamp': normalize_timestamp(time()),
                     'Content-Length': '4', 'Content-Type': 'text/plain'})
        return self.object_controller.PUT(request)

    self.assertEquals(slow_put().status_int, 201)
    self.object_controller.max_upload_time = 0.1
    self.assertEquals(slow_put().status_int, 408)
开发者ID:edwardt,项目名称:swift,代码行数:27,代码来源:test_server.py
示例6: _create_object_metadata
def _create_object_metadata(self, file_path):
    """Build the object's metadata and persist it if anything changed.

    The freshly computed values are layered over a copy of
    ``self._metadata``; ``write_metadata`` is only called when the merged
    result differs from what is already held. ``self._metadata`` is always
    replaced by the merged dict.
    """
    if self._etag is None:
        if self._is_dir:
            self._etag = md5().hexdigest()
        else:
            self._etag = get_etag(file_path)

    # Reuse the stored timestamp unless the file changed or none is stored.
    if self._file_has_changed or (X_TIMESTAMP not in self._metadata):
        timestamp = normalize_timestamp(self._stat.st_mtime)
    else:
        timestamp = self._metadata[X_TIMESTAMP]

    if self._is_dir:
        content_type = DIR_TYPE
        object_type = DIR_NON_OBJECT
        content_length = 0
    else:
        content_type = FILE_TYPE
        object_type = FILE
        content_length = self._stat.st_size

    metadata = {
        X_TYPE: OBJECT,
        X_TIMESTAMP: timestamp,
        X_CONTENT_TYPE: content_type,
        X_OBJECT_TYPE: object_type,
        X_CONTENT_LENGTH: content_length,
        X_ETAG: self._etag}

    # Add X_MTIME key if object is a file
    if not self._is_dir:
        metadata[X_MTIME] = normalize_timestamp(self._stat.hpss_st_mtime)

    meta_new = self._metadata.copy()
    meta_new.update(metadata)
    if self._metadata != meta_new:
        write_metadata(file_path, meta_new)
        # Avoid additional read_metadata() later
    self._metadata = meta_new
开发者ID:openstack,项目名称:swiftonhpss,代码行数:28,代码来源:diskfile.py
示例7: test_quarantine_same_file
def test_quarantine_same_file(self):
    """Quarantine the same object twice.

    The first quarantine must land in the expected ``quarantined/objects``
    directory. The second, for a recreated file of the same object, must
    return an existing directory whose basename contains a '-'.
    """
    df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                           FakeLogger())
    mkdirs(df.datadir)
    # ``with`` closes the handle; the original left it open.
    with open(os.path.join(df.datadir,
                           normalize_timestamp(time()) + '.data'),
              'wb') as f:
        setxattr(f.fileno(), diskfile.METADATA_KEY,
                 pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
    df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                           FakeLogger())
    new_dir = df.quarantine()
    quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
                            'objects', os.path.basename(os.path.dirname(
                                df.data_file)))
    self.assert_(os.path.isdir(quar_dir))
    self.assertEquals(quar_dir, new_dir)
    # have to remake the datadir and file
    mkdirs(df.datadir)
    with open(os.path.join(df.datadir,
                           normalize_timestamp(time()) + '.data'),
              'wb') as f:
        setxattr(f.fileno(), diskfile.METADATA_KEY,
                 pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
    df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
                           FakeLogger(), keep_data_fp=True)
    double_uuid_path = df.quarantine()
    self.assert_(os.path.isdir(double_uuid_path))
    self.assert_('-' in os.path.basename(double_uuid_path))
开发者ID:Awingu,项目名称:swift,代码行数:28,代码来源:test_diskfile.py
示例8: test_DELETE_now_empty
def test_DELETE_now_empty(self):
    """An account can be DELETEd after its only container is recorded
    with a delete timestamp later than its put timestamp.
    """
    self.controller.PUT(Request.blank(
        "/sda1/p/a",
        environ={"REQUEST_METHOD": "PUT", "HTTP_X_TIMESTAMP": "0"}))

    def put_container(delete_timestamp):
        # Record container c1 with the given X-Delete-Timestamp.
        request = Request.blank(
            "/sda1/p/a/c1",
            environ={"REQUEST_METHOD": "PUT"},
            headers={
                "X-Put-Timestamp": "1",
                "X-Delete-Timestamp": delete_timestamp,
                "X-Object-Count": "0",
                "X-Bytes-Used": "0",
                "X-Timestamp": normalize_timestamp(0),
            },
        )
        return self.controller.PUT(request)

    put_container("0")
    container_resp = put_container("2")
    self.assertEquals(container_resp.status_int, 204)

    delete_req = Request.blank(
        "/sda1/p/a",
        environ={"REQUEST_METHOD": "DELETE", "HTTP_X_TIMESTAMP": "1"})
    delete_resp = self.controller.DELETE(delete_req)
    self.assertEquals(delete_resp.status_int, 204)
开发者ID:H3C,项目名称:swift,代码行数:31,代码来源:test_server.py
示例9: test_reap_delay
def test_reap_delay(self):
    """With the clock pinned at 100 and ``delay_reaping`` of 10, delete
    timestamps of 90 or later are not reaped; earlier ones are attempted.
    """
    fake_now = [100]

    def _time():
        return fake_now[0]

    saved_time = reaper.time
    try:
        reaper.time = _time
        r = reaper.AccountReaper({'delay_reaping': '10'})
        b = FakeBroker()

        for stamp in (110, 100, 90):
            b.info['delete_timestamp'] = normalize_timestamp(stamp)
            self.assertFalse(r.reap_account(b, 0, None))

        # KeyError raised immediately as reap_account tries to get the
        # account's name to do the reaping.
        for stamp in (89, 1):
            b.info['delete_timestamp'] = normalize_timestamp(stamp)
            self.assertRaises(KeyError, r.reap_account, b, 0, None)
    finally:
        reaper.time = saved_time
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:25,代码来源:test_reaper.py
示例10: test_delete_db
def test_delete_db(self):
    """Check that delete_db calls the ``_delete_db`` hook, for in-memory
    and on-disk brokers, and that it clears metadata on the on-disk one.
    """
    def init_stub(conn, put_timestamp):
        """Create the minimal 'test' schema used by this test."""
        conn.execute('CREATE TABLE test (one TEXT)')
        conn.execute('CREATE TABLE test_stat (id TEXT)')
        # Call uuid4(); the original stringified the function object
        # itself, storing '<function uuid4 at 0x...>' as the id.
        conn.execute('INSERT INTO test_stat (id) VALUES (?)',
                     (str(uuid4()),))
        conn.execute('INSERT INTO test (one) VALUES ("1")')
        conn.commit()

    stub_called = [False]

    def delete_stub(*a, **kw):
        """Flag that the delete hook was invoked."""
        stub_called[0] = True

    broker = DatabaseBroker(':memory:')
    broker.db_type = 'test'
    broker._initialize = init_stub
    # Initializes a good broker for us
    broker.initialize(normalize_timestamp('1'))
    self.assert_(broker.conn is not None)
    broker._delete_db = delete_stub
    stub_called[0] = False
    broker.delete_db('2')
    self.assert_(stub_called[0])
    broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
    broker.db_type = 'test'
    broker._initialize = init_stub
    broker.initialize(normalize_timestamp('1'))
    broker._delete_db = delete_stub
    stub_called[0] = False
    broker.delete_db('2')
    self.assert_(stub_called[0])
    # ensure that metadata was cleared
    m2 = broker.metadata
    self.assert_(not any(v[0] for v in m2.itervalues()))
    self.assert_(all(v[1] == normalize_timestamp('2')
                     for v in m2.itervalues()))
开发者ID:JioCloud,项目名称:swift,代码行数:35,代码来源:test_db.py
示例11: get_object_metadata
def get_object_metadata(obj_path):
    """
    Return metadata of object.

    Returns an empty dict when ``obj_path`` does not exist. Otherwise the
    dict describes either a directory (content length 0) or a regular
    file (content length from ``os.path.getsize``).
    """
    if not os.path.exists(obj_path):
        return {}

    is_dir = os.path.isdir(obj_path)
    return {
        X_TIMESTAMP: normalize_timestamp(os.path.getctime(obj_path)),
        X_CONTENT_TYPE: DIR_TYPE if is_dir else FILE_TYPE,
        X_ETAG: get_etag(obj_path),
        X_CONTENT_LENGTH: 0 if is_dir else os.path.getsize(obj_path),
        X_TYPE: OBJECT,
        X_OBJECT_TYPE: DIR if is_dir else FILE,
    }
开发者ID:Gaurav-Gangalwar,项目名称:swift_plugin,代码行数:26,代码来源:utils.py
示例12: test_collect_object
def test_collect_object(self):
    """collect_object must return the name and metadata that the test
    file was created with.
    """
    t = normalize_timestamp(42)
    data = 'ContentHere'
    digest = md5(data).hexdigest()
    length = str(len(data))

    test_file = self._create_test_file(
        data,
        timestamp=t,
        account='TEST_Acc', container='TEST_Con', obj='TEST_Obj',
        metadata={
            'X-Timestamp': t,
            'ETag': digest,
            'Content-Length': length,
            'delete_timestamp': normalize_timestamp(0),
            'Content-Type': "text/plain",
            'Content-Encoding': 'gzip',
            'Content-Disposition': 'action',
            'Content-Langauge': 'en'
        })
    testdir = test_file._datadir

    location = diskfile.AuditLocation(testdir, 'sda1', '0')
    metaDict = self.crawler.collect_object(location)

    expected = (
        ('name', '/TEST_Acc/TEST_Con/TEST_Obj'),
        ('X-Timestamp', t),
        ('ETag', digest),
        ('Content-Length', length),
        ('Content-Type', 'text/plain'),
        ('Content-Encoding', 'gzip'),
        ('Content-Disposition', 'action'),
        ('Content-Langauge', 'en'),
    )
    for key, value in expected:
        self.assertEquals(metaDict[key], value)
开发者ID:ucsc-hp-group,项目名称:swift,代码行数:30,代码来源:test_crawler.py
示例13: get_info
def get_info(self):
    """Return an info dict for the directory at ``self.datadir``.

    When ``self._type`` is 2 the dict is keyed as an account and carries a
    container count read from an xattr; otherwise it is keyed as a
    container. Timestamps come from the directory's ctime and mtime.
    """
    name = os.path.basename(self.datadir)
    st = os.stat(self.datadir)

    if self._type != 2:
        # XXX container info has something different in it, what is it?
        return {'container': name,
                'created_at': normalize_timestamp(st.st_ctime),
                'put_timestamp': normalize_timestamp(st.st_mtime),
                'delete_timestamp': '0',
                'object_count': '0',
                'bytes_used': '0',
                'hash': '-',
                'id': ''}

    cont_cnt_str = xattr.getxattr(self.datadir, CONTCNT_KEY)
    try:
        # Only used to validate the stored value; the string is returned.
        int(cont_cnt_str)
    except ValueError:
        cont_cnt_str = "0"
    # XXX object_count, bytes_used
    return {'account': name,
            'created_at': normalize_timestamp(st.st_ctime),
            'put_timestamp': normalize_timestamp(st.st_mtime),
            'delete_timestamp': '0',
            'container_count': cont_cnt_str,
            'object_count': '0',
            'bytes_used': '0',
            'hash': '-',
            'id': ''}
开发者ID:kururu-lu,项目名称:swift-lfs,代码行数:29,代码来源:lfs_posix.py
示例14: test_hash_cleanup_listdir
def test_hash_cleanup_listdir(self):
    """Check which files hash_cleanup_listdir keeps for several mixes of
    .data, .ts and .meta files, with os.listdir/os.unlink mocked out.
    """
    file_list = []

    def mock_listdir(path):
        return list(file_list)

    def mock_unlink(path):
        file_list.remove(os.path.basename(path))

    def stamped(offset, suffix):
        # File name for "now + offset" with the given extension.
        return normalize_timestamp(time() + offset) + suffix

    with unit_mock({"os.listdir": mock_listdir, "os.unlink": mock_unlink}):
        # purge .data if there's a newer .ts
        old_data = stamped(0, ".data")
        new_ts = stamped(1, ".ts")
        file_list = [old_data, new_ts]
        kept = diskfile.hash_cleanup_listdir("/whatever")
        self.assertEquals(kept, [new_ts])

        # purge .ts if there's a newer .data
        old_ts = stamped(0, ".ts")
        new_data = stamped(1, ".data")
        file_list = [old_ts, new_data]
        kept = diskfile.hash_cleanup_listdir("/whatever")
        self.assertEquals(kept, [new_data])

        # keep .meta and .data if meta newer than data
        old_ts = stamped(0, ".ts")
        mid_data = stamped(1, ".data")
        new_meta = stamped(2, ".meta")
        file_list = [old_ts, mid_data, new_meta]
        kept = diskfile.hash_cleanup_listdir("/whatever")
        self.assertEquals(kept, [new_meta, mid_data])

        # keep only latest of multiple .ts files
        first_ts = stamped(0, ".ts")
        second_ts = stamped(1, ".ts")
        third_ts = stamped(2, ".ts")
        file_list = [first_ts, second_ts, third_ts]
        kept = diskfile.hash_cleanup_listdir("/whatever")
        self.assertEquals(kept, [third_ts])
开发者ID:zhouyuan,项目名称:swift,代码行数:35,代码来源:test_diskfile.py
示例15: PUT
def PUT(self, req):
    """Handle HTTP PUT request.

    With a container component in the path, the container's stats are
    recorded in the account broker. Without one, the account itself is
    created or updated, including any account system/user metadata found
    in the request headers.

    Returns an HTTP response object: 507 when the drive fails the mount
    check; for a container PUT 404, 204 or 201; for an account PUT 403,
    409, 201 or 202.
    """
    drive, part, account, container = split_and_validate_path(req, 3, 4)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    if container:  # put account container
        pending_timeout = None
        container_policy_index = req.headers.get(POLICY_INDEX, 0)
        # Requests carrying a transaction id get a pending timeout of 3.
        if 'x-trans-id' in req.headers:
            pending_timeout = 3
        broker = self._get_account_broker(drive, part, account,
                                          pending_timeout=pending_timeout)
        # Accounts with the auto-create prefix get their DB created on
        # demand; losing a creation race to another request is fine.
        if account.startswith(self.auto_create_account_prefix) and \
                not os.path.exists(broker.db_file):
            try:
                broker.initialize(normalize_timestamp(
                    req.headers.get('x-timestamp') or time.time()))
            except DatabaseAlreadyExists:
                pass
        # A deleted account is a 404 unless the override header says 'yes'.
        if req.headers.get('x-account-override-deleted', 'no').lower() != \
                'yes' and broker.is_deleted():
            return HTTPNotFound(request=req)
        broker.put_container(container, req.headers['x-put-timestamp'],
                             req.headers['x-delete-timestamp'],
                             req.headers['x-object-count'],
                             req.headers['x-bytes-used'],
                             container_policy_index)
        # NOTE(review): the header values are compared as received; if
        # they are strings this is a lexicographic comparison - confirm
        # that callers always send normalized timestamps.
        if req.headers['x-delete-timestamp'] > \
                req.headers['x-put-timestamp']:
            return HTTPNoContent(request=req)
        else:
            return HTTPCreated(request=req)
    else:  # put account
        broker = self._get_account_broker(drive, part, account)
        timestamp = normalize_timestamp(req.headers['x-timestamp'])
        if not os.path.exists(broker.db_file):
            try:
                broker.initialize(timestamp)
                created = True
            except DatabaseAlreadyExists:
                # The DB appeared between the exists() check and
                # initialize(); treat the account as already present.
                created = False
        elif broker.is_status_deleted():
            return self._deleted_response(broker, req, HTTPForbidden,
                                          body='Recently deleted')
        else:
            # Capture the deleted state before bumping the put timestamp;
            # if the account is still deleted afterwards, report 409.
            created = broker.is_deleted()
            broker.update_put_timestamp(timestamp)
            if broker.is_deleted():
                return HTTPConflict(request=req)
        # Each accepted metadata header is stored as (value, timestamp).
        metadata = {}
        metadata.update((key, (value, timestamp))
                        for key, value in req.headers.iteritems()
                        if is_sys_or_user_meta('account', key))
        if metadata:
            broker.update_metadata(metadata)
        if created:
            return HTTPCreated(request=req)
        else:
            return HTTPAccepted(request=req)
开发者ID:sagarjha,项目名称:swift,代码行数:59,代码来源:server.py
示例16: test_unicode
def test_unicode(self):
    """Run the container updater once against a container whose name and
    object name are non-ASCII, then check the broker's reported stats.
    """
    conf = {
        "devices": self.devices_dir,
        "mount_check": "false",
        "swift_dir": self.testdir,
        "interval": "1",
        "concurrency": "1",
        "node_timeout": "15",
    }
    cu = container_updater.ContainerUpdater(conf)

    containers_dir = os.path.join(self.sda1, DATADIR)
    os.mkdir(containers_dir)
    subdir = os.path.join(containers_dir, "subdir")
    os.mkdir(subdir)

    db_path = os.path.join(subdir, "hash.db")
    cb = ContainerBroker(db_path, account="a", container="\xce\xa9")
    cb.initialize(normalize_timestamp(1), 0)
    cb.put_object("\xce\xa9", normalize_timestamp(2), 3, "text/plain",
                  "68b329da9893e34099c7d8ad5cb9c940")

    def accept(sock, addr):
        # Answer one connection with 201; hand back any exception raised.
        failure = None
        try:
            with Timeout(3):
                inc = sock.makefile("rb")
                out = sock.makefile("wb")
                out.write("HTTP/1.1 201 OK\r\nContent-Length: 0\r\n\r\n")
                out.flush()
                inc.read()
        except BaseException as exc:
            import traceback
            traceback.print_exc()
            failure = exc
        return failure

    bindsock = listen(("127.0.0.1", 0))

    def spawn_accepts():
        # Accept two connections, each served by its own green thread.
        handlers = []
        for _junk in range(2):
            with Timeout(3):
                sock, addr = bindsock.accept()
                handlers.append(spawn(accept, sock, addr))
        return handlers

    spawned = spawn(spawn_accepts)
    # Point every ring device at the local listener.
    for dev in cu.get_account_ring().devs:
        if dev is None:
            continue
        dev["port"] = bindsock.getsockname()[1]
    cu.run_once()
    for event in spawned.wait():
        err = event.wait()
        if err:
            raise err

    info = cb.get_info()
    expected = (
        ("object_count", 1),
        ("bytes_used", 3),
        ("reported_object_count", 1),
        ("reported_bytes_used", 3),
    )
    for key, value in expected:
        self.assertEqual(info[key], value)
开发者ID:iloveyou416068,项目名称:swift-1,代码行数:58,代码来源:test_updater.py
示例17: test_unicode
def test_unicode(self):
    """Updater round-trip for a container and object with non-ASCII
    names; the broker must afterwards report 1 object and 3 bytes.
    """
    updater_conf = {
        'devices': self.devices_dir,
        'mount_check': 'false',
        'swift_dir': self.testdir,
        'interval': '1',
        'concurrency': '1',
        'node_timeout': '15',
    }
    cu = container_updater.ContainerUpdater(updater_conf)

    containers_dir = os.path.join(self.sda1, DATADIR)
    os.mkdir(containers_dir)
    subdir = os.path.join(containers_dir, 'subdir')
    os.mkdir(subdir)

    cb = ContainerBroker(os.path.join(subdir, 'hash.db'),
                         account='a', container='\xce\xa9')
    cb.initialize(normalize_timestamp(1), 0)
    cb.put_object('\xce\xa9', normalize_timestamp(2), 3, 'text/plain',
                  '68b329da9893e34099c7d8ad5cb9c940')

    def accept(sock, addr):
        # Serve a single connection; return the exception if one occurs.
        try:
            with Timeout(3):
                reader = sock.makefile('rb')
                writer = sock.makefile('wb')
                writer.write(
                    'HTTP/1.1 201 OK\r\nContent-Length: 0\r\n\r\n')
                writer.flush()
                reader.read()
        except BaseException as err:
            import traceback
            traceback.print_exc()
            return err
        else:
            return None

    bindsock = listen(('127.0.0.1', 0))

    def spawn_accepts():
        # Two connections are accepted, each handled by its own spawn.
        events = []
        for _junk in range(2):
            with Timeout(3):
                sock, addr = bindsock.accept()
                events.append(spawn(accept, sock, addr))
        return events

    spawned = spawn(spawn_accepts)
    for dev in cu.get_account_ring().devs:
        if dev is not None:
            dev['port'] = bindsock.getsockname()[1]
    cu.run_once()
    for event in spawned.wait():
        err = event.wait()
        if err:
            raise err

    info = cb.get_info()
    for key, value in (('object_count', 1),
                       ('bytes_used', 3),
                       ('reported_object_count', 1),
                       ('reported_bytes_used', 3)):
        self.assertEquals(info[key], value)
开发者ID:bigdig,项目名称:swift,代码行数:57,代码来源:test_updater.py
示例18: test_PUT_not_found
def test_PUT_not_found(self):
    """A container PUT sent without any prior account PUT in this test
    must answer 404.
    """
    container_headers = {
        'X-PUT-Timestamp': normalize_timestamp(1),
        'X-DELETE-Timestamp': normalize_timestamp(0),
        'X-Object-Count': '1',
        'X-Bytes-Used': '1',
        'X-Timestamp': normalize_timestamp(0),
    }
    request = Request.blank('/sda1/p/a/c',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers=container_headers)
    response = self.controller.PUT(request)
    self.assertEquals(response.status_int, 404)
开发者ID:colecrawford,项目名称:swift,代码行数:9,代码来源:test_server.py
示例19: test_DELETE
def test_DELETE(self):
    """Test swift.object_server.ObjectController.DELETE

    Covers DELETE without a timestamp (400), DELETE of a missing object
    (404), and two DELETEs after a PUT that must each answer 204 and
    leave a ``.ts`` file named after the request timestamp.
    """
    def send_delete(path, **request_kwargs):
        # Build and dispatch one DELETE request.
        request = Request.blank(path,
                                environ={'REQUEST_METHOD': 'DELETE'},
                                **request_kwargs)
        return self.object_controller.DELETE(request)

    def tombstone_for(stamp):
        # On-disk path of the .ts file for object /a/c/o at ``stamp``.
        return os.path.join(self.testdir, 'sda1',
                            storage_directory(object_server.DATADIR, 'p',
                                              hash_path('a', 'c', 'o')),
                            stamp + '.ts')

    # No X-Timestamp header at all: rejected for both paths.
    self.assertEquals(send_delete('/sda1/p/a/c').status_int, 400)
    self.assertEquals(send_delete('/sda1/p/a/c/o').status_int, 400)

    # Object does not exist yet.
    timestamp = normalize_timestamp(time())
    resp = send_delete('/sda1/p/a/c/o',
                       headers={'X-Timestamp': timestamp})
    self.assertEquals(resp.status_int, 404)

    sleep(.00001)
    timestamp = normalize_timestamp(time())
    put_req = Request.blank('/sda1/p/a/c/o',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={
                                'X-Timestamp': timestamp,
                                'Content-Type': 'application/octet-stream',
                                'Content-Length': '4',
                            })
    put_req.body = 'test'
    put_resp = self.object_controller.PUT(put_req)
    self.assertEquals(put_resp.status_int, 201)

    # DELETE stamped one second before the PUT.
    timestamp = normalize_timestamp(float(timestamp) - 1)
    resp = send_delete('/sda1/p/a/c/o',
                       headers={'X-Timestamp': timestamp})
    self.assertEquals(resp.status_int, 204)
    self.assert_(os.path.isfile(tombstone_for(timestamp)))

    # DELETE stamped with the current time.
    sleep(.00001)
    timestamp = normalize_timestamp(time())
    resp = send_delete('/sda1/p/a/c/o',
                       headers={'X-Timestamp': timestamp})
    self.assertEquals(resp.status_int, 204)
    self.assert_(os.path.isfile(tombstone_for(timestamp)))
开发者ID:edwardt,项目名称:swift,代码行数:56,代码来源:test_server.py
示例20: test_empty
def test_empty(self):
    """Test AccountBroker.empty

    The broker is empty after initialize, non-empty once a container is
    put, and empty again once that container is put with a delete
    timestamp.
    """
    broker = AccountBroker(':memory:', account='a')
    broker.initialize(normalize_timestamp('1'))
    self.assert_(broker.empty())

    put_stamp = normalize_timestamp(time())
    broker.put_container('o', put_stamp, 0, 0, 0)
    self.assert_(not broker.empty())

    sleep(.00001)
    delete_stamp = normalize_timestamp(time())
    broker.put_container('o', 0, delete_stamp, 0, 0)
    self.assert_(broker.empty())
开发者ID:10389030,项目名称:swift,代码行数:10,代码来源:test_backend.py
注：本文中的swift.common.utils.normalize_timestamp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论