반응형

출처 : https://bitdatatechie.com/2019/09/13/spark-journal-return-multiple-dataframes-from-a-scala-method/

Spark Journal: Scala 메소드로부터 여러 개의 dataframe을 리턴

지금까지, 저는 Spark에 한해서 글을 남기는 데 집중하였지만, Spark Framework를 사용할 때 사용되는 주요 언어 중 하나는 당신이 알 듯이 Scala입니다. 흥미로운 사용 사례를 보여주기 위해 Spark API와 Scala 언어 모두 사용할 것입니다.

이번 작업은 Scala 메소드로부터 여러 개의 dataframe을 리턴하는 것입니다. Int, String, Dataframe일 수 있는 리턴 값이 있을 때 메소드의 리턴 부분에 1개의 값만으로 이 작업을 해왔습니다.
저의 동료와 Architect는 이를 매우 쉽게 할 수 있는 다른 옵션을 저에게 보여주었고 도움이 되었습니다.

더 읽기 전에 StackOverflow의 이 게시물을 살펴 보는 것이 좋습니다. 이 방법은 Scala에서 List와 Tuple의 개념적 차이를 분명히 하는 데 도움이 됩니다.

접근 1
리턴 값으로 List를 사용

import org.apache.spark.sql.DataFrame

def returMultipleDf  : List[DataFrame] = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")

    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")

    List(df1, df2)

}

val dfList = returMultipleDf 
val dataFrame1 = dfList(0)
val dataFrame2 = dfList(1)

dataFrame2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+

접근 2
리턴 값으로 Tuple을 사용

import org.apache.spark.sql.DataFrame

def returMultipleDf : (DataFrame, DataFrame) = {
    val dataList1 = List((1,"abc"),(2,"def"))
    val df1 = dataList1.toDF("id","Name")

    val dataList2 = List((3,"ghi","home"),(4,"jkl","ctrl"))
    val df2 = dataList2.toDF("id","Name","Type")

    (df1, df2)

}

val (df1, df2) = returMultipleDf


df2.show

+---+----+----+
| id|Name|Type|
+---+----+----+
|  3| ghi|home|
|  4| jkl|ctrl|
+---+----+----+
반응형
반응형

출처 : https://stackoverflow.com/questions/50791975/spark-structured-streaming-multiple-writestreams-to-same-sink

같은 Sink로 여러개의 Spark Structured Streaming WriteStreams하기

Spark Structured Streaming 2.2.1에서 순서대로 같은 데이터베이스 sink로 두 개의 Writestream 하는 것이 안 됩니다. 이 2개의 Writestream이 순서대로 실행하는 방법을 제안해주세요.

val deleteSink = ds1.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

위의 코드에서 사용한대로 deleteSinkUpsertSink 뒤에 실행됩니다.


1개의 답변

만약 당신이 병렬로 두개의 stream을 실행하고 싶으시면 다음을 사용해야 합니다.

sparkSession.streams.awaitAnyTermination()

다음 것 대신에 말이지요.

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

당신 코드의 UpsertSink의 경우 deleteSink가 멈추거나 exception이 발생하지 않으면 시작하지 않을 것입니다. scaladoc에 이런 내용이 나와 있습니다.

exception 발생 또는 query.stop() 또는 this 쿼리의 종료를 기다립니다. exception과 함께 쿼리가 종료되면 exception이 발생될 것입니다. 만약 쿼리가 정상 종료되면, 이 메소드와 모든 후속 호출은 바로 리턴될 것입니다. (쿼리가 stop()에 의해 종료되면) exception이 바로 발생할 것입니다. (쿼리가 exception이 발생하여 종료했다면)

반응형

+ Recent posts