My Python code to insert large data into Elasticsearch

If you have a very large database and want to insert it into Elasticsearch, you can go with a manual .conf (for example a Logstash pipeline) or use my code below as an example of how to import it chunk by chunk:

import pandas as pd
import requests
import json
from concurrent.futures import ThreadPoolExecutor

# read the CSV in chunks of 100,000 rows at a time
chunksize = 100000

# process a chunk of data and upload it to Elasticsearch via the _bulk API
def process_chunk(chunk):
    # remove the Followers and Created At columns
    chunk = chunk.drop(columns=['Followers', 'Created At'])

    # rename Email to email, Name to name, and ScreenName to username
    chunk = chunk.rename(columns={'Email': 'email', 'Name': 'name', 'ScreenName': 'username'})

    # convert each row into an action line and a source line for the bulk API
    actions = []
    for index, row in chunk.iterrows():
        action = {
            'index': {
                '_index': 'twitter',
                '_type': '_doc',  # deprecated since Elasticsearch 7.0; can be omitted
            }
        }
        source = row.to_dict()
        actions.append(json.dumps(action))
        actions.append(json.dumps(source))

    # join the JSON lines with newlines; the bulk body must end with a newline
    bulk_data = '\n'.join(actions) + '\n'

    # upload the chunk to Elasticsearch
    # (Elasticsearch also accepts 'application/x-ndjson' as the content type for _bulk)
    r = requests.post('http://localhost:9200/_bulk', data=bulk_data,
                      headers={'Content-Type': 'application/json'})

    # return the response text and the number of rows processed
    return r.text, len(chunk)

processed = 0

# create a thread pool with a maximum of 3000 threads
executor = ThreadPoolExecutor(max_workers=3000)

# iterate over the chunks of data and submit each chunk to the thread pool
for chunk in pd.read_csv('../twitter_200M_Dec2021.csv', chunksize=chunksize):
    # submit the chunk to the thread pool
    future = executor.submit(process_chunk, chunk)
    # wait for the chunk to finish, then add its row count to the running total
    response_text, rows = future.result()
    processed += rows
    # print the Elasticsearch response and the progress so far
    print(response_text)
    print('Processed', processed, 'rows')

# shut down the thread pool
executor.shutdown()
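One caveat with the bulk API: it can return HTTP 200 even when some individual documents fail, so printing the raw response alone can hide partial failures. A small helper along these lines (the name count_bulk_failures is just illustrative) could be called on r.text inside process_chunk to spot them:

import json

def count_bulk_failures(response_text):
    # the _bulk API reports per-document results in the "items" list and sets
    # the top-level "errors" flag when at least one document failed
    body = json.loads(response_text)
    if not body.get('errors'):
        return 0
    return sum(1 for item in body.get('items', [])
               if item.get('index', {}).get('error'))

For example, process_chunk could return the failure count alongside the row count instead of the full response text.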

In the code above, I had a .csv Twitter database; I dropped the columns I don't need, renamed the rest, and uploaded the data with a pool of 3000 threads.

The code was able to do the task without any memory-related issues, but it can take a while (about 2 hours to import 200M records).
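A large part of that time likely comes from the loop waiting on each future.result() before submitting the next chunk, so only one chunk is ever uploaded at a time despite the large thread pool. If installing the official elasticsearch Python client is an option, a sketch like the one below streams the same CSV through helpers.parallel_bulk, which batches documents and sends several bulk requests concurrently while keeping memory bounded. It assumes the same column names, index name, and node address as the code above; the thread_count and chunk_size values are only starting points to tune.

import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch('http://localhost:9200')

def generate_actions(path):
    # stream the CSV lazily and yield one bulk action per row
    for chunk in pd.read_csv(path, chunksize=100000):
        chunk = chunk.drop(columns=['Followers', 'Created At'])
        chunk = chunk.rename(columns={'Email': 'email', 'Name': 'name', 'ScreenName': 'username'})
        for doc in chunk.to_dict(orient='records'):
            yield {'_index': 'twitter', '_source': doc}

processed = 0
# parallel_bulk batches the generated actions into bulk requests and sends them
# from a small thread pool; raise_on_error=False yields failed items instead of raising
for ok, item in parallel_bulk(es, generate_actions('../twitter_200M_Dec2021.csv'),
                              thread_count=8, chunk_size=5000, raise_on_error=False):
    if not ok:
        print(item)  # per-document failure reported by Elasticsearch
    processed += 1
    if processed % 1000000 == 0:
        print('Processed', processed, 'rows')

A handful of client threads is usually enough here, since each bulk request already carries thousands of documents; the bottleneck tends to be Elasticsearch indexing throughput rather than the number of Python threads.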

 