本文將展示如何使用Python生成的資料來填充資料庫。我還將向你展示如何使用Neo4j沙箱,這樣就可以使用不同的Neo4j資料庫設定。
可以在這裡找到一個谷歌Colab筆記本:https://colab.research.google.com/drive/1J9__HotNoINHpucoipLH-4qWc48GALAk?usp=sharing
裡面有這篇文章的程式碼。(那本筆記本上有如何連線Colab和Kaggle的說明,可以讓你更快地下載資料。)
必要的工具Neo4j Python驅動程式(撰寫本文時為4.2版)jupiter notebook/Lab或谷歌Colab(可選)pandas使用Python清理資料現在我們可以開始用Python做一些資料處理了。
為了寫這篇文章,我們將使用在Kaggle上找到的arXiv資料集,其中包含超過170萬篇STEM學術論文。(在寫這篇文章的時候,已經是第18版了。)你可以將資料下載到本地機器
https://www.kaggle.com/Cornell-University/arxiv
現在進入筆記本,我們可以開始檢視資料。我透過以下方式載入資料:
file = "./arxiv-metadata-oai-snapshot.json"metadata = []lines = 100000 # 100k 測試with open(file, 'r') as f: for line in tqdm(f): metadata.append(json.loads(line)) lines -= 1 if lines == 0: break df = pd.DataFrame(metadata)
(你不必使用tqdm,但我發現在檔案大小超過179萬個條目時檢查進度很有幫助。)
可以透過df看到,我們的資料結構為:
id objectsubmitter objectauthors objecttitle objectcomments objectjournal-ref objectdoi objectreport-no objectcategories objectlicense objectabstract objectversions objectupdate_date objectauthors_parsed object
假設我們想用這個資料框構建一個圖,我們想知道哪些作者發表了哪些論文,以及這些論文與哪些類別相關聯。
然後,我們希望有三種不同的節點型別與之對應:作者、論文和類別。
每個節點型別都有一兩個屬性。對於作家來說,有作者的名字。論文可以有ID和標題。最後,類別有自己的名稱。我們也有一些關係:作者和作者,論文和論文。
因此,我們的目標是擁有以下資料模型(用arrows.app繪製):
有一些列對我們很有用。例如,我打算保留id,這樣我們就可以使用它作為每個論文的唯一索引。之後,我想要得到每個作者的個人列表。此外,authors_parsed列為我們提供了一個更清晰的所有作者列表。當然,我們將保留標題欄作為論文的主要屬性。最後,我想保留categories列。
下一步是稍微清理一下資料,這樣資料幀的每行有一個作者,每行有一個類別。例如,我們看到authors_parsed列給出了一個列表,其中每個條目在名稱後面都有一個多餘的逗號。
如果我們簡單地將其匯入到資料庫中,我們將得到author節點,如(顯示一個小示例):
╒════════════════════════════════════╕│"n" │╞════════════════════════════════════╡│{"name":["Balázs","C.",""]} │├────────────────────────────────────┤│{"name":["Berger","E. L.",""]} │├────────────────────────────────────┤│{"name":["Nadolsky","P. M.",""]} │├────────────────────────────────────┤│{"name":["Yuan","C. -P.",""]} │├────────────────────────────────────┤│{"name":["Streinu","Ileana",""]} │└────────────────────────────────────┘
由於這不是一件令人愉快的事情(並且會導致查詢不是最優雅的),我們需要稍微清理一下。我們還看到categories列可以有一個單獨的類別,也可以有幾個不採用傳統列表格式的類別(如本示例的最後一行所示):
╒═══════════════════════════════════╕│"c" │╞═══════════════════════════════════╡│{"category":"hep-ph"} │├───────────────────────────────────┤│{"category":"math.CO cs.CG"} │├───────────────────────────────────┤│{"category":"physics.gen-ph"} │├───────────────────────────────────┤│{"category":"math.CO"} │├───────────────────────────────────┤│{"category":"math.CA math.FA"} │└───────────────────────────────────┘
我們可以在Cypher中這樣做,但為了這篇文章的目的,我們將在Python中做清理,以便說明。
建立兩個幫助函式來清理這兩列:
def get_author_list(line): # 清除author dataframe列,在行中建立作者列表。 return [e[1] + ' ' + e[0] for e in line]def get_category_list(line): # 清除“category”列,在該行中建立類別列表。 return list(line.split(" "))df['cleaned_authors_list'] = df['authors_parsed'].map(get_author_list)df['category_list'] = df['categories'].map(get_category_list)df = df.drop(['submitter', 'authors', 'comments', 'journal-ref', 'doi', 'report-no', 'license', 'versions', 'update_date', 'abstract', 'authors_parsed', 'categories'], axis=1)
得到的資料幀:
現在我們有東西可以用了!
建立一個Neo4j沙箱Neo4j沙箱可以對Neo4j免費使用。你可以啟動一個例項,該例項將持續3天並開始工作!
出於本文的目的,當進入沙箱時,你將建立一個基本的、空白的沙箱,像這樣:
正如你在建立視窗中看到的那樣,還有許多其他有用的沙箱,但是我們將選擇這個選項,因為我們將用自己的資料填充資料庫。休息幾分鐘,等待執行完成。一旦完成,你將得到你的連線資訊,如下所示:
這個視窗有一些你需要的東西。首先,你將注意到Bolt URL,並完成其埠號。
連線到Neo4j並填充資料庫現在,我們需要在本地機器(或任何有Python程式碼的地方)和沙箱資料庫之間建立連線。這就需要用到BOLT URL和密碼。
我已經建立了一個helper類來做這一點:
class Neo4jConnection: def __init__(self, uri, user, pwd): self.__uri = uri self.__user = user self.__pwd = pwd self.__driver = None try: self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd)) except Exception as e: print("Failed to create the driver:", e) def close(self): if self.__driver is not None: self.__driver.close() def query(self, query, parameters=None, db=None): assert self.__driver is not None, "Driver not initialized!" session = None response = None try: session = self.__driver.session(database=db) if db is not None else self.__driver.session() response = list(session.run(query, parameters)) except Exception as e: print("Query failed:", e) finally: if session is not None: session.close() return responseconn = Neo4jConnection(uri="bolt://52.87.205.91:7687", user="neo4j", pwd="difficulties-pushup-gaps")
現在我們可以開始填充資料庫了。我們首先在資料庫中建立一些約束,以確保節點不重複,同時建立一些索引:
conn.query('CREATE CONSTRAINT papers IF NOT EXISTS ON (p:Paper) ASSERT p.id IS UNIQUE')conn.query('CREATE CONSTRAINT authors IF NOT EXISTS ON (a:Author) ASSERT a.name IS UNIQUE')conn.query('CREATE CONSTRAINT categories IF NOT EXISTS ON (c:Category) ASSERT c.category IS UNIQUE')
現在建立三個函式來為category和author節點建立資料框,我們將使用它們分別填充到資料庫中:
def add_categories(categories): # 向Neo4j圖中新增類別節點。 query = ''' UNWIND $rows AS row MERGE (c:Category {category: row.category}) RETURN count(*) as total ''' return conn.query(query, parameters = {'rows':categories.to_dict('records')})def add_authors(rows, batch_size=10000): # #以批處理作業的形式將作者節點新增到Neo4j圖中。 query = ''' UNWIND $rows AS row MERGE (:Author {name: row.author}) RETURN count(*) as total ''' return insert_data(query, rows, batch_size)def insert_data(query, rows, batch_size = 10000): # 以批處理方式更新Neo4j資料庫。 total = 0 batch = 0 start = time.time() result = None while batch * batch_size < len(rows): res = conn.query(query, parameters= { 'rows': rows[batch*batch_sizebatch+1)*batch_size].to_dict('records')}) total += res[0]['total'] batch += 1 result = {"total":total, "batches":batch, "time":time.time()-start} print(result) return result
這些函式將每一列放入變數$rows中,這些列是列表格式的。
UNWIND命令獲取列表中的每個實體並將其新增到資料庫中。在此之後,我們使用一個輔助函式以批處理模式更新資料庫,當你處理超過50k的上傳時,它會很有幫助。
載入這些節點後,我們將新增論文節點以及與所有關係:
def add_papers(rows, batch_size=5000): # 新增論文節點 , (:Author)--(:Paper) , # (:Paper)--(:Category) 關係 query = ''' UNWIND $rows as row MERGE (p:Paper {id:row.id}) ON CREATE SET p.title = row.title // connect categories WITH row, p UNWIND row.category_list AS category_name MATCH (c:Category {category: category_name}) MERGE (p)-[:IN_CATEGORY]->(c) // connect authors WITH distinct row, p // reduce cardinality UNWIND row.cleaned_authors_list AS author MATCH (a:Author {name: author}) MERGE (a)-[:AUTHORED]->(p) RETURN count(distinct p) as total ''' return insert_data(query, rows, batch_size)
因此,與category和author節點類似,我們建立了每一篇論文,然後透過資料幀中每一行的:authorated或:IN_CATEGORY關係將其連線起來。
請注意,在這個函式中有更多的資料在管道中移動,因此它可能有助於減少批處理大小,以防止超時錯誤。
同樣,在這個步驟中,我們可能會在完整的資料幀上使用類似於explosion的方法,為每個列表的每個元素獲取一行,並以這種方式將整個資料幀載入到資料庫中。這是可行的,這正是我們將在下面對少量資料所做的。
然而,對於更大的資料集,將資料載入到Neo4j並不是一種非常有效的方法。因為Neo4j是一個事務性資料庫,我們建立一個數據庫,資料幀的每一行就執行一條語句,這會非常緩慢。它也可能超出可用記憶體。沙箱例項有大約500 MB的堆記憶體和500 MB的頁面快取。因此,這進一步推動了以批處理方式更新資料庫。
執行所有這些函式來填充圖,我們有:
categories = pd.DataFrame(df[['category_list']])categories.rename(columns={'category_list':'category'}, inplace=True)categories = categories.explode('category') \ .drop_duplicates(subset=['category'])authors = pd.DataFrame(df[['cleaned_authors_list']])authors.rename(columns={'cleaned_authors_list':'author'}, inplace=True)authors=authors.explode('author').drop_duplicates(subset=['author'])add_categories(categories)add_authors(authors)add_papers(df)
太棒了!我們現在有一個填充資料庫!下面是該圖的子樣本,透過該命令執行得到:MATCH (a:Author)-[:AUTHORED]->(p:Paper)-[:IN_CATEGORY]->(c:Category) RETURN a, p, c LIMIT 300
確保它有我們想要的東西……
查詢資料庫以獲得一些答案一個提示:當你有了一個已填充的資料庫時,你應該讓Neo4j處理儘可能多的計算,然後再將答案帶回Python(如果你需要的話)。
在本例中,假設我們想計算每個類別的相關度,並返回前20個類別的類別。顯然,我們可以在Python中完成這個簡單的工作,但在Neo4j中完成它。
在某些時候,你可能需要進行更復雜的計算(例如節點中心性、路徑查詢或社群檢測),這些都可以並且應該在將結果下載回Python之前在Neo4j中完成。
為了在Cypher中做到這一點,我們可以使用許多方法,但這裡有一個快速有效的方法:
query_string = '''MATCH (c:Category) RETURN c.category_name, SIZE(()-[:IN_CATEGORY]->(c)) AS inDegree ORDER BY inDegree DESC LIMIT 20'''top_cat_df = pd.DataFrame([dict(_) for _ in conn.query(query_string)])top_cat_df.head(20)
這應該返回:
上述資料子集的入度分佈如下:
因此,這表明資料庫已經填充,以及我們如何獲得結果。無論如何,另一種方法可以得到相同的結果返回的列表形式是:
result = conn.query(query_string)for record in result: print(record['c.category'], record['inDegree'])
總結
我們已經展示瞭如何從Python連線到Neo4j沙箱,並在滿足要求的情況下上傳資料。
就像編碼中的其他事情一樣,有很多不同的方法可以實現這一點,我們鼓勵感興趣的使用者主要使用Cypher而不是Python來探索上面的演示。
透過使用Neo4j Python聯結器,可以很容易地在Python和Neo4j資料庫之間來回切換,就像其他資料庫一樣。這將為資料科學和機器學習帶來各種令人興奮的可能性,比如自動節點分類、連結預測和節點聚類。
感謝閱讀!