My Python code to insert large data into Elasticsearch


If you have a very large database and want to insert it into Elasticsearch, you can go with the manual .conf approach or use my following code as an example of how to import it line by line:

import pandas as pd
import requests
import json
from concurrent.futures import ThreadPoolExecutor

# specify the chunk size to read 100,000 rows at a time
chunksize = 100000

# function to process a chunk of data and upload to Elasticsearch
def process_chunk(chunk):
    # remove Followers and Created At columns
    chunk = chunk.drop(columns=['Followers', 'Created At'])

    # rename Email to email, Name to name, and ScreenName to username
    chunk = chunk.rename(columns={'Email': 'email', 'Name': 'name', 'ScreenName': 'username'})

    # convert each row to a bulk action line plus a source line and append both to a list
    actions = []
    for index, row in chunk.iterrows():
        action = {'index': {'_index': 'twitter', '_type': '_doc'}}
        source = row.to_dict()
        actions.append(json.dumps(action))
        actions.append(json.dumps(source))

    # join the JSON lines with newlines to build the bulk (NDJSON) payload
    bulk_data = '\n'.join(actions) + '\n'

    # Upload with requests to Elasticsearch
    r = requests.post('http://localhost:9200/_bulk', data=bulk_data, headers={'Content-Type': 'application/json'})

    # return the response text and the number of rows processed
    return r.text, len(chunk)

# running total of rows processed
processed = 0

# create a thread pool with a maximum of 3000 threads
executor = ThreadPoolExecutor(max_workers=3000)

# iterate over the chunks of data and submit each chunk to the thread pool
for chunk in pd.read_csv('../twitter_200M_Dec2021.csv', chunksize=chunksize):
    # submit the chunk to the thread pool
    future = executor.submit(process_chunk, chunk)
    # wait for the upload to finish and add its row count to the total
    processed += future.result()[1]
    # print the running total of processed rows
    print('Processed', processed, 'rows')

# shut down the thread pool once all chunks have been submitted
executor.shutdown(wait=True)
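
For reference, each row ends up as two newline-separated lines in bulk_data: an action line telling Elasticsearch which index to write to, followed by the document source. One row would look roughly like this (the field values are just placeholders, not real data):

{"index": {"_index": "twitter", "_type": "_doc"}}
{"email": "user@example.com", "name": "Some User", "username": "someuser"}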



In the code above, I had a .csv Twitter database, dropped the columns I don't need, renamed the rest, and uploaded it with a pool of 3000 threads.

The code was able to do the task without any memory-related issues, but it takes some time (about 2 hours to import 200M records).
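
One thing to be aware of: because the loop calls future.result() right after submit(), each chunk's upload has to finish before the next chunk is read, so the uploads effectively run one at a time no matter how many workers the pool has (that is also why memory stays low). If you care more about import time, here is a minimal sketch of a variant that keeps a few chunks in flight at once; MAX_IN_FLIGHT and its value of 8 are assumptions to tune for your hardware, and process_chunk is the same function from the code above:

import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed

# keep only a handful of uploads in flight so the whole CSV is never held in memory
MAX_IN_FLIGHT = 8  # assumption: tune to your machine and cluster

processed = 0
with ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as executor:
    futures = set()
    for chunk in pd.read_csv('../twitter_200M_Dec2021.csv', chunksize=100000):
        # submit the chunk without waiting for it
        futures.add(executor.submit(process_chunk, chunk))
        # once enough chunks are in flight, wait for one to finish before reading the next
        if len(futures) >= MAX_IN_FLIGHT:
            done = next(as_completed(futures))
            futures.remove(done)
            processed += done.result()[1]
            print('Processed', processed, 'rows')
    # drain the uploads that are still running
    for done in as_completed(futures):
        processed += done.result()[1]
        print('Processed', processed, 'rows')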
