Se proporciona el idioma español mediante traducción automática para su comodidad. En caso de alguna inconsistencia, el inglés precede al español.
Apéndice C: verify_data_netapp.py
Colaboradores
Sugerir cambios
Esta sección contiene un script Python de ejemplo que se puede utilizar para validar la base de datos vectorial en la solución de base de datos vectorial NetApp.
Apéndice C: verify_data_netapp.py
root@node2:~# cat verify_data_netapp.py
import time
import os
import numpy as np
from pymilvus import (
connections,
utility,
FieldSchema, CollectionSchema, DataType,
Collection,
)
fmt = "\n=== {:30} ===\n"
search_latency_fmt = "search latency = {:.4f}s"
num_entities, dim = 3000, 16
rng = np.random.default_rng(seed=19530)
entities = [
# provide the pk field because `auto_id` is set to False
[i for i in range(num_entities)],
rng.random(num_entities).tolist(), # field random, only supports list
rng.random((num_entities, dim)), # field embeddings, supports numpy.ndarray and list
]
################################################################################
# 1. get recovered collection hello_milvus_ntapnew_update2_sc
print(fmt.format("start connecting to Milvus"))
host = os.environ.get('MILVUS_HOST')
if host == None:
host = "localhost"
print(fmt.format(f"Milvus host: {host}"))
#connections.connect("default", host=host, port="19530")
connections.connect("default", host=host, port="27017")
recover_collections = ["hello_milvus_ntapnew_update2_sc", "hello_milvus_ntapnew_update2_sc2"]
for recover_collection_name in recover_collections:
has = utility.has_collection(recover_collection_name)
print(f"Does collection {recover_collection_name} exist in Milvus: {has}")
recover_collection = Collection(recover_collection_name)
print(recover_collection.schema)
recover_collection.flush()
print(f"Number of entities in Milvus: {recover_collection_name} : {recover_collection.num_entities}") # check the num_entites
################################################################################
# 4. create index
# We are going to create an IVF_FLAT index for hello_milvus_ntapnew_update2_sc collection.
# create_index() can only be applied to `FloatVector` and `BinaryVector` fields.
print(fmt.format("Start Creating index IVF_FLAT"))
index = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128},
}
recover_collection.create_index("embeddings", index)
################################################################################
# 5. search, query, and hybrid search
# After data were inserted into Milvus and indexed, you can perform:
# - search based on vector similarity
# - query based on scalar filtering(boolean, int, etc.)
# - hybrid search based on vector similarity and scalar filtering.
#
# Before conducting a search or a query, you need to load the data in `hello_milvus` into memory.
print(fmt.format("Start loading"))
recover_collection.load()
# -----------------------------------------------------------------------------
# search based on vector similarity
print(fmt.format("Start searching based on vector similarity"))
vectors_to_search = entities[-1][-2:]
search_params = {
"metric_type": "L2",
"params": {"nprobe": 10},
}
start_time = time.time()
result = recover_collection.search(vectors_to_search, "embeddings", search_params, limit=3, output_fields=["random"])
end_time = time.time()
for hits in result:
for hit in hits:
print(f"hit: {hit}, random field: {hit.entity.get('random')}")
print(search_latency_fmt.format(end_time - start_time))
# -----------------------------------------------------------------------------
# query based on scalar filtering(boolean, int, etc.)
print(fmt.format("Start querying with `random > 0.5`"))
start_time = time.time()
result = recover_collection.query(expr="random > 0.5", output_fields=["random", "embeddings"])
end_time = time.time()
print(f"query result:\n-{result[0]}")
print(search_latency_fmt.format(end_time - start_time))
# -----------------------------------------------------------------------------
# hybrid search
print(fmt.format("Start hybrid searching with `random > 0.5`"))
start_time = time.time()
result = recover_collection.search(vectors_to_search, "embeddings", search_params, limit=3, expr="random > 0.5", output_fields=["random"])
end_time = time.time()
for hits in result:
for hit in hits:
print(f"hit: {hit}, random field: {hit.entity.get('random')}")
print(search_latency_fmt.format(end_time - start_time))
###############################################################################
# 7. drop collection
# Finally, drop the hello_milvus, hello_milvus_ntapnew_update2_sc collection
#print(fmt.format(f"Drop collection {recover_collection_name}"))
#utility.drop_collection(recover_collection_name)
root@node2:~#