This script handles chat interactions using the Chainlit library and a chat engine.
Actions are observed by Langfuse.
For it to work, the vector storage must be filled with the embeddings of the documents.
To run the script, execute the following command from the root directory of the project:
python src/chat.py
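Populating the vector storage is outside the scope of this script. A minimal sketch of what that step could look like with LlamaIndex defaults (the "data" directory and the in-memory VectorStoreIndex are assumptions for illustration, not this project's actual ingestion pipeline):

```python
# Hypothetical ingestion sketch; the project's real pipeline may differ.
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

# Load raw documents from an assumed local "data" directory.
documents = SimpleDirectoryReader("data").load_data()

# Embed the documents and build a vector index (in-memory by default).
index = VectorStoreIndex.from_documents(documents)
```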
app_shutdown()
async
Clean up resources on application shutdown.
Stops the scheduler if it is running.
Source code in src/augment.py
```python
@cl.on_app_shutdown
async def app_shutdown() -> None:
    """
    Clean up resources on application shutdown.
    Stops the scheduler if it is running.
    """
    try:
        initializer.get_scheduler().stop()
        logger.info("Scheduler stopped successfully")
    except Exception as e:
        logger.warning(f"Failed to stop scheduler: {e}")
```
app_startup()
async
Initialize the application on startup.
Sets up the augmentation initializer and configuration, and starts the scheduler.
Source code in src/augment.py
```python
@cl.on_app_startup
async def app_startup() -> None:
    """
    Initialize the application on startup.
    Sets up the augmentation initializer and configuration, and starts the scheduler.
    """
    global initializer, configuration
    initializer = AugmentationInitializer()
    configuration = initializer.get_configuration()
    initializer.get_scheduler().start()
```
get_data_layer()
Initialize Chainlit's data layer with the custom service.
Returns:

- ChainlitService – The custom service for data layer.
Source code in src/augment.py
```python
@cl.data_layer
def get_data_layer() -> ChainlitService:
    """
    Initialize Chainlit's data layer with the custom service.
    Returns:
        ChainlitService: The custom service for data layer.
    """
    return ChainlitServiceFactory.create(configuration.augmentation)
```
main(user_message)
async
Process user messages and generate responses.
Parameters:

- user_message (Message) – Message received from user
Source code in src/augment.py
```python
@cl.on_message
async def main(user_message: cl.Message) -> None:
    """
    Process user messages and generate responses.
    Args:
        user_message: Message received from user
    """
    try:
        chat_engine = cl.user_session.get("chat_engine")
        assistant_message = cl.Message(content="", author="Assistant")
        response = await cl.make_async(chat_engine.stream_chat)(
            message=user_message.content,
            chainlit_message_id=assistant_message.parent_id,
        )
        for token in response.response_gen:
            await assistant_message.stream_token(token)
        utils = ChainlitUtilsFactory.create(configuration.augmentation.chainlit)
        utils.add_references(assistant_message, response)
        await assistant_message.send()
    except Exception as e:
        # It is imprecise to catch all exceptions, but llamaindex doesn't provide unified RateLimitError
        logger.error(f"Error in main: {e}")
        await cl.ErrorMessage(
            content="You have reached the request rate limit. Please try again later.",
        ).send()
```
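The handler expects the session's chat engine to expose a synchronous `stream_chat` method whose result yields tokens through `response_gen`, which is why the call is wrapped in `cl.make_async` to avoid blocking the event loop. A minimal stand-in making that contract explicit (`FakeChatEngine` and `FakeStreamingResponse` are illustrative names, not part of the project):

```python
# Illustrative only: a stand-in that satisfies the interface main() relies on.
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class FakeStreamingResponse:
    tokens: list  # tokens to stream back to the user

    @property
    def response_gen(self) -> Iterator[str]:
        # Mirrors streaming responses that expose a token generator.
        return iter(self.tokens)


class FakeChatEngine:
    def set_session_id(self, session_id: str) -> None:
        self.session_id = session_id

    def stream_chat(
        self, message: str, chainlit_message_id: Optional[str] = None
    ) -> FakeStreamingResponse:
        # A real engine would query the vector store and an LLM here.
        return FakeStreamingResponse(tokens=["Echo: ", message])
```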
start()
async
Initialize chat session with chat engine.
Sets up session-specific chat engine and displays welcome message.
Source code in src/augment.py
```python
@cl.on_chat_start
async def start() -> None:
    """
    Initialize chat session with chat engine.
    Sets up session-specific chat engine and displays welcome message.
    """
    chat_engine = ChatEngineRegistry.get(
        configuration.augmentation.chat_engine.name
    ).create(configuration)
    chat_engine.set_session_id(cl.user_session.get("id"))
    cl.user_session.set("chat_engine", chat_engine)
    utils = ChainlitUtilsFactory.create(configuration.augmentation.chainlit)
    await utils.get_disclaimer_message().send()
    await utils.get_welcome_message().send()
```
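`start()` resolves the chat engine by name through `ChatEngineRegistry`, i.e. a name-to-factory lookup driven by the configuration. A minimal sketch of that pattern (illustrative only; the project's actual registry, including how engines are registered, may differ):

```python
# Illustrative registry sketch; not the project's actual implementation.
class ChatEngineRegistry:
    _factories: dict = {}

    @classmethod
    def register(cls, name: str):
        # Decorator mapping a configuration name to a chat-engine factory class.
        def decorator(factory):
            cls._factories[name] = factory
            return factory
        return decorator

    @classmethod
    def get(cls, name: str):
        # Look up the factory registered under the configured engine name.
        return cls._factories[name]
```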