반응형

출처 : https://stackoverflow.com/questions/34629313/how-to-measure-the-execution-time-of-a-query-on-spark

Spark에서 쿼리의 실행 시간을 측정하는 방법

저는 Apache spark (Bluemix)에서 쿼리의 실행 시간을 측정할 필요가 있습니다. 제가 시도한 내용입니다.

import time

startTimeQuery = time.clock()
df = sqlContext.sql(query)
df.show()
endTimeQuery = time.clock()
runTimeQuery = endTimeQuery - startTimeQuery

좋은 방법입니까? 제가 얻은 시간은 테이블에서 보았을 때 상대적으로 너무 작은 거 같습니다.


9 개의 답변 중 1 개의 답변

spark-shell(Scala) 에서 이를 하기 위해서, spark.time()를 사용할 수 있습니다..

저의 다른 답변도 보실 수 있습니다 : https://stackoverflow.com/questions/36389019/spark-query-execution-time/50289329#50289329

df = sqlContext.sql(query)
spark.time(df.show())

출력은 다음처럼 나올 것입니다.

+----+----+
|col1|col2|
+----+----+
|val1|val2|
+----+----+
Time taken: xxx ms

관련 글 : 성능 문제 해결을 위한 Apache Spark 워크로드 메트릭 측정.

반응형
반응형

출처

https://data-flair.training/blogs/rdd-lineage/

Spark에서 RDD 계보 정보(리니지): ToDebugString 메소드

1. 목적

기본적으로 Spark에서는 실제 데이터에도 RDD 간의 모든 종속성이 그래프에 기록됩니다. 이것이 우리가 Spark에서 계보 그래프라고 부르는 것입니다. 이 문서는 Spark 논리적 실행 계획의 RDD 계보 개념을 담고 있습니다. 또한 toDebugString 메서드로 RDD 계보 정보 그래프를 얻는 방법을 자세히 알게 될 것입니다. 먼저 Spark RDD에 대해서도 알아 보겠습니다.

2. Spark RDD 소개

Spark RDD는 "Resilient Distributed Dataset"의 약어 입니다. RDD를 Apache Spark의 기본 데이터 구조로 간주할 수 있습니다. 구체적으로 말하면 RDD는 Apache Spark의 변경 불가능한 개체 모음입니다. 이는 클러스터의 다른 노드에서 계산하는 데 도움이 됩니다.
Spark RDD의 이름을 분해 할 때 :

  • 탄력성

이것은 내결함성을 의미합니다. RDD 계보 정보 그래프(DAG)를 사용하여 노드 장애로 인해 누락되거나 손상된 파티션을 다시 계산할 수 있습니다.

  • 분산

데이터가 여러 노드에 있음을 의미합니다.

  • 데이터 세트

작업하는 데이터의 기록일 뿐 입니다. 또한 사용자는 데이터 세트를 외부에서 불러올 수 있습니다. 예를 들어, 특정 데이터 구조가 없는 JDBC를 통한 JSON 파일, CSV 파일, 텍스트 파일 또는 데이터베이스일 수 있습니다.

당신은 Spark dataSet 튜토리얼을 읽어야 합니다.

3. RDD 계보정보(리니지) 소개

기본적으로, RDD의 평가는 자연적으로 게으릅니다. 이는 변환의 시리즈가 RDD에서 수행되지만, 바로 평가되지는 않습니다.
Spark RDD로부터 새로운 RDD를 만드는 동안, 새로운 RDD는 Spark에서 부모 RDD의 포인터를 가져옵니다. 이는 실제 데이터가 아니라 그래프에 기록된 RDD 간의 모든 종속성과 동일합니다. 우리가 계보 그래프라고 부르는 것입니다. RDD 계보는 RDD의 모든 부모 RDD의 그래프일 뿐 입니다. RDD 연산자 그래프 또는 RDD 종속성 그래프라고도 합니다. 구체적으로 말하자면 스파크에 Transformation을 적용한 결과입니다. 그런 다음 논리적 실행 계획을 생성합니다.
또한 실제 실행 계획 또는 실행 DAG를 단계의 DAG라고 합니다.
잘 이해하기 위해 Cartesian 또는 zip을 사용하여 Spark RDD 계보의 한 예부터 시작하겠습니다. 그러나 다른 연산자를 사용하여 Spark에서 RDD 그래프를 작성할 수도 있습니다.

예시

위 그림은 다음과 같은 일련의 Transformation의 결과인 RDD 그래프를 보여줍니다.

Spark의 게으른 평가

val r00 = sc.parallelize(0 to 9)
val r01 = sc.parallelize(0 to 90 by 10)
val r10 = r00 cartesian df01
val r11 = r00.map(n => (n, n))
val r12 = r00 zip df01
val r13 = r01.keyBy(_ / 20)
val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)

다른 예시

다음과 같은 RDD val b=a.map()이 있다고 합시다.

RDD b는 부모 RDD a에 대한 참조를 유지해야 합니다. 이것이 RDD 계보 정보(리니지)의 종류입니다.

4. RDD 계보정보(리니지)의 논리적 실행 계획

기본적으로, 논리적 실행 계획은 초기 RDD들과 함께 초기화 됩니다. 초기 RDD는 다른 RDD에 의존하지 않는 RDD 일뿐입니다. 매우 구체적으로 말하자면 이들은 참조 캐시 데이터와 독립적입니다. 또한 실행을 위해 호출된 작업의 결과를 생성하는 RDD로 끝납니다.

Spark 작업을 실행하기 위해 SparkContext가 요청될 때 실행되는 DAG라고도 말할 수 있습니다.

5. Spark에서 RDD 계보정보(리니지) 그래프를 얻기 위한 ToDebugString 메소드

Spark에서 RDD 계보정보(리니지) 그래프를 얻기 위한 몇가지 방법이 있지만, 메소드 중 하나는 toDebugString 메소드 입니다.

toDebugString: String

Spark DStream 살펴보기

기본적으로 이 방법을 사용하여 Spark RDD 계보정보(리니지) 그래프에 대해 배울 수 있습니다.

scala> val wordCount1 = sc.textFile(“README.md”).flatMap(_.split(“\\s+”)).map((_, 1)).reduceByKey(_ + _)
wordCount1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:24
scala> wordCount1.toDebugString
res13: String =
(2) ShuffledRDD[21] at reduceByKey at <console>:24 []
+-(2) MapPartitionsRDD[20] at map at <console>:24 []
|  MapPartitionsRDD[19] at flatMap at <console>:24 []
|  README.md MapPartitionsRDD[18] at textFile at <console>:24 []
|  README.md HadoopRDD[17] at textFile at <console>:24 []

기본적으로 여기에서 괄호() 안의 H는 각 단계에서 병렬 처리 수준을 나타내는 숫자를 나타냅니다.
예를 들어, 위 출력에서 (2) 입니다.

scala> wordCount1.getNumPartitions
res14: Int = 2

toDebugString 메서드는 action을 실행할 때 포함되며 spark.logLineage 속성이 활성화됩니다.

$ ./bin/spark-shell –conf spark.logLineage=true
scala> sc.textFile(“README.md”, 4).count
…
15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25
15/10/17 14:46:42 INFO SparkContext: RDD’s recursive dependencies:
(4) MapPartitionsRDD[1] at textFile at <console>:25 []
|  README.md HadoopRDD[0] at textFile at <console>:25 []

Spark 성능 조정에 대해 읽어 보세요.

그래서 이것은 Spark RDD Lineage Tutorial에 관한 것입니다. 우리의 설명이 마음에 드셨으면 좋겠습니다.

6. 결론

따라서 이 블로그를 통해 Apache Spark RDD 계보정보(리니지) 그래프의 실제 의미를 배웠습니다. 또한 Apache Spark에서 논리적 실행 계획의 풍미를 맛 보았습니다. 그러나 toDebugString 메서드도 자세히 살펴 보았습니다. 또한 Apache Spark RDD에서 모든 계보정보(리니지) 그래프 개념을 다루었습니다.

또한 궁금한 점이 있으시면 댓글란에 문의 해주세요.

Spark를 배우려면 인기 도서를 참조하십시오.

반응형
반응형

출처

https://sparkbyexamples.com/spark/spark-how-to-create-an-empty-dataset/

Spark - 비어있는 Dataset을 만드는 방법

이 글에서, 저는 몇가지 Scala 예시를 사용하여 비어있는 Spark Dataset(emptyDataset())을 스키마가 있고 없고에 따라 만드는 방법을 설명하겠습니다. 우리는 시작하기 전에, 비어있는 Dataset을 만들 필요가 있는 많은 시나리오 중에 하나를 설명하겠습니다.

Spark에서 파일로 작업하는 동안 처리할 파일을 받지 못하는 경우도 있지만, 파일을 받을 때 생성한 데이터 세트와 유사한 (동일한 스키마) 빈 데이터 세트를 생성해야 합니다. 동일한 스키마로 생성하지 않으면 표시되지 않을 수 있는 열을 참조하므로 데이터 세트에 대한 작업 / 변환이 실패합니다.

관련글: Spark 비어있는 DataFrame 생성하기

이와 유사한 상황을 처리하려면 항상 동일한 스키마로 Dataset을 생성해야 합니다. 즉, 파일이 존재하거나 빈 파일 처리에 관계없이 동일한 열 이름과 데이터 유형을 의미합니다.

먼저 예제 전체에서 사용할 SparkSessionSpark StructType 스키마와 case class를 생성해 보겠습니다.

val spark:SparkSession = SparkSession.builder()
   .master("local[1]")
   .appName("SparkByExamples.com")
   .getOrCreate()

import spark.implicits._

val schema = StructType(
    StructField("firstName", StringType, true) ::
      StructField("lastName", IntegerType, false) ::
      StructField("middleName", IntegerType, false) :: Nil)

val colSeq = Seq("firstName","lastName","middleName")
case class Name(firstName: String, lastName: String, middleName:String)

emptyDataset() – 컬럼이 없는 비어있는 Dataset 생성

SparkSession은 스키마 없이 빈 Dataset을 반환하는 emptyDataset() 메서드를 제공하지만 이것은 우리가 원하는 것이 아닙니다. 다음 예제는 스키마로 생성하는 방법을 보여줍니다.

case class Empty()
val ds0 = spark.emptyDataset[Empty]
ds0.printSchema()
// Outputs following
root

emptyDataset() – 스키마로 비어있는 Dataset 생성

아래 예에서는 스키마 (열 이름 및 데이터 type)가 있는 Spark 빈 데이터 세트를 만듭니다.

val ds1=spark.emptyDataset[Name]
ds1.printSchema()
// Outputs following
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- middleName: string (nullable = true)

createDataset() – 스키마로 비어있는 Dataset 생성

SparkSession의 createDataset() 메서드를 사용하여 스키마가 있는 빈 Spark Dataset을 만들 수 있습니다. 아래의 두 번째 예는 먼저 빈 RDD를 생성하고 RDD를 데이터 셋으로 변환하는 방법을 설명합니다.

val ds2=spark.createDataset(Seq.empty[Name])
ds2.printSchema()
val ds3=spark.createDataset(spark.sparkContext.emptyRDD[Name])
ds3.printSchema()
//These both Outputs following
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- middleName: string (nullable = true)

createDataset () – 기본 열 이름으로 빈 Dataset 만들기

val ds4=spark.createDataset(Seq.empty[(String,String,String)])
ds4.printSchema()
// Outputs following
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

암시적인 인코더 사용

암시적인 인코더를 사용하는 다른 방법을 살펴 보겠습니다.

val ds5 = Seq.empty[(String,String,String)].toDS()
ds5.printSchema()
// Outputs following
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

case class 사용

Scala case class에서 원하는 스키마로 빈 데이터 세트를 만들 수도 있습니다.

val ds6 = Seq.empty[Name].toDS()
ds6.printSchema()
// Outputs following
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- middleName: string (nullable = true)
반응형
반응형

출처

https://stackoverflow.com/questions/24718697/pyspark-drop-rows

PySpark에서 행 버리기

PySpark에서 RDD로부터 행을 어떻게 버릴 수 있을까요? 특별히 첫 번째 행에 제 데이터 셋에 컬럼명이 포함되어 있기 때문입니다. API를 자세히 살펴보면 이 작업을 쉽게 하는 방법을 찾을 수 없는 거 같습니다. 당연히 저는 Bash / HDFS를 통해 이를 할 수 있지만 PySpark로만 이를 할 수 있는 방법을 알고 싶습니다.


6개 답변 중 1개만 추려냄

제가 아는 한 이를 하는 '쉬운' 방법은 없습니다.

그래서 트릭을 수행해야 합니다.

val header = data.first
val rows = data.filter(line => line != header)
반응형
반응형

원본 : http://nbviewer.jupyter.org/github/SDRLurker/TIL/blob/master/python/ipynb/10%EB%B6%84%20%EC%BD%94%EC%95%8C%EB%9D%BC.ipynb

Running Pyspark in Colab

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python.  Follow the steps to install the dependencies:

Colab에서 Pyspark 실행하기

Colab에서 스파크를 실행하려면 먼저 모든 종속성을 Colab 환경에 설치해야 합니다 (예 : Apache Spark 2.3.2 with hadoop 2.7, Java 8 및 Findspark)는 시스템에서 스파크를 찾습니다. 도구 설치는 Colab의 Jupyter 노트북 내에서 수행할 수 있습니다. 한 가지 중요한 참고 사항은 Spark를 처음 사용하는 경우 일부 사람들이 이미 Python과의 호환성 문제에 대해 불평했기 때문에 Spark 2.4.0 버전을 피하는 것이 좋습니다. 다음 단계에 따라 종속성을 설치하십시오.

 

In [2]:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null 
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz 
!tar xf spark-2.4.7-bin-hadoop2.7.tgz 
!pip install -q findspark

Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:

이제 Colab에 Spark와 Java를 설치 했으므로 Colab 환경에서 Pyspark를 실행할 수 있는 환경 경로를 설정할 차례입니다. 다음 코드를 실행하여 Java 및 Spark의 위치를 ​​설정합니다.

 

In [5]:

import os 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

 

Run a local spark session to test your installation:

로컬 스파크 세션을 실행하여 설치를 테스트합니다.

 

In [6]:

import findspark 
findspark.init() 
from pyspark.sql import SparkSession 
spark = SparkSession.builder.master("local[*]").getOrCreate()

10 minutes to Koalas

This is a short introduction to Koalas, geared mainly for new users. This notebook shows you some key differences between pandas and Koalas. You can run this examples by yourself on a live notebook here. For Databricks users, you can import the current .ipynb file and run it after installing Koalas.

Customarily, we import Koalas as follows:

10분 코알라

이것은 주로 신규 사용자를 대상으로 한 Koalas에 대한 짧은 소개입니다. 이 노트북은 pandas와 Koalas의 몇 가지 주요 차이점을 보여줍니다. 여기에서 라이브 노트북에서 직접이 예제를 실행할 수 있습니다. Databricks 사용자의 경우 현재 .ipynb 파일을 가져와 Koalas를 설치 한 후 실행할 수 있습니다.

일반적으로 다음과 같이 Koalas를 가져옵니다.

 

In [8]:

!pip install koalas
Collecting koalas Downloading 
...

 

In [9]:

import pandas as pd 
import numpy as np 
import databricks.koalas as ks 
from pyspark.sql import SparkSession

Object Creation

객체 생성

Creating a Koalas Series by passing a list of values, letting Koalas create a default integer index:

코알라 시리즈를 값의 리스트를 전달함으로써 생성하여, 코알라가 기본 정수 인덱스를 생성하도록 합니다.

In [10]:

s = ks.Series([1, 3, 5, np.nan, 6, 8])

s = ks.Series([1, 3, 5, np.nan, 6, 8])

 

In [11]:

s

 

Out[11]:

0 1.0

1 3.0

2 5.0

3 NaN

4 6.0

5 8.0

dtype: float64

 

Creating a Koalas DataFrame by passing a dict of objects that can be converted to series-like.

시리즈처럼 변환될 수 있는 객체의 dict를 전달함으로써 코알라 데이터 프레임을 생성합니다.

In [12]:

kdf = ks.DataFrame(
	{'a': [1, 2, 3, 4, 5, 6], 
     'b': [100, 200, 300, 400, 500, 600], 
     'c': ["one", "two", "three", "four", "five", "six"]}, 
    index=[10, 20, 30, 40, 50, 60])

In [13]:

kdf

Out[13]:

  a b c
10 1 100 one
20 2 200 two
30 3 300 three
40 4 400 four
50 5 500 five
60 6 600 six

Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns:

datetime 인덱스와 레이블이 있는 컬럼으로 numpy 배열을 전달함으로써 pandas 데이터프레임을 생성합니다.

 

In [14]:

dates = pd.date_range('20130101', periods=6)

 

In [15]:

dates

Out[15]:

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', '2013-01-05', '2013-01-06'], dtype='datetime64[ns]', freq='D')

 

In [16]:

pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

 

In [17]:

 

pdf

Out[17]:

  A B C D
2013-01-01 0.246792 0.536389 0.292430 -0.593033
2013-01-02 -0.134876 1.100264 -0.311183 0.923779
2013-01-03 0.137727 0.105094 -0.970088 0.584534
2013-01-04 -0.245857 2.213910 1.932763 0.803901
2013-01-05 -0.497545 0.541320 -0.323730 -0.454794
2013-01-06 0.357657 -0.778258 -0.135661 0.905264

Now, this pandas DataFrame can be converted to a Koalas DataFrame

이제, 이 pandas 데이터프레임은 코알라 데이터프레임으로 변환될 수 있습니다.

 

In [18]:

kdf = ks.from_pandas(pdf)

In [19]:

type(kdf)

Out[19]:

databricks.koalas.frame.DataFrame

 

It looks and behaves the same as a pandas DataFrame though

이는 pandas 데이터프레임과 똑같이 보이고 행동합니다

 

In [20]:

kdf

Out[20]:

  A B C D
2013-01-01 0.246792 0.536389 0.292430 -0.593033
2013-01-02 -0.134876 1.100264 -0.311183 0.923779
2013-01-03 0.137727 0.105094 -0.970088 0.584534
2013-01-04 -0.245857 2.213910 1.932763 0.803901
2013-01-05 -0.497545 0.541320 -0.323730 -0.454794
2013-01-06 0.357657 -0.778258 -0.135661 0.905264

Also, it is possible to create a Koalas DataFrame from Spark DataFrame.

Creating a Spark DataFrame from pandas DataFrame

또한, Spark DataFrame으로부터 코알라 데이터프레임을 생성할 수 있습니다.

pandas DataFrame으로 Spark 데이터프레임을 생성합니다.

 

In [21]:

#spark = SparkSession.builder.getOrCreate()

In [22]:

sdf = spark.createDataFrame(pdf)

In [23]:

sdf.show()

 

+--------------------+-------------------+--------------------+--------------------+

|                         A|                       B|                         C|                       D|

+--------------------+-------------------+--------------------+--------------------+

| 0.2467916344312529| 0.5363885661296115| 0.29242981074832786| -0.5930334293597112|

|-0.13487637556398294| 1.1002643172222797|-0.31118252856050166| 0.9237787493823764|

| 0.13772736631889093| 0.105094112056177| -0.9700876227314351| 0.5845338086842855|

|-0.24585721059025922| 2.213909904836645| 1.9327634581838828| 0.8039009110324693|

| -0.4975445167193649| 0.5413197244143908| -0.3237299566752663|-0.45479420585587926|

| 0.35765732299914443|-0.7782577978361066| -0.1356607177712088| 0.9052638419278891|

+--------------------+-------------------+--------------------+--------------------+

 

Creating Koalas DataFrame from Spark DataFrame. to_koalas() is automatically attached to Spark DataFrame and available as an API when Koalas is imported.

Spark 데이터프레임으로부터 코알라 데이터프레임을 생성합니다. to_koalas()는 자동으로 Spark 데이터프레임에 접근하여 Koalas를 가져올 때 API로 사용할 수 있습니다.

 

In [24]:

kdf = sdf.to_koalas()

 

In [25]:

kdf

Out[25]:

  A B C D
0 0.246792 0.536389 0.292430 -0.593033
1 -0.134876 1.100264 -0.311183 0.923779
2 0.137727 0.105094 -0.970088 0.584534
3 -0.245857 2.213910 1.932763 0.803901
4 -0.497545 0.541320 -0.323730 -0.454794
5 0.357657 -0.778258 -0.135661 0.905264

Having specific dtypes . Types that are common to both Spark and pandas are currently supported.

특정 dtypes가 있습니다. 현재 Spark 및 pandas에서 공통적으로 가지는 Type이 지원됩니다.

 

In [26]:

kdf.dtypes

Out[26]:

A float64

B float64

C float64

D float64

dtype: object

Viewing Data

데이터 보기

See the API Reference.

API Reference를 확인하세요.

See the top rows of the frame. The results may not be the same as pandas though: unlike pandas, the data in a Spark dataframe is not ordered, it has no intrinsic notion of index. When asked for the head of a dataframe, Spark will just take the requested number of rows from a partition. Do not rely on it to return specific rows, use .loc or iloc instead.

프레임의 최상단 몇개의 행을 확인합니다. 결과는 pandas와 똑같지 않을 수 있습니다. pandas와는 다르게 Spark 데이터프레임의 데이터는 정렬되지 않으며 인덱스에 대한 본질적인 개념이 없습니다. dataframe의 head를 요청하면 Spark는 파티션으로부터 요청한 행의 개수를 가져(take)옵니다. 특정 행을 반환하는 데 의존하지 않으며 대신 .loc나 .iloc를 사용하세요.

 

In [27]:

kdf.head()

Out[27]:

  A B C D
0 0.246792 0.536389 0.292430 -0.593033
1 -0.134876 1.100264 -0.311183 0.923779
2 0.137727 0.105094 -0.970088 0.584534
3 -0.245857 2.213910 1.932763 0.803901
4 -0.497545 0.541320 -0.323730 -0.454794

 

Display the index, columns, and the underlying numpy data.

You can also retrieve the index; the index column can be ascribed to a DataFrame, see later

인덱스, 열(컬럼), 기본 numpy 데이터를 표시합니다.

인덱스를 받을 수도 있습니다. 인덱스 열은 데이터프레임에 속할 수 있습니다. 나중에 확인해 보겠습니다.

 

In [28]:

kdf.index

Out[28]:

Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')

 

In [29]:

kdf.columns

Out[29]:

Index(['A', 'B', 'C', 'D'], dtype='object')

 

In [30]:

kdf.to_numpy()

Out[30]:

array([[ 0.24679163, 0.53638857, 0.29242981, -0.59303343],

[-0.13487638, 1.10026432, -0.31118253, 0.92377875],

[ 0.13772737, 0.10509411, -0.97008762, 0.58453381],

[-0.24585721, 2.2139099 , 1.93276346, 0.80390091],

[-0.49754452, 0.54131972, -0.32372996, -0.45479421],

[ 0.35765732, -0.7782578 , -0.13566072, 0.90526384]])

 

Describe shows a quick statistic summary of your data

Describe는 데이터의 빠른 통계 요약도 보여줍니다.

In [31]:

kdf.describe()

Out[31]:

  A B C D
count 6.000000 6.000000 6.000000 6.000000
mean -0.022684 0.619786 0.080755 0.361608
std 0.325851 1.000464 0.994291 0.697821
min -0.497545 -0.778258 -0.970088 -0.593033
25% -0.245857 0.105094 -0.323730 -0.454794
50% -0.134876 0.536389 -0.311183 0.584534
75% 0.246792 1.100264 0.292430 0.905264
max 0.357657 2.213910 1.932763 0.923779

Transposing your data

데이터의 전치행렬도 가능합니다.

 

In [32]:

kdf.T

Out[32]:

  0 1 2 3 4 5
A 0.246792 -0.134876 0.137727 -0.245857 -0.497545 0.357657
B 0.536389 1.100264 0.105094 2.213910 0.541320 -0.778258
C 0.292430 -0.311183 -0.970088 1.932763 -0.323730 -0.135661
D -0.593033 0.923779 0.584534 0.803901 -0.454794 0.905264

Sorting by its index

인덱스를 정렬합니다.

 

In [33]:

kdf.sort_index(ascending=False)

Out[33]:

  A B C D
5 0.357657 -0.778258 -0.135661 0.905264
4 -0.497545 0.541320 -0.323730 -0.454794
3 -0.245857 2.213910 1.932763 0.803901
2 0.137727 0.105094 -0.970088 0.584534
1 -0.134876 1.100264 -0.311183 0.923779
0 0.246792 0.536389 0.292430 -0.593033

Sorting by value

값으로 정렬합니다.

In [34]:

kdf.sort_values(by='B')

 

Out[34]:

  A B C D
5 0.357657 -0.778258 -0.135661 0.905264
4 0.137727 0.105094 -0.970088 0.584534
3 0.246792 0.536389 0.292430 -0.593033
2 -0.497545 0.541320 -0.323730 -0.454794
1 -0.134876 1.100264 -0.311183 0.923779
0 -0.245857 2.213910 1.932763 0.803901

Missing Data

결측치

Koalas primarily uses the value np.nan to represent missing data. It is by default not included in computations.

코알라는 결측치 데이터를 표현하기 위해 np.nan 값을 주로 사용합니다. 기본적으로 계산시 포함되지 않습니다.

 

In [35]:

pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])

 

In [36]:

pdf1.loc[dates[0]:dates[1], 'E'] = 1

 

In [37]:

kdf1 = ks.from_pandas(pdf1)

 

In [38]:

kdf1

Out[38]:

  A B C D E
2013-01-01 0.246792 0.536389 0.292430 -0.593033 1.0
2013-01-02 -0.134876 1.100264 -0.311183 0.923779 1.0
2013-01-03 0.137727 0.105094 -0.970088 0.584534 NaN
2013-01-04 -0.245857 2.213910 1.932763 0.803901 NaN

To drop any rows that have missing data.

결측치를 가진 행을 버립니다.

 

In [39]:

kdf1.dropna(how='any')

Out[39]:

  A B C D E
2013-01-01 0.246792 0.536389 0.292430 -0.593033 1.0
2013-01-02 -0.134876 1.100264 -0.311183 0.923779 1.0

Filling missing data.

결측치를 특정값으로 채웁니다.

 

In [40]:

kdf1.fillna(value=5)

Out[40]:

  A B C D E
2013-01-01 0.246792 0.536389 0.292430 -0.593033 1.0
2013-01-02 -0.134876 1.100264 -0.311183 0.923779 1.0
2013-01-03 0.137727 0.105094 -0.970088 0.584534 5.0
2013-01-04 -0.245857 2.213910 1.932763 0.803901 5.0

Operations

연산

Stats

통계

Operations in general exclude missing data.

Performing a descriptive statistic:

일반적으로 결측치를 제외한 연산을 합니다.

통계치를 묘사하는 연산을 수행합니다.

 

In [41]:

kdf.mean()

Out[41]:

A -0.022684

B 0.619786

C 0.080755

D 0.361608

dtype: float64

Spark Configurations

Various configurations in PySpark could be applied internally in Koalas. For example, you can enable Arrow optimization to hugely speed up internal pandas conversion. See PySpark Usage Guide for Pandas with Apache Arrow.

Spark 설정

PySpark의 다양한 설정이 코알라 내부적으로 적용될 수 있습니다. 예를 들어 내부 pandas 변환의 속도를 매우 높이기 위해 Arrow 최적화가 가능합니다. Apache Arrow로 Pandas를 위한 PySpark 사용자 가이드를 확인해 주세요.

In [42]:

prev = spark.conf.get("spark.sql.execution.arrow.enabled") # Keep its default value. 기존 값을 유지 
ks.set_option("compute.default_index_type", "distributed") # Use default index prevent overhead. 오버헤드 방지를 위해 기본 index 사용 
import warnings warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations. Arrow 최적화에서 오는 warning 무시하기.

 

In [43]:

spark.conf.set("spark.sql.execution.arrow.enabled", True) 
%timeit ks.range(300000).to_pandas()

The slowest run took 4.29 times longer than the fastest. This could mean that an intermediate result is being cached. 1 loop, best of 3: 286 ms per loop

 

In [44]:

spark.conf.set("spark.sql.execution.arrow.enabled", False) 
%timeit ks.range(300000).to_pandas()

1 loop, best of 3: 1.24 s per loop

 

In [45]:

ks.reset_option("compute.default_index_type") 
spark.conf.set("spark.sql.execution.arrow.enabled", prev) # Set its default value back. 기본 값으로 다시 설정합니다.

Grouping

By “group by” we are referring to a process involving one or more of the following steps:

  • Splitting the data into groups based on some criteria
  • Applying a function to each group independently
  • Combining the results into a data structure

그룹화

“group by”는 다음 단계 중 하나 이상을 포함하는 과정을 의미합니다.

  • 일부 기준에 따라 데이터를 그룹으로 분할
  • 각 그룹에 독립적인 함수 적용
  • 결과를 데이터 구조로 결합

In [46]:

kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'foo'], 'B': ['one', 'one', 'two', 'three', 'two', 'two', 'one', 'three'], 'C': np.random.randn(8), 'D': np.random.randn(8)})

kdf = ks.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'], 
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'], 
                    'C': np.random.randn(8), 
                    'D': np.random.randn(8)})

 

In [47]:

kdf

 

Out[47]:

  A B C D
0 foo one -0.049080 1.047839
1 bar one -0.047054 -0.349258
2 foo two -1.595671 1.756440
3 bar three 2.167124 0.335527
4 foo two -0.939517 0.613638
5 bar two -0.257032 -1.379603
6 foo one -0.446948 1.938402
7 foo three -0.089810 2.017092

Grouping and then applying the sum() function to the resulting groups.

sum() 합계 결과를 적용하고 그룹화합니다.

 

In [48]:

kdf.groupby('A').sum()

Out[48]:


A
C D
bar 1.863037 -1.393334
foo -3.121026 7.373411

 

Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function.

여러 열로 그룹화하면 계층적 인덱스가 형성되고 다시 sum 함수를 적용할 수 있습니다.

 

In [49]:

kdf.groupby(['A', 'B']).sum()

Out[49]:


A

B
C D
foo one -0.496027 2.986241
  two -2.535188 2.370078
bar three 2.167124 0.335527
foo three -0.089810 2.017092
bar two -0.257032 -1.379603
  one -0.047054 -0.349258

Plotting

그래프 그리기

See the Plotting docs.

Plotting 문서를 확인하세요.

 

In [50]:

%matplotlib inline 
from matplotlib import pyplot as plt

 

In [51]:

pser = pd.Series(np.random.randn(1000), 
                 index=pd.date_range('1/1/2000', periods=1000))

 

In [52]:

kser = ks.Series(pser)

 

In [53]:

kser = kser.cummax()

 

In [54]:

kser.plot()

Out[54]:

<matplotlib.axes._subplots.AxesSubplot at 0x7f95cfeea2b0>

 

On a DataFrame, the plot() method is a convenience to plot all of the columns with labels:

데이터 프레임에서 plot() 메소드는 레이블이 있는 모든 열을 그리는 데 편리합니다.

 

In [55]:

pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index, 
                   columns=['A', 'B', 'C', 'D'])

 

In [56]:

kdf = ks.from_pandas(pdf)

 

In [57]:

kdf = kdf.cummax()

 

In [58]:

kdf.plot()

Out[58]:

<matplotlib.axes._subplots.AxesSubplot at 0x7f95cdb5e278>

Getting data in/out

데이터 입/출력 하기

See the Input/Output docs.

입/출력 문서를 확인하세요.

CSV

CSV is straightforward and easy to use. See here to write a CSV file and here to read a CSV file.

CSV는 사용하기 쉽고 직관적입니다. CSV 파일을 쓰기 위해서는 여기를 확인 하시고 CSV 파일을 읽기 위해서는 여기를 확인하세요.

 

In [59]:

kdf.to_csv('foo.csv') 
ks.read_csv('foo.csv').head(10)

Out[59]:

  A B C D
0 0.496167 0.716324 0.055572 0.956235
1 0.496167 0.716324 0.055572 0.956235
2 1.188582 0.716324 0.055572 0.956235
3 1.188582 0.763502 1.351446 0.956235
4 1.188582 1.583660 1.351446 2.841457
5 1.188582 1.583660 1.351446 2.841457
6 1.188582 1.583660 1.351446 2.841457
7 1.188582 1.583660 1.351446 2.841457
8 1.188582 1.583660 1.351446 2.841457
9 1.188582 1.583660 1.351446 2.841457

Parquet

Parquet is an efficient and compact file format to read and write faster. See here to write a Parquet file and here to read a Parquet file.

파케이(Parquet)는 더 빠르게 읽고 쓰기 위한 효율적이며 압축된 파일 포멧입니다. 파케이 파일을 쓰기 위해서는 여기를 확인 하시고 파케이 파일을 읽기 위해서는 여기를 확인하세요.

 

In [60]:

kdf.to_parquet('bar.parquet') 
ks.read_parquet('bar.parquet').head(10)

Out[60]:

  A B C D
0 0.496167 0.716324 0.055572 0.956235
1 0.496167 0.716324 0.055572 0.956235
2 1.188582 0.716324 0.055572 0.956235
3 1.188582 0.763502 1.351446 0.956235
4 1.188582 1.583660 1.351446 2.841457
5 1.188582 1.583660 1.351446 2.841457
6 1.188582 1.583660 1.351446 2.841457
7 1.188582 1.583660 1.351446 2.841457
8 1.188582 1.583660 1.351446 2.841457
9 1.188582 1.583660 1.351446 2.841457

Spark IO

In addition, Koalas fully support Spark's various datasources such as ORC and an external datasource. See here to write it to the specified datasource and here to read it from the datasource.

추가적으로 코알라는 ORC나 외부 데이터소스 같은 Spark의 다양한 데이터소스를 완전 지원합니다. 특정 데이터소스로 쓰기 위해서 여기를 확인 하시고 특정 데이터소스로부터 읽기 위해서 여기를 확인해 주세요.

 

In [61]:

kdf.to_spark_io('zoo.orc', format="orc") 
ks.read_spark_io('zoo.orc', format="orc").head(10)

Out[61]:

  A B C D
0 0.496167 0.716324 0.055572 0.956235
1 0.496167 0.716324 0.055572 0.956235
2 1.188582 0.716324 0.055572 0.956235
3 1.188582 0.763502 1.351446 0.956235
4 1.188582 1.583660 1.351446 2.841457
5 1.188582 1.583660 1.351446 2.841457
6 1.188582 1.583660 1.351446 2.841457
7 1.188582 1.583660 1.351446 2.841457
8 1.188582 1.583660 1.351446 2.841457
9 1.188582 1.583660 1.351446 2.841457

In [62]:

!ls -lrt

total 227888

drwxr-xr-x 13 1000 1000 4096 Sep 8 05:48 spark-2.4.7-bin-hadoop2.7

-rw-r--r-- 1 root root 233333392 Sep 8 07:13 spark-2.4.7-bin-hadoop2.7.tgz

drwxr-xr-x 1 root root 4096 Oct 14 16:31 sample_data

drwxr-xr-x 2 root root 4096 Oct 23 07:16 foo.csv

drwxr-xr-x 2 root root 4096 Oct 23 07:18 bar.parquet

drwxr-xr-x 2 root root 4096 Oct 23 07:21 zoo.orc

반응형
반응형

출처 : https://bitdatatechie.com/2019/09/13/spark-journal-return-multiple-dataframes-from-a-scala-method/

Spark Journal: Scala 메소드로부터 여러 개의 dataframe을 리턴

지금까지, 저는 Spark에 한해서 글을 남기는 데 집중하였지만, Spark Framework를 사용할 때 사용되는 주요 언어 중 하나는 당신이 알 듯이 Scala입니다. 흥미로운 사용 사례를 보여주기 위해 Spark API와 Scala 언어 모두 사용할 것입니다.

이번 작업은 Scala 메소드로부터 여러 개의 dataframe을 리턴하는 것입니다. Int, String, Dataframe일 수 있는 리턴 값이 있을 때 메소드의 리턴 부분에 1개의 값만으로 이 작업을 해왔습니다.
저의 동료와 Architect는 이를 매우 쉽게 할 수 있는 다른 옵션을 저에게 보여주었고 도움이 되었습니다.

더 읽기 전에 StackOverflow의 이 게시물을 살펴 보는 것이 좋습니다. 이 방법은 Scala에서 List와 Tuple의 개념적 차이를 분명히 하는 데 도움이 됩니다.

접근 1
리턴 값으로 List를 사용

import org.apache.spark.sql.DataFrame

def returMultipleDf  : List[DataFrame] = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")

    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")

    List(df1, df2)

}

val dfList = returMultipleDf 
val dataFrame1 = dfList(0)
val dataFrame2 = dfList(1)

dataFrame2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

접근 2
리턴 값으로 Tuple을 사용

import org.apache.spark.sql.DataFrame

def returMultipleDf : (DataFrame, DataFrame) = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")

    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")

    (df1, df2)

}

val (df1, df2) = returMultipleDf


df2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+
반응형
반응형

출처
https://stackoverflow.com/questions/37791685/what-is-the-use-case-for-start-awaittermination-and-stop-with-regard-to-sp

스파크 스트리밍에 관해 start(), awaitTermination(), stop()의 사용 사례는 무엇입니까?

저는 스파크 스트리밍 초보입니다. 저는 터미널에서 데이터를 추출하여 HDFS로 불러오는 하나의 응용프로그램을 개발하고 있습니다. 인터넷에서 찾아 보았지만 스트리밍 응용프로그램을 멈추는 방법을 이해할 수 없었습니다.

또한 sc.awaittermination()과 sc.stop()의 사용 사례를 저에게 설명해 주실 수 있으신가요?

감사합니다.


2개의 답변 중 1개의 답변만 추려냄

streamingContext.awaitTermination() --> 사용자로부터 종료 신호를 기다립니다. 사용자로부터 신호를 받을 때(예시 CTRL+c 또는 SIGTERM) 스트리밍 context는 멈출 것입니다. 이는 java의 shutdownhook 종류입니다.

streamingContext.stop() --> 스트리밍 context를 바로 멈춥니다. 스파크 context에 관해 스트리밍 context에 말할 수 있습니다. 만약 스파크 context가 아니고 스트리밍 context만 멈추기를 원한다면 streamingContext.stop(false)를 호출할 수 있습니다.

반응형
반응형

출처

https://stackoverflow.com/questions/7938585/what-does-param-mean-in-scala


스칼라에서 파라미터:_* 는 무엇을 뜻합니까?

스칼라(2.9.1)로 새로워지면서, List[Event]가 있고 이를 Queue[Event]로 복사하고 싶습니다. 하지만 다음 문법은 대신에 Queue[List[Event]]를 yield 합니다. 

val eventQueue = Queue(events)

이러한 이유로 다음은 작동합니다.

val eventQueue = Queue(events : _*)

하지만 저는 이것이 무엇인지 왜 작동 하는지 이해하고 싶습니다. Queue.apply 함수의 signature를 이미 보았습니다.

def apply[A](elems: A*)

그리고 저는 첫 번째 시도가 왜 작동하지 않는지와 두 번째(_*)의 의미가 무엇인지 이해하고 싶습니다. 이 경우 :와 _*는 무엇입니까? 그리고 왜 apply함수는 Iterable[A]를 취하지 않는 것입니까?

이 발생할 것입니다.

1개의 답변만 발췌

a: A는 타입 선언(ascription)입니다. 스칼라에서 타입 선언(ascriptions)의 목적은 무엇입니까? 라는 글을 보세요.

: _*는 가변 인자 시퀀스로 시퀀스 타입을 하나의 인수로 취급 하겠다고 컴파일러에게 알리는 타입 선언(ascription)의 특별한 인스턴스 입니다.

시퀀스나 iterable한 하나의 요소를 가지는 Queue.apply를 사용한 Queue를 생성하는 것은 완전히 유효합니다. 그래서 이는 하나의 Iterable[A]가 주어질 때 정확히 발생합니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/50791975/spark-structured-streaming-multiple-writestreams-to-same-sink

같은 Sink로 여러개의 Spark Structured Streaming WriteStreams하기

Spark Structured Streaming 2.2.1에서 순서대로 같은 데이터베이스 sink로 두 개의 Writestream 하는 것이 안 됩니다. 이 2개의 Writestream이 순서대로 실행하는 방법을 제안해주세요.

val deleteSink = ds1.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

위의 코드에서 사용한대로 deleteSinkUpsertSink 뒤에 실행됩니다.


1개의 답변

만약 당신이 병렬로 두개의 stream을 실행하고 싶으시면 다음을 사용해야 합니다.

sparkSession.streams.awaitAnyTermination()

다음 것 대신에 말이지요.

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

당신 코드의 UpsertSink의 경우 deleteSink가 멈추거나 exception이 발생하지 않으면 시작하지 않을 것입니다. scaladoc에 이런 내용이 나와 있습니다.

exception 발생 또는 query.stop() 또는 this 쿼리의 종료를 기다립니다. exception과 함께 쿼리가 종료되면 exception이 발생될 것입니다. 만약 쿼리가 정상 종료되면, 이 메소드와 모든 후속 호출은 바로 리턴될 것입니다. (쿼리가 stop()에 의해 종료되면) exception이 바로 발생할 것입니다. (쿼리가 exception이 발생하여 종료했다면)

반응형
반응형


출처

https://stackoverflow.com/questions/44450889/why-does-spark-shell-fail-to-load-a-file-with-class-with-rdd-imported/44451056


왜 spark-shell은 RDD를 import한 class를 가진 파일을 불러오기를 실패할까요?

저는 Scala 2.11.8로 Spark 2.1.1을 사용합니다.

spark-shell에서 저는 RDD를 메소드로 가지는 클래스를 불러오기 위해 :load명령을 사용합니다.

그 클래스를 불러올 때 컴파일 오류가 나옵니다.

error: not found: type RDD

왜일까요? import 구문이 있습니다.

image

다음은 제가 작업한 코드입니다.

image1


답변

spark-shell에서 :load의 특징인듯 합니다. 해결책은 당신의 클래스 정의하는 부분으로 import org.apache.spark.rdd.RDD(.이나 _ 없이)를 이동하는 것입니다.

이는 RDD클래스에만 국한 된 것이 아니고 모든 클래스에 해당합니다. import문이 클래스 자체 내에 정의되어 있지 않으면 작동하지 않습니다.

말했듯이, 다음은 클래스 밖에 import하고 있기 때문에 작동하지 않을 것입니다.

import org.apache.spark.rdd.RDD
class Hello {
  def get(rdd: RDD[String]): RDD[String] = rdd
}

scala> :load hello.scala
Loading hello.scala...
import org.apache.spark.rdd.RDD
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd
                                    ^
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd

:load의 -v플래그를 사용하여 무슨 일이 발생하는 지 볼 수 있습니다.

scala> :load -v hello.scala
Loading hello.scala...

scala>

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> class Hello {
     |   def get(rdd: RDD[String]): RDD[String] = rdd
     | }
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd
                                    ^
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd
                      ^

이는 클래스 정의 안에서 import를 하는 것이 도움이 될 것이라고 생각하게 되었습니다. 그리고 (저는 크게 놀라며) 실제로 그랬습니다!

class Hello {
  import org.apache.spark.rdd.RDD
  def get(rdd: RDD[String]): RDD[String] = rdd
}

scala> :load -v hello.scala
Loading hello.scala...

scala> class Hello {
     |   import org.apache.spark.rdd.RDD
     |   def get(rdd: RDD[String]): RDD[String] = rdd
     | }
defined class Hello

또한 :paste명령을 사용하여 클래스를 spark-shell에 붙여 넣을 수 있습니다. 고유한 패키지에 클래스를 정의할 수 있는 raw 모드가 있습니다.

package mypackage

class Hello {
  import org.apache.spark.rdd.RDD
  def get(rdd: RDD[String]): RDD[String] = rdd
}

scala> :load -v hello.scala
Loading hello.scala...

scala> package mypackage
<console>:1: error: illegal start of definition
package mypackage
^

scala>

scala> class Hello {
     |   import org.apache.spark.rdd.RDD
     |   def get(rdd: RDD[String]): RDD[String] = rdd
     | }
defined class Hello

scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

package mypackage

class Hello {
  import org.apache.spark.rdd.RDD
  def get(rdd: RDD[String]): RDD[String] = rdd
}

// Exiting paste mode, now interpreting.
// 붙여넣기 모드를 종료하고 인터프리터 모드입니다.


반응형

+ Recent posts