
MongoDB Atlas矢量搜索可以根据向量表示实现有效的相似性搜索。在使用非结构化数据(例如文本,图像或音频)时,这是特别有益的,这些数据,传统的基于关键字的搜索可能会缺乏。
关键优势:
与OpenFGA集成:
当与OpenFGA结合使用时,MongoDB Atlas Vector Search为安全文档访问提供了强大的解决方案。您可以使用矢量搜索根据其内容检索相关文档,然后应用OpenFGA的访问控制规则,以确保只有授权的用户才能查看结果。
例子:
想象一个文档管理系统,用户可以根据其内容搜索文档。通过使用MongoDB Atlas矢量搜索,您可以有效检索与用户查询在语义上相似的文档。然后,OpenFGA可用于执行访问控制,以确保用户仅查看他们被授权查看的文档。
OpenFGA是一个开源的授权平台,旨在为云本地应用提供细粒度的访问控制。它提供了一种灵活且可扩展的解决方案,用于管理用户权限并访问资源。
OpenFGA在元组的概念上运作。元组代表由用户,关系(例如“读”,“写”)和对象(例如,文件,数据库)组成的权限。定义策略以指定允许或拒绝哪些元组。当用户试图访问资源时,OpenFGA会根据定义的策略评估相应的元组,以确定访问是否授予或拒绝。
拉开OpenFGA Docker映像:
docker pull openfga/openfga运行带有裸露端口的OpenFGA容器:
docker run -p 8080:8080 -p 3000:3000 openfga/openfga run拉动MongoDB Atlas本地Docker图像:
docker pull mongodb/mongodb-atlas-local运行MongoDB Atlas本地容器:
docker run -p 27017:27017 mongodb/mongodb-atlas-local本节指导您创建矢量搜索索引,以在OpenFGA数据中进行有效的相似性搜索。
使用mongosh连接到本地地图集:
mongosh " mongodb://localhost/demo?directConnection=true "切换到demo数据库(如果需要的话,用实际的数据库名称替换):
use demo
在“嵌入”字段上创建一个名为“ vector_index”的向量搜索索引:
db . mdb_fga . createSearchIndex (
"vector_index" ,
"vectorSearch" , // index type
{
fields : [
{
"type" : "vector" ,
"numDimensions" : 1536 ,
"path" : "embeddings" ,
"similarity" : "cosine"
} ,
]
}
) ;安装所需的Python库
pip install asyncio requests pymongo unstructured openai此命令为运行Python代码安装了所有必要的库( asyncio , requests ,pymongo, pymongo , unstructured和openai )。
非结构化:从文档中提取含义
非结构化使您能够将文本文档分解为较小,更可管理的单元。想象一下研究论文:非结构化可以将其分为部分,段落甚至句子,从而更容易处理和分析。图书馆还可以帮助提取诸如姓名,日期和位置之类的实体,以帮助信息检索。
OpenFGA:确保访问提取的数据
您定义基于特定条件来管理用户权限的策略。当用户尝试访问文档或其提取的数据时,OpenFGA会根据这些策略评估相应的用户和文档,从而授予或拒绝访问权限。
MongoDB Atlas向量搜索:有效查找类似的文档
想象一下,您正在搜索文档集合中的特定概念。基于关键字的搜索可能会错过不包含确切关键字的相关文档。但是,向量搜索分析了文档内容的向量表示,即使它们使用不同的措辞,您也可以找到语义上相似的文档。
结合这些工具的力量
通过集成这三个工具,您可以创建一个健壮且安全的文档管理系统。这是工作流程:
add_tuple添加权限您在代码段中看到的add_tuple函数在管理MDB-OPENFGA应用程序中的访问控制方面起着至关重要的作用。它与OpenFGA进行交互,授予用户权限查看特定资源。
这是add_tuple工作方式的细分:
参数:
USER :这代表您要授予的用户。代码将其格式化为"user:"+USER在OpenFGA中的一致性。RESOURCE :这表示用户被授予访问权限的资源。在示例中,它的格式为"doc:"+RESOURCE (假设文档)。API呼叫:
/stores/{store_id}/write )。此终点用于将数据(元组)撰写到OpenFGA商店。writes :此键保持一个对象,指定要添加的元素。tuple_keys :这是一个包含定义权限的元组对象的数组。每个元组对象都有三个属性:user :前面格式的用户ID。relation :这定义了授予许可的类型。在这种情况下,它设置为"viewer"以指示阅读访问。object :前面格式的资源ID。authorization_model_id :这指定了OpenFGA中使用的授权模型的ID。该模型定义了控制元素的评估方式的访问控制规则。回复:
本质上, add_tuple在OpenFGA中创建了一个新的元组,指出特定用户( USER )有权查看特定资源( RESOURCE )。然后,OpenFGA的访问控制机制将使用此元组来确定用户是否有权在以后的请求中访问资源。
check_authorization check_authorization功能在MDB-OpenFGA应用程序中起着至关重要的作用。它负责确定给定用户是否有权根据OpenFGA中定义的访问控制策略访问特定资源。
它的工作原理:
参数:
tuple_key :这是代表元组的JSON对象。如您所知,元组定义了许可。它通常包含三个属性:user :用户ID。relation :许可类型(例如“查看者”,“编辑器”)。object :资源ID。API呼叫:
/stores/{store_id}/check 。该终点用于评估针对定义的授权模型的元组。authorization_model_id :使用的授权模型的ID。tuple_key :您要检查的元组对象。回复:
bool值:true :用户有许可。false :用户没有许可。本质上, check_authorization将元组作为输入和查询OpenFGA确定元组中指定的用户是否允许在指定资源(对象)上执行操作(关系)。
运行演示
python3 demo.py
Starting FGA setup...
FGA setup response: {'code': 'write_failed_due_to_invalid_input', 'message': "cannot write a tuple which already exists: user: 'user:demo_user', relation: 'viewer', object: 'doc:demo.pdf': invalid write input"}
Clearing the db first...
Database cleared.
Starting PDF document partitioning...
PDF partitioning and database insertion completed successfully.
Waiting for index to be updated. This may take a few seconds...
Starting search tool...
Access Granted: User 'demo_user' has permission to read document 'demo.pdf'.
Access Denied: User 'demo_user-denyme' does not have permission to read document 'demo.pdf'.
import asyncio
import requests
import json
import pymongo
from unstructured . partition . auto import partition
from openai import AzureOpenAI
class FGA_MDB_DEMO :
def __init__ ( self , azure_endpoint , api_version , api_key , mongo_uri , fga_api_url , fga_store_id , fga_api_token , authorization_model_id , db_name , collection_name ):
self . az_client = AzureOpenAI ( azure_endpoint = azure_endpoint , api_version = api_version , api_key = api_key )
self . mongo_client = pymongo . MongoClient ( mongo_uri )
self . fga_api_url = fga_api_url
self . fga_store_id = fga_store_id
self . fga_api_token = fga_api_token
self . authorization_model_id = authorization_model_id
self . db_name = db_name
self . collection_name = collection_name
def generate_embeddings ( self , text , model = "" ):
return self . az_client . embeddings . create ( input = [ text ], model = model ). data [ 0 ]. embedding
def check_authorization ( self , tuple_key ):
url = f" { self . fga_api_url } /stores/ { self . fga_store_id } /check"
headers = {
"Authorization" : f"Bearer { self . fga_api_token } " ,
"content-type" : "application/json" ,
}
data = {
"authorization_model_id" : self . authorization_model_id ,
"tuple_key" : tuple_key
}
response = requests . post ( url , headers = headers , data = json . dumps ( data ))
return response . json ()
def add_tuple ( self , USER , RESOURCE ):
url = f" { self . fga_api_url } /stores/ { self . fga_store_id } /write"
headers = {
"Authorization" : f"Bearer { self . fga_api_token } " ,
"content-type" : "application/json" ,
}
data = {
"writes" : {
"tuple_keys" : [
{
"user" : "user:" + USER ,
"relation" : "viewer" ,
"object" : "doc:" + RESOURCE
}
]
},
"authorization_model_id" : self . authorization_model_id
}
response = requests . post ( url , headers = headers , data = json . dumps ( data ))
return response . json ()
def search_tool ( self , text , USER_ID ):
response = self . mongo_client [ self . db_name ][ self . collection_name ]. aggregate ([
{
"$vectorSearch" : {
"index" : "vector_index" ,
"queryVector" : self . az_client . embeddings . create ( model = "text-embedding-ada-002" , input = text ). data [ 0 ]. embedding ,
"path" : "embeddings" ,
"limit" : 5 ,
"numCandidates" : 30
}
}, { "$project" :{ "_id" : 0 , "embeddings" : 0 , "metadata" : 0 }}
])
for doc in response :
tuple_key = { "user" : "user:" + USER_ID , "relation" : "viewer" , "object" : "doc:" + doc [ "source" ]}
response = self . check_authorization ( tuple_key )
if response [ 'allowed' ]:
print ( f"Access Granted: User ' { USER_ID } ' has permission to read document ' { doc [ 'source' ] } '." )
else :
print ( f"Access Denied: User ' { USER_ID } ' does not have permission to read document ' { doc [ 'source' ] } '." )
def partition_pdf ( self , resource ):
mdb_db = self . mongo_client [ self . db_name ]
mdb_collection = mdb_db [ self . collection_name ]
print ( "Clearing the db first..." )
mdb_collection . delete_many ({})
print ( "Database cleared." )
print ( "Starting PDF document partitioning..." )
elements = partition ( resource )
for element in elements :
mdb_collection . insert_one ({
"text" : str ( element . text ),
"embeddings" : self . generate_embeddings ( str ( element . text ), "text-embedding-ada-002" ),
"metadata" : {
"raw_element" : element . to_dict (),
},
"source" : resource
})
print ( "PDF partitioning and database insertion completed successfully." )
def fga_setup ( self , user , resource ):
response = self . add_tuple ( user , resource )
print ( f"FGA setup response: { response } " )
async def main ( self , user , resource ):
print ( "Starting FGA setup..." )
self . fga_setup ( user , resource )
self . partition_pdf ( resource )
print ( "Waiting for index to be updated. This may take a few seconds..." )
await asyncio . sleep ( 15 )
print ( "Starting search tool..." )
self . search_tool ( "test" , user )
self . search_tool ( "test" , user + "-denyme" )
print ( "Process completed successfully." )
if __name__ == "__main__" :
fga_mdb_demo = FGA_MDB_DEMO (
azure_endpoint = "" ,
api_version = "2024-04-01-preview" ,
api_key = "" ,
mongo_uri = "mongodb://localhost:27017/demo?directConnection=true" ,
fga_api_url = 'http://localhost:8080' ,
fga_store_id = '01J8VP1HYCHN459VT76DQG0W2R' ,
fga_api_token = '' ,
authorization_model_id = '01J8VP3BMPZNFJ480G5ZNF3H0C' ,
db_name = "demo" ,
collection_name = "mdb_fga"
)
asyncio . run ( fga_mdb_demo . main ( "demo_user" , "demo.pdf" ))