|
|
欢迎注册论坛,享受更多奶昔会员权益!
您需要 登录 才可以下载或查看,没有账号?注册
×
今天又发现一家免费的第三方 DeepSeek R1 API,注册就有免费额度 50¥(现在缩减到 20¥了,而且多了个邀请码机制,不过被邀请的没有额外额度
https://sophnet.com/#/model/overview
官网标参数如下:
实测速度非常快,比华为云和腾讯云 LKEAP 都快,推理信息放在 reasoning_content 字段。
懒得注册的自己玩,附上脚本
- import requests
- import time
- from concurrent.futures import ThreadPoolExecutor
- import logging
- from datetime import datetime
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(threadName)s - %(message)s'
- )
- # 定义常量
- TOKEN = ''
- PROJECT_ID = '887100'
- PROJECT_TYPE = '1'
- # 获取手机号码
- def get_mobile():
- print('请求获取手机号码...')
- response = requests.post('http://h5.do889.com:81/api/get_mobile', data={
- 'token': TOKEN + ';project_id=' + PROJECT_ID + ';project_type=' + PROJECT_TYPE + ';operator=0;loop=1',
- })
- print(f'获取手机号码请求的响应: {response.status_code}, 内容: {response.text}')
- mobile_data = response.json()
- return mobile_data.get('mobile')
- # 发送验证码请求
- def send_code(phone_number):
- print('请求发送验证码...')
- code_response = requests.post('https://sophnet.com/api/sys/code', json={
- 'phoneNumber': phone_number
- })
- print(f'发送验证码请求的响应: {code_response.status_code}, 内容: {code_response.text}')
- return code_response
- # 轮询获取短信
- def poll_for_sms(phone_number):
- print('开始轮询获取短信...')
- start_time = time.time() # 记录开始时间
- while True:
- message_response = requests.post('http://h5.do889.com:81/api/get_message', data={
- 'token': TOKEN + ';phone_num=' + phone_number + ';project_type=' + PROJECT_TYPE + ';project_id=' + PROJECT_ID
- })
- print(f'获取短信请求的响应: {message_response.status_code}, 内容: {message_response.text}')
- message_data = message_response.json()
- if message_data['message'] == '短信还未到达,请继续获取':
- print('短信还未到达,继续轮询...')
- if time.time() - start_time > 30: # 检查是否超过 30 秒
- print('超过 30 秒未收到短信,跳过此次...')
- return None
- time.sleep(5) # 等待 5 秒后重试
- else:
- code = message_data['data'][0]['modle'].split('验证码:')[1].split(',')[0]
- print(f'获取到的验证码: {code}')
- return code
- # 登录并获取 access token
- def login(phone_number, code):
- print('请求登录并获取 access token...')
- login_response = requests.post('https://sophnet.com/api/sys/login/code', json={
- 'phoneNumber': phone_number,
- 'code': code,
- 'password': ''
- })
- print(f'登录请求的响应: {login_response.status_code}, 内容: {login_response.text}')
- return login_response.json()['result']['token']
- # 获取组织ID
- def get_org_id(access_token):
- print('获取组织ID...')
- org_response = requests.get('https://sophnet.com/api/orgs/joined-org', headers={
- 'Authorization': f'Bearer {access_token}'
- })
- print(f'获取组织ID响应: {org_response.status_code}, 内容: {org_response.text}')
- return org_response.json()['result']['list'][0]['id']
- # 获取项目ID
- def get_project_id(org_id, access_token):
- print('获取项目ID...')
- project_response = requests.get(f'https://sophnet.com/api/orgs/{org_id}/projects', params={
- 'pageNum': 1,
- 'pageSize': 99999
- }, headers={
- 'Authorization': f'Bearer {access_token}'
- })
- print(f'获取项目ID响应: {project_response.status_code}, 内容: {project_response.text}')
- return project_response.json()['result']['list'][0]['id']
- # 请求服务
- def request_service(project_id, access_token, display_model_id):
- print('请求服务...')
- service_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/service', json={
- "displayModelId": display_model_id,
- "llmType": "ChatCompletion",
- "quota": 200
- }, headers={
- 'Authorization': f'Bearer {access_token}'
- })
- print(f'请求服务响应: {service_response.status_code}, 内容: {service_response.text}')
- return service_response
- # 请求 API Key
- def request_api_key(project_id, access_token):
- print('请求服务以获取 API Key...')
- api_key_response = requests.post(f'https://sophnet.com/api/projects/{project_id}/apikey', headers={
- 'Authorization': f'Bearer {access_token}'
- }, json={
- 'name': 'API Key'
- })
- print(f'获取 API Key 请求的响应: {api_key_response.status_code}, 内容: {api_key_response.text}')
- return api_key_response.json()['result']['apikey']
- def process_single_flow():
- """执行单个完整的流程"""
- try:
- phone_number = get_mobile()
- if not phone_number:
- logging.info("获取手机号失败")
- return
- code_response = send_code(phone_number)
- if not code_response:
- logging.info(f"发送验证码失败: {phone_number}")
- return
- code = poll_for_sms(phone_number)
- if not code:
- logging.info(f"获取验证码失败: {phone_number}")
- return
- access_token = login(phone_number, code)
- if not access_token:
- logging.info(f"登录失败: {phone_number}")
- return
- org_id = get_org_id(access_token)
- if not org_id:
- logging.info(f"获取组织ID失败: {phone_number}")
- return
- project_id = get_project_id(org_id, access_token)
- if not project_id:
- logging.info(f"获取项目ID失败: {phone_number}")
- return
- # 并发执行服务请求和API密钥获取
- with ThreadPoolExecutor() as executor:
- # 先执行服务请求
- service_futures = [
- executor.submit(request_service, project_id, access_token, 11),
- executor.submit(request_service, project_id, access_token, 6)
- ]
- # 等待服务请求完成
- for future in service_futures:
- service_response = future.result()
- if not service_response:
- logging.error("服务请求失败")
- return False
-
- # 获取API密钥
- api_key_future = executor.submit(request_api_key, project_id, access_token)
- api_key = api_key_future.result()
- if api_key:
- with open('result.txt', 'a') as f:
- f.write(api_key + '\n') # 保存 API 密钥
- logging.info(f"API Key 已保存: {phone_number}")
- else:
- logging.error("获取 API Key 失败")
- return False
- logging.info(f"流程完成: {phone_number}")
- return True
- except Exception as e:
- logging.error(f"流程出错: {str(e)}")
- return False
- def main(concurrent_flows=5):
- """
- 主函数,控制并发流程数量,当一个流程完成后立即启动新的流程
- :param concurrent_flows: 最大并发流程数量
- """
- logging.info(f"开始运行,最大并发数: {concurrent_flows}")
-
- # 创建一个长期运行的线程池
- with ThreadPoolExecutor(max_workers=concurrent_flows) as executor:
- running_futures = set()
-
- while True:
- # 检查并移除已完成的任务
- done_futures = {f for f in running_futures if f.done()}
- for future in done_futures:
- try:
- result = future.result()
- if result:
- logging.info("一个流程成功完成")
- else:
- logging.info("一个流程失败")
- except Exception as e:
- logging.error(f"流程执行出错: {str(e)}")
- running_futures.remove(future)
-
- # 当有空闲线程时,立即启动新的流程
- while len(running_futures) < concurrent_flows:
- new_future = executor.submit(process_single_flow)
- running_futures.add(new_future)
- logging.info("启动新的流程")
-
- # 短暂休眠以避免CPU过度使用
- time.sleep(0.1)
- if __name__ == "__main__":
- # 设置并发数量为5,您可以根据需要调整这个数字
- main(concurrent_flows=20)
复制代码 |
|