-
Notifications
You must be signed in to change notification settings - Fork 0
/
script_etl_with_gcp_and_airflow.py
177 lines (143 loc) · 13.5 KB
/
script_etl_with_gcp_and_airflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
'''
Tarefa 3 Projeto: Desafio: Capturar dados de Loja de Aplicativo - Google Play
Autor: Cícero Henrique dos Santos
Descrição: Criar objeto com operações de captura de dados, com atualização da tabela.
O objetivo aqui é criar um pipeline simplificado de dados para o banco, de forma que a tabela seja sempre atualizada com as últimas informações de reviews.
'''
'''
ATENÇÃO!
Antes de executar o arquivo, você precisa definir uma variável de ambiente em que seu código é executado.
1. Crie uma conta de serviço - https://cloud.google.com/docs/authentication/getting-started#creating_a_service_account
2. Forneça credenciais de autenticação ao código do aplicativo definindo a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS.
- Veja mais em: Como configurar a variável de ambiente para autenticação: https://cloud.google.com/docs/authentication/getting-started#windows
3. Crie uma chave de conta de serviço por meio da página de criação de chave de conta de serviço no console do Google Cloud Platform - https://console.cloud.google.com/apis/credentials/serviceaccountkey.
Selecione o projeto> clique na aba Chaves > tipo de chave JSON e baixe o arquivo de chave.
4. Para evitar alguns conflitos, instale o gerenciador de pacotes PIP a cada sessão:
'''
# Importação de bibliotecas necessárias
from google.cloud import client, storage # Importa métodos de acesso ao Google Cloud Storage
from google.oauth2 import service_account # Importa métodos de autenticação da conta de serviço do Google Cloud
from google_play_scraper import Sort, reviews_all # Importa métodos Sort e reviews para buscar avaliações do aplicativo pela biblioteca google_play_scraper
import pandas as pd # Importa biblioteca pandas para analise de dados
# Criação de variáveis para facilitar manutenção e troca de informações
app_id = 'com.amazon.dee.app' # Armazena o ID do APP a ser utilizar
my_bucket_name = 'bigquery-gcp-scrapper' # Inclua o nome do seu bucket
my_project = 'bigquery-google-play-scrapper' # Inclua o nome do seu projeto
my_project_id = 'nifty-foundry-336623' # Inclua o nome do id da chave de autenticação de acesso
SCOPE = ["https://www.googleapis.com/auth/cloud-platform"] # Variável padrão de scopo de autenticação
class etl_with_gcp():
# Classe de construtores para sempre iniciar por esta antes das demais e subir as chaves
def __init__(self):
#PARA TESTE NO WINDOWS
self.my_credential = service_account.Credentials.from_service_account_file('C:/Users/cicer/Documents/GitHub/desafio-case-python-sauter/dags/etl_with_gcp_and_airflow/nifty-foundry-336623-dad5e810d66e.json', scopes=SCOPE)
self.client = storage.Client.from_service_account_json('C:/Users/cicer/Documents/GitHub/desafio-case-python-sauter/dags/etl_with_gcp_and_airflow/nifty-foundry-336623-dad5e810d66e.json') # Acesso ao Google Cloud Storage
#PARA TESTE LINUX
#self.my_credential = service_account.Credentials.from_service_account_file('/opt/airflow/dags/etl_with_gcp_and_airflow/nifty-foundry-336623-dad5e810d66e.json', scopes=SCOPE)
#self.client = storage.Client.from_service_account_json('/opt/airflow/dags/etl_with_gcp_and_airflow/nifty-foundry-336623-dad5e810d66e.json') # Acesso ao Google Cloud Storage
#Função para capturar(extrair) dados no google play e enviar dados em arquivos csv para o Google Cloud Storage
def scraper_google_play(self):
try:
print('Capturando dados, por favor aguarde...')
resultado_revisao = [] # Criar lista para armazenar os resultados capturados
# Criando uma lista com os resultados das análises do app rastreados
for pontuacao in list(range(1, 6)): # Laço para percorrer as avaliações por nível de classificação de 1 a 5
resultado = reviews_all(
app_id, # Busca os dados do app Alexa
sleep_milliseconds=0, # Define limite em 0 milisegundos para rastrear
lang='pt', # Define linguagem como família português
country='br', # Define a região de origem como Brasil
sort=Sort.RATING, # Pesquisa por avaliação (RATING)
filter_score_with = pontuacao, # Definindo o filtro de acordo com a potuação atual do for
)
resultado_revisao.extend(resultado) # Salva o resultado de cada captura dentro da ultima possição lista
df_alexa = pd.DataFrame(resultado_revisao) # Transforma o resultado em um DataFrame com pandas
print('Sucesso! Captura de dados realizada')
# Filtrando colunas selecionadas com o método filter
df_alexa = df_alexa.filter(['content', 'score', 'thumbsUpCount', 'reviewCreatedVersion', 'at'])
df_aval_positiva = df_alexa.loc[df_alexa['score'] >= 4] # Classificação positiva (maior ou igual a 4)
df_aval_neutra = df_alexa.loc[df_alexa['score'] == 3] # Classificação neutra (Igual a 3)
df_aval_negativa = df_alexa.loc[df_alexa['score'] < 3] # Classificação negativa (Menor que 3)
# Salvar classificação positiva em arquivo CSV
df_aval_positiva.to_csv('aval_positiva.csv', # Definie o nome do arquivo
encoding='utf-8', # Definie a codificação para utf-8
index=False, # Define a retirada do index
sep=';', # Define o separador como ';'
header=True # Define salvar como cabeçalho
)
# Salvar classificação neutra em arquivo CSV
df_aval_neutra.to_csv('aval_neutra.csv', # Definie o nome do arquivo
encoding='utf-8', # Definie a codificação para utf-8
index=False, # Define a retirada do index
sep=';', # Define o separador como ';'
header=True # Define salvar como cabeçalho
)
# Salvar classificação neutra em arquivo CSV
df_aval_negativa.to_csv('aval_negativa.csv', # Definie o nome do arquivo
encoding='utf-8', # Definie a codificação para utf-8
index=False, # Define a retirada do index
sep=';', # Define o separador como ';'
header=True # Define salvar como cabeçalho
)
print('Sucesso! Arquivos CSVs salvos')
# Envio de arquivos para o Google Cloud Storage
bucket = self.client.bucket(my_bucket_name) # Define o nome do Bucket
blob = bucket.blob('aval_positiva.csv') # Define o local onde o arquivo será armazenado
blob.upload_from_filename('aval_positiva.csv') # Define o nome que o arquivo será armazenado
bucket = self.client.bucket(my_bucket_name) # Define o nome do Bucket
blob = bucket.blob('aval_neutra.csv') # Define o local onde o arquivo será armazenado
blob.upload_from_filename('aval_neutra.csv') # Define o nome que o arquivo será armazenado
bucket = self.client.bucket(my_bucket_name) # Define o nome do Bucket
blob = bucket.blob('aval_negativa.csv') # Define o local onde o arquivo será armazenado
blob.upload_from_filename('aval_negativa.csv') # Define o nome que o arquivo será armazenado
print('Sucesso! Arquivos enviados para o Google Cloud Storage')
except:
print('Erro! Não foi possível realizar a captura de dados')
#Função para fazer download dos arquivos CSV do Google Cloud Storage
def download_files_csv_cloud_storage(self):
try:
bucket = self.client.bucket(my_bucket_name) # Define o nome do Bucket
blobs = self.client.list_blobs(my_bucket_name) # Armazenar arquivos em uma lista passando o nome do bucket de onde estão os arquivos
# Laço para percorrer cada elemento de dentro do bucket
for object in blobs:
arquivo = bucket.blob(object.name) # Captura o nome do arquivo de acordo com a posição do laço
arquivo.download_to_filename(object.name) # Salva o arquivo com o mesmo nome
print(arquivo) # Imprime o nome do arquivo que foi realizado o download a cada laço
print('Sucesso! Download de arquivos CSV completados')
except:
print('Erro! Download imcompleto, algo deu errado.')
# Função para enviar Dataframes para o DataWarehouse(BigQuery)
def upload_dataframe_to_bigquery(self):
#Lê os arquivos CSVs e armazena os dados em um DataFrame
df_aval_positiva = pd.read_csv('aval_positiva.csv', sep=';') # Armazena os dados do CSV aval_positiva.csv no DataFrame df_aval_positiva
df_aval_neutra = pd.read_csv('aval_neutra.csv', sep=';') # Armazena os dados do CSV aval_neutra.csv no DataFrame df_aval_neutra
df_aval_negativa = pd.read_csv('aval_negativa.csv', sep=';') # Armazena os dados do CSV aval_negativa.csv no DataFrame df_aval_negativa
try:
# Carregar DataFrames para tabela dentro do Datawarehouse(Bigquery)
df_aval_positiva.to_gbq(
destination_table = 'avaliacoes.avaliacao_positiva', # Define o caminho da tabela de destino criada no BigQuery = NomeConjuntodeDados.nomedaTabela
project_id = my_project_id, # Define o nome do ID do projeto
if_exists = 'replace', # Substitui a tabela caso já exista, a fim de evitar dados duplicados na mesma tabela
credentials = self.my_credential # Define a credencial de serviço de acesso
)
print('Sucesso! df_aval_positiva enviado para o BigQuery')
df_aval_neutra.to_gbq(
destination_table = 'avaliacoes.avaliacao_neutra', # Define o caminho da tabela de destino criada no BigQuery = NomeConjuntodeDados.nomedaTabela
project_id = my_project_id, # Define o nome do ID do projeto
if_exists = 'replace', # Substitui a tabela caso já exista, a fim de evitar dados duplicados na mesma tabela
credentials = self.my_credential # Define a credencial de serviço de acesso
)
print('Sucesso! df_aval_neutra enviado para o BigQuery')
df_aval_negativa.to_gbq(
destination_table = 'avaliacoes.avaliacao_negativa', # Define o caminho da tabela de destino criada no BigQuery = NomeConjuntodeDados.nomedaTabela
project_id = my_project_id, # Define o nome do ID do projeto
if_exists = 'replace', # Substitui a tabela caso já exista, a fim de evitar dados duplicados na mesma tabela
credentials = self.my_credential # Define a credencial de serviço de acesso
)
print('Sucesso! df_aval_negativa enviado para o BigQuery')
except:
print('Erro! Não foi possível enviar arquivos para o BigQuery')
# Testar funções separadamente
if __name__ == "__main__":
etl_with_gcp().scraper_google_play()
etl_with_gcp().download_files_csv_cloud_storage()
etl_with_gcp().upload_dataframe_to_bigquery()