import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
//import org.elasticsearch.client.RestHighLevelClient;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Streams every document of an Elasticsearch index to a pipe-delimited text
 * file using the scroll API, so arbitrarily large indices can be exported
 * with a bounded memory footprint (batches of 1000 hits).
 *
 * <p>Output format: one line per document, selected fields joined with
 * {@code "|"}; pipes, backslashes and line breaks inside values are
 * backslash-escaped. Note this is pipe-delimited, not RFC-4180 CSV.
 */
public class ElasticsearchToCSVExporter {

    private final ElasticsearchClient esClient;

    /**
     * Builds a typed Elasticsearch client for the given HTTPS endpoint.
     *
     * <p>NOTE(review): credentials are hard-coded below; they should be
     * injected (environment, config, keystore) before production use.
     *
     * @param hostname Elasticsearch host name
     * @param port     Elasticsearch port (typically 9200)
     */
    public ElasticsearchToCSVExporter(String hostname, int port) {
        // Basic-auth credentials for the cluster.
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials("username", "password")
        );

        // Low-level REST client. BUG FIX: the credentials provider used to be
        // created but never attached, so every request went out
        // unauthenticated. Wire it in through the HTTP client config callback.
        RestClient restClient = RestClient.builder(
                new HttpHost(hostname, port, "https")
        ).setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
        ).build();

        // Transport with a Jackson JSON mapper, then the typed API client.
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        this.esClient = new ElasticsearchClient(transport);
    }

    /**
     * Exports all documents of {@code indexName} to {@code outputFilePath},
     * writing one pipe-delimited line per document.
     *
     * @param indexName      index to export
     * @param fields         field paths to emit, in column order; dotted paths
     *                       ("category.name") descend into nested objects
     * @param outputFilePath destination file (overwritten, UTF-8)
     * @throws IOException if the search or the file write fails
     */
    public void exportLargeIndexToCSV(String indexName, List<String> fields, String outputFilePath)
            throws IOException {
        // Explicit UTF-8: a bare FileWriter would use the platform default
        // charset, producing different files on different machines.
        try (BufferedWriter writer = Files.newBufferedWriter(
                Paths.get(outputFilePath), StandardCharsets.UTF_8)) {
            // Initial search opens the scroll context.
            SearchRequest searchRequest = SearchRequest.of(s -> s
                    .index(indexName)
                    .query(Query.of(q -> q.matchAll(m -> m)))
                    .size(1000)                 // batch size per scroll page
                    .scroll(t -> t.time("1m"))  // keep scroll context alive 1 minute
            );
            SearchResponse<Map> response = esClient.search(searchRequest, Map.class);

            // Surface partial results: a failed shard means missing documents.
            int failedShards = response.shards().failed().intValue();
            if (failedShards > 0) {
                System.err.println("Some shards failed: " + response.shards());
            }

            List<Hit<Map>> hits = response.hits().hits();
            if (hits.isEmpty()) {
                System.out.println("No documents found in index " + indexName);
                return;
            }

            String scrollId = response.scrollId();
            // Write the first batch, then pull pages until one comes back empty.
            processHits(hits, fields, writer);

            while (true) {
                final String currentScrollId = scrollId; // effectively-final copy for the lambda
                ScrollRequest scrollRequest = ScrollRequest.of(s -> s
                        .scrollId(currentScrollId)
                        .scroll(t -> t.time("1m"))
                );
                ScrollResponse<Map> scrollResponse = esClient.scroll(scrollRequest, Map.class);

                // Track the latest scroll id BEFORE the emptiness check so the
                // clearScroll below releases the most recent context, not a
                // stale one.
                scrollId = scrollResponse.scrollId();
                hits = scrollResponse.hits().hits();
                if (hits.isEmpty()) {
                    break;
                }
                processHits(hits, fields, writer);
            }

            // Release server-side scroll resources eagerly rather than waiting
            // for the 1-minute timeout.
            final String lastScrollId = scrollId;
            esClient.clearScroll(ClearScrollRequest.of(c -> c.scrollId(lastScrollId)));
        } catch (IOException e) {
            System.err.println("Export failed: " + e.getMessage());
            throw e;
        }
    }

    /** Writes one pipe-delimited line per hit, columns ordered as {@code fields}. */
    private void processHits(List<Hit<Map>> hits, List<String> fields, BufferedWriter writer)
            throws IOException {
        for (Hit<Map> hit : hits) {
            @SuppressWarnings("unchecked")
            Map<String, Object> source = hit.source();
            if (source == null) {
                continue; // e.g. _source disabled for this document
            }
            List<String> row = new ArrayList<>(fields.size());
            for (String field : fields) {
                Object value = getNestedFieldValue(source, field);
                // Missing fields become empty columns so rows stay aligned.
                row.add(value != null ? escapeCsvValue(value.toString()) : "");
            }
            writer.write(String.join("|", row));
            writer.newLine();
        }
    }

    /**
     * Resolves a dotted field path (e.g. "category.name") against a nested
     * map structure.
     *
     * @return the leaf value, or {@code null} when any intermediate segment is
     *         missing or is not itself a map
     */
    private Object getNestedFieldValue(Map<String, Object> source, String fieldPath) {
        String[] parts = fieldPath.split("\\.");
        Map<String, Object> current = source;
        for (int i = 0; i < parts.length - 1; i++) {
            Object next = current.get(parts[i]);
            if (!(next instanceof Map)) {
                return null;
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> nextMap = (Map<String, Object>) next;
            current = nextMap;
        }
        return current.get(parts[parts.length - 1]);
    }

    /**
     * Backslash-escapes characters that would break the row/column structure
     * of the output: the pipe delimiter and CR/LF line breaks.
     *
     * <p>BUG FIX: the backslash itself is escaped first — otherwise a literal
     * {@code \n} in the data was indistinguishable from an escaped newline,
     * making the file impossible to parse back unambiguously.
     */
    private String escapeCsvValue(String value) {
        return value.replace("\\", "\\\\")
                .replace("|", "\\|")
                .replace("\r", "\\r")
                .replace("\n", "\\n");
    }

    /** Closes the underlying transport (and with it the low-level REST client). */
    public void close() throws IOException {
        esClient._transport().close();
    }

    /** Example usage: export selected fields of the "products" index. */
    public static void main(String[] args) {
        ElasticsearchToCSVExporter exporter = new ElasticsearchToCSVExporter("localhost", 9200);
        try {
            // Fields to export; dotted paths descend into nested objects.
            List<String> fields = List.of("id", "name", "price", "category.name", "timestamp");
            exporter.exportLargeIndexToCSV("products", fields, "output.csv");
            System.out.println("Export completed successfully!");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                exporter.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}