Publié le 31/10/2018 Par Clément Carreau

Si vous travaillez comme moi dans un environnement mêlant data engineers, data scientists et data analysts et que le choix de votre plateforme s’est porté sur la plateforme cloud de Google, il y a fort à parier que vous ayez accès à beaucoup de données sous BigQuery.


Rappel

BigQuery est un service web permettant de requêter et d’analyser vos données.

Vous pouvez vous en servir par le biais d’une API REST, du shell mais aussi par le biais de l’UI.

Pour faire une comparaison simpliste, on pourrait citer Hive. Tout comme BigQuery, Hive n’est pas une base de données mais permet de requêter ses données sur HDFS. BigQuery fonctionne d’une façon similaire, sauf que vos données sont stockées dans Colossus et traitées par Dremel.


Prenons en exemple mon contexte professionnel. Nous sommes passées depuis peu d’un cluster on-premise à la solution cloud de Google et toutes les données qui étaient auparavant stockées sous HDFS et matérialisées dans Hive sont désormais directement intégrées dans nos différents datasets et tables BigQuery. Dès lors, il semble naturel de se servir de BigQuery comme de notre nouvelle source de donnée pour Spark, après tout un connecteur existe bel et bien ! Mais est-ce réellement une bonne idée ?

Découverte du connecteur BigQuery pour Spark


Remarque

Les données utilisées ici sont des dataset publiques que vous pouvez facilement retrouver sur l’interface de BigQuery.


Lecture des données

Voyons un peu l’utilisation concrète de ce connecteur, pour le moment, vis-à-vis de la lecture des données :

# configuration pour la lecture depuis BigQuery
conf = {
'mapred.bq.project.id': "monProjet",
'mapred.bq.gcs.bucket': "monBucket",
'mapred.bq.temp.gcs.path': 'gs://monBucket/bigquery/pyspark_input',
'mapred.bq.input.project.id': " bigquery-public-data",
'mapred.bq.input.dataset.id': "hacker_news",
'mapred.bq.input.table.id': "comments",
}

# lecture effective
rdd = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)

 

Si comme moi vous êtes habitués à lire directement depuis un file system (type hdfs ou cloud storage), cette façon de lire les données devrait vous sembler beaucoup plus verbeuse.

On retrouve d’ailleurs des paramètres de configuration comme on pourrait s’y attendre :

  • Nom du projet
  • Nom du dataset
  • Nom de la table

Mais on retrouve aussi d’autres paramètres plus intéressants :

  • Un nom de bucket
  • Un chemin relatif à ce même bucket

Exécutons le code ci-dessus et rendons nous à l’adresse indiquée par le paramètre mapred.bq.temp.gcs.path.

On retrouve tout un tas de fichiers. Si vous regardez le contenu d’un dossier shard en particulier, vous verrez que les données sont au format JSON. Ce qui explique surement l’utilisation du JsonTextBigQueryInputFormat.

A l’heure d’écriture de cet article, la taille des données indiquée par BigQuery est de 3.41Go. Regardons désormais la taille des données qui ont été générées sur Cloud Storage :

gsutil du –sh gs://monBucket/bigquery/pyspark_input
# affiche 4.63Go

Les données sont donc plus volumineuses une fois exportées vers GCS. On peut d’ailleurs remarquer qu’elles ne sont pas compressées étant données qu’elles sont directement lisible en clair (vous pouvez essayer de télécharger un fichier JSON pour vous en assurer).

Autre questionnement de ma part lorsque j’ai voulu utiliser ce connecteur : « Pourquoi un RDD ? »

En effet, nous essayons ici de lire des données possédant un schéma, ce schéma étant connu par BigQuery mais étant aussi accessible dans les fichiers JSON eux-mêmes. Cela nous force donc à ajouter une étape de conversion du RDD vers DataFrame si nous souhaitons utiliser Spark SQL.

Ecriture des données

Voyons désormais la façon d’écrire des données dans BigQuery, si vous souhaitez par exemple effectuer des traitements puis sauvegarder le résultat dans une table.

#2 configuration
output_directory = 'gs://monBucket/bigquery/pyspark_output'
output_files = output_directory + '/part-*'
output_dataset = 'monDataset'
output_table = 'maTable'

# adaptation des données en dataframe selon vos données puis écriture
df = ...
df.write.json(output_directory)

subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--replace '
'--autodetect '
'{dataset}.{table} {files}'.format(
dataset=output_dataset, table=output_table, files=output_files
).split())

 

Encore une fois, beaucoup de verbosité pour une action qui semble pourtant simple. Comme on peut le remarquer ici aussi, il est encore question de Cloud Storage. Il est nécessaire d’écrire les données dans GCS avant de les insérer dans BigQuery par le biais d’un appel à la commande bq load. Pas très sexy.

Considérations

Des fichiers temporaires pas si temporaires que ça

Détail intéressant relatif au paramètre mapred.bq.temp.gcs.path : la partie temp

Si vous n’avez pas encore supprimé votre bucket, je vous invite à retourner voir si vos fichiers « temporaires » ont bien été supprimés. Spoil : la réponse est non.

Attention donc à certains problèmes pouvant survenir :

  • Accroissement de la taille de votre bucket et donc répercussion sur votre billing
  • Échec en cas de relance du job (pour cause de path déjà existant)

N’oubliez pas de supprimer ces fichiers.

Attention aux débordements

Autre détail, vous avez surement du remarquer que la lecture des données était faite en JSON. D’après mes essais, il ne semble pas possible de demander à BigQuery d’exporter les données vers GCS dans un format autre que JSON ou Avro. Attention donc à la taille des données exportées !

Par exemple, j’ai essayé de créer une table sous BigQuery depuis un dossier contenant des fichiers Parquet, le dossier faisant 30Go. Une fois les données lues depuis Spark, plus de 300Go de données avaient été exportées par BigQuery dans Cloud Storage.

Une performance en deux temps

Etant donné qu’il semble impossible de ne pas passer par l’étape de Cloud Storage lorsque l’on souhaite lire/écrire des données en utilisant BigQuery, pourquoi ne pas considérer de complètement se passer de BigQuery ? Cela aurait le mérite d’éviter la partie export / import des données qui est totalement indépendante des performances votre cluster Dataproc.

Utiliser directement Cloud Storage à, en outre, quelques avantages :

# lecture très simple
df = spark.read.parquet("gs://monBucket/input/")

# écriture très simple
df.write.parquet("gs://monBucket/output")

Comme on peut le remarquer, c’est aussi simple que de lire depuis n’importe quel file system. Il suffit juste de modifier le préfixe.

Utiliser Cloud Storage est d’autant plus pratique que cela permet d’adapter très rapidement un code utilisant un autre file system (par exemple en changeant simplement hdfs:// ou mapr:// par gs://). C’est donc un gain de temps appréciable lorsqu’il est nécessaire de faire migrer des algorithmes vers la plateforme de Google.

En revanche, cela risque d’avoir un fort impact sur la gestion des données dans votre système d’information. Il sera désormais nécessaire de répliquer et de synchroniser les données présentes dans BigQuery avec celles présentes dans GCS et de propager toute modification pour assurer la consistance (ajout, modification, suppression). Sans oublier l’aspect sécurité, la prise en compte des données personnelles impactant le stockage de certaines données sensibles, etc.

En bref

Lorsque j’ai découvert le connecteur BigQuery pour Spark, je m’attendais à quelque chose de relativement bien intégré, comme nous l’avons vu, l’utilisation de ce dernier ne vient pas sans un coût.

Si vous avez actuellement déjà la possibilité de récupérer vos données soit depuis GCS soit depuis BigQuery, il semble, dans tous les cas, plus optimal de les lire depuis Cloud Storage.

En revanche, si vos données sont uniquement présentes dans BigQuery et que ni la taille des données exportées ni le temps de traitement ne sont des points critiques dans vos développements, il peut être légitime d’utiliser ce connecteur.

Enfin, si vous avez la possibilité d’utiliser Scala, pourquoi ne pas voir du côté du connecteur développé par Spotify ? Même s’il ne résout pas les problèmes énoncés plus haut il a au moins le mérite de simplifier la communication avec BigQuery et de vous permettre de lire directement des DataFrame, d’exécuter des requêtes SQL, etc.

Pas encore de commentaires

Publier un commentaire

Auteur

Clément Carreau

Issu d'une formation dans les systèmes distribués c'est avec une certaine logique que Clément s'est tourné vers le monde du big data dès la fin de sa formation. La constante évolution du domaine, les opportunités techniques et les problématiques associées sont autant de raisons l'ayant poussé à faire ce choix. Clément a rejoint Meritis en 2016 et évolue aujourd'hui en tant qu'ingénieur big data, travaillant conjointement avec des data-scientists à la mise en place de solutions machine learning.