.env

OPENSEARCH_INITIAL_ADMIN_PASSWORD=Naver12#$



docker-compose.yaml

version: '3'
services:

  jupyter:
    image: jupyter/all-spark-notebook
    ports:
      - "8887:8888"  # jupyter 접속포트
      - "4041:4040"  # spark 접속포트
    volumes:
      - C:\Users\SUN\Documents\sop_folder\jupyter:/home/jovyan
    networks:
      - opensearch-net

  opensearch-node1:
    image: opensearchproject/opensearch:latest
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data1:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - opensearch-net

  opensearch-node2:
    image: opensearchproject/opensearch:latest
    container_name: opensearch-node2
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node2
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=${OPENSEARCH_INITIAL_ADMIN_PASSWORD}
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-data2:/usr/share/opensearch/data
    networks:
      - opensearch-net

  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:latest
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
    networks:
      - opensearch-net

volumes:
  opensearch-data1:
  opensearch-data2:

networks:
  opensearch-net:

 

.env 파일과 docker-compose.yaml파일이 있는 곳에
jupyter폴더 만들기

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import math

def split_file(input_file, n):
    # 입력 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # 전체 라인 수
    total_lines = len(content)
    
    # 각 파일에 들어갈 라인 수 계산
    lines_per_file = math.ceil(total_lines / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * lines_per_file
        end = min((i + 1) * lines_per_file, total_lines)
        
        # 새 파일명 생성 (원본 파일명_001.txt 형식)
        # output_file = f"{input_file.rsplit('.', 1)[0]}_{i+1:03d}.{input_file.rsplit('.', 1)[1]}"
        output_file = f"{input_file.rsplit('.', 1)[0]}_{i:03d}.{input_file.rsplit('.', 1)[1]}"
        
        # 파일 저장
        with open(output_file, 'w', encoding='utf-8') as file:
            file.writelines(content[start:end])
        
        print(f"파일 저장 완료: {output_file}")

with DAG(
        dag_id='hynix_pluto_split_file',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_file_op = PythonOperator(
        task_id='split_file_op',
        python_callable=split_file,
        op_kwargs={
            'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
            'n': 10
            }
    )

split_file_op
import math

def split_file(input_file, n):
    # 입력 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # 전체 라인 수
    total_lines = len(content)
    
    # 각 파일에 들어갈 라인 수 계산
    lines_per_file = math.ceil(total_lines / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * lines_per_file
        end = min((i + 1) * lines_per_file, total_lines)
        
        # 새 파일명 생성 (원본 파일명_001.txt 형식)
        output_file = f"{input_file.rsplit('.', 1)[0]}_{i+1:03d}.{input_file.rsplit('.', 1)[1]}"
        
        # 파일 저장
        with open(output_file, 'w', encoding='utf-8') as file:
            file.writelines(content[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
input_file = "example.txt"  # 분할할 원본 파일명
n = 5  # 분할할 파일 수

split_file(input_file, n)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
# input_file = "TbCorona19CountStatus.csv"  # 입력 TSV 파일 이름
# output_prefix = "output"  # 출력 파일 접두사
# n = 1  # 분할할 파일 수


with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_tsv_op = PythonOperator(
        task_id='split_tsv_op',
        python_callable=split_tsv,
        op_kwargs={
            'input_file':'/opt/airflow/dags/one_tsv_file_1.tsv',
            'output_prefix':'/opt/airflow/dags/sp/total_split',
            'n': 10
            }
    )

split_tsv_op
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}.tsv"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
# input_file = "TbCorona19CountStatus.csv"  # 입력 TSV 파일 이름
# output_prefix = "output"  # 출력 파일 접두사
# n = 1  # 분할할 파일 수


with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_tsv_op = PythonOperator(
        task_id='split_tsv_op',
        python_callable=split_tsv,
        op_kwargs={
            'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
            'output_prefix':'/opt/airflow/dags/one_tsv_file',
            'n': 1
            }
    )

split_tsv_op

 

import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}.tsv"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
input_file = "input.tsv"  # 입력 TSV 파일 이름
output_prefix = "output"  # 출력 파일 접두사
n = 3  # 분할할 파일 수

split_tsv(input_file, output_prefix, n)

'Python > Airflow' 카테고리의 다른 글

[airfow] tsv파일을 10개 파일로 나누는 로직  (0) 2025.02.18
[airflow] csv 파일을 tsv파일로 변환  (0) 2025.02.18
paramiko key처리  (0) 2025.02.11
sample  (0) 2025.02.11
airflow_sftp_minio_hdfs_boto3  (0) 2025.02.09
import paramiko
import csv

# SSH 접속 정보
host = "remote_server_ip"
username = "user"
password = "password"

# SSH 접속
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=host, username=username, password=password)

# invoke_shell 을 사용해 명령어 전달
channel = client.invoke_shell()
channel.send("your_command")
time.sleep(30)
output = channel.recv(9999).decode('utf-8')

# output 을 csv 파일로 저장
file_name = "output.csv"
with open(file_name, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["output"])
    for line in output:
        writer.writerow([line.strip()])

# SSH 접속 종료
client.close()

sample

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator

sftp_hook = SFTPHook(ssh_conn_id=None, username="SYP", remote_host="xxx.xxx.xxx.xxx", cmd_timeout=None, key_file=None)

with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    put_file = SFTPOperator(
        task_id="put_file",
        sftp_hook = sftp_hook,
        local_filepath="/opt/airflow/config/10080.py",
        remote_filepath="10080.py",
        operation="get",
        create_intermediate_dirs=True,
        dag=dag
    )

put_file

 

 

https://engineer-1.tistory.com/entry/Airflow-SFTP-%EC%A0%91%EC%86%8D-%EC%84%A4%EC%A0%95-SSH-Key-%EC%9D%B8%EC%A6%9D

 

Extra 옵션 설명 :
no_host_key_check : default 'true' 

- true 일 경우 (airflow 설치한 서버) 로컬 서버의 'known_hosts' 파일에 원격 서버의 정보가 없을 경우 SFTP 접속 시 자동으로 known_hosts에 원격서버의 정보가 작성된다. 
- false 일 경우 로컬 서버의 'known_hosts' 적혀 있지 않은 원격서버의 키 인증 접속은 실패한다. known_hosts 파일의 관리가 필요하다.

 

단, "no_host_key_check": "true" 일경우 airflow log에 아래 경고 문구가 출력됨으로 업무에 적용 시 참고하길 바란다.

https://github.com/hjkim-sun/airflow/tree/master

 

▲▲ Python 오퍼레이터에 op_args로 변수 할당하기 ▲▲

dags_python_with_op_args.py

 

▲▲ Python 오퍼레이터에 op_kwargs로 변수 할당하기 ▲▲

dags_python_with_op_kwargs.py

 

▲▲ Python Operator에서 Xcom 사용 ▲▲

dags_python_with_xcom_eg1.py

dags_python_with_xcom_eg2.py

 

▲▲ Bash Operator에서 xcom 사용 ▲▲

dags_bash_with_xcom.py

 

▲▲ python과 bash operator간에 xcom 변수사용 ▲▲

dags_bash_python_with_xcom.py

 

▲▲ Bash Operator에서 xcom 사용 ▲▲

dags_bash_with_variable.py

 

 

 

 

 

 

 

 

 

 

 

'Python > Airflow' 카테고리의 다른 글

[airfow] tsv파일을 10개 파일로 나누는 로직  (0) 2025.02.18
[airflow] csv 파일을 tsv파일로 변환  (0) 2025.02.18
n개의 수만큼 파일을 분할 하는 파이썬로직  (0) 2025.02.18
paramiko key처리  (0) 2025.02.11
sample  (0) 2025.02.11

+ Recent posts