This script is the entry point for the embedding process.
It initializes the embedding orchestrator and starts the embedding workflow.
To run the script, execute the following command from the root directory of the project:
python src/embed.py
Optional flags:
--clear-collection: Clear/delete the vector store collection before embedding
--on-prem-config: Use on-premise configuration files
--env: Specify the environment (local, test, dev, prod)
run(clear_collection=False, logger=LoggerConfiguration.get_logger(__name__))
async
Execute the embedding process.
Parameters:
- clear_collection (bool, default: False) – If True, clear the collection before embedding
- logger (Logger, default: get_logger(__name__)) – Logger instance for logging messages
Source code in src/embed.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
async def run(
    clear_collection: bool = False,
    logger: logging.Logger = LoggerConfiguration.get_logger(__name__),
):
    """
    Run the embedding workflow from configuration to completion.

    The embedding configuration is loaded first. The vector store collection
    is then either cleared or validated, and finally the configured
    orchestrator is created and its ``embed()`` coroutine is awaited.

    Args:
        clear_collection: When True, the vector store collection is cleared
            before embedding and validation is skipped. When False, the
            collection is validated instead; if the validator raises
            ``CollectionExistsException`` the function logs a message and
            returns without embedding.
        logger: Logger that receives the progress messages.
    """
    configuration = EmbeddingInitializer().get_configuration()
    vector_store_config = configuration.embedding.vector_store

    if not clear_collection:
        # Validation runs only when nothing was cleared; validating right
        # after a clear could report false positives.
        validator_factory = VectorStoreValidatorRegistry.get(
            vector_store_config.name
        )
        validator = validator_factory.create(vector_store_config)
        try:
            validator.validate()
        except CollectionExistsException as e:
            logger.info(
                f"Collection '{e.collection_name}' already exists. "
                "Skipping embedding process. Use --clear-collection to override."
            )
            return
    else:
        # Clearing was requested explicitly, so wipe the collection first.
        logger.info(
            f"Clearing collection '{vector_store_config.collection_name}'..."
        )
        store_factory = VectorStoreRegistry.get(vector_store_config.name)
        store_factory.create(vector_store_config).clear()
        logger.info(
            f"Collection '{vector_store_config.collection_name}' cleared successfully."
        )

    logger.info("Starting embedding process.")
    orchestrator_factory = EmbeddingOrchestratorRegistry.get(
        configuration.embedding.orchestrator_name
    )
    orchestrator = orchestrator_factory.create(configuration)
    await orchestrator.embed()
    logger.info("Embedding process finished.")
|