HIVEのMAPPERがINITIALIZINGのまま動かない問題

状況

hiveを実行中、MAPPERのSTATUSがINITIALIZINGのまま全く動かないことがあった。

実行環境

 hive-cli 1.2.1000.2.6.2.0-205

対策

Dynamic Partition Pruningのバグによりこの現象が発生することがあるらしい

https://issues.apache.org/jira/browse/HIVE-10559

対策としてはHiveをバージョンアップしてやるか以下のようにこの機能をオフにしてやるのが良さそう。

set hive.tez.dynamic.partition.pruning=false;

Spark入門(by python)

Sparkとは?

 分散環境で高速に計算を行うための汎用システムです。JavaScalaPython、Rに対してAPIが用意されており、以下のような応用ツールも用意されています。  

  • Shark(Hive on Spark)
  • MLib( 機械学習)
  • GraphX(グラフ処理)
  • Spark Streaming(ストリーム処理)

 Hadoopがファイル経由で分散処理を行うのに対してSparkはオンメモリで分散処理を行えることが特徴となります。また、Sparkは単独で動かすことが出来る他に以下のような既存のcluster managerのもとで動かすことが可能です。

f:id:miruo-myan:20190919153642p:plain
Spark全体像

(図:Pythonで大量データ処理! PySparkを用いたデータ処理と分析のきほん - Speaker Deck

以下ではPythonをベースに説明を行います。

Sparkの使い方

実行

 "spark-submit"コマンドを用いて以下のように実行します。

 $ ./bin/spark-submit main.py

 引数で様々な設定を追記することが出来ます。(参考:Submitting Applications - Spark 2.4.4 Documentation)  

 以下のコマンドで、対話モードで実行を行うことが出来ます。

$ ./bin/pyspark

 

初期化

 実行ファイルにSparkをインポートします。

from pyspark import SparkContext, SparkConf

 次にSparkの最も基本的なクラスである"SparkContext"を初期化します。アプリケーションに関する情報をSparkConfオブジェクトとして渡してやります。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
  • master: クラスタの管理を行うmasterのURLを記述します。
  • appName: アプリケーションの名前(任意)を記述します。この名前がweb UIに表示されます。

 参考)SparkContexの設定:pyspark package — PySpark 2.4.4 documentation

 PySparkシェル(./bin/pyspark)を使用する場合"sc"は既に作られているのでこの操作は不要です。

Sparkの基礎概念

 Sparkにおいて覚えておく概念としてRDD(resilient distributed dataset)とshared variablesがあります。

 各ノードで分割されたデータの集合、パラレルに処理を行うことが可能

  • shared variables

 各ノードで共有して利用される変数(全ノードのメモリに保存される"broadcast variables"と、加算処理だけが可能な"accumulators"がある。)

RDD

 RDDには現在、以下の二つのタイプがあります。

  • parallelized collections

iterable型やコレクション型から以下のように生成されるRDDです。

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
  • External datasets

Hadoopでサポートされているストレージ(HDFS, Cassandra, HBase, Amazon S3, etc.)に格納されているファイル(text files, Sequence files, etc.)から以下のように生成されるRDDです。

distFile = sc.textFile("data.txt")

 これらRDDに次節で定義するOperationをかけてやることによって分散処理が行われます。

RDD Operations

 RDDには並列化可能な処理が複数用意されており、新しくdatasetを作成する"Transformation"と値を返す"Action"の2つに分類されます。  Actionは呼ばれる都度計算されるのに対し、Transformationは作成したdatasetに対してActionが呼ばれた時に初めて計算されます。

 各処理一覧は下記参照

 pyspark package — PySpark 2.4.4 documentation

RDD Persistence

 Transformationは返り値のRDDがActionで呼ばれる度に実行されるので、何回もActionが呼ばれるような場合Transformationの返り値をキャッシュしておくことがSparkに置いて重要になります。  "persist()"、もしくは"cache()"を対象RDDに対して呼び出すことでキャッシュが行われます。  キャッシュには様々なレベルが存在するのでメモリ容量と計算効率のトレードオフを考えて上手く使い分ける必要があるようです。

Shared Variables

 Sparkの関数は通常、各ノードにコピーされた変数に対して個別に処理を行うので、変数の更新を全ノードで共有することが出来ません。そこでSparkでは"broadcast variables"と"shared variables"という2つのタイプの変数を提供しています。

Broadcast Variables

 broadcast variablesは読み取り専用の変数です。通常の変数を全ノードにコピーするよりも効率的にブロードキャスト出来るので、巨大なデータを全ノードで読み取りたい時などに使われます。

以下のように生成することが出来ます。

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

Accumulators

 Accumulatorは加算専用の変数です。各ノードでAccumulatorに加算を行い、呼び出しもとのプログラムでのみ読み取ることが出来ます。

 以下は実行例です。

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

参考資料

spark.apache.org

Pythonで大量データ処理! PySparkを用いたデータ処理と分析のきほん - Speaker Deck