Bases: BaseDatasourceOrchestrator
Orchestrator for multi-datasource content processing.
Manages extraction, embedding and storage of content from multiple
datasources through a unified interface.
Attributes: |
-
embedder
–
Component for generating embeddings
-
datasources
–
Mapping of datasource type to manager
-
documents
(List[Document] )
–
Raw documents from datasources
-
cleaned_documents
(List[Document] )
–
Processed documents
-
nodes
(List[TextNode] )
–
Text nodes for embedding
|
Source code in src/embedding/orchestrators/datasource_orchestrator.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class DatasourceOrchestrator(BaseDatasourceOrchestrator):
    """Coordinate content processing across several datasources.

    Runs extraction through each registered datasource manager,
    accumulates the results, and hands the collected text nodes to the
    embedder for embedding and persistence.

    Attributes:
        embedder: Component that embeds and saves text nodes
        datasources: Mapping of datasource type to its manager
        documents: Raw documents collected from the datasources
        cleaned_documents: Processed documents collected from the datasources
        nodes: Text nodes collected for embedding
    """

    def __init__(
        self,
        datasource_managers: Dict[DatasourceName, BaseDatasourceManager],
        embedder: BaseEmbedder,
    ):
        """Store the collaborators and start with empty result lists.

        Args:
            datasource_managers: Mapping of datasource types to managers
            embedder: Component for generating embeddings
        """
        # Accumulators filled by extract(); each instance gets its own lists.
        self.documents: List[Document] = []
        self.cleaned_documents: List[Document] = []
        self.nodes: List[TextNode] = []

        self.datasources = datasource_managers
        self.embedder = embedder

    async def extract(self):
        """Run extraction for every configured datasource.

        Managers are awaited one at a time, in the iteration order of
        the ``datasources`` mapping; each one contributes documents,
        cleaned documents and text nodes.
        """
        for manager in self.datasources.values():
            await self._extract(manager)

    def embed(self):
        """Embed the extracted text nodes.

        Passes all collected text nodes to the configured embedder.
        """
        embedder = self.embedder
        embedder.embed(self.nodes)

    def save_to_vector_storage(self):
        """Persist the embedded text nodes.

        Passes all collected text nodes to the embedder's ``save``,
        which writes them to the configured vector storage backend.
        """
        embedder = self.embedder
        embedder.save(self.nodes)

    def update_vector_storage(self):
        """Update existing vector store content.

        Raises:
            NotImplementedError: Always; subclasses must provide the
                implementation
        """
        raise NotImplementedError()

    async def _extract(self, datasource_manager: BaseDatasourceManager) -> None:
        """Pull content from one datasource and accumulate it.

        Args:
            datasource_manager: Manager for specific datasource type

        Note:
            Extends the documents, cleaned_documents and nodes lists
            in place.
        """
        extracted = await datasource_manager.extract()
        # The manager yields exactly three collections, in this order.
        raw_docs, cleaned_docs, text_nodes = extracted
        self.documents.extend(raw_docs)
        self.cleaned_documents.extend(cleaned_docs)
        self.nodes.extend(text_nodes)
|
__init__(datasource_managers, embedder)
Initialize orchestrator with managers and embedder.
Parameters: |
-
datasource_managers
(Dict[DatasourceName, BaseDatasourceManager] )
–
Mapping of datasource types to managers
-
embedder
(BaseEmbedder )
–
Component for generating embeddings
|
Source code in src/embedding/orchestrators/datasource_orchestrator.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self,
    datasource_managers: Dict[DatasourceName, BaseDatasourceManager],
    embedder: BaseEmbedder,
):
    """Store the collaborators and start with empty result lists.

    Args:
        datasource_managers: Mapping of datasource types to managers
        embedder: Component for generating embeddings
    """
    # Accumulators filled during extraction; each instance gets its own lists.
    self.documents: List[Document] = []
    self.cleaned_documents: List[Document] = []
    self.nodes: List[TextNode] = []

    self.datasources = datasource_managers
    self.embedder = embedder
|
_extract(datasource_manager)
Extract and store content from a single datasource.
Parameters: |
-
datasource_manager
(BaseDatasourceManager )
–
Manager for specific datasource type
|
Note
Updates documents, cleaned_documents and nodes lists in-place
Source code in src/embedding/orchestrators/datasource_orchestrator.py
77
78
79
80
81
82
83
84
85
86
87
88
async def _extract(self, datasource_manager: BaseDatasourceManager) -> None:
    """Pull content from one datasource and accumulate it.

    Args:
        datasource_manager: Manager for specific datasource type

    Note:
        Extends the documents, cleaned_documents and nodes lists
        in place.
    """
    extracted = await datasource_manager.extract()
    # The manager yields exactly three collections, in this order.
    raw_docs, cleaned_docs, text_nodes = extracted
    self.documents.extend(raw_docs)
    self.cleaned_documents.extend(cleaned_docs)
    self.nodes.extend(text_nodes)
|
embed()
Generate embeddings for extracted content.
Creates vector embeddings for all text nodes using
configured embedding model.
Source code in src/embedding/orchestrators/datasource_orchestrator.py
def embed(self):
    """Embed the extracted text nodes.

    Passes all collected text nodes to the configured embedder.
    """
    embedder = self.embedder
    embedder.embed(self.nodes)
|
extract()
Extract and process content from all datasources.
Processes each configured datasource to extract documents,
clean content and generate text nodes.
Source code in src/embedding/orchestrators/datasource_orchestrator.py
async def extract(self):
    """Run extraction for every configured datasource.

    Managers are awaited one at a time, in the iteration order of
    the ``datasources`` mapping; each one contributes documents,
    cleaned documents and text nodes.
    """
    for manager in self.datasources.values():
        await self._extract(manager)
|
save_to_vector_storage()
Persist embedded content to vector store.
Saves all embedded text nodes to configured vector
storage backend.
Source code in src/embedding/orchestrators/datasource_orchestrator.py
def save_to_vector_storage(self):
    """Persist the embedded text nodes.

    Passes all collected text nodes to the embedder's ``save``,
    which writes them to the configured vector storage backend.
    """
    embedder = self.embedder
    embedder.save(self.nodes)
|
update_vector_storage()
Update existing vector store content.
Raises: |
-
NotImplementedError
–
Method must be implemented by subclass
|
Source code in src/embedding/orchestrators/datasource_orchestrator.py
def update_vector_storage(self):
    """Update existing vector store content.

    Raises:
        NotImplementedError: Always; subclasses must provide the
            implementation
    """
    raise NotImplementedError()
|