반응형


출처

https://stackoverflow.com/questions/44450889/why-does-spark-shell-fail-to-load-a-file-with-class-with-rdd-imported/44451056


왜 spark-shell은 RDD를 import한 class를 가진 파일을 불러오기를 실패할까요?

저는 Scala 2.11.8로 Spark 2.1.1을 사용합니다.

spark-shell에서 저는 RDD를 메소드로 가지는 클래스를 불러오기 위해 :load명령을 사용합니다.

그 클래스를 불러올 때 컴파일 오류가 나옵니다.

error: not found: type RDD

왜일까요? import 구문이 있습니다.

image

다음은 제가 작업한 코드입니다.

image1


답변

spark-shell에서 :load의 특징인듯 합니다. 해결책은 당신의 클래스 정의하는 부분으로 import org.apache.spark.rdd.RDD(.이나 _ 없이)를 이동하는 것입니다.

이는 RDD클래스에만 국한 된 것이 아니고 모든 클래스에 해당합니다. import문이 클래스 자체 내에 정의되어 있지 않으면 작동하지 않습니다.

말했듯이, 다음은 클래스 밖에 import하고 있기 때문에 작동하지 않을 것입니다.

import org.apache.spark.rdd.RDD
class Hello {
  def get(rdd: RDD[String]): RDD[String] = rdd
}

scala> :load hello.scala
Loading hello.scala...
import org.apache.spark.rdd.RDD
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd
                                    ^
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd

:load의 -v플래그를 사용하여 무슨 일이 발생하는 지 볼 수 있습니다.

scala> :load -v hello.scala
Loading hello.scala...

scala>

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> class Hello {
     |   def get(rdd: RDD[String]): RDD[String] = rdd
     | }
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd
                                    ^
<console>:12: error: not found: type RDD
         def get(rdd: RDD[String]): RDD[String] = rdd
                      ^

이는 클래스 정의 안에서 import를 하는 것이 도움이 될 것이라고 생각하게 되었습니다. 그리고 (저는 크게 놀라며) 실제로 그랬습니다!

class Hello {
  import org.apache.spark.rdd.RDD
  def get(rdd: RDD[String]): RDD[String] = rdd
}

scala> :load -v hello.scala
Loading hello.scala...

scala> class Hello {
     |   import org.apache.spark.rdd.RDD
     |   def get(rdd: RDD[String]): RDD[String] = rdd
     | }
defined class Hello

또한 :paste명령을 사용하여 클래스를 spark-shell에 붙여 넣을 수 있습니다. 고유한 패키지에 클래스를 정의할 수 있는 raw 모드가 있습니다.

package mypackage

class Hello {
  import org.apache.spark.rdd.RDD
  def get(rdd: RDD[String]): RDD[String] = rdd
}

scala> :load -v hello.scala
Loading hello.scala...

scala> package mypackage
<console>:1: error: illegal start of definition
package mypackage
^

scala>

scala> class Hello {
     |   import org.apache.spark.rdd.RDD
     |   def get(rdd: RDD[String]): RDD[String] = rdd
     | }
defined class Hello

scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

package mypackage

class Hello {
  import org.apache.spark.rdd.RDD
  def get(rdd: RDD[String]): RDD[String] = rdd
}

// Exiting paste mode, now interpreting.
// 붙여넣기 모드를 종료하고 인터프리터 모드입니다.


반응형
반응형

출처

스파크에서 fold action이 왜 필요합니까?

저는 PySpark에서 fold와 reduce에 관한 질문이 있습니다. 이 2개의 메소드의 차이점은 알고 있습니다. 하지만, 둘 다 적용된 함수끼리 교환하여 사용 가능하고 저는 fold가 reduce로 대체될 수 없다는 예시를 알 수 없습니다.

게다가, fold 구현에서 acc = op(obj, acc)가 사용됩니다. 왜 acc = op(acc, obj) 대신에 앞의 연산의 순서가 사용됩니까? (이 두번째 순서는 저에겐 leftFold에 가깝다고 이해됩니다)

토마스가


1개의 답변

빈 RDD

RDD가 비었을 때 그것은 대체될 수 없습니다.

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...

rdd.fold(0)(_ + _)
// Int = 0

당신은 당연히 isEmpty조건과 함께 reduce를 결합하여 사용할 수 있지만 코드는 더 추해집니다.

변경가능한(Mutable) 버퍼

다른 사용 방법은 변경가능(mutable)한 버퍼에 누적하는 것입니다. 다음 RDD가 있다고 생각합시다.

import breeze.linalg.DenseVector

val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

모든 요소의 합계를 원한다고 합시다. 소박한 해결책은 +와 함께 하는 겁니다.

rdd.reduce(_ + _)

불행히도 이는 각 요소에 대한 새로운 벡터를 생성합니다. 객체 생성과 계속되는 garbage collection 때문에 비용이 많이 들며 변경가능한(Mutable) 객체를 사용하는 것이 더 좋습니다. 이는 reduce로는 불가능하지만 (모든 요소의 변경불가능성을 내포하지는 않습니다.) 다음처럼 fold로는 이룰 수 있습니다.

rdd.fold(DenseVector(0))((acc, x) => acc += x)

Zero 요소는 실제 데이터를 변경하지 않고 하나의 파티션 당 버퍼를 초기화 함으로서 여기서 사용될 수 있습니다.

이것이 acc = op(acc, obj) 대신에 acc = op(obj, acc) 연산 순서를 사용하는 이유입니다.

SPARK-6416SPARK-7683 내용도 확인해주세요.

반응형

+ Recent posts