from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import math

def split_file(input_file, n):
    # 입력 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # 전체 라인 수
    total_lines = len(content)
    
    # 각 파일에 들어갈 라인 수 계산
    lines_per_file = math.ceil(total_lines / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * lines_per_file
        end = min((i + 1) * lines_per_file, total_lines)
        
        # 새 파일명 생성 (원본 파일명_001.txt 형식)
        # output_file = f"{input_file.rsplit('.', 1)[0]}_{i+1:03d}.{input_file.rsplit('.', 1)[1]}"
        output_file = f"{input_file.rsplit('.', 1)[0]}_{i:03d}.{input_file.rsplit('.', 1)[1]}"
        
        # 파일 저장
        with open(output_file, 'w', encoding='utf-8') as file:
            file.writelines(content[start:end])
        
        print(f"파일 저장 완료: {output_file}")

with DAG(
        dag_id='hynix_pluto_split_file',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_file_op = PythonOperator(
        task_id='split_file_op',
        python_callable=split_file,
        op_kwargs={
            'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
            'n': 10
            }
    )

split_file_op
import math

def split_file(input_file, n):
    # 입력 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # 전체 라인 수
    total_lines = len(content)
    
    # 각 파일에 들어갈 라인 수 계산
    lines_per_file = math.ceil(total_lines / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * lines_per_file
        end = min((i + 1) * lines_per_file, total_lines)
        
        # 새 파일명 생성 (원본 파일명_001.txt 형식)
        output_file = f"{input_file.rsplit('.', 1)[0]}_{i+1:03d}.{input_file.rsplit('.', 1)[1]}"
        
        # 파일 저장
        with open(output_file, 'w', encoding='utf-8') as file:
            file.writelines(content[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
input_file = "example.txt"  # 분할할 원본 파일명
n = 5  # 분할할 파일 수

split_file(input_file, n)
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
# input_file = "TbCorona19CountStatus.csv"  # 입력 TSV 파일 이름
# output_prefix = "output"  # 출력 파일 접두사
# n = 1  # 분할할 파일 수


with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_tsv_op = PythonOperator(
        task_id='split_tsv_op',
        python_callable=split_tsv,
        op_kwargs={
            'input_file':'/opt/airflow/dags/one_tsv_file_1.tsv',
            'output_prefix':'/opt/airflow/dags/sp/total_split',
            'n': 10
            }
    )

split_tsv_op
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=',')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}.tsv"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
# input_file = "TbCorona19CountStatus.csv"  # 입력 TSV 파일 이름
# output_prefix = "output"  # 출력 파일 접두사
# n = 1  # 분할할 파일 수


with DAG(
        dag_id='hynix_pluto',
        start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
        schedule=None,
        catchup=False
) as dag:
    
    split_tsv_op = PythonOperator(
        task_id='split_tsv_op',
        python_callable=split_tsv,
        op_kwargs={
            'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
            'output_prefix':'/opt/airflow/dags/one_tsv_file',
            'n': 1
            }
    )

split_tsv_op

 

import csv
import math

def split_tsv(input_file, output_prefix, n):
    # TSV 파일 읽기
    with open(input_file, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter='\t')
        data = list(reader)
    
    # 헤더와 데이터 분리
    header = data[0]
    rows = data[1:]
    
    # 각 파일에 들어갈 행 수 계산
    total_rows = len(rows)
    rows_per_file = math.ceil(total_rows / n)
    
    # 파일 분할 및 저장
    for i in range(n):
        start = i * rows_per_file
        end = min((i + 1) * rows_per_file, total_rows)
        
        output_file = f"{output_prefix}_{i+1}.tsv"
        
        with open(output_file, 'w', encoding='utf-8', newline='') as file:
            writer = csv.writer(file, delimiter='\t')
            writer.writerow(header)
            writer.writerows(rows[start:end])
        
        print(f"파일 저장 완료: {output_file}")

# 사용 예시
input_file = "input.tsv"  # 입력 TSV 파일 이름
output_prefix = "output"  # 출력 파일 접두사
n = 3  # 분할할 파일 수

split_tsv(input_file, output_prefix, n)

'Python > Airflow' 카테고리의 다른 글

[airfow] tsv파일을 10개 파일로 나누는 로직  (0) 2025.02.18
[airflow] csv 파일을 tsv파일로 변환  (0) 2025.02.18
paramiko key처리  (0) 2025.02.11
sample  (0) 2025.02.11
airflow_sftp_minio_hdfs_boto3  (0) 2025.02.09

+ Recent posts