【发布时间】:2021-04-25 23:54:38
【问题描述】:
我是 pandas 和 parquet 文件类型的新手。我有一个 python 脚本:
- 读取 hdfs parquet 文件
- 将其转换为 pandas 数据框
- 循环遍历特定列并更改一些值
- 将数据帧写回到 parquet 文件中
然后使用 impala-shell 将 parquet 文件导入回 hdfs。
我遇到的问题似乎与第 2 步有关。我让它在读取数据帧后立即打印出数据帧的内容,然后在第 3 步中进行任何更改。它似乎正在更改数据类型和某些字段的数据,这在将其写回镶木地板文件时会导致问题。例子:
- 在数据库中显示为 NULL 的字段被替换为 string “无”(对于字符串列)或 string “ nan"(用于数字列)在数据框的打印输出中。
- 数据库中应为值为 0 的 Int 的字段更改为“0.00000”,并在数据框中变为浮点数。
似乎它实际上正在更改这些值,因为当它写入 parquet 文件并将其导入 hdfs 并运行查询时,我收到如下错误:
WARNINGS: File '<path>/test.parquet' has an incompatible Parquet schema for column
'<database>.<table>.tport'. Column type: INT, Parquet schema:
optional double tport [i:1 d:1 r:0]
我不知道为什么它会改变数据而不是保持原样。如果发生这种情况,我不知道我是否需要遍历每一列并将所有这些替换回它们的原始值,或者是否有其他方法告诉它不要管它们。
我一直在使用这个参考页面: http://arrow.apache.org/docs/python/parquet.html
它使用
pq.read_table(in_file)
读取 parquet 文件,然后
df = table2.to_pandas()
转换为我可以循环并更改列的数据框。我不明白为什么它会改变数据,我也找不到防止这种情况发生的方法。与 read_table 相比,我需要以不同的方式阅读它吗?
如果我查询数据库,数据将如下所示:
| tport |
|---|
| 0 |
| 1 |
我的 print(df) 行是这样的:
| tport |
|---|
| 0.00000 |
| nan |
| nan |
| 1.00000 |
这里是相关代码。我省略了处理命令行参数的部分,因为它很长并且不适用于这个问题。传入的文件是in_file:
import sys, getopt
import random
import re
import math
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import pyarrow as pa
import os.path
# <CLI PROCESSING SECTION HERE>
# GET LIST OF COLUMNS THAT MUST BE SCRAMBLED
field_file = open('scrambler_columns.txt', 'r')
contents = field_file.read()
scrambler_columns = contents.split('\n')
def scramble_str(xstr):
#print(xstr + '_scrambled!')
return xstr + '_scrambled!'
parquet_file = pq.ParquetFile(in_file)
table2 = pq.read_table(in_file)
metadata = pq.read_metadata(in_file)
df = table2.to_pandas() #dataframe
print('rows: ' + str(df.shape[0]))
print('cols: ' + str(df.shape[1]))
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', -1)
pd.set_option('display.float_format', lambda x: '%.5f' % x)
#df.fillna(value='', inplace=True) # np.nan # \xa0
print(df) # print before making any changes
cols = list(df)
# https://pythonbasics.org/pandas-iterate-dataframe/
for col_name, col_data in df.iteritems():
#print(cols[index])
if col_name in scrambler_columns:
print('scrambling values in column ' + col_name)
for i, val in col_data.items():
df.at[i, col_name] = scramble_str(str(val))
print(df) # print after making changes
print(parquet_file.num_row_groups)
print(parquet_file.read_row_group(0))
# WRITE NEW PARQUET FILE
new_table = pa.Table.from_pandas(df)
writer = pq.ParquetWriter(out_file, new_table.schema)
for i in range(1):
writer.write_table(new_table)
writer.close()
if os.path.isfile(out_file) == True:
print('wrote ' + out_file)
else:
print('error writing file ' + out_file)
# READ NEW PARQUET FILE
table3 = pq.read_table(out_file)
df = table3.to_pandas() #dataframe
print(df)
这与 pandas 数据框中的相同:
id object
col1 float64
col2 object
col3 object
col4 float64
col5 object
col6 object
col7 object
似乎可以转换
String to object
Int to float64
bigint to float64
如何告诉 pandas 列应该是什么数据类型?
编辑 2: 我能够通过直接处理 pyarrow 表找到解决方法。请在此处查看我的问题和答案:How to update data in pyarrow table?
【问题讨论】:
-
您的
int列是否也有空值? Pandas 对可空整数的支持是新的,我相信如果列中有空值,箭头将从整数转换为浮点数(以便 nan 可用)。你能添加print(df.dtypes)和print(table2)的输出吗?这将显示 parquet 文件中的数据类型以及 pandas 最终使用的数据类型。 -
@Pace 我在最后编辑了原始帖子,以显示 hdfs 和 pandas 看到的数据类型。
-
另外,我如何处理空/空值?我需要使用 df.fillna 吗?
-
@Pace 是的,int 列可以包含空值。
标签: python-3.x pandas dataframe parquet pyarrow