from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPHook, SFTPOperator
import csv
import math
def split_tsv(input_file, output_prefix, n):
# TSV 파일 읽기
with open(input_file, 'r', encoding='utf-8') as file:
reader = csv.reader(file, delimiter=',')
data = list(reader)
# 헤더와 데이터 분리
header = data[0]
rows = data[1:]
# 각 파일에 들어갈 행 수 계산
total_rows = len(rows)
rows_per_file = math.ceil(total_rows / n)
# 파일 분할 및 저장
for i in range(n):
start = i * rows_per_file
end = min((i + 1) * rows_per_file, total_rows)
output_file = f"{output_prefix}_{i+1}.tsv"
with open(output_file, 'w', encoding='utf-8', newline='') as file:
writer = csv.writer(file, delimiter='\t')
writer.writerow(header)
writer.writerows(rows[start:end])
print(f"파일 저장 완료: {output_file}")
# 사용 예시
# input_file = "TbCorona19CountStatus.csv" # 입력 TSV 파일 이름
# output_prefix = "output" # 출력 파일 접두사
# n = 1 # 분할할 파일 수
with DAG(
dag_id='hynix_pluto',
start_date=pendulum.datetime(2024, 7, 1, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
split_tsv_op = PythonOperator(
task_id='split_tsv_op',
python_callable=split_tsv,
op_kwargs={
'input_file':'/opt/airflow/dags/TbCorona19CountStatus.csv',
'output_prefix':'/opt/airflow/dags/one_tsv_file',
'n': 1
}
)
split_tsv_op