### 기술세션 | AI에이전트 시스템 구축 및 관리를 위한 통합 플랫폼

- Mosaic AI 개요
- ai builder ( 노코드기반 )
- AI playground : 개발 설정하는 곳
- AI 시스템 평가 : 문제점 확인후 수정후 재배포, AI Judge
- MLflow Tracing을 통해 디버깅, 로깅으로 추적
- Agent Evalution 리뷰앱 : 이해관계자와 빠른 피드백에 도움
- Mosaic AI 게이트웨이
- AI/BI 대시보드, 레이크하우스 모니터링 (운영관점)

### 기술세션 | LLMOps in Databricks: AI Agent를 Production 환경에 배포하기 위한 고려사항

- FACTSET FQL 을 자연어로 생성하는 프로젝트 사례
- GPT4 : 59%, 15s >> 85%, 6s
- Mosaic AI Model Training ( 모델 학습후 배포 )
- 모델 배포 3가지 : custom Models 데이터브릭스 자체적 기능 / Foundation Models 메타와 직접적인 구현 가능한 기능들 / External Models 상용AI모델을 사용할 때
- Governance : limit 공격적 요청수를 제한하여 리스크 관리
- 13 RAG : 적절한 문서를 탐색하는 것이 key, 벡터DB를 많이 활용함
  Anthropic과 협업해서 개발한 기능임
  Delta Table 특징 : vector에 컬럼을 추가해서 활용가능
- 23 OpenAI / Rang Chain : 데이터브릭스에서 자체 개발한 텍스트기반 SQL구성 Genie라고함
- 33 Tool Calling : tool 또는 지정된 함수를 이용하면 할루시네이션을 줄일 수 있음
- AI Playground에서 바로 응답을 확인할 수 있다.
- 평가 Metric을 제공을 하고 있음
- 평가는 개발단계에서 평가 / 배포후 단계에서 성능을 평가 이렇게 2가지로 구분됨
- MLflow Trace UI에서 테이블로 저장된 로그를 보는 화면을 제공함

### 파트너 스폰서 세션 | KT - Databricks를 활용한 네트워크 기지국 전력 제어 모델 개발 사례

- 01 Intro
- 02 기지국 트래픽 수요예측
  - 무선 네트워크 풀질 관리 / 네트워크 안정성 확보
  - Uplink : 사진 업로드, 음성 전송, 메시지 보내기
  - Downlink : 유튜브 영상 시청, 음악 감상, 웹 서핑
  - 트랙픽 수요 = Uplink 수요 + Downlink 수요
- 03 기지국 트래픽 수요 예측 모델링
  - 시간적 특징 / 공간적 특징 / 이벤트 변수 / log 변서(과거 트래픽 평균)
- 04 databricks datapipeliine 구성
  - MLOps Workflow 구성하는 단위별로 Python Script 구성
  - 변동성이 많은 KT같은 회사는 모니터링이 중요했다.


### 파트너 스폰서 세션 | AWS = Better Together Databricks on AWS

- 생산성 강화 : 다양한 배경 이미지/영상 생성, 프로모션 활용
- 운영 개선 : DDI EDS와 Amazon Bedrock 통합 시연 데모
- 창의/창조 : AI 기반 디지털 휴먼 구성
- 고객 경험 강화 : 쇼핑 어시스턴트 amazon
  사용자가 무슨 상품을 살지 물어보고 AI가 대답해서 원하는 물건을 찾아갈 수 있게 도와줌

- Agent 와 Agentic 차이 : 추론이 있으면 Agentic
- Agentic AI Use case 1 : RAG
- Agentic AI Use case 2 : Text2SQL
- Agentic AI Use case 3 : Text2Image

### 기술 세션 | Databricks 관리형 MLflow: AI 모델, LLM과 생성형 AI의 혁신적 관리 플랫폼

- MLflow Components : 1.Tracking / 2.Project / 3.Models / 4.Model Registry
- 1. ML워크플로우를 더욱 관리하기 쉽게 만들기
  - 실험 및 실행관리 / Custom Logging / Auto logging 
- 3. MLflow Models
  - 디렉토리 구조로 관리함
- 4. A centralized model store
  - 하나의 협업 허브 / 수명 주기 관리 / 가시성 및 거버넌스
  - catalog.schema.model

- Databricks AutoML 소개
  - 모두를 위한 빠르고 간소화된 머신 러닝
  - 데이터팀의 통제력을 유지하면서 역량을 강화할 수 있는 A glass-box 솔루션

- LLM-as-a-judge : 평가 및 디버깅 전후 비교

### 고객사례 |  KCD의 Text2SQL AI Agent를 활용한 데이터 민주화 구축

- 문제 : 작은 조직의 회사가 cashnote 서비스를 운영하면서 점점 확장되면서 문제가 생김
- Product Overview : KCD의 데이터 민주화
  - 데이터분석팀의 생산성을 확보하고 전문적인 기술 지식이 없는 누구나 데이터 기반 의사결정
- unity Cattalog
  - 가장 중요한 것은 회사의 비즈니스를 깊이 이해하고, 그에 맞춰 data ontology를 잘 설계하는 것
  - 이후에는 Text2SQL 프로세스를 효과적으로 운영하기 위해 온톨로지 기반하여 테이블 구조를 단순화하고 정교하게 다듬는 것이 핵심
- Text2SQL : Just Use the AI/BI Genie
  - 프롬프트 최적화 : 스키마 및 테이블 정보 전달 방식 개선
  - 보안 강화 : 테이블 접근과 LLM 프롬프트 내 데이터 보호
  - 실행 시간 단축 : 외부의 복수 API호출로 인한 지연 최소화
  - 할루시네이션 문제 해결 : 사용자 경험 침 신뢰성 보장
- 효과 : 비용 10%, 해결해준 비율 54%해소


### 고객사례 |  Bagelcode의 데이터 에이전트 DAVIS와 지니를 활용한 데이터 기반 의사 결정 가속화

- 포커스 : 빠른 의사 결정 (ex, 광고 집행)
- 데이터 기반 의사 결정을 더 바르게 할 수 없을까 ?
- DAVIS : 자연어 기반 데이터 서비스
  접근성 / 사용성 / 빠른 구현
- 기능 소개 : SQL Queries / Tablau Dashborad 접속하지도 않고 데이터 받아옴

- Genie 더 똑똑하게 만들기
  - Stay Focused
- 단일 게임에 대한 데이터 구조
- Mart Table만 사용해서 Genie를 구축하기로 집중함
  - Plan to iterate
     - Genie의 역질문 수집하기
  - Build on well-annotated Tables
     - dbt 모델 ( .sql+.yml)
     - 당신의 역할은 dbt모델에 들어갈 yml파일을 만드는 것입니다.







'Python > Spark' 카테고리의 다른 글

5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.3.6 널(Null) 값 채우기  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15
import pandas as pd
my_list  =  [('a',  2,  3),
('b',  5,  6),
('c',  8,  9),
('a',  2,  3),
('b',  5,  6),
('c',  8,  9)]
col_name  =  ['col1',  'col2',  'col3']
#-- pd
dp  =  pd.DataFrame(my_list,columns=col_name)
dp
#-- ps
ds = spark.createDataFrame(my_list,schema=col_name

 

import pyspark.sql.functions as F
#-- pd
dp['concat']  =  dp.apply(lambda  x:'%s%s'%(x['col1'],x['col2']),axis=1)
dp
#-- ps
ds.withColumn('concat',F.concat('col1','col2')).show()

 

# 5.3.14 GroupBy
#-- pd
dp.groupby(['col1']).agg({'col2':'min','col3':'mean'})
#-- ps
ds.groupBy(['col1']).agg({'col2': 'min', 'col3': 'avg'}).show()

 

 

 

# 5.3.15 피벗(Pivot)
import numpy as np
#-- pd
pd.pivot_table(dp, values='col3', index='col1', columns='col2', aggfunc=np.sum)
#-- ps
ds.groupBy(['col1']).pivot('col2').sum('col3').show()

 

# 5.3.16  Window
d  =  {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
#-- pd
dp  =  pd.DataFrame(d)
dp
#-- ps
ds  =  spark.createDataFrame(dp)
ds.show()

 

#-- pd
dp['rank'] = dp.groupby('B')['C'].rank('dense',ascending=False)
dp
#-- ps
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds  =  ds.withColumn('rank',F.rank().over(w))
ds.show()

 

# 5.3.17 rank vs dense_rank
d  ={'Id':[1,2,3,4,5,6],
'Score':  [4.00,  4.00,  3.85,  3.65,  3.65,  3.50]}
data = pd.DataFrame(d)
dp = data.copy()
dp
ds = spark.createDataFrame(data)
#-- ps
dp['Rank_dense'] = dp['Score'].rank(method='dense',ascending=False)
dp['Rank'] = dp['Score'].rank(method='min',ascending=False)
dp
#-- ps
import  pyspark.sql.functions  as  F
from  pyspark.sql.window  import  Window
w  =  Window.orderBy(ds.Score.desc())
ds  =  ds.withColumn('Rank_spark_dense',F.dense_rank().over(w)) ds  =  ds.withColumn('Rank_spark',F.rank().over(w))
ds.show()

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.3.6 널(Null) 값 채우기  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15
#-- pd --
import pandas as pd
leftp  =  pd.DataFrame({'A':  ['A0',  'A1',  'A2',  'A3'],
        'B':  ['B0',  'B1',  'B2',  'B3'],
        'C':  ['C0',  'C1',  'C2',  'C3'],
        'D':  ['D0',  'D1',  'D2',  'D3']}, index=[0,  1,  2,  3])
rightp  =  pd.DataFrame({'A':  ['A0',  'A1',  'A6',  'A7'],
        'F':  ['B4',  'B5',  'B6',  'B7'], 
        'G':  ['C4',  'C5',  'C6',  'C7'],
        'H':  ['D4',  'D5',  'D6',  'D7']}, index=[4,  5,  6,  7])
#-- ps
lefts  =  spark.createDataFrame(leftp) 
rights  =  spark.createDataFrame(rightp)

leftp.head(4)
rightsp.head(4)
lefts.show(4)
rights.show(4)

 

 

 

# 1.  Left Join
#-- pd
leftp.merge(rightp, on='A', how='left')
#-- ps
lefts.join(rights, on='A', how='left').orderBy('A',ascending=True).show()

 

 

# 2.  Right Join
#-- pd
leftp.merge(rightp, on='A', how='right')
#-- ps
lefts.join(rights, on='A', how='right').orderBy('A',ascending=True).show()

 

# 3.  Inner Join
#-- pd
leftp.merge(rightp, on='A', how='inner')
#-- ps
lefts.join(rights, on='A', how='inner').orderBy('A',ascending=True).show()

 

# 4.  Full Join
#-- pd
leftp.merge(rightp, on='A', how='outer')
#-- ps
lefts.join(rights, on='A', how='full').orderBy('A',ascending=True).show()

 

 

 

 

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.3.6 널(Null) 값 채우기  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15

# pd (판다스)

import pandas as pd
# import pyspark.pandas as ps

my_list = [[230.1, 37.8, 69.2, 22.1], [44.5, 39.3, 45.1, 10.4],[17.2, 45.9, 69.3, 9.3],[151.5, 41.3, 58.5, 18.5]]
col_name = ['TV', 'Radio', 'Newspaper', 'Sales']

#-- pd --
drop_name = ['Newspaper', 'Sales']
dp = pd.DataFrame(my_list, columns= col_name)
dp.drop(drop_name, axis=1).head(4)

 

 

# PS (spark)

import pyspark.pandas as ps
spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
ds = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True)
#-- ps --
drop_name = ['Newspaper', 'Sales']
ds.drop(*drop_name).show(4)

 

5.3.10 추출(Filter)

 

# pd (판다스)

import pyspark.pandas as ps
from pyspark.sql import SparkSession
# 주의 !! databricks에서 pandas의 csv파일 read하려면 로컬에 파일이 있어야만 가능하기 때문에 교재명령은 불가능
# dp = pd.read_csv('Advertising.csv') 교재예시
# 그래서 편법으로 아래와 같이 사용함

import pandas as pd

df_pandas = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True).toPandas()
df_pandas.head(4)

df_pandas['TV'] = df_pandas['TV'].astype(float)
df_pandas['Radio'] = df_pandas['Radio'].astype(float)
df_pandas['Newspaper'] = df_pandas['Newspaper'].astype(float)
df_pandas['Sales'] = df_pandas['Sales'].astype(float)

# df_pandas.dtypes
df_pandas[df_pandas.Newspaper<20].head(4)
df_pandas[(df_pandas.Newspaper<20)&(df_pandas.TV>100)].head(4)

 

 

# PS (spark)

ds = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True)
ds[ds.Newspaper<20].show(4)
ds[(ds.Newspaper<20)&(ds.TV>100)].show(4)

 

 

5.3.11 새 컬럼 추가하기

 

#-- df_pandas
df_pandas['tv_norm'] = df_pandas.TV/sum(df_pandas.TV)
df_pandas.head(4)

#-- ps
import pyspark.sql.functions as F
ds = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True)
ds.withColumn('tv_norm', ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0]).show(4)

 

#-- df_pandas
df_pandas['cond'] = df_pandas.apply(lambda A: 1 if((A.TV>100)&(A.Radio<40)) else 2 if(A.Sales> 10) else 3,axis=1)
df_pandas.head(4)

#-- ps
ds.withColumn('cond', F.when((ds.TV>100)&(ds.Radio<40),1)\
    .when(ds.Sales>10, 2)\
    .otherwise(3)).show(4)

 

 

#-- df_pandas
import numpy as np
df_pandas['log_tv'] = np.log(df_pandas.TV)
df_pandas.head(4)

#-- ps
import pyspark.sql.functions as F
ds.withColumn('log_tv', F.log(ds.TV)).show(4)

 

#-- df_pandas
df_pandas['tv+10'] = df_pandas.TV.apply(lambda x: x+10)
df_pandas.head(4)

#-- ps
ds.withColumn('tv+10', ds.TV+10).show(4)

 

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.6 널(Null) 값 채우기  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15

 

# import pandas as pd  # bricks는 import가 없어도 됨
# import pyspark.pandas as ps # bricks는 import가 없어도 됨

my_list = [['male', 1, None], ['female', 2, 3], ['male', 3, 4]]
#-- pd --
dp = pd.DataFrame(my_list, columns=['A', 'B', 'C'])
dp.head()

#-- ps --
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
ds.show()

 

 

#-- pd --
dp.fillna(-99)
#-- ps --
ds.fillna(-99).show()

 

 

#-- pd --
# 주의 : 특정 컬럼을 선택해야 합니다.
dp.A.replace(['male','female'],['1', '0'], inplace=True)
dp
#-- ps --
# 주의 : na를 대체하면서 동시에 특정 열의 값을 대체하지 못합니다.
# 이 경우 특정 열의 대체 값만 진행됩니다.
ds.na.replace(['male', 'female'],['1', '0']).show()

 

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15

# 아래 내용의 출처를 밝힙니다. 파이썬으로 배우는 아파치 스파크 ( Wenqiang Feng, 홍은희 )

 

RDD

: 복원 분산 데이터 집합(Resilient Distributed Dataset)를 의미합니다.

스파크의 RDD는 불변 객체 집합의 분산 컬렉션(collection)입니다.
각 RDD는 여러 개의 파티션(더 작은 집합과 비슷한 패턴)으로 나뉘며 클러스트의 다른 노드에서 계산될 수 있습니다.

 

# RDD 생성_00 기본형

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.sparkContext.parallelize([(1,2,3,'a b c'),
            (4,5,6,'d e f'),
            (7,8,9,'g h i')]).toDF(['col1', 'col2', 'col3', 'col4'])
df.show()

 

# RDD 생성_01 createDataFrame를 이용한 생성, 컬럼명 명시

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Employee = spark.createDataFrame([
    ('1', 'Joe', '70000', '1'),
    ('2', 'Henry', '80000', '2'),
    ('3', 'Sam', '60000', '2'),
    ('4', 'Max', '90000', '1')],
    ['Id', 'Name', 'Sallary', 'DepartmentId']
    )
Employee.show()

 

# RDD 생성_02 

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.format('com.databricks.spark.csv'). \
    options(header='true',inferschema='true'). \
    load("/home/feng/Spark/Code/data/Adventising.csv", header=True)

df.show(5)
df.printSchema()

 

# RDD spark 버전 확인

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    
print('Apache Spark 버전 확인: '+spark.version)

결과>>  Apache Spark 버전 확인: 3.5.0

 

# RDD 생성_03 CSV 파일 load

from pyspark.sql import SparkSession

spark = SparkSession .builder \
    .appName("Python Spark create RDD example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.format("csv").load("/FileStore/tables/Advertising.csv", header=True)
df.show(5)
df.printSchema()

팁 ) databricks에서 제공하는 클라우드 spark환경에서 테이블 생성하기 위해 파일 업로드하는 위치를 지정가능함

파일올리면서 테이블을 생성해야 하고 그 위치는 /FileStore/tables/파일명

 

# RDD 생성_04 외부의 mysql 접속해서 테이블 조회하기

driver = "org.mariadb.jdbc.Driver"
database_host = "xxx.xxx.xxx.xxx"
database_port = "3306" # update if you use a non-default port
database_name = "xxxxxx"
table = "Advertising"
user = "xxxxx"
password = "xxxxx"
url = f"jdbc:mysql://{database_host}:{database_port}/{database_name}"
print(url)

df = (spark.read
.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("query", "select * from Advertising")
.option("user", user)
.option("password", password)
.option("allowPublicKeyRetrieval", True)
.option("useSSL", False)
.load()
)
df.show(5)

팁 )  아래 두개를 .option에 넣어줘야 함

  • allowPublicKeyRetrieval=true
  • useSSL=false

# RDD 생성_05 hdfs 조회하기

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext.getOrCreate();
hc = HiveContext(sc)
tf1 = sc.textFile("hdfs://FileStore/tables/Advertising.csv")
    
print(tf1.first())
# 아직 코딩이 완성된 것이 아님

 

'Python > Spark' 카테고리의 다른 글

AI Agent 구축  (2) 2025.04.30
5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.3.6 널(Null) 값 채우기  (0) 2024.12.25

+ Recent posts