# import pandas as pd  # bricks는 import가 없어도 됨
# import pyspark.pandas as ps # bricks는 import가 없어도 됨

my_list = [['male', 1, None], ['female', 2, 3], ['male', 3, 4]]
#-- pd --
dp = pd.DataFrame(my_list, columns=['A', 'B', 'C'])
dp.head()

#-- ps --
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
ds.show()

 

 

#-- pd --
dp.fillna(-99)
#-- ps --
ds.fillna(-99).show()

 

 

#-- pd --
# 주의 : 특정 컬럼을 선택해야 합니다.
dp.A.replace(['male','female'],['1', '0'], inplace=True)
dp
#-- ps --
# 주의 : na를 대체하면서 동시에 특정 열의 값을 대체하지 못합니다.
# 이 경우 특정 열의 대체 값만 진행됩니다.
ds.na.replace(['male', 'female'],['1', '0']).show()

 

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15

# 아래 내용의 출처를 밝힙니다. 파이썬으로 배우는 아파치 스파크 ( Wenqiang Feng, 홍은희 )

 

RDD

: 복원 분산 데이터 집합(Resilient Distributed Dataset)를 의미합니다.

스파크의 RDD는 불변 객체 집합의 분산 컬렉션(collection)입니다.
각 RDD는 여러 개의 파티션(더 작은 집합과 비슷한 패턴)으로 나뉘며 클러스트의 다른 노드에서 계산될 수 있습니다.

 

# RDD 생성_00 기본형

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.sparkContext.parallelize([(1,2,3,'a b c'),
            (4,5,6,'d e f'),
            (7,8,9,'g h i')]).toDF(['col1', 'col2', 'col3', 'col4'])
df.show()

 

# RDD 생성_01 createDataFrame를 이용한 생성, 컬럼명 명시

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Employee = spark.createDataFrame([
    ('1', 'Joe', '70000', '1'),
    ('2', 'Henry', '80000', '2'),
    ('3', 'Sam', '60000', '2'),
    ('4', 'Max', '90000', '1')],
    ['Id', 'Name', 'Sallary', 'DepartmentId']
    )
Employee.show()

 

# RDD 생성_02 

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.format('com.databricks.spark.csv'). \
    options(header='true',inferschema='true'). \
    load("/home/feng/Spark/Code/data/Adventising.csv", header=True)

df.show(5)
df.printSchema()

 

# RDD spark 버전 확인

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
print('Apache Spark 버전 확인: '+spark.version)

결과>>  Apache Spark 버전 확인: 3.5.0

 

# RDD 생성_03 CSV 파일 load

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True)
df.show(5)
df.printSchema()

팁 ) databricks에서 제공하는 클라우드 spark환경에서 테이블 생성하기 위해 파일 업로드하는 위치를 지정가능함

파일올리면서 테이블을 생성해야 하고 그 위치는 /FileStore/tables/파일명

 

# RDD 생성_04 외부의 mysql 접속해서 테이블 조회하기

driver = "org.mariadb.jdbc.Driver"
database_host = "xxx.xxx.xxx.xxx"
database_port = "3306" # update if you use a non-default port
database_name = "xxxxxx"
table = "Advertising"
user = "xxxxx"
password = "xxxxx"
url = f"jdbc:mysql://{database_host}:{database_port}/{database_name}"
print(url)

df = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("query", "select * from Advertising")
.option("user", user)
.option("password", password)
.option("allowPublicKeyRetrieval", True)
.option("useSSL", False)
.load()
)
df.show(5)

팁 )  아래 두개를 .option에 넣어줘야 함

  • allowPublicKeyRetrieval=true
  • useSSL=false

# RDD 생성_05 hdfs 조회하기

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext.getOrCreate();
hc = HiveContext(sc)
tf1 = sc.textFile("hdfs://FileStore/tables/Advertising.csv")
    
print(tf1.first())
# 아직 코딩이 완성된 것이 아님

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.3.6 널(Null) 값 채우기  (0) 2024.12.25

+ Recent posts