# 아래 내용의 출처를 밝힙니다. 파이썬으로 배우는 아파치 스파크 ( Wenqiang Feng, 홍은희 )
RDD
: 복원 분산 데이터 집합(Resilient Distributed Dataset)를 의미합니다.
스파크의 RDD는 불변 객체 집합의 분산 컬렉션(collection)입니다.
각 RDD는 여러 개의 파티션(더 작은 집합과 비슷한 패턴)으로 나뉘며 클러스트의 다른 노드에서 계산될 수 있습니다.
# RDD 생성_00 기본형
from pyspark.sql import SparkSession
spark = SparkSession .builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.sparkContext.parallelize([(1,2,3,'a b c'),
(4,5,6,'d e f'),
(7,8,9,'g h i')]).toDF(['col1', 'col2', 'col3', 'col4'])
df.show()
# RDD 생성_01 createDataFrame를 이용한 생성, 컬럼명 명시
from pyspark.sql import SparkSession
spark = SparkSession .builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
Employee = spark.createDataFrame([
('1', 'Joe', '70000', '1'),
('2', 'Henry', '80000', '2'),
('3', 'Sam', '60000', '2'),
('4', 'Max', '90000', '1')],
['Id', 'Name', 'Sallary', 'DepartmentId']
)
Employee.show()
# RDD 생성_02
from pyspark.sql import SparkSession
spark = SparkSession .builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.format('com.databricks.spark.csv'). \
options(header='true',inferschema='true'). \
load("/home/feng/Spark/Code/data/Adventising.csv", header=True)
df.show(5)
df.printSchema()
# RDD spark 버전 확인
from pyspark.sql import SparkSession
spark = SparkSession .builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
print('Apache Spark 버전 확인: '+spark.version)
결과>> Apache Spark 버전 확인: 3.5.0
# RDD 생성_03 CSV 파일 load
from pyspark.sql import SparkSession
spark = SparkSession .builder \
.appName("Python Spark create RDD example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True)
df.show(5)
df.printSchema()
팁 ) databricks에서 제공하는 클라우드 spark환경에서 테이블 생성하기 위해 파일 업로드하는 위치를 지정가능함
파일올리면서 테이블을 생성해야 하고 그 위치는 /FileStore/tables/파일명
# RDD 생성_04 외부의 mysql 접속해서 테이블 조회하기
driver = "org.mariadb.jdbc.Driver"
database_host = "xxx.xxx.xxx.xxx"
database_port = "3306" # update if you use a non-default port
database_name = "xxxxxx"
table = "Advertising"
user = "xxxxx"
password = "xxxxx"
url = f"jdbc:mysql://{database_host}:{database_port}/{database_name}"
print(url)
df = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("query", "select * from Advertising")
.option("user", user)
.option("password", password)
.option("allowPublicKeyRetrieval", True)
.option("useSSL", False)
.load()
)
df.show(5)
팁 ) 아래 두개를 .option에 넣어줘야 함
- allowPublicKeyRetrieval=true
- useSSL=false
# RDD 생성_05 hdfs 조회하기
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext.getOrCreate();
hc = HiveContext(sc)
tf1 = sc.textFile("hdfs://FileStore/tables/Advertising.csv")
print(tf1.first())
# 아직 코딩이 완성된 것이 아님