Orchestrator

This module contains functionality related to the orchestrator module for embedding.orchestrators.basic.

BasicEmbeddingOrchestrator

Bases: BaseEmbeddingOrchestrator

A basic orchestrator for embedding pipeline processing.

This orchestrator implements a straightforward process that:
1. Fetches documents from a datasource
2. Splits documents into nodes
3. Embeds those nodes
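
The three steps can be exercised end to end with stand-in components. The following is a minimal sketch, not the project's implementation: `StubDatasource`, `StubSplitter`, `StubEmbedder`, `run_pipeline`, and `demo` are hypothetical names introduced for illustration, and only the method names `full_refresh_sync`, `split`, `embed`, and `embed_flush` are taken from the source shown on this page.

```python
import asyncio
from typing import AsyncIterator, List


class StubDatasource:
    """Hypothetical stand-in for the datasource orchestrator."""

    async def full_refresh_sync(self) -> AsyncIterator[str]:
        # An async generator, so callers consume it with `async for`.
        for doc in ["alpha beta", "gamma"]:
            yield doc


class StubSplitter:
    """Hypothetical splitter: one node per whitespace-separated token."""

    def split(self, doc: str) -> List[str]:
        return doc.split()


class StubEmbedder:
    """Hypothetical embedder that records nodes instead of embedding them."""

    def __init__(self) -> None:
        self.seen: List[str] = []
        self.flushed = False

    def embed(self, nodes: List[str]) -> None:
        self.seen.extend(nodes)

    def embed_flush(self) -> None:
        self.flushed = True


async def run_pipeline(datasource, splitter, embedder) -> None:
    # Same shape as the embed() method: fetch, split, embed, then flush once.
    async for doc in datasource.full_refresh_sync():
        nodes = splitter.split(doc)
        embedder.embed(nodes)
    embedder.embed_flush()


def demo() -> StubEmbedder:
    embedder = StubEmbedder()
    # asyncio.run drives the coroutine from synchronous code.
    asyncio.run(run_pipeline(StubDatasource(), StubSplitter(), embedder))
    return embedder
```

Calling `demo()` returns the stub embedder, whose `seen` list holds the nodes in the order they were produced. Note that `split` and `embed` are called synchronously inside the async loop, as in the source.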

Source code in src/embedding/orchestrators/basic/orchestrator.py
class BasicEmbeddingOrchestrator(BaseEmbeddingOrchestrator):
    """
    A basic orchestrator for embedding pipeline processing.

    This orchestrator implements a straightforward process that:
    1. Fetches documents from a datasource
    2. Splits documents into nodes
    3. Embeds those nodes
    """

    async def embed(self) -> None:
        """
        Execute the embedding process.

        Asynchronously retrieves documents from the datasource,
        splits them into nodes using the configured splitter,
        and embeds those nodes with the configured embedder.
        Finally flushes any remaining embeddings.
        """
        async for doc in self.datasource_orchestrator.full_refresh_sync():
            nodes = self.splitter.split(doc)
            self.embedder.embed(nodes)
        self.embedder.embed_flush()

embed() async

Execute the embedding process.

Asynchronously retrieves documents from the datasource, splits them into nodes using the configured splitter, and embeds those nodes with the configured embedder. Finally flushes any remaining embeddings.
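
The final flush matters when the embedder accumulates nodes before sending them. Whether the configured embedder actually buffers is not stated here, so the following is only a sketch under that assumption: `BufferingEmbedder`, its `batch_size` parameter, and `demo_flush` are hypothetical.

```python
from typing import List, Tuple


class BufferingEmbedder:
    """Hypothetical embedder that emits nodes in fixed-size batches."""

    def __init__(self, batch_size: int = 3) -> None:
        self.batch_size = batch_size
        self._buffer: List[str] = []
        self.batches: List[List[str]] = []  # batches "sent" so far

    def embed(self, nodes: List[str]) -> None:
        self._buffer.extend(nodes)
        # Emit only full batches; leftovers stay in the buffer.
        while len(self._buffer) >= self.batch_size:
            self.batches.append(self._buffer[: self.batch_size])
            self._buffer = self._buffer[self.batch_size :]

    def embed_flush(self) -> None:
        # Emit whatever remains, even if it is smaller than a full batch.
        if self._buffer:
            self.batches.append(self._buffer)
            self._buffer = []


def demo_flush() -> Tuple[List[List[str]], List[List[str]]]:
    embedder = BufferingEmbedder(batch_size=3)
    embedder.embed(["a", "b"])
    embedder.embed(["c", "d"])
    before = [list(batch) for batch in embedder.batches]
    embedder.embed_flush()
    return before, embedder.batches
```

In this sketch, the node `"d"` is emitted only after `embed_flush()` runs. Skipping the flush would silently drop the tail of the input.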

Source code in src/embedding/orchestrators/basic/orchestrator.py
async def embed(self) -> None:
    """
    Execute the embedding process.

    Asynchronously retrieves documents from the datasource,
    splits them into nodes using the configured splitter,
    and embeds those nodes with the configured embedder.
    Finally flushes any remaining embeddings.
    """
    async for doc in self.datasource_orchestrator.full_refresh_sync():
        nodes = self.splitter.split(doc)
        self.embedder.embed(nodes)
    self.embedder.embed_flush()

BasicEmbeddingOrchestratorFactory

Bases: Factory

Factory for creating BasicEmbeddingOrchestrator instances.

Creates and configures orchestrators with appropriate datasources, splitters, and embedders based on the provided configuration.

Source code in src/embedding/orchestrators/basic/orchestrator.py
class BasicEmbeddingOrchestratorFactory(Factory):
    """
    Factory for creating BasicEmbeddingOrchestrator instances.

    Creates and configures orchestrators with appropriate datasources,
    splitters, and embedders based on the provided configuration.
    """

    _configuration_class: Type = EmbeddingConfiguration

    @classmethod
    def _create_instance(
        cls, configuration: EmbeddingConfiguration
    ) -> BasicEmbeddingOrchestrator:
        """
        Create a configured BasicEmbeddingOrchestrator instance.

        Args:
            configuration: Complete embedding configuration containing
                           datasource, splitter, and embedder specifications

        Returns:
            A configured BasicEmbeddingOrchestrator ready for use

        Raises:
            ValueError: If splitter configuration is missing
        """
        datasource_orchestrator = DatasourceOrchestratorRegistry.get(
            configuration.extraction.orchestrator_name
        ).create(configuration)

        embedding_model_configuration = configuration.embedding.embedding_model
        splitter_configuration = embedding_model_configuration.splitter
        if not splitter_configuration:
            raise ValueError(
                "Splitter configuration is required for embedding process."
            )
        splitter = SplitterRegistry.get(splitter_configuration.name).create(
            embedding_model_configuration
        )
        embedder = EmbedderRegistry.get(
            configuration.embedding.embedder_name
        ).create(configuration)
        return BasicEmbeddingOrchestrator(
            datasource_orchestrator=datasource_orchestrator,
            splitter=splitter,
            embedder=embedder,
        )
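
The lookup-then-validate pattern used by the factory can be illustrated in isolation. This is a simplified sketch, not the project's registry API: `Registry`, `SplitterConfig`, `ModelConfig`, `splitter_registry`, and `build_splitter` are hypothetical, and the registry here returns a plain callable rather than a factory object with a `create` method.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


class Registry:
    """Hypothetical name-to-builder registry."""

    def __init__(self) -> None:
        self._builders: Dict[str, Callable] = {}

    def register(self, name: str, builder: Callable) -> None:
        self._builders[name] = builder

    def get(self, name: str) -> Callable:
        return self._builders[name]


@dataclass
class SplitterConfig:
    name: str
    chunk_size: int = 2


@dataclass
class ModelConfig:
    # Optional, so a missing splitter must be checked before use.
    splitter: Optional[SplitterConfig] = None


splitter_registry = Registry()
# The builder receives the whole model configuration, mirroring how the
# factory above passes embedding_model_configuration to the splitter factory.
splitter_registry.register(
    "words", lambda cfg: ("words-splitter", cfg.splitter.chunk_size)
)


def build_splitter(model_config: ModelConfig):
    # Fail fast with a clear message before any registry lookup.
    if not model_config.splitter:
        raise ValueError(
            "Splitter configuration is required for embedding process."
        )
    return splitter_registry.get(model_config.splitter.name)(model_config)
```

Validating before the lookup surfaces a missing splitter as a `ValueError` with a clear message, instead of an attribute error on `None`.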