首頁>技術>

本文將展示如何使用Python生成的資料來填充資料庫。我還將向你展示如何使用Neo4j沙箱,這樣就可以使用不同的Neo4j資料庫設定。

可以在這裡找到一個谷歌Colab筆記本:https://colab.research.google.com/drive/1J9__HotNoINHpucoipLH-4qWc48GALAk?usp=sharing

裡面有這篇文章的程式碼。(那本筆記本上有如何連線Colab和Kaggle的說明,可以讓你更快地下載資料。)

必要的工具Neo4j Python驅動程式(撰寫本文時為4.2版)jupiter notebook/Lab或谷歌Colab(可選)pandas使用Python清理資料

現在我們可以開始用Python做一些資料處理了。

為了寫這篇文章,我們將使用在Kaggle上找到的arXiv資料集,其中包含超過170萬篇STEM學術論文。(在寫這篇文章的時候,已經是第18版了。)你可以將資料下載到本地機器

https://www.kaggle.com/Cornell-University/arxiv

現在進入筆記本,我們可以開始檢視資料。我透過以下方式載入資料:

file = "./arxiv-metadata-oai-snapshot.json"metadata  = []lines = 100000    # 100k 測試with open(file, 'r') as f:        for line in tqdm(f):        metadata.append(json.loads(line))        lines -= 1        if lines == 0: break            df = pd.DataFrame(metadata)

(你不必使用tqdm,但我發現在檔案大小超過179萬個條目時檢查進度很有幫助。)

可以透過df看到,我們的資料結構為:

id                objectsubmitter         objectauthors           objecttitle             objectcomments          objectjournal-ref       objectdoi               objectreport-no         objectcategories        objectlicense           objectabstract          objectversions          objectupdate_date       objectauthors_parsed    object

假設我們想用這個資料框構建一個圖,我們想知道哪些作者發表了哪些論文,以及這些論文與哪些類別相關聯。

然後,我們希望有三種不同的節點型別與之對應:作者、論文和類別。

每個節點型別都有一兩個屬性。對於作家來說,有作者的名字。論文可以有ID和標題。最後,類別有自己的名稱。我們也有一些關係:作者和作者,論文和論文。

因此,我們的目標是擁有以下資料模型(用arrows.app繪製):

有一些列對我們很有用。例如,我打算保留id,這樣我們就可以使用它作為每個論文的唯一索引。之後,我想要得到每個作者的個人列表。此外,authors_parsed列為我們提供了一個更清晰的所有作者列表。當然,我們將保留標題欄作為論文的主要屬性。最後,我想保留categories列。

下一步是稍微清理一下資料,這樣資料幀的每行有一個作者,每行有一個類別。例如,我們看到authors_parsed列給出了一個列表,其中每個條目在名稱後面都有一個多餘的逗號。

如果我們簡單地將其匯入到資料庫中,我們將得到author節點,如(顯示一個小示例):

╒════════════════════════════════════╕│"n"                                 │╞════════════════════════════════════╡│{"name":["Balázs","C.",""]}         │├────────────────────────────────────┤│{"name":["Berger","E. L.",""]}      │├────────────────────────────────────┤│{"name":["Nadolsky","P. M.",""]}    │├────────────────────────────────────┤│{"name":["Yuan","C. -P.",""]}       │├────────────────────────────────────┤│{"name":["Streinu","Ileana",""]}    │└────────────────────────────────────┘

由於這不是一件令人愉快的事情(並且會導致查詢不是最優雅的),我們需要稍微清理一下。我們還看到categories列可以有一個單獨的類別,也可以有幾個不採用傳統列表格式的類別(如本示例的最後一行所示):

╒═══════════════════════════════════╕│"c"                                │╞═══════════════════════════════════╡│{"category":"hep-ph"}              │├───────────────────────────────────┤│{"category":"math.CO cs.CG"}       │├───────────────────────────────────┤│{"category":"physics.gen-ph"}      │├───────────────────────────────────┤│{"category":"math.CO"}             │├───────────────────────────────────┤│{"category":"math.CA math.FA"}     │└───────────────────────────────────┘

我們可以在Cypher中這樣做,但為了這篇文章的目的,我們將在Python中做清理,以便說明。

建立兩個幫助函式來清理這兩列:

def get_author_list(line):    # 清除author dataframe列,在行中建立作者列表。    return [e[1] + ' ' + e[0] for e in line]def get_category_list(line):    # 清除“category”列,在該行中建立類別列表。    return list(line.split(" "))df['cleaned_authors_list'] = df['authors_parsed'].map(get_author_list)df['category_list'] = df['categories'].map(get_category_list)df = df.drop(['submitter', 'authors',              'comments', 'journal-ref',              'doi', 'report-no', 'license',              'versions', 'update_date',              'abstract', 'authors_parsed',              'categories'], axis=1)

得到的資料幀:

現在我們有東西可以用了!

建立一個Neo4j沙箱

Neo4j沙箱可以對Neo4j免費使用。你可以啟動一個例項,該例項將持續3天並開始工作!

出於本文的目的,當進入沙箱時,你將建立一個基本的、空白的沙箱,像這樣:

正如你在建立視窗中看到的那樣,還有許多其他有用的沙箱,但是我們將選擇這個選項,因為我們將用自己的資料填充資料庫。休息幾分鐘,等待執行完成。一旦完成,你將得到你的連線資訊,如下所示:

這個視窗有一些你需要的東西。首先,你將注意到Bolt URL,並完成其埠號。

連線到Neo4j並填充資料庫

現在,我們需要在本地機器(或任何有Python程式碼的地方)和沙箱資料庫之間建立連線。這就需要用到BOLT URL和密碼。

我已經建立了一個helper類來做這一點:

class Neo4jConnection:        def __init__(self, uri, user, pwd):        self.__uri = uri        self.__user = user        self.__pwd = pwd        self.__driver = None        try:            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))        except Exception as e:            print("Failed to create the driver:", e)            def close(self):        if self.__driver is not None:            self.__driver.close()            def query(self, query, parameters=None, db=None):        assert self.__driver is not None, "Driver not initialized!"        session = None        response = None        try:             session = self.__driver.session(database=db) if db is not None else self.__driver.session()             response = list(session.run(query, parameters))        except Exception as e:            print("Query failed:", e)        finally:             if session is not None:                session.close()        return responseconn = Neo4jConnection(uri="bolt://52.87.205.91:7687",                        user="neo4j",                                     pwd="difficulties-pushup-gaps")

現在我們可以開始填充資料庫了。我們首先在資料庫中建立一些約束,以確保節點不重複,同時建立一些索引:

conn.query('CREATE CONSTRAINT papers IF NOT EXISTS ON (p:Paper)     ASSERT p.id IS UNIQUE')conn.query('CREATE CONSTRAINT authors IF NOT EXISTS ON (a:Author) ASSERT a.name IS UNIQUE')conn.query('CREATE CONSTRAINT categories IF NOT EXISTS ON (c:Category) ASSERT c.category IS UNIQUE')

現在建立三個函式來為category和author節點建立資料框,我們將使用它們分別填充到資料庫中:

def add_categories(categories):    # 向Neo4j圖中新增類別節點。    query = '''            UNWIND $rows AS row            MERGE (c:Category {category: row.category})            RETURN count(*) as total            '''    return conn.query(query, parameters = {'rows':categories.to_dict('records')})def add_authors(rows, batch_size=10000):    # #以批處理作業的形式將作者節點新增到Neo4j圖中。    query = '''            UNWIND $rows AS row            MERGE (:Author {name: row.author})            RETURN count(*) as total            '''    return insert_data(query, rows, batch_size)def insert_data(query, rows, batch_size = 10000):    # 以批處理方式更新Neo4j資料庫。        total = 0    batch = 0    start = time.time()    result = None        while batch * batch_size < len(rows):        res = conn.query(query,                          parameters= {                         'rows': rows[batch*batch_sizebatch+1)*batch_size].to_dict('records')})        total += res[0]['total']        batch += 1        result = {"total":total,                   "batches":batch,                   "time":time.time()-start}        print(result)            return result

這些函式將每一列放入變數$rows中,這些列是列表格式的。

UNWIND命令獲取列表中的每個實體並將其新增到資料庫中。在此之後,我們使用一個輔助函式以批處理模式更新資料庫,當你處理超過50k的上傳時,它會很有幫助。

載入這些節點後,我們將新增論文節點以及與所有關係:

def add_papers(rows, batch_size=5000):   # 新增論文節點 , (:Author)--(:Paper) ,    # (:Paper)--(:Category) 關係    query = '''   UNWIND $rows as row   MERGE (p:Paper {id:row.id}) ON CREATE SET p.title = row.title    // connect categories   WITH row, p   UNWIND row.category_list AS category_name   MATCH (c:Category {category: category_name})   MERGE (p)-[:IN_CATEGORY]->(c)    // connect authors   WITH distinct row, p // reduce cardinality   UNWIND row.cleaned_authors_list AS author   MATCH (a:Author {name: author})   MERGE (a)-[:AUTHORED]->(p)   RETURN count(distinct p) as total   '''    return insert_data(query, rows, batch_size)

因此,與category和author節點類似,我們建立了每一篇論文,然後透過資料幀中每一行的:authorated或:IN_CATEGORY關係將其連線起來。

請注意,在這個函式中有更多的資料在管道中移動,因此它可能有助於減少批處理大小,以防止超時錯誤。

同樣,在這個步驟中,我們可能會在完整的資料幀上使用類似於explosion的方法,為每個列表的每個元素獲取一行,並以這種方式將整個資料幀載入到資料庫中。這是可行的,這正是我們將在下面對少量資料所做的。

然而,對於更大的資料集,將資料載入到Neo4j並不是一種非常有效的方法。因為Neo4j是一個事務性資料庫,我們建立一個數據庫,資料幀的每一行就執行一條語句,這會非常緩慢。它也可能超出可用記憶體。沙箱例項有大約500 MB的堆記憶體和500 MB的頁面快取。因此,這進一步推動了以批處理方式更新資料庫。

執行所有這些函式來填充圖,我們有:

categories = pd.DataFrame(df[['category_list']])categories.rename(columns={'category_list':'category'},                  inplace=True)categories = categories.explode('category') \                       .drop_duplicates(subset=['category'])authors = pd.DataFrame(df[['cleaned_authors_list']])authors.rename(columns={'cleaned_authors_list':'author'},               inplace=True)authors=authors.explode('author').drop_duplicates(subset=['author'])add_categories(categories)add_authors(authors)add_papers(df)

太棒了!我們現在有一個填充資料庫!下面是該圖的子樣本,透過該命令執行得到:MATCH (a:Author)-[:AUTHORED]->(p:Paper)-[:IN_CATEGORY]->(c:Category) RETURN a, p, c LIMIT 300

確保它有我們想要的東西……

查詢資料庫以獲得一些答案

一個提示:當你有了一個已填充的資料庫時,你應該讓Neo4j處理儘可能多的計算,然後再將答案帶回Python(如果你需要的話)。

在本例中,假設我們想計算每個類別的相關度,並返回前20個類別的類別。顯然,我們可以在Python中完成這個簡單的工作,但在Neo4j中完成它。

在某些時候,你可能需要進行更復雜的計算(例如節點中心性、路徑查詢或社群檢測),這些都可以並且應該在將結果下載回Python之前在Neo4j中完成。

為了在Cypher中做到這一點,我們可以使用許多方法,但這裡有一個快速有效的方法:

query_string = '''MATCH (c:Category) RETURN c.category_name, SIZE(()-[:IN_CATEGORY]->(c)) AS inDegree ORDER BY inDegree DESC LIMIT 20'''top_cat_df = pd.DataFrame([dict(_) for _ in conn.query(query_string)])top_cat_df.head(20)

這應該返回:

上述資料子集的入度分佈如下:

因此,這表明資料庫已經填充,以及我們如何獲得結果。無論如何,另一種方法可以得到相同的結果返回的列表形式是:

result = conn.query(query_string)for record in result:    print(record['c.category'], record['inDegree'])
總結

我們已經展示瞭如何從Python連線到Neo4j沙箱,並在滿足要求的情況下上傳資料。

就像編碼中的其他事情一樣,有很多不同的方法可以實現這一點,我們鼓勵感興趣的使用者主要使用Cypher而不是Python來探索上面的演示。

透過使用Neo4j Python聯結器,可以很容易地在Python和Neo4j資料庫之間來回切換,就像其他資料庫一樣。這將為資料科學和機器學習帶來各種令人興奮的可能性,比如自動節點分類、連結預測和節點聚類。

感謝閱讀!

7
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 如何快速開發樹形列表和分頁查詢整合的Winform程式介面?