package n_event_hub.client.util.kafka;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import n_event_hub.dtos.EventDTOs;
import n_event_hub.dtos.EventValidation;
import n_event_hub.dtos.requests.KafkaIngestionDTOs;
import n_event_hub.dtos.responses.EventValidationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import play.libs.Json;
import play.libs.ws.WSClient;
import play.libs.ws.WSResponse;
import util.Configurations;

/**
 * {@link KafkaService} implementation that publishes events by POSTing them over HTTP
 * (via Play's {@link WSClient}) to the service whose base URL is configured under
 * {@code apps.kafka-rest}.
 *
 * <p>Events for a subject key are sent to {@code <base-url>/topics/<subjectKey>-activity}.
 * NOTE(review): the URL shape and content type look like the Confluent Kafka REST Proxy v2
 * produce API -- confirm against the deployed service.
 */
@Singleton
public class KafkaServiceImpl implements KafkaService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);

    /** Content type sent with every ingestion request. */
    private static final String KAFKA_JSON_CONTENT_TYPE = "application/vnd.kafka.json.v2+json";
    /** Suffix appended to the subject key to form the topic name. */
    private static final String TOPIC_SUFFIX = "-activity";
    /** Event name whose response carries the search id and pt id taken from the event's tag. */
    private static final String PACK_EVENT_NAME = "pack";
    /** The only HTTP status treated as a successful ingestion. */
    private static final int HTTP_OK = 200;

    private final WSClient wsClient;
    /** Base URL of the Kafka REST service, read from configuration when the instance is created. */
    private final String kafkaRestBaseUrl = Configurations.getString("apps.kafka-rest");

    @Inject
    public KafkaServiceImpl(WSClient wsClient) {
        this.wsClient = wsClient;
    }

    /**
     * Sends the given events to the topic derived from {@code subjectKey} and, when the remote
     * service answers with HTTP 200, returns one {@link EventValidation} record per event.
     *
     * @param subjectKey key used to build the topic name ({@code <subjectKey>-activity})
     * @param events     events to publish; also the source of the returned records
     * @return a stage completing with the validation response; it completes exceptionally with a
     *         {@link RuntimeException} when the remote service answers with any status other
     *         than 200
     */
    @Override // n_event_hub.client.util.kafka.KafkaService
    public CompletionStage<EventValidationResponse> sentEventsToKafka(String subjectKey, List<EventDTOs.EventDTO> events) {
        return doKafkaIngestion(subjectKey, events).thenApply(response -> {
            if (response.getStatus() != HTTP_OK) {
                LOGGER.error("Kafka Ingestion Failed ! Response Error : Error code {} , subject key : {} , records -> {}",
                        response.getStatus(), subjectKey, Json.toJson(events));
                throw new RuntimeException("Unable to Run kafka Ingestion for subject key -> " + subjectKey);
            }
            // NOTE(review): the raw (List) cast is kept as-is because the parameter type of
            // setRecords is not visible here; drop it once that signature is confirmed.
            return new EventValidationResponse().setRecords((List) events.stream()
                    .map(event -> buildEventResponse(event.getValue(), event.getValue().getW3().getVl()))
                    .collect(Collectors.toList()));
        });
    }

    /**
     * Wraps the events in a {@link KafkaIngestionDTOs.KafkaIngestionDTO} and POSTs it as JSON to
     * the topic URL for {@code subjectKey}.
     *
     * @param subjectKey key used to build the topic name
     * @param events     events to publish
     * @return the stage yielding the raw HTTP response
     */
    private CompletionStage<WSResponse> doKafkaIngestion(String subjectKey, List<EventDTOs.EventDTO> events) {
        KafkaIngestionDTOs.KafkaIngestionDTO payload = KafkaIngestionDTOs.KafkaIngestionDTO.builder().records(events).build();
        // Build the URL once so the debug log shows exactly the endpoint that is called
        // (the log line previously omitted the "/topics" path segment).
        String topicUrl = this.kafkaRestBaseUrl.concat("/topics/" + subjectKey + TOPIC_SUFFIX);
        LOGGER.debug("Kafka ingestion to {} : {}", topicUrl, events.size());
        return this.wsClient.url(topicUrl)
                .addHeader("Content-type", KAFKA_JSON_CONTENT_TYPE)
                .post(Json.toJson(payload));
    }

    /**
     * Builds the validation record for one event. Every record is marked valid.
     *
     * <p>For events named {@code "pack"} the {@code v_id} and {@code pt_id} come from the event's
     * tag ({@code getA().getEcx().getTag()}); for all other events {@code v_id} comes from
     * {@code getVId()} and {@code pt_id} is the empty string.
     *
     * @param eventValueDTO the event payload
     * @param value         value stored in the record's {@code value} field
     * @return the populated validation record
     */
    private EventValidation buildEventResponse(EventDTOs.EventValueDTO eventValueDTO, int value) {
        // Constant-first comparison: an event without a name falls through to the generic
        // branch instead of throwing after the batch has already been accepted.
        if (PACK_EVENT_NAME.equals(eventValueDTO.getW3().getNm())) {
            return EventValidation.builder()
                    .cv(eventValueDTO.getW3().getCv())
                    .event_name(eventValueDTO.getW3().getNm())
                    .event_value(eventValueDTO.getW3().getVl())
                    .valid(true)
                    .value(value)
                    .mi(eventValueDTO.getW2().getMi())
                    .v_id(eventValueDTO.getA().getEcx().getTag().getSearchId())
                    .pt_id(eventValueDTO.getA().getEcx().getTag().getPtId())
                    .build();
        }
        return EventValidation.builder()
                .cv(eventValueDTO.getW3().getCv())
                .event_name(eventValueDTO.getW3().getNm())
                .event_value(eventValueDTO.getW3().getVl())
                .valid(true)
                .value(value)
                .mi(eventValueDTO.getW2().getMi())
                .v_id(eventValueDTO.getVId())
                .pt_id("")
                .build();
    }
}
