首页 >后端开发 >Python教程 >将数据加载到 Neo4j 中

将数据加载到 Neo4j 中

王林
王林原创
2024-08-19 16:40:031165浏览

在上一篇博客中,我们了解了如何使用 2 个插件 APOC 和图形数据科学库 - GDS 在本地安装和设置 neo4j。在这篇博客中,我将获取一个玩具数据集(电子商务网站中的产品)并将其存储在 Neo4j 中。

 

为 Neo4j 分配足够的内存

在开始加载数据之前,如果您的用例中有大量数据,请确保为 Neo4j 分配了足够的内存。为此:

  • 点击打开右侧的三个点

Load Data Into Neo4j

  • 点击打开文件夹 -> 配置

Load Data Into Neo4j

  • 点击neo4j.conf

Load Data Into Neo4j

  • 在 neo4j.conf 中搜索 heap,取消注释第 77、78 行,并将 256m 更改为 2048m,这样可以确保 Neo4j 中分配 2048mb 用于数据存储.

Load Data Into Neo4j

 
 

创建节点

  • 图有两个主要组成部分:节点和关系,我们先创建节点,然后再建立关系。

  • 我正在使用的数据在这里 - data

  • 使用此处提供的requirements.txt来创建一个python虚拟环境-requirements.txt

  • 让我们定义各种函数来推送数据。

  • 导入必要的库

import pandas as pd
from neo4j import GraphDatabase
from openai import OpenAI
  • 我们将使用 openai 生成嵌入
client = OpenAI(api_key="")
product_data_df = pd.read_csv('../data/product_data.csv')
  • 生成嵌入
def get_embedding(text):
    """
    Used to generate embeddings using OpenAI embeddings model
    :param text: str - text that needs to be converted to embeddings
    :return: embedding
    """
    model = "text-embedding-3-small"
    text = text.replace("\n", " ")
    return client.embeddings.create(input=[text], model=model).data[0].embedding
  • 根据我们的数据集,我们可以有两个唯一的节点标签,类别:产品类别,产品:产品名称。让我们创建类别标签,neo4j 提供了一种称为属性的东西,您可以将它们想象为特定节点的元数据。这里 nameembedding 是属性。因此,我们将类别名称及其相应的嵌入存储在数据库中。
def create_category(product_data_df):
    """
    Used to generate queries for creating category nodes in neo4j
    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for category
    """
    cat_query = """CREATE (a:Category {name: '%s', embedding: %s})"""
    distinct_category = product_data_df['Category'].unique()
    query_list = []
    for category in distinct_category:
        embedding = get_embedding(category)
        query_list.append(cat_query % (category, embedding))
    return query_list
  • 类似地,我们可以创建产品节点,这里的属性将是 name, description, price, warranty_period, 可用库存评论评级产品发布日期嵌入
def create_product(product_data_df):
    """
    Used to generate queries for creating product nodes in neo4j
    :param product_data_df: pandas dataframe - data 
    :return: query_list: list - list containing all create node queries for product 
    """
    product_query = """CREATE (a:Product {name: '%s', description: '%s', price: %d, warranty_period: %d, 
    available_stock: %d, review_rating: %f, product_release_date: date('%s'), embedding: %s})"""
    query_list = []
    for idx, row in product_data_df.iterrows():
        embedding = get_embedding(row['Product Name'] + " - " + row['Description'])
        query_list.append(product_query % (row['Product Name'], row['Description'], int(row['Price (INR)']),
                                           int(row['Warranty Period (Years)']), int(row['Stock']),
                                           float(row['Review Rating']), str(row['Product Release Date']), embedding))
    return query_list
  • 现在让我们创建另一个函数来执行上述两个函数生成的查询。适当更新您的用户名和密码。
def execute_bulk_query(query_list):
    """
    Executes queries is a list one by one
    :param query_list: list - list of cypher queries
    :return: None
    """
    url = "bolt://localhost:7687"
    auth = ("neo4j", "neo4j@123")

    with GraphDatabase.driver(url, auth=auth) as driver:
        with driver.session() as session:
            for query in query_list:
                try:
                    session.run(query)
                except Exception as error:
                    print(f"Error in executing query - {query}, Error - {error}")
  • 完整代码
import pandas as pd
from neo4j import GraphDatabase
from openai import OpenAI

client = OpenAI(api_key="")
product_data_df = pd.read_csv('../data/product_data.csv')


def preprocessing(df, columns_to_replace):
    """
    Used to preprocess certain column in dataframe
    :param df: pandas dataframe - data
    :param columns_to_replace: list - column name list
    :return: df: pandas dataframe - processed data
    """
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'s", "s"))
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'", ""))
    return df


def get_embedding(text):
    """
    Used to generate embeddings using OpenAI embeddings model
    :param text: str - text that needs to be converted to embeddings
    :return: embedding
    """
    model = "text-embedding-3-small"
    text = text.replace("\n", " ")
    return client.embeddings.create(input=[text], model=model).data[0].embedding


def create_category(product_data_df):
    """
    Used to generate queries for creating category nodes in neo4j
    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for category
    """
    cat_query = """CREATE (a:Category {name: '%s', embedding: %s})"""
    distinct_category = product_data_df['Category'].unique()
    query_list = []
    for category in distinct_category:
        embedding = get_embedding(category)
        query_list.append(cat_query % (category, embedding))
    return query_list


def create_product(product_data_df):
    """
    Used to generate queries for creating product nodes in neo4j
    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for product
    """
    product_query = """CREATE (a:Product {name: '%s', description: '%s', price: %d, warranty_period: %d, 
    available_stock: %d, review_rating: %f, product_release_date: date('%s'), embedding: %s})"""
    query_list = []
    for idx, row in product_data_df.iterrows():
        embedding = get_embedding(row['Product Name'] + " - " + row['Description'])
        query_list.append(product_query % (row['Product Name'], row['Description'], int(row['Price (INR)']),
                                           int(row['Warranty Period (Years)']), int(row['Stock']),
                                           float(row['Review Rating']), str(row['Product Release Date']), embedding))
    return query_list


def execute_bulk_query(query_list):
    """
    Executes queries is a list one by one
    :param query_list: list - list of cypher queries
    :return: None
    """
    url = "bolt://localhost:7687"
    auth = ("neo4j", "neo4j@123")

    with GraphDatabase.driver(url, auth=auth) as driver:
        with driver.session() as session:
            for query in query_list:
                try:
                    session.run(query)
                except Exception as error:
                    print(f"Error in executing query - {query}, Error - {error}")

# PREPROCESSING
product_data_df = preprocessing(product_data_df, ['Product Name', 'Description'])

# CREATE CATEGORY
query_list = create_category(product_data_df)
execute_bulk_query(query_list)

# CREATE PRODUCT
query_list = create_product(product_data_df)
execute_bulk_query(query_list)

 
 

建立关系

  • 我们将在 类别产品 之间创建关系,关系的名称为 CATEGORY_CONTAINS_PRODUCT
from neo4j import GraphDatabase
import pandas as pd

product_data_df = pd.read_csv('../data/product_data.csv')


def preprocessing(df, columns_to_replace):
    """
    Used to preprocess certain column in dataframe
    :param df: pandas dataframe - data
    :param columns_to_replace: list - column name list
    :return: df: pandas dataframe - processed data
    """
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'s", "s"))
    df[columns_to_replace] = df[columns_to_replace].apply(lambda col: col.str.replace("'", ""))
    return df


def create_category_food_relationship_query(product_data_df):
    """
    Used to create relationship between category and products
    :param product_data_df: dataframe - data
    :return: query_list: list - cypher queries
    """
    query = """MATCH (c:Category {name: '%s'}), (p:Product {name: '%s'}) CREATE (c)-[:CATEGORY_CONTAINS_PRODUCT]->(p)"""
    query_list = []
    for idx, row in product_data_df.iterrows():
        query_list.append(query % (row['Category'], row['Product Name']))
    return query_list


def execute_bulk_query(query_list):
    """
    Executes queries is a list one by one
    :param query_list: list - list of cypher queries
    :return: None
    """
    url = "bolt://localhost:7687"
    auth = ("neo4j", "neo4j@123")

    with GraphDatabase.driver(url, auth=auth) as driver:
        with driver.session() as session:
            for query in query_list:
                try:
                    session.run(query)
                except Exception as error:
                    print(f"Error in executing query - {query}, Error - {error}")


# PREPROCESSING
product_data_df = preprocessing(product_data_df, ['Product Name', 'Description'])

# CATEGORY - FOOD RELATIONSHIP
query_list = create_category_food_relationship_query(product_data_df)
execute_bulk_query(query_list)

  • 通过使用 MATCH 查询来匹配已创建的节点,我们在它们之间建立关系。

 
 

可视化创建的节点

将鼠标悬停在 打开 图标上,然后单击 neo4j 浏览器 以可视化我们创建的节点。
Load Data Into Neo4j

Load Data Into Neo4j

Load Data Into Neo4j

我们的数据连同它们的嵌入一起加载到 Neo4j 中。

 
在接下来的博客中,我们将看到如何使用 python 构建图形查询引擎并使用获取的数据进行增强生成。

希望这有帮助...再见!!!

领英 - https://www.linkedin.com/in/praveenr2998/
Github - https://github.com/praveenr2998/Creating-Lightweight-RAG-Systems-With-Graphs/tree/main/push_data_to_db

以上是将数据加载到 Neo4j 中的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn