Initializer

This module contains functionality related to the the initializer module for augmentation.bootstrap.

Initializer

AugmentationInitializer

Bases: EmbeddingInitializer

Initializer for the augmentation process.

Extends the EmbeddingInitializer to set up the environment for augmentation tasks. This initializer is responsible for loading the required configuration and registering all necessary components with the dependency injection container.

Multiple components are used in the embedding, augmentation and evaluation processes. To avoid code duplication, this initializer is used to bind the components to the injector. It is intended to be subclassed by the specific initializers for each process.

Source code in src/augmentation/bootstrap/initializer.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
class AugmentationInitializer(EmbeddingInitializer):
    """Initializer for the augmentation process.

    Extends the EmbeddingInitializer to set up the environment for augmentation tasks.
    This initializer is responsible for loading the required configuration and
    registering all necessary components with the dependency injection container.

    Multiple components are used in the embedding, augmentation and evaluation processes.
    To avoid code duplication, this initializer is used to bind the components to the injector.
    It is intended to be subclassed by the specific initializers for each process.
    """

    def __init__(
        self,
        configuration_class: Type[
            BaseConfiguration
        ] = AugmentationConfiguration,
        package_loader: BasePackageLoader = AugmentationPackageLoader(),
    ):
        """Initialize the AugmentationInitializer.

        Args:
            configuration_class: The configuration class to use for loading settings.
                                Defaults to AugmentationConfiguration.
            package_loader: Package loader instance responsible for loading required packages.
                           Defaults to a new AugmentationPackageLoader instance.
        """
        super().__init__(
            configuration_class=configuration_class,
            package_loader=package_loader,
        )
        self.scheduler = AugmentationScheduler(
            configuration=self.get_configuration(),
            logger=LoggerConfiguration.get_logger(__name__),
        )
        self._initialize_default_prompt()

    def get_scheduler(self) -> AugmentationScheduler:
        """Get the scheduler instance for managing scheduled tasks.

        Returns:
            AsyncIOScheduler: The scheduler instance used for scheduling jobs.
        """
        return self.scheduler

    def _initialize_default_prompt(self) -> None:
        """
        Initialize the default prompt templates for the augmentation process managed by Langfuse.
        """
        configuration = self.get_configuration()
        langfuse_prompt_service = LangfusePromptServiceFactory.create(
            configuration=configuration.augmentation.langfuse
        )

        langfuse_prompt_service.create_prompt_if_not_exists(
            prompt_name="default_condense_prompt",
            prompt_template=DEFAULT_CONDENSE_PROMPT_TEMPLATE,
        )

        langfuse_prompt_service.create_prompt_if_not_exists(
            prompt_name="default_context_prompt",
            prompt_template=DEFAULT_CONTEXT_PROMPT_TEMPLATE,
        )

        langfuse_prompt_service.create_prompt_if_not_exists(
            prompt_name="default_context_refine_prompt",
            prompt_template=DEFAULT_CONTEXT_REFINE_PROMPT_TEMPLATE,
        )

        langfuse_prompt_service.create_prompt_if_not_exists(
            prompt_name="default_system_prompt",
            prompt_template="",
        )

        langfuse_prompt_service.create_prompt_if_not_exists(
            prompt_name="default_input_guardrail_prompt",
            prompt_template=DEFAULT_INPUT_GUARDRAIL_PROMPT_TEMPLATE,
        )

        langfuse_prompt_service.create_prompt_if_not_exists(
            prompt_name="default_output_guardrail_prompt",
            prompt_template=DEFAULT_OUTPUT_GUARDRAIL_PROMPT_TEMPLATE,
        )

__init__(configuration_class=AugmentationConfiguration, package_loader=AugmentationPackageLoader())

Initialize the AugmentationInitializer.

Parameters:
  • configuration_class (Type[BaseConfiguration], default: AugmentationConfiguration ) –

    The configuration class to use for loading settings. Defaults to AugmentationConfiguration.

  • package_loader (BasePackageLoader, default: AugmentationPackageLoader() ) –

    Package loader instance responsible for loading required packages. Defaults to a new AugmentationPackageLoader instance.

Source code in src/augmentation/bootstrap/initializer.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def __init__(
    self,
    configuration_class: Type[
        BaseConfiguration
    ] = AugmentationConfiguration,
    package_loader: BasePackageLoader = AugmentationPackageLoader(),
):
    """Initialize the AugmentationInitializer.

    Args:
        configuration_class: The configuration class to use for loading settings.
                            Defaults to AugmentationConfiguration.
        package_loader: Package loader instance responsible for loading required packages.
                       Defaults to a new AugmentationPackageLoader instance.
    """
    super().__init__(
        configuration_class=configuration_class,
        package_loader=package_loader,
    )
    self.scheduler = AugmentationScheduler(
        configuration=self.get_configuration(),
        logger=LoggerConfiguration.get_logger(__name__),
    )
    self._initialize_default_prompt()

get_scheduler()

Get the scheduler instance for managing scheduled tasks.

Returns:
Source code in src/augmentation/bootstrap/initializer.py
190
191
192
193
194
195
196
def get_scheduler(self) -> AugmentationScheduler:
    """Get the scheduler instance for managing scheduled tasks.

    Returns:
        AsyncIOScheduler: The scheduler instance used for scheduling jobs.
    """
    return self.scheduler

AugmentationPackageLoader

Bases: EmbeddingPackageLoader

Package loader for augmentation components.

Extends the EmbeddingPackageLoader to load additional packages required for the augmentation process, including LLMs, retrievers, postprocessors, and chat engines.

Source code in src/augmentation/bootstrap/initializer.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class AugmentationPackageLoader(EmbeddingPackageLoader):
    """Package loader for augmentation components.

    Extends the EmbeddingPackageLoader to load additional packages required
    for the augmentation process, including LLMs, retrievers,
    postprocessors, and chat engines.
    """

    def __init__(
        self, logger: logging.Logger = LoggerConfiguration.get_logger(__name__)
    ):
        """Initialize the AugmentationPackageLoader.

        Args:
            logger: Logger instance for logging information. Defaults to a logger
                   configured with the current module name.
        """
        super().__init__(logger)

    def load_packages(self) -> None:
        """Load all required packages for augmentation.

        Calls the parent class's load_packages method first to load embedding packages,
        then loads additional packages specific to augmentation.
        """
        super().load_packages()
        self._load_packages(
            [
                "src.augmentation.components.guardrails",
                "src.augmentation.components.llms",
                "src.augmentation.components.retrievers",
                "src.augmentation.components.postprocessors",
                "src.augmentation.components.chat_engines",
            ]
        )

__init__(logger=LoggerConfiguration.get_logger(__name__))

Initialize the AugmentationPackageLoader.

Parameters:
  • logger (Logger, default: get_logger(__name__) ) –

    Logger instance for logging information. Defaults to a logger configured with the current module name.

Source code in src/augmentation/bootstrap/initializer.py
124
125
126
127
128
129
130
131
132
133
def __init__(
    self, logger: logging.Logger = LoggerConfiguration.get_logger(__name__)
):
    """Initialize the AugmentationPackageLoader.

    Args:
        logger: Logger instance for logging information. Defaults to a logger
               configured with the current module name.
    """
    super().__init__(logger)

load_packages()

Load all required packages for augmentation.

Calls the parent class's load_packages method first to load embedding packages, then loads additional packages specific to augmentation.

Source code in src/augmentation/bootstrap/initializer.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def load_packages(self) -> None:
    """Load all required packages for augmentation.

    Calls the parent class's load_packages method first to load embedding packages,
    then loads additional packages specific to augmentation.
    """
    super().load_packages()
    self._load_packages(
        [
            "src.augmentation.components.guardrails",
            "src.augmentation.components.llms",
            "src.augmentation.components.retrievers",
            "src.augmentation.components.postprocessors",
            "src.augmentation.components.chat_engines",
        ]
    )

AugmentationScheduler

Source code in src/augmentation/bootstrap/initializer.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class AugmentationScheduler:

    def __init__(
        self,
        configuration: AugmentationConfiguration,
        logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
    ):
        """
        Args:
            configuration (AugmentationConfiguration): The configuration object for the augmentation process.
            logger: Logger instance for logging information. Defaults to a logger
                   configured with the current module name.
        """
        self.logger = logger
        self.configuration = configuration

        self.scheduler = AsyncIOScheduler()

    def start(self) -> None:
        """Start the scheduler and schedule the daily queries retention job."""
        langfuse_configuration = self.configuration.augmentation.langfuse
        queries_retention_job = LangfuseRenetionJobFactory.create(
            langfuse_configuration
        )

        try:
            self.scheduler.add_job(
                queries_retention_job.run,
                CronTrigger.from_crontab(
                    langfuse_configuration.retention_job.crontab
                ),
                id=langfuse_configuration.retention_job.name,
                replace_existing=True,
            )
            self.logger.info(
                "Daily queries retention job scheduled successfully"
            )

            self.scheduler.start()
            self.logger.info("Scheduler started successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize scheduler: {e}")

    def stop(self) -> None:
        """Stop the scheduler if it is running."""
        if self.scheduler.running:
            self.scheduler.shutdown()
            self.logger.info("Scheduler stopped successfully")
        else:
            self.logger.warning("Scheduler was not running")

__init__(configuration, logger=LoggerConfiguration.get_logger(__name__))

Parameters:
  • configuration (AugmentationConfiguration) –

    The configuration object for the augmentation process.

  • logger (Logger, default: get_logger(__name__) ) –

    Logger instance for logging information. Defaults to a logger configured with the current module name.

Source code in src/augmentation/bootstrap/initializer.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def __init__(
    self,
    configuration: AugmentationConfiguration,
    logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
):
    """
    Args:
        configuration (AugmentationConfiguration): The configuration object for the augmentation process.
        logger: Logger instance for logging information. Defaults to a logger
               configured with the current module name.
    """
    self.logger = logger
    self.configuration = configuration

    self.scheduler = AsyncIOScheduler()

start()

Start the scheduler and schedule the daily queries retention job.

Source code in src/augmentation/bootstrap/initializer.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def start(self) -> None:
    """Start the scheduler and schedule the daily queries retention job."""
    langfuse_configuration = self.configuration.augmentation.langfuse
    queries_retention_job = LangfuseRenetionJobFactory.create(
        langfuse_configuration
    )

    try:
        self.scheduler.add_job(
            queries_retention_job.run,
            CronTrigger.from_crontab(
                langfuse_configuration.retention_job.crontab
            ),
            id=langfuse_configuration.retention_job.name,
            replace_existing=True,
        )
        self.logger.info(
            "Daily queries retention job scheduled successfully"
        )

        self.scheduler.start()
        self.logger.info("Scheduler started successfully")
    except Exception as e:
        self.logger.error(f"Failed to initialize scheduler: {e}")

stop()

Stop the scheduler if it is running.

Source code in src/augmentation/bootstrap/initializer.py
107
108
109
110
111
112
113
def stop(self) -> None:
    """Stop the scheduler if it is running."""
    if self.scheduler.running:
        self.scheduler.shutdown()
        self.logger.info("Scheduler stopped successfully")
    else:
        self.logger.warning("Scheduler was not running")