
My Python code to insert large data into Elasticsearch


Kurd


If you have a very large database and want to insert it into Elasticsearch, you can go with a manual .conf pipeline (e.g. Logstash) or use my code below as an example of how to import it in chunks through the _bulk API:

import pandas as pd
import requests
import json
from concurrent.futures import ThreadPoolExecutor

# specify the chunk size to read 100,000 rows at a time
chunksize = 100000

# function to process a chunk of data and upload to Elasticsearch
def process_chunk(chunk):
    # remove Followers and Created At columns
    chunk = chunk.drop(columns=['Followers', 'Created At'])

    # rename Email to email, Name to name, and ScreenName to username
    chunk = chunk.rename(columns={'Email': 'email', 'Name': 'name', 'ScreenName': 'username'})

    # convert each row to a pair of JSON lines: action metadata followed by the document source
    actions = []
    for _, row in chunk.iterrows():
        action = {
            'index': {
                '_index': 'twitter',
                '_type': '_doc',  # accepted (with a deprecation warning) on Elasticsearch 7.x; drop it on 8.x
            }
        }
        # replace NaN with None so missing values serialize as valid JSON null
        source = {k: (None if pd.isna(v) else v) for k, v in row.to_dict().items()}
        actions.append(json.dumps(action))
        actions.append(json.dumps(source))

    # join the list of JSON objects using the newline character
    bulk_data = '\n'.join(actions) + '\n'

    # upload the newline-delimited JSON to the Elasticsearch _bulk endpoint
    r = requests.post('http://localhost:9200/_bulk', data=bulk_data, headers={'Content-Type': 'application/x-ndjson'})

    # return the response text and the number of rows processed
    return r.text, len(chunk)

# running count of rows indexed so far
processed = 0

# create a thread pool with a maximum of 3000 worker threads
executor = ThreadPoolExecutor(max_workers=3000)

# iterate over the chunks of data and submit each chunk to the thread pool;
# calling future.result() right after submit() would block and make the uploads sequential,
# so keep a bounded window of in-flight chunks instead
futures = []
for chunk in pd.read_csv('../twitter_200M_Dec2021.csv', chunksize=chunksize):
    # submit the chunk to the thread pool
    futures.append(executor.submit(process_chunk, chunk))
    # only a limited number of chunks is held in memory at any time
    if len(futures) >= 32:
        response_text, rows = futures.pop(0).result()
        processed += rows
        print(response_text)
        print('Processed', processed, 'rows')

# drain the remaining futures
for future in futures:
    response_text, rows = future.result()
    processed += rows
    print(response_text)
    print('Processed', processed, 'rows')

# shut down the thread pool
executor.shutdown()
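
A note on the response: the _bulk endpoint returns HTTP 200 even when individual documents fail, so printing r.text alone can hide errors. A minimal sketch for surfacing them (report_bulk_errors is my own hypothetical helper; it assumes the requests response object returned by process_chunk):

# minimal sketch: count per-item failures in a _bulk response
# (report_bulk_errors is a hypothetical helper; `r` is the requests response from process_chunk)
def report_bulk_errors(r):
    body = r.json()
    if not body.get('errors'):
        return 0
    failed = [item['index'] for item in body.get('items', []) if item['index'].get('error')]
    for item in failed[:5]:  # show only the first few failures to keep the output readable
        print('bulk item failed:', item['error'])
    return len(failed)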


In the code above, I read the .csv Twitter database in chunks, dropped the columns I don't need, renamed the rest, and uploaded each chunk in parallel through a ThreadPoolExecutor.

The code handled the task without any memory-related issues, but it takes a while: about 2 hours to import 200M records.
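
If the runtime matters more than keeping the script dependency-free, the official elasticsearch Python client ships bulk helpers that handle batching and threading for you. Below is a rough sketch of the same import using helpers.parallel_bulk; the thread_count and chunk_size values are assumptions to tune for your cluster, while the host, index name and CSV path are taken from the code above:

# rough alternative sketch using the official client's parallel_bulk helper
# (thread_count and chunk_size are assumptions -- tune them for your cluster)
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

es = Elasticsearch('http://localhost:9200')

def generate_docs(path):
    # stream the CSV in chunks and yield one index action per row
    for chunk in pd.read_csv(path, chunksize=100000):
        chunk = chunk.drop(columns=['Followers', 'Created At'])
        chunk = chunk.rename(columns={'Email': 'email', 'Name': 'name', 'ScreenName': 'username'})
        for _, row in chunk.iterrows():
            source = {k: (None if pd.isna(v) else v) for k, v in row.to_dict().items()}
            yield {'_index': 'twitter', '_source': source}

processed = 0
for ok, result in parallel_bulk(es, generate_docs('../twitter_200M_Dec2021.csv'),
                                thread_count=8, chunk_size=5000):
    processed += 1
    if not ok:
        print('bulk item failed:', result)
print('Processed', processed, 'rows')

Because parallel_bulk consumes the generator lazily, memory stays flat while several batches are indexed concurrently.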
