Ingerindo Arquivos CSV com StreamSets e Inserindo em Tabela no PostgreSQL

Ingredientes:

  • OS Linux (ubuntu, fedora, centos, kubuntu, debian, outros)
  • Docker (versão 20.10.2 ou superior)
  • Container PostgreSQL (versão 13.1 ou superior)
  • Container Streamsets Data Collector (versão 3.13.0 ou superior)
  • Navicat (versão 15.0.0 ou superior)
  • Arquivos Tabulares (.csv)

🍲 Modo de Preparo: Container PostgreSQL (versão 13.1 ou superior)

Primeiramente baixe o container pelo terminal:

$ docker pull postgres

Após baixar a imagem podemos ver ela na sessão de imagens do docker:

$ docker images | grep "postgres"

Agora vamos iniciar o serviço do postgres

$ docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres

Vamos usar dessa forma, mais caso você queira mudar a senha fique a vontade. para saber se o container ta operativo escreva. o comando abaixo, ele vai mostrar informações de status, portas etc..

$ docker ps -a --format "table {{.ID}}\t{{.Image}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}"

Agora vamos procurar o endereço de ip desse container para fazermos a conexão via ferramenta de administração de banco de dados. porém é precisa saber o “CONTAINER_ID” ele é o identificador do container em execução. você consegue descobrir essa informação usando o comando anterior. o próximo comando vai mostrar todas as configurações do container em execução.

$ docker inspect CONTAINER_ID

A saida do comando no terminal vai exibir dados no formato .JSON contendo todos os parâmetros do container, localize no final das linhas o IPAddress ou você pode obter de forma rápida usando usando o comando.

$ docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' CONTAINER_ID 

Com o ip em mãos vamos nos conectar usando a ferramenta de administração de banco de dados chamada Navicat porém você pode usar outras ferramentas como: dbeaver, pgadmin ou qualquer outra ferramenta na qual você consiga se conectar ao PostgreSQL. então vamos aos dados da conexão com o banco de dados:

host: ip CONTAINER_ID
port: 5432
user: postgres
password: mysecretpassword

mais caso você queira entrar via linha de comando você deve matar o container em questão e usar o comando:

$ docker run -it --rm --network some-network postgres psql -h some-postgres -U postgres

Dentro da de linha de comando vamos criar nosso usuário, banco de dados e garantir permissão a todos os recursos da instancia. voce pode executar a sequencia de comandos a seguir via linha de comando ou query pelo seu gerenciador favorito.

primeiro vamos criar o nosso banco de dados, vou usar o nome sdc.

CREATE DATABASE sdc;

vou definir o mesmo nome que dei ao banco de dados para o usuário.

CREATE USER sdc WITH ENCRYPTED PASSWORD '102030';

garantindo altos privilégios.

GRANT ALL PRIVILEGES on DATABASE sdc TO sdc;

criando o esquema.

CREATE SCHEMA "streamsets";

Por fim vamos criar nossa tabela, essa é uma etapa muito importante pois vamos preparar a tabela para receber os dados dos arquivos. nessa tabela vamos definir a estrutura da mesma forma como está nos arquivos, com o mesmo nomes de campos e quantidades de colunas. vamos criar todos os campos com o tipo de dados varchar,  vou fazer isso para não ter problemas durante a inserção de dados via StreamSets.

CREATE TABLE "streamsets"."cardata" (
   "Processing_Date" varchar(255),
   "Car_Name" varchar(255),
   "Year" varchar(255),
   "Selling_Price" varchar(255),
   "Present_Price" varchar(255),
   "Kms_Driven" varchar(255),
   "Fuel_Type" varchar(255),
   "Seller_Tyoe" varchar(255),
   "Transmission" varchar(255),
   "Owner" varchar(255)
 );

bom essas são as configurações necessárias nesse container, agora vamos salvar as alterações  nesse container usando o comando commit.

$ docker commit CONTAINER_ID nome:tag

Passamos o CONTAINER_ID e o nome desejado com tag, no meu caso eu coloquei como: docker commit aaeb2b4ef100 postgres-configurado:latest. lembrando que toda alteração que você fizer no container precisa ser comitada para que elas serem permanentes, caso o contrário se você matar o container ou por algum outro motivo ele cair, você perde tudo aqui que alterou dentro dele.

Prontinho agora que fizemos todas as configurações necessárias vamos ao próximo passo.

🍲 Modo de Preparo: Container Streamsets Data Collector (versão 3.13.0 ou superior)

Para essa receita vou usar a versão 3.13.0, mas fique a vontade para testar essa receita em outras versões.

$ docker pull streamsets/datacollector:3.13.0-latest

após baixar a imagem podemos ver ela na sessão de imagens do docker:

$ docker images | grep "stre"

feito isso vamos iniciar o serviço do sdc com o comando:

$ docker run --restart on-failure -p 18630:18630 -d --name sdc streamsets/datacollector:3.13.0-latest

esse serviço vai ficar escutando na porta 18630, se você abrir o navegador e digitar localhost:18630 ele vai abrir na tela de login e senha.

usuário: admin
senha: admin

após isso vamos fazer algumas mudanças nesse container para que possamos prosseguir, vamos entrar no modo de linha de comando.

$ docker exec -it CONTAINER_ID /bin/bash

o único arquivo que vamos editar fica em /etc/sdc/form-realm.properties abra ele com o vi, após isso localize a linha que é similar a:

admin:   MD5:21232f297a57a5a743894a0e4a801fc3,user,admin

vamos adicionar mais 2 permissões a esse usuário, pressione shift + i e adicione creator,manager, deve ficar assim:

admin:   MD5:21232f297a57a5a743894a0e4a801fc3,user,admin,creator,manager

Lembrando que estamos alterando um usuário existente que é o Admin, mas caso precise adicionar um usuário com o seu nome e senha podemos disponibilizar esse conhecimento em outro material aqui no blog.

Após as alterações no arquivo /etc/sdc/form-realm.properties pressione esc para sair do modo de inserção, salve os dados inseridos e saia do arquivo usando as teclas shift + : e digite wq. para sair do modo de linha de comando do container pressione a sequência de teclas ctrl + p + q cuidado para não matar o container, se aparecer assim “read escape sequence” quer dizer que você sai com segurança do container. após essas alterações no container vamos reiniciar ele para validar as alterações que fizemos.

$ docker restart CONTAINER_ID

após isso vá no navegador e digite: localhost:18630 entre com o usuário e senha: Admin

se tudo deu certo você vai cair na interface de trabalho do streamsets. clique em: CREATE NEW PIPELINE

Coloque um nome de sua preferência para o pipeline e depois clique em Save. agora vamos instalar  o componente responsável pela conexão com o PostgreSQL, clique onde a seleção está na cor vermelha.

após isso vai aparecer uma lista de componentes procure por jdbc na caixa de pesquisa.

1.selecione o checkbox

2.instalação

após clicar em instalar vai aparecer outra opção:

Clique em Restart Data Collector vai demorar uns 5 segundos e após isso vai aparecer a tela de login, faça o login novamente.

E por último vamos criar uma pasta no diretório /opt chamada /opt/data/input é aqui que nossos arquivos vão residir. vamos mais uma vez entrar via linha de comando no container., vamos usar nesse comando a flag –user root ele é necessário para criarmos a pasta no diretório do linux.

$ docker exec -it --user root CONTAINER_ID /bin/bash

Vamos a criação das pastas.

$ mkdir /opt/data && mkdir /opt/data/input

O diretório input vai ser usando para montarmos um volume externo nele com os arquivos (.csv)

após isso vamos sair do modo de linha de comando pressionando as teclas ctrl + p + q cuidado para não matar o container, se aparecer assim “read escape sequence” quer dizer que você sai com segurança do container.

vamos gravar as mudanças nesse container usando o comando commit

$ docker commit CONTAINER_ID nome:tag

Após isso vamos matar o container e remover ele da lista de execução

$ docker stop CONTAINER_ID && docker rm CONTAINER_ID

essa ação foi necessário para podermos criar um novo container. usando a flag -v montamos a pasta externa: /home/romerito/Documents/data na pasta do container /opt/data/input lembrando que voce precisa usar a imagem do container comitada anteriormente, lembre que foi nela que fizemos as alterações.

$ docker run --restart on-failure -p 18630:18630 -v /home/romerito/Documents/data:/opt/data/input -d --name sdc nome_imagem_commitada:tag

Vamos saber se a montagem das pastas funcionou, repare que na minha pasta externa tenho 2 arquivos.

agora conecte no container via linha de comando e digite: ls -lha /opt/data/input e verás que esses mesmos arquivos vão aparecer no diretório.

Fazendo a Ingestão dos arquivos

Abra o pipeline que você criou anteriormente e vamos começar, para essa receita vamos trabalhar com o conjunto de dados https://www.kaggle.com/savitanair/cardata é uma lista de nomes de carros, km rodado, ano etc… a estrutura desses arquivos são similares a nossa tabela criada no PostgreSQL. esses arquivos já se encontram na nossa pasta /home/romerito/Documents/data. então vamos lá.

do lado direito do botão Start tem um ícone, clique nele e vai aparecer uma dock com vários componentes, acima no combobox ta selecionado origins mude para All Stages e procure pelo componente Directory digitando na caixa de pesquisa, ao encontrar segure e arraste para o espaço a esquerda

Com esse componente vamos ler os nossos arquivos (.csv) do diretório /opt/data/input muita atenção nessa hora, a seguir vai ter várias imagens com configurações e uma breve explicação de ambas.

Directory: [General] aqui definimos o nome do componente:

Directory: [Files] aqui definimos o diretório dos arquivos, a ordem de leitura e a extensão dos arquivos.

Directory: [Data Format] aqui definimos o tipo de arquivo se é: JSON,TEXT, Excell etc.. No nosso caso selecionamos DELIMITED, definimos também que a configuração vai ignorar as linhas vazias dos arquivos e que eles têm cabeçalho.

É importante lembrar que exibimos apenas as configurações necessárias para essa receita, ao arrastar um novo componente não esquece de conectá-los. você faz isso clicando na bolinha branco do componente e arrastando-a até a bolinha do outro componente, sem essa ligação o fluxo não vai funcionar.

adicione outro componente chamado: Expression Evaluator

vamos usá-lo para criar o campo chamado: /Processing_Date que vai ter como finalidade armazenar a hora em que o arquivo foi processado escreva no componente da mesma forma como está na imagem.

adicione o componente: Field Type Converter vamos usar ele para converter o campo /Processing_Date de Datetime para String.

adicione outro componente chamado: Expression Evaluator vamos usár-lo para modificar os dados do campo /Processing_Date  vamos criar um campo provisório chamado /data e atribuir a ele a data/hora/minuto/segundo advindos do campo /Processing_Date  e depois atribuir os dados do campo /data já modificados para o campo original /Processing_Date.

adicione o componente: Field Remover vamos usár-lo para remover o campo provisório /data

adicione o componente: Field Order vamos usár-lo para deixar os dados na ordem desejada. para adicionar os campos basta você clicar no campo que ele vai mostrar uma lista de campos. você vai clicando e deixando na ordem desejada.

adicione o componente: JDBC Producer vamos usár-lo para pegar os dados tratados e inserir na tabela cardata que criamos anteriormente no PostgreSQL.

JDBC Producer: [JDBC] aqui definimos a String de conexão, onde passamos o IP do nosso container, porta e banco de dados. definimos também o nome do esquema que criamos e o nome da tabela

JDBC Producer: [Credentials] aqui adicionamos o usuário e senha do banco de dados 

Se seu pipeline estiver assim:

Você fez tudo certinho, perceba que todas as caixinhas estão conectadas isso é necessário pois cada componente desse é responsável por uma etapa do processo de adequação quando o dado passa por ele. agora vamos debugar o nosso pipeline para ver se as regras aplicadas estão corretas. clique em Preview

ao clicar vai aparecer outra janela, deixa as opções como na janela e click em Run Preview

Vai demorar uns 5 segundos e vai aparecer essa tela.

perceba que o primeiro componente está em azul e abaixo exibe os dados desse componente,  a medida que você vai clicando na setinha selecionada em vermelho os dados vão sendo exibidos de acordo com o componente, repare agora que avançamos para o estágio de ordenação dos dados, os dados abaixo à esquerda ficam em vermelho indicando como os dados eram antes, e verde como estão agora passando pelo componente.

Caso apareça algum erro vai aparecer um ícone em vermelho no componente indicando o número de erros. aparentemente está tudo correto, para sair do modo de debug clique onde está selecionado em vermelho.

Antes de iniciarmos o pipeline para que ele faça o processo completo, vamos adicionar alguns componentes para que quando ele terminar de carregar os dados ele finalize o pipeline, sem essa configuração ele vai ficar rodando para sempre. antes de arrastar os componentes marque a opção Produce Events do componente Directory

adicione o componente: Stream Selector clique e arraste da bolinha (E) do componente Directory até a bolinha do Stream Selector com ele vamos definir algumas regras:

Clicamos no + da caixinha do lado direito e adicionamos uma condição, quando não houver mais dados para processar ele vai fazer uma ligação a outro componente, para isso procure pelo componente Pipeline Finisher Executor  e arraste ele para a área de desenvolvimento e agora clique em cima de (1) do componente Stream Selector  e arraste até a bolinha do componente Pipeline Finisher Executor adicione também um componente chamado Trash, click em cima do número (2) do componente Stream Selector  e arraste até a bolinha do componente Trash, pronto finalizamos assim a configuração e deve ficar assim:

Agora sim vamos executar o nosso pipeline, procure pelo botão Start e clique nele. após isso vai aparecer essa janela.

Perceba nos gráficos que temos o numero 303 esse é o número registros carregados pelo componente Discovery, e temos 303 registros processados e inseridos a tabela cardata do PostgreSQL. assim que todos os dados forem processados o pipeline vai parar a execução. os arquivos processados não serão mais processados da próxima vez pois o streamsets guarda os metadados dos arquivos processados e só processa linhas de arquivos adicionados no diretório.

Agora vamos olhar nossa tabela pela Navicat e ver se os dados foram inseridos.

Prontinho! temos todos os dados inseridos de todos os arquivos do nosso diretório e toda vez que iniciarmos o pipeline ele vai inserir os dados nessa mesma tabela e com a data que os dados foram processados.

Espero que tenham gostado, o Streamsets é uma excelente ferramenta de Ingestão de dados. esse foi apenas um exemplo de como fazer ingestão de arquivos (.csv) mais é possível adaptar esse modelo para a leitura de arquivos  TXT ,JSON, Excell etc.. Caso queiram saber mais sobre o Streamsets da uma olhada aqui https://streamsets.com/

Learn More