Ingerindo Arquivos CSV com StreamSets e Inserindo em Tabela no PostgreSQL
- Por Romerito Morais
[vc_row][vc_column][vc_column_text]
Ingredientes:
- OS Linux (ubuntu, fedora, centos, kubuntu, debian, outros)
- Docker (versão 20.10.2 ou superior)
- Container PostgreSQL (versão 13.1 ou superior)
- Container Streamsets Data Collector (versão 3.13.0 ou superior)
- Navicat (versão 15.0.0 ou superior)
- Arquivos Tabulares (.csv)
🍲 Modo de Preparo: Container PostgreSQL (versão 13.1 ou superior)
Primeiramente baixe o container pelo terminal:
$ docker pull postgres
Após baixar a imagem podemos ver ela na sessão de imagens do docker:
$ docker images | grep "postgres"
Agora vamos iniciar o serviço do postgres
$ docker run --name some-postgres -e POSTGRES_PASSWORD=mysecretpassword -d postgres
Vamos usar dessa forma, mais caso você queira mudar a senha fique a vontade. para saber se o container ta operativo escreva. o comando abaixo, ele vai mostrar informações de status, portas etc..
$ docker ps -a --format "table {{.ID}}\t{{.Image}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}"
Agora vamos procurar o endereço de ip desse container para fazermos a conexão via ferramenta de administração de banco de dados. porém é precisa saber o “CONTAINER_ID” ele é o identificador do container em execução. você consegue descobrir essa informação usando o comando anterior. o próximo comando vai mostrar todas as configurações do container em execução.
$ docker inspect CONTAINER_ID
A saida do comando no terminal vai exibir dados no formato .JSON contendo todos os parâmetros do container, localize no final das linhas o IPAddress ou você pode obter de forma rápida usando usando o comando.
$ docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' CONTAINER_ID
Com o ip em mãos vamos nos conectar usando a ferramenta de administração de banco de dados chamada Navicat porém você pode usar outras ferramentas como: dbeaver, pgadmin ou qualquer outra ferramenta na qual você consiga se conectar ao PostgreSQL. então vamos aos dados da conexão com o banco de dados:
host: ip CONTAINER_ID
port: 5432
user: postgres
password: mysecretpassword
mais caso você queira entrar via linha de comando você deve matar o container em questão e usar o comando:
$ docker run -it --rm --network some-network postgres psql -h some-postgres -U postgres
Dentro da de linha de comando vamos criar nosso usuário, banco de dados e garantir permissão a todos os recursos da instancia. voce pode executar a sequencia de comandos a seguir via linha de comando ou query pelo seu gerenciador favorito.
primeiro vamos criar o nosso banco de dados, vou usar o nome sdc.
CREATE DATABASE sdc;
vou definir o mesmo nome que dei ao banco de dados para o usuário.
CREATE USER sdc WITH ENCRYPTED PASSWORD '102030';
garantindo altos privilégios.
GRANT ALL PRIVILEGES on DATABASE sdc TO sdc;
criando o esquema.
CREATE SCHEMA "streamsets";
Por fim vamos criar nossa tabela, essa é uma etapa muito importante pois vamos preparar a tabela para receber os dados dos arquivos. nessa tabela vamos definir a estrutura da mesma forma como está nos arquivos, com o mesmo nomes de campos e quantidades de colunas. vamos criar todos os campos com o tipo de dados varchar, vou fazer isso para não ter problemas durante a inserção de dados via StreamSets.
CREATE TABLE "streamsets"."cardata" (
"Processing_Date" varchar(255),
"Car_Name" varchar(255),
"Year" varchar(255),
"Selling_Price" varchar(255),
"Present_Price" varchar(255),
"Kms_Driven" varchar(255),
"Fuel_Type" varchar(255),
"Seller_Tyoe" varchar(255),
"Transmission" varchar(255),
"Owner" varchar(255)
);
bom essas são as configurações necessárias nesse container, agora vamos salvar as alterações nesse container usando o comando commit.
$ docker commit CONTAINER_ID nome:tag
Passamos o CONTAINER_ID e o nome desejado com tag, no meu caso eu coloquei como: docker commit aaeb2b4ef100 postgres-configurado:latest. lembrando que toda alteração que você fizer no container precisa ser comitada para que elas serem permanentes, caso o contrário se você matar o container ou por algum outro motivo ele cair, você perde tudo aqui que alterou dentro dele.
Prontinho agora que fizemos todas as configurações necessárias vamos ao próximo passo.
🍲 Modo de Preparo: Container Streamsets Data Collector (versão 3.13.0 ou superior)
Para essa receita vou usar a versão 3.13.0, mas fique a vontade para testar essa receita em outras versões.
$ docker pull streamsets/datacollector:3.13.0-latest
após baixar a imagem podemos ver ela na sessão de imagens do docker:
$ docker images | grep "stre"
feito isso vamos iniciar o serviço do sdc com o comando:
$ docker run --restart on-failure -p 18630:18630 -d --name sdc streamsets/datacollector:3.13.0-latest
esse serviço vai ficar escutando na porta 18630, se você abrir o navegador e digitar localhost:18630 ele vai abrir na tela de login e senha.
usuário: admin
senha: admin
após isso vamos fazer algumas mudanças nesse container para que possamos prosseguir, vamos entrar no modo de linha de comando.
$ docker exec -it CONTAINER_ID /bin/bash
o único arquivo que vamos editar fica em /etc/sdc/form-realm.properties abra ele com o vi, após isso localize a linha que é similar a:
admin: MD5:21232f297a57a5a743894a0e4a801fc3,user,admin
vamos adicionar mais 2 permissões a esse usuário, pressione shift + i e adicione creator,manager, deve ficar assim:
admin: MD5:21232f297a57a5a743894a0e4a801fc3,user,admin,creator,manager
Lembrando que estamos alterando um usuário existente que é o Admin, mas caso precise adicionar um usuário com o seu nome e senha podemos disponibilizar esse conhecimento em outro material aqui no blog.
Após as alterações no arquivo /etc/sdc/form-realm.properties pressione esc para sair do modo de inserção, salve os dados inseridos e saia do arquivo usando as teclas shift + : e digite wq. para sair do modo de linha de comando do container pressione a sequência de teclas ctrl + p + q cuidado para não matar o container, se aparecer assim “read escape sequence” quer dizer que você sai com segurança do container. após essas alterações no container vamos reiniciar ele para validar as alterações que fizemos.
$ docker restart CONTAINER_ID
após isso vá no navegador e digite: localhost:18630 entre com o usuário e senha: Admin
se tudo deu certo você vai cair na interface de trabalho do streamsets. clique em: CREATE NEW PIPELINE
Coloque um nome de sua preferência para o pipeline e depois clique em Save. agora vamos instalar o componente responsável pela conexão com o PostgreSQL, clique onde a seleção está na cor vermelha.
após isso vai aparecer uma lista de componentes procure por jdbc na caixa de pesquisa.
1.selecione o checkbox
2.instalação
após clicar em instalar vai aparecer outra opção:
Clique em Restart Data Collector vai demorar uns 5 segundos e após isso vai aparecer a tela de login, faça o login novamente.
E por último vamos criar uma pasta no diretório /opt chamada /opt/data/input é aqui que nossos arquivos vão residir. vamos mais uma vez entrar via linha de comando no container., vamos usar nesse comando a flag –user root ele é necessário para criarmos a pasta no diretório do linux.
$ docker exec -it --user root CONTAINER_ID /bin/bash
Vamos a criação das pastas.
$ mkdir /opt/data && mkdir /opt/data/input
O diretório input vai ser usando para montarmos um volume externo nele com os arquivos (.csv)
após isso vamos sair do modo de linha de comando pressionando as teclas ctrl + p + q cuidado para não matar o container, se aparecer assim “read escape sequence” quer dizer que você sai com segurança do container.
vamos gravar as mudanças nesse container usando o comando commit
$ docker commit CONTAINER_ID nome:tag
Após isso vamos matar o container e remover ele da lista de execução
$ docker stop CONTAINER_ID && docker rm CONTAINER_ID
essa ação foi necessário para podermos criar um novo container. usando a flag -v montamos a pasta externa: /home/romerito/Documents/data na pasta do container /opt/data/input lembrando que voce precisa usar a imagem do container comitada anteriormente, lembre que foi nela que fizemos as alterações.
$ docker run --restart on-failure -p 18630:18630 -v /home/romerito/Documents/data:/opt/data/input -d --name sdc nome_imagem_commitada:tag
Vamos saber se a montagem das pastas funcionou, repare que na minha pasta externa tenho 2 arquivos.
agora conecte no container via linha de comando e digite: ls -lha /opt/data/input e verás que esses mesmos arquivos vão aparecer no diretório.
Fazendo a Ingestão dos arquivos
Abra o pipeline que você criou anteriormente e vamos começar, para essa receita vamos trabalhar com o conjunto de dados https://www.kaggle.com/savitanair/cardata é uma lista de nomes de carros, km rodado, ano etc… a estrutura desses arquivos são similares a nossa tabela criada no PostgreSQL. esses arquivos já se encontram na nossa pasta /home/romerito/Documents/data. então vamos lá.
do lado direito do botão Start tem um ícone, clique nele e vai aparecer uma dock com vários componentes, acima no combobox ta selecionado origins mude para All Stages e procure pelo componente Directory digitando na caixa de pesquisa, ao encontrar segure e arraste para o espaço a esquerda
Com esse componente vamos ler os nossos arquivos (.csv) do diretório /opt/data/input muita atenção nessa hora, a seguir vai ter várias imagens com configurações e uma breve explicação de ambas.
Directory: [General] aqui definimos o nome do componente:
Directory: [Files] aqui definimos o diretório dos arquivos, a ordem de leitura e a extensão dos arquivos.
Directory: [Data Format] aqui definimos o tipo de arquivo se é: JSON,TEXT, Excell etc.. No nosso caso selecionamos DELIMITED, definimos também que a configuração vai ignorar as linhas vazias dos arquivos e que eles têm cabeçalho.
É importante lembrar que exibimos apenas as configurações necessárias para essa receita, ao arrastar um novo componente não esquece de conectá-los. você faz isso clicando na bolinha branco do componente e arrastando-a até a bolinha do outro componente, sem essa ligação o fluxo não vai funcionar.
adicione outro componente chamado: Expression Evaluator
vamos usá-lo para criar o campo chamado: /Processing_Date que vai ter como finalidade armazenar a hora em que o arquivo foi processado escreva no componente da mesma forma como está na imagem.
adicione o componente: Field Type Converter vamos usar ele para converter o campo /Processing_Date de Datetime para String.
adicione outro componente chamado: Expression Evaluator vamos usár-lo para modificar os dados do campo /Processing_Date vamos criar um campo provisório chamado /data e atribuir a ele a data/hora/minuto/segundo advindos do campo /Processing_Date e depois atribuir os dados do campo /data já modificados para o campo original /Processing_Date.
adicione o componente: Field Remover vamos usár-lo para remover o campo provisório /data
adicione o componente: Field Order vamos usár-lo para deixar os dados na ordem desejada. para adicionar os campos basta você clicar no campo que ele vai mostrar uma lista de campos. você vai clicando e deixando na ordem desejada.
adicione o componente: JDBC Producer vamos usár-lo para pegar os dados tratados e inserir na tabela cardata que criamos anteriormente no PostgreSQL.
JDBC Producer: [JDBC] aqui definimos a String de conexão, onde passamos o IP do nosso container, porta e banco de dados. definimos também o nome do esquema que criamos e o nome da tabela
JDBC Producer: [Credentials] aqui adicionamos o usuário e senha do banco de dados
Se seu pipeline estiver assim:
Você fez tudo certinho, perceba que todas as caixinhas estão conectadas isso é necessário pois cada componente desse é responsável por uma etapa do processo de adequação quando o dado passa por ele. agora vamos debugar o nosso pipeline para ver se as regras aplicadas estão corretas. clique em Preview
ao clicar vai aparecer outra janela, deixa as opções como na janela e click em Run Preview
Vai demorar uns 5 segundos e vai aparecer essa tela.
perceba que o primeiro componente está em azul e abaixo exibe os dados desse componente, a medida que você vai clicando na setinha selecionada em vermelho os dados vão sendo exibidos de acordo com o componente, repare agora que avançamos para o estágio de ordenação dos dados, os dados abaixo à esquerda ficam em vermelho indicando como os dados eram antes, e verde como estão agora passando pelo componente.
Caso apareça algum erro vai aparecer um ícone em vermelho no componente indicando o número de erros. aparentemente está tudo correto, para sair do modo de debug clique onde está selecionado em vermelho.
Antes de iniciarmos o pipeline para que ele faça o processo completo, vamos adicionar alguns componentes para que quando ele terminar de carregar os dados ele finalize o pipeline, sem essa configuração ele vai ficar rodando para sempre. antes de arrastar os componentes marque a opção Produce Events do componente Directory
adicione o componente: Stream Selector clique e arraste da bolinha (E) do componente Directory até a bolinha do Stream Selector com ele vamos definir algumas regras:
Clicamos no + da caixinha do lado direito e adicionamos uma condição, quando não houver mais dados para processar ele vai fazer uma ligação a outro componente, para isso procure pelo componente Pipeline Finisher Executor e arraste ele para a área de desenvolvimento e agora clique em cima de (1) do componente Stream Selector e arraste até a bolinha do componente Pipeline Finisher Executor adicione também um componente chamado Trash, click em cima do número (2) do componente Stream Selector e arraste até a bolinha do componente Trash, pronto finalizamos assim a configuração e deve ficar assim:
Agora sim vamos executar o nosso pipeline, procure pelo botão Start e clique nele. após isso vai aparecer essa janela.
Perceba nos gráficos que temos o numero 303 esse é o número registros carregados pelo componente Discovery, e temos 303 registros processados e inseridos a tabela cardata do PostgreSQL. assim que todos os dados forem processados o pipeline vai parar a execução. os arquivos processados não serão mais processados da próxima vez pois o streamsets guarda os metadados dos arquivos processados e só processa linhas de arquivos adicionados no diretório.
Agora vamos olhar nossa tabela pela Navicat e ver se os dados foram inseridos.
Prontinho! temos todos os dados inseridos de todos os arquivos do nosso diretório e toda vez que iniciarmos o pipeline ele vai inserir os dados nessa mesma tabela e com a data que os dados foram processados.
Espero que tenham gostado, o Streamsets é uma excelente ferramenta de Ingestão de dados. esse foi apenas um exemplo de como fazer ingestão de arquivos (.csv) mais é possível adaptar esse modelo para a leitura de arquivos TXT ,JSON, Excell etc.. Caso queiram saber mais sobre o Streamsets da uma olhada aqui https://streamsets.com/
[/vc_column_text][/vc_column][/vc_row]
Conheça a Rox School
Somos especialistas em cuidar dos seus dados, oferecendo soluções inovadoras e parcerias com os maiores nomes da tecnologia para manter você sempre à frente.