ChatGPT解决这个技术问题 Extra ChatGPT

使用 pandas 的“大数据”工作流程 [关闭]

关闭。此问题不符合 Stack Overflow 准则。它目前不接受答案。我们不允许提出有关书籍、工具、软件库等建议的问题。您可以编辑问题,以便可以用事实和引用来回答它。 7 个月前关闭。社区在 7 个月前审查了是否重新打开此问题并将其关闭:原始关闭原因未解决 改进此问题

在学习熊猫的过程中,我已经尝试了好几个月来想出这个问题的答案。我在日常工作中使用 SAS,它的核心支持非常棒。然而,由于许多其他原因,SAS 作为一个软件是可怕的。

有一天,我希望用 python 和 pandas 代替我对 SAS 的使用,但我目前缺乏大型数据集的核心工作流程。我不是在谈论需要分布式网络的“大数据”,而是说文件太大而无法放入内存但小到足以放入硬盘驱动器。

我的第一个想法是使用 HDFStore 将大型数据集保存在磁盘上,并仅将我需要的部分拉入数据帧进行分析。其他人提到 MongoDB 是一种更易于使用的替代方案。我的问题是这样的:

完成以下任务的最佳实践工作流程是什么:

将平面文件加载到永久的磁盘数据库结构中 查询该数据库以检索数据以馈送到 pandas 数据结构中 在处理 pandas 中的片段后更新数据库

现实世界的例子将不胜感激,尤其是任何在“大数据”上使用 pandas 的人。

编辑——我希望它如何工作的一个例子:

迭代地导入一个大的平面文件并将其存储在一个永久的磁盘数据库结构中。这些文件通常太大而无法放入内存。为了使用 Pandas,我想读取可以放入内存的这些数据的子集(通常一次只有几列)。我将通过对选定列执行各种操作来创建新列。然后我必须将这些新列附加到数据库结构中。

我正在尝试找到执行这些步骤的最佳实践方式。阅读有关 pandas 和 pytables 的链接似乎附加一个新列可能是一个问题。

编辑——具体回答杰夫的问题:

我正在建立消费者信用风险模型。数据种类包括电话、SSN、地址特征;财产价值;犯罪记录、破产等贬损信息……我每天使用的数据集平均有近 1,000 到 2,000 个混合数据类型的字段:数字和字符数据的连续变量、名义变量和有序变量。我很少追加行,但我确实执行了许多创建新列的操作。典型的操作包括使用条件逻辑将多个列组合成一个新的复合列。例如,如果 var1 > 2 则 newvar = 'A' elif var2 = 4 然后 newvar = 'B'。这些操作的结果是我的数据集中每条记录的新列。最后,我想将这些新列附加到磁盘数据结构中。我将重复第 2 步,使用交叉表和描述性统计数据探索数据,试图找到有趣、直观的建模关系。一个典型的项目文件通常约为 1GB。文件被组织成这样一种方式,其中一行包含消费者数据的记录。对于每条记录,每一行都有相同数量的列。情况将永远如此。在创建新列时,我很少会按行进行子集化。但是,在创建报告或生成描述性统计数据时,对我来说,对行进行子集化是很常见的。例如,我可能想为特定业务线创建一个简单的频率,例如零售信用卡。为此,除了我想要报告的任何列之外,我只会选择那些业务范围 = 零售的记录。但是,在创建新列时,我会提取所有数据行,并且只提取操作所需的列。建模过程要求我分析每一列,寻找与某些结果变量的有趣关系,并创建描述这些关系的新复合列。我探索的专栏通常是在小范围内完成的。例如,我将专注于一组 20 列仅处理财产价值的列,并观察它们与贷款违约的关系。一旦探索了这些并创建了新的专栏,我就会转到另一组专栏,比如大学教育,然后重复这个过程。我正在做的是创建候选变量来解释我的数据与某些结果之间的关系。在这个过程的最后,我应用了一些学习技术,从这些复合列中创建一个方程。

我很少会向数据集添加行。我几乎总是会创建新列(统计/机器学习用语中的变量或特征)。

核心尺寸/全尺寸的比率是 1%、10% 吗?有没有关系——如果你可以将 cols 压缩到 int8,或者过滤掉嘈杂的行,那会改变你的计算思考循环从几小时到几分钟吗? (同时添加标签大数据。)
在可能的情况下存储float32而不是float64和int8应该是微不足道的(虽然不知道float64内部有哪些工具/函数)
你能把你的任务分成几块工作吗?
一个不错的 2019 年解决方案,用于对不适合内存的“中等”数据进行类似 pandas 的操作是dask
duckdb 正在逐渐成为在单台机器上处理中型数据集的一个很好的替代方案。

U
Unmitigated

我经常以这种方式使用数十 GB 的数据,例如,我在磁盘上有表,我通过查询读取、创建数据并附加回来。

有关如何存储数据的一些建议,值得阅读 the docslate in this thread

会影响您如何存储数据的详细信息,例如: 提供尽可能多的详细信息;我可以帮助你开发一个结构。

数据大小、行数、列数、列类型;你是追加行,还是只是列?典型的操作会是什么样子。例如,对列进行查询以选择一堆行和特定列,然后执行操作(在内存中),创建新列,保存这些。 (举一个玩具例子可以让我们提供更具体的建议。)在处理之后,你会做什么?第 2 步是临时的还是可重复的?输入平面文件:多少,以 Gb 为单位的粗略总大小。这些是如何组织的,例如按记录?每个文件是否包含不同的字段,还是每个文件都有一些记录以及每个文件中的所有字段?您是否曾经根据标准选择行(记录)的子集(例如选择字段 A > 5 的行)?然后做一些事情,或者你只是选择所有记录的字段A,B,C(然后做一些事情)?您是否“处理”所有列(分组),或者是否有一个很好的比例只能用于报告(例如,您想保留数据,但不需要明确地拉入该列,直到最终结果时间)?

解决方案

确保您已安装 pandas at least 0.10.1

阅读 iterating files chunk-by-chunkmultiple table queries

由于 pytables 被优化为按行操作(这是您查询的内容),我们将为每组字段创建一个表。这种方式很容易选择一小组字段(这将适用于一个大表,但这样做更有效......我想我将来可能能够解决这个限制......这是无论如何更直观):(以下是伪代码。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

读入文件并创建存储(基本上是做 append_to_multiple 所做的):

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

现在您在文件中拥有了所有表(实际上,如果您愿意,您可以将它们存储在单独的文件中,您可能必须将文件名添加到 group_map,但这可能不是必需的)。

这是获取列和创建新列的方式:

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

当您准备好进行后期处理时:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

关于 data_columns,您实际上不需要定义任何 data_columns;它们允许您根据列子选择行。例如:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

在最终报告生成阶段,它们可能对您最感兴趣(本质上,一个数据列与其他列分离,如果您定义很多,这可能会在一定程度上影响效率)。

您可能还想:

创建一个函数,该函数采用字段列表,在 groups_map 中查找组,然后选择这些并连接结果,以便获得结果帧(这本质上是 select_as_multiple 所做的)。这样,结构对您来说将是非常透明的。

某些数据列的索引(使行子集更快)。

启用压缩。

有问题时告诉我!


感谢您的链接。第二个链接让我有点担心我无法将新列附加到 HDFStore 中的表中?那是对的吗?另外,我添加了一个如何使用此设置的示例。
hdf 中的实际结构取决于您。 Pytables 是面向行的,在创建时具有固定的列。创建表后,您无法追加列。但是,您可以创建一个索引与现有表相同的新表。 (请参阅文档中的 select_as_multiple 示例)。通过这种方式,您可以创建任意大小的对象,同时进行非常高效的查询。使用数据的方式是如何在磁盘上组织数据的关键。向我发送一封带有更具体示例的伪代码的列表外电子邮件。
我已经更新了我的问题以回应您的详细观点。我将通过一个示例将您从列表中发送出去。谢谢!
@Jeff,现在 Pandas 的版本为 0.17.x,上面列出的问题在 Pandas 中得到了解决吗?
@Jeff 热衷于快速更新您的答案以促进 dask?
u
user1827356

我认为上面的答案缺少一种我发现非常有用的简单方法。

当我有一个文件太大而无法加载到内存中时,我会将文件分成多个较小的文件(按行或按列)

示例:如果有 30 天的价值约 30GB 的交易数据,我每天将其分解为约 1GB 大小的文件。我随后分别处理每个文件并在最后汇总结果

最大的优点之一是它允许并行处理文件(多个线程或进程)

另一个优点是文件操作(如示例中的添加/删除日期)可以通过常规 shell 命令完成,这在更高级/复杂的文件格式中是不可能的

这种方法并不涵盖所有场景,但在很多场景中都非常有用


同意。随着所有的炒作,很容易忘记command-line tools can be 235x faster than a Hadoop cluster
更新链接:adamdrake.com/…
P
Private

现在,在问题发生两年后,一个“核心外”熊猫等效:dask。太棒了!虽然它不支持所有 pandas 功能,但你可以用它走得很远。更新:在过去的两年中,它一直得到维护,并且有大量用户社区与 Dask 合作。

而现在,在提出问题四年后,在 Vaex 中还有另一个等效的高性能“核外”熊猫。它“使用内存映射、零内存复制策略和惰性计算来获得最佳性能(不浪费内存)。”它可以处理数十亿行的数据集,并且不会将它们存储到内存中(甚至可以对次优硬件进行分析)。


对于 dask 的完整示例,请看这里stackoverflow.com/questions/37979167/…
根据您的数据,查看 pystore 是有意义的。它依赖于 dask
它总是“脱离核心”吗? (即不是 RAM 密集型?)。如果您手头没有集群,那么 Dask 不是一个好的解决方案,恕我直言。引用 Dask documentation itself“如果您希望管理 TB 或更少的表格格式 CSV 或 JSON 数据,那么您应该忘记 Spark 和 Dask,而使用 Postgres 或 MongoDB。”
r
rjurney

如果您的数据集在 1 到 20GB 之间,您应该获得一个具有 48GB RAM 的工作站。然后 Pandas 可以将整个数据集保存在 RAM 中。我知道这不是您在这里寻找的答案,但是在具有 4GB RAM 的笔记本电脑上进行科学计算是不合理的。


“在 4GB RAM 的笔记本上做科学计算是不合理的” 定义合理。我认为UNIVAC 会采取不同的观点。 arstechnica.com/tech-policy/2011/09/…
同意!尝试继续在内存中工作,即使它预先花费 $$。如果您的工作带来了财务回报,那么随着时间的推移,您将通过提高效率来弥补开支。
在具有 48GB RAM 的工作站上进行科学计算是不合理的。
@YaroslavNikitenko 具有 61GB/RAM 的 r4.2xlarge 是 0.532 美元/小时。你在做什么没有那么有价值的科学计算?听起来不寻常,如果不是不合理的话。
@rjurney 抱歉,也许我应该删除我的评论。您对“不合理”的科学计算机的判断似乎非常主观。我多年来一直在笔记本电脑上进行科学计算,这对我来说似乎已经足够了,因为我大部分时间都在编写代码。从编程的角度来看,我的算法比从计算的角度来看要困难得多。此外,我很确定要编写可扩展的算法,不应依赖当前的硬件限制。您对他人计算的评论可能听起来有点冒犯(除了主观性),您介意删除这几个词吗?
c
chishaku

我知道这是一个旧线程,但我认为 Blaze 库值得一试。它专为这些类型的情况而设计。

从文档:

Blaze 将 NumPy 和 Pandas 的可用性扩展到分布式和核外计算。 Blaze 提供了一个类似于 NumPy ND-Array 或 Pandas DataFrame 的接口,但将这些熟悉的接口映射到各种其他计算引擎,如 Postgres 或 Spark。

编辑:顺便说一句,它得到了 ContinuumIO 和 NumPy 的作者 Travis Oliphant 的支持。


另一个可能值得一看的库是 GraphLab Create:它具有高效的类似 DataFrame 的结构,不受内存容量的限制。 blog.dato.com/…
a
ali_m

pymongo 就是这种情况。我还在 python 中使用 sql server、sqlite、HDF、ORM (SQLAlchemy) 进行了原型设计。首先,pymongo 是一个基于文档的数据库,因此每个人都是一个文档(属性的dict)。许多人组成一个集合,您可以拥有多个集合(人、股票市场、收入)。

pd.dateframe -> pymongo 注意:我使用 read_csv 中的 chunksize 将其保持在 5 到 10k 条记录(如果更大,pymongo 会丢弃套接字)

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

查询:gt = 大于...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find() 返回一个迭代器,因此我通常使用 ichunked 来分割成更小的迭代器。

加入一个怎么样,因为我通常会将 10 个数据源粘贴在一起:

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

然后(在我的情况下,有时我必须先在 aJoinDF 上聚合,然后才能“合并”。)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

然后您可以通过下面的更新方法将新信息写入您的主集合。 (逻辑集合与物理数据源)。

collection.update({primarykey:foo},{key:change})

在较小的查找中,只需非规范化即可。例如,您在文档中有代码,您只需添加域代码文本并在创建文档时执行 dict 查找。

现在你有一个很好的基于一个人的数据集,你可以在每个案例上释放你的逻辑并制作更多属性。最后,您可以将您的 3 to memory max 关键指标读入 pandas 并进行枢轴/聚合/数据探索。这对我来说适用于 300 万条带有数字/大文本/类别/代码/浮点数/...的记录

您还可以使用 MongoDB 中内置的两种方法(MapReduce 和聚合框架)。 See here for more info about the aggregate framework,因为它似乎比 MapReduce 更容易,并且看起来很方便进行快速聚合工作。请注意,我不需要定义我的字段或关系,我可以将项目添加到文档中。在快速变化的 numpy、pandas、python 工具集的当前状态下,MongoDB 帮助我开始工作:)


您好,我也在玩您的示例,尝试插入数据库时遇到此错误:In [96]: test.insert((a[1].to_dict() for a in df.iterrows())) --------------- InvalidDocument: Cannot encode object: 0。有什么想法可能是错的吗?我的数据框由所有 int64 dtypes 组成,非常简单。
是的,我对一个简单的范围 DF 做了同样的事情,而来自 numpy 的 int64 似乎打扰了 pymongo。我玩过的所有数据都是从 CSV 转换而来的(与人工通过 range() 相比),并且类型很长,因此没有问题。在 numpy 中,您可以转换,但我确实认为这有损。我必须承认 HDF 的 10.1 项目看起来令人兴奋。
S
Sociopath

我发现对大数据用例有用的一个技巧是通过将浮点精度降低到 32 位来减少数据量。它并不适用于所有情况,但在许多应用程序中,64 位精度是多余的,2 倍的内存节省是值得的。为了使一个明显的观点更加明显:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB

一般而言,我认为您应该对每种列类型都进行严格的处理,并有一个一致地处理它们的字典。这允许进一步向下转换(到小的 int,甚至更好的类别)。根据目的,您甚至可以使用一些“技巧”,例如更改您使用的单位(使用 k$ 可能会帮助您将所有内容向下转换为 int16)或将事物分组。理想情况下,这应该在第一次导出之前完成。
J
Johann Hibschman

我发现这有点晚了,但我遇到了类似的问题(抵押预付款模型)。我的解决方案是跳过 pandas HDFStore 层并使用直接的 pytables。我在最终文件中将每一列保存为单独的 HDF5 数组。

我的基本工作流程是首先从数据库中获取 CSV 文件。我gzip它,所以它没有那么大。然后我将其转换为面向行的 HDF5 文件,通过在 python 中对其进行迭代,将每一行转换为实际数据类型,并将其写入 HDF5 文件。这需要几十分钟,但它不使用任何内存,因为它只是逐行操作。然后我将面向行的 HDF5 文件“转置”为面向列的 HDF5 文件。

表转置如下所示:

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

读回来看起来像:

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

现在,我通常在具有大量内存的机器上运行它,所以我可能对内存使用不够小心。例如,默认情况下,加载操作会读取整个数据集。

这通常对我有用,但有点笨拙,而且我不能使用花哨的 pytables 魔法。

编辑:与记录数组 pytables 默认值相比,这种方法的真正优势在于,我可以使用无法处理表的 h5r 将数据加载到 R 中。或者,至少,我无法让它加载异构表。


你介意和我分享你的一些代码吗?我对在推送到 pytables 之前如何在不知道数据类型的情况下从某种纯文本格式加载数据感兴趣。此外,您似乎只使用一种类型的数据。那是对的吗?
首先,我假设我在加载之前知道列的类型,而不是试图从数据中猜测。我保存了一个带有列名和类型的 JSON“数据规范”文件,并在处理数据时使用它。 (该文件通常是一些糟糕的 BCP 输出,没有任何标签。)我使用的数据类型是字符串、浮点数、整数或每月日期。我通过保存枚举表将字符串转换为整数并将日期转换为整数(2000 年过去的几个月),所以我的数据中只剩下整数和浮点数,以及枚举。我现在将浮点数保存为 float64,但我尝试使用 float32。
如果您有时间,请尝试使用 R: pandas.pydata.org/pandas-docs/dev/… 进行外部兼容,如果您有困难,也许我们可以调整一下
我会试试,如果可以的话。 rhdf5 很痛苦,因为它是一个生物导体包,而不仅仅是像 h5r 那样在 CRAN 上。我受制于我们的技术架构团队,上次我要求 rhdf5 时出现了一些问题。无论如何,使用 OLAP 存储面向行而不是面向列似乎是一个错误,但现在我在漫无目的。
w
wp78de

正如其他人所指出的,几年后出现了“核心外”熊猫等效物:dask。尽管 dask 不是 pandas 的替代品,但它的所有功能都因其以下几个原因而脱颖而出:

Dask 是一个用于分析计算的灵活并行计算库,针对“大数据”集合(如并行数组、数据帧和列表)的动态任务调度进行了优化,这些集合将常见接口(如 NumPy、Pandas 或 Python 迭代器)扩展到更大-内存或分布式环境,并从笔记本电脑扩展到集群。

Dask 强调以下优点: 熟悉:提供并行化的 NumPy 数组和 Pandas DataFrame 对象 灵活:为更多自定义工作负载和与其他项目的集成提供任务调度接口。 Native:在纯 Python 中启用分布式计算,并访问 PyData 堆栈。快速:以快速数值算法所需的低开销、低延迟和最小序列化运行 向上扩展:在具有 1000 个内核的集群上弹性运行 向下扩展:在单个进程中在笔记本电脑上轻松设置和运行 响应式:采用交互式设计考虑到计算它提供快速反馈和诊断以帮助人类

并添加一个简单的代码示例:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

像这样替换一些熊猫代码:

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

并且特别值得注意的是,它通过 concurrent.futures 接口提供了用于提交自定义任务的通用基础架构:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

我已经添加了这个答案,因为@Private 的帖子定期出现在建议的内容和长度删除列表中。
除非您需要多索引,否则 Dask 很棒。截至撰写本文时,缺乏多索引是一个主要问题。
l
lev

这里值得一提的是Ray
它是一个分布式计算框架,它以分布式方式为 Pandas 提供了自己的实现。

只需替换 pandas 导入,代码就可以正常工作:

# import pandas as pd
import ray.dataframe as pd

# use pd as usual

可以在这里阅读更多详细信息:

https://rise.cs.berkeley.edu/blog/pandas-on-ray/

更新:处理 pandas 分发的部分已提取到 modin 项目。

现在使用它的正确方法是:

# import pandas as pd
import modin.pandas as pd

O
Ophir Yoktan

另一种变化

在 pandas 中完成的许多操作也可以作为 db 查询(sql、mongo)来完成

使用 RDBMS 或 mongodb 允许您在 DB Query 中执行一些聚合(针对大数据进行了优化,并有效地使用缓存和索引)

稍后,您可以使用 pandas 进行后期处理。

这种方法的优点是您获得了处理大数据的数据库优化,同时仍然以高级声明性语法定义逻辑 - 而不必处理决定在内存中做什么和做什么的细节的核心。

尽管查询语言和 pandas 不同,但将部分逻辑从一种翻译到另一种通常并不复杂。


G
Golf Monkey

如果您采用创建分解为多个较小文件的数据管道的简单路径,请考虑 Ruffus


R
Rob

我想指出 Vaex 包。

Vaex 是一个用于惰性核心数据帧(类似于 Pandas)的 Python 库,用于可视化和探索大型表格数据集。它可以在高达每秒十亿 (109) 个对象/行的 N 维网格上计算统计数据,例如平均值、总和、计数、标准差等。可视化是使用直方图、密度图和 3D 体绘制完成的,允许对大数据进行交互式探索。 Vaex 使用内存映射、零内存复制策略和惰性计算来获得最佳性能(不浪费内存)。

查看文档:https://vaex.readthedocs.io/en/latest/ API 非常接近 pandas 的 API。


t
timpjohns

我最近遇到了一个类似的问题。我发现只需以块的形式读取数据并在将其以块的形式写入相同的 csv 时将其附加效果很好。我的问题是根据另一个表中的信息添加日期列,使用某些列的值如下。这可能会帮助那些对 dask 和 hdf5 感到困惑但更熟悉像我这样的 pandas 的人。

def addDateColumn():
"""Adds time to the daily rainfall data. Reads the csv as chunks of 100k 
   rows at a time and outputs them, appending as needed, to a single csv. 
   Uses the column of the raster names to get the date.
"""
    df = pd.read_csv(pathlist[1]+"CHIRPS_tanz.csv", iterator=True, 
                     chunksize=100000) #read csv file as 100k chunks

    '''Do some stuff'''

    count = 1 #for indexing item in time list 
    for chunk in df: #for each 100k rows
        newtime = [] #empty list to append repeating times for different rows
        toiterate = chunk[chunk.columns[2]] #ID of raster nums to base time
        while count <= toiterate.max():
            for i in toiterate: 
                if i ==count:
                    newtime.append(newyears[count])
            count+=1
        print "Finished", str(chunknum), "chunks"
        chunk["time"] = newtime #create new column in dataframe based on time
        outname = "CHIRPS_tanz_time2.csv"
        #append each output to same csv, using no header
        chunk.to_csv(pathlist[2]+outname, mode='a', header=None, index=None)

u
user2957943

parquet 文件格式非常适合您描述的用例。您可以使用 pd.read_parquet(path_to_file, columns=["foo", "bar"]) 有效地读取特定的列子集

https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html


T
TiRoX

目前我正在“像”你一样工作,只是规模较小,这就是为什么我没有 PoC 来提供我的建议。

但是,我似乎在使用pickle作为缓存系统并将各种功能的执行外包到文件中取得了成功——从我的突击队/主文件中执行这些文件;例如,我使用 prepare_use.py 来转换对象类型,将数据集拆分为测试、验证和预测数据集。

你的泡菜缓存是如何工作的?我使用字符串来访问动态创建的pickle文件,具体取决于传递的参数和数据集(我尝试捕获并确定程序是否已经运行,使用.shape作为数据集,使用dict传递参数)。尊重这些措施,我得到一个字符串来尝试查找和读取 .pickle 文件,如果找到,可以跳过处理时间以跳转到我现在正在处理的执行。

使用数据库时我遇到了类似的问题,这就是为什么我在使用这个解决方案时感到很高兴,但是 - 肯定有很多限制 - 例如由于冗余而存储大量的泡菜集。可以通过适当的索引将表格从转换之前更新到转换之后 - 验证信息会打开另一本书(我尝试合并抓取的租金数据并在 2 小时后基本停止使用数据库 - 因为我想在之后跳回去每个转换过程)

我希望我的 2 美分能在某种程度上帮助你。

问候。