本文整理汇总了Python中sword2.Connection类的典型用法代码示例。如果您正苦于以下问题:Python Connection类的具体用法?Python Connection怎么用?Python Connection使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Connection类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, owner):
    """Placeholder constructor; SWORD2 deposit support is not implemented.

    The method raises ``NotImplementedError`` as its first statement, so
    none of the code after the ``raise`` is ever executed. The remaining
    statements are retained as a reference sketch of a deposit sequence:
    fetch the service document, upload a binary package as an in-progress
    deposit, then attach a metadata entry and finish the deposit.

    Args:
        owner: object whose ``username`` and ``password`` attributes the
            sketch passes to ``Connection``.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError

    # ---- unreachable reference sketch below this line ----
    c = Connection(SD_URI, user_name=owner.username, user_pass=owner.password)
    c.get_service_document()

    # pick the first collection within the first workspace:
    workspace_1_title, workspace_1_collections = c.workspaces[0]
    collection = workspace_1_collections[0]

    # upload "package.zip" to this collection as a new (binary) resource.
    # The archive is opened in binary mode: text mode can corrupt or fail
    # to decode zip bytes.
    with open("package.zip", "rb") as pkg:
        receipt = c.create(col_iri=collection.href,
                           payload=pkg,
                           mimetype="application/zip",
                           filename="package.zip",
                           packaging='http://purl.org/net/sword/package/Binary',
                           in_progress=True)  # As the deposit isn't yet finished

    # Add a metadata record to this newly created resource (or 'container')
    from sword2 import Entry
    # Entry can be passed keyword parameters to add metadata to the entry
    # (namespace + '_' + tagname)
    e = Entry(id="atomid",
              title="atom-title",
              dcterms_abstract="Info about the resource....",
              )
    # to add a new namespace:
    e.register_namespace('skos', 'http://www.w3.org/2004/02/skos/core#')
    e.add_field("skos_Concept", "...")

    # Update the metadata entry to the resource:
    updated_receipt = c.update(metadata_entry=e,
                               dr=receipt,  # use the receipt to discover the right URI to use
                               in_progress=False)  # finish the deposit
开发者ID:kwierman,项目名称:pyDryad,代码行数:35,代码来源:SWORD.py
示例2: test_15_Metadata_POST_to_sss_w_coliri
def test_15_Metadata_POST_to_sss_w_coliri(self):
    """Metadata-only deposit addressed by collection IRI must answer 201."""
    connection = Connection(
        "http://localhost:%s/sd-uri" % PORT_NUMBER,
        user_name="sword",
        user_pass="sword",
        download_service_document=True,
    )
    entry = Entry(
        title="Foo",
        id="asidjasidj",
        dcterms_appendix="blah blah",
        dcterms_title="foo bar",
    )
    # Index path [0][1][0]: first workspace -> its collection list -> first item.
    target = connection.sd.workspaces[0][1][0]
    receipt = connection.create(
        metadata_entry=entry,
        col_iri=target.href,
        in_progress=True,
    )
    assert receipt.code == 201
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:7,代码来源:test_sss.py
示例3: preflight_submission
def preflight_submission(self):
    """Create a metadata-only deposit and return where it was created.

    Retrieves the service document from ``self.sword2_sd_url``, asserts
    that it lists at least one workspace containing at least one entry,
    then posts an Atom entry with a freshly generated identifier to the
    first entry's ``href``.

    Returns:
        ``receipt.location`` of the deposit receipt returned by
        ``conn.create``.
    """
    opener = self.get_opener()
    conn = Connection(self.sword2_sd_url, error_response_raises_exceptions=False, http_impl=UrlLib2Layer(opener))
    logger.debug("Retrieving the service document")
    conn.get_service_document()
    logger.debug("Retrieved the service document")

    self.assertIsNotNone(conn.sd)
    self.assertIsNotNone(conn.sd.workspaces)
    self.assertNotEqual(len(conn.sd.workspaces), 0)
    workspace = conn.sd.workspaces[0][1]

    # we require there to be at least one collection
    self.assertNotEqual(len(workspace), 0)
    col = workspace[0]

    # A random suffix keeps identifiers distinct between runs.
    testid = "testid_" + str(uuid.uuid4())
    logger.debug("col iri = " + str(col.href))
    e = Entry(id=testid, title="test title", dcterms_abstract="test description")
    # Single-argument parenthesised form prints the same text whether
    # ``print`` is a statement or a function.
    print(str(e))
    receipt = conn.create(col_iri=col.href, metadata_entry=e, suggested_identifier=testid)
    # e.g. col.href = http://192.168.2.237/swordv2/silo/test-silo

    self.assertIsNotNone(receipt)
    # assertEqual is the supported spelling; assertEquals is a deprecated alias.
    self.assertEqual(receipt.code, 201)
    return receipt.location
开发者ID:dataflow,项目名称:DataStage,代码行数:31,代码来源:TestDatasetSubmission.py
示例4: test_12_Metadata_POST_to_sss
def test_12_Metadata_POST_to_sss(self):
    """Metadata-only deposit addressed by workspace and collection title.

    The deposit target is named by the 'Main Site' workspace and the title
    of the first entry under the first workspace of the service document.
    Passes when ``conn.create`` returns a response object.
    """
    conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True)
    e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar")
    resp = conn.create(metadata_entry=e,
                       workspace='Main Site',
                       collection=conn.sd.workspaces[0][1][0].title,
                       in_progress=True)
    # Identity comparison: ``!= None`` would be routed through any custom
    # comparison methods on the response object.
    assert resp is not None
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:8,代码来源:test_sss.py
示例5: test_04_init_from_sss_then_get_doc
def test_04_init_from_sss_then_get_doc(self):
    """Service document is absent after construction and present after fetching.

    Checks that ``Connection`` stores the IRI it was given, leaves ``sd``
    unset until ``get_service_document()`` is called, and that the fetched
    document is flagged parsed and valid and lists exactly one workspace.
    """
    conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword")
    assert conn.sd_iri == "http://localhost:%s/sd-uri" % PORT_NUMBER
    assert conn.sd is None  # Not asked to get sd doc yet

    conn.get_service_document()
    assert conn.sd is not None
    assert conn.sd.parsed == True
    assert conn.sd.valid == True
    assert len(conn.sd.workspaces) == 1
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:9,代码来源:test_sss.py
示例6: test_03_init_then_load_from_string_t_history
def test_03_init_then_load_from_string_t_history(self):
    """Loading a service document from a string is recorded in the history.

    After constructing a ``Connection`` and calling
    ``load_service_document``, ``conn.history`` must hold two records whose
    ``'type'`` values are ``"init"`` and ``"SD Parse"``, in that order.
    """
    conn = Connection("http://example.org/service-doc")
    assert conn.sd_iri == "http://example.org/service-doc"
    assert conn.sd is None

    conn.load_service_document(long_service_doc)
    # Should have made two client 'transactions': the init and the
    # subsequent XML load
    assert len(conn.history) == 2
    assert conn.history[0]['type'] == "init"
    assert conn.history[1]['type'] == "SD Parse"
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:9,代码来源:test_connection.py
示例7: testServiceDocumentAccess
def testServiceDocumentAccess(self):
    """Fetch the service document ten times, each on a new connection.

    Every iteration builds a new ``Connection`` around the same opener and
    asserts that a service document with at least one workspace came back.
    The loop index is included in each failure message so a failing
    iteration can be identified.
    """
    opener = self.get_opener()
    for i in range(10):
        conn = Connection(self.sword2_sd_url, error_response_raises_exceptions=False, http_impl=UrlLib2Layer(opener))
        conn.get_service_document()
        self.assertIsNotNone(conn.sd, "Service document None (loop %d)" % (i))
        self.assertIsNotNone(conn.sd.workspaces, "Service document workspace None (loop %d)" % (i))
        self.assertNotEqual(len(conn.sd.workspaces), 0, "Service document workspace count %d (loop %d)" % (len(conn.sd.workspaces), i))
开发者ID:dataflow,项目名称:DataStage,代码行数:9,代码来源:TestServiceDocumentAccess.py
示例8: preflight_submission
def preflight_submission(self, dataset, opener, repository, silo):
    """Create the metadata-only deposit for ``dataset`` in ``silo``.

    Fetches the service document from ``repository.sword2_sd_url`` to
    verify that the server answers and lists at least one workspace with at
    least one collection, then posts an Atom entry built from the dataset's
    identifier, title and description to
    ``repository.homepage + "swordv2/silo/" + silo``.

    Args:
        dataset: provides ``identifier``, ``title`` and ``description``.
        opener: handed to ``UrlLib2Layer`` to build the HTTP layer
            (presumably a urllib2 opener carrying credentials -- confirm).
        repository: provides ``sword2_sd_url`` and ``homepage``.
        silo: silo name appended to the deposit URL.

    Returns:
        tuple: ``(receipt.alternate, receipt.location)``.

    Raises:
        SwordServiceError: no service-document URL is configured, the
            service document could not be retrieved, or it lists no
            workspaces or no collections.
        SwordSlugRejected: the server answered with ``code >= 400`` and an
            ``error_href`` equal to the DatasetConflict URI.
        SwordDepositError: any other response with ``code >= 400``.
    """
    logger.debug("Carrying out pre-flight submission")

    # verify that we can get a service document, and that there
    # is at least one silo and that we can authenticate
    if repository.sword2_sd_url is None:
        raise SwordServiceError("No sword2 service-document URL for repository configuration")

    # get the service document (for which we must be authenticated)
    conn = Connection(repository.sword2_sd_url, error_response_raises_exceptions=False, http_impl=UrlLib2Layer(opener))
    conn.get_service_document()

    # we require there to be at least one workspace
    if conn.sd is None:
        raise SwordServiceError("did not successfully retrieve a service document")
    # Covers both a missing (None) and an empty workspace list.
    if not conn.sd.workspaces:
        raise SwordServiceError("no workspaces defined in service document")

    workspace = conn.sd.workspaces[0][1]

    # we require there to be at least one collection
    if len(workspace) == 0:
        raise SwordServiceError("no collections defined in workspace")

    # FIXME: we don't currently have a mechanism to make decisions about
    # which collection to put stuff in. The collections listed in the
    # service document are only checked for existence; the deposit itself
    # goes to the silo URL derived from the repository homepage.
    silohref = repository.homepage + "swordv2/silo/" + silo

    # assemble the entry ready for deposit, using the basic metadata
    # FIXME: is there anything further we need to do about the metadata here?
    e = Entry(id=dataset.identifier, title=dataset.title, dcterms_abstract=dataset.description)

    # create the item using the metadata-only approach (suppress errors along the way,
    # we'll check for them below)
    logger.debug("Deposit is being created")
    receipt = conn.create(col_iri=silohref, metadata_entry=e, suggested_identifier=dataset.identifier)
    logger.debug("Deposit created")

    # check for errors
    if receipt.code >= 400:
        # this is an error
        logger.debug("Received error message from server: %s", receipt.to_xml())
        if receipt.error_href == "http://databank.ox.ac.uk/errors/DatasetConflict":
            raise SwordSlugRejected()
        raise SwordDepositError(receipt)

    # %-style logging arguments: a receipt without a location is logged as
    # "None" instead of raising TypeError from string concatenation.
    logger.debug("Deposit carried out to: %s", receipt.location)
    return (receipt.alternate, receipt.location)
开发者ID:dataflow,项目名称:DataStage,代码行数:56,代码来源:sword2depositor.py
示例9: test_34_check_metadata_only_state
def test_34_check_metadata_only_state(self):
    """A metadata-only deposit reports the single state 'EmptyContainer'."""
    connection = Connection(SSS_URL, user_name=SSS_UN, user_pass=SSS_PW)
    connection.get_service_document()

    # Index path [0][1][0]: first workspace -> its collection list -> first item.
    target = connection.sd.workspaces[0][1][0]
    entry = Entry(
        title="An entry only deposit",
        id="asidjasidj",
        dcterms_abstract="abstract",
        dcterms_identifier="http://whatever/",
    )
    receipt = connection.create(col_iri=target.href, metadata_entry=entry)

    # Fetch the statement via the IRI carried on the deposit receipt.
    statement = connection.get_ore_sword_statement(receipt.ore_statement_iri)
    assert len(statement.states) == 1
    first_state = statement.states[0]
    assert first_state[0] == "http://databank.ox.ac.uk/state/EmptyContainer"
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:10,代码来源:test_databank.py
示例10: test_08_Simple_POST_to_sss_w_coliri
def test_08_Simple_POST_to_sss_w_coliri(self):
    """Payload-only deposit addressed by collection IRI must answer 201.

    No metadata entry is sent (``metadata_entry=None``), so no ``Entry`` is
    constructed here.
    """
    conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True)
    resp = conn.create(payload="Payload is just a load of text",
                       mimetype="text/plain",
                       filename="readme.txt",
                       packaging='http://purl.org/net/sword/package/Binary',
                       col_iri=conn.sd.workspaces[0][1][0].href,
                       in_progress=True,
                       metadata_entry=None)
    assert resp.code == 201
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:11,代码来源:test_sss.py
示例11: test_06_Simple_POST_to_sss
def test_06_Simple_POST_to_sss(self):
    """Payload-only deposit addressed by workspace/collection title answers 201."""
    connection = Connection(
        "http://localhost:%s/sd-uri" % PORT_NUMBER,
        user_name="sword",
        user_pass="sword",
        download_service_document=True,
    )
    # Title of the first entry under the first workspace.
    collection_title = connection.sd.workspaces[0][1][0].title
    response = connection.create(
        payload="Payload is just a load of text",
        mimetype="text/plain",
        filename="readme.txt",
        packaging='http://purl.org/net/sword/package/Binary',
        workspace='Main Site',
        collection=collection_title,
        in_progress=True,
        metadata_entry=None,
    )
    assert response.code == 201
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:11,代码来源:test_sss.py
示例12: test_02_get_service_document_on_behalf_of
def test_02_get_service_document_on_behalf_of(self):
    """Service document can be fetched when ``on_behalf_of`` is set.

    Builds the connection with ``on_behalf_of=SSS_OBO`` and checks that the
    retrieved document is flagged parsed and valid and lists exactly one
    workspace.
    """
    conn = Connection(SSS_URL, user_name=SSS_UN, user_pass=SSS_PW, on_behalf_of=SSS_OBO)
    conn.get_service_document()

    # given that the client is fully functional, testing that the
    # service document parses and is valid is sufficient. This, obviously,
    # doesn't test the validation routine itself.
    assert conn.sd is not None
    assert conn.sd.parsed == True
    assert conn.sd.valid == True
    assert len(conn.sd.workspaces) == 1
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:11,代码来源:test_pylons.py
示例13: test_02_init_then_load_from_string
def test_02_init_then_load_from_string(self):
    """Loading ``long_service_doc`` exposes its two workspaces.

    ``sd`` must be unset before the load. Afterwards both ``conn.sd.workspaces``
    and ``conn.workspaces`` hold two entries, titled "Main Site" and
    "Sub-site", and the second one carries two collections.
    """
    conn = Connection("http://example.org/service-doc")
    assert conn.sd_iri == "http://example.org/service-doc"
    assert conn.sd is None

    conn.load_service_document(long_service_doc)
    assert conn.sd is not None
    assert len(conn.sd.workspaces) == 2
    assert len(conn.workspaces) == 2
    assert conn.sd.workspaces[0][0] == "Main Site"
    assert conn.sd.workspaces[1][0] == "Sub-site"
    assert len(conn.sd.workspaces[1][1]) == 2
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:11,代码来源:test_connection.py
示例14: test_07_Multipart_POST_to_sss
def test_07_Multipart_POST_to_sss(self):
    """Deposit carrying both a payload and a metadata entry answers 201."""
    connection = Connection(
        "http://localhost:%s/sd-uri" % PORT_NUMBER,
        user_name="sword",
        user_pass="sword",
        download_service_document=True,
    )
    entry = Entry(
        title="Foo",
        id="asidjasidj",
        dcterms_appendix="blah blah",
        dcterms_title="foo bar",
    )
    # Title of the first entry under the first workspace.
    collection_title = connection.sd.workspaces[0][1][0].title
    response = connection.create(
        payload="Multipart payload here",
        metadata_entry=entry,
        mimetype="text/plain",
        filename="readme.txt",
        packaging='http://purl.org/net/sword/package/Binary',
        workspace='Main Site',
        collection=collection_title,
        in_progress=True,
    )
    assert response.code == 201
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:12,代码来源:test_sss.py
示例15: test_22_Create_deposit_and_delete_deposit
def test_22_Create_deposit_and_delete_deposit(self):
    """Create an in-progress deposit, then delete it through its edit IRI.

    The deposit receipt must expose an ``edit`` IRI, and deleting that
    resource must answer with status 204 or 200.
    """
    conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True)
    e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar")
    deposit_receipt = conn.create(payload="Multipart_POST_then_update_on_EM_IRI",
                                  metadata_entry=e,
                                  mimetype="text/plain",
                                  filename="readme.txt",
                                  packaging='http://purl.org/net/sword/package/Binary',
                                  col_iri=conn.sd.workspaces[0][1][0].href,
                                  in_progress=True)
    assert deposit_receipt.edit is not None

    dr = conn.delete(resource_iri=deposit_receipt.edit)
    assert dr.code in (200, 204)
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:13,代码来源:test_sss.py
示例16: complete_submission
def complete_submission(self, edit_uri):
    """Fetch the deposit receipt for ``edit_uri`` and upload the zip package.

    The receipt is requested once and then re-requested up to
    ``self.retry_limit`` more times, sleeping ``self.retry_delay`` seconds
    before each retry, for as long as no receipt is available or its status
    code is 400 or above. The test then asserts a 200 receipt, uploads
    ``self.zipFileName`` through ``conn.update`` and asserts a 204 reply.

    Args:
        edit_uri: URI passed to ``conn.get_deposit_receipt``.
    """
    opener = self.get_opener()
    conn = Connection(self.sword2_sd_url, error_response_raises_exceptions=False, http_impl=UrlLib2Layer(opener))

    def fetch_receipt(previous):
        # The sword2 client does not catch network errors like URLError,
        # which indicates that the url couldn't be reached at all. In that
        # case keep whatever receipt we already had; the retry loop and
        # the assertions below decide how to proceed.
        try:
            return conn.get_deposit_receipt(edit_uri)
        except urllib2.URLError:
            return previous

    receipt = fetch_receipt(None)

    # at this stage we need to ensure that we actually got back a deposit
    # receipt
    attempts = 0
    while (receipt is None or receipt.code >= 400) and attempts < self.retry_limit:
        if receipt is None:
            err = "<unable to reach server>"
        else:
            err = str(receipt.code)
        logger.debug("Attempt to retrieve Entry Document failed with error " + err + " ... trying again in " + str(self.retry_delay) + " seconds")
        attempts += 1
        time.sleep(self.retry_delay)
        receipt = fetch_receipt(receipt)

    self.assertIsNotNone(receipt)
    # assertEqual is the supported spelling; assertEquals is a deprecated alias.
    self.assertEqual(receipt.code, 200)

    # if we get to here we can go ahead with the deposit for real
    with open(self.zipFileName, "rb") as data:
        new_receipt = conn.update(dr=receipt,
                                  payload=data,
                                  mimetype="application/zip",
                                  filename=self.dataset_identifier + ".zip",
                                  packaging='http://dataflow.ox.ac.uk/package/DataBankBagIt')

    self.assertIsNotNone(new_receipt)
    self.assertEqual(new_receipt.code, 204)
开发者ID:dataflow,项目名称:DataStage,代码行数:51,代码来源:TestDatasetSubmission.py
示例17: test_23_Finish_in_progress_deposit
def test_23_Finish_in_progress_deposit(self):
    """Create an in-progress deposit, then complete it via its SE-IRI.

    The deposit receipt must expose an ``edit`` IRI, and the reply to
    ``complete_deposit`` must carry status 200. The printed notice records
    that this is expected to fail while the test server answers 201.
    """
    conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True)
    e = Entry(title="Foo", id="asidjasidj", dcterms_appendix="blah blah", dcterms_title="foo bar")
    deposit_receipt = conn.create(payload="Multipart_POST_then_update_on_EM_IRI",
                                  metadata_entry=e,
                                  mimetype="text/plain",
                                  filename="readme.txt",
                                  packaging='http://purl.org/net/sword/package/Binary',
                                  col_iri=conn.sd.workspaces[0][1][0].href,
                                  in_progress=True)
    assert deposit_receipt.edit is not None

    dr = conn.complete_deposit(se_iri=deposit_receipt.se_iri)
    # Single-argument parenthesised form prints the same text whether
    # ``print`` is a statement or a function.
    print("This will fail until the sss.py SWORD2 server responds properly, rather than with code 201")
    assert dr.code == 200
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:14,代码来源:test_sss.py
示例18: test_09_basic_retrieve_deposit_receipt
def test_09_basic_retrieve_deposit_receipt(self):
    """A deposit receipt can be re-fetched from the deposit's location.

    Creates a metadata-only deposit, requires the receipt to carry a
    ``location``, then retrieves the receipt from that location and checks
    it answers 200 and is flagged parsed and valid.
    """
    conn = Connection(SSS_URL, user_name=SSS_UN, user_pass=SSS_PW)
    conn.get_service_document()
    col = conn.sd.workspaces[0][1][0]
    e = Entry(title="An entry only deposit", id="asidjasidj", dcterms_abstract="abstract", dcterms_identifier="http://whatever/")
    receipt = conn.create(col_iri=col.href, metadata_entry=e)

    # we're going to work with the location
    assert receipt.location is not None

    new_receipt = conn.get_deposit_receipt(receipt.location)
    assert new_receipt.code == 200
    assert new_receipt.parsed == True
    assert new_receipt.valid == True
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:15,代码来源:test_databank.py
示例19: test_17_Simple_POST_and_GET
def test_17_Simple_POST_and_GET(self):
    """Deposit a payload, then GET the resource back from its content IRI.

    The create call must answer 201, and ``get_resource`` on the receipt's
    ``cont_iri`` must return an object. The returned content itself is not
    inspected.
    """
    conn = Connection("http://localhost:%s/sd-uri" % PORT_NUMBER, user_name="sword", user_pass="sword", download_service_document=True)
    col_iri = conn.sd.workspaces[0][1][0].href  # pick the first collection
    dr = conn.create(payload="Simple_POST_and_GET",
                     mimetype="text/plain",
                     filename="readme.txt",
                     packaging='http://purl.org/net/sword/package/Binary',
                     col_iri=col_iri,
                     in_progress=True,
                     metadata_entry=None)
    assert dr.code == 201

    # Now to GET that resource with no prescribed packaging
    content_object = conn.get_resource(dr.cont_iri)
    # Can't guarantee that sss.py won't mangle submissions, so can't validate response at this moment
    assert content_object is not None
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:15,代码来源:test_sss.py
示例20: test_07_basic_create_resource_with_entry
def test_07_basic_create_resource_with_entry(self):
    """Metadata-only deposit answers 201 and reports a location.

    When the receipt carries a parsed body (``dom`` is set) it must also be
    flagged parsed and valid.
    """
    conn = Connection(SSS_URL, user_name=SSS_UN, user_pass=SSS_PW)
    conn.get_service_document()
    col = conn.sd.workspaces[0][1][0]
    e = Entry(title="An entry only deposit", id="asidjasidj", dcterms_abstract="abstract", dcterms_identifier="http://whatever/")
    receipt = conn.create(col_iri=col.href,
                          metadata_entry=e)
    assert receipt.code == 201
    assert receipt.location is not None

    # these last two assertions are contingent on if we actually get a
    # receipt back from the server (which we might not legitimately get)
    assert receipt.dom is None or receipt.parsed == True
    assert receipt.dom is None or receipt.valid == True
开发者ID:Hwesta,项目名称:python-client-sword2,代码行数:15,代码来源:test_pylons.py
注:本文中的sword2.Connection类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论