Nesta atribuição, você implementará o cache de resultado do UDF (função definida pelo usuário) no Apache Spark, que é uma estrutura para a computação distribuída no molde do MapReduce. Este projeto ilustrará conceitos-chave em dados de dados e avaliação de consultas, e você terá alguma experiência prática modificando o Spark, que é amplamente utilizado no campo. Além disso, você terá exposição ao Scala, um idioma baseado em JVM que está ganhando popularidade por seu estilo funcional limpo.
A data de vencimento da tarefa é publicada no site da classe.
Você pode concluir isso em pares , se quiser. Por fim, há muito código neste diretório. Por favor, olhe aqui aqui para encontrar o diretório em que o código está localizado.
O Spark é um sistema de computação distribuído de código aberto escrito em Scala. O projeto foi iniciado por Ph.D. Alunos do Amplab e é parte integrante da pilha de análises de dados de Berkeley (BDAs-pronunciada afetuosamente "Bad-Ass").
Como o Hadoop MapReduce, o Spark foi projetado para executar funções sobre grandes coleções de dados, apoiando um conjunto simplificado de operações de processamento de dados de alto nível, semelhantes aos iteradores sobre os quais aprendemos na aula. Um dos usos mais comuns de tais sistemas é implementar o processamento de consultas paralelas em idiomas de alto nível, como o SQL. De fato, muitos esforços recentes de pesquisa e desenvolvimento no Spark foram submetidos a apoiar uma abstração escalável e interativa do banco de dados relacional.
Usaremos, modificando e estudando aspectos da Spark nesta classe para entender os principais conceitos dos sistemas de dados modernos. Mais importante, você verá que as idéias que estamos cobrindo em sala de aula - algumas das quais décadas de idade - ainda são muito relevantes hoje. Especificamente, adicionaremos recursos para Spark SQL.
Uma principal limitação do Spark SQL é que atualmente é um sistema apenas de memória principal. Como parte desta classe, vamos estendê-la para incluir alguns algoritmos fora do núcleo também.
O Scala é um idioma estaticamente tipo que suporta muitos paradigmas de programação diferentes. Sua flexibilidade, poder e portabilidade tornaram-se especialmente úteis na pesquisa de sistemas distribuídos.
O Scala se assemelha a Java, mas possui um conjunto muito mais amplo de recursos de sintaxe para facilitar vários paradigmas. Conhecer o Java o ajudará a entender algum código da Scala, mas não muito disso, e não saber que o Scala o impedirá de aproveitar completamente seu poder expressivo. Como você deve escrever código no Scala, recomendamos fortemente que você adquira pelo menos uma familiaridade passageira com o idioma.
Intellij A idéia tende a ser a IDE mais usada para o desenvolvimento do Spark. Intellij é um Java IDE que possui um plugin Scala (e Vim!). Existem também outras opções, como o Scala-Ide.
Você pode encontrar os seguintes tutoriais para serem úteis:
As funções definidas pelo usuário permitem que os desenvolvedores definam e explorem operações personalizadas dentro das expressões. Imagine, por exemplo, que você tenha um catálogo de produtos que inclua fotos da embalagem do produto. Você pode registrar uma função definida pelo usuário extract_text que chama um algoritmo OCR e retorna o texto em uma imagem, para que você possa obter informações consultáveis das fotos. No SQL, você pode imaginar uma consulta como esta:
SELECT P.name, P.manufacturer, P.price, extract_text(P.image),
FROM Products P;
A capacidade de registrar o UDFS é muito poderosa - ele essencialmente transforma sua estrutura de processamento de dados em uma estrutura de computação distribuída geral. Mas os UDFs geralmente podem introduzir gargalos de desempenho, especialmente quando os executamos em milhões de itens de dados.
Se a (s) coluna (s) de entrada para um UDF contiver muitos valores duplicados, pode ser benéfico melhorar o desempenho, garantindo que o UDF seja chamado apenas uma vez por valor de entrada distinto , em vez de uma vez por linha . (Por exemplo, em nossos produtos Exemplo acima, todas as diferentes configurações de um PC específico podem ter a mesma imagem.) Nesta tarefa, implementaremos essa otimização. Vamos levá-lo em etapas-primeiro o faça funcionar para dados que se encaixam na memória e depois para conjuntos maiores que exigem uma abordagem fora do núcleo. Usaremos o hash externo como técnica para "renderzvous" todas as linhas com os mesmos valores de entrada para o UDF.
Se você estiver interessado no tópico, o artigo a seguir será uma leitura interessante (incluindo otimizações adicionais além do que temos tempo neste trabalho de casa):
Todo o código que você tocará será em três arquivos - CS143Utils.scala , basicOperators.scala e DiskHashedRelation.scala . No entanto, você pode precisar consultar outros arquivos dentro do Spark ou nas APIs gerais de Scala para concluir a tarefa minuciosamente. Certifique -se de examinar todo o código fornecido nos três arquivos mencionados acima antes de começar a escrever seu próprio código. Existem muitas funções úteis no CS143Utils.scala , bem como no DiskHashedRelation.scala que economizarão muito tempo e xingamento - aproveite -os!
Em geral, definimos a maioria (se não todos) dos métodos que você precisará. Como antes, neste projeto, você precisa preencher o esqueleto. A quantidade de código que você escreverá não é muito alta - a solução total da equipe é inferior a 100 linhas de código (sem incluir testes). No entanto, reunir os componentes certos de maneira eficiente em termos de memória (ou seja, não ler toda a relação na memória de uma só vez) exigirá algum pensamento e planejamento cuidadoso.
Existem algumas diferenças potencialmente confusas entre a terminologia que usamos em classe e a terminologia usada na base do código SparkSQL:
O conceito "iterador" que aprendemos na palestra é chamado de "nó" no código SparkSQL - existem definições no código para unilsato e binário. SparkPlan.scala plano de consulta é chamado de plano de faísca e, na verdade nós.
Em alguns dos comentários no SparkSQL, eles também usam o termo "operador" para significar "nó". O arquivo basicOperators.scala define vários nós específicos (por exemplo, classificação, distinta etc.).
Não confunda o Iterator da interface Scala com o conceito de iterador que abordamos na palestra. O Iterator que você usará neste projeto é um recurso de linguagem Scala que você usará para implementar seus nós SparkSQL. Iterator fornece uma interface para as coleções Scala que reforçam uma API específica: as next funções e hasNext .
git e Github git é um sistema de controle de versão , ajudando você a rastrear diferentes versões do seu código, sincronizá -las em diferentes máquinas e colaborar com outras pessoas. O GitHub é um site que suporta esse sistema, hospedando -o como um serviço.
Se você não sabe muito sobre git , recomendamos fortemente que você se familiarize com este sistema; Você vai passar muito tempo com isso! Existem muitos guias para usar git online - aqui está um ótimo para ler.
Você deve primeiro configurar um repositório privado remoto (por exemplo, casa de faísca). O Github fornece repositório privado aos alunos (mas isso pode levar algum tempo). Se você não possui um repositório privado, pense duas vezes em verificar -o no repositório público, pois estará disponível para que outros estejam checheckout.
$ cd ~
Clone seu repositório pessoal. Deve estar vazio.
$ git clone "https://github.com/xx/yy.git"
Digite o repositório clonado, rastreie o repositório do curso e clone -o.
$ cd yy/
$ git remote add course "https://github.com/ariyam/cs143_spark_hw.git"
$ git pull course master
Nota: Por favor, não fique impressionado com a quantidade de código que está aqui. O Spark é um grande projeto com muitos recursos. O código que tocaremos estará contido em um diretório específico: SQL/CORE/SRC/MAIN/SCALA/ORG/APACHE/Spark/SQL/Execution/. Todos os testes estarão contidos em sql/core/src/test/scala/org/apache/spark/sql/execução/
Empurre o clone para o seu repositório pessoal.
$ git push origin master
Toda vez que você adiciona algum código, você pode comprometer as modificações no repositório remoto.
$ git commit -m 'update to homework'
$ git push origin master
Pode ser necessário receber atualizações em nossa tarefa (mesmo que tentemos liberá -las o mais "perfeitamente" possível na primeira vez). Supondo que você configure o rastreamento corretamente, você pode simplesmente executar este seguinte comando para receber atualizações de atribuição:
$ git pull course master
O comando UNIX a seguir será útil, quando você precisar encontrar a localização de um arquivo. Exemplo- Encontre a localização de um arquivo chamado 'diskhashedrelation.scala' no meu repositório atual.
$ find ./ -name 'DiskHashedRelation.scala'
Depois de obter o código puxado, cd em {repo root} e execute make compile . Na primeira vez em que você executa este comando, deve demorar um pouco - sbt baixará todas as dependências e compilará todo o código no Spark (há um pouco de código). Depois que os comandos de montagem inicial terminarem, você poderá iniciar seu projeto! (As compilações futuras não devem demorar tanto - sbt é inteligente o suficiente para recompilar apenas os arquivos alterados, a menos que você seja make clean , o que removerá todos os arquivos de classe compilados.)
Fornecemos o código de esqueleto para DiskHashedRelation.scala . Este arquivo tem 4 coisas importantes:
trait DiskHashedRelation Define a interface de Relação de Discosclass GeneralDiskHashedRelation é nossa implementação do traço de DiskedHashedRelationclass DiskPartition representa uma única partição no discoobject DiskHashedRelation pode ser considerado uma fábrica de objetos que constrói GeneralDiskHashedRelation SDiskPartition e GeneralDiskHashedRelation Primeiro, você precisará implementar os métodos insert , closeInput e getData no DiskPartition para esta parte. Para os dois primeiros, os Docstrings devem fornecer uma descrição abrangente do que você deve implementar. A ressalva com getData é que você não pode ler toda a partição na memória uma vez. A razão pela qual estamos aplicando essa restrição é que não existe uma boa maneira de aplicar a memória libertadora na JVM e, ao transformar dados em diferentes formas, haveria várias cópias por aí. Como tal, ter várias cópias de uma partição inteira faria com que as coisas fossem derramadas no disco e nos deixariam tristes. Em vez disso, você deve transmitir um bloco para a memória por vez.
Neste ponto, você deve passar os testes no DiskPartitionSuite.scala .
object DiskHashedRelation Sua tarefa nesta parte será implementar a fase 1 do hash externo-usando uma função de hash de grão grosso para transmitir uma entrada para várias relações de partição no disco. Para nossos propósitos, o método hashCode que todo objeto possui é suficiente para gerar um valor de hash e levar o módulo pelo número de partições é uma função de hash aceitável.
Nesse ponto, você deve passar todos os testes no DiskHashedRelationSuite.scala .
Nesta seção, estaremos lidando com case class CacheProject em basicOperators.scala . Você pode notar que existem apenas 4 linhas de código nesta classe e, mais importante, não // IMPLEMENT ME . Na verdade, você não precisa escrever nenhum código aqui. No entanto, se você rastrear a chamada de função na linha 66, descobrirá que há duas partes dessa pilha que você deve implementar para ter uma implementação funcional da UDF na memória.
CS143Utils Para esta tarefa, você precisará implementar getUdfFromExpressions e os métodos Iterator no CachingIteratorGenerator#apply . Por favor, leia os Docstrings - especialmente para apply - de perto antes de começar.
Após a implementação desses métodos, você deve passar os testes no CS143UtilsSuite.scala .
Dica: pense cuidadosamente sobre por que esses métodos podem fazer parte dos utilitários
Agora vem o momento da verdade! Implementamos o particionamento de hash baseado em disco e implementamos o cache de UDF na memória-o que às vezes é chamado de memórias. A memórias é uma ferramenta muito poderosa em muitos contextos, mas aqui em bancos de dados-terra, lidamos com quantidades maiores de dados do que a memórias pode lidar. Se tivermos valores mais exclusivos do que se pode caber em um cache na memória, nosso desempenho se degradará rapidamente. Assim, voltamos à tradição dos bancos de dados consagrados pelo tempo de dividir e conquistar. Se nossos dados não se encaixam na memória, podemos particioná -los no disco uma vez, leia uma partição de cada vez (pense no motivo pelo qual isso funciona (dica: rendezvous!)) E execute cache de UDF, avaliando uma partição de cada vez .
PartitionProject Esta tarefa final exige que você preencha a implementação do PartitionProject . Todo o código que você precisará escrever está no método generateIterator . Pense cuidadosamente sobre como você precisa organizar sua implementação. Você não deve buffer todos os dados na memória ou qualquer coisa semelhante a isso.
Neste ponto, você deve passar todos os testes.
Não há código que você deve escrever aqui, mas para sua própria edificação, gaste algum tempo pensando na seguinte pergunta:
Um dos principais pontos de venda da Spark é que ele é "na memória". O que eles querem dizer é o seguinte: Quando você amarra vários trabalhos Hadoop (ou qualquer outra estrutura do MapReduce), o Hadoop escreverá os resultados de cada fase em disco e os lerá novamente, o que é muito caro; Spark, por outro lado, mantém seus dados na memória. No entanto, se nossa suposição é que, se nossos dados não se encaixam na memória, por que o Spark SQL não é enviado com a implementação baseada em disco desses operadores? A esse respeito, por que a Spark é diferente dos bancos de dados relacionais paralelos "tradicionais" sobre os quais aprendemos na aula? Não há resposta certa para esta pergunta!
Fornecemos alguns testes de amostra em DiskPartitionSuite.scala , DiskHasedRelationSuite.scala , CS143UtilsSuite.scala e ProjectSuite.scala . Esses testes podem guiá -lo à medida que você concluir este projeto. No entanto, lembre -se de que eles não são abrangentes e você é bem aconselhado a escrever seus próprios testes para pegar bugs. Felizmente, você pode usar esses testes como modelos para gerar seus próprios testes.
Para executar nossos testes, fornecemos um simples makefile. Para executar os testes para a Tarefa 1, a execução make t1 . Correspondentemente para a tarefa, a execução make t2 e o mesmo para todos os outros testes. make all será executado em todos os testes.
O link de envio será criado no CCLE, onde você poderá enviar seu código até a data de vencimento.
Muito obrigado a Matteo Interlandi.
Boa sorte!