Datasource_orchestrator

This module contains functionality related to the datasource_orchestrator module for embedding.orchestrators.

Datasource_orchestrator

DatasourceOrchestrator

Bases: BaseDatasourceOrchestrator

Orchestrator for multi-datasource content processing.

Manages extraction, embedding and storage of content from multiple datasources through a unified interface.

Attributes:
  • embedder

    Component for generating embeddings

  • datasources

    Mapping of datasource type to manager

  • documents (List[Document]) –

    Raw documents from datasources

  • cleaned_documents (List[Document]) –

    Processed documents

  • nodes (List[TextNode]) –

    Text nodes for embedding

Source code in src/embedding/orchestrators/datasource_orchestrator.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class DatasourceOrchestrator(BaseDatasourceOrchestrator):
    """Coordinate content processing across several datasources.

    Offers one interface for extracting content from every registered
    datasource, embedding it and storing the result.

    Attributes:
        embedder: Embedder that embeds and saves the collected nodes
        datasources: Datasource managers keyed by datasource name
        documents: Raw documents gathered from all datasources
        cleaned_documents: Processed documents gathered from all datasources
        nodes: Text nodes gathered for embedding
    """

    def __init__(
        self,
        datasource_managers: Dict[DatasourceName, BaseDatasourceManager],
        embedder: BaseEmbedder,
    ):
        """Store the collaborators and start with empty result lists.

        Args:
            datasource_managers: Managers keyed by datasource name
            embedder: Embedder used for embedding and saving nodes
        """
        self.embedder, self.datasources = embedder, datasource_managers
        # Accumulators populated during extraction; they start out empty.
        self.documents: List[Document] = []
        self.cleaned_documents: List[Document] = []
        self.nodes: List[TextNode] = []

    async def extract(self):
        """Run extraction for every configured datasource.

        Awaits the managers one after another; each contributes its
        documents, cleaned documents and text nodes.
        """
        for manager in self.datasources.values():
            await self._extract(manager)

    def embed(self):
        """Embed the text nodes collected so far.

        Hands every accumulated node to the configured embedder.
        """
        run_embedding = self.embedder.embed
        run_embedding(self.nodes)

    def save_to_vector_storage(self):
        """Store the embedded text nodes in the vector store.

        Delegates persistence of all nodes to the embedder.
        """
        persist = self.embedder.save
        persist(self.nodes)

    def update_vector_storage(self):
        """Update content already present in the vector store.

        Raises:
            NotImplementedError: Always; subclasses must provide this
        """
        raise NotImplementedError()

    async def _extract(self, datasource_manager: BaseDatasourceManager) -> None:
        """Pull content from one datasource and accumulate it.

        Args:
            datasource_manager: Manager of the datasource to extract from

        Note:
            Extends documents, cleaned_documents and nodes in-place
        """
        raw_docs, cleaned_docs, text_nodes = await datasource_manager.extract()
        self.documents.extend(raw_docs)
        self.cleaned_documents.extend(cleaned_docs)
        self.nodes.extend(text_nodes)

__init__(datasource_managers, embedder)

Initialize orchestrator with managers and embedder.

Parameters:
  • datasource_managers (Dict[DatasourceName, BaseDatasourceManager]) –

    Mapping of datasource types to managers

  • embedder (BaseEmbedder) –

    Component for generating embeddings

Source code in src/embedding/orchestrators/datasource_orchestrator.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(
    self,
    datasource_managers: Dict[DatasourceName, BaseDatasourceManager],
    embedder: BaseEmbedder,
):
    """Store the collaborators and start with empty result lists.

    Args:
        datasource_managers: Managers keyed by datasource name
        embedder: Embedder used for embedding and saving nodes
    """
    self.embedder, self.datasources = embedder, datasource_managers
    # Accumulators populated during extraction; they start out empty.
    self.documents: List[Document] = []
    self.cleaned_documents: List[Document] = []
    self.nodes: List[TextNode] = []

_extract(datasource_manager) async

Extract and store content from single datasource.

Parameters:
  • datasource_manager (BaseDatasourceManager) –

    Manager for specific datasource type

Note

Updates documents, cleaned_documents and nodes lists in-place

Source code in src/embedding/orchestrators/datasource_orchestrator.py
77
78
79
80
81
82
83
84
85
86
87
88
89
async def _extract(self, datasource_manager: BaseDatasourceManager) -> None:
    """Pull content from one datasource and accumulate it.

    Args:
        datasource_manager: Manager of the datasource to extract from

    Note:
        Extends documents, cleaned_documents and nodes in-place
    """
    raw_docs, cleaned_docs, text_nodes = await datasource_manager.extract()
    self.documents.extend(raw_docs)
    self.cleaned_documents.extend(cleaned_docs)
    self.nodes.extend(text_nodes)

embed()

Generate embeddings for extracted content.

Creates vector embeddings for all text nodes using configured embedding model.

Source code in src/embedding/orchestrators/datasource_orchestrator.py
53
54
55
56
57
58
59
def embed(self):
    """Embed the text nodes collected so far.

    Hands every accumulated node to the configured embedder.
    """
    run_embedding = self.embedder.embed
    run_embedding(self.nodes)

extract() async

Extract and process content from all datasources.

Processes each configured datasource to extract documents, clean content and generate text nodes.

Source code in src/embedding/orchestrators/datasource_orchestrator.py
44
45
46
47
48
49
50
51
async def extract(self):
    """Run extraction for every configured datasource.

    Awaits the managers one after another; each contributes its
    documents, cleaned documents and text nodes.
    """
    for manager in self.datasources.values():
        await self._extract(manager)

save_to_vector_storage()

Persist embedded content to vector store.

Saves all embedded text nodes to configured vector storage backend.

Source code in src/embedding/orchestrators/datasource_orchestrator.py
61
62
63
64
65
66
67
def save_to_vector_storage(self):
    """Store the embedded text nodes in the vector store.

    Delegates persistence of all nodes to the embedder.
    """
    persist = self.embedder.save
    persist(self.nodes)

update_vector_storage()

Update existing vector store content.

Raises:
  • NotImplementedError

    Method must be implemented by subclass

Source code in src/embedding/orchestrators/datasource_orchestrator.py
69
70
71
72
73
74
75
def update_vector_storage(self):
    """Update content already present in the vector store.

    Raises:
        NotImplementedError: Always; subclasses must provide this
    """
    raise NotImplementedError()