本文整理汇总了Python中backports.csv.writer函数的典型用法代码示例。如果您正苦于以下问题:Python writer函数的具体用法?Python writer怎么用?Python writer使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了writer函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: fetch
def fetch(self):
fd, tmp_file = tempfile.mkstemp()
pip = PointInPolygon(self.polygon_id, 60)
traffic_signs = []
reader = json.loads(open(self.mapping, 'r').read())
try:
for row in reader:
traffic_signs += row['object']
except:
self.logger.err(row)
raise
with open(tmp_file, 'w') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['accuracy', 'direction', 'image_key', 'first_seen_at', 'last_seen_at', 'value', 'X', 'Y'])
slice = lambda A, n: [A[i:i+n] for i in range(0, len(A), n)]
bboxes = pip.bboxes()
start_time = (datetime.today() - timedelta(days=365*2)).isoformat()[0:10]
b = 0
for traffic_signs_ in slice(traffic_signs, 10):
b = b + 1
self.logger.log('Batch {0}/{1}: {2}'.format(b, round(len(traffic_signs) / 10 + 0.5), ','.join(traffic_signs_)))
for bbox in bboxes:
url = 'https://a.mapillary.com/v3/map_features?bbox={bbox}&client_id={client_id}&layers={layer}&per_page=1000&start_time={start_time}&values={values}'.format(bbox=','.join(map(str, bbox)), layer=self.layer, client_id='MEpmMTFQclBTUWlacjV6RTUxWWMtZzo5OTc2NjY2MmRiMDUwYmMw', start_time=start_time, values=','.join(traffic_signs_))
print(url)
with open(tmp_file, 'a') as csvfile:
writer = csv.writer(csvfile)
r = None
page = 0
while(url):
page = page + 1
self.logger.log("Page {0}".format(page))
r = downloader.get(url)
url = r.links['next']['url'] if 'next' in r.links else None
features = r.json()['features']
filtered = 0
self.logger.log('{0} features fetched'.format(len(features)))
for j in features:
p = j['properties']
image_key = p['detections'][0]['image_key']
gc = j['geometry']['coordinates']
row = [p['accuracy'], p['direction'] if 'direction' in p else None, image_key, p['first_seen_at'], p['last_seen_at'], p['value']] + gc
if row[0] > 0.01 and pip.point_inside_polygon(gc[0], gc[1]):
writer.writerow(row)
filtered = filtered + 1
self.logger.log('{0} keeped'.format(filtered))
return tmp_file
开发者ID:tkas,项目名称:osmose-backend,代码行数:55,代码来源:Analyser_Merge_Mapillary.py
示例2: test_writerows
def test_writerows(self):
class BrokenFile:
def write(self, buf):
raise OSError
writer = csv.writer(BrokenFile())
self.assertRaises(OSError, writer.writerows, [['a']])
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj)
self.assertRaises(TypeError, writer.writerows, None)
writer.writerows([['a','b'],['c','d']])
fileobj.seek(0)
self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
开发者ID:gitter-badger,项目名称:backports.csv,代码行数:13,代码来源:tests.py
示例3: _write_test
def _write_test(self, fields, expect, **kwargs):
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj, **kwargs)
writer.writerow(fields)
fileobj.seek(0)
self.assertEqual(fileobj.read(),
expect + writer.dialect.lineterminator)
开发者ID:gitter-badger,项目名称:backports.csv,代码行数:7,代码来源:tests.py
示例4: export_to_csv
def export_to_csv(self, result_list, export_filename="ACRCloud_ScanFile_Results.csv", export_dir="./"):
try:
results = []
for item in result_list:
filename = item["file"]
timestamp = item["timestamp"]
jsoninfo = item["result"]
if "status" in jsoninfo and jsoninfo["status"]["code"] == 0:
row = self.parse_data(jsoninfo)
row = [filename, timestamp] + list(row)
results.append(row)
results = sorted(results, key=lambda x:x[1])
export_filepath = os.path.join(export_dir, export_filename)
with codecs.open(export_filepath, 'w', 'utf-8-sig') as f:
head_row = ['filename', 'timestamp', 'custom_files_title', 'custom_acrid', 'title', 'artists', 'album',
'acrid', 'played_duration', 'label', 'isrc', 'upc', 'dezzer', 'spotify', 'itunes', 'youtube']
dw = csv.writer(f)
dw.writerow(head_row)
dw.writerows(results)
if self.debug:
self.log.info("export_to_csv.Save Data to csv: {0}".format(export_filename))
except Exception as e:
self.log.error("[email protected]_to_csv", exc_info=True)
开发者ID:acrcloud,项目名称:acrcloud_scan_files_python,代码行数:26,代码来源:acrcloud_scan_files_libary.py
示例5: write_csv
def write_csv(filename, rows):
with io.open(filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["latitude", "longitude", "datetime"])
for row in rows:
writer.writerow([row["latitude"], row["longitude"], row["datetime"]])
f.close()
开发者ID:jonaqp,项目名称:Django_Celery_Ejemplo,代码行数:7,代码来源:tasks.py
示例6: writeUniqueResults
def writeUniqueResults(clustered_dupes, input_file, output_file):
# Write our original data back out to a CSV with a new column called
# 'Cluster ID' which indicates which records refer to each other.
logging.info('saving unique results to: %s' % output_file)
cluster_membership = {}
for cluster_id, (cluster, score) in enumerate(clustered_dupes):
for record_id in cluster:
cluster_membership[record_id] = cluster_id
unique_record_id = cluster_id + 1
writer = csv.writer(output_file)
reader = csv.reader(StringIO(input_file))
heading_row = next(reader)
heading_row.insert(0, u'Cluster ID')
writer.writerow(heading_row)
seen_clusters = set()
for row_id, row in enumerate(reader):
if row_id in cluster_membership:
cluster_id = cluster_membership[row_id]
if cluster_id not in seen_clusters:
row.insert(0, cluster_id)
writer.writerow(row)
seen_clusters.add(cluster_id)
else:
cluster_id = unique_record_id
unique_record_id += 1
row.insert(0, cluster_id)
writer.writerow(row)
开发者ID:rlugojr,项目名称:csvdedupe,代码行数:35,代码来源:csvhelpers.py
示例7: get
def get(self, request, *args, **kwargs):
object_list = self.get_queryset()[:2000]
# Do reasonable ACL check for global
acl_obj = self.translation or self.component or self.project
if not acl_obj:
for change in object_list:
if change.component:
acl_obj = change.component
break
if not request.user.has_perm('change.download', acl_obj):
raise PermissionDenied()
# Always output in english
activate('en')
response = HttpResponse(content_type='text/csv; charset=utf-8')
response['Content-Disposition'] = 'attachment; filename=changes.csv'
writer = csv.writer(response)
# Add header
writer.writerow(('timestamp', 'action', 'user', 'url', 'target'))
for change in object_list:
writer.writerow((
change.timestamp.isoformat(),
change.get_action_display(),
change.user.username if change.user else '',
get_site_url(change.get_absolute_url()),
change.target,
))
return response
开发者ID:dekoza,项目名称:weblate,代码行数:35,代码来源:changes.py
示例8: test_roundtrip_escaped_unquoted_newlines
def test_roundtrip_escaped_unquoted_newlines(self):
with TemporaryFile("w+", newline="") as fileobj:
writer = csv.writer(fileobj, quoting=csv.QUOTE_NONE, escapechar="\\")
rows = [["a\nb", "b"], ["c", "x\r\nd"]]
writer.writerows(rows)
fileobj.seek(0)
for i, row in enumerate(csv.reader(fileobj, quoting=csv.QUOTE_NONE, escapechar="\\")):
self.assertEqual(row, rows[i])
开发者ID:ryanhiebert,项目名称:backports.csv,代码行数:8,代码来源:tests.py
示例9: compare_dialect_123
def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):
with TemporaryFile("w+", newline="", encoding="utf-8") as fileobj:
writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
writer.writerow([1, 2, 3])
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
开发者ID:ryanhiebert,项目名称:backports.csv,代码行数:8,代码来源:tests.py
示例10: test_unicode_write
def test_unicode_write(self):
import io
with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
writer = csv.writer(fileobj)
writer.writerow(self.names)
expected = ",".join(self.names)+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
开发者ID:gitter-badger,项目名称:backports.csv,代码行数:8,代码来源:tests.py
示例11: test_roundtrip_quoteed_newlines
def test_roundtrip_quoteed_newlines(self):
with TemporaryFile("w+", newline="") as fileobj:
writer = csv.writer(fileobj)
self.assertRaises(TypeError, writer.writerows, None)
rows = [["a\nb", "b"], ["c", "x\r\nd"]]
writer.writerows(rows)
fileobj.seek(0)
for i, row in enumerate(csv.reader(fileobj)):
self.assertEqual(row, rows[i])
开发者ID:ryanhiebert,项目名称:backports.csv,代码行数:9,代码来源:tests.py
示例12: test_quote_nonnumeric_decimal
def test_quote_nonnumeric_decimal(self):
"""Decimals should not be quoted with non-numeric quoting."""
import decimal
with TemporaryFile("w+", newline="", encoding="utf-8") as fileobj:
writer = csv.writer(fileobj, quoting=csv.QUOTE_NONNUMERIC)
writer.writerow([10, 10.0, decimal.Decimal("10.0"), "10.0"])
expected = '10,10.0,10.0,"10.0"\r\n'
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
开发者ID:ryanhiebert,项目名称:backports.csv,代码行数:10,代码来源:tests.py
示例13: test_char_write
def test_char_write(self):
import array, string
a = array.array(str('u'), text_type(string.ascii_letters))
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join(a)+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
开发者ID:gitter-badger,项目名称:backports.csv,代码行数:10,代码来源:tests.py
示例14: test_float_write
def test_float_write(self):
import array
contents = [(20-i)*0.1 for i in range(20)]
a = array.array(str('f'), contents)
with TemporaryFile("w+", newline='') as fileobj:
writer = csv.writer(fileobj, dialect="excel")
writer.writerow(a)
expected = ",".join([str(i) for i in a])+"\r\n"
fileobj.seek(0)
self.assertEqual(fileobj.read(), expected)
开发者ID:gitter-badger,项目名称:backports.csv,代码行数:10,代码来源:tests.py
示例15: get
def get(self):
f = io.StringIO()
writer = csv.writer(f)
headers = [
'User ID',
'Username',
'First Name',
'Last Name',
'Email',
'Telephone',
'Enabled',
'Admin',
'Last Login',
'Last Active',
'Cohorts',
'Hospitals',
'Roles',
]
writer.writerow(headers)
def get_groups(user, group_type):
"""Comma-separated list of groups."""
groups = [x.name for x in user.groups if x.type == group_type]
groups = sorted(groups)
groups = uniq(groups)
return ', '.join(groups)
def get_roles(user):
"""Comma-separated list of roles."""
roles = [gu.role.name for gu in user.group_users]
return ', '.join(sorted(set(roles)))
users = list_users()
for user in users:
output = []
output.append(user.id)
output.append(user.username)
output.append(user.first_name)
output.append(user.last_name)
output.append(user.email)
output.append(user.telephone_number)
output.append(user.is_enabled)
output.append(user.is_admin)
output.append(user.last_login_date)
output.append(user.last_active_date)
output.append(get_groups(user, GROUP_TYPE.COHORT))
output.append(get_groups(user, GROUP_TYPE.HOSPITAL))
output.append(get_roles(user))
writer.writerow(output)
return Response(f.getvalue(), content_type='text/csv')
开发者ID:renalreg,项目名称:radar,代码行数:54,代码来源:users.py
示例16: _write_csv
def _write_csv(
file_path,
data,
delimiter=DEFAULT_DELIMITER,
lineterminator=DEFAULT_LINETERMINATOR):
with io.open(file_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(
f,
delimiter=delimiter,
lineterminator=lineterminator
)
writer.writerows(data)
开发者ID:jakubvalenta,项目名称:listio,代码行数:12,代码来源:listio.py
示例17: from_list_of_rows
def from_list_of_rows(cls, schema, values, filepath=None, etag=None, quoteCharacter='"', escapeCharacter="\\", lineEnd=str(os.linesep), separator=",", linesToSkip=0, includeRowIdAndRowVersion=None, headers=None):
## create CSV file
f = None
try:
if not filepath:
temp_dir = tempfile.mkdtemp()
filepath = os.path.join(temp_dir, 'table.csv')
f = io.open(filepath, 'w', encoding='utf-8', newline='')
writer = csv.writer(f,
quoting=csv.QUOTE_NONNUMERIC,
delimiter=separator,
escapechar=escapeCharacter,
lineterminator=lineEnd,
quotechar=quoteCharacter,
skipinitialspace=linesToSkip)
## if we haven't explicitly set columns, try to grab them from
## the schema object
if not headers and "columns_to_store" in schema and schema.columns_to_store is not None:
headers = [SelectColumn.from_column(col) for col in schema.columns_to_store]
## write headers?
if headers:
writer.writerow([header.name for header in headers])
header = True
else:
header = False
## write row data
for row in values:
writer.writerow(row)
finally:
if f: f.close()
return cls(
schema=schema,
filepath=filepath,
etag=etag,
quoteCharacter=quoteCharacter,
escapeCharacter=escapeCharacter,
lineEnd=lineEnd,
separator=separator,
header=header,
headers=headers,
includeRowIdAndRowVersion=includeRowIdAndRowVersion)
开发者ID:kkdang,项目名称:synapsePythonClient,代码行数:49,代码来源:table.py
示例18: __init__
def __init__(
self, fileobj, header=False, dialect=CSV_DIALECT, encoding='utf-8',
**kwargs):
self.fileobj = fileobj
self.header = header
self.dialect = dialect
self.encoding = encoding
self.keywords = kwargs
self.count = 0
self._first_row = None
# The csv writer outputs strings so we stick a transcoding shim between
# the writer and the output object
self._writer = csv_.writer(
codecs.getwriter(self.encoding)(self.fileobj),
dialect=self.dialect, **self.keywords)
开发者ID:waveform80,项目名称:lars,代码行数:15,代码来源:csv.py
示例19: _get_project_strings_csv
def _get_project_strings_csv(project, entities, output):
"""Return a CSV content of all strings and translations for a project and locale.
The file format looks as follow:
source, locale_code_1, locale_code_2
"string A", "tranlation A1", "tranlation A2"
"string B", "tranlation B1", "tranlation B2"
The first column has all source strings. Then there is one column per enabled locale, each
containing available translations for each source string (or an empty cell). The first line
contains the code of each locale, expect for the first cell which is always "source".
:arg Project project: the project from which to take strings
:arg list entities: the list of all entities of the project
:arg buffer output: a buffer to which the CSV writed will send its data
:returns: the same output object with the CSV data
"""
locales = Locale.objects.filter(project_locale__project=project)
translations = (
Translation.objects
.filter(
entity__resource__project=project,
approved=True,
)
.prefetch_related('locale')
.prefetch_related('entity')
)
all_data = dict((x.id, {'source': x.string}) for x in entities)
for translation in translations:
all_data[translation.entity.id][translation.locale.code] = translation.string
writer = csv.writer(output)
headers = ['source'] + [x.code for x in locales]
writer.writerow(headers)
for string in all_data.values():
row = [string.get(key, '') for key in headers]
writer.writerow(row)
return output
开发者ID:Pike,项目名称:pontoon,代码行数:43,代码来源:views.py
示例20: writeLinkedResults
def writeLinkedResults(clustered_pairs, input_1, input_2, output_file,
inner_join=False):
logging.info('saving unique results to: %s' % output_file)
matched_records = []
seen_1 = set()
seen_2 = set()
input_1 = [row for row in csv.reader(StringIO(input_1))]
row_header = input_1.pop(0)
length_1 = len(row_header)
input_2 = [row for row in csv.reader(StringIO(input_2))]
row_header_2 = input_2.pop(0)
length_2 = len(row_header_2)
row_header += row_header_2
for pair in clustered_pairs:
index_1, index_2 = [int(index.split('|', 1)[1]) for index in pair[0]]
matched_records.append(input_1[index_1] + input_2[index_2])
seen_1.add(index_1)
seen_2.add(index_2)
writer = csv.writer(output_file)
writer.writerow(row_header)
for matches in matched_records:
writer.writerow(matches)
if not inner_join:
for i, row in enumerate(input_1):
if i not in seen_1:
writer.writerow(row + [None] * length_2)
for i, row in enumerate(input_2):
if i not in seen_2:
writer.writerow([None] * length_1 + row)
开发者ID:rlugojr,项目名称:csvdedupe,代码行数:39,代码来源:csvhelpers.py
注:本文中的backports.csv.writer函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论