
Single-GPU multi-process execution #1223

Closed
huyiwen opened this issue Dec 7, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

@huyiwen

huyiwen commented Dec 7, 2024

I'm using the multi-GPU execution described in #667. Perhaps because the documents are short, each GPU's memory usage and utilization are very low (around 5947 MB / 80 GB and roughly 10%). Is there a way to run parallel operations on a single GPU?

huyiwen added the enhancement (New feature or request) label Dec 7, 2024
@myhloli
Collaborator

myhloli commented Dec 7, 2024

Yes, single-GPU multi-process is possible and quite simple: pin a single GPU via an environment variable, then either run several command-line invocations in parallel or call the interface methods through Python's multiprocessing library.

@huyiwen
Author

huyiwen commented Dec 7, 2024

[Parallel(n_jobs=16)]: Using backend ThreadingBackend with 16 concurrent workers.
[Parallel(n_jobs=16)]: Done   9 tasks      | elapsed: 10.1min
[Parallel(n_jobs=16)]: Done  18 tasks      | elapsed: 53.5min
[Parallel(n_jobs=16)]: Done  29 tasks      | elapsed: 64.9min

I started 16 workers in client.py, but server.py still seems to handle only one request at a time.

@myhloli
Collaborator

myhloli commented Dec 7, 2024

> [Parallel(n_jobs=16)]: Using backend ThreadingBackend with 16 concurrent workers.
> [Parallel(n_jobs=16)]: Done   9 tasks      | elapsed: 10.1min
> [Parallel(n_jobs=16)]: Done  18 tasks      | elapsed: 53.5min
> [Parallel(n_jobs=16)]: Done  29 tasks      | elapsed: 64.9min
>
> I started 16 workers in client.py, but server.py still seems to handle only one request at a time.

Single-GPU multi-process would require modifying that client-server framework, or you could have an AI write a simpler one from scratch.

@huyiwen
Author

huyiwen commented Dec 7, 2024

import sys
import os
import uuid
import torch
import pymupdf
from unittest.mock import patch
from loguru import logger
from tqdm import tqdm

from magic_pdf.tools.common import do_parse
from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton


class MinerUAPI:
    def __init__(self, output_dir='/home/huyiwen/monorepo/projects/miniyulan/mineru/pdfs_output'):
        self.output_dir = output_dir

    @staticmethod
    def clean_memory(device):
        import gc
        if torch.cuda.is_available():
            with torch.cuda.device(device):
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
        gc.collect()

    def setup(self, device):
        self.device = torch.device(device)
        # Patch get_device so every process loads its models onto the given GPU.
        with patch('magic_pdf.model.doc_analyze_by_custom_model.get_device') as mock_obj:
            mock_obj.return_value = device
            model_manager = ModelSingleton()
            model_manager.get_model(True, False)
            model_manager.get_model(False, False)
            mock_obj.assert_called()
            print('Model initialization complete!')

    def predict(self, inputs):
        try:
            pdf_name = str(uuid.uuid4())
            do_parse(self.output_dir, pdf_name, inputs[0], [], **inputs[1])
            return pdf_name
        except Exception as e:
            logger.error(f'Error during prediction: {e}')
            raise
        finally:
            self.clean_memory(self.device)


def to_pdf(file_path):
    # Return the file as PDF bytes, converting non-PDF formats if needed.
    with pymupdf.open(file_path) as f:
        if f.is_pdf:
            return f.tobytes()
        return f.convert_to_pdf()


def process_files(files):
    miner_api = MinerUAPI()
    miner_api.setup("cuda")

    # Files are processed sequentially within this process; parallelism comes
    # from launching several instances of this script with different ranks.
    inputs = [(to_pdf(file), {"parse_method": "auto", "debug_able": False}) for file in files]
    return list(tqdm(map(miner_api.predict, inputs), total=len(inputs)))


if __name__ == '__main__':
    dir_path = "pdfs"
    rank = int(sys.argv[1])
    world_size = int(sys.argv[2])
    files = [os.path.join(dir_path, f) for f in sorted(os.listdir(dir_path)) if f.endswith('.pdf')][rank::world_size]
    results = process_files(files)
    print(results)
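One detail worth noting in the script above is the sharding idiom `files[rank::world_size]`: each process takes every world_size-th file starting at its own rank, so the shards are disjoint and together cover all files. A quick standalone check:

```python
files = [f"doc{i}.pdf" for i in range(10)]
world_size = 4

# Each rank takes every world_size-th file starting at its own offset.
shards = [files[rank::world_size] for rank in range(world_size)]

# Disjoint and complete: every file appears in exactly one shard.
assert sum(len(s) for s in shards) == len(files)
assert sorted(f for s in shards for f in s) == sorted(files)
print(shards[0])  # ['doc0.pdf', 'doc4.pdf', 'doc8.pdf']
```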

It works now.

huyiwen closed this as completed Dec 7, 2024
@xcvil

xcvil commented Dec 7, 2024

Bro, I just modified the settings in LitServer directly:

server = ls.LitServer(
    MinerUAPI(output_dir=args.output_dir),
    accelerator="cuda",
    devices=2,             # number of GPUs
    workers_per_device=8,  # number of processes per GPU
    timeout=False,
    track_requests=True,
)

Wouldn't that achieve the same thing?


3 participants