# BigQuery helper: download query results as DataFrames and upload DataFrames back to BigQuery.
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage
import pandas_gbq as pd_gbq
import pandas as pd
import traceback, os, sys
class Dataniel():
    """
    This class is made to transform content features for all users but it should
    be easy to refactor it to fit to just our ground truth churn user base.

    Wraps a BigQuery client and a BigQuery Storage client: SQL query results
    are downloaded into pandas DataFrames (cached in ``self.DATA``) and
    DataFrames can be uploaded back to BigQuery tables.
    """

    def __init__(self,
                 bq_creds: str = "cred_access_token.json"):
        """Initialize the client/data holders and authenticate against BigQuery.

        Args:
            bq_creds: path to a Google service-account JSON key file.
        """
        self.ATTRS = {  # set by set_bq_clients
            'BQ_CLIENT' : None,
            'BQ_STORAGE_CLIENT' : None,
        }
        # Cache of downloaded DataFrames, keyed by the caller-supplied "name".
        self.DATA = {
        }
        self.set_bq_clients(bq_creds)

    def set_bq_clients(self,
                       goog_app_creds : str) -> None:
        """Create the BigQuery and BigQuery Storage clients from a key file.

        Args:
            goog_app_creds: path to a Google service-account JSON key file;
                exported via GOOGLE_APPLICATION_CREDENTIALS for google.auth.

        On failure the traceback is printed and the clients stay None
        (best-effort style, consistent with the other methods).
        """
        try:
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = goog_app_creds
            # Explicitly create a credentials object. This allows you to use the same
            # credentials for both the BigQuery and BigQuery Storage clients, avoiding
            # unnecessary API calls to fetch duplicate authentication tokens.
            credentials, your_project_id = google.auth.default(
                scopes = ["https://www.googleapis.com/auth/bigquery"]
            )
            # Make clients.
            self.ATTRS["BQ_CLIENT"] = bigquery.Client(credentials = credentials, project = your_project_id)
            self.ATTRS["BQ_STORAGE_CLIENT"] = bigquery_storage.BigQueryReadClient(credentials = credentials)
        except Exception:
            # Fix: print_exc() writes the traceback itself and returns None, so
            # wrapping it in print() produced an extra stray "None" line.
            traceback.print_exc()

    def retrieve_data(self, **kwargs):
        """Download a query result and cache it in self.DATA.

        Keyword Args:
            name: key under which the result is stored in self.DATA.
            query_string: SQL statement to execute.
            new_creds: optional path to a new credentials file; if given,
                the clients are re-created before running the query.
            return_df: if truthy, the downloaded DataFrame is returned.

        Returns:
            The DataFrame when return_df is truthy, otherwise None.
        """
        try:
            name = kwargs.get("name")
            if kwargs.get("new_creds"):
                self.set_bq_clients(kwargs.get('new_creds'))
            self.DATA[name] = self.__retrieve_data(kwargs.get('query_string'))
            return self.DATA[name] if kwargs.get('return_df') else None
        except Exception:
            traceback.print_exc()

    def __retrieve_data(self,
                        query_string : str) -> pd.DataFrame:
        """ Function for downloading query results with an input SQL statement string

        Returns the result as a DataFrame, or None if the download failed
        (the failure traceback is printed).
        """
        print('JOB..', end = "")
        try:
            # connecting to big query and exporting the table as a dataframe
            dataframe = (
                self.ATTRS["BQ_CLIENT"].query(query_string)
                .result()
                .to_dataframe(bqstorage_client = self.ATTRS['BQ_STORAGE_CLIENT'])
            )
            print('COMPLETE. Output shape:', dataframe.shape)
            return dataframe
        except Exception:
            print("FAILED! Error below")
            traceback.print_exc()

    def upload_data(self,
                    featurized_data : pd.DataFrame,
                    destination_dataset : str = "vores_analysen",
                    destination_table : str = "temp",
                    if_exists_action : str = "replace",
                    project_id : str = "zetland-master"):
        """ Function for uploading a dataframe to big query.
        pushing the input dataframe to big query as a table

        Args:
            featurized_data: DataFrame to upload.
            destination_dataset: target BigQuery dataset.
            destination_table: target table name within the dataset.
            if_exists_action: pandas-gbq behavior when the table exists
                ("fail", "replace", or "append").
            project_id: GCP project to write into (generalized from the
                previously hard-coded value; default is unchanged).
        """
        try:
            print('UPLOADING DATA...', end = "")
            pd_gbq.to_gbq(featurized_data,
                          destination_table = f"{destination_dataset}.{destination_table}",
                          project_id = project_id,
                          if_exists = if_exists_action,
                          location = 'EU')
            print('COMPLETE')
        except Exception:
            print("FAILED! Error below")
            traceback.print_exc()