Skip to content

Commit ce768d4

Browse files
committed
refactor
1 parent e1f2da1 commit ce768d4

36 files changed

+907
-591
lines changed

app/common/app_settings.py

+169-93
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
from concurrent.futures import ProcessPoolExecutor
2-
from concurrent.futures.process import BrokenProcessPool
31
from multiprocessing import Process
4-
from threading import Event
5-
from threading import Thread
6-
from time import sleep
2+
from os import kill
3+
from signal import SIGINT
4+
from threading import Event, Thread
75
from urllib import parse
86

97
import requests
@@ -28,11 +26,12 @@
2826
from app.shared import Shared
2927
from app.utils.chat.managers.cache import CacheManager
3028
from app.utils.js_initializer import js_url_initializer
31-
from app.utils.logger import api_logger
29+
from app.utils.logger import ApiLogger
3230
from app.viewmodels.admin import ApiKeyAdminView, UserAdminView
3331

3432

3533
def check_health(url: str) -> bool:
34+
"""Check if the given url is available or not"""
3635
try:
3736
schema = parse.urlparse(url).scheme
3837
netloc = parse.urlparse(url).netloc
@@ -43,48 +42,183 @@ def check_health(url: str) -> bool:
4342
return False
4443

4544

46-
def start_llama_cpp_server():
45+
def start_llama_cpp_server(shared: Shared):
46+
"""Start Llama CPP server. if it is already running, terminate it first."""
4747
from app.start_llama_cpp_server import run
4848

49-
if Shared().process is not None and Shared().process.is_alive():
50-
api_logger.warning("Terminating existing Llama CPP server")
51-
Shared().process.terminate()
52-
Shared().process.join()
49+
if shared.process.is_alive():
50+
ApiLogger.cwarning("Terminating existing Llama CPP server")
51+
shared.process.terminate()
52+
shared.process.join()
5353

54-
api_logger.critical("Starting Llama CPP server")
55-
Shared().process = Process(target=run, args=(Shared().process_terminate_signal,))
56-
Shared().process.start()
54+
ApiLogger.ccritical("Starting Llama CPP server")
55+
shared.process = Process(target=run, daemon=True)
56+
shared.process.start()
5757

5858

59-
def shutdown_llama_cpp_server():
60-
api_logger.critical("Shutting down Llama CPP server")
61-
Shared().process_terminate_signal.set()
62-
Shared().process.join()
59+
def shutdown_llama_cpp_server(shared: Shared):
60+
"""Shutdown Llama CPP server."""
61+
ApiLogger.ccritical("Shutting down Llama CPP server")
62+
if shared.process.is_alive() and shared.process.pid:
63+
kill(shared.process.pid, SIGINT)
64+
shared.process.join()
6365

6466

65-
def monitor_llama_cpp_server(config: Config, terminate_signal: Event) -> None:
66-
while not terminate_signal.is_set():
67-
sleep(0.5)
68-
if config.llama_cpp_completion_url:
69-
if not check_health(config.llama_cpp_completion_url):
70-
if config.is_llama_cpp_booting or terminate_signal.is_set():
71-
continue
72-
api_logger.error("Llama CPP server is not available")
73-
config.is_llama_cpp_available = False
74-
config.is_llama_cpp_booting = True
75-
start_llama_cpp_server()
76-
else:
77-
config.is_llama_cpp_booting = False
78-
config.is_llama_cpp_available = True
79-
shutdown_llama_cpp_server()
67+
def monitor_llama_cpp_server(
68+
config: Config,
69+
shared: Shared,
70+
) -> None:
71+
"""Monitors the Llama CPP server and handles server availability.
72+
73+
Parameters:
74+
- `config: Config`: An object representing the server configuration.
75+
- `shared: Shared`: An object representing shared data."""
76+
thread_sigterm: Event = shared.thread_terminate_signal
77+
if not config.llama_cpp_completion_url:
78+
return
79+
while True:
80+
if not check_health(config.llama_cpp_completion_url):
81+
if thread_sigterm.is_set():
82+
break
83+
if config.is_llama_cpp_booting:
84+
continue
85+
ApiLogger.cerror("Llama CPP server is not available")
86+
config.is_llama_cpp_available = False
87+
config.is_llama_cpp_booting = True
88+
try:
89+
start_llama_cpp_server(shared)
90+
except ImportError:
91+
ApiLogger.cerror("ImportError: Llama CPP server is not available")
92+
return
93+
else:
94+
config.is_llama_cpp_booting = False
95+
config.is_llama_cpp_available = True
96+
shutdown_llama_cpp_server(shared)
97+
98+
99+
async def on_startup():
100+
"""
101+
Performs necessary operations during application startup.
102+
103+
This function is called when the application is starting up.
104+
It performs the following operations:
105+
- Logs a startup message using ApiLogger.
106+
- Retrieves the configuration object.
107+
- Checks if the MySQL database connection is initiated and logs the status.
108+
- Raises a ConnectionError if the Redis cache connection is not established.
109+
- Checks if the Redis cache connection is initiated and logs the status.
110+
- Attempts to import and set uvloop as the event loop policy, if available, and logs the result.
111+
- Starts Llama CPP server monitoring if the Llama CPP completion URL is provided.
112+
"""
113+
ApiLogger.ccritical("⚙️ Booting up...")
114+
config = Config.get()
115+
shared = Shared()
116+
if db.is_initiated:
117+
ApiLogger.ccritical("MySQL DB connected!")
118+
else:
119+
ApiLogger.ccritical("MySQL DB connection failed!")
120+
if cache.redis is None:
121+
raise ConnectionError("Redis is not connected yet!")
122+
if cache.is_initiated and await cache.redis.ping():
123+
await CacheManager.delete_user(f"testaccount@{config.host_main}")
124+
ApiLogger.ccritical("Redis CACHE connected!")
125+
else:
126+
ApiLogger.ccritical("Redis CACHE connection failed!")
127+
try:
128+
import asyncio
129+
130+
import uvloop # type: ignore
131+
132+
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
133+
ApiLogger.ccritical("uvloop installed!")
134+
except ImportError:
135+
ApiLogger.ccritical("uvloop not installed!")
136+
137+
if config.llama_cpp_completion_url:
138+
# Start Llama CPP server monitoring
139+
ApiLogger.ccritical("Llama CPP server monitoring started!")
140+
shared.thread = Thread(
141+
target=monitor_llama_cpp_server,
142+
args=(config, shared),
143+
daemon=True,
144+
)
145+
shared.thread.start()
146+
147+
148+
async def on_shutdown():
149+
"""
150+
Performs necessary operations during application shutdown.
151+
152+
This function is called when the application is shutting down.
153+
It performs the following operations:
154+
- Logs a shutdown message using ApiLogger.
155+
- Sets terminate signals for shared threads and processes.
156+
- Shuts down the process manager, if available.
157+
- Shuts down the process pool executor, if available.
158+
- Terminates and joins the process, if available.
159+
- Joins the thread, if available.
160+
- Closes the database and cache connections.
161+
- Logs a message indicating the closure of DB and CACHE connections.
162+
"""
163+
ApiLogger.ccritical("⚙️ Shutting down...")
164+
shared = Shared()
165+
# await CacheManager.delete_user(f"testaccount@{HOST_MAIN}")
166+
shared.thread_terminate_signal.set()
167+
shared.process_terminate_signal.set()
168+
169+
process_manager = shared._process_manager
170+
if process_manager is not None:
171+
process_manager.shutdown()
172+
173+
process_pool_executor = shared._process_pool_executor
174+
if process_pool_executor is not None:
175+
process_pool_executor.shutdown(wait=True)
176+
177+
process = shared._process
178+
if process is not None:
179+
process.terminate()
180+
process.join()
181+
182+
thread = shared._thread
183+
if thread is not None:
184+
thread.join()
185+
186+
await db.close()
187+
await cache.close()
188+
ApiLogger.ccritical("DB & CACHE connection closed!")
80189

81190

82191
def create_app(config: Config) -> FastAPI:
192+
"""
193+
Creates and configures the FastAPI application.
194+
195+
Args:
196+
config (Config): The configuration object.
197+
198+
Returns:
199+
FastAPI: The configured FastAPI application.
200+
201+
This function creates a new FastAPI application, sets the specified title, description, and version,
202+
and adds `on_startup` and `on_shutdown` event handlers.
203+
204+
It then starts the database and cache connections and initializes the JavaScript URL.
205+
206+
If the database engine is available, it adds admin views for managing users, API keys, and API white lists.
207+
208+
Next, it adds the necessary middlewares for access control, CORS, and trusted hosts.
209+
210+
It mounts the "/chat" endpoint for serving static files, and includes routers for index, websocket,
211+
authentication, services, users, and user services.
212+
213+
Finally, it sets the application's config and shared state and returns the configured application.
214+
"""
83215
# Initialize app & db & js
84216
new_app = FastAPI(
85217
title=config.app_title,
86218
description=config.app_description,
87219
version=config.app_version,
220+
on_startup=[on_startup],
221+
on_shutdown=[on_shutdown],
88222
)
89223
db.start(config=config)
90224
cache.start(config=config)
@@ -164,64 +298,6 @@ def create_app(config: Config) -> FastAPI:
164298
tags=["User Services"],
165299
dependencies=[Depends(USER_DEPENDENCY)],
166300
)
167-
168-
@new_app.on_event("startup")
169-
async def startup():
170-
if db.is_initiated:
171-
api_logger.critical("MySQL DB connected!")
172-
else:
173-
api_logger.critical("MySQL DB connection failed!")
174-
if cache.redis is None:
175-
raise ConnectionError("Redis is not connected yet!")
176-
if cache.is_initiated and await cache.redis.ping():
177-
await CacheManager.delete_user(f"testaccount@{config.host_main}")
178-
api_logger.critical("Redis CACHE connected!")
179-
else:
180-
api_logger.critical("Redis CACHE connection failed!")
181-
try:
182-
import asyncio
183-
184-
import uvloop # type: ignore
185-
186-
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
187-
api_logger.critical("uvloop installed!")
188-
except ImportError:
189-
api_logger.critical("uvloop not installed!")
190-
191-
if config.llama_cpp_completion_url:
192-
# Start Llama CPP server monitoring
193-
api_logger.critical("Llama CPP server monitoring started!")
194-
Shared().thread = Thread(
195-
target=monitor_llama_cpp_server,
196-
args=(config, Shared().thread_terminate_signal),
197-
)
198-
Shared().thread.start()
199-
200-
@new_app.on_event("shutdown")
201-
async def shutdown():
202-
# await CacheManager.delete_user(f"testaccount@{HOST_MAIN}")
203-
Shared().thread_terminate_signal.set()
204-
Shared().process_terminate_signal.set()
205-
206-
process_manager = Shared()._process_manager
207-
if process_manager is not None:
208-
process_manager.shutdown()
209-
210-
process_pool_executor = Shared()._process_pool_executor
211-
if process_pool_executor is not None:
212-
process_pool_executor.shutdown(wait=False)
213-
214-
process = Shared()._process
215-
if process is not None:
216-
process.terminate()
217-
process.join()
218-
219-
thread = Shared()._thread
220-
if thread is not None:
221-
thread.join()
222-
223-
await db.close()
224-
await cache.close()
225-
api_logger.critical("DB & CACHE connection closed!")
226-
301+
new_app.state.config = config
302+
new_app.state.shared = Shared()
227303
return new_app

app/common/constants.py

+14
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ class DescriptionTemplates:
6464
),
6565
input_variables=[],
6666
)
67+
USER_AI__ENGLISH: PromptTemplate = PromptTemplate(
68+
template=(
69+
"You are a good English teacher. Any sentence that {user} says that is surrounded"
70+
' by double quotation marks ("") is asking how you interpret that sentence. Pleas'
71+
"e analyze and explain that sentence in as much detail as possible. For the rest "
72+
"of the sentences, please respond in a way that will help {user} learn English."
73+
),
74+
input_variables=["user"],
75+
)
6776

6877

6978
class ChatTurnTemplates:
@@ -82,6 +91,11 @@ class ChatTurnTemplates:
8291
input_variables=["role", "content"],
8392
template_format="f-string",
8493
)
94+
ROLE_CONTENT_4: PromptTemplate = PromptTemplate(
95+
template="###{role}: {content}\n",
96+
input_variables=["role", "content"],
97+
template_format="f-string",
98+
)
8599

86100

87101
class SummarizationTemplates:

app/common/lotties.py

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from typing import Self
2+
3+
from app.common.mixins import EnumMixin
4+
5+
6+
class Lotties(EnumMixin):
7+
CLICK = "lottie-click"
8+
READ = "lottie-read"
9+
SCROLL_DOWN = "lottie-scroll-down"
10+
GO_BACK = "lottie-go-back"
11+
SEARCH_WEB = "lottie-search-web"
12+
SEARCH_DOC = "lottie-search-doc"
13+
OK = "lottie-ok"
14+
FAIL = "lottie-fail"
15+
TRANSLATE = "lottie-translate"
16+
17+
def format(self, contents: str, end: bool = True) -> str:
18+
return f"\n```{self.get_value(self)}\n{contents}" + ("\n```\n" if end else "")
19+
20+
21+
if __name__ == "__main__":
22+
print(Lotties.CLICK.format("hello"))

app/middlewares/token_validator.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from app.models.base_models import UserToken
2020
from app.utils.auth.token import token_decode
2121
from app.utils.date_utils import UTC
22-
from app.utils.logger import api_logger
22+
from app.utils.logger import ApiLogger
2323
from app.utils.params_utils import hash_params
2424

2525

@@ -138,7 +138,7 @@ async def access_control(request: Request, call_next: RequestResponseEndpoint):
138138
"code": error.code if not isinstance(error, HTTPException) else None,
139139
},
140140
)
141-
api_logger.log_api(
141+
ApiLogger.clog(
142142
request=request,
143143
response=response,
144144
error=error,
@@ -150,7 +150,7 @@ async def access_control(request: Request, call_next: RequestResponseEndpoint):
150150
else:
151151
# Log error or service info
152152
if url.startswith("/api/services"):
153-
api_logger.log_api(
153+
ApiLogger.clog(
154154
request=request,
155155
response=response,
156156
cookies=request.cookies,

0 commit comments

Comments
 (0)