• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.make_bogus_data_file函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中synapseclient.utils.make_bogus_data_file函数的典型用法代码示例。如果您正苦于以下问题：Python make_bogus_data_file函数的具体用法？Python make_bogus_data_file怎么用？Python make_bogus_data_file使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了make_bogus_data_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_Entity

def test_Entity():
    """Exercise create/read/update of Project, Folder and File entities.

    Goes through createEntity / getEntity / updateEntity (plus uploadFile and
    downloadEntity) and checks that properties, annotations and file contents
    come back unchanged after each round trip.
    """
    expected_name = str(uuid.uuid4())
    project = syn.createEntity(Project(expected_name, description='Bogus testing project'))
    schedule_for_cleanup(project)

    folder = syn.createEntity(
        Folder('Test Folder', parent=project, description='A place to put my junk', foo=1000))

    first_path = utils.make_bogus_data_file()
    schedule_for_cleanup(first_path)
    file_entity = File(
        first_path,
        parent=folder,
        description='Random data for testing',
        foo='An arbitrary value',
        bar=[33, 44, 55],
        bday=Datetime(2013, 3, 15))
    file_entity = syn._createFileEntity(file_entity)

    # The local path must still be attached to the returned object.
    assert file_entity.path == first_path

    # Project round trip.
    project = syn.getEntity(project)
    assert project.name == expected_name

    # Folder round trip, fetched by id this time.
    folder = syn.getEntity(folder.id)
    assert folder.name == 'Test Folder'
    assert folder.parentId == project.id
    assert folder.foo[0] == 1000

    # File round trip: annotations come back as lists.
    file_entity = syn.getEntity(file_entity)
    assert file_entity['foo'][0] == 'An arbitrary value'
    assert file_entity['bar'] == [33, 44, 55]
    assert file_entity['bday'][0] == Datetime(2013, 3, 15)

    # The downloaded bytes must match what was uploaded.
    file_entity = syn.downloadEntity(file_entity)
    assert filecmp.cmp(first_path, file_entity.path)

    # TODO: the entity has forgotten the local file path at this point,
    # so it is put back by hand before updating.
    file_entity.path = first_path

    # Change one annotation, add a new one, and push the update.
    file_entity['foo'] = 'Another arbitrary chunk of text data'
    file_entity['new_key'] = 'A newly created value'
    file_entity = syn.updateEntity(file_entity)
    assert file_entity['foo'][0] == 'Another arbitrary chunk of text data'
    assert file_entity['bar'] == [33, 44, 55]
    assert file_entity['bday'][0] == Datetime(2013, 3, 15)
    assert file_entity.new_key[0] == 'A newly created value'
    assert file_entity.path == first_path

    # Replace the file contents with a second bogus file.
    second_path = utils.make_bogus_data_file()
    schedule_for_cleanup(second_path)
    file_entity = syn.uploadFile(file_entity, second_path)

    # The replacement must also download intact.
    file_entity = syn.downloadEntity(file_entity)
    assert filecmp.cmp(second_path, file_entity.path)
开发者ID:xschildw,项目名称:synapsePythonClient,代码行数:60,代码来源:integration_test_Entity.py


示例2: test_syncFromSynapse

def test_syncFromSynapse():
    """Check recursive download through synapseutils.syncFromSynapse.

    Most of that function is already covered by
    tests/integration/test_command_line_client::test_command_get_recursive_and_query,
    so the only case exercised here is path=None.
    """
    project_entity = syn.store(synapseclient.Project(name=str(uuid.uuid4())))
    schedule_for_cleanup(project_entity.id)

    folder_entity = syn.store(Folder(name=str(uuid.uuid4()), parent=project_entity))

    # Two files go into the folder, then one more directly into the project.
    uploaded_paths = []
    for parent in (folder_entity, folder_entity, project_entity):
        bogus_path = utils.make_bogus_data_file()
        uploaded_paths.append(bogus_path)
        schedule_for_cleanup(bogus_path)
        syn.store(File(bogus_path, parent=parent))

    # Recursive get starting from the project.
    output = synapseutils.syncFromSynapse(syn, project_entity)

    # One returned entry per upload, each pointing at one of the uploaded paths.
    assert len(output) == len(uploaded_paths)
    for synced in output:
        print(synced.path)
        assert synced.path in uploaded_paths
开发者ID:kkdang,项目名称:synapsePythonClient,代码行数:34,代码来源:test_synapseutils.py


示例3: test_uploadFileEntity

def test_uploadFileEntity():
    """Upload a file via a plain dict entity, then replace its contents."""
    first_path = utils.make_bogus_data_file()
    schedule_for_cleanup(first_path)

    # A plain dictionary defaults to the FileEntity type.
    spec = {
        'name': 'fooUploadFileEntity',
        'description': 'A test file entity',
        'parentId': project['id'],
    }
    entity = syn.uploadFile(spec, first_path)

    # First round trip: file name and contents must match the upload.
    entity = syn.downloadEntity(entity)
    assert entity['files'][0] == os.path.basename(first_path)
    assert filecmp.cmp(first_path, entity['path'])

    # The first file handle listed for the entity must be an S3 handle.
    handle = syn.restGET('/entity/%s/filehandles' % entity.id)['list'][0]
    assert handle['concreteType'] == 'org.sagebionetworks.repo.model.file.S3FileHandle'

    # Upload a different temporary file onto the existing FileEntity.
    second_path = utils.make_bogus_data_file()
    schedule_for_cleanup(second_path)
    entity = syn.uploadFile(entity, second_path)

    # Second round trip: the entity must now carry the new file.
    entity = syn.downloadEntity(entity)
    assert entity['files'][0] == os.path.basename(second_path)
    assert filecmp.cmp(second_path, entity['path'])
开发者ID:apratap,项目名称:synapsePythonClient,代码行数:30,代码来源:integration_test.py


示例4: test_get_and_store_by_name_and_parent_id

def test_get_and_store_by_name_and_parent_id():
    """Storing a File under an existing name and parent updates that entity."""
    project = create_project()

    first_path = utils.make_bogus_data_file()
    schedule_for_cleanup(first_path)

    # syn.get is handed the original File object, not the value store returned.
    unsaved = File(first_path, name='Foobarbat', parent=project)
    stored = syn.store(unsaved)
    fetched = syn.get(unsaved)

    assert fetched.id == stored.id
    assert fetched.name == stored.name
    assert fetched.parentId == stored.parentId

    # A second bogus file, stored under the same name and parent.
    second_path = utils.make_bogus_data_file()
    schedule_for_cleanup(second_path)

    # Expected to create a new version of the previous File entity.
    replacement = syn.store(
        File(second_path, name='Foobarbat', parent=project, description='banana', junk=1234))

    # Same id as before, carrying the new description, annotation and contents.
    assert replacement.id == fetched.id
    assert replacement.description == 'banana'
    assert replacement.junk == [1234]
    assert filecmp.cmp(second_path, replacement.path)
开发者ID:xschildw,项目名称:synapsePythonClient,代码行数:27,代码来源:integration_test_Entity.py


示例5: test_uploadFileEntity

def test_uploadFileEntity():
    """Upload a file via a plain dict entity, then replace its contents.

    Each temporary file is removed in a ``finally`` clause so that a failing
    assertion no longer leaves it behind on disk.
    """
    projectEntity = create_project()

    # Defaults to FileEntity
    fname = utils.make_bogus_data_file()
    try:
        entity = {'name': 'foo',
                  'description': 'A test file entity',
                  'parentId': projectEntity['id']}

        # Create new FileEntity
        entity = syn.uploadFile(entity, fname)

        # Download and verify
        entity = syn.downloadEntity(entity)
        assert entity['files'][0] == os.path.basename(fname)
        assert filecmp.cmp(fname, entity['path'])

        # Check that the first listed file handle is an S3 file handle
        fh = syn.restGET('/entity/%s/filehandles' % entity.id)['list'][0]
        assert fh['concreteType'] == 'org.sagebionetworks.repo.model.file.S3FileHandle'
    finally:
        os.remove(fname)

    # Create a different temporary file
    fname = utils.make_bogus_data_file()
    try:
        # Update existing FileEntity
        entity = syn.uploadFile(entity, fname)

        # Download and verify that it is the same filename
        entity = syn.downloadEntity(entity)
        # Single-argument print(...) behaves the same on Python 2 and Python 3.
        print(entity['files'][0])
        print(os.path.basename(fname))
        assert entity['files'][0] == os.path.basename(fname)
        assert filecmp.cmp(fname, entity['path'])
    finally:
        os.remove(fname)
开发者ID:xschildw,项目名称:synapsePythonClient,代码行数:35,代码来源:integration_test.py


示例6: test_wikiAttachment

def test_wikiAttachment():
    """Create, fetch, update and delete a Wiki and a sub-page on the project."""
    # Two bogus files: one uploaded up front as a file handle, one passed by path.
    handle_source = utils.make_bogus_data_file()
    attachment_path = utils.make_bogus_data_file()
    schedule_for_cleanup(handle_source)
    schedule_for_cleanup(attachment_path)
    uploaded_handle = syn._uploadFileToFileHandleService(handle_source)

    # The Wiki constructor is given both a file handle id and a file path.
    markdown_text = """
    This is a test wiki
    =======================

    Blabber jabber blah blah boo.
    """
    wiki = syn.store(Wiki(owner=project, title='A Test Wiki', markdown=markdown_text,
                          fileHandles=[uploaded_handle['id']],
                          attachments=[attachment_path]))

    # A child page hanging off the root Wiki.
    subwiki = syn.store(Wiki(owner=project, title='A sub-wiki',
                             markdown='nothing', parentWikiId=wiki.id))

    # The root page fetched from Synapse must equal the stored one.
    fetched = syn.getWiki(project)
    assert wiki == fetched

    # Same check for the sub-page, addressed by its id.
    fetched = syn.getWiki(project, subpageId=subwiki.id)
    assert subwiki == fetched

    # Modify title and markdown, store again, and check both stuck.
    wiki['title'] = 'A New Title'
    wiki['markdown'] = wiki['markdown'] + "\nNew stuff here!!!\n"
    wiki = syn.store(wiki)
    assert wiki['title'] == 'A New Title'
    assert wiki['markdown'].endswith("\nNew stuff here!!!\n")

    # The header listing must report both pages.
    headers = syn.getWikiHeaders(project)
    assert headers['totalNumberOfResults'] == 2
    assert headers['results'][0]['title'] in (wiki['title'], subwiki['title'])

    # Disabled in the original: retrieving the file attachment.
    # tmpdir = tempfile.mkdtemp()
    # file_props = syn._downloadWikiAttachment(project, wiki,
    #                         os.path.basename(filename), dest_dir=tmpdir)
    # path = file_props['path']
    # assert os.path.exists(path)
    # assert filecmp.cmp(original_path, path)

    # Clean up; deleting the file handle itself is disabled.
    # syn._deleteFileHandle(fileHandle)
    syn.delete(wiki)
    syn.delete(subwiki)
    assert_raises(SynapseHTTPError, syn.getWiki, project)
开发者ID:apratap,项目名称:synapsePythonClient,代码行数:59,代码来源:integration_test.py


示例7: test_command_line_store_and_submit

def test_command_line_store_and_submit():
    """Test the command line client's store, submit and delete commands.

    Creates a project and a file through ``synapse store``, submits the file to
    a freshly created Evaluation, updates the file, stores an external URL, and
    finally deletes the project. Entity ids are extracted from the command
    output with ``parse``.
    """

    ## Create a project
    output = run('synapse store --name "%s" --description "test of store command" --type Project' % str(uuid.uuid4()))
    project_id = parse(r'Created entity:\s+(syn\d+)\s+', output)
    schedule_for_cleanup(project_id)

    ## Create and upload a file
    filename = utils.make_bogus_data_file()
    schedule_for_cleanup(filename)
    output = run('synapse store --name "BogusFileEntity" --description "Bogus data to test file upload" --parentid %s --file %s' % (project_id, filename))
    file_entity_id = parse(r'Created entity:\s+(syn\d+)\s+', output)

    ## Verify that we stored the file in Synapse
    f1 = syn.get(file_entity_id)
    fh = syn._getFileHandle(f1.dataFileHandleId)
    assert fh['concreteType'] == 'org.sagebionetworks.repo.model.file.S3FileHandle'

    ## Create an Evaluation to submit to
    ## (named `evaluation` so the builtin `eval` is not shadowed)
    evaluation = synapseclient.evaluation.Evaluation(name=str(uuid.uuid4()), contentSource=project_id)
    evaluation = syn.store(evaluation)
    syn.addEvaluationParticipant(evaluation)

    ## Submit a bogus file
    ## The parsed id is not used further; the call is kept so the output is
    ## still matched against the expected pattern.
    output = run('synapse submit --evaluation %s --name Some random name --entity %s' % (evaluation.id, file_entity_id))
    parse(r'Submitted \(id: (\d+)\) entity:\s+', output)

    ## Update the file
    filename = utils.make_bogus_data_file()
    schedule_for_cleanup(filename)
    output = run('synapse store --id %s --file %s' % (file_entity_id, filename,))
    parse(r'Updated entity:\s+(syn\d+)', output)

    ## Tests shouldn't have external dependencies, but here it's required
    ducky_url = 'http://upload.wikimedia.org/wikipedia/commons/9/93/Rubber_Duck.jpg'

    ## Test external file handle
    ## This time, omit the quotation marks
    output = run('synapse store --name Rubber Ducky --description I like rubber duckies --parentid %s --file %s' % (project_id, ducky_url))
    external_entity_id = parse(r'Created entity:\s+(syn\d+)\s+', output)

    ## Verify that we created an external file handle
    f2 = syn.get(external_entity_id)
    fh = syn._getFileHandle(f2.dataFileHandleId)
    assert fh['concreteType'] == 'org.sagebionetworks.repo.model.file.ExternalFileHandle'

    ## Delete project
    run('synapse delete %s' % project_id)
开发者ID:xschildw,项目名称:synapsePythonClient,代码行数:56,代码来源:test_command_line_client.py


示例8: test_walk

def test_walk():
    """Check that synu.walk yields the expected (dirpath, folders, files) entries.

    Builds a small project tree (two top-level folders, one nested folder and
    three files), records the entries the walk is expected to produce, and
    asserts that every entry yielded by ``synu.walk`` is among them. Walking a
    File entity is expected to yield nothing.
    """
    walked = []
    firstfile = utils.make_bogus_data_file()
    schedule_for_cleanup(firstfile)
    project_entity = syn.store(Project(name=str(uuid.uuid4())))
    schedule_for_cleanup(project_entity.id)
    folder_entity = syn.store(Folder(name=str(uuid.uuid4()), parent=project_entity))
    schedule_for_cleanup(folder_entity.id)
    second_folder = syn.store(Folder(name=str(uuid.uuid4()), parent=project_entity))
    schedule_for_cleanup(second_folder.id)
    file_entity = syn.store(File(firstfile, parent=project_entity))
    schedule_for_cleanup(file_entity.id)

    # Expected entry for the project root: both folders and the one file.
    walked.append(((project_entity.name, project_entity.id),
                   [(folder_entity.name, folder_entity.id),
                    (second_folder.name, second_folder.id)],
                   [(file_entity.name, file_entity.id)]))

    nested_folder = syn.store(Folder(name=str(uuid.uuid4()), parent=folder_entity))
    schedule_for_cleanup(nested_folder.id)
    secondfile = utils.make_bogus_data_file()
    schedule_for_cleanup(secondfile)
    second_file = syn.store(File(secondfile, parent=nested_folder))
    schedule_for_cleanup(second_file.id)
    thirdfile = utils.make_bogus_data_file()
    schedule_for_cleanup(thirdfile)
    third_file = syn.store(File(thirdfile, parent=second_folder))
    schedule_for_cleanup(third_file.id)

    # Expected entries for the first folder, its nested folder, and the second folder.
    walked.append(((os.path.join(project_entity.name, folder_entity.name), folder_entity.id),
                   [(nested_folder.name, nested_folder.id)],
                   []))
    walked.append(((os.path.join(os.path.join(project_entity.name, folder_entity.name), nested_folder.name), nested_folder.id),
                   [],
                   [(second_file.name, second_file.id)]))
    walked.append(((os.path.join(project_entity.name, second_folder.name), second_folder.id),
                   [],
                   [(third_file.name, third_file.id)]))

    temp = list(synu.walk(syn, project_entity.id))

    # Folder names are random UUIDs, so the order of the child listings is not
    # predictable. Sort the listings on BOTH sides in place so that the
    # membership check below does not depend on it.
    # NOTE(review): assumes each entry yielded by synu.walk holds mutable lists,
    # like the expected entries above - confirm against synu.walk.
    for entry in walked + temp:
        for part in entry:
            if isinstance(part, list):
                part.sort()
    for entry in temp:
        assert entry in walked

    print("CHECK: Cannot synu.walk a file returns empty generator")
    temp = synu.walk(syn, second_file.id)
    assert list(temp) == []
开发者ID:thomasyu888,项目名称:synapsePythonClient,代码行数:55,代码来源:test_synapseutils.py


示例9: test_command_get_recursive_and_query

def test_command_get_recursive_and_query():
    """Tests the 'synapse get -r' and 'synapse get -q' functions.

    Uploads two files into a folder and one into the project, then checks that
    a recursive get and a query get each leave matching copies under the
    current directory.
    """
    # Create a Project
    project_entity = syn.store(synapseclient.Project(name=str(uuid.uuid4())))
    schedule_for_cleanup(project_entity.id)

    # Create a Folder in Project
    folder_entity = syn.store(synapseclient.Folder(name=str(uuid.uuid4()),
                                                   parent=project_entity))

    # Create and upload two files in Folder
    uploaded_paths = []
    for _ in range(2):
        f = utils.make_bogus_data_file()
        uploaded_paths.append(f)
        schedule_for_cleanup(f)
        file_entity = synapseclient.File(f, parent=folder_entity)
        file_entity.location = 'folder'
        file_entity = syn.store(file_entity)
    # Add a file in the project level as well
    f = utils.make_bogus_data_file()
    uploaded_paths.append(f)
    schedule_for_cleanup(f)
    file_entity = synapseclient.File(f, parent=project_entity)
    file_entity.location = 'project'
    file_entity = syn.store(file_entity)

    ### Test recursive get
    run('synapse', '--skip-checks',
        'get', '-r',
        project_entity.id)
    # Verify that we downloaded files: folder files land in ./<folder name>/,
    # the project-level file (last uploaded) lands directly in ./
    new_paths = [os.path.join('.', folder_entity.name, os.path.basename(f)) for f in uploaded_paths[:-1]]
    new_paths.append(os.path.join('.', os.path.basename(uploaded_paths[-1])))
    schedule_for_cleanup(folder_entity.name)
    for downloaded, uploaded in zip(new_paths, uploaded_paths):
        # Formatted into one string so the output is identical on Python 2 and 3.
        print("%s %s" % (uploaded, downloaded))
        assert os.path.exists(downloaded)
        assert filecmp.cmp(downloaded, uploaded)
    schedule_for_cleanup(new_paths[0])

    ### Test query get
    run('synapse', '--skip-checks',
        'get', '-q', "select id from file where parentId=='%s' and location=='folder'" %
        folder_entity.id)
    # Verify that we downloaded files: only the two folder files are expected,
    # this time directly in ./
    new_paths = [os.path.join('.', os.path.basename(f)) for f in uploaded_paths[:-1]]
    for downloaded, uploaded in zip(new_paths, uploaded_paths[:-1]):
        print("%s %s" % (uploaded, downloaded))
        assert os.path.exists(downloaded)
        assert filecmp.cmp(downloaded, uploaded)
        schedule_for_cleanup(downloaded)
开发者ID:xindiguo,项目名称:synapsePythonClient,代码行数:53,代码来源:test_command_line_client.py


示例10: test_command_copy

def test_command_copy():
    """Tests the 'synapse cp' function"""
    project_entity = syn.store(synapseclient.Project(name=str(uuid.uuid4())))
    schedule_for_cleanup(project_entity.id)

    folder_entity = syn.store(
        synapseclient.Folder(name=str(uuid.uuid4()), parent=project_entity))

    # A first file in the folder, referenced below as the "used" provenance item.
    dummy = utils.make_bogus_data_file()
    schedule_for_cleanup(dummy)
    dummy_entity = syn.store(synapseclient.File(dummy, parent=folder_entity))

    repo_url = 'https://github.com/Sage-Bionetworks/synapsePythonClient'
    annots = {'test': 'hello_world'}

    # The file that will be copied: stored with provenance, then annotated.
    filename = utils.make_bogus_data_file()
    schedule_for_cleanup(filename)
    file_entity = syn.store(
        synapseclient.File(filename, parent=folder_entity),
        used=dummy_entity.id,
        executed=repo_url)
    syn.setAnnotations(file_entity, annots)

    # Copy the file from the folder up into the project.
    output = run('synapse', '--skip-checks',
                 'cp', '--id', file_entity.id,
                 '--parentid', project_entity.id)
    copied_id = parse(r'Copied syn\d+ to (syn\d+)', output)

    copied_ent = syn.get(copied_id)
    schedule_for_cleanup(copied_id)
    copied_ent_annot = syn.getAnnotations(copied_ent)

    # Keep only the user annotations, taking the last element of each value.
    bookkeeping_keys = ('uri', 'id', 'creationDate', 'etag')
    copied_annot = {}
    for key in copied_ent_annot:
        if key not in bookkeeping_keys:
            copied_annot[key] = copied_ent_annot[key].pop()
    copied_prov = syn.getProvenance(copied_ent)['used'][0]['reference']['targetId']

    assert copied_prov == file_entity.id
    assert copied_annot == annots

    # Copying a folder, copying a project, or copying the file again into a
    # parent that already holds a file with the same filename must raise.
    for source_id in (folder_entity.id, project_entity.id, file_entity.id):
        assert_raises(ValueError, run, 'synapse', '--debug', '--skip-checks',
                      'cp', '--id', source_id,
                      '--parentid', project_entity.id)
开发者ID:ychae,项目名称:synapsePythonClient,代码行数:49,代码来源:test_command_line_client.py


示例11: test_get_local_file

def test_get_local_file():
    """Tests syn.get() when it is given the path of a local file."""
    local_path = utils.make_bogus_data_file()
    schedule_for_cleanup(local_path)
    folder = syn.createEntity(
        Folder('TestFindFileFolder', parent=project, description='A place to put my junk'))

    # Nothing has been stored for this path yet, so the lookup must fail.
    assert_raises(SynapseError, syn.get, local_path)

    # Once stored in the folder, the path resolves to that entity and version.
    in_folder = syn.store(File(local_path, parent=folder))
    found = syn.get(local_path)
    assert in_folder.id == found.id and in_folder.versionNumber == found.versionNumber

    # Same file stored a second time under the project; this get should
    # display a warning. Neither result is inspected.
    syn.store(File(local_path, parent=project))
    syn.get(local_path)

    # Restricting the search to the folder picks out the folder copy.
    found = syn.get(local_path, limitSearch=folder.id)
    assert found.id == in_folder.id and found.versionNumber == in_folder.versionNumber

    # A limitSearch that excludes every stored copy raises an error.
    assert_raises(SynapseError, syn.get, local_path, limitSearch='syn1')
开发者ID:yassineS,项目名称:synapsePythonClient,代码行数:25,代码来源:integration_test_Entity.py


示例12: test_synapseStore_flag

def test_synapseStore_flag():
    """Test storing File entities with synapseStore=False.

    Covers three cases: an existing local file, a local path that does not
    exist, and an http URL. In each case the entity must report
    ``synapseStore == False`` after being stored.
    """
    # Store a path to a local file
    path = utils.make_bogus_data_file()
    schedule_for_cleanup(path)
    bogus = File(path, name='Totally bogus data', parent=project, synapseStore=False)
    bogus = syn.store(bogus)

    # Verify the thing can be downloaded as a URL
    bogus = syn.get(bogus, downloadFile=False)
    assert bogus.name == 'Totally bogus data'
    assert bogus.path == path, "Path: %s\nExpected: %s" % (bogus.path, path)
    assert bogus.synapseStore == False

    # Make sure the test runs on Windows and other OS's
    if path[0].isalpha() and path[1] == ':':
        # A Windows file URL looks like this: file:///c:/foo/bar/bat.txt
        expected_url = 'file:///' + path
    else:
        expected_url = 'file://' + path

    # The failure message must use `expected_url`; the previous `expected_URL`
    # was undefined and turned a failed comparison into a NameError.
    assert bogus.externalURL == expected_url, 'URL: %s\nExpected %s' % (bogus.externalURL, expected_url)

    # A file path that doesn't exist should still work
    bogus = File('/path/to/local/file1.xyz', parentId=project.id, synapseStore=False)
    bogus = syn.store(bogus)
    assert_raises(IOError, syn.get, bogus)
    assert bogus.synapseStore == False

    # Try a URL
    bogus = File('http://dev-versions.synapse.sagebase.org/synapsePythonClient', parent=project, synapseStore=False)
    bogus = syn.store(bogus)
    bogus = syn.get(bogus)
    assert bogus.synapseStore == False
开发者ID:yassineS,项目名称:synapsePythonClient,代码行数:33,代码来源:integration_test_Entity.py


示例13: test_slow_unlocker

def test_slow_unlocker():
    """Manually grabs a lock and makes sure the get/store methods are blocked.

    Sequence: store a file, take the cache lock for it by hand, start one
    store call and one get call on child threads, wait, assert that at least
    one thread is still running, release the lock, then wait for the threads
    to finish and report any errors they collected.
    """
    
    # Make a file to manually lock
    path = utils.make_bogus_data_file()
    schedule_for_cleanup(path)
    contention = File(path, parent=syn.test_parent)
    contention = syn.store(contention)
    
    # Lock the Cache Map
    cacheDir = cache.determine_cache_directory(contention)
    cache.obtain_lock_and_read_cache(cacheDir)
    
    # Start a few calls to get/store that should not complete yet
    store_thread = wrap_function_as_child_thread(lambda: store_catch_412_HTTPError(contention))
    get_thread = wrap_function_as_child_thread(lambda: syn.get(contention))
    thread.start_new_thread(store_thread, ())
    thread.start_new_thread(get_thread, ())
    # Give the threads time to start: 5 seconds, or half of
    # cache.CACHE_LOCK_TIME if that is shorter.
    time.sleep(min(5, cache.CACHE_LOCK_TIME / 2))
    
    # Make sure the threads did not finish
    # NOTE(review): syn.test_threadsRunning is presumably maintained by
    # wrap_function_as_child_thread - confirm in the test helpers.
    assert syn.test_threadsRunning > 0
    cache.write_cache_then_release_lock(cacheDir)
    
    # Let the threads go: poll once per second until none are running
    while syn.test_threadsRunning > 0:
        time.sleep(1)
    collect_errors_and_fail()
开发者ID:apratap,项目名称:synapsePythonClient,代码行数:28,代码来源:test_caching.py


示例14: test_download_table_files

def test_download_table_files():
    """Store file handles in a table column and download them back per row.

    Creates a 'Jazz Albums' schema with a FILEHANDLEID column, uploads one
    bogus file per row, stores the rows, then downloads the 'cover' file for
    every row of a query result and compares it with the file uploaded for
    that row.
    """
    cols = [
        Column(name='artist', columnType='STRING', maximumSize=50),
        Column(name='album', columnType='STRING', maximumSize=50),
        Column(name='year', columnType='INTEGER'),
        Column(name='catalog', columnType='STRING', maximumSize=50),
        Column(name='cover', columnType='FILEHANDLEID')]

    schema = syn.store(Schema(name='Jazz Albums', columns=cols, parent=project))
    schedule_for_cleanup(schema)

    data = [["John Coltrane",  "Blue Train",   1957, "BLP 1577", "coltraneBlueTrain.jpg"],
            ["Sonny Rollins",  "Vol. 2",       1957, "BLP 1558", "rollinsBN1558.jpg"],
            ["Sonny Rollins",  "Newk's Time",  1958, "BLP 4001", "rollinsBN4001.jpg"],
            ["Kenny Burrel",   "Kenny Burrel", 1956, "BLP 1543", "burrellWarholBN1543.jpg"]]

    ## upload files and store file handle ids
    ## (the placeholder file name in column 4 is overwritten with the handle id)
    original_files = []
    for row in data:
        path = utils.make_bogus_data_file()
        original_files.append(path)
        schedule_for_cleanup(path)
        file_handle = syn._chunkedUploadFile(path)
        row[4] = file_handle['id']

    # The returned row reference set is not needed by the rest of the test.
    syn.store(RowSet(columns=cols, schema=schema, rows=[Row(r) for r in data]))

    ## retrieve the files for each row and verify that they are identical to the originals
    ## NOTE(review): pairing by index assumes the query returns rows in insertion order.
    results = syn.tableQuery('select artist, album, year, catalog, cover from %s'%schema.id, resultsAs="rowset")
    for i, row in enumerate(results):
        # Formatted into one string so the output is identical on Python 2 and 3.
        print("%s_%s %s" % (row.rowId, row.versionNumber, row.values))
        file_info = syn.downloadTableFile(results, rowId=row.rowId, versionNumber=row.versionNumber, column='cover', downloadLocation='.')
        assert filecmp.cmp(original_files[i], file_info['path'])
        schedule_for_cleanup(file_info['path'])
开发者ID:aogier,项目名称:synapsePythonClient,代码行数:34,代码来源:test_tables.py


示例15: test_uploadFile_given_dictionary

def test_uploadFile_given_dictionary():
    """Create and update a Folder entity using plain dictionaries.

    The Folder is first stored from a dict carrying its concreteType, then
    updated from a second dict assembled from the fetched entity's
    annotations, properties and local state.
    """
    # Make a Folder Entity the old fashioned way
    folder = {'concreteType': Folder._synapse_entity_type,
              'parentId': project['id'],
              'name': 'fooDictionary',
              'foo': 334455}
    entity = syn.store(folder)

    # Fetch the Folder back and verify its parent and annotation
    entity = syn.get(entity)
    assert entity.parentId == project.id
    assert entity.foo[0] == 334455

    # Update via a dictionary
    # (a bogus data file used to be created here but was never referenced, so
    # it has been dropped)
    rareCase = {}
    rareCase.update(entity.annotations)
    rareCase.update(entity.properties)
    rareCase.update(entity.local_state())
    rareCase['description'] = 'Updating with a plain dictionary should be rare.'

    # Verify it works
    entity = syn.store(rareCase)
    assert entity.description == rareCase['description']
    assert entity.name == 'fooDictionary'
    # Final fetch by id; the result is not inspected.
    entity = syn.get(entity['id'])
开发者ID:xindiguo,项目名称:synapsePythonClient,代码行数:27,代码来源:integration_test.py


示例16: test_store_activity

def test_store_activity():
    """Test storing entities with Activities"""
    project = create_project()

    data_path = utils.make_bogus_data_file()
    schedule_for_cleanup(data_path)

    file_entity = File(data_path, name='Hinkle horn honking holes', parent=project)

    # An Activity that records two URLs as used.
    honking = Activity(
        name='Hinkle horn honking',
        description='Nettlebed Cave is a limestone cave located on the South Island of New Zealand.')
    honking.used('http://www.flickr.com/photos/bevanbfree/3482259379/')
    honking.used('http://www.flickr.com/photos/bevanbfree/3482185673/')

    # Storing with the activity does not set the ID on the local Activity object...
    file_entity = syn.store(file_entity, activity=honking)

    # ...so it is fetched again; this copy has an activity ID.
    honking = syn.getProvenance(file_entity.id)

    used_url_type = 'org.sagebionetworks.repo.model.provenance.UsedURL'
    first_used = honking['used'][0]
    second_used = honking['used'][1]

    assert honking['name'] == 'Hinkle horn honking'
    assert len(honking['used']) == 2
    assert first_used['concreteType'] == used_url_type
    assert first_used['wasExecuted'] == False
    assert first_used['url'].startswith('http://www.flickr.com/photos/bevanbfree/3482')
    assert second_used['concreteType'] == used_url_type
    assert second_used['wasExecuted'] == False

    # A second entity stored with the same activity must share its id.
    other_entity = File('http://en.wikipedia.org/wiki/File:Nettlebed_cave.jpg', name='Nettlebed Cave', parent=project)
    other_entity = syn.store(other_entity, activity=honking)

    other_provenance = syn.getProvenance(other_entity)

    assert honking['id'] == other_provenance['id']
开发者ID:xschildw,项目名称:synapsePythonClient,代码行数:34,代码来源:integration_test_Entity.py


示例17: test_resume_partial_download

def test_resume_partial_download():
    """Truncate a downloaded file and check that a second download completes it."""
    original_file = utils.make_bogus_data_file(40000)
    original_md5 = utils.md5_for_file(original_file).hexdigest()

    entity = syn.store(File(original_file, parent=project['id']))

    # Move the uploaded file aside so it can be compared against later.
    stashed_file = original_file + '.original'
    shutil.move(original_file, stashed_file)
    schedule_for_cleanup(stashed_file)

    temp_dir = tempfile.gettempdir()
    url = '%s/entity/%s/file' % (syn.repoEndpoint, entity.id)

    def download():
        # Both downloads below use exactly the same arguments.
        return syn._download(url, destination=temp_dir, file_handle_id=entity.dataFileHandleId, expected_md5=entity.md5)

    path = download()

    # Simulate an incomplete download: move the complete file back to its
    # temporary download location...
    tmp_path = utils.temp_download_filename(temp_dir, entity.dataFileHandleId)
    shutil.move(path, tmp_path)

    # ...and cut it down to 3/7 of the original size.
    with open(tmp_path, 'r+') as partial:
        partial.truncate(3*os.path.getsize(stashed_file)//7)

    # The second call should complete the partial download.
    path = download()

    assert filecmp.cmp(stashed_file, path), "File comparison failed"
开发者ID:kkdang,项目名称:synapsePythonClient,代码行数:30,代码来源:test_download.py


示例18: test_configPath

def test_configPath():
    """Test using a user-specified configPath for Synapse configuration file."""

    # Work from a copy of the client's config file placed in a temporary file.
    tmp_config_file = tempfile.NamedTemporaryFile(suffix='.synapseConfig', delete=False)
    shutil.copyfile(synapseclient.client.CONFIG_FILE, tmp_config_file.name)

    # Add a bogus file through the CLI, pointing it at the copied config.
    filename = utils.make_bogus_data_file()
    schedule_for_cleanup(filename)
    cli_args = [
        'synapse',
        '--skip-checks',
        '--configPath', tmp_config_file.name,
        'add',
        '-name', 'BogusFileEntityTwo',
        '-description', 'Bogus data to test file upload',
        '-parentid', project.id,
        filename,
    ]
    output = run(*cli_args)
    file_entity_id = parse(r'Created/Updated entity:\s+(syn\d+)\s+', output)

    # The stored entity must be backed by an S3 file handle.
    stored = syn.get(file_entity_id)
    handle = syn._getFileHandle(stored.dataFileHandleId)
    assert_equals(handle['concreteType'], 'org.sagebionetworks.repo.model.file.S3FileHandle')
开发者ID:Sage-Bionetworks,项目名称:synapsePythonClient,代码行数:27,代码来源:test_command_line_client.py


示例19: test_download_table_files

def test_download_table_files():
    """Round-trip files attached to table rows through a FILEHANDLEID column.

    The test:

    1. creates a table schema whose last column ('cover') is a FILEHANDLEID,
    2. uploads one bogus data file per row and stores its file handle id in
       that column,
    3. downloads each row's file with ``downloadTableFile`` and compares it
       with the original,
    4. checks that a repeated download does not call ``_downloadFile`` again,
    5. removes half of the files from the cache and checks that
       ``downloadTableColumns`` still returns all four files intact.
    """
    cols = [
        Column(name='artist', columnType='STRING', maximumSize=50),
        Column(name='album', columnType='STRING', maximumSize=50),
        Column(name='year', columnType='INTEGER'),
        Column(name='catalog', columnType='STRING', maximumSize=50),
        Column(name='cover', columnType='FILEHANDLEID')]

    schema = syn.store(Schema(name='Jazz Albums', columns=cols, parent=project))
    schedule_for_cleanup(schema)

    data = [["John Coltrane",  "Blue Train",   1957, "BLP 1577", "coltraneBlueTrain.jpg"],
            ["Sonny Rollins",  "Vol. 2",       1957, "BLP 1558", "rollinsBN1558.jpg"],
            ["Sonny Rollins",  "Newk's Time",  1958, "BLP 4001", "rollinsBN4001.jpg"],
            ["Kenny Burrel",   "Kenny Burrel", 1956, "BLP 1543", "burrellWarholBN1543.jpg"]]

    ## upload files and store file handle ids
    ## (the placeholder file name in the last position of each row is replaced)
    original_files = []
    for row in data:
        path = utils.make_bogus_data_file()
        original_files.append(path)
        schedule_for_cleanup(path)
        file_handle = syn._uploadToFileHandleService(path)
        row[4] = file_handle['id']

    ## the stored row set itself is not needed afterwards, only the side effect
    syn.store(RowSet(columns=cols, schema=schema, rows=[Row(r) for r in data]))

    ## retrieve the files for each row and verify that they are identical to the originals
    results = syn.tableQuery('select artist, album, year, catalog, cover from %s'%schema.id, resultsAs="rowset")
    for i, row in enumerate(results):
        print("%s_%s" % (row.rowId, row.versionNumber), row.values)
        file_info = syn.downloadTableFile(results, rowId=row.rowId, versionNumber=row.versionNumber, column='cover')
        assert filecmp.cmp(original_files[i], file_info['path'])
        schedule_for_cleanup(file_info['path'])

    ## test that cached copies are returned for already downloaded files
    ## (the mock delegates to the real method so behaviour is unchanged; it only records calls)
    original_downloadFile_method = syn._downloadFile
    with patch("synapseclient.Synapse._downloadFile") as _downloadFile_mock:
        _downloadFile_mock.side_effect = original_downloadFile_method

        results = syn.tableQuery("select artist, album, year, catalog, cover from %s where artist = 'John Coltrane'"%schema.id, resultsAs="rowset")
        for i, row in enumerate(results):
            print("%s_%s" % (row.rowId, row.versionNumber), row.values)
            file_info = syn.downloadTableFile(results, rowId=row.rowId, versionNumber=row.versionNumber, column='cover')
            assert filecmp.cmp(original_files[i], file_info['path'])

        assert not _downloadFile_mock.called, "Should have used cached copy of file and not called _downloadFile"

    ## test download table column
    results = syn.tableQuery('select * from %s' % schema.id)
    ## uncache 2 out of 4 files
    ## NOTE(review): row[6] is presumably the 'cover' file handle id (after row id,
    ## row version and the four data columns) -- confirm against the result format
    for i, row in enumerate(results):
        if i % 2 == 0:
            syn.cache.remove(row[6])
    file_map = syn.downloadTableColumns(results, ['cover'])
    assert len(file_map) == 4
    ## enumerate so every row is checked against its own original file, and
    ## assert so that a mismatching download actually fails the test
    for i, row in enumerate(results):
        assert filecmp.cmp(original_files[i], file_map[row[6]])
开发者ID:kkdang,项目名称:synapsePythonClient,代码行数:58,代码来源:test_tables.py


示例20: test_command_copy

def test_command_copy():
    """Exercise ``synapse cp`` on an uploaded file and on an external-URL file.

    Both source entities live in a folder and are copied into the enclosing
    project. The copies must keep the annotations, record the source entity
    in their provenance and share the source's file handle. Copying the
    uploaded file a second time into the same project must raise ValueError.
    """

    # Project that acts as the copy destination
    project_entity = syn.store(synapseclient.Project(name=str(uuid.uuid4())))
    schedule_for_cleanup(project_entity.id)

    # Folder inside the project that holds the source entities
    folder = synapseclient.Folder(name=str(uuid.uuid4()), parent=project_entity)
    folder_entity = syn.store(folder)
    schedule_for_cleanup(folder_entity.id)

    source_url = 'https://github.com/Sage-Bionetworks/synapsePythonClient'
    expected_annotations = {'test': ['hello_world']}

    # Source entities: one uploaded bogus file and one external link
    data_path = utils.make_bogus_data_file()
    schedule_for_cleanup(data_path)
    uploaded = syn.store(synapseclient.File(data_path, parent=folder_entity))
    linked = syn.store(
        synapseclient.File(source_url, name='rand', parent=folder_entity, synapseStore=False))
    for source in (uploaded, linked):
        syn.setAnnotations(source, expected_annotations)
    for source in (uploaded, linked):
        schedule_for_cleanup(source.id)

    # Run the cp sub-command once per source entity
    copy_pattern = r'Copied syn\d+ to (syn\d+)'
    file_cp_output = run('synapse', '--skip-checks', 'cp', uploaded.id,
                         '--destinationId', project_entity.id)
    url_cp_output = run('synapse', '--skip-checks', 'cp', linked.id,
                        '--destinationId', project_entity.id)

    file_copy_id = parse(copy_pattern, file_cp_output)
    url_copy_id = parse(copy_pattern, url_cp_output)

    # Fetch the copies together with their annotations and provenance
    file_copy = syn.get(file_copy_id)
    url_copy = syn.get(url_copy_id, downloadFile=False)
    schedule_for_cleanup(file_copy_id)
    schedule_for_cleanup(url_copy_id)
    file_copy_annotations = syn.getAnnotations(file_copy_id)
    url_copy_annotations = syn.getAnnotations(url_copy_id)

    file_copy_origin = syn.getProvenance(file_copy_id)['used'][0]['reference']['targetId']
    url_copy_origin = syn.getProvenance(url_copy_id)['used'][0]['reference']['targetId']

    # The copied file must match its source
    assert_equals(file_copy_origin, uploaded.id)
    assert_equals(file_copy_annotations, expected_annotations)
    assert_equals(file_copy.properties.dataFileHandleId, uploaded.properties.dataFileHandleId)

    # The copied external link must match its source
    assert_equals(url_copy_origin, linked.id)
    assert_equals(url_copy_annotations, expected_annotations)
    assert_equals(url_copy.externalURL, source_url)
    assert_equals(url_copy.name, 'rand')
    assert_equals(url_copy.properties.dataFileHandleId, linked.properties.dataFileHandleId)

    # Copying into a folder/project that already contains a file with the
    # same filename has to fail with a ValueError
    repeat_copy_args = ('synapse', '--debug', '--skip-checks', 'cp', uploaded.id,
                        '--destinationId', project_entity.id)
    assert_raises(ValueError, run, *repeat_copy_args)
开发者ID:Sage-Bionetworks,项目名称:synapsePythonClient,代码行数:58,代码来源:test_command_line_client.py



注:本文中的synapseclient.utils.make_bogus_data_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sync.Sync类代码示例发布时间:2022-05-27
下一篇:
Python utils.make_bogus_binary_file函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap