はじめに
こんにちは。弥生R&D室のsiidaです。R&D室ではSageMakerを使用して機械学習 (ML) のプロジェクトを進めています。SageMakerはMLのための様々な機能が搭載されたサービスであり、データ分析からモデル訓練、ひいてはワークフローの構築まで、SageMakerの中で完結させることができます。
SageMakerにはProcessingJobという機能があり、こちらはコマンドをジョブの形でサーバ上から実行できるというものです。重い計算を実行したり、再現性のある実験を行うために有用な機能ですが、実はPythonラッパーで簡単に実行することができます。今回はそのラッパーについて紹介します。
ProcessingJobのPythonラッパー
ドキュメント
https://sagemaker.readthedocs.io/en/stable/api/training/processing.html
インストール方法
pip install sagameker==2.231.0
使用例
def run_processor( processor: ScriptProcessor, s3_inputs: list[S3Path], s3_out_dir: S3Path, run_script: str | Path, script_arguments: list[str], *, job_name_prefix: str = "job", timestamp: Optional[str] = None, ): if timestamp is None: timestamp = get_current_time() s3_out_dir = s3_out_dir.joinpath(f"workflow/{job_name_prefix}-{timestamp}") processing_job_name = f"{job_name_prefix}-{timestamp}" run_script = Path(run_script) if isinstance(run_script, str) else run_script uploaded_script_path = upload_script_to_s3(run_script, s3_out_dir) print(f"Run script is uploaded to: {uploaded_script_path}") processor.run( code=uploaded_script_path, inputs=[ ProcessingInput( source=s3_input.uri, destination=f"{INPUT_DIR}/{get_nearest_parent_dir(s3_input)}", ) for s3_input in s3_inputs ], outputs=[ ProcessingOutput( source=OUTPUT_DIR, destination=s3_out_dir.uri, ), ], arguments=script_arguments if len(script_arguments) else None, wait=False, job_name=processing_job_name, )
簡単に要点を解説すると、
ScriptProcessor
オブジェクトを渡してProcessingInput
/ProcessingOutput
を渡してScriptProcessor.run()
を実行する
というものです。
こちらで掲載したコードは関数部分のみ抜粋したものですが、コマンドラインから動作するサンプルはGithub上で公開しています。
まとめ
- PythonからProcessingJobの操作が可能です
- 基本は入出力を定義して実行するだけ
本記事は下記の記事と同じ内容です。 アクセス解析を目的としてマルチポストしています。
弥生では一緒に働く仲間を募集しています。 ぜひエントリーお待ちしております。