Python csv.writer Function Code Examples


This article collects typical usage examples of the writer function from Python's backports.csv module. If you are wondering what exactly writer does, how to call it, or what its usage looks like in real code, the hand-picked examples below should help.



The following presents 20 code examples of the writer function, ordered by popularity.
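Nearly all of the examples share the same basic pattern: open the target stream in text mode with newline='' so the csv module controls line endings, create a writer with csv.writer(), then emit rows with writerow()/writerows(). As a quick orientation, here is a minimal sketch of that pattern; the file name and row data are made up for illustration, and on Python 3 the standard library csv module can be used in place of backports.csv.

    import io
    from backports import csv  # pip install backports.csv; on Python 3, "import csv" behaves the same

    # Illustrative data only.
    rows = [
        [u'name', u'score'],
        [u'alice', 1.5],
        [u'bob', 2.0],
    ]

    # newline='' lets the csv module emit its own line endings ("\r\n" for the default excel dialect).
    with io.open('example.csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)  # quote strings, leave numbers unquoted
        writer.writerows(rows)                                 # several rows at once
        writer.writerow([u'carol', 3.25])                      # or one row at a time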

Example 1: fetch

    def fetch(self):
      fd, tmp_file = tempfile.mkstemp()

      pip = PointInPolygon(self.polygon_id, 60)

      traffic_signs = []
      reader = json.loads(open(self.mapping, 'r').read())
      try:
        for row in reader:
          traffic_signs += row['object']
      except Exception:
        self.logger.err(row)
        raise

      with open(tmp_file, 'w') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['accuracy', 'direction', 'image_key', 'first_seen_at', 'last_seen_at', 'value', 'X', 'Y'])

      slice = lambda A, n: [A[i:i+n] for i in range(0, len(A), n)]

      bboxes = pip.bboxes()

      start_time = (datetime.today() - timedelta(days=365*2)).isoformat()[0:10]
      b = 0
      for traffic_signs_ in slice(traffic_signs, 10):
        b = b + 1
        self.logger.log('Batch {0}/{1}: {2}'.format(b, round(len(traffic_signs) / 10 + 0.5), ','.join(traffic_signs_)))
        for bbox in bboxes:
          url = 'https://a.mapillary.com/v3/map_features?bbox={bbox}&client_id={client_id}&layers={layer}&per_page=1000&start_time={start_time}&values={values}'.format(bbox=','.join(map(str, bbox)), layer=self.layer, client_id='MEpmMTFQclBTUWlacjV6RTUxWWMtZzo5OTc2NjY2MmRiMDUwYmMw', start_time=start_time, values=','.join(traffic_signs_))
          print(url)
          with open(tmp_file, 'a') as csvfile:
            writer = csv.writer(csvfile)

            r = None
            page = 0
            while url:
              page = page + 1
              self.logger.log("Page {0}".format(page))
              r = downloader.get(url)
              url = r.links['next']['url'] if 'next' in r.links else None

              features = r.json()['features']
              filtered = 0
              self.logger.log('{0} features fetched'.format(len(features)))
              for j in features:
                p = j['properties']
                image_key = p['detections'][0]['image_key']
                gc = j['geometry']['coordinates']
                row = [p['accuracy'], p['direction'] if 'direction' in p else None, image_key, p['first_seen_at'], p['last_seen_at'], p['value']] + gc
                if row[0] > 0.01 and pip.point_inside_polygon(gc[0], gc[1]):
                  writer.writerow(row)
                  filtered = filtered + 1
              self.logger.log('{0} keeped'.format(filtered))

      return tmp_file
Developer: tkas, Project: osmose-backend, Lines of code: 55, Source file: Analyser_Merge_Mapillary.py


Example 2: test_writerows

    def test_writerows(self):
        class BrokenFile:
            def write(self, buf):
                raise OSError
        writer = csv.writer(BrokenFile())
        self.assertRaises(OSError, writer.writerows, [['a']])

        with TemporaryFile("w+", newline='') as fileobj:
            writer = csv.writer(fileobj)
            self.assertRaises(TypeError, writer.writerows, None)
            writer.writerows([['a','b'],['c','d']])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
Developer: gitter-badger, Project: backports.csv, Lines of code: 13, Source file: tests.py


Example 3: _write_test

 def _write_test(self, fields, expect, **kwargs):
     with TemporaryFile("w+", newline='') as fileobj:
         writer = csv.writer(fileobj, **kwargs)
         writer.writerow(fields)
         fileobj.seek(0)
         self.assertEqual(fileobj.read(),
                          expect + writer.dialect.lineterminator)
Developer: gitter-badger, Project: backports.csv, Lines of code: 7, Source file: tests.py


Example 4: export_to_csv

    def export_to_csv(self, result_list, export_filename="ACRCloud_ScanFile_Results.csv", export_dir="./"):
        try:
            results = []
            for item in result_list:
                filename = item["file"]
                timestamp = item["timestamp"]
                jsoninfo = item["result"]
                if "status" in jsoninfo and jsoninfo["status"]["code"] == 0:
                    row = self.parse_data(jsoninfo)
                    row = [filename, timestamp] + list(row)
                    results.append(row)

            results = sorted(results, key=lambda x:x[1])

            export_filepath = os.path.join(export_dir, export_filename)

            with codecs.open(export_filepath, 'w', 'utf-8-sig') as f:
                head_row = ['filename', 'timestamp',  'custom_files_title', 'custom_acrid', 'title', 'artists', 'album',
                        'acrid', 'played_duration', 'label', 'isrc', 'upc', 'dezzer', 'spotify', 'itunes', 'youtube']
                dw = csv.writer(f)
                dw.writerow(head_row)
                dw.writerows(results)
                if self.debug:
                    self.log.info("export_to_csv.Save Data to csv: {0}".format(export_filename))
        except Exception as e:
            self.log.error("Error@export_to_csv", exc_info=True)  # log tag restored; the source page's e-mail obfuscation had mangled it, the "Error" prefix is assumed
Developer: acrcloud, Project: acrcloud_scan_files_python, Lines of code: 26, Source file: acrcloud_scan_files_libary.py


Example 5: write_csv

def write_csv(filename, rows):
    with io.open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["latitude", "longitude", "datetime"])
        for row in rows:
            writer.writerow([row["latitude"], row["longitude"], row["datetime"]])
Developer: jonaqp, Project: Django_Celery_Ejemplo, Lines of code: 7, Source file: tasks.py


Example 6: writeUniqueResults

def writeUniqueResults(clustered_dupes, input_file, output_file):

    # Write our original data back out to a CSV with a new column called 
    # 'Cluster ID' which indicates which records refer to each other.

    logging.info('saving unique results to: %s' % output_file)

    cluster_membership = {}
    for cluster_id, (cluster, score) in enumerate(clustered_dupes):
        for record_id in cluster:
            cluster_membership[record_id] = cluster_id

    unique_record_id = cluster_id + 1

    writer = csv.writer(output_file)

    reader = csv.reader(StringIO(input_file))

    heading_row = next(reader)
    heading_row.insert(0, u'Cluster ID')
    writer.writerow(heading_row)

    seen_clusters = set()
    for row_id, row in enumerate(reader):
        if row_id in cluster_membership:
            cluster_id = cluster_membership[row_id]
            if cluster_id not in seen_clusters:
                row.insert(0, cluster_id)
                writer.writerow(row)
                seen_clusters.add(cluster_id)
        else:
            cluster_id = unique_record_id
            unique_record_id += 1
            row.insert(0, cluster_id)
            writer.writerow(row)
Developer: rlugojr, Project: csvdedupe, Lines of code: 35, Source file: csvhelpers.py


Example 7: get

    def get(self, request, *args, **kwargs):
        object_list = self.get_queryset()[:2000]

        # Do reasonable ACL check for global
        acl_obj = self.translation or self.component or self.project
        if not acl_obj:
            for change in object_list:
                if change.component:
                    acl_obj = change.component
                    break

        if not request.user.has_perm('change.download', acl_obj):
            raise PermissionDenied()

        # Always output in english
        activate('en')

        response = HttpResponse(content_type='text/csv; charset=utf-8')
        response['Content-Disposition'] = 'attachment; filename=changes.csv'

        writer = csv.writer(response)

        # Add header
        writer.writerow(('timestamp', 'action', 'user', 'url', 'target'))

        for change in object_list:
            writer.writerow((
                change.timestamp.isoformat(),
                change.get_action_display(),
                change.user.username if change.user else '',
                get_site_url(change.get_absolute_url()),
                change.target,
            ))

        return response
Developer: dekoza, Project: weblate, Lines of code: 35, Source file: changes.py


Example 8: test_roundtrip_escaped_unquoted_newlines

 def test_roundtrip_escaped_unquoted_newlines(self):
     with TemporaryFile("w+", newline="") as fileobj:
         writer = csv.writer(fileobj, quoting=csv.QUOTE_NONE, escapechar="\\")
         rows = [["a\nb", "b"], ["c", "x\r\nd"]]
         writer.writerows(rows)
         fileobj.seek(0)
         for i, row in enumerate(csv.reader(fileobj, quoting=csv.QUOTE_NONE, escapechar="\\")):
             self.assertEqual(row, rows[i])
Developer: ryanhiebert, Project: backports.csv, Lines of code: 8, Source file: tests.py


Example 9: compare_dialect_123

    def compare_dialect_123(self, expected, *writeargs, **kwwriteargs):

        with TemporaryFile("w+", newline="", encoding="utf-8") as fileobj:

            writer = csv.writer(fileobj, *writeargs, **kwwriteargs)
            writer.writerow([1, 2, 3])
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
Developer: ryanhiebert, Project: backports.csv, Lines of code: 8, Source file: tests.py


Example 10: test_unicode_write

 def test_unicode_write(self):
     import io
     with TemporaryFile("w+", newline='', encoding="utf-8") as fileobj:
         writer = csv.writer(fileobj)
         writer.writerow(self.names)
         expected = ",".join(self.names)+"\r\n"
         fileobj.seek(0)
         self.assertEqual(fileobj.read(), expected)
Developer: gitter-badger, Project: backports.csv, Lines of code: 8, Source file: tests.py


Example 11: test_roundtrip_quoteed_newlines

 def test_roundtrip_quoteed_newlines(self):
     with TemporaryFile("w+", newline="") as fileobj:
         writer = csv.writer(fileobj)
         self.assertRaises(TypeError, writer.writerows, None)
         rows = [["a\nb", "b"], ["c", "x\r\nd"]]
         writer.writerows(rows)
         fileobj.seek(0)
         for i, row in enumerate(csv.reader(fileobj)):
             self.assertEqual(row, rows[i])
Developer: ryanhiebert, Project: backports.csv, Lines of code: 9, Source file: tests.py


Example 12: test_quote_nonnumeric_decimal

    def test_quote_nonnumeric_decimal(self):
        """Decimals should not be quoted with non-numeric quoting."""
        import decimal

        with TemporaryFile("w+", newline="", encoding="utf-8") as fileobj:
            writer = csv.writer(fileobj, quoting=csv.QUOTE_NONNUMERIC)
            writer.writerow([10, 10.0, decimal.Decimal("10.0"), "10.0"])
            expected = '10,10.0,10.0,"10.0"\r\n'
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
Developer: ryanhiebert, Project: backports.csv, Lines of code: 10, Source file: tests.py


Example 13: test_char_write

    def test_char_write(self):
        import array, string
        a = array.array(str('u'), text_type(string.ascii_letters))

        with TemporaryFile("w+", newline='') as fileobj:
            writer = csv.writer(fileobj, dialect="excel")
            writer.writerow(a)
            expected = ",".join(a)+"\r\n"
            fileobj.seek(0)
            self.assertEqual(fileobj.read(), expected)
Developer: gitter-badger, Project: backports.csv, Lines of code: 10, Source file: tests.py


Example 14: test_float_write

 def test_float_write(self):
     import array
     contents = [(20-i)*0.1 for i in range(20)]
     a = array.array(str('f'), contents)
     with TemporaryFile("w+", newline='') as fileobj:
         writer = csv.writer(fileobj, dialect="excel")
         writer.writerow(a)
         expected = ",".join([str(i) for i in a])+"\r\n"
         fileobj.seek(0)
         self.assertEqual(fileobj.read(), expected)
Developer: gitter-badger, Project: backports.csv, Lines of code: 10, Source file: tests.py


Example 15: get

    def get(self):
        f = io.StringIO()
        writer = csv.writer(f)

        headers = [
            'User ID',
            'Username',
            'First Name',
            'Last Name',
            'Email',
            'Telephone',
            'Enabled',
            'Admin',
            'Last Login',
            'Last Active',
            'Cohorts',
            'Hospitals',
            'Roles',
        ]
        writer.writerow(headers)

        def get_groups(user, group_type):
            """Comma-separated list of groups."""

            groups = [x.name for x in user.groups if x.type == group_type]
            groups = sorted(groups)
            groups = uniq(groups)
            return ', '.join(groups)

        def get_roles(user):
            """Comma-separated list of roles."""
            roles = [gu.role.name for gu in user.group_users]
            return ', '.join(sorted(set(roles)))

        users = list_users()
        for user in users:
            output = []
            output.append(user.id)
            output.append(user.username)
            output.append(user.first_name)
            output.append(user.last_name)
            output.append(user.email)
            output.append(user.telephone_number)
            output.append(user.is_enabled)
            output.append(user.is_admin)
            output.append(user.last_login_date)
            output.append(user.last_active_date)
            output.append(get_groups(user, GROUP_TYPE.COHORT))
            output.append(get_groups(user, GROUP_TYPE.HOSPITAL))
            output.append(get_roles(user))

            writer.writerow(output)

        return Response(f.getvalue(), content_type='text/csv')
Developer: renalreg, Project: radar, Lines of code: 54, Source file: users.py


Example 16: _write_csv

def _write_csv(
        file_path,
        data,
        delimiter=DEFAULT_DELIMITER,
        lineterminator=DEFAULT_LINETERMINATOR):
    with io.open(file_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(
            f,
            delimiter=delimiter,
            lineterminator=lineterminator
        )
        writer.writerows(data)
Developer: jakubvalenta, Project: listio, Lines of code: 12, Source file: listio.py


Example 17: from_list_of_rows

    def from_list_of_rows(cls, schema, values, filepath=None, etag=None, quoteCharacter='"', escapeCharacter="\\", lineEnd=str(os.linesep), separator=",", linesToSkip=0, includeRowIdAndRowVersion=None, headers=None):

        ## create CSV file
        f = None
        try:
            if not filepath:
                temp_dir = tempfile.mkdtemp()
                filepath = os.path.join(temp_dir, 'table.csv')

            f = io.open(filepath, 'w', encoding='utf-8', newline='')

            writer = csv.writer(f,
                quoting=csv.QUOTE_NONNUMERIC,
                delimiter=separator,
                escapechar=escapeCharacter,
                lineterminator=lineEnd,
                quotechar=quoteCharacter,
                skipinitialspace=linesToSkip)

            ## if we haven't explicitly set columns, try to grab them from
            ## the schema object
            if not headers and "columns_to_store" in schema and schema.columns_to_store is not None:
                headers = [SelectColumn.from_column(col) for col in schema.columns_to_store]

            ## write headers?
            if headers:
                writer.writerow([header.name for header in headers])
                header = True
            else:
                header = False

            ## write row data
            for row in values:
                writer.writerow(row)

        finally:
            if f: f.close()

        return cls(
            schema=schema,
            filepath=filepath,
            etag=etag,
            quoteCharacter=quoteCharacter,
            escapeCharacter=escapeCharacter,
            lineEnd=lineEnd,
            separator=separator,
            header=header,
            headers=headers,
            includeRowIdAndRowVersion=includeRowIdAndRowVersion)
Developer: kkdang, Project: synapsePythonClient, Lines of code: 49, Source file: table.py


Example 18: __init__

 def __init__(
         self, fileobj, header=False, dialect=CSV_DIALECT, encoding='utf-8',
         **kwargs):
     self.fileobj = fileobj
     self.header = header
     self.dialect = dialect
     self.encoding = encoding
     self.keywords = kwargs
     self.count = 0
     self._first_row = None
     # The csv writer outputs strings so we stick a transcoding shim between
     # the writer and the output object
     self._writer = csv_.writer(
         codecs.getwriter(self.encoding)(self.fileobj),
         dialect=self.dialect, **self.keywords)
Developer: waveform80, Project: lars, Lines of code: 15, Source file: csv.py


Example 19: _get_project_strings_csv

def _get_project_strings_csv(project, entities, output):
    """Return a CSV content of all strings and translations for a project and locale.

    The file format looks as follow:

        source, locale_code_1, locale_code_2
        "string A", "tranlation A1", "tranlation A2"
        "string B", "tranlation B1", "tranlation B2"

    The first column has all source strings. Then there is one column per enabled locale, each
    containing available translations for each source string (or an empty cell). The first line
    contains the code of each locale, expect for the first cell which is always "source".

    :arg Project project: the project from which to take strings
    :arg list entities: the list of all entities of the project
    :arg buffer output: a buffer to which the CSV writed will send its data

    :returns: the same output object with the CSV data

    """
    locales = Locale.objects.filter(project_locale__project=project)
    translations = (
        Translation.objects
        .filter(
            entity__resource__project=project,
            approved=True,
        )
        .prefetch_related('locale')
        .prefetch_related('entity')
    )
    all_data = dict((x.id, {'source': x.string}) for x in entities)

    for translation in translations:
        all_data[translation.entity.id][translation.locale.code] = translation.string

    writer = csv.writer(output)
    headers = ['source'] + [x.code for x in locales]
    writer.writerow(headers)
    for string in all_data.values():
        row = [string.get(key, '') for key in headers]
        writer.writerow(row)

    return output
Developer: Pike, Project: pontoon, Lines of code: 43, Source file: views.py


Example 20: writeLinkedResults

def writeLinkedResults(clustered_pairs, input_1, input_2, output_file,
                       inner_join=False):
    logging.info('saving unique results to: %s' % output_file)

    matched_records = []
    seen_1 = set()
    seen_2 = set()

    input_1 = [row for row in csv.reader(StringIO(input_1))]
    row_header = input_1.pop(0)
    length_1 = len(row_header)

    input_2 = [row for row in csv.reader(StringIO(input_2))]
    row_header_2 = input_2.pop(0)
    length_2 = len(row_header_2)
    row_header += row_header_2

    for pair in clustered_pairs:
        index_1, index_2 = [int(index.split('|', 1)[1]) for index in pair[0]]

        matched_records.append(input_1[index_1] + input_2[index_2])
        seen_1.add(index_1)
        seen_2.add(index_2)

    writer = csv.writer(output_file)
    writer.writerow(row_header)

    for matches in matched_records:
        writer.writerow(matches)

    if not inner_join:

        for i, row in enumerate(input_1):
            if i not in seen_1:
                writer.writerow(row + [None] * length_2)

        for i, row in enumerate(input_2):
            if i not in seen_2:
                writer.writerow([None] * length_1 + row)
Developer: rlugojr, Project: csvdedupe, Lines of code: 39, Source file: csvhelpers.py



Note: The backports.csv.writer examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open source projects contributed by their respective authors, and copyright remains with those authors. Please follow the corresponding project's license when using or redistributing them; do not republish without permission.

