How do I add a new column to a Spark DataFrame (using PySpark)?
I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.
I've tried the following without any success:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
I also got an error using this:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
So how do I add a new column (based on a Python vector) to an existing DataFrame with PySpark?
You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in "How to add a constant column in a Spark DataFrame?"):
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transforming an existing column:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
included using a join:
from pyspark.sql.functions import col
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
# Left outer join keeps every row of df_with_x5; unmatched keys yield null
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
or generated with a function / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
If you want to add the content of an arbitrary RDD as a column, you can
- add row numbers to the existing data frame
- call zipWithIndex on the RDD and convert it to a data frame
- join both using the index as a join key
To add a column using a UDF:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def valueToCategory(value):
    if value == 1: return 'cat1'
    elif value == 2: return 'cat2'
    # ... (further cases elided)
    else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
For Spark 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
I would like to offer a generalized example for a very similar use case:
Use Case: I have a csv consisting of:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
I need to perform some transformations and the final csv needs to look like
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.
so:
1) I read the original csv using spark.read and call it "df".
2) I do something to the data.
3) I add the null columns using this script:
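from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        # Column already exists: keep it as-is
        outcols.append(column)
    else:
        # Column is missing: add it as a typed null column
        outcols.append(lit(None).cast(StringType()).alias(column))
df = df.select(outcols)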
In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).
The simplest way to add a column is to use "withColumn". Since the DataFrame is created using sqlContext, you either have to specify the schema yourself or let it be inferred from the dataset. Specifying the schema by hand becomes tedious if it changes often.
Below is an example that you can consider:
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
sqlContext = SQLContext(sc)  # SparkContext is available as sc by default in the shell
# Read the dataset of your choice; set header, inferSchema and sep to match your file
Data = sqlContext.read.csv("/path", header=True, inferSchema=True, sep=",")
# For instance, the data has 30 columns: col1, col2, ... col30. To add a 31st column:
Data = Data.withColumn("col31", lit(0))  # lit(0) is a placeholder; use any Column expression
# Check the change
Data.printSchema()
We can add additional columns to a DataFrame directly with the steps below:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
You can define a new udf when adding a column_name:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
u_f = F.udf(lambda: yourstring, StringType())
a.select(u_f().alias('column_name'))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))