### 기술세션 | AI에이전트 시스템 구축 및 관리를 위한 통합 플랫폼

- Mosaic AI 개요
- ai builder ( 노코드기반 )
- AI playground : 개발 설정하는 곳
- AI 시스템 평가 : 문제점 확인후 수정후 재배포, AI Judge
- MLflow Tracing을 통해 디버깅, 로깅으로 추적
- Agent Evalution 리뷰앱 : 이해관계자와 빠른 피드백에 도움
- Mosaic AI 게이트웨이
- AI/BI 대시보드, 레이크하우스 모니터링 (운영관점)

### 기술세션 | LLMOps in Databricks: AI Agent를 Production 환경에 배포하기 위한 고려사항

- FACTSET FQL 을 자연어로 생성하는 프로젝트 사례
- GPT4 : 59%, 15s >> 85%, 6s
- Mosaic AI Model Training ( 모델 학습후 배포 )
- 모델 배포 3가지 : custom Models 데이터브릭스 자체적 기능 / Foundation Models 메타와 직접적인 구현 가능한 기능들 / External Models 상용AI모델을 사용할 때
- Governance : limit 공격적 요청수를 제한하여 리스크 관리
- 13 RAG : 적절한 문서를 탐색하는 것이 key, 벡터DB를 많이 활용함
  Anthropic과 협업해서 개발한 기능임
  Delta Table 특징 : vector에 컬럼을 추가해서 활용가능
- 23 OpenAI / Rang Chain : 데이터브릭스에서 자체 개발한 텍스트기반 SQL구성 Genie라고함
- 33 Tool Calling : tool 또는 지정된 함수를 이용하면 할루시네이션을 줄일 수 있음
- AI Playground에서 바로 응답을 확인할 수 있다.
- 평가 Metric을 제공을 하고 있음
- 평가는 개발단계에서 평가 / 배포후 단계에서 성능을 평가 이렇게 2가지로 구분됨
- MLflow Trace UI에서 테이블로 저장된 로그를 보는 화면을 제공함

### 파트너 스폰서 세션 | KT - Databricks를 활용한 네트워크 기지국 전력 제어 모델 개발 사례

- 01 Intro
- 02 기지국 트래픽 수요예측
  - 무선 네트워크 풀질 관리 / 네트워크 안정성 확보
  - Uplink : 사진 업로드, 음성 전송, 메시지 보내기
  - Downlink : 유튜브 영상 시청, 음악 감상, 웹 서핑
  - 트랙픽 수요 = Uplink 수요 + Downlink 수요
- 03 기지국 트래픽 수요 예측 모델링
  - 시간적 특징 / 공간적 특징 / 이벤트 변수 / log 변서(과거 트래픽 평균)
- 04 databricks datapipeliine 구성
  - MLOps Workflow 구성하는 단위별로 Python Script 구성
  - 변동성이 많은 KT같은 회사는 모니터링이 중요했다.


### 파트너 스폰서 세션 | AWS = Better Together Databricks on AWS

- 생산성 강화 : 다양한 배경 이미지/영상 생성, 프로모션 활용
- 운영 개선 : DDI EDS와 Amazon Bedrock 통합 시연 데모
- 창의/창조 : AI 기반 디지털 휴먼 구성
- 고객 경험 강화 : 쇼핑 어시스턴트 amazon
  사용자가 무슨 상품을 살지 물어보고 AI가 대답해서 원하는 물건을 찾아갈 수 있게 도와줌

- Agent 와 Agentic 차이 : 추론이 있으면 Agentic
- Agentic AI Use case 1 : RAG
- Agentic AI Use case 2 : Text2SQL
- Agentic AI Use case 3 : Text2Image

### 기술 세션 | Databricks 관리형 MLflow: AI 모델, LLM과 생성형 AI의 혁신적 관리 플랫폼

- MLflow Components : 1.Tracking / 2.Project / 3.Models / 4.Model Registry
- 1. ML워크플로우를 더욱 관리하기 쉽게 만들기
  - 실험 및 실행관리 / Custom Logging / Auto logging 
- 3. MLflow Models
  - 디렉토리 구조로 관리함
- 4. A centralized model store
  - 하나의 협업 허브 / 수명 주기 관리 / 가시성 및 거버넌스
  - catalog.schema.model

- Databricks AutoML 소개
  - 모두를 위한 빠르고 간소화된 머신 러닝
  - 데이터팀의 통제력을 유지하면서 역량을 강화할 수 있는 A glass-box 솔루션

- LLM-as-a-judge : 평가 및 디버깅 전후 비교

### 고객사례 |  KCD의 Text2SQL AI Agent를 활용한 데이터 민주화 구축

- 문제 : 작은 조직의 회사가 cashnote 서비스를 운영하면서 점점 확장되면서 문제가 생김
- Product Overview : KCD의 데이터 민주화
  - 데이터분석팀의 생산성을 확보하고 전문적인 기술 지식이 없는 누구나 데이터 기반 의사결정
- unity Cattalog
  - 가장 중요한 것은 회사의 비즈니스를 깊이 이해하고, 그에 맞춰 data ontology를 잘 설계하는 것
  - 이후에는 Text2SQL 프로세스를 효과적으로 운영하기 위해 온톨로지 기반하여 테이블 구조를 단순화하고 정교하게 다듬는 것이 핵심
- Text2SQL : Just Use the AI/BI Genie
  - 프롬프트 최적화 : 스키마 및 테이블 정보 전달 방식 개선
  - 보안 강화 : 테이블 접근과 LLM 프롬프트 내 데이터 보호
  - 실행 시간 단축 : 외부의 복수 API호출로 인한 지연 최소화
  - 할루시네이션 문제 해결 : 사용자 경험 침 신뢰성 보장
- 효과 : 비용 10%, 해결해준 비율 54%해소


### 고객사례 |  Bagelcode의 데이터 에이전트 DAVIS와 지니를 활용한 데이터 기반 의사 결정 가속화

- 포커스 : 빠른 의사 결정 (ex, 광고 집행)
- 데이터 기반 의사 결정을 더 바르게 할 수 없을까 ?
- DAVIS : 자연어 기반 데이터 서비스
  접근성 / 사용성 / 빠른 구현
- 기능 소개 : SQL Queries / Tablau Dashborad 접속하지도 않고 데이터 받아옴

- Genie 더 똑똑하게 만들기
  - Stay Focused
- 단일 게임에 대한 데이터 구조
- Mart Table만 사용해서 Genie를 구축하기로 집중함
  - Plan to iterate
     - Genie의 역질문 수집하기
  - Build on well-annotated Tables
     - dbt 모델 ( .sql+.yml)
     - 당신의 역할은 dbt모델에 들어갈 yml파일을 만드는 것입니다.







'Python > Spark' 카테고리의 다른 글

5.3.13 열 결합하기(Concat)  (0) 2025.01.08
5.3.11 조인(Join)  (0) 2025.01.07
5.3.9 컬럼 삭제하기(Drop)  (0) 2024.12.25
5.3.6 널(Null) 값 채우기  (0) 2024.12.25
5.1 RDD 생성  (0) 2024.12.15

Visual Studio Code에서 Continue 확장 프로그램을 설치하고 Groq Open API를 설정하는 방법을 자세히 알려드리겠습니다. 이 설정을 통해 GitHub Copilot과 같은 유료 AI 코딩 도구 대신 무료로 강력한 AI 코딩 지원을 받을 수 있습니다.

Groq 계정 설정하기

  1. Groq 콘솔(https://console.groq.com)에 접속합니다[1][2][4].
  2. 계정을 생성하거나 기존 계정으로 로그인합니다[1][2][4].
  3. API 키 섹션으로 이동합니다[1][2][4].
  4. 새 API 키를 생성합니다[1][2][4].
  5. 중요: API 키는 한 번만 표시되므로 즉시 복사하여 안전한 곳에 저장해 두세요[1][2][4].

Continue 확장 프로그램 설치하기

  1. Visual Studio Code를 엽니다[1][2].
  2. 확장 마켓플레이스에서 "Continue"를 검색합니다[1][2].
  3. 해당 확장 프로그램을 설치합니다[1][2].

Continue 확장 프로그램 설정하기

Continue 확장 프로그램은 최근에 설정 파일 형식이 변경되었습니다. 현재는 config.yaml 또는 config.json 형식을 사용합니다[1][5]. 두 가지 방법으로 설정할 수 있습니다:

YAML 형식으로 설정하기

config.yaml 파일에 다음과 같이 설정합니다:

models:
  - name: Llama 3.3 70b Versatile
    provider: groq
    model: llama-3.3-70b-versatile
    apiKey: <YOUR_GROQ_API_KEY>
    roles:
      - chat

JSON 형식으로 설정하기

config.json 파일에 다음과 같이 설정합니다:

{
  "models": [
    {
      "title": "Llama 3.3 70b Versatile",
      "provider": "groq",
      "model": "llama-3.3-70b-versatile",
      "apiKey": "<YOUR_GROQ_API_KEY>"
    }
  ]
}

두 경우 모두 `` 부분을 앞서 저장해둔 Groq API 키로 교체하세요[5].

Continue 확장 프로그램 사용하기

설정을 완료한 후에는 다음과 같이 사용할 수 있습니다:

  1. 코드 에디터에서 코드를 선택합니다.
  2. Command + L (또는 시스템에 맞는 단축키)를 눌러 Continue를 엽니다[1].
  3. 질문을 하거나, 코드 리뷰를 요청하거나, 솔루션을 생성할 수 있습니다[1].

주의사항 및 팁

  1. Groq는 현재 가장 빠른 모델 실행 플랫폼 중 하나로, GitHub Copilot보다 더 빠른 응답 속도를 제공합니다[2].
  2. Llama 3 모델은 코딩에 특히 뛰어난 성능을 보입니다[2].
  3. 리팩터 모드(cmd + I)를 사용할 때 API 기본 URL 문제가 발생할 수 있습니다. 이 경우 apiBasehttps://api.groq.com/openai/v1/chat로 설정하면 해결될 수 있습니다[3].
  4. 메시지를 너무 많이 보내면 속도 제한이 걸릴 수 있으므로 주의하세요[2].
  5. Continue는 오픈 소스이며 커뮤니티 지원이 활발합니다[2].

이 설정을 통해 GitHub Copilot과 같은 유료 서비스 없이도 강력한 AI 코딩 지원을 무료로 이용할 수 있습니다. 특히 Groq의 Llama 3 모델은 코딩에 매우 뛰어난 성능을 보이며, 빠른 응답 속도로 개발 생산성을 크게 향상시킬 수 있습니다.

Citations:
[1] https://dev.to/royged/no-copilot-no-problem-get-free-ai-in-vscode-now-1a1g
[2] https://www.youtube.com/watch?v=1VY4A5fG4ZY
[3] https://github.com/continuedev/continue/issues/1186
[4] https://dev.to/dani_avila7/lightning-fast-code-assistant-with-groq-in-vscode-4eme
[5] https://docs.continue.dev/customize/model-providers/more/groq
[6] https://www.youtube.com/watch?v=cohxfQAANdw
[7] https://www.continue.dev
[8] https://github.com/continuedev/continue/issues/3697
[9] https://anpigon.tistory.com/465
[10] https://anpigon.tistory.com/444
[11] https://github.com/continuedev/continue/blob/main/extensions/vscode/config_schema.json
[12] https://docs.continue.dev/customize/tutorials/llama3.1
[13] https://www.youtube.com/watch?v=rsJqHDuJWSI
[14] https://www.reddit.com/r/vscode/comments/1i3cwd4/this_is_definitely_the_fastest_version_of_ai/


Perplexity로부터의 답변: pplx.ai/share

 

import requests

# Solactive API 엔드포인트 및 클라이언트 ID 설정
BASE_URL = "https://clients.solactive.com/api/rest/v1/indices"
CLIENT_ID = "your_client_id"  # Solactive에서 제공받은 클라이언트 ID
ISIN = "DE000SL0J6Z8"

def fetch_index_data():
    try:
        # API 요청 URL 구성
        url = f"{BASE_URL}/{CLIENT_ID}/{ISIN}/index"
        
        # GET 요청 보내기
        response = requests.get(url)
        
        # 응답 데이터 확인
        if response.status_code == 200:
            data = response.json()
            print("Index Data:")
            print(data)
        else:
            print(f"Failed to fetch data. HTTP Status Code: {response.status_code}")
    except Exception as e:
        print(f"An error occurred: {e}")

# 실행
fetch_index_data()

 

'Python > 파이썬_자동매매' 카테고리의 다른 글

볼린저밴드  (0) 2025.02.02
샤프비율  (0) 2025.02.02
변동성  (0) 2025.02.02
MDD (Maximum Draw Down)  (0) 2025.02.02
CAGR (연평균수익률)  (0) 2025.02.02

JupyterLab을 초기화하는 방법은 여러 가지가 있습니다. 문제 해결을 위한 다양한 초기화 방법을 아래에 설명해 드리겠습니다.

## JupyterLab 초기화 방법

### 1. 설정 파일 초기화

JupyterLab의 설정 파일을 초기화하면 권한 문제 등 여러 이슈가 해결될 수 있습니다:

```bash
jupyter lab clean
jupyter lab build
```

이 명령어들은 JupyterLab의 빌드 파일과 캐시를 정리하고 다시 빌드합니다.

### 2. 설정 디렉토리 초기화

Jupyter 설정 디렉토리의 위치를 확인하고 초기화할 수 있습니다:

```bash
# 설정 디렉토리 위치 확인
jupyter --paths

# 설정 디렉토리 백업 및 초기화
mv ~/.jupyter ~/.jupyter_backup
```

이후 JupyterLab을 다시 실행하면 새로운 설정 파일이 생성됩니다.

### 3. 확장 프로그램 초기화

설치된 확장 프로그램이 문제를 일으킬 수 있습니다:

```bash
# 확장 프로그램 목록 확인
jupyter labextension list

# 모든 확장 프로그램 제거
jupyter labextension uninstall @jupyterlab/extension-name
```

여러 확장 프로그램이 있다면 하나씩 제거해 보세요.

### 4. 완전 재설치

JupyterLab을 완전히 제거하고 재설치하는 방법:

```bash
# 제거
pip uninstall jupyterlab

# 캐시 파일 정리
rm -rf ~/.jupyter/lab

# 재설치
pip install jupyterlab
```

### 5. 데이터베이스 초기화

JupyterLab의 내부 데이터베이스를 초기화:

```bash
# 데이터베이스 위치 확인
jupyter lab path

# 데이터베이스 초기화
jupyter lab --reset-db
```

### 6. 환경 변수 설정 및 초기화

환경 변수를 설정하여 JupyterLab의 동작을 초기화:

```bash
# 런타임 디렉토리 변경
export JUPYTER_RUNTIME_DIR=~/jupyter_runtime
export JUPYTER_DATA_DIR=~/jupyter_data

# 디렉토리 생성
mkdir -p ~/jupyter_runtime ~/jupyter_data

# 권한 설정
chmod 755 ~/jupyter_runtime ~/jupyter_data
```

### 7. 사용자 설정 파일 초기화

사용자별 설정 파일을 초기화:

```bash
# 설정 파일 위치 확인
jupyter --config-dir

# 설정 파일 초기화
rm -rf ~/.jupyter/lab/user-settings/*
```

### 8. 커널 초기화

Jupyter 커널을 초기화:

```bash
# 커널 목록 확인
jupyter kernelspec list

# 특정 커널 제거
jupyter kernelspec uninstall kernel_name

# 새 커널 설치
python -m ipykernel install --user --name=new_kernel
```

---

문제 해결 후에는 JupyterLab을 다시 시작하여 정상 작동하는지 확인하세요. 초기화 후에도 문제가 지속된다면, 시스템 로그를 확인하거나 더 구체적인 오류 메시지를 분석하여 추가적인 문제 해결 방법을 찾아볼 수 있습니다.

=======================================================

(base) jovyan@66ff31e068a0:~$ ps -ef | grep jupyter
jovyan       7     1  0 14:17 ?        00:00:02 /opt/conda/bin/python3.11 /opt/conda/bin/jupyter-lab
jovyan    1134   616  0 14:24 pts/1    00:00:00 grep --color=auto jupyter

=======================================================
(base) jovyan@66ff31e068a0:~$ jupyter-lab clean
[LabCleanApp] Cleaning /opt/conda/share/jupyter/lab...
[LabCleanApp] staging not present, skipping...
[LabCleanApp] Success!

=======================================================

(base) jovyan@66ff31e068a0:~$ jupyter-lab build
[LabBuildApp] JupyterLab 4.0.7
[LabBuildApp] Building in /opt/conda/share/jupyter/lab
[LabBuildApp] Building jupyterlab assets (production, minimized)
[LabBuildApp] WARNING | The extension "nbdime-jupyterlab" is outdated.

(base) jovyan@66ff31e068a0:~$ 

=======================================================

.env

OPENSEARCH_INITIAL_ADMIN_PASSWORD=Naver12#$



docker-compose.yaml

version: '3'
services:

  jupyter:
    image: jupyter/all-spark-notebook
    ports:
      - "8887:8888"  # jupyter 접속포트
      - "4041:4040"  # spark 접속포트
    volumes:
      - C:\Users\SUN\Documents\sop_folder\jupyter:/home/jovyan
    networks:
      - opensearch-net

  opensearch-node1:
    image: opensearchproject/opensearch:latest
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-net

  opensearch-node2:
    image: opensearchproject/opensearch:latest
    container_name: opensearch-node2
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node2
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data2:/usr/share/opensearch/data
    networks:
      - opensearch-net

  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
    networks:
      - opensearch-net

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:

 

.env 파일과 docker-compose.yaml파일이 있는 곳에
jupyter폴더 만들기

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import math

def split_file(input_file, n):
    # 입력 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # 전체 라인 수
    total_lines = len(content)
    
    # 각 파일에 들어갈 라인 수 계산
    lines_per_file = math.ceil(total_lines / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * lines_per_file
        end = min((i + 1) * lines_per_file, total_lines)
        
        # 새 파일명 생성 (원본 파일명_001.txt 형식)
        # output_file = f"{input_file.rsplit('.', 1)[0]}_{i+1:03d}.{input_file.rsplit('.', 1)[1]}"
        output_file = f"{input_file.rsplit('.', 1)[0]}_{i:03d}.{input_file.rsplit('.', 1)[1]}"
        
        # 파일 저장
        with open(output_file, 'w', encoding='utf-8') as file:
            file.writelines(content[start:end])
        
        print(f"파일 저장 완료: {output_file}")

with DAG(
        dag_id='hynix_pluto_split_file',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_file_op = PythonOperator(
        task_id='split_file_op',
        python_callable=split_file,
        op_kwargs={
            'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
            'n': 10
            }
    )

split_file_op
import math

def split_file(input_file, n):
    # 입력 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # 전체 라인 수
    total_lines = len(content)
    
    # 각 파일에 들어갈 라인 수 계산
    lines_per_file = math.ceil(total_lines / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * lines_per_file
        end = min((i + 1) * lines_per_file, total_lines)
        
        # 새 파일명 생성 (원본 파일명_001.txt 형식)
        output_file = f"{input_file.rsplit('.', 1)[0]}_{i+1:03d}.{input_file.rsplit('.', 1)[1]}"
        
        # 파일 저장
        with open(output_file, 'w', encoding='utf-8') as file:
            file.writelines(content[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
input_file = "example.txt"  # 분할할 원본 파일명
n = 5  # 분할할 파일 수

split_file(input_file, n)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
# input_file = "TbCorona19CountStatus.csv"  # 입력 TSV 파일 이름
# output_prefix = "output"  # 출력 파일 접두사
# n = 1  # 분할할 파일 수


with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_tsv_op = PythonOperator(
        task_id='split_tsv_op',
        python_callable=split_tsv,
        op_kwargs={
            'input_file':'/opt/airflow/dags/one_tsv_file_1.tsv',
            'output_prefix':'/opt/airflow/dags/sp/total_split',
            'n': 10
            }
    )

split_tsv_op
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}.tsv"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
# input_file = "TbCorona19CountStatus.csv"  # 입력 TSV 파일 이름
# output_prefix = "output"  # 출력 파일 접두사
# n = 1  # 분할할 파일 수


with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_tsv_op = PythonOperator(
        task_id='split_tsv_op',
        python_callable=split_tsv,
        op_kwargs={
            'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
            'output_prefix':'/opt/airflow/dags/one_tsv_file',
            'n': 1
            }
    )

split_tsv_op

 

import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}.tsv"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
input_file = "input.tsv"  # 입력 TSV 파일 이름
output_prefix = "output"  # 출력 파일 접두사
n = 3  # 분할할 파일 수

split_tsv(input_file, output_prefix, n)

'Python > Airflow' 카테고리의 다른 글

[airfow] tsv파일을 10개 파일로 나누는 로직  (0) 2025.02.18
[airflow] csv 파일을 tsv파일로 변환  (0) 2025.02.18
paramiko key처리  (0) 2025.02.11
sample  (0) 2025.02.11
airflow_sftp_minio_hdfs_boto3  (0) 2025.02.09

+ Recent posts