HIVEのMAPPERがINITIALIZINGのまま動かない問題
状況
hiveを実行中、MAPPERのSTATUSがINITIALIZINGのまま全く動かないことがあった。
実行環境
hive-cli 1.2.1000.2.6.2.0-205
対策
Dynamic Partition Pruningのバグによりこの現象が発生することがあるらしい
https://issues.apache.org/jira/browse/HIVE-10559
対策としてはHiveをバージョンアップしてやるか以下のようにこの機能をオフにしてやるのが良さそう。
set hive.tez.dynamic.partition.pruning=false;
Spark入門(by python)
Sparkとは?
分散環境で高速に計算を行うための汎用システムです。Java、Scala、Python、Rに対してAPIが用意されており、以下のような応用ツールも用意されています。
- Shark(Hive on Spark)
- MLib( 機械学習)
- GraphX(グラフ処理)
- Spark Streaming(ストリーム処理)
Hadoopがファイル経由で分散処理を行うのに対してSparkはオンメモリで分散処理を行えることが特徴となります。また、Sparkは単独で動かすことが出来る他に以下のような既存のcluster managerのもとで動かすことが可能です。
- Apache Mesos
- Hadoop YARN
- Kubernets
- Amazon EC2
(図:Pythonで大量データ処理! PySparkを用いたデータ処理と分析のきほん - Speaker Deck)
以下ではPythonをベースに説明を行います。
Sparkの使い方
実行
"spark-submit"コマンドを用いて以下のように実行します。
$ ./bin/spark-submit main.py
引数で様々な設定を追記することが出来ます。(参考:Submitting Applications - Spark 2.4.4 Documentation)
以下のコマンドで、対話モードで実行を行うことが出来ます。
$ ./bin/pyspark
初期化
実行ファイルにSparkをインポートします。
from pyspark import SparkContext, SparkConf
次にSparkの最も基本的なクラスである"SparkContext"を初期化します。アプリケーションに関する情報をSparkConfオブジェクトとして渡してやります。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
- master: クラスタの管理を行うmasterのURLを記述します。
- appName: アプリケーションの名前(任意)を記述します。この名前がweb UIに表示されます。
参考)SparkContexの設定:pyspark package — PySpark 2.4.4 documentation
PySparkシェル(./bin/pyspark)を使用する場合"sc"は既に作られているのでこの操作は不要です。
Sparkの基礎概念
Sparkにおいて覚えておく概念としてRDD(resilient distributed dataset)とshared variablesがあります。
各ノードで分割されたデータの集合、パラレルに処理を行うことが可能
- shared variables
各ノードで共有して利用される変数(全ノードのメモリに保存される"broadcast variables"と、加算処理だけが可能な"accumulators"がある。)
RDD
RDDには現在、以下の二つのタイプがあります。
- parallelized collections
iterable型やコレクション型から以下のように生成されるRDDです。
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
- External datasets
Hadoopでサポートされているストレージ(HDFS, Cassandra, HBase, Amazon S3, etc.)に格納されているファイル(text files, Sequence files, etc.)から以下のように生成されるRDDです。
distFile = sc.textFile("data.txt")
これらRDDに次節で定義するOperationをかけてやることによって分散処理が行われます。
RDD Operations
RDDには並列化可能な処理が複数用意されており、新しくdatasetを作成する"Transformation"と値を返す"Action"の2つに分類されます。 Actionは呼ばれる都度計算されるのに対し、Transformationは作成したdatasetに対してActionが呼ばれた時に初めて計算されます。
各処理一覧は下記参照
pyspark package — PySpark 2.4.4 documentation
RDD Persistence
Transformationは返り値のRDDがActionで呼ばれる度に実行されるので、何回もActionが呼ばれるような場合Transformationの返り値をキャッシュしておくことがSparkに置いて重要になります。 "persist()"、もしくは"cache()"を対象RDDに対して呼び出すことでキャッシュが行われます。 キャッシュには様々なレベルが存在するのでメモリ容量と計算効率のトレードオフを考えて上手く使い分ける必要があるようです。
Shared Variables
Sparkの関数は通常、各ノードにコピーされた変数に対して個別に処理を行うので、変数の更新を全ノードで共有することが出来ません。そこでSparkでは"broadcast variables"と"shared variables"という2つのタイプの変数を提供しています。
Broadcast Variables
broadcast variablesは読み取り専用の変数です。通常の変数を全ノードにコピーするよりも効率的にブロードキャスト出来るので、巨大なデータを全ノードで読み取りたい時などに使われます。
以下のように生成することが出来ます。
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
Accumulators
Accumulatorは加算専用の変数です。各ノードでAccumulatorに加算を行い、呼び出しもとのプログラムでのみ読み取ることが出来ます。
以下は実行例です。
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10
参考資料
Pythonで大量データ処理! PySparkを用いたデータ処理と分析のきほん - Speaker Deck