Langfuse_binder

This module contains functionality related to the the langfuse_binder module for common.bootstrap.configuration.pipeline.augmentation.langfuse.

Langfuse_binder

LangfuseBinder

Bases: BaseBinder

Binder for the Langfuse service and callback handler.

Source code in src/common/bootstrap/configuration/pipeline/augmentation/langfuse/langfuse_binder.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class LangfuseBinder(BaseBinder):
    """Binder for the Langfuse service and callback handler."""

    def bind(self) -> None:
        """Bind components to the injector based on the configuration."""
        self._bind_configuration()
        self._bind_client()
        self._bind_callback()
        self._bind_dataset_service()
        self._bind_datasets()

    def _bind_configuration(self) -> None:
        """Bind the Langfuse configuration."""
        self.binder.bind(
            LangfuseConfiguration,
            to=lambda: self.configuration.pipeline.augmentation.langfuse,
            scope=singleton,
        )

    def _bind_client(self) -> None:
        """Bind the Langfuse client."""
        self.binder.bind(
            Langfuse,
            to=LangfuseClientBuilder.build,
            scope=singleton,
        )

    def _bind_callback(self) -> None:
        """Bind the Langfuse callback handler."""
        self.binder.bind(
            LlamaIndexCallbackHandler,
            to=LangfuseCallbackHandlerBuilder.build,
        )

    def _bind_dataset_service(self) -> None:
        """Bind the Langfuse dataset service."""
        self.binder.bind(
            LangfuseDatasetService,
            to=LangfuseDatasetServiceBuilder.build,
        )

    def _bind_datasets(self) -> None:
        """Bind the Langfuse datasets."""
        self.binder.bind(
            BoundFeedbackDatasetConfiguration,
            to=lambda: self.configuration.pipeline.augmentation.langfuse.datasets.feedback_dataset,
            scope=singleton,
        )
        self.binder.bind(
            BoundManualDatasetConfiguration,
            to=lambda: self.configuration.pipeline.augmentation.langfuse.datasets.manual_dataset,
            scope=singleton,
        )

bind()

Bind components to the injector based on the configuration.

Source code in src/common/bootstrap/configuration/pipeline/augmentation/langfuse/langfuse_binder.py
24
25
26
27
28
29
30
def bind(self) -> None:
    """Bind components to the injector based on the configuration."""
    self._bind_configuration()
    self._bind_client()
    self._bind_callback()
    self._bind_dataset_service()
    self._bind_datasets()