《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

楔子

数据科学家面临的一个独特的挑战是倾向于研究静止的数据,而非动态的数据,或者不是专门为预测建模和分析而收集的数据。这和传统的学术研究有很大的不同,在传统的学术研究中,数据是经过仔细和深思熟虑之后才收集的,因为要确保数据是真真正正能够派上用场的。但是现如今则不是这样,就像我们之前说的,我们面临的数据是大量的,至于有没有用则需要我们进行分析,从大量数据中分析出规律、将其变成商业价值正是数据科学家们所做的事情。现如今这个时代,已经出现了各种各样的存储介质和数据格式,我们不仅要能够分析,还要能将数据读取出来。所以这次我们就来介绍一些最流行的存储介质(系统),并使用Dask从中读取数据得到DataFrame。

当然也不要忘记前面所学的内容:Dask DataFrame由多个小型pandas DataFrame组成,它们在逻辑上被划分为多个分区。在Dask DataFrame上执行的所有操作都会生成一个Delayed对象的DAG,并且Delayed对象可以分布到多个进程或物理机器上。任务调度器控制任务的分发和执行。

那么下面让我们来读取数据吧。

从text文件中读取数据

下面我们就从最简单也是最常用的文件开始,也就是带有分隔符的文本文件。这种文件很简单,通过分隔符来将数据划分为逻辑行和逻辑列。

每一种分隔符文本文件都有两个分隔符:行分隔符和列分隔符,行分隔符是一种特殊的字符,表示已经到达了行的末尾,它右边的任何数据都应该被视为下一行的部分,最常见的行分隔符是\n;列分隔符则指示列的结束,它右边的任何数据都应该视为下一列的部分。列分隔符使用最频繁的就是逗号了,事实上使用逗号作为列分隔符的文件有一种特殊的文件格式:csv,csv和普通的文本文件没有太大区别。当然除了逗号,还可以指定其它的符号来作为列分隔符。

当然还有一个很重要的文本限定符,什么是文本限定符呢?假设我们的列分隔符是逗号,但是某个字符串里面也出现了逗号该怎么办?所以这个时候文本限定符就很有用了,它会保证自己内部的字符串都会被解析为文本。当然这些东西都比较简单了,这里就不废话了。

现在让我们看看如何使用Dask来读取那些带有分隔符的文本文件,事实上我们在之前就已经见识过了,因为我们读取的是csv,而csv也是一种特殊的带有分隔符的文本文件。如果将csv文件使用记事本打开,你会很清晰的看到里面的内容,但是这里我们将尝试读取使用书中说的纽约罚单数据,可以在www.kaggle.com/new-york-city/nyc-parking-tickets页面中进行下载。

import dask.dataframe as dd

df1 = dd.read_csv(r"C:\Users\satori\Desktop\nyc\Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv")
df2 = dd.read_csv(r"C:\Users\satori\Desktop\nyc\Parking_Violations_Issued_-_Fiscal_Year_2015.csv")
df3 = dd.read_csv(r"C:\Users\satori\Desktop\nyc\Parking_Violations_Issued_-_Fiscal_Year_2016.csv")
df4 = dd.read_csv(r"C:\Users\satori\Desktop\nyc\Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

然后我们来查看一下列名,也是使用df.columns。

df4.columns
"""
Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation'],
      dtype='object')
"""

当然df1、df2、df3、df4的列不是一样的,主要纽约市收集数据的时候改变规则了,当然这都无所谓。总之,如果我们直接将4个DataFrame合并在一起的话,那么会出现大量的空值,因此我们需要找到4个DataFrame*同出现的列。

common_cols = df1.columns & df2.columns & df3.columns & df4.columns
common_cols
"""
Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation'],
      dtype='object')
"""

然后我们可以直接查看里面的数据长什么样子,可以调用head方法,如果调用compute则是直接将整个DataFrame打印出来,调用head则是查看前几行,当然它们返回的都是pandas的DataFrame。

df4[common_cols].head()

《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

注意:当我们调用head的时候,里面可以传入参数表示返回的行数,它们是要被加载到内存中的。如果你加载了太多的行,那么可能会导致内存溢出。然后我们用相同的方法来操作df1,但是发现报错了:

df1[common_cols].head()
"""
ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+-----------------------+---------+----------+
| Column                | Found   | Expected |
+-----------------------+---------+----------+
| Issuer Squad          | object  | int64    |
| Unregistered Vehicle? | float64 | int64    |
| Violation Description | object  | float64  |
| Violation Legal Code  | object  | float64  |
| Violation Post Code   | object  | float64  |
+-----------------------+---------+----------+

The following columns also raised exceptions on conversion:

- Issuer Squad
  ValueError('cannot convert float NaN to integer')
- Violation Description
  ValueError("could not convert string to float: 'BUS LANE VIOLATION'")
- Violation Legal Code
  ValueError("could not convert string to float: 'T'")
- Violation Post Code
  ValueError("could not convert string to float: 'H -'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'Issuer Squad': 'object',
       'Unregistered Vehicle?': 'float64',
       'Violation Description': 'object',
       'Violation Legal Code': 'object',
       'Violation Post Code': 'object'}
"""

首先dd.read_csv是一个懒加载动作,只有当我们真正需要值的时候才会读取,所以错误并没有在一开始的时候抛出。并且如果是大数据量的话,报错是一件非常让人痛苦的事情,因为你不知道错误发生在了哪里,尤其是在你还无法使用文本编辑器查看的情况下。但幸运的是,Dask的开发团队在错误信息中给了我们非常详细的提示。根据提示我们发现总共有五个列:Issuer Squad、Unregistered Vehicle?、Violation Description、Violation Legal Code和 Violation Post Code读取失败,原因是它们的数据类型不是Dask所期望的。正如我们之前提到的,Dask使用随机抽样来推断数据类型,以避免扫描整个DataFrame。尽管这一手段通常情况下会工作的很好,但当某一列中出现大量缺失值,或者大部分数据都可以归为一种类型(比如整型)、但是一小部分数据却打破了这种假设(出现了字符串),那么这一手段就会失效。于是当这种情况发生时,Dask在开始计算时就会抛出一个异常,为了帮助Dask正确地读取数据,我们需要为数据手动定义一个模式,而不是依赖于类型推断。不过在执行此操作之前,我们先看一下Dask中有哪些数据类型可用,以便于我们能为数据分配合适的存储空间。

使用Dask数据类型

和关系型数据库类似,列数据类型在Dask DataFrame中扮演着关键的角色,它们决定了我们可以对某一列执行哪些操作,以及如何重载运算符的行为方式。与Python中的大多数对象和容器不同,Dask数据类型使用显式的数据类型而不是鸭子类型。这意味着某一列中包含的值必须符合相同的数据类型,正如我们看到的,如果发现列中出现了违反相应类型的值,那么将会报错异常。

由于Dask DataFrame由多个pandas DataFrame组成,并且具有多个分区,而pandas又是基于numpy,因此Dask可以从numpy中获取其数据类型。numpy是一个非常强大的数学库,并且它还是Python中数据科学库的万物之源,让我们看看numpy中都有哪些数据类型吧。

import numpy as np

# 布尔类型:bool
print(np.ones((2, 2), dtype="bool"))
"""
[[ True  True]
 [ True  True]]
"""

# 有符号整型:int8 int16 int32 int64 int
print(np.ones((2, 2), dtype="int64"))
"""
[[1 1]
 [1 1]]
"""

# 无符号整型:uint8 uint16 uint32 uint64 uint
print(np.ones((2, 2), dtype="uint32"))
"""
[[1 1]
 [1 1]]
"""

# 浮点型:float16 float32 float64 float longfloat
print(np.ones((2, 2), dtype="longfloat"))
"""
[[1. 1.]
 [1. 1.]]
"""

# 复数类型:complex64 complex128 complex
print(np.ones((2, 2), dtype="complex64"))
"""
[[1.+0.j 1.+0.j]
 [1.+0.j 1.+0.j]]
"""

# 字符串类型:unicode,
print(np.ones((2, 2), dtype="unicode"))
"""
[['1' '1']
 ['1' '1']]
"""
# 此外unicode还可以写成U, 后面还可以加数字, 比如U3: 表示元素长度不能超过3
print(np.array(["你好呀", "猜猜我是谁"], dtype="U3"))  # ['你好呀' '猜猜我']
# 我们发现被截断了, 当然还可以使用<U3或者>U3这种形式, 采用小端存储或者大端存储

# object, Python中的对象, 可以泛指任意对象,object可以简写为O
print(np.ones((2, 2), dtype="object"))
"""
[[1 1]
 [1 1]]
"""
# 虽然结果类似,但是里面的元素都是Python中的对象
# 如果dtype是int,那么往里面插入字符串会报错,但是object不会

常见的类型就是上面那些,我们看到了Python中类型的影子,但是两者最大的不同是numpy中可以显式地指定宽度,比如:int32、int8等等,而Python的话则只使用你的操作系统和硬件所支持的最大宽度。因此如果你在一台拥有64位CPU、64位操作系统的机器上工作,那么Python的底层总是会使用64位的内存来存储一个整数(当然Python中的整数没有这么简单,但这里我们就不细说了,总之知道它使用的是最大宽度即可)。而有些值不需要那么大的宽度,比如一个人的身高,显然使用uint8存储是最合适的。这样做不仅可以节省内存,而且还能在内存和cpu的缓存中保存更多的数据,从而实现更加快速和高效的计算。因此我们在为存储数据时应该选择较小的数据类型,但是同时也带来了风险,那就是一旦出现某个值超过了数据类型允许的最大值,就会出现溢出错误,因此也要仔细斟酌数据的范围和域。

如果数据的类型飘忽不定,则可以将其存储为object类型,我们说这个类型表示Python中的任何对象。当Dask的类型推断不确定使用哪种类型时,Dask也将默认使用object类型。但如果当某个列丢失的数据所占百分比很高时,这个规则就会产生例外,否则我们上面就不会报错了。让我们再来看看那个报错信息:

"""
ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+-----------------------+---------+----------+
| Column                | Found   | Expected |
+-----------------------+---------+----------+
| Issuer Squad          | object  | int64    |
| Unregistered Vehicle? | float64 | int64    |
| Violation Description | object  | float64  |
| Violation Legal Code  | object  | float64  |
| Violation Post Code   | object  | float64  |
+-----------------------+---------+----------+

The following columns also raised exceptions on conversion:

- Issuer Squad
  ValueError('cannot convert float NaN to integer')
- Violation Description
  ValueError("could not convert string to float: 'BUS LANE VIOLATION'")
- Violation Legal Code
  ValueError("could not convert string to float: 'T'")
- Violation Post Code
  ValueError("could not convert string to float: 'H -'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'Issuer Squad': 'object',
       'Unregistered Vehicle?': 'float64',
       'Violation Description': 'object',
       'Violation Legal Code': 'object',
       'Violation Post Code': 'object'}
"""

我们看一下报错信息中那个长的像二维表一样的部分,我们看到column这一列有一个"Violation Description",然后Dask发现了存在类型为object的值,但期望的是一个float64。虽然我英文不太好,但是Description这个单词我还是认识的,这翻译过来不是"描述"的意思吗?所以你告诉我它是一个float64,你叫我敢信?所以很明显它应该是一个文本类型,但是Dask为什么会将其推断为一个float64呢?主要是这个字段当中出现了大量的缺失值,在原始数据中它们是空白的,所以在解析的时候Dask将空白记录是为空值,并在默认情况下使用NaN进行填充。如果你使用内置函数type来检查的话,会发现NaN是一个float类型,所以Dask在推断时碰巧选择一个一堆NaN,发现它们是float类型,因此这一列就作为float类型了。那么下面就让我们来修复这个问题,这样我们就能使用适当的数据类型来将数据变成DataFrame。

为Dask DataFrame创建schema

通常在操作数据集的时候,你会知道每一列的类型以及取值范围,无论该它是否包含缺失值。这些信息统称为数据的schema(注意区分PostgreSQL里面的schema),如果数据集来自于关系型数据库,那么你很可能已经知道了它的schema,因为数据库中表的每一列都是有类型的,如果你提前知道了这些信息,那么应用到read_csv是非常简单的,在后面你会看到我们如何实现这一点。但有些时候,我们并不知道相关的数据信息,也许我们会从一个没有正确文档记录的web api中获取数据(笔者本人经常干),也许你从其它地方获取到了数据、但因为相关权限导致我们不能访问数据源来查看类型,这个时候一般有两种做法:

  • 猜测 & 检验
  • 手动对数据进行采样

我们对数据类型进行猜测并检验其实并不复杂,如果你的数据的列命名良好,比如:产品描述(Product Description)、销售额(Sales Amount)等等,那么你很容易推断出相应类型。如果我们在运行时看到上面类似的错误出现,那么只能根据错误信息重新创建schema。所以这种方式的优点是,你可以快速、轻松地尝试不同的schema,但缺点是如果计算因为数据类型问题而不断失败,那么不断地重启计算可能会变得单调乏味。

手动采样的方式可能会更复杂一些,因为它需要扫描数据,但是结果也会更加准确一些。让我们来看看如何实现这一点:

dtypes = {col: str for col in common_cols}
dtypes
"""
{'Summons Number': str,
 'Plate ID': str,
 'Registration State': str,
 'Plate Type': str,
 'Issue Date': str,
 'Violation Code': str,
 'Vehicle Body Type': str,
 'Vehicle Make': str,
 'Issuing Agency': str,
 'Street Code1': str,
 'Street Code2': str,
 'Street Code3': str,
 'Vehicle Expiration Date': str,
 'Violation Location': str,
 'Violation Precinct': str,
 'Issuer Precinct': str,
 'Issuer Code': str,
 'Issuer Command': str,
 'Issuer Squad': str,
 'Violation Time': str,
 'Time First Observed': str,
 'Violation County': str,
 'Violation In Front Of Or Opposite': str,
 'House Number': str,
 'Street Name': str,
 'Intersecting Street': str,
 'Date First Observed': str,
 'Law Section': str,
 'Sub Division': str,
 'Violation Legal Code': str,
 'Days Parking In Effect    ': str,
 'From Hours In Effect': str,
 'To Hours In Effect': str,
 'Vehicle Color': str,
 'Unregistered Vehicle?': str,
 'Vehicle Year': str,
 'Meter Number': str,
 'Feet From Curb': str,
 'Violation Post Code': str,
 'Violation Description': str,
 'No Standing or Stopping Violation': str,
 'Hydrant Violation': str,
 'Double Parking Violation': str}
"""

然后这个时候我们再重新读取数据集得到df1:

from dask.diagnostics import ProgressBar
df1 = dd.read_csv(r"C:\Users\satori\Desktop\nyc\Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv",
                  dtype=dtypes)
df1[common_cols].head()

《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

尽管读取方式和原来一样,但是这次我们指定了dtype参数,这样就禁用了Dask中的类型推断,而是使用我们显式指定的数据类型。事实上,尽量不要依赖Dask的类型推断,尽可能地手动指定类型,方法就像我们上面那样。另外,书中将这里成为schema,说实话个人觉得很别扭,直接说成指定类型不行吗?这里我们将所有的列,都显式地指定为字符串类型,然后我们再查看前五行的时候就没有报错了。但是这还没完,否则直接指定为字符串格式不就行啦,我们下面还要查看每一列的数据,并选择更加合适的类型来使后续的执行效率最大化。所以这是一件比较耗时的事情,因为它需要我们人工观察,下面我们再来看看"Vehicle Year"这一列:

df1["Vehicle Year"].unique().head(10)
"""
0    2013
1    2012
2       0
3    2010
4    2011
5    2001
6    2005
7    1998
8    1995
9    2003
Name: Vehicle Year, dtype: object
"""

我们看到这一列存储的是年份,那么显然使用uint16是最合适的。其它的列也是同理,比如看到字符,那么不好意思直接就字符串了。当然这种采样的方式也不一定是百分之百正确的,就比如上面的那个年份,尽管我们目前看到的都是数字,但是保不齐会出现一个字符串(由于输入错误导致),但是不管咋样还是要基于我们的推断进行类型设置,如果错了那就从头来过。

当然我们这里的样本数量选择的显然不够多,这个时候你可能会使用compute来返回所有的唯一值。但是问题来了,如果这些值具有高度唯一性,比如它恰好可以充当主键,那么返回的数据量就会非常多。如果你的数据集大到一定程度的话,那么一列也可能撑爆你的内存。在大多数情况下,我们在去重之后返回10~50个即可。

但是对于可能是整数类型的列,我们还需要一个额外检测:这个列中是否含有缺失值,正如我们之前了解到的,Dask会将缺失值填充为NaN,它会被认为是float类型。使用pandas的时候,我们也经常会遇到,明明是整型、但是结尾却多了一个.0,原因就是里面出现了空值。所以我们应该使用float类型存储,而不是整型。

df1["Vehicle Year"].isnull().any().compute()  # True

接下来我们还要对剩余的几十个列重复这一过程,可以想象这是多么的枯燥乏味,不过作者已经帮我们做好了这一过程,当然他也建议我们直接kaggle上查看数据类型来加快这一过程。类型如下:

dtypes = {
    'Date First Observed': np.str,
    'Days Parking In Effect ': np.str,
    'Double Parking Violation': np.str,
    'Feet From Curb': np.float32,
    'From Hours In Effect': np.str,
    'House Number': np.str,
    'Hydrant Violation': np.str,
    'Intersecting Street': np.str,
    'Issue Date': np.str,
    'Issuer Code': np.float32,
    'Issuer Command': np.str,
    'Issuer Precinct': np.float32,
    'Issuer Squad': np.str,
    'Issuing Agency': np.str,
    'Law Section': np.float32,
    'Meter Number': np.str,
    'No Standing or Stopping Violation': np.str,
    'Plate ID': np.str,
    'Plate Type': np.str,
    'Registration State': np.str,
    'Street Code1': np.uint32,
    'Street Code2': np.uint32,
    'Street Code3': np.uint32,
    'Street Name': np.str,
    'Sub Division': np.str,
    'Summons Number': np.uint32,
    'Time First Observed': np.str,
    'To Hours In Effect': np.str,
    'Unregistered Vehicle?': np.str,
    'Vehicle Body Type': np.str,
    'Vehicle Color': np.str,
    'Vehicle Expiration Date': np.str,
    'Vehicle Make': np.str,
    'Vehicle Year': np.float32,
    'Violation Code': np.uint16,
    'Violation County': np.str,
    'Violation Description': np.str,
    'Violation In Front Of Or Opposite': np.str,
    'Violation Legal Code': np.str,
    'Violation Location': np.str,
    'Violation Post Code': np.str,
    'Violation Precinct': np.float32,
    'Violation Time': np.str
}

然后我们就来重新读取数据、并且将它们合并在一起,得到一个最终的DataFrame。但是我们并不是读取4个DataFrame,然后使用dd.concat([df1, df2, df3, df4]),虽然这样也可以,但是Dask为我们提供了一个更加简便的方法,那就是使用通配符。

# 会读取nyc目录下的所有csv文件
data = dd.read_csv(r"C:\Users\satori\Desktop\nyc\*.csv",
                   # 指定类型, 这里的dtypes是我们上面说的新的dtypes, 已经为每一列选择了合适的数据类型
                   dtype=dtypes,  
                   # 指定读取的列
                   usecols=common_cols)
len(data)  # 42339438

我们看到使用通配符是很方便的,因为在分布式文件系统中数据是被切分成多个块的,所以Dask提供了通配符这一个方式,同时里面的参数dtype、usecols会作用在每一个文件上。

现在文件读取进来之后,我们看到数据量是42339438行,可以说非常的多了,但我们暂时先不分析,其实分析起来也没啥意义,主要还是为了介绍相应的API罢了。所以就数据读取来说,我们的目的已经达到了,我们已经可以成功的从分隔符文本文件当中读取内容了。下面我们来看看如何从关系型数据库中读取数据。

从关系型数据库中读取数据

从关系型数据库中读取数据一样很容易,但是需要注意的是:当在集群中使用Dask时,需要保证每个节点都能访问到相应的数据库,这一点非常重要。

使用Dask读取数据库的方法是通过dd.read_sql_table,该方法支持的参数如下:

def read_sql_table(
    table,  # 表名, 注意:不是SQL语句, 只是一张表名
    uri,  # SQLAlchemy风格的连接驱动, 一个字符串, 不是SQLAlchemy中的引擎
    index_col,  # 选择哪一列作为索引, 因为要根据它进行分区
    divisions=None, # 还记得之前的df.divisions吗? 通过参数即可实现分区数量、以及每一个分区的边界
    npartitions=None,  # 分区数, 和divisions不可以同时指定, 通过这种方式指定分区比较简单, 但是每个分区大小是相同的
    limits=None,  # 一个二元组, 表示每一个分区的索引所能使用的最小值和最大值
    columns=None,  # 哪些列
    bytes_per_chunk="256 MiB",  # chunksize
    head_rows=5, # 数据类型推断时所加载的行数, 这里可能有人有疑问, 一会儿解释
    schema=None, # 数据库的schema, 比如:PostgreSQL
    meta=None, # 数据库表的元信息, 如果指定了meta, 那么就不再通过读取head_rows条数据进行推断了, 而是直接使用元信息
    engine_kwargs=None, # 一些额外的引擎参数, 比如我们在连接hive的时候需要往引擎传入一些额外参数, 便可以通过这个参数传递, 对于create_engine而言则是通过参数connect_args
    **kwargs,  # 附加参数, 会交给pd.read_sql
):

我们来读取一下吧,关于数据库读取你们可以从自己能够访问的数据库中读取,这里我就不提供数据了。

conn = "postgres://user:passwd@ip:port/db"  # 信息已隐藏
dd.read_sql_table("table", conn, "id", schema="schema")

也许你会认为Dask会从数据库中获取相应的数据类型信息,因为数据库的每一张表都是有类型的。但结果相反,Dask仍然像读取分隔符文本文件一样会读数据进行采样,并进行类型推断。不过Dask会顺序地从表中读取前五行(具体数量可以通过head_rows指定),而不是在数据集中随机采样,因为数据库每一张表确实都明确定义了类型,相比分隔符文本文件,从数据库中读取数据的话,Dask的类型推断会更可靠。然而,它仍然不是完美的,由于数据排序的方式不同,仍可能导致Dask选择错误的数据类型。例如,字符串列中的某些值可能只包含数字("1456", "2898"等等),如果Dask在数据推断时碰巧采样了这些数据,那么Dask很可能假定该列应该是整数类型而不是字符串类型。如果出现这种情况,那么你仍然需要做一些手工调整,就像之前那样。

第二个假设是数据应该如何分区,如果index_col是数值或datetime类型,那么我们只需要指定分区的数量即可,Dask将会自动根据分区数来推断每一个分区的边界。但如果index_col不是数值或者datetime类型,则必须手动指定分区边界(divisions)

conn = "postgres://user:passwd@ip:port/db"
# 这里以"id"作为index_col, 即分区字段, 它是一个整型
dd.read_sql_table("table", conn, "id", schema="schema").npartitions  # 1

我们看到分区数量为1,这是因为如果没有指定分区,那么Dask会按照数据的整体大小选择合适的分区,显然Dask认为我们这里的数据量只要一个分区就够了,但如果我们手动指定分区数的话:

dd.read_sql_table("adm_staff", conn, "dw_no", schema="adm", npartitions=100).npartitions  # 100

此时分区数就变成了100,而Dask的做法就是选出index_col这列中出现的最大值和最小值,然后根据我们指定的分区数进行分区,所以它要求index_col这一列必须是数值或者datetime类型。如果是其它类型,假设是字符串类型,那么显然是没办法自动分区的,因为两个字符串无法相减。

conn = "postgres://user:passwd@ip:port/db"
dd.read_sql_table("table", conn, "name", schema="schema")

《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

发现报错了,因为我们说如果index_col是数值或者datetime类型,那么Dask会自动根据分区数进行分区,但如果不是的话,就会报错了,因为它需要我们自己手动指定分区边界。我们来看看Dask的源码是怎么做的吧。

def read_sql_table(
    table,
    uri,
    index_col,
    divisions=None,
    npartitions=None,
    limits=None,
    columns=None,
    bytes_per_chunk="256 MiB",
    head_rows=5,
    schema=None,
    meta=None,
    engine_kwargs=None,
    **kwargs,
):
    import sqlalchemy as sa
    from sqlalchemy import sql
    from sqlalchemy.sql import elements
	
    # index_col不可以为空
    if index_col is None:
        raise ValueError("Must specify index column to partition on")
	
    # create_engine的其它参数
    engine_kwargs = {} if engine_kwargs is None else engine_kwargs
    # 根据uri创建引擎, 而create_engine的其它参数就直接通过engine_kwargs传递
    # 所以uri直接传递一个字符串即可
    engine = sa.create_engine(uri, **engine_kwargs)
    
    # 创建MetaData
    m = sa.MetaData()
    # 如果table是字符串类型, 那么直接将表反射出来
    if isinstance(table, str):
        table = sa.Table(table, m, autoload=True, autoload_with=engine, schema=schema)
	
    # 取出index_col这一列
    index = table.columns[index_col] if isinstance(index_col, str) else index_col
    # index_col必须是elements.Label或者str类型
    if not isinstance(index_col, (str, elements.Label)):
        raise ValueError(
            "Use label when passing an SQLAlchemy instance as the index (%s)" % index
        )
    # 分区边界和分区数不可以同时指定
    if divisions and npartitions:
        raise TypeError("Must supply either divisions or npartitions, not both")
	
    # 如果没有指定columns,那么筛选全部字段
    columns = (
        [(table.columns[c] if isinstance(c, str) else c) for c in columns]
        if columns
        else list(table.columns)
    )
    # 如果index_col不在columns中,那么添加进去,因为index_col它是作为分区字段,所以是必须获取的
    if index_col not in columns:
        columns.append(
            table.columns[index_col] if isinstance(index_col, str) else index_col
        )
	
    # 将index_col和对应的值加入到kwargs中
    if isinstance(index_col, str):
        kwargs["index_col"] = index_col
    else:
        kwargs["index_col"] = index_col.name
	
    # 如果head_rows大于0
    if head_rows > 0:
        # 筛选出head_rows条记录
        q = sql.select(columns).limit(head_rows).select_from(table)
        head = pd.read_sql(q, engine, **kwargs)

        if head.empty:
            # 如果head为空,证明没有获取到数据
            name = table.name
            schema = table.schema
            head = pd.read_sql_table(name, uri, schema=schema, index_col=index_col)
            # 直接返回Dask DataFrame,分区数为1
            return from_pandas(head, npartitions=1)
		
        # 否则计算每一条记录所占的字节数
        bytes_per_row = (head.memory_usage(deep=True, index=True)).sum() / head_rows
        # 如果meta为None
        if meta is None:
            # 将meta设置空DataFrame,但是保留轴信息
            meta = head.iloc[:0]
    # 如果head_rows大于0不成立,那么meta就不可以为None了
    elif meta is None:
        raise ValueError("Must provide meta if head_rows is 0")
    else:
        # 到了这里说明head_rows<=0, meta不为空
        # 此时要求divisions和npartitions不能为空
        if divisions is None and npartitions is None:
            raise ValueError(
                "Must provide divisions or npartitions when using explicit meta."
            )
	
    # 重点来了,如果分区边界为空
    if divisions is None:
        # 没有指定limits
        if limits is None:
            # 读取index_col这一列
            q = sql.select([sql.func.max(index), sql.func.min(index)]).select_from(
                table
            )
            minmax = pd.read_sql(q, engine)
            # 找到最大值和最小值
            maxi, mini = minmax.iloc[0]
            # 获取类型,这里max_1指的是数据库在读取之后默认起的名字就叫max_1
            dtype = minmax.dtypes["max_1"]
        else:
            # 否则根据limits得到最大值和最小值
            mini, maxi = limits
            dtype = pd.Series(limits).dtype

        if npartitions is None:
            # 如果分区数为空
            q = sql.select([sql.func.count(index)]).select_from(table)
            count = pd.read_sql(q, engine)["count_1"][0]
            # 这里自动根据大小计算分区
            npartitions = (
                int(
                    round(
                        count * bytes_per_row / dask.utils.parse_bytes(bytes_per_chunk)
                    )
                )
                or 1
            )
        # 然后检测index_col对应的类型,如果在数据库中是datetime
        # 那么这里的dtype.kind就会等于"M"
        if dtype.kind == "M":
            divisions = methods.tolist(
                # 通过date_range计算每个分区的边界,用总秒数除以分区数即可
                pd.date_range(
                    start=mini,
                    end=maxi,
                    freq="%iS" % ((maxi - mini).total_seconds() / npartitions),
                )
            )
            divisions[0] = mini
            divisions[-1] = maxi
        # 如果是整型,那么dtype.kind == "i", 无符号整型则是"u", 浮点型则是"f", 所以它们都是数值型
        elif dtype.kind in ["i", "u", "f"]:
            # 我们看到直接根据np.linspace进行分区
            divisions = np.linspace(mini, maxi, npartitions + 1).tolist()
        else:
            # 否则的话, 就报错了, 和我们上面的报错信息是一样的
            # index_col不是数值或者datetime, 必须手动指定divisions
            # 如果只指定npartitions的话, 是没办法正确分区的, 当然都不指定更不可以
            # 都不指定默认基于数据所占内存大小来分区, 但是它同样要求index_col必须为数值或者datetime
            raise TypeError(
                'Provided index column is of type "{}".  If divisions is not provided the '
                "index column type must be numeric or datetime.".format(dtype)
            )

    parts = []
    lowers, uppers = divisions[:-1], divisions[1:]
    for i, (lower, upper) in enumerate(zip(lowers, uppers)):
        cond = index <= upper if i == len(lowers) - 1 else index < upper
        q = sql.select(columns).where(sql.and_(index >= lower, cond)).select_from(table)
        parts.append(
            delayed(_read_sql_chunk)(
                q, uri, meta, engine_kwargs=engine_kwargs, **kwargs
            )
        )

    engine.dispose()

    return from_delayed(parts, meta, divisions=divisions)

那么可能有人好奇了,如果是字符串指定divisions的话要怎么指定。关于这一点我实在是没有兴趣说,我只能告诉你它是基于字符串排序来处理的,字符串排序显然这太low了,所以一般在使用分区字段(index_col)的时候,我们都会使用数值或者datetime类型。另外需要注意的是,我们数据中的时期很多都是年月日格式的,也就是没有带时分秒,这个时候解析出来的dtype.kind不是M、而是O,所以此时你仍然需要手动指定divisions,但是它就比较简单了。如果从2018-1-1开始,2020-1-1结束,那么就把divisions指定为(2018-1-1, 2018-1-2, 2018-1-3, ......)即可。

第三个假设是Dask会认为你要读取表中的所有字段,这个时候我们可以通过columns参数来进行控制,筛选我们关心的字段,当然这个比较简单,就不废话了。

从HDFS和S3中读取数据

你在工作中遇到的大部分数据可能都存储在关系型数据库中,但还是那句话,在现如今这个数据量爆炸的时代,关系型数据库已经捉襟见肘了,所以更强大的替代品正在出现,也就是分布式文件存储系统。分布式文件系统发展非常迅速,最值得注意的是2006年以后,Apache Hadoop和Amazon's Simple(简称S3)等存储系统为分布式计算带来了巨大的效益,比如增加吞吐量、可伸缩性和健壮性。使用分布式计算框架和分布式文件存储系统是一种完美的结合,在最先进的分布式文件系统(比如HDFS)中,节点知道数据的本地性,从而允许将计算传递给数据,而不是将数据传递给计算资源。还是之前说的那句话,在大数据生态圈中是有说法,移动数据不如移动计算,"将计算调度到某个含有数据的节点上"比"将数据移动到某个用于计算的节点上"的成本要小的多,因为这样会节省大量的时间和网络上的通信。

如果是没有采用分布式文件系统的普通集群的话,那么在集群中将数据切分为多个块并发送到其它节点是不可避免的,因此是一个重要的性能瓶颈。在这种配置下,当Dask读取数据时,它将像往常一下对DataFrame进行分区,但是其它节点在接收到数据分区之前,它们不能执行任何的工作。因为在网络上传输这些64MB的数据块需要一些时间,所以这会增加总计算时间。

如果集群的大小以任何较大的幅度增长,那么这会产生更大的问题,假设我们有几百个(或者更多)的工作节点,那么它们会同时竞争数据块,导致工作节点的网络堆栈很容易就会造成饱和,并且变得像蜗牛一样慢。

《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

但是通过采用分布式文件存储系统的话,这两个问题都可以得到解决。分布式文件系统没有因为只在一个节点保存数据而造成瓶颈,而是提前对数据进行切分(得到多个块,每个块便是一个block),然后分散到多个机器上。在许多分布式文件系统中,存储块/分区是为了的冗余副本是为了保证可靠性和性能,从可靠性的角度上来看,将每个分区以三副本的形式(常见的默认配置)意味着在发生数据丢失之前,必须有两*立的机器发生故障。但是两台机器在短时间内同时发生故障的可能性远低于一台机器发生故障的可能性,所以它以额外储存的成本增加了一层安全层。

另外从性能的角度来看,将数据分散到整个集群,可以使得运算的机器在本地找到数据的可能性变得更大。所以分散节点可以避免任何单个节点因为数据请求而达到网络饱和,如果一个节点忙于提供数据,它可以将其中一些请求转移到持有相同数据的其它节点。

《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

控制分布式计算编排的节点(一般称之为driver)知道它想要处理的数据在哪些位置上是可用的,因为分布式文件系统中维护着保存数据的目录,它将首先询问本地拥有数据的机器是否繁忙。如果其中一个节点不忙,那么driver将指示该工作节点执行计算,如果拥有数据的节点都处于繁忙状态,那么driver可以选择等待,直到其中一个节点空闲,或者指示其它没有数据但出于空闲的节点远程获取数据并进行计算。HDFS和S3是两个最流行的分布式文件系统,但就我们当前的目的而言,它们有一个关键的区别:HDFS在设计时允许节点既可以传输数据也可以计算,但S3不是这样的,Amazon将S3设计为专门用来存储文件和检索文件的web服务。所以我们没有办法再S3服务器上运行应用程序,这意味着当你处理存储在S3中的数据时,必须要将分区从S3传输到Dask工作节点之后才能处理它。那么让我们来看看如果使用Dask从这些系统中读取数据。

这里我在我的阿里云上面部署一个Hadoop吧,然后使用Python的第三方库hdfs将我们之前的那个nyc停车罚单数据上传到HDFS中。

import hdfs

client = hdfs.Client("http://47.xx.xx.89:50070")
# 此时目录下面什么也没有
print(client.list("/"))  # []

# 创建一个目录吧
client.makedirs("/nyc", permission=777)
# 发现目录已经创建了
print(client.list("/"))  # ['nyc']

# 下面将文件上传上去
from pathlib import Path
from pprint import pprint
for file in Path(r"C:\Users\satori\Desktop\nyc").glob("*.csv"):
    client.upload(hdfs_path="/nyc", local_path=str(file))
# 查看/nyc目录, 发现文件已经上传成功了
pprint(client.list("/nyc"))
"""
['Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv',
 'Parking_Violations_Issued_-_Fiscal_Year_2015.csv',
 'Parking_Violations_Issued_-_Fiscal_Year_2016.csv',
 'Parking_Violations_Issued_-_Fiscal_Year_2017.csv']
"""

上传成功之后直接读取,还是使用read_csv函数,使用起来没有任何区别,只是文件的路径变了,之前是在本地,现在是在服务器的HDFS文件系统上。

df = dd.read_csv("hdfs://47.94.174.89/*.csv", dtype=dtypes, usecols=common_cols)

注意:如果想要连接hdfs的话,那么需要保证Dask所在的机器上有hdfs3这个第三方库,直接pip install hdfs3即可。注意:不是我们上面的hdfs。

如果连接S3的话,也是同理,只需要将hdfs换成s3即可,后面是你的AWS账户/文件名。我这里没有AWS,所以就不演示了,总之使用S3的话,需要Dask所在机器有s3fs这个库。

从 Parquet 从读取数据

分隔符文本文件的简单性和可移植性是非常伟大的,但它们并不是性能优化的最佳选择,特别是当我们执行诸如排序、merge、聚合等复杂的数据操作时。尽管各种各样的文件格式试图以多种不同的方式提高效率,但结果都不是很理想。最近比较引人注目的文件格式之一是Apache Parquet。Parquet是Twitter和Cloudera联合开发的一种高性能柱状存储格式,注意:Parquet仅仅是一种存储格式,它是语言、平台无关的,并且不需要和任何一种数据处理框架绑定。它是专门为分布式文件系统设计的,与基于文本的格式相比,它的设计为表带来了几个关键优势:更有效的使用IO、更好的压缩、更强的类型。

传统的文件格式都是"面向行"的形式,值会根据数据的行位顺序存储在内存和磁盘中,假设我们要计算x这一列的平均值,我们必须要逐行扫描,所以需要扫描10个值(3 + 3 + 3 + 1)才能找到我们想要的4个值。这意味着我们要花费更多的时间等待IO完成,而且还要扔掉从磁盘中读取的超过一半数量的值。如果是柱状格式(面向列)的话,我们只需要获取x这一列的连续块,即可得到我们想要的4个值,所以这种操作速度更快、效率更高。

《使用Python和Dask实现分布式并行计算》4. Loading data into DataFrames(从不同数据源加载数据得到DataFrame)

使用面向列的形式的另一个好处就是,现在可以按列对数据进行分区和分发。这将导致更快、更高效的转移操作,因为只有需要操作的列才会被转移,而不会包含其它的列。

最后,有效的压缩也是Parquet的一个主要优点,通过面向列的形式,可以对不同的列采用不同的压缩方案,从而使得整体能够以最有效的方式进行压缩。Python的Parquet库支持很多的流行的压缩算法,比如:gzip、lzo和snappy。

如果要通过Dask读取Parquet格式的文件,那么要确保所在机器安装了第三方库fastparquet或pyarrow,可以直接通过pip安装。这两个库都可以,但是更建议pyarrow,因为它对序列化复杂的嵌套结构有更好的支持。当然还有其它的压缩裤,比如:python-snappy或python-lzo,这些库也可以直接通过pip安装。

至于读取的话,可以通过dd.read_parquet来读取,具体的可以看源码注释,这里不多说了。当然这个方法除了可以读取本地的文件,也可以读取HDFS或者S3上的文件。

总结

  • 可以使用columns属性检查DataFrame的列。
  • 对于大型数据集,不应该依赖Dask的类型推断,相反你应该基于常见的numpy数据类型定义自己的类型。
  • 因为面向列的文件格式以及高压缩率,Parquet能提供更好的性能。如果可能的话,尝试以Parquet格式获取你的数据。
上一篇:反射与依赖注入


下一篇:Docker、Jenkins企业项目实战附文档(完整)