博客

【求职分享】

【笔试/面试】

数据 | 如何用Google Cloud 建立简易数据处理流程

Feb 03,2023

分享本文:

【概要】数据集的生成一般通过编写数据流程( Data Pipeline)来实现。针对不同的原始数据类型-批数据(Batch Data)或流数据(Streaming Data),处理数据的流程也会略有不同。本文通过一个简单的例子,来介绍如何借助Google Cloud 建立批数据处理流程。

大家好, 我是Lucy@FinTech社区。今天和大家介绍最近爆火的超大规模自然处理深度学习模型: GPT-3!

 

扫码添加微信,加入FinTech社区,提认知,攒人脉,求职招聘!

图片

 

数据集的生成一般通过编写数据流程( Data Pipeline)来实现。针对不同的原始数据类型-批数据(Batch Data)或流数据(Streaming Data),处理数据的流程也会略有不同。本文通过一个简单的例子,来介绍如何借助Google Cloud 建立批数据处理流程。

 

 
 
 

01 提出需求

 
 

假设我们想要建立一个数据集,其包含大陆两大证券交易所(上海及深圳证券交易所)每日成交量排名前20的股票信息。

 

原始信息可以通过两大证券交易所网站获得:

 

http://www.sse.com.cn/market/stockdata/activity/main/

 

http://www.szse.cn/market/stock/active/actv/index.html

 

 

02 分析需求

 
 

在实现代码前,我们需要先对需求进行分析,建立模型,并判断需要实现的功能。

 

建模

 

一个典型的ETL(Extract, Transform, Load)数据流程大都由以下几部分组成:

 

  • 原始数据存储(download)从数据源(Vendor Source)下载数据,并备份到自有存储。自有存储的好处是便于日后重新处理或查找,而不用担心数据源的消失。

  • 原始数据变形(reshape)将不同数据格式统一,重新生成原始数据。不同数据提供商的数据保存格式往往不同(如csv,excel,xml等),将不同的数据格式进行统一,能方便后续的数据处理。

  • 数据归一(normalize)对变形后的数据进行重新建模,在理想的数据模型(schema)下重新生成数据。不同数据源的数据,其数据模型往往不尽相同,同时与我们最终想要得到的数据集模型也未必相同,所以我们需要对数据进行归一处理。

  • 数据整合(merge)对不同来源的相同数据进行整合,生成最终数据集。对同一对象,不同的数据源可能会提供不同的数据。例如根据计算方法不同,不同数据提供商针对同一支证券会提供不同的的收盘价。我们需要将这些数据整合来生成最终的数据集。

 

分析

 

在本例中,数据源(Vendor Source)共有两个,上海证券交易所和深圳证券交易所

 

上海证券交易所提供的数据存储在html文件中,除了成交量的排名外还有成交金额等其他指标排名。而深圳证券交易所提供的数据存储在xlsx文件中,并仅有成交量的排名。

 

上海证券交易所提供的数据中除了股票代码,股票简称,累计成交量外,还有价格信息(开盘,收盘,均价)及几个比率信息(振幅,换手率)等。而深圳证券交易所提供的数据除了股票代码,股票简称,累计成交量外,也提供几个价格信息。同时两组数据的单位并不相同(上海证券交易所为万股、万元,深圳证券交易所为亿股、亿元)。

 

两组数据互斥(mutually exclusive),所以假设如果没有人为改写(manual override)的需求,数据整合并不需要做什么。

 

根据上面的分析,我们确定数据流程中需要完成以下几项任务:

 

下载html及xlsx文件。转换html及xslx文件至同一格式。对两组数据进行重新建模,统一数据模型及单位。简单数据整合并输出数据集。

 

流程图

 

根据上面的分析,我们可以建立以下流程图:

 

图片

 

 

03 实现需求

 
 

技术栈

 

在本例中,数据流程的实现需要用到Google Cloud中的以下几个产品:

  • Cloud Storage 用来存储原始数据文件。

  • Cloud Function 用来下载数据文件,触发文件存储后的各项变形。

  • Cloud Scheduler 用来定时执行Cloud Function。

  • Dataflow 用来执行数据变形,归一以及整合。Big Query 用来存储变形后,归一后,以及整合后的数据。

 

原始数据存储

 

上海证券交易所主板成交量前20的数据并没有提供文件下载链接,所以我们只能存储整个活跃股排名前20的html页面。

 

数据下载及存储可以通过Cloud Function,Cloud Scheduler和Cloud Storage来实现。

 

Cloud Function伪代码如下:

 

图片

 

通过Cloud Scheduler定时调用上面的Cloud Function,下载每日的数据信息,并将最终的html文件存储在Cloud Storage中。

 

深圳证券交易所主板成交量前20的数据提供xlsx格式下载。同上,我们依旧可以通过Cloud Function,Cloud Scheduler和Cloud Storage来实现。

 

Cloud Function伪代码如下:

 

图片

 

原始数据变形,归一以及整合

 

在原始文件下载后,我们需要对原始数据进行变形,归一以及整合。这些变形都可以通过Dataflow来实现。每一个数据源,我们都需要进行变形及归一操作,而整合只需要对所有数据进行统一处理便可。

 

针对上海证券交易所数据的变形、归一的伪代码如下:

 

图片

 

针对深圳证券交易所数据的变形、归一与上面类似。

 

之后,我们可以通过Cloud Function来建立触发机制(trigger),每当原始数据文件被下载到Google Cloud Storage后,上面的Dataflow流程便会被自动调用。

 

因为本例中数据互斥,数据整合仅仅是简单的合并,所以整合部分的伪代码便略过了。

 

最后,整个流程中生成的各种数据会被存储到Big Query的各个表格中:

 

图片

 

main_board_trading_volume_top_20中存储的数据便是我们需要的数据集。

 

 
 
 

04 写在最后

 
 

如果要将上面所建立的数据流程应用到正式的生产环境(Production Environment)中,其实还有大量的问题需要解决,以下仅列出几个例子:

 

技术栈的选取随着各种技术的不断发展,我们往往有不同的选择来实现同一功能。如存储数据我们可以用Big Query,也可以用RDBMS,NoSQL等。选取的技术栈是否合适,是我们需要仔细考虑的问题。

 

代码复用 (Code Reusability)处理简单数据的数据流程大都大同小异。为了能减少后期维护成本,以及提高代码复用,这些数据流程往往是通过一个共同的数据流程框架来实现,而不是成百上千个独立的小程序,所以上面示例中存在的大量重复代码,在正式生产环境中并不常见。

 

证券标识(Security ID)统一 一个量化交易系统的搭建,往往需要数十乃至上百个数据集的协作。同一支证券,在不同数据集中往往拥有不同的标示(如Ticker,SEDOL,ISIN,CUSIP等)。为了能将新的数据集导入现有平台,我们需要解决证券标识的统一问题。

 

流程的可靠性(Pipeline Reliability)及错误处理 (Error Handling)一个流程的可靠性会极大地影响后期的维护成本。上面的例子中,一个常见的改动(文件地址更新,不完整文件,数据错误,数据模型更改)便会使整个数据流程出错。我们需要尽可能的完善错误处理机制,来尽可能的减少后期维护需求。

 

写一篇评论

发表评论
%{tishi_zhanwei}%

网站有漏洞

Aug 31,2023

1111111111112'

Aug 31,2023

1111111111112

Aug 31,2023

111111111111

Aug 31,2023

111111111111

Aug 31,2023

><000000'"/\

Aug 31,2023

< 12 >