PySpark, aplatir une structure imbriquée

J'ai des tweets entrants en streaming depuis Kafka et j'ai défini le schéma suivant pour la partie valeur :

schema = StructType() \
.add("data", StructType() \
.add("created_at", TimestampType())
.add("text", StringType()))

Ensuite, j'applique le schéma au flux comme ceci :

from pyspark.sql import functions as F
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", topics_list) \
.option("startingOffsets", "earliest") \
.load() \
.select(F.col('key').cast('string'),
F.from_json(F.col("value").cast("string"), schema).alias('value'),
F.col('topic'))

Réception du schéma suivant :

root
|-- key: string (nullable = true)
|-- value: struct (nullable = true)
| |-- data: struct (nullable = true)
| | |-- created_at: timestamp (nullable = true)
| | |-- text: string (nullable = true)
|-- topic: string (nullable = true)

Je voudrais aplatir le dataframe pour obtenir quelque chose comme ceci:

root
|-- key: string (nullable = true)
|-- created_at: timestamp (nullable = true)
|-- text: string (nullable = true)
|-- topic: string (nullable = true)

J'ai regardé dans d'autres questions similaires dans StackOverflow mais toutes les réponses semblaient très compliquées, j'ai donc proposé une idée qui l'a simplifié:

df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", topics_list) \
.option("startingOffsets", "earliest") \
.load() \
.select(F.col('key').cast('string'),
F.from_json(F.col("value").cast("string"), schema).alias('value'),
F.col('topic')) \
.select(F.col('key').cast('string'), F.col('value.data.created_at'),
F.col('value.data.text'), F.col('topic'))

Je viens d'appeler la méthode.select() deux fois, mais pour cela j'ai besoin de répéter les colonnes et je me demandais s'il existait un autre moyen aussi simple que possible mais ne répétant pas les colonnes comme dans ma solution. J'ai aussi essayé d'utiliser F.explode(F.from_json(col("value").cast("string"), schema).alias('value'))mais cela me donne une erreur.


Solution du problème

J'ai fini par trouver une solution, en accédant simplement aux éléments directement à l'intérieur de l'instruction.select():

df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", topics_list) \
.option("startingOffsets", "earliest") \
.load() \
.select(F.col('key').cast('string'),
F.from_json(F.col("value").cast("string"),
schema)['data']['created_at'].alias('created_at'),
F.from_json(F.col("value").cast("string"),
schema)['data']['text'].alias('text'),
F.col('topic'))

Commentaires

Posts les plus consultés de ce blog

Erreur Symfony : "Une exception a été levée lors du rendu d'un modèle"

Détecter les appuis sur les touches fléchées en JavaScript

Une chaîne vide donne "Des erreurs ont été détectées dans les arguments de la ligne de commande, veuillez vous assurer que tous les arguments sont correctement définis"