Query_engine

This module contains functionality related to the query_engine module for common.

Query_engine

RagQueryEngine

Bases: CustomQueryEngine

Custom query engine implementing Retrieval-Augmented Generation (RAG).

Coordinates retrieval, post-processing, and response generation for RAG workflow. Integrates with Langfuse for tracing and Chainlit for message tracking.

Attributes:
  • retriever (BaseRetriever) –

    Component for retrieving relevant documents

  • postprocessors (List[BaseNodePostprocessor]) –

    Chain of document post-processors

  • response_synthesizer (BaseSynthesizer) –

    Component for generating final responses

  • callback_manager (CallbackManager) –

    Manager for tracing and monitoring callbacks

  • chainlit_tag_format (str) –

    Format string for Chainlit message IDs

Source code in src/common/query_engine.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class RagQueryEngine(CustomQueryEngine):
    """Custom query engine implementing Retrieval-Augmented Generation (RAG).

    Coordinates retrieval, post-processing, and response generation for RAG workflow.
    Integrates with Langfuse for tracing and Chainlit for message tracking.

    Langfuse-specific behaviour is applied only to handlers in
    ``callback_manager.handlers`` that are instances of
    ``LlamaIndexCallbackHandler``; all other handlers are left untouched.

    Attributes:
        retriever: Component for retrieving relevant documents
        postprocessors: Chain of document post-processors, applied in list order
        response_synthesizer: Component for generating final responses
        callback_manager: Manager for tracing and monitoring callbacks
        chainlit_tag_format: Format string for Chainlit message IDs; it is
            formatted with a ``message_id`` keyword argument
    """

    retriever: BaseRetriever = Field(
        description="The retriever used to retrieve relevant nodes based on a given query."
    )
    postprocessors: List[BaseNodePostprocessor] = Field(
        description="The postprocessor used to process the retrieved nodes."
    )
    response_synthesizer: BaseSynthesizer = Field(
        description="The response synthesizer used to generate a response based on the retrieved nodes and the original query."
    )
    callback_manager: CallbackManager = Field(
        description="The callback manager used to handle callbacks."
    )
    chainlit_tag_format: str = Field(
        description="Format of the tag used to retrieve the trace by chainlit message id in Langfuse."
    )

    def query(
        self,
        str_or_query_bundle: QueryType,
        chainlit_message_id: str = None,
        source_process: SourceProcess = SourceProcess.CHAT_COMPLETION,
    ) -> RESPONSE_TYPE:
        """Process a query using RAG pipeline.

        Trace tags are configured first, then the query is delegated to the
        parent class implementation.

        Args:
            str_or_query_bundle: Raw query string or structured query bundle
            chainlit_message_id: Optional ID for Chainlit message tracking.
                When ``None`` no Chainlit tag is attached to the trace.
            source_process: Source context of the query

        Returns:
            RESPONSE_TYPE: Generated response from RAG pipeline
        """
        self._set_chainlit_message_id(
            message_id=chainlit_message_id, source_process=source_process
        )
        return super().query(str_or_query_bundle)

    async def aquery(
        self,
        str_or_query_bundle: QueryType,
        chainlit_message_id: str = None,
        source_process: SourceProcess = SourceProcess.CHAT_COMPLETION,
    ) -> RESPONSE_TYPE:
        """Asynchronously process a query using RAG pipeline.

        Trace tags are configured first, then the query is delegated to the
        parent class implementation.

        Args:
            str_or_query_bundle: Raw query string or structured query bundle
            chainlit_message_id: Optional ID for Chainlit message tracking.
                When ``None`` no Chainlit tag is attached to the trace.
            source_process: Source context of the query

        Returns:
            RESPONSE_TYPE: Generated response from RAG pipeline
        """
        self._set_chainlit_message_id(
            message_id=chainlit_message_id, source_process=source_process
        )
        return await super().aquery(str_or_query_bundle)

    def custom_query(self, query_str: str):
        """Execute custom RAG query processing pipeline.

        Implements retrieval, post-processing, and response synthesis stages.

        Args:
            query_str: Raw query string to process

        Returns:
            Response object containing generated answer
        """
        nodes = self.retriever.retrieve(query_str)
        # Post-processors are chained: each one receives the previous output.
        for postprocessor in self.postprocessors:
            nodes = postprocessor.postprocess_nodes(
                nodes, QueryBundle(query_str)
            )
        response_obj = self.response_synthesizer.synthesize(query_str, nodes)
        return response_obj

    def get_current_langfuse_trace(self) -> StatefulTraceClient:
        """Retrieve current Langfuse trace from callback handler.

        Only the first ``LlamaIndexCallbackHandler`` found is consulted.

        Returns:
            StatefulTraceClient: The ``trace`` attribute of that handler, or
            None if no such handler is registered
        """
        for handler in self.callback_manager.handlers:
            if isinstance(handler, LlamaIndexCallbackHandler):
                return handler.trace
        return None

    def set_session_id(self, session_id: str) -> None:
        """Set session ID for Langfuse tracing.

        The ID is assigned to every registered ``LlamaIndexCallbackHandler``.

        Args:
            session_id: Unique identifier for current session
        """
        for handler in self.callback_manager.handlers:
            if isinstance(handler, LlamaIndexCallbackHandler):
                handler.session_id = session_id

    def _set_chainlit_message_id(
        self, message_id: str, source_process: SourceProcess
    ) -> None:
        """Configure Chainlit message tracking in Langfuse trace.

        Passes a list of tags to ``set_trace_params`` on every registered
        ``LlamaIndexCallbackHandler``: the Chainlit tag built from
        ``chainlit_tag_format`` (only when ``message_id`` is given) followed
        by the lower-cased name of ``source_process``.

        Args:
            message_id: Chainlit message identifier, or None when the query
                is not associated with a Chainlit message
            source_process: Source context of the query
        """
        for handler in self.callback_manager.handlers:
            if not isinstance(handler, LlamaIndexCallbackHandler):
                continue
            # A fresh list per handler so handlers never share a mutable object.
            tags = []
            if message_id is not None:
                # Without this guard a missing id would yield a "...None" tag.
                tags.append(
                    self.chainlit_tag_format.format(message_id=message_id)
                )
            tags.append(source_process.name.lower())
            handler.set_trace_params(tags=tags)

_set_chainlit_message_id(message_id, source_process)

Configure Chainlit message tracking in Langfuse trace.

Links Langfuse trace to Chainlit message and tags processing context.

Parameters:
  • message_id (str) –

    Chainlit message identifier

  • source_process (SourceProcess) –

    Source context of the query

Source code in src/common/query_engine.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def _set_chainlit_message_id(
    self, message_id: str, source_process: SourceProcess
) -> None:
    """Configure Chainlit message tracking in Langfuse trace.

    Passes a list of tags to ``set_trace_params`` on every registered
    ``LlamaIndexCallbackHandler``: the Chainlit tag built from
    ``chainlit_tag_format`` (only when ``message_id`` is given) followed by
    the lower-cased name of ``source_process``.

    Args:
        message_id: Chainlit message identifier, or None when the query is
            not associated with a Chainlit message
        source_process: Source context of the query
    """
    for handler in self.callback_manager.handlers:
        if not isinstance(handler, LlamaIndexCallbackHandler):
            continue
        # A fresh list per handler so handlers never share a mutable object.
        tags = []
        if message_id is not None:
            # Without this guard a missing id would yield a "...None" tag.
            tags.append(self.chainlit_tag_format.format(message_id=message_id))
        tags.append(source_process.name.lower())
        handler.set_trace_params(tags=tags)

aquery(str_or_query_bundle, chainlit_message_id=None, source_process=SourceProcess.CHAT_COMPLETION) async

Asynchronously process a query using RAG pipeline.

Parameters:
  • str_or_query_bundle (QueryType) –

    Raw query string or structured query bundle

  • chainlit_message_id (str, default: None ) –

    Optional ID for Chainlit message tracking

  • source_process (SourceProcess, default: CHAT_COMPLETION ) –

    Source context of the query

Returns:
  • RESPONSE_TYPE( RESPONSE_TYPE ) –

    Generated response from RAG pipeline

Source code in src/common/query_engine.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async def aquery(
    self,
    str_or_query_bundle: QueryType,
    chainlit_message_id: str = None,
    source_process: SourceProcess = SourceProcess.CHAT_COMPLETION,
) -> RESPONSE_TYPE:
    """Run the RAG pipeline for a query without blocking.

    The Chainlit message id and source process are forwarded to
    ``_set_chainlit_message_id`` before the query itself is awaited on the
    parent class implementation.

    Args:
        str_or_query_bundle: Query text, or an already built query bundle
        chainlit_message_id: Chainlit message ID to associate with the query
            (defaults to None)
        source_process: Context the query originates from

    Returns:
        RESPONSE_TYPE: Response produced by the parent ``aquery``
    """
    # Tagging must happen before the parent implementation runs the query.
    self._set_chainlit_message_id(
        source_process=source_process, message_id=chainlit_message_id
    )
    response = await super().aquery(str_or_query_bundle)
    return response

custom_query(query_str)

Execute custom RAG query processing pipeline.

Implements retrieval, post-processing, and response synthesis stages.

Parameters:
  • query_str (str) –

    Raw query string to process

Returns:
  • Response object containing generated answer

Source code in src/common/query_engine.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def custom_query(self, query_str: str):
    """Run retrieval, post-processing and synthesis for one query string.

    Nodes are fetched from ``self.retriever``, passed through each entry of
    ``self.postprocessors`` in order, and finally handed to
    ``self.response_synthesizer`` together with the query string.

    Args:
        query_str: Query text to process

    Returns:
        Whatever ``self.response_synthesizer.synthesize`` returns
    """
    retrieved = self.retriever.retrieve(query_str)
    for step in self.postprocessors:
        # Every step gets its own bundle and the output of the previous step.
        bundle = QueryBundle(query_str)
        retrieved = step.postprocess_nodes(retrieved, bundle)
    return self.response_synthesizer.synthesize(query_str, retrieved)

get_current_langfuse_trace()

Retrieve current Langfuse trace from callback handler.

Returns:
  • StatefulTraceClient( StatefulTraceClient ) –

    Active Langfuse trace or None if not found

Source code in src/common/query_engine.py
119
120
121
122
123
124
125
126
127
128
def get_current_langfuse_trace(self) -> StatefulTraceClient:
    """Return the trace held by the first Langfuse callback handler.

    Handlers registered on ``self.callback_manager`` are scanned in order and
    the first ``LlamaIndexCallbackHandler`` instance wins.

    Returns:
        StatefulTraceClient: That handler's ``trace`` attribute, or None when
        no such handler is registered
    """
    langfuse_handlers = (
        candidate
        for candidate in self.callback_manager.handlers
        if isinstance(candidate, LlamaIndexCallbackHandler)
    )
    first_match = next(langfuse_handlers, None)
    if first_match is None:
        return None
    return first_match.trace

query(str_or_query_bundle, chainlit_message_id=None, source_process=SourceProcess.CHAT_COMPLETION)

Process a query using RAG pipeline.

Parameters:
  • str_or_query_bundle (QueryType) –

    Raw query string or structured query bundle

  • chainlit_message_id (str, default: None ) –

    Optional ID for Chainlit message tracking

  • source_process (SourceProcess, default: CHAT_COMPLETION ) –

    Source context of the query

Returns:
  • RESPONSE_TYPE( RESPONSE_TYPE ) –

    Generated response from RAG pipeline

Source code in src/common/query_engine.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def query(
    self,
    str_or_query_bundle: QueryType,
    chainlit_message_id: str = None,
    source_process: SourceProcess = SourceProcess.CHAT_COMPLETION,
) -> RESPONSE_TYPE:
    """Run the RAG pipeline for a query.

    The Chainlit message id and source process are forwarded to
    ``_set_chainlit_message_id`` before the query itself is delegated to the
    parent class implementation.

    Args:
        str_or_query_bundle: Query text, or an already built query bundle
        chainlit_message_id: Chainlit message ID to associate with the query
            (defaults to None)
        source_process: Context the query originates from

    Returns:
        RESPONSE_TYPE: Response produced by the parent ``query``
    """
    # Tagging must happen before the parent implementation runs the query.
    self._set_chainlit_message_id(
        source_process=source_process, message_id=chainlit_message_id
    )
    response = super().query(str_or_query_bundle)
    return response

set_session_id(session_id)

Set session ID for Langfuse tracing.

Parameters:
  • session_id (str) –

    Unique identifier for current session

Source code in src/common/query_engine.py
130
131
132
133
134
135
136
137
138
def set_session_id(self, session_id: str) -> None:
    """Assign a session ID to every Langfuse callback handler.

    Handlers on ``self.callback_manager`` that are not
    ``LlamaIndexCallbackHandler`` instances are skipped.

    Args:
        session_id: Identifier of the current session
    """
    for candidate in self.callback_manager.handlers:
        if not isinstance(candidate, LlamaIndexCallbackHandler):
            continue
        candidate.session_id = session_id

SourceProcess

Bases: Enum

Enumeration of possible query processing sources.

Attributes:
  • CHAT_COMPLETION

    Query from interactive chat

  • DEPLOYMENT_EVALUATION

    Query from deployment testing

Source code in src/common/query_engine.py
16
17
18
19
20
21
22
23
24
25
class SourceProcess(Enum):
    """Origin of a query handed to the query engine.

    Attributes:
        CHAT_COMPLETION: The query comes from interactive chat
        DEPLOYMENT_EVALUATION: The query comes from deployment testing
    """

    # Interactive chat.
    CHAT_COMPLETION = 1
    # Deployment testing.
    DEPLOYMENT_EVALUATION = 2