admin 管理员组

文章数量: 887006

🔥 本文讲解的使用chatgpt的openai调用,包含单线程、多线程、多进程的批量调用处理数据。至于哪种方法快,说不准,得自己测测,我觉得多线程就ok了,因为只是调用😄

文章目录

  • 1、单线程批量处理
  • 2、多线程批量处理
  • 3、多进程批量处理

1、单线程批量处理

import openai
openai.api_key = 'sk-xxx'
import time

# 定义预测函数
def predict(prompt):

    # 请求返回结果
    # model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
    # messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
    # role:system:设置gpt人设。
    # role:assistant:表示gpt。
    # role:user:表示用户。
    retry_count = 100
    retry_interval = 1
    for _ in range(retry_count):
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "system", "content": "算法工程师"},
                          {"role": "user", "content": prompt}],
                temperature=0
            )

            # 抽出gpt答复的内容
            msg = response.choices[0].message["content"].strip()
            return msg
        except openai.error.RateLimitError as e:
            print("超出openai api 调用频率:", e)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2  # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)
        except Exception as e:
            print("任务执行出错:", e)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2  # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)
def main():
    start_time = time.time()


    prompt = """请用少于5个字回答问题:{}"""
    input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
    all_res = []
    for query in input_data[:1]:
        res = predict(prompt.format(query))
        all_res.append(res)
        time.sleep(1)



    end_time = time.time()
    total_run_time = round(end_time-start_time, 3)
    print('Total_run_time: {} s'.format(total_run_time))

    print('chatgpt answer: ', all_res)


if __name__ == "__main__":
    main()

2、多线程批量处理

import openai
openai.api_key = 'sk-xxx'

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import collections


# 定义预测函数
def predict(params):
    prompt, query = params
    prompt = prompt.format(query)

    # 请求返回结果
    # model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
    # messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
    # role:system:设置gpt人设。
    # role:assistant:表示gpt。
    # role:user:表示用户。
    retry_count = 100
    retry_interval = 1
    for _ in range(retry_count):
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "system", "content": "算法工程师"},
                          {"role": "user", "content": prompt}],
                temperature=0
            )
            # 抽出gpt答复的内容
            msg = response.choices[0].message["content"].strip()
            return query, msg

        except openai.error.RateLimitError as e:
            print("超出openai api 调用频率:", e)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)


        except TimeoutError:
            print("任务执行超时:", query)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2  # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)

        except Exception as e:
            print("任务执行出错:", e)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2  # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)

    return query,'api请求失败'


def main():
    start_time = time.time()
    # 多线程并行预测
    # 您可能需要根据自己的需求调整间隔时间。另外,您可以根据需要调整线程池的大小,以获得更好的性能。
    prompt = """请用少于5个字回答问题:```{}```"""
    input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
    output_data = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        ## 同步调用.submit之后直接.result(一个进程执行完才能下一个进程)
        # output_data = [executor.submit(predict, prompt.format(query)).result() for query in input_data]

        # # 异步调用(多进程并发执行)
        # futures = [executor.submit(predict, prompt.format(query)) for query in input_data]
        # query2res = collections.defaultdict(int)
        # # 同步等待结果(返回顺序和原数据顺序一致)
        # for job in futures:
        #     query, res = job.result(timeout=None)  # 默认timeout=None,不限时间等待结果
        #     query2res[query] = res
        #
        #     time.sleep(1)  # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒


        # 异步调用(多进程并发执行)
        futures = [executor.submit(predict, (prompt, query)) for query in input_data]
        query2res = collections.defaultdict(int) # 因为异步等待结果,返回的顺序是不定的,所以记录一下进程和输入数据的对应
        # 异步等待结果(返回顺序和原数据顺序可能不一致) ,直接predict函数里返回结果?
        for job in as_completed(futures):
            query,res = job.result(timeout=None)  # 默认timeout=None,不限时间等待结果
            query2res[query] = res


            time.sleep(1)  # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒


    end_time = time.time()
    total_run_time = round(end_time-start_time, 3)
    print('Total_run_time: {} s'.format(total_run_time))
    print(query2res)

    import pandas as pd
    df = pd.DataFrame({'query': list(query2res.keys()), 'infer_result': list(query2res.values())})
    df.to_excel('./chatgpt_infer_result.xlsx', index=False)

if __name__ == "__main__":
    main()

3、多进程批量处理

import openai
openai.api_key = 'sk-xxx'

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import collections


# 定义预测函数
def predict(params):
    prompt, query = params
    prompt = prompt.format(query)

    # 请求返回结果
    # model:调用的模型名称,是一个字符串,用最新模型直接设置成gpt-3.5-turbo
    # messages:请求的文本内容,是一个列表,列表里每个元素类型是字典
    # role:system:设置gpt人设。
    # role:assistant:表示gpt。
    # role:user:表示用户。
    retry_count = 100
    retry_interval = 1
    for _ in range(retry_count):
        try:
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "system", "content": "算法工程师"},
                          {"role": "user", "content": prompt}],
                temperature=0
            )
            # 抽出gpt答复的内容
            msg = response.choices[0].message["content"].strip()
            return query, msg

        except openai.error.RateLimitError as e:
            print("超出openai api 调用频率:", e)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2 # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)


        except TimeoutError:
            print("任务执行超时:", query)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2  # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)

        except Exception as e:
            print("任务执行出错:", e)
            print('重新请求....')
            retry_count += 1
            retry_interval *= 2  # 指数退避策略,每次重试后加倍重试间隔时间
            time.sleep(retry_interval)

    return query,'api请求失败'


def main():
    start_time = time.time()
    # 多进程并行预测
    # 您可能需要根据自己的需求调整间隔时间。另外,您可以根据需要调整进程池的大小,以获得更好的性能。
    prompt = """请用少于5个字回答问题:{}"""
    input_data = ['1+1等于几啊?', '2+2等于几啊?', '3+3等于几啊?', '4+4等于几啊?']
    # output_data = []

    # output_data = collections.defaultdict(int)
    with ProcessPoolExecutor(max_workers=2) as executor:
        ## 同步调用.submit之后直接.result(一个进程执行完才能下一个进程)
        # output_data = [executor.submit(predict, prompt.format(query)).result() for query in input_data]

        # # 异步调用(多进程并发执行)
        # futures = [executor.submit(predict, prompt.format(query)) for query in input_data]
        # query2res = collections.defaultdict(int)
        # # 同步等待结果(返回顺序和原数据顺序一致)
        # for job in futures:
        #     query, res = job.result(timeout=None)  # 默认timeout=None,不限时间等待结果
        #     query2res[query] = res
        #
        #     time.sleep(1)  # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒


        # 异步调用(多进程并发执行)
        futures = [executor.submit(predict, (prompt, query)) for query in input_data]
        query2res = collections.defaultdict(int) # 因为异步等待结果,返回的顺序是不定的,所以记录一下进程和输入数据的对应
        # 异步等待结果(返回顺序和原数据顺序可能不一致) ,直接predict函数里返回结果?
        for job in as_completed(futures):
            query,res = job.result(timeout=None)  # 默认timeout=None,不限时间等待结果
            query2res[query] = res


            time.sleep(1)  # 为了避免超过OpenAI API的速率限制,每次预测之间间隔1秒



    end_time = time.time()
    total_run_time = round(end_time-start_time, 3)
    print('Total_run_time: {} s'.format(total_run_time))
    print(query2res)

    import pandas as pd
    df = pd.DataFrame({'query': list(query2res.keys()), 'infer_result': list(query2res.values())})
    df.to_excel('./chatgpt_infer_result.xlsx', index=False)





if __name__ == "__main__":
    main()

本文标签: 多线程 批量 进程 ChatGpt OpenAI