
Python unicodecsv.writer Function Code Examples


This article collects typical usage examples of the unicodecsv.writer function in Python. If you are wondering what writer does, how to call it, or what real-world uses look like, the curated examples below should help.



The following sections present 20 code examples of the writer function, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Python code samples.
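
Before the individual examples, here is a minimal usage sketch of the pattern nearly all of them share: open the output file in binary mode (these are Python 2 code bases), construct the writer with an explicit encoding, and pass unicode strings to writerow. The file name and row contents are illustrative placeholders only, not taken from any of the projects below.

import unicodecsv

# Binary mode matters on Python 2: unicodecsv encodes each cell
# to bytes (here UTF-8) before writing.
with open('example.csv', 'wb') as f:
    writer = unicodecsv.writer(f, encoding='utf-8',
                               quoting=unicodecsv.QUOTE_ALL)
    writer.writerow([u'domain.example', u'Subjéct line', u'Body text'])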

Example 1: process_raw_msg

def process_raw_msg(raw_msg, formatted_output_file, append=True):
    """
    Given a raw email message string and an output CSV file
    to write to, write details of the message out to the CSV
    file in the format:
        <sender-domain>,<subject>,<message-text>
    """
    if append:
        mode = "ab"
    else:
        mode = "wb"
    mime_msg = email.message_from_string(raw_msg)
    text = remove_punctuation(html_to_text(concat_email_text(mime_msg)))
    subject = mime_msg.get("Subject")
    # Decode escaped character sets in the subject line
    subject = u" ".join([a[0].decode('utf-8', 'replace')
                         for a in email.header.decode_header(subject)])
    subject = remove_punctuation(subject.replace("\r", " ").replace("\n", " "))
    sender_domain = mime_msg.get("From").split("@")[1].split(">")[0]
    # Strip whitespace
    csv_line = [fix_spaces_cr_lf(s) for s in [sender_domain, subject, text]]
    # If any of our strings are empty, replace with a placeholder
    # to make sure each CSV line has three items.
    csv_line = [s if s != u'' else u"PLACEHOLDERNONE" for s in csv_line]
    if formatted_output_file == "STDOUT":
        writer = unicodecsv.writer(sys.stdout,
                                   quoting=unicodecsv.QUOTE_ALL)
        writer.writerow(csv_line)
    else:
        with open(formatted_output_file, mode) as handle:
            writer = unicodecsv.writer(handle,
                                       quoting=unicodecsv.QUOTE_ALL)
            writer.writerow(csv_line)
Author: jamesmishra, Project: college-email-analysis, Lines: 35, Source: download_gmail.py


Example 2: getMatch

def getMatch(url, i):
    req = urllib2.Request(url + str(i), headers={'User-Agent' : "scraping bot"})
    con = urllib2.urlopen(req)
    print(url + str(i) + " " + str(con.getcode()))
    html = con.read()
    soup = BeautifulSoup(html, 'html.parser')
    if len(soup.select("body > main > section > h1")) > 0 and soup.select("body > main > section > h1")[0].text == '404':
        print ("404, matchid = " + str(i))
        with open(csvpath, 'ab') as csvfile:
            writer = unicodecsv.writer(csvfile, quoting=csv.QUOTE_NONNUMERIC)
            writer.writerow([i, "404"])
        return
    date = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > div:nth-of-type(1) > div:nth-of-type(3)")[0]
    date = date['title'] + " " + date.text.strip()
    bo = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > div:nth-of-type(1) > div:nth-of-type(2)")[0].text
    team_a = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > a:nth-of-type(1) > span > b")[0].text
    team_b = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > a:nth-of-type(2) > span > b")[0].text
    team_a_pct = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > a:nth-of-type(1) > span")[0].i.text
    team_b_pct = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > a:nth-of-type(2) > span")[0].i.text
    team_a_odds = re.findall(r'(\d+(\.\d+)? (to \d+(\.\d+)? )?for 1)',str(soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > div.full")[0].select('div.half')[0].find('div')))[0][0]
    team_b_odds = re.findall(r'(\d+(\.\d+)? (to \d+(\.\d+)? )?for 1)',str(soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > div.full")[0].select('div.half')[1].find('div')))[0][0]
    team_a_won = "(win)" in team_a.lower()
    team_b_won = "(win)" in team_b.lower()
    status = soup.select("body > main > section:nth-of-type(1) > div.box-shiny-alt > div:nth-of-type(2)")[0]
    if status.has_attr('class') and 'full' in status['class']:
        status = ""
    else:
        status = status.text.strip()
    winner = "a" if team_a_won else "b" if team_b_won else "none"

    with open(csvpath, 'ab') as csvfile:
        writer = unicodecsv.writer(csvfile, quoting=csv.QUOTE_NONNUMERIC)
        writer.writerow([i, date, bo, team_a, team_b, team_a_pct, team_b_pct, team_a_odds, team_b_odds, winner, status])
Author: glentakahashi, Project: gobot, Lines: 33, Source: lounge_scraper.py


Example 3: xml2csv

def xml2csv(file):

    # open file, parse it and get root
    tree = et.parse(file)
    root = tree.getroot()

    # set up a csv writer for our data output
    csvwriter = csv.writer(sys.stdout)
    # print header row
    #csvwriter.writerow(['CVE-ID', 'CVSS Risk', 'Summary'])

    for entry_node in tree.xpath('*[local-name()="entry"]'):

        # Declare and initialise variables for use.
        vuln_id = "Unknown"
        vuln_score = "Unknown"
        vuln_summary = "Unknown"

        # get cve_id value
        vuln_id = entry_node.get('id')

        # get vuln summary (the entry's last child)
        summarynode = entry_node.getchildren()[-1]
        vuln_summary = summarynode.text

        # get cvss risk score
        for n in entry_node.getchildren():
            if n.tag.find('cvss') != -1:
                cvss_node = n
                vuln_score = cvss_node.getchildren()[0].getchildren()[0].text

        row = [vuln_id, vuln_score, vuln_summary]
        csvwriter.writerow(row)
Author: cornerpirate, Project: cve-offline, Lines: 35, Source: xml2csv.py


Example 4: augment_csv

def augment_csv(src_csv, dest_csv):
    data = load(src_csv, with_keys=True)
    master_description = master_table(data[1:], DESCRIPTION)
    con_description = confidence_table(master_description)
    master_tags = master_table(data[1:], TAGS)
    con_tags = confidence_table(master_tags)
    master_title = master_table(data[1:], TITLE)
    con_title = confidence_table(master_title)
    data[0].append('guess_description')
    data[0].append('confidence')
    data[0].append('guess_tags')
    data[0].append('confidence')
    data[0].append('guess_title')
    data[0].append('confidence')
    for n in range(1, len(data)):
        theme, confidence = guess_for_row(data[n], con_description)
        data[n].append(theme)
        data[n].append(confidence)
        theme, confidence = guess_for_row(data[n], con_tags)
        data[n].append(theme)
        data[n].append(confidence)
        theme, confidence = guess_for_row(data[n], con_title)
        data[n].append(theme)
        data[n].append(confidence)
    # binary mode: unicodecsv writes encoded bytes on Python 2
    with open(dest_csv, 'wb') as f:
        unicodecsv.writer(f).writerows(data)
Author: datagovuk, Project: refine-csv, Lines: 26, Source: themeguess.py


Example 5: export_list

    def export_list(self, queryset):
        headers = [
            'Mobile', 'Text', 'Direction', 'Created', 'Delivered'
        ]
        output = StringIO()
        writer = unicodecsv.writer(output, encoding='utf-8')
        writer.writerow([unicode(i) for i in headers])
        yield output.getvalue()
        output.close()

        for message in queryset:
            # build one export row per message
            record = [
                message.sender if message.direction == 'IN'
                else message.recipient,
                message.text,
                message.direction,
                message.received.strftime('%Y-%m-%d %H:%M:%S')
                if message.received else '',
                message.delivered.strftime('%Y-%m-%d %H:%M:%S')
                if message.delivered else ''
            ]

            output = StringIO()
            writer = unicodecsv.writer(output, encoding='utf-8')
            writer.writerow([unicode(i) for i in record])
            yield output.getvalue()
            output.close()
Author: nditech, Project: elections, Lines: 28, Source: __init__.py


Example 6: csvlist_to_string

def csvlist_to_string(csvlist):
    f = StringIO()
    csv.writer(f, quoting=csv.QUOTE_MINIMAL,
               encoding='utf-8').writerow(csvlist)
    string = f.getvalue()
    f.close()
    return string
Author: tuliocasagrande, Project: labeling, Lines: 7, Source: views_datasets.py


Example 7: import_raw_xlsx

def import_raw_xlsx():
    fields = [
        'la_code',
        'ba_ref',
        'prop_empty:boolean',
        'prop_empty_date:date~make_date_YYYY_MM_DD',
        'prop_occupied:boolean',
        'prop_occupied_date:date',
        'prop_ba_rates:numeric',
        'tenant',
    ]

    files = [f for f in listdir(DIR) if isfile(join(DIR, f))]

    with open('vacancy_errors.csv', 'w') as fe:
        with open('vacancy.csv', 'w') as fp:
            a = csv.writer(fp, delimiter=',')
            e = csv.writer(fe, delimiter=',')
            a.writerows([fields])
            e.writerows([['file', 'line', 'error'] + fields])
            for f in files:
                out, errors = process(f)
                a.writerows(out)
                e.writerows(errors)
Author: tobes, Project: munge, Lines: 25, Source: vacancies.py


Example 8: export_as_csv

def export_as_csv(queryset, fields=None, header=None, filename=None, options=None, out=None):
    """
        Exports a queryset as csv from a queryset with the given fields.

    :param queryset: queryset to export
    :param fields: list of fields names to export. None for all fields
    :param header: if True, the exported file will have the first row as column names
    :param filename: name of the filename
    :param options: CSVOptions() instance or none
    :param: out: object that implements File protocol. HttpResponse if None.

    :return: HttpResponse instance
    """
    if out is None:
        if filename is None:
            filename = filename or "%s.csv" % queryset.model._meta.verbose_name_plural.lower().replace(" ", "_")
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = 'attachment;filename="%s"' % filename.encode('us-ascii', 'replace')
    else:
        response = out

    if options is None:
        config = csv_options_default
    else:
        config = csv_options_default.copy()
        config.update(options)

    if fields is None:
        fields = [f.name for f in queryset.model._meta.fields]

    dialect = config.get('dialect', None)
    if dialect is not None:
        writer = csv.writer(response, dialect=dialect)
    else:
        writer = csv.writer(response,
                            escapechar=str(config['escapechar']),
                            delimiter=str(config['delimiter']),
                            quotechar=str(config['quotechar']),
                            quoting=int(config['quoting']))

    if bool(header):
        if isinstance(header, (list, tuple)):
            writer.writerow(header)
        else:
            writer.writerow([f for f in fields])

    for obj in queryset:
        row = []
        for fieldname in fields:
            value = get_field_value(obj, fieldname)
            if isinstance(value, datetime.datetime):
                value = dateformat.format(value, config['datetime_format'])
            elif isinstance(value, datetime.date):
                value = dateformat.format(value, config['date_format'])
            elif isinstance(value, datetime.time):
                value = dateformat.format(value, config['time_format'])
            row.append(smart_str(value))
        writer.writerow(row)

    return response
Author: argfat, Project: updatengine-server, Lines: 60, Source: api.py


Example 9: processData

def processData():
    global manualIgnoreRecords
    global yesIgnoreRecords
    global manualProcessedRecords
    global yesProcessedRecords

    dirpath = parentdir + "/R3_profiles_YNNM_raw/"
    with open(dirpath + 'MANUAL_RAW.csv', 'r') as infile, open(processeddir + 'MANUAL_PROCESSED.csv', 'ab') as outfile:
        rows = unicodecsv.reader(infile, delimiter=';', encoding='utf-8')
        writer = unicodecsv.writer(outfile, delimiter=';', encoding='utf-8')
        for row in rows:
            if row[6] in manual_ignore_list:  # Ignore it
                manualIgnoreRecords += 1
                continue
            else:
                manualProcessedRecords += 1
                writer.writerow(row)

    with open(dirpath + 'YES_RAW.csv', 'r') as infile, open(processeddir + 'YES_PROCESSED.csv', 'ab') as outfile:
        rows = unicodecsv.reader(infile, delimiter=';', encoding='utf-8')
        writer = unicodecsv.writer(outfile, delimiter=';', encoding='utf-8')
        for row in rows:
            if row[6] in yes_ignore_list:  # Ignore it
                yesIgnoreRecords += 1
                continue
            else:
                yesProcessedRecords += 1  # bug fix: the original dropped the increment
                writer.writerow(row)
Author: hitenny, Project: linkedin-scrap, Lines: 28, Source: LinkedinStatsPostProcessor.py


Example 10: writerawresults

def writerawresults(data, columns, placeholderurl, filename):
    csvoutfile = open(sys.argv[2] + '.csv', 'wb')
    datawriter = csv.writer(csvoutfile, delimiter=',')

    # Not needed in the long term. This was for comparing the
    # file-finding capabilities of different methods.
    csvfilesoutfile = open(sys.argv[2] + '.files.csv', 'wb')
    filesdatawriter = csv.writer(csvfilesoutfile, delimiter=',')

    row = []
    extraitems = ['format', 'geo', 'groups', 'tags']
    row.extend(extraitems)
    columnsoffset = len(extraitems)

    for column in columns:
        row.append(column)

    datawriter.writerow(row)

    for package in data:
        row = []

        # All files, for analysis
        dict_string = package['data_dict']
        json_dict = json.loads(dict_string)
        for resource in json_dict['resources']:
            if 'url' in resource:
                frow = []
                frow.append(resource['url'])
                filesdatawriter.writerow(frow)

        # Get resource formats
        if 'res_format' in package:
            [text, geo] = processListOfFormats(package['res_format'])
            row.extend([text, geo])
        else:
            # bug fix: extend() takes a single iterable argument
            row.extend(['', ''])

        # Always emit the groups and tags columns so the offset
        # arithmetic below stays aligned (the original skipped them
        # when the keys were absent).
        groups = u''
        tags = u''
        if 'groups' in package:
            groups = arraytocsv(package['groups'])
        if 'tags' in package:
            tags = arraytocsv(package['tags'])
        row.extend([groups, tags])

        for column in columns:
            if column in package:
                row.append(package[column])
            else:
                row.append('')

        if row[columns.index('url') + columnsoffset] == '':
            row[columns.index('url') + columnsoffset] = placeholderurl + row[columns.index('id') + columnsoffset]
        datawriter.writerow(row)

    csvoutfile.close()
    csvfilesoutfile.close()
Author: SebastianBerchtold, Project: odm-datenerfassung, Lines: 59, Source: metautils.py


Example 11: __clean_and_fill_csv

    def __clean_and_fill_csv(self, file_path, headers, data_list):
        # Single handle instead of two overlapping opens (the original
        # reopened the file in append mode without closing the first handle).
        with open(file_path, 'wb') as myfile:
            wr = unicodecsv.writer(myfile, quoting=csv.QUOTE_NONE, delimiter='\t')
            wr.writerow(headers)
            wr.writerows(data_list)
Author: everis-innolab, Project: privatecloud-poc-data-generator, Lines: 8, Source: csv_generator.py


Example 12: fetch_rolling

def fetch_rolling():
    output = []
    min_date = connection.execute(text('SELECT MIN(date) FROM races')).first()[0].replace(day=1)
    for begin_date in list(rrule.rrule(rrule.MONTHLY, dtstart=min_date, until=date.today())):
        end_date = begin_date + relativedelta.relativedelta(months=6)
        sql = 'SELECT AVG(avg), STDDEV(avg), AVG(dev) FROM (SELECT AVG(ranking) avg, STDDEV(ranking) dev FROM rankings WHERE rank_date BETWEEN :begin_date AND :end_date GROUP BY _driver) avg'
        result = connection.execute(text(sql), begin_date=begin_date, end_date=end_date).first()
        output.append([end_date.strftime('%Y-%m')] + result.values())
    # close the output file deterministically (the original leaked the handle)
    with open('charts/rolling_averages.csv', 'w') as f:
        unicodecsv.writer(f).writerows(output)
Author: emkael, Project: elof1, Lines: 9, Source: charts.py


Example 13: _split_batch_csv

    def _split_batch_csv(self, data_file, f_values, f_ids):
        writer_values = unicodecsv.writer(f_values)
        writer_ids = unicodecsv.writer(f_ids)
        for row in unicodecsv.reader(data_file):
            writer_values.writerow(row[1:])
            writer_ids.writerow(row[:1])  # bug fix: write the id cell, not a nested list
        f_values.seek(0)
        f_ids.seek(0)
        return f_values, f_ids
Author: SalesforceFoundation, Project: CumulusCI, Lines: 9, Source: bulkdata.py


Example 14: main

def main(argv=None):
    if argv is None:
        argv = sys.argv
        
    parser = argparse.ArgumentParser(
        description="Parse program execution entries from the Amcache.hve Registry hive")
    parser.add_argument("registry_hive", type=str,
                        help="Path to the Amcache.hve hive to process")
    parser.add_argument("-v", action="store_true", dest="verbose",
                        help="Enable verbose output")
    parser.add_argument("-t", action="store_true", dest="do_timeline",
                        help="Output in simple timeline format")
    args = parser.parse_args(argv[1:])

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    if sys.platform == "win32":
        import os, msvcrt
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
        
    r = Registry.Registry(args.registry_hive)

    try:
        ee = parse_execution_entries(r)
    except NotAnAmcacheHive:
        g_logging.error("doesn't appear to be an Amcache.hve hive")
        return

    if args.do_timeline:
        entries = []
        for e in ee:
            for t in ["first_run", "created_timestamp", "modified_timestamp",
                    "modified_timestamp2", "linker_timestamp"]:
                ts = getattr(e, t)
                if ts == UNIX_TIMESTAMP_ZERO:
                    continue
                if ts == WINDOWS_TIMESTAMP_ZERO:
                    continue
                if ts == datetime.datetime.min:
                    continue

                entries.append(TimelineEntry(ts, t, e))
        w = unicodecsv.writer(sys.stdout, delimiter="|", quotechar="\"",
                              quoting=unicodecsv.QUOTE_MINIMAL, encoding="utf-8")
        w.writerow(["timestamp", "timestamp_type", "path", "sha1"])
        for e in sorted(entries, key=lambda e: e.timestamp):
            w.writerow([e.timestamp, e.type, e.entry.path, e.entry.sha1])
    else:
        w = unicodecsv.writer(sys.stdout, delimiter="|", quotechar="\"",
                              quoting=unicodecsv.QUOTE_MINIMAL, encoding="utf-8")
        w.writerow(map(lambda e: e.name, FIELDS))
        for e in ee:
            w.writerow(map(lambda i: getattr(e, i.name), FIELDS))
Author: jamesshew, Project: python-registry, Lines: 56, Source: amcache.py


Example 15: createTrainTestFiles

def createTrainTestFiles(inputFileName, folderName, trainSize, testSize, totalSamples=6034196, tags=None):
    if not os.path.exists('data/'+folderName):
        os.makedirs('data/'+folderName)
    writerSampleTrain = unicodecsv.writer(open('data/'+folderName +'/TrainSamples.csv', 'w'))
    writerSampleTest = unicodecsv.writer(open('data/'+folderName +'/TestSamples.csv', 'w'))
    readerSample = unicodecsv.reader(open('data/parsed/'+inputFileName+'Samples.csv', 'r'))
    
    writerIdsTrain = open('data/'+folderName +'/TrainIds', 'w')
    writerIdsTest = open('data/'+folderName +'/TestIds', 'w')
    readerIds = open('data/parsed/'+inputFileName+'Ids', 'r')
    
    writerTagsTrain = open('data/'+folderName +'/TrainTags', 'w')
    writerTagsTest = open('data/'+folderName +'/TestTags', 'w')
    readerTags = open('data/parsed/'+inputFileName+'Tags', 'r')
    

    if trainSize>=totalSamples:
        trainSize = totalSamples-testSize

    i = 0
    trainCount=0
    testCount=0
    for rowSample,rowId,rowTags in izip(readerSample,readerIds,readerTags):
        i+=1
        if tags is not None:
            toContinue = False
            for t in rowTags.split():
                if t not in tags:
                    toContinue = True
                    break
            if toContinue:
                continue
        x = randint(0,totalSamples)
        if x<testSize:
            writerSampleTest.writerow(rowSample)
            writerIdsTest.write(rowId)
            writerTagsTest.write(rowTags)
            testCount+=1
        else:
            if trainCount<trainSize:
                writerSampleTrain.writerow(rowSample)
                writerIdsTrain.write(rowId)
                writerTagsTrain.write(rowTags)
                trainCount+=1
            elif randint(0,5)<2:
                writerSampleTest.writerow(rowSample)
                writerIdsTest.write(rowId)
                writerTagsTest.write(rowTags)
                testCount+=1 
        if testCount>=testSize and trainCount>=trainSize:
            break
    
    print 'train : ' + str(trainCount)
    print 'test : ' + str(testCount)
    print 'total : ' + str(i)
Author: yonilev, Project: kaggle-facebook, Lines: 55, Source: Run.py


Example 16: process_tweets

    def process_tweets(self, dtype):
        self.fuser = open('users.csv', 'wb')
        self.ft = None
        self.csv_u = unicodecsv.writer(self.fuser, delimiter=',')

        if dtype == 'json':
            f = open('csm-tweets-nairobi.json', 'rb')
            raw_tweets = f.readlines()
            fjson_tweets = []
            count = 1
            print 'loading tweets from fake json\n'
            for raw_tweet in raw_tweets:
                tweet = ast.literal_eval(raw_tweet.rstrip("\n"))

                if count % 50000 == 1:
                    if self.ft is not None:
                        self.ft.close()
                    self.ft = open('tweets_%d.csv' % (count/10000 + 1), 'wb')
                    self.csv_t = unicodecsv.writer(self.ft, delimiter=',')

                self.createDicts(tweet)
                if count % 50000 == 0:
                    print '%d tweets processed!\n'%(count)
                    self.batchInsertDicts()

                count += 1

            self.insertMentions()
            f.close()

        elif dtype == 'pickle':
            count = 1
            for fn in glob.glob("*.pickle"):
                f = open(fn, 'rb')
                tweets = pickle.load(f)
                print 'tweets extracted from pickle!\n'
                for tweet in tweets:
                    if count % 50000 == 1:
                        if self.ft is not None:
                            self.ft.close()
                        self.ft = open('tweets_%d.csv' % (count/10000 + 1), 'wb')
                        self.csv_t = unicodecsv.writer(self.ft, delimiter=',')  # match the json branch
                    self.createDicts(tweet)
                    count += 1

                self.batchInsertDicts()
                print 'another pickle processed!\n'
                f.close()

            self.insertMentions()

        else:
            print 'wrong data type!'
            return None
Author: FAB4D, Project: csm-project, Lines: 54, Source: tweetstodb.py


Example 17: writerow

    def writerow(self, row):
        row = [str(s) for s in row]
        try:
            queue = StringIO()
            self.writer = csv.writer(queue, lineterminator="")
            self.writer.writerow(row)
        except TypeError:
            queue = BytesIO()
            self.writer = csv.writer(queue, lineterminator="")
            self.writer.writerow(row)
        return queue.getvalue()
Author: paulfitz, Project: catsql, Lines: 11, Source: main.py


Example 18: __init__

    def __init__(self, *args, **kwargs):
        self.dataset_file = tempfile.NamedTemporaryFile(delete=False)
        self.resource_file = tempfile.NamedTemporaryFile(delete=False)

        self.dataset_csv = csv.writer(self.dataset_file)
        self.resource_csv = csv.writer(self.resource_file)

        self.dataset_filename = self.dataset_file.name
        self.resource_filename = self.resource_file.name

        self.organization_cache = {}

        self.keys = []
Author: datagovuk, Project: ckanext-dgu, Lines: 13, Source: dumper.py


Example 19: _init_writer

    def _init_writer(self, row):
        from sqlalchemy.engine.result import RowProxy
        # Four cases:
        #    Write header, or don't
        #    Write list, or dict

        row_is_dict = isinstance(row, (dict, RowProxy))
        row_is_list = isinstance(row, (list, tuple))

        has_header = self.header is not None

        if not os.path.exists(self.path):
            if not os.path.exists(os.path.dirname(self.path)):
                os.makedirs(os.path.dirname(self.path))

        f = open(self.path, 'wb', buffering=self.buffer_size)
        
        self._f = f

        delimiter = self.delimiter
        
        if row_is_dict and has_header:
            self._writer = unicodecsv.DictWriter(f, self.header, delimiter=delimiter, 
                                                 escapechar=self.escapechar, encoding=self.encoding)
            if self.write_header:
                self._writer.writeheader()
            self._inserter = self._write_dict
            
        elif row_is_dict and not has_header:
            self.header = row.keys()
            self._writer = unicodecsv.DictWriter(f, self.header, delimiter=delimiter, 
                                                 escapechar=self.escapechar, encoding=self.encoding)
            if self.write_header:
                self._writer.writeheader()            
            self._inserter = self._write_dict
            
        elif row_is_list and has_header:
            self._writer = unicodecsv.writer(f, delimiter=delimiter, 
                                             escapechar=self.escapechar, encoding=self.encoding)
            if self.write_header:
                self._writer.writerow(self.header)
            self._inserter = self._write_list
            
        elif row_is_list and not has_header:
            self._writer = unicodecsv.writer(f, delimiter=delimiter, 
                                             escapechar=self.escapechar, encoding=self.encoding)
            self._inserter = self._write_list

        else:
            raise Exception("Unexpected case for type {}".format(type(row)))
Author: kball, Project: ambry, Lines: 51, Source: csv.py


Example 20: yield_csv_catalog

def yield_csv_catalog(datasets):
    '''Yield a dataset catalog line by line'''
    csvfile = StringIO.StringIO()
    writer = unicodecsv.writer(csvfile, encoding='utf-8', delimiter=b',', quotechar=b'"')
    # Generate header
    specs = Metric.get_for(Dataset)
    writer.writerow(header(specs))
    yield csvfile.getvalue()

    for dataset in datasets:
        csvfile = StringIO.StringIO()
        writer = unicodecsv.writer(csvfile, encoding='utf-8', delimiter=b',', quotechar=b'"')
        writer.writerow(to_row(dataset, specs))
        yield csvfile.getvalue()
Author: rossjones, Project: udata, Lines: 14, Source: catalog.py



Note: the unicodecsv.writer examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets were selected from open-source projects contributed by many developers; copyright remains with the original authors. Refer to each project's License before using or redistributing the code, and do not republish without permission.

