MLOps

[kubeflow] Recommandation(XGBoost) 파이프라인 구축 - 2

Crysis 2022. 9. 26. 12:17

이번 포스트에서는 나머지 5개 노드를 연결한 파이프라인을 연결한 것을 작성해보려고 한다.

쿠버네티스의 기본적인 사용방법만 알고 시도해보았는데 kubeflow 자체를 사용하는 것은 어렵지 않지만 파이프라인을 커스텀하는 것에 부족함을 많이 느끼게 되어서 최근에는 쿠버네티스를 기초부터 학습해보는 중이다.

 


Pipeline

kubeflow로 구축해본 파이프라인은 아래처럼 총 6개 노드로 구성되어있다.

 


kfp

import kfp
from kfp import dsl
from kfp.components import OutputPath
from kfp import onprem


def convert_json():
    return dsl.ContainerOp(        
        name='Convert Data',
        image='crysiss/kubeflow-convertjson:0.2',
        arguments=[],
        file_outputs={
            'df_customer': '/tmp/df_customer.json',
            'df_purchase': '/tmp/df_purchase.json',
            'df_product': '/tmp/df_product.json'
        }        
    )

def preprocess_op(df_customer, df_purchase, df_product):
    return dsl.ContainerOp(
        name='Preprocess Data',
        image='crysiss/kubeflow-preprocess-a:0.3',
        arguments=[
            '--df_customer', df_customer,
            '--df_purchase', df_purchase,
            '--df_product', df_product
        ],
        file_outputs={
            'df': '/tmp/prerprocess.json'
        }
    )

'''
생략 
'''

def extract_top_op(y_test, pred_probs):
    return dsl.ContainerOp(
        name='Extract Top5&10',
        image='crysiss/kubeflow-metric:0.8.6',
        arguments=[
            '--y_test', y_test,
            '--pred_probs', pred_probs
        ],
        file_outputs={
            'metrics': 'tmp/metrics.json'
        }
    )


@dsl.pipeline(name='XGBOOST-REC Pipeline',
              description='A pipeline that trains and logs a classification model'
)
def data_pipeline():   
    _convert_json = convert_json()

    _preprocess = preprocess_op(
        dsl.InputArgumentPath(_convert_json.outputs['df_customer']),
        dsl.InputArgumentPath(_convert_json.outputs['df_purchase']),
        dsl.InputArgumentPath(_convert_json.outputs['df_product'])
    ).after(_convert_json)

    _spilt_data_op = spilt_data_op(
        dsl.InputArgumentPath(_preprocess.outputs['df'])
    ).after(_preprocess)

    _train_op = train_op(
        dsl.InputArgumentPath(_spilt_data_op.outputs['x_tr']),
        dsl.InputArgumentPath(_spilt_data_op.outputs['y_tr']),
        dsl.InputArgumentPath(_spilt_data_op.outputs['x_val']),
        dsl.InputArgumentPath(_spilt_data_op.outputs['y_val'])
    ).after(_spilt_data_op)

    _test_op = test_op(
        dsl.InputArgumentPath(_spilt_data_op.outputs['x_test']),
        dsl.InputArgumentPath(_spilt_data_op.outputs['y_test']),
        dsl.InputArgumentPath(_train_op.outputs['model']),
    ).after(_train_op)

    _extract_top_op = extract_top_op(
        dsl.InputArgumentPath(_spilt_data_op.outputs['y_test']),
        dsl.InputArgumentPath(_test_op.outputs['pred_probs'])
    ).after(_test_op)

if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(data_pipeline, 'xgboost_kf.yaml')

각 노드에서 file_outputs으로 정의된 output을 다음 노드의 arguments를 받아오는 방식이다.

이런 데이터 이동방식에는 확인한 것으로는 2가지 방식이 있다. (다른 방식이 더 있을 수도 있다)

 

  1.  arguments = ['--df_customer', convert_json.outputs["df_customer"]]
  2. InputArgumentPath를 파이프라인에 정의해 주어서 이동해주는 방식

 

이렇게 2가지 방법이 있고 이는 데이터 이동방식에 차이가 있다.

 

InputArgumentPath

데이터를 minio에 저장을 해놓고 kubeflow에서 자체적으로 읽을 수 있는 링크(파일위치 링크)로 데이터를 불러오는 방식이다.

InputArgumentPath는 데이터가 이동할 때 Serialization를 안하며 json이 아닌 다른 포맷도 쉽게 전달 가능하다.

 

arguments로 바로 전달

이 경우에는 json으로 데이터 변환을 해주어야 유리한 측면이 존재한다.

데이터가 이동할때 Serialization을 해주고 넘어가기 때문에 json으로 포맷을 변경하는 것을 추천한다. 바로 전달하게 되면 다음 노드에서 반드시 파이썬의 StringIO로 변환하는 작업이 필수적으로 필요하다

 

 


본 포스트에는 각 노드에 해당하는 파이썬 파일을 설명하진 않았다.

아래 깃허브 주소를 통해 들어가면 확인해 볼 수 있을 것이다. 사실 크게 설명할 부분이 없어 작성하지 않았다.

 

 

GitHub - kch8906/kubeflow-xgboost-recommendation

Contribute to kch8906/kubeflow-xgboost-recommendation development by creating an account on GitHub.

github.com

 

이번 파이프라인을 구현 시도하면서 위에 설명한 부분에서 많이 애를 먹었다. kubeflow에도 살펴봐도 명확하게 설명해주지 않아

카카오톡 오픈채팅 "파이썬 처음러럼"의 엘카인님의 도움을 많이 받았다.

 

추후에는 이 파이프라인으로 몇가지를 더 시도해 볼 생각인데 아래 사항을 적용해보는 것을 시도해보려고 한다

 

1. seldon-core로 모델 serving

2. Github Actions CI/CD

3. Train 노드에 GPU 할당

4. 프로메테우스, 그라파나(모니터링) 적용

 

 

Kubeflow 파이프라인을 어떻게 구현해봐서 신날 줄 알았는데 어떻게 된게 부족한 점만 계속 나온다...

 

'MLOps' 카테고리의 다른 글

k8s v1.25.2 master node 구축  (0) 2022.10.11
[kubeflow] Recommandation(XGBoost) 파이프라인 구축 - 1  (0) 2022.09.17
[Docker] 설치 및 기본 명령어  (0) 2022.09.01