Área do cliente

Ingerindo Arquivos CSV com StreamSets e Inserindo em Tabela no PostgreSQL

CSV em postgreSQL tutorial

Ingredientes:

  • OS Linux (ubuntu, fedora, centos, kubuntu, debian, outros)
  • Docker (versão 20.10.2 ou superior)
  • Container PostgreSQL (versão 13.1 ou superior)
  • Container Streamsets Data Collector (versão 3.13.0 ou superior)
  • Navicat (versão 15.0.0 ou superior)
  • Arquivos Tabulares (.csv)

🍲 Modo de Preparo: Container PostgreSQL (versão 13.1 ou superior)

Primeiramente baixe o container pelo terminal:

$ docker pull postgres

Após baixar a imagem podemos ver ela na sessão de imagens do docker:

$ docker images | grep "postgres"

Agora vamos iniciar o serviço do postgres

$ docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres

Vamos usar dessa forma, mais caso você queira mudar a senha fique a vontade. para saber se o container ta operativo escreva. o comando abaixo, ele vai mostrar informações de status, portas etc..

$ docker ps -a --format "table {{.ID}}\t{{.Image}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}"

Agora vamos procurar o endereço de ip desse container para fazermos a conexão via ferramenta de administração de banco de dados. porém é precisa saber o “CONTAINER_ID” ele é o identificador do container em execução. você consegue descobrir essa informação usando o comando anterior. o próximo comando vai mostrar todas as configurações do container em execução.

$ docker inspect CONTAINER_ID

A saida do comando no terminal vai exibir dados no formato .JSON contendo todos os parâmetros do container, localize no final das linhas o IPAddress ou você pode obter de forma rápida usando usando o comando.

$ docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' CONTAINER_ID 

Com o ip em mãos vamos nos conectar usando a ferramenta de administração de banco de dados chamada Navicat porém você pode usar outras ferramentas como: dbeaver, pgadmin ou qualquer outra ferramenta na qual você consiga se conectar ao PostgreSQL. então vamos aos dados da conexão com o banco de dados:

host: ip CONTAINER_ID
port: 5432
user: postgres
password: mysecretpassword

mais caso você queira entrar via linha de comando você deve matar o container em questão e usar o comando:

$ docker run -it --rm --network some-network postgres psql -h some-postgres -U postgres

Dentro da de linha de comando vamos criar nosso usuário, banco de dados e garantir permissão a todos os recursos da instancia. voce pode executar a sequencia de comandos a seguir via linha de comando ou query pelo seu gerenciador favorito.

primeiro vamos criar o nosso banco de dados, vou usar o nome sdc.

CREATE DATABASE sdc;

vou definir o mesmo nome que dei ao banco de dados para o usuário.

CREATE USER sdc WITH ENCRYPTED PASSWORD '102030';

garantindo altos privilégios.

GRANT ALL PRIVILEGES on DATABASE sdc TO sdc;

criando o esquema.

CREATE SCHEMA "streamsets";

Por fim vamos criar nossa tabela, essa é uma etapa muito importante pois vamos preparar a tabela para receber os dados dos arquivos. nessa tabela vamos definir a estrutura da mesma forma como está nos arquivos, com o mesmo nomes de campos e quantidades de colunas. vamos criar todos os campos com o tipo de dados varchar,  vou fazer isso para não ter problemas durante a inserção de dados via StreamSets.

CREATE TABLE "streamsets"."cardata" (
   "Processing_Date" varchar(255),
   "Car_Name" varchar(255),
   "Year" varchar(255),
   "Selling_Price" varchar(255),
   "Present_Price" varchar(255),
   "Kms_Driven" varchar(255),
   "Fuel_Type" varchar(255),
   "Seller_Tyoe" varchar(255),
   "Transmission" varchar(255),
   "Owner" varchar(255)
 );

bom essas são as configurações necessárias nesse container, agora vamos salvar as alterações  nesse container usando o comando commit.

$ docker commit CONTAINER_ID nome:tag

Passamos o CONTAINER_ID e o nome desejado com tag, no meu caso eu coloquei como: docker commit aaeb2b4ef100 postgres-configurado:latest. lembrando que toda alteração que você fizer no container precisa ser comitada para que elas serem permanentes, caso o contrário se você matar o container ou por algum outro motivo ele cair, você perde tudo aqui que alterou dentro dele.

Prontinho agora que fizemos todas as configurações necessárias vamos ao próximo passo.

Modo de Preparo: Container Streamsets Data Collector (versão 3.13.0 ou superior)

Para essa receita vou usar a versão 3.13.0, mas fique a vontade para testar essa receita em outras versões.

$ docker pull streamsets/datacollector:3.13.0-latest

após baixar a imagem podemos ver ela na sessão de imagens do docker:

$ docker images | grep "stre"

feito isso vamos iniciar o serviço do sdc com o comando:

$ docker run --restart on-failure -p 18630:18630 -d --name sdc streamsets/datacollector:3.13.0-latest

esse serviço vai ficar escutando na porta 18630, se você abrir o navegador e digitar localhost:18630 ele vai abrir na tela de login e senha.

usuário: admin
senha: admin

após isso vamos fazer algumas mudanças nesse container para que possamos prosseguir, vamos entrar no modo de linha de comando.

$ docker exec -it CONTAINER_ID /bin/bash

o único arquivo que vamos editar fica em /etc/sdc/form-realm.properties abra ele com o vi, após isso localize a linha que é similar a:

admin:   MD5:21232f297a57a5a743894a0e4a801fc3,user,admin

vamos adicionar mais 2 permissões a esse usuário, pressione shift + i e adicione creator,manager, deve ficar assim:

admin:   MD5:21232f297a57a5a743894a0e4a801fc3,user,admin,creator,manager

Lembrando que estamos alterando um usuário existente que é o Admin, mas caso precise adicionar um usuário com o seu nome e senha podemos disponibilizar esse conhecimento em outro material aqui no blog.

Após as alterações no arquivo /etc/sdc/form-realm.properties pressione esc para sair do modo de inserção, salve os dados inseridos e saia do arquivo usando as teclas shift + : e digite wq. para sair do modo de linha de comando do container pressione a sequência de teclas ctrl + p + q cuidado para não matar o container, se aparecer assim “read escape sequence” quer dizer que você sai com segurança do container. após essas alterações no container vamos reiniciar ele para validar as alterações que fizemos.

$ docker restart CONTAINER_ID

após isso vá no navegador e digite: localhost:18630 entre com o usuário e senha: Admin

se tudo deu certo você vai cair na interface de trabalho do streamsets. clique em: CREATE NEW PIPELINE

Coloque um nome de sua preferência para o pipeline e depois clique em Save. agora vamos instalar  o componente responsável pela conexão com o PostgreSQL, clique onde a seleção está na cor vermelha.

após isso vai aparecer uma lista de componentes procure por jdbc na caixa de pesquisa.

1.selecione o checkbox

2.instalação

após clicar em instalar vai aparecer outra opção:

Clique em Restart Data Collector vai demorar uns 5 segundos e após isso vai aparecer a tela de login, faça o login novamente.

E por último vamos criar uma pasta no diretório /opt chamada /opt/data/input é aqui que nossos arquivos vão residir. vamos mais uma vez entrar via linha de comando no container., vamos usar nesse comando a flag –user root ele é necessário para criarmos a pasta no diretório do linux.

$ docker exec -it --user root CONTAINER_ID /bin/bash

Vamos a criação das pastas.

$ mkdir /opt/data && mkdir /opt/data/input

O diretório input vai ser usando para montarmos um volume externo nele com os arquivos (.csv)

após isso vamos sair do modo de linha de comando pressionando as teclas ctrl + p + q cuidado para não matar o container, se aparecer assim “read escape sequence” quer dizer que você sai com segurança do container.

vamos gravar as mudanças nesse container usando o comando commit

$ docker commit CONTAINER_ID nome:tag

Após isso vamos matar o container e remover ele da lista de execução

$ docker stop CONTAINER_ID && docker rm CONTAINER_ID

essa ação foi necessário para podermos criar um novo container. usando a flag -v montamos a pasta externa: /home/romerito/Documents/data na pasta do container /opt/data/input lembrando que voce precisa usar a imagem do container comitada anteriormente, lembre que foi nela que fizemos as alterações.

$ docker run --restart on-failure -p 18630:18630 -v /home/romerito/Documents/data:/opt/data/input -d --name sdc nome_imagem_commitada:tag

Vamos saber se a montagem das pastas funcionou, repare que na minha pasta externa tenho 2 arquivos.

agora conecte no container via linha de comando e digite: ls -lha /opt/data/input e verás que esses mesmos arquivos vão aparecer no diretório.

Fazendo a Ingestão dos arquivos

Abra o pipeline que você criou anteriormente e vamos começar, para essa receita vamos trabalhar com o conjunto de dados https://www.kaggle.com/savitanair/cardata é uma lista de nomes de carros, km rodado, ano etc… a estrutura desses arquivos são similares a nossa tabela criada no PostgreSQL. esses arquivos já se encontram na nossa pasta /home/romerito/Documents/data. então vamos lá.

do lado direito do botão Start tem um ícone, clique nele e vai aparecer uma dock com vários componentes, acima no combobox ta selecionado origins mude para All Stages e procure pelo componente Directory digitando na caixa de pesquisa, ao encontrar segure e arraste para o espaço a esquerda

Com esse componente vamos ler os nossos arquivos (.csv) do diretório /opt/data/input muita atenção nessa hora, a seguir vai ter várias imagens com configurações e uma breve explicação de ambas.

Directory: [General] aqui definimos o nome do componente:

Directory: [Files] aqui definimos o diretório dos arquivos, a ordem de leitura e a extensão dos arquivos.

Directory: [Data Format] aqui definimos o tipo de arquivo se é: JSON,TEXT, Excell etc.. No nosso caso selecionamos DELIMITED, definimos também que a configuração vai ignorar as linhas vazias dos arquivos e que eles têm cabeçalho.

É importante lembrar que exibimos apenas as configurações necessárias para essa receita, ao arrastar um novo componente não esquece de conectá-los. você faz isso clicando na bolinha branco do componente e arrastando-a até a bolinha do outro componente, sem essa ligação o fluxo não vai funcionar.

adicione outro componente chamado: Expression Evaluator

vamos usá-lo para criar o campo chamado: /Processing_Date que vai ter como finalidade armazenar a hora em que o arquivo foi processado escreva no componente da mesma forma como está na imagem.

adicione o componente: Field Type Converter vamos usar ele para converter o campo /Processing_Date de Datetime para String.

adicione outro componente chamado: Expression Evaluator vamos usár-lo para modificar os dados do campo /Processing_Date  vamos criar um campo provisório chamado /data e atribuir a ele a data/hora/minuto/segundo advindos do campo /Processing_Date  e depois atribuir os dados do campo /data já modificados para o campo original /Processing_Date.

adicione o componente: Field Remover vamos usár-lo para remover o campo provisório /data

adicione o componente: Field Order vamos usár-lo para deixar os dados na ordem desejada. para adicionar os campos basta você clicar no campo que ele vai mostrar uma lista de campos. você vai clicando e deixando na ordem desejada.

adicione o componente: JDBC Producer vamos usár-lo para pegar os dados tratados e inserir na tabela cardata que criamos anteriormente no PostgreSQL.

JDBC Producer: [JDBC] aqui definimos a String de conexão, onde passamos o IP do nosso container, porta e banco de dados. definimos também o nome do esquema que criamos e o nome da tabela

JDBC Producer: [Credentials] aqui adicionamos o usuário e senha do banco de dados

Se seu pipeline estiver assim:

Você fez tudo certinho, perceba que todas as caixinhas estão conectadas isso é necessário pois cada componente desse é responsável por uma etapa do processo de adequação quando o dado passa por ele. agora vamos debugar o nosso pipeline para ver se as regras aplicadas estão corretas. clique em Preview

ao clicar vai aparecer outra janela, deixa as opções como na janela e click em Run Preview

Vai demorar uns 5 segundos e vai aparecer essa tela.

perceba que o primeiro componente está em azul e abaixo exibe os dados desse componente,  a medida que você vai clicando na setinha selecionada em vermelho os dados vão sendo exibidos de acordo com o componente, repare agora que avançamos para o estágio de ordenação dos dados, os dados abaixo à esquerda ficam em vermelho indicando como os dados eram antes, e verde como estão agora passando pelo componente.

Caso apareça algum erro vai aparecer um ícone em vermelho no componente indicando o número de erros. aparentemente está tudo correto, para sair do modo de debug clique onde está selecionado em vermelho.

Antes de iniciarmos o pipeline para que ele faça o processo completo, vamos adicionar alguns componentes para que quando ele terminar de carregar os dados ele finalize o pipeline, sem essa configuração ele vai ficar rodando para sempre. antes de arrastar os componentes marque a opção Produce Events do componente Directory

adicione o componente: Stream Selector clique e arraste da bolinha (E) do componente Directory até a bolinha do Stream Selector com ele vamos definir algumas regras:

Clicamos no + da caixinha do lado direito e adicionamos uma condição, quando não houver mais dados para processar ele vai fazer uma ligação a outro componente, para isso procure pelo componente Pipeline Finisher Executor  e arraste ele para a área de desenvolvimento e agora clique em cima de (1) do componente Stream Selector  e arraste até a bolinha do componente Pipeline Finisher Executor adicione também um componente chamado Trash, click em cima do número (2) do componente Stream Selector  e arraste até a bolinha do componente Trash, pronto finalizamos assim a configuração e deve ficar assim:

Agora sim vamos executar o nosso pipeline, procure pelo botão Start e clique nele. após isso vai aparecer essa janela.

Perceba nos gráficos que temos o numero 303 esse é o número registros carregados pelo componente Discovery, e temos 303 registros processados e inseridos a tabela cardata do PostgreSQL. assim que todos os dados forem processados o pipeline vai parar a execução. os arquivos processados não serão mais processados da próxima vez pois o streamsets guarda os metadados dos arquivos processados e só processa linhas de arquivos adicionados no diretório.

Agora vamos olhar nossa tabela pela Navicat e ver se os dados foram inseridos.

Prontinho! temos todos os dados inseridos de todos os arquivos do nosso diretório e toda vez que iniciarmos o pipeline ele vai inserir os dados nessa mesma tabela e com a data que os dados foram processados. Caso queiram saber mais sobre o Streamsets da uma olhada aqui https://streamsets.com/

Por Romerito Morais

rox-ball

Ingeringo Arquivos CSV com StreamSets

Espero que tenham gostado, o Streamsets é uma excelente ferramenta de Ingestão de dados. esse foi apenas um exemplo de como fazer ingestão de arquivos (.csv) mais é possível adaptar esse modelo para a leitura de arquivos  TXT ,JSON, Excell etc.. Caso queiram saber mais sobre o Streamsets.

Contato

Conheça a Rox School

Somos especialistas em cuidar dos seus dados, oferecendo soluções inovadoras e parcerias com os maiores nomes da tecnologia para manter você sempre à frente.

Veja os cursos