programing

ThreadPoolExecutor().map이 ThreadPoolExecutor().submit와 어떻게 다릅니까?

muds 2023. 5. 16. 23:11
반응형

ThreadPoolExecutor().map이 ThreadPoolExecutor().submit와 어떻게 다릅니까?

제가 작성한 코드 때문에 매우 혼란스러웠을 뿐입니다.저는 다음과 같은 것을 발견하고 놀랐습니다.

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(f, iterable))

그리고.

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(map(lambda x: executor.submit(f, x), iterable))

다른 결과를 낳습니다.첫 번째 것은 어떤 유형이든 간에f반환, 두 번째는 목록을 생성합니다.concurrent.futures.Future그 다음에 그들의 것으로 평가되어야 하는 객체들.result()그 가치를 얻기 위한 방법.f반환된

저의 주된 우려는 이것이executor.map을 이용할 수 없습니다concurrent.futures.as_completed데이터베이스를 사용할 수 있게 되었을 때 데이터베이스에 대해 장시간 실행된 호출의 결과를 평가하는 매우 편리한 방법인 것 같습니다.

어떻게 해야 할지 전혀 모르겠습니다.concurrent.futures.ThreadPoolExecutor객체는 작동합니다. -- 순진하게도, 저는 (좀 더 장황한) 것을 선호합니다.

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    results = [f.result() for f in futures.as_completed(result_futures)]

간략하게 말하면.executor.map성능 향상의 가능성을 활용하기 위해.제가 그렇게 하는 것이 잘못되었습니까?

문제는 당신이 그 결과를 변형시킨다는 것입니다.ThreadPoolExecutor.map일람표까지이 작업을 수행하지 않고 결과 생성기를 직접 반복하면 결과가 원래 순서대로 생성되지만 모든 결과가 준비되기 전에 루프가 계속됩니다.다음 예를 사용하여 이를 테스트할 수 있습니다.

import time
import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
    print(i)

순서가 유지되는 이유는 때때로 결과를 맵에 지정한 순서대로 가져오는 것이 중요하기 때문일 수 있습니다.결과는 미래의 객체에 포함되지 않을 수도 있습니다. 필요한 경우 모든 결과를 얻기 위해 목록 위에 다른 맵을 추가하는 데 너무 오랜 시간이 걸릴 수도 있기 때문입니다.그리고 대부분의 경우 루프가 첫 번째 값을 처리하기 전에 다음 값이 준비될 가능성이 높습니다.이 예는 다음과 같습니다.

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []

for value in results:
    finals.append(do_some_stuff(value))

이 예에서는 다음과 같은 가능성이 있을 수 있습니다.do_some_stuff시간이 보다 오래 걸립니다.crunch_number이 경우 맵을 쉽게 사용할 수 있는 상태에서 성능이 크게 저하되지 않습니다.

또한 작업자 스레드(/프로세스)는 목록의 시작 부분에서 처리를 시작하고 목록의 끝 부분까지 작업하므로 제출한 결과는 반복자가 이미 제출한 순서대로 완료되어야 합니다.그 말은 대부분의 경우에executor.map그냥 괜찮지만, 어떤 경우에는, 예를 들어, 어떤 순서로 값과 전달한 함수를 처리하든 상관없다면.map데 매우 됩니다.future.as_completed더 빠를 수도 있습니다.

concurrent.futures.as_completed각 함수에 대한 예외를 처리할 수 있습니다.

import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]

def f(x):
    if x == 2:
        raise Exception('x')
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result_futures = list(map(lambda x: executor.submit(f, x), iterable))
    # -> using `executor.submit()` **requires** calling
    #      `concurrent.futures.as_completed()` <-
    #
    for future in concurrent.futures.as_completed(result_futures):
        try:
            print('resutl is', future.result())
        except Exception as e:
            print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10

executor.mapfunction에서 .작업자 기능에서 예외를 처리해야 합니다.

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # -> Do not call `concurrent.futures.as_completed()`
    #    when using `executor.map()` <-
    #
    for each in executor.map(f, iterable):
        print(each)
# if there is any exception, executor.map would stop

은 다은의예다의 입니다..submit().map()둘 다 작업을 즉시 수락합니다(제출됨/매핑됨 - 시작).완료하는 데 걸리는 시간은 11초(마지막 결과 시간 - 시작)입니다. 하만지,.submit()의 스레드가 있으면 즉시 결과를 제공합니다.ThreadPoolExecutor maxThreads=2완료됨(순서 없음!).하는 동안에.map()제출된 순서대로 결과를 제공합니다.

import time
import concurrent.futures

def worker(i):
    time.sleep(i)
    return i,time.time()

e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn

f = []
print 'start submit',time.time()
for i in arrIn:
    f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
    print r.result(),time.time()
print

f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
    print r,time.time()    

출력:

[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48

start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49

여기에 있는 답변에 대한 설명 외에도 소스로 바로 이동하는 것이 도움이 될 수 있습니다.이는 다음과 같은 다른 답변에서 나온 성명을 재확인하는 것입니다.

  • .map(), 제출된 순서대로 결과를 제공합니다.
  • 목에 대기반 Future이 순서가 있는 개체는 이 순서를 보장하지 않습니다. 왜냐하면 이것이 속성이기 때문입니다.as_completed()

.map()는 기본 클래스, :에 정의됩니다.

class Executor(object):
    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]  # <!!!!!!!!

        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()  # <!!!!!!!!
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()  # <!!!!!!!!
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

처럼 당이언것처럼급한, 또한신도 ..submit() 즉 하위클에정하는것야어되즉의, .ProcessPoolExecutor및 , 를 반환합니다._base.Future▁라고 불러야 할 .result()실제로 무엇이든 하게 할 수 있습니다.

에서 온 중요한 대사들..map()요약:

fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
    yield fs.pop().result()

.reverse()플러스.pop()를 얻는 평가 결과는 1차 평가 결과에서 나옵니다.iterables첫 두 첫 번째로 산출되는 결과, 두 번째로 산출되는 결과 등이 있습니다.는 결과반의요다음같습다니과소가 .Futures; 그것들은 실제 결과 그 자체입니다.

언급URL : https://stackoverflow.com/questions/20838162/how-does-threadpoolexecutor-map-differ-from-threadpoolexecutor-submit

반응형