parallelize 결과 정확하지 않음. 파티션에 따라 어떻게 나오는지 하려고 임의로 둔 것.

Action

reduce(func)

  • 함수로 RDD를 단일 객체가 될 때까지 줄임
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    data_reduce = data.reduce(lambda x, y : x + y) # [6] [15] [34] -> 55
    print(data_reduce) # 55
    

.collect()

  • 데이터 셋 전체를 리스트로 반환
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    data.collect() #[1,2,3,4,5,6,7,8,9,10]
    

.count()

  • RDD element 개수
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    data.count() # [3] [3] [4] -> 10
    

.first()

  • 처음 1개의 element
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    data.first() # 1 ... 1번 파티션의 첫 번째 element driver로 가져옴
    

.take(n)

  • 처음 n개의 elements를 리스트로 반환
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    data.take(7) # [1,2,3,4,5,6,7]
    

.takeSample()

  • RDD에서 랜덤한 샘플 뽑아 드라이버로 반환
  • takeSample(withReplacement, num, seed=None)
    • withReplacement : 복원(True), 비복원(False) 뽑기 선택
    • num : sample 크기
    • seed : random seed 값
      data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
      data.takeSample(False,3) # [1, 4, 9]
      

.takeOrdered()

  • RDD에서 작은 순서대로 지정한 개수 리턴. 매우 큰 데이터에서는 sort() 사용하기
    data = sc.parallelize([10,9,8,7,6,5,4,3,2,1], 3) # [10, 9, 8] [7, 6, 5, 4] [3, 2, 1]
    data.takeOredered(3) # [1, 2, 3]
    

.saveAsTextFile(path)

  • 데이터 셋 파티션 하나당 하나의 파일로 저장
    • path : RDD가 저장될 위치
    • local filesystem, HDFS 등
      data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
      data.saveAsTextFile("data")
      

.countByKey()

  • key-value 쌍으로 이루어진 RDD에서 작동. key 기준으로 개수 셈
  • 각 key의 count를 갖는 dictionary 리턴
    data = sc.paralleize([('a',1),('b',2),('c',3),('d',4)])
    data.countByKey() # defaultdict(int, {'a':2, 'b':1, 'c':1})
    

Transformation

Transformation

  • RDD -> RDD로 변경
  • ex) map(), flatmap(), sample(), reduceByKey()

map(func)

  • RDD 각 element에 func을 적용하여 새로운 RDD 리턴
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    data = data.map(lambda x : x + 1)
    data.collect() # [2,3,4,5,6,7,8,9,10,11]
    

filter(func)

  • .filter()는 조건 통과한 값만 출력
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    filtered = data.filter(lambda x : x < 5)
    filtered.collect() # [1, 2, 3, 4]
    

flatMap(func)

  • map()과 비슷. element 하나 받아 0, 1, 또는 복수개 출력 가능
  • map() 하고 flatten
    data = sc.parallelize([1,2,3,4], 1)
    data.map(lambda x : [x, x*x]).collect() # [[1,1],[2,4],[3,9],[4,16]]
    data.flatMap(lambda x : [x, x*x]).collect() # [1, 1, 2, 4, 3, 9, 4, 16]
    

mapPartitions(func)

  • RDD 각 파티션에 대해 map tlfgod
    data = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3) # [1, 2, 3] [4, 5, 6] [7, 8, 9, 10]
    
    def f(x) :
      yield sum(x)
    
    data.mapPartitions(f).collect() # [6, 15, 34]
    

mapValues(func)

  • key-value 쌍 RDD에 대해 값에 대해서만 func 적용
    x = sc.paralleize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
    def f(x) : return len(x)
    x.mapValues(f).collect() # [("a", 3), ("b", 1)]
    

reduceByKey(func, numPartitions)

  • key별로 func tngod
  • numPartitions(Optional) : 결과로 생성되는 RDD 파티션 개수
    data = sc.parallelize([('a',1),('b',2),('c',3),('a',4)])
    print(data.reduceByKey(lambda x, y : x + y).collect()) #[('b',2), ('c',3),('a',5)]
    

Word Count

text = "Spark is fast"
data = text.flatMap(lambda line : (line.split(' ')) # ['spark', 'is', 'fast']
data = data.map(lambda word : (word, 1)) # [('spark',1), ('is',1), ('fast',1)]

추가로 text에 “spark is easy” 라는 문장이 있어 이에 대해서도 위의 함수 실행했다고 한다면

[('spark',1), ('is',1), ('fast',1)] [('spark',1), ('is',1), ('easy',1)]
data = data.reduceByKey(lambda a, b : a+b) # [('spark',2), ('is',2), ('fast',1), ('easy',1)]