Base_orchestrator

This module defines the abstract base class for embedding orchestrators in embedding.orchestrators, together with a basic concrete implementation.


BaseEmbeddingOrchestrator

Bases: ABC

Abstract base class for embedding orchestration.

This class defines the interface for embedding orchestrators that coordinate data extraction, splitting, and embedding processes.

Source code in src/embedding/orchestrators/base_orchestrator.py
class BaseEmbeddingOrchestrator(ABC):
    """
    Abstract base class for embedding orchestration.

    This class defines the interface for embedding orchestrators that coordinate
    data extraction, splitting, and embedding processes.
    """

    def __init__(
        self,
        datasource_orchestrator: BaseDatasourceOrchestrator,
        splitter: BaseSplitter,
        embedder: BaseEmbedder,
    ) -> None:
        """
        Initialize a new embedding orchestrator.

        Args:
            datasource_orchestrator: Orchestrator for extracting data from sources
            splitter: Component responsible for splitting documents into nodes
            embedder: Component that generates embeddings for nodes
        """
        self.datasource_orchestrator = datasource_orchestrator
        self.splitter = splitter
        self.embedder = embedder

    @abstractmethod
    async def embed(self) -> None:
        """
        Execute the embedding process.

        This method must be implemented by concrete subclasses to define
        the specific embedding workflow.
        """
        pass
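Because embed() is decorated with @abstractmethod, Python refuses to instantiate BaseEmbeddingOrchestrator directly; only subclasses that override embed() can be created. The sketch below re-declares a simplified copy of the class so it runs standalone. The collaborator types are loosened to Any because the real BaseDatasourceOrchestrator, BaseSplitter and BaseEmbedder are not importable here, and NoOpEmbeddingOrchestrator is a hypothetical subclass used only for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class BaseEmbeddingOrchestrator(ABC):
    # Simplified copy of the class above; collaborator types are Any
    # because the project's Base* classes are not available in this sketch.
    def __init__(self, datasource_orchestrator: Any, splitter: Any, embedder: Any) -> None:
        self.datasource_orchestrator = datasource_orchestrator
        self.splitter = splitter
        self.embedder = embedder

    @abstractmethod
    async def embed(self) -> None: ...


class NoOpEmbeddingOrchestrator(BaseEmbeddingOrchestrator):
    # Hypothetical subclass: overriding embed() is all that is needed
    # for the class to become instantiable.
    async def embed(self) -> None:
        return None


def try_instantiate_base() -> str:
    # Instantiating the abstract base raises TypeError at construction time.
    try:
        BaseEmbeddingOrchestrator(None, None, None)  # type: ignore[abstract]
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "instantiated"


print(try_instantiate_base())
orchestrator = NoOpEmbeddingOrchestrator(None, None, None)
asyncio.run(orchestrator.embed())
print(type(orchestrator).__name__)
```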

__init__(datasource_orchestrator, splitter, embedder)

Initialize a new embedding orchestrator.

Parameters:
  • datasource_orchestrator (BaseDatasourceOrchestrator) –

    Orchestrator for extracting data from sources

  • splitter (BaseSplitter) –

    Component responsible for splitting documents into nodes

  • embedder (BaseEmbedder) –

    Component that generates embeddings for nodes
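The parameters are typed only by their base classes. Judging from how BasicEmbeddingOrchestrator.embed() uses them, each collaborator needs at least the methods captured in the structural protocols below. The Protocol names, the in-memory stand-ins, and the choice of str for documents and nodes are assumptions for illustration, not the project's actual definitions.

```python
from typing import AsyncIterator, List, Protocol, runtime_checkable


@runtime_checkable
class DatasourceOrchestratorLike(Protocol):
    # Consumed with `async for`, so it must return an async iterator of documents.
    def full_refresh_sync(self) -> AsyncIterator[str]: ...


@runtime_checkable
class SplitterLike(Protocol):
    # Turns one document into a list of nodes.
    def split(self, doc: str) -> List[str]: ...


@runtime_checkable
class EmbedderLike(Protocol):
    # Embeds one batch of nodes (called once per document).
    def embed(self, nodes: List[str]) -> None: ...

    # Writes out anything the embedder is still holding.
    def embed_flush(self) -> None: ...


# Hypothetical in-memory stand-ins that satisfy the protocols.
class InMemorySource:
    def __init__(self, docs: List[str]) -> None:
        self.docs = docs

    async def full_refresh_sync(self) -> AsyncIterator[str]:
        for doc in self.docs:
            yield doc


class WhitespaceSplitter:
    def split(self, doc: str) -> List[str]:
        return doc.split()


class ListEmbedder:
    def __init__(self) -> None:
        self.embedded: List[str] = []

    def embed(self, nodes: List[str]) -> None:
        self.embedded.extend(nodes)

    def embed_flush(self) -> None:
        pass  # nothing is buffered, so there is nothing to flush


# runtime_checkable only checks that the methods exist, not their signatures.
print(isinstance(InMemorySource([]), DatasourceOrchestratorLike))  # True
print(isinstance(WhitespaceSplitter(), SplitterLike))  # True
print(isinstance(ListEmbedder(), EmbedderLike))  # True
```

Structural protocols like these are one way to write lightweight test doubles; the documented class itself relies on nominal ABC base types instead.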


embed() abstractmethod async

Execute the embedding process.

This method must be implemented by concrete subclasses to define the specific embedding workflow.
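Since embed() is a coroutine, a subclass defines its workflow as an async method and callers drive it with an event loop, for example asyncio.run(). The sketch below shows a hypothetical alternative workflow, SingleBatchEmbeddingOrchestrator, that splits every document first and then embeds all nodes in one call. It re-declares a simplified base class and uses fake collaborators, all of which are assumptions made so the example runs on its own.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, List


class BaseEmbeddingOrchestrator(ABC):
    # Simplified stand-in for the base class documented above.
    def __init__(self, datasource_orchestrator: Any, splitter: Any, embedder: Any) -> None:
        self.datasource_orchestrator = datasource_orchestrator
        self.splitter = splitter
        self.embedder = embedder

    @abstractmethod
    async def embed(self) -> None: ...


class SingleBatchEmbeddingOrchestrator(BaseEmbeddingOrchestrator):
    # Hypothetical workflow: collect nodes from every document first,
    # then embed them all in a single call.
    async def embed(self) -> None:
        all_nodes: List[str] = []
        async for doc in self.datasource_orchestrator.full_refresh_sync():
            all_nodes.extend(self.splitter.split(doc))
        self.embedder.embed(all_nodes)
        self.embedder.embed_flush()


class FakeSource:
    async def full_refresh_sync(self) -> AsyncIterator[str]:
        for doc in ["a b", "c"]:
            yield doc


class FakeSplitter:
    def split(self, doc: str) -> List[str]:
        return doc.split()


class RecordingEmbedder:
    # Records every embed() call so the batching is visible.
    def __init__(self) -> None:
        self.calls: List[List[str]] = []
        self.flushed = False

    def embed(self, nodes: List[str]) -> None:
        self.calls.append(list(nodes))

    def embed_flush(self) -> None:
        self.flushed = True


embedder = RecordingEmbedder()
orchestrator = SingleBatchEmbeddingOrchestrator(FakeSource(), FakeSplitter(), embedder)
# embed() is a coroutine, so it must be awaited or run on an event loop.
asyncio.run(orchestrator.embed())
print(embedder.calls)  # [['a', 'b', 'c']]
```

This trades memory (all nodes are held at once) for fewer embedder calls; whether that is worthwhile depends on corpus size and the embedder's own batching.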


BasicEmbeddingOrchestrator

Bases: BaseEmbeddingOrchestrator

Basic implementation of embedding orchestration.

This orchestrator implements a simple workflow that:

  1. Retrieves documents from the datasource
  2. Splits each document into nodes
  3. Embeds the nodes
  4. Flushes any remaining embeddings
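Step 4 matters when the embedder batches its work internally. The sketch below walks through the four steps with a hypothetical BufferedEmbedder that writes nodes only in full batches of batch_size from embed() and writes the partial remainder in embed_flush(). The buffering strategy, the whitespace splitter, and the sample documents are assumptions; the real BaseEmbedder may buffer differently.

```python
import asyncio
from typing import AsyncIterator, List


class BufferedEmbedder:
    # Hypothetical embedder that "writes" nodes only in full batches.
    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.buffer: List[str] = []
        self.written_batches: List[List[str]] = []

    def embed(self, nodes: List[str]) -> None:
        self.buffer.extend(nodes)
        while len(self.buffer) >= self.batch_size:
            self.written_batches.append(self.buffer[: self.batch_size])
            self.buffer = self.buffer[self.batch_size :]

    def embed_flush(self) -> None:
        # Write whatever is left over, even if it is a partial batch.
        if self.buffer:
            self.written_batches.append(self.buffer)
            self.buffer = []


async def docs() -> AsyncIterator[str]:
    for doc in ["one two three", "four five"]:
        yield doc


async def run(flush: bool) -> BufferedEmbedder:
    embedder = BufferedEmbedder(batch_size=2)
    async for doc in docs():     # 1. retrieve documents
        nodes = doc.split()      # 2. split into nodes (whitespace split as a stand-in)
        embedder.embed(nodes)    # 3. embed the nodes
    if flush:
        embedder.embed_flush()   # 4. flush the remainder
    return embedder


without = asyncio.run(run(flush=False))
with_flush = asyncio.run(run(flush=True))
print(without.written_batches)     # [['one', 'two'], ['three', 'four']]
print(with_flush.written_batches)  # [['one', 'two'], ['three', 'four'], ['five']]
```

Without the final flush, the node "five" stays in the buffer and is never written.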

Source code in src/embedding/orchestrators/base_orchestrator.py
class BasicEmbeddingOrchestrator(BaseEmbeddingOrchestrator):
    """
    Basic implementation of embedding orchestration.

    This orchestrator implements a simple workflow that:
    1. Retrieves documents from the datasource
    2. Splits each document into nodes
    3. Embeds the nodes
    4. Flushes any remaining embeddings
    """

    async def embed(self) -> None:
        """
        Execute the basic embedding process.

        Retrieves documents asynchronously from the datasource orchestrator,
        splits each document into nodes, embeds the nodes, and finally
        flushes any remaining embeddings to ensure all data is processed.
        """
        async for doc in self.datasource_orchestrator.full_refresh_sync():
            nodes = self.splitter.split(doc)
            self.embedder.embed(nodes)
        self.embedder.embed_flush()

embed() async

Execute the basic embedding process.

Retrieves documents asynchronously from the datasource orchestrator, splits each document into nodes, embeds the nodes, and finally flushes any remaining embeddings to ensure all data is processed.
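As written, embed() only reaches embed_flush() when the loop finishes normally; if the datasource or splitter raises partway through, buffered nodes are not flushed. If persisting partial progress is acceptable for your vector store (an assumption, not something the source specifies), one possible variation wraps the loop in try/finally. The function embed_with_guaranteed_flush and the fake collaborators below are hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator, List


class FailingSource:
    # Hypothetical source that yields one document and then raises.
    async def full_refresh_sync(self) -> AsyncIterator[str]:
        yield "alpha beta"
        raise RuntimeError("source unavailable")


class RecordingEmbedder:
    def __init__(self) -> None:
        self.nodes: List[str] = []
        self.flush_count = 0

    def embed(self, nodes: List[str]) -> None:
        self.nodes.extend(nodes)

    def embed_flush(self) -> None:
        self.flush_count += 1


async def embed_with_guaranteed_flush(source: Any, embedder: Any) -> None:
    # Same loop shape as BasicEmbeddingOrchestrator.embed(), but
    # embed_flush() runs even if iteration raises; the error still propagates.
    try:
        async for doc in source.full_refresh_sync():
            embedder.embed(doc.split())
    finally:
        embedder.embed_flush()


embedder = RecordingEmbedder()
try:
    asyncio.run(embed_with_guaranteed_flush(FailingSource(), embedder))
except RuntimeError as exc:
    print(f"run failed: {exc}")
print(embedder.nodes, embedder.flush_count)  # ['alpha', 'beta'] 1
```

Whether flushing on failure is desirable depends on whether a partially refreshed index is better than a stale one for your use case.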
