ThreadPoolExecutor().map이 ThreadPoolExecutor().submit와 어떻게 다릅니까?
제가 작성한 코드 때문에 매우 혼란스러웠을 뿐입니다.저는 다음과 같은 것을 발견하고 놀랐습니다.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(f, iterable))
그리고.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(map(lambda x: executor.submit(f, x), iterable))
다른 결과를 낳습니다.첫 번째 것은 어떤 유형이든 간에f
반환, 두 번째는 목록을 생성합니다.concurrent.futures.Future
그 다음에 그들의 것으로 평가되어야 하는 객체들.result()
그 가치를 얻기 위한 방법.f
반환된
저의 주된 우려는 이것이executor.map
을 이용할 수 없습니다concurrent.futures.as_completed
데이터베이스를 사용할 수 있게 되었을 때 데이터베이스에 대해 장시간 실행된 호출의 결과를 평가하는 매우 편리한 방법인 것 같습니다.
어떻게 해야 할지 전혀 모르겠습니다.concurrent.futures.ThreadPoolExecutor
객체는 작동합니다. -- 순진하게도, 저는 (좀 더 장황한) 것을 선호합니다.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
results = [f.result() for f in futures.as_completed(result_futures)]
간략하게 말하면.executor.map
성능 향상의 가능성을 활용하기 위해.제가 그렇게 하는 것이 잘못되었습니까?
문제는 당신이 그 결과를 변형시킨다는 것입니다.ThreadPoolExecutor.map
일람표까지이 작업을 수행하지 않고 결과 생성기를 직접 반복하면 결과가 원래 순서대로 생성되지만 모든 결과가 준비되기 전에 루프가 계속됩니다.다음 예를 사용하여 이를 테스트할 수 있습니다.
import time
import concurrent.futures
e = concurrent.futures.ThreadPoolExecutor(4)
s = range(10)
for i in e.map(time.sleep, s):
print(i)
순서가 유지되는 이유는 때때로 결과를 맵에 지정한 순서대로 가져오는 것이 중요하기 때문일 수 있습니다.결과는 미래의 객체에 포함되지 않을 수도 있습니다. 필요한 경우 모든 결과를 얻기 위해 목록 위에 다른 맵을 추가하는 데 너무 오랜 시간이 걸릴 수도 있기 때문입니다.그리고 대부분의 경우 루프가 첫 번째 값을 처리하기 전에 다음 값이 준비될 가능성이 높습니다.이 예는 다음과 같습니다.
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor() # Or ProcessPoolExecutor
data = some_huge_list()
results = executor.map(crunch_number, data)
finals = []
for value in results:
finals.append(do_some_stuff(value))
이 예에서는 다음과 같은 가능성이 있을 수 있습니다.do_some_stuff
시간이 보다 오래 걸립니다.crunch_number
이 경우 맵을 쉽게 사용할 수 있는 상태에서 성능이 크게 저하되지 않습니다.
또한 작업자 스레드(/프로세스)는 목록의 시작 부분에서 처리를 시작하고 목록의 끝 부분까지 작업하므로 제출한 결과는 반복자가 이미 제출한 순서대로 완료되어야 합니다.그 말은 대부분의 경우에executor.map
그냥 괜찮지만, 어떤 경우에는, 예를 들어, 어떤 순서로 값과 전달한 함수를 처리하든 상관없다면.map
데 매우 됩니다.future.as_completed
더 빠를 수도 있습니다.
을 concurrent.futures.as_completed
각 함수에 대한 예외를 처리할 수 있습니다.
import concurrent.futures
iterable = [1,2,3,4,6,7,8,9,10]
def f(x):
if x == 2:
raise Exception('x')
return x
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
result_futures = list(map(lambda x: executor.submit(f, x), iterable))
# -> using `executor.submit()` **requires** calling
# `concurrent.futures.as_completed()` <-
#
for future in concurrent.futures.as_completed(result_futures):
try:
print('resutl is', future.result())
except Exception as e:
print('e is', e, type(e))
# resutl is 3
# resutl is 1
# resutl is 4
# e is x <class 'Exception'>
# resutl is 6
# resutl is 7
# resutl is 8
# resutl is 9
# resutl is 10
executor.map
function에서 .작업자 기능에서 예외를 처리해야 합니다.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# -> Do not call `concurrent.futures.as_completed()`
# when using `executor.map()` <-
#
for each in executor.map(f, iterable):
print(each)
# if there is any exception, executor.map would stop
은 다은의예다의 입니다..submit()
대.map()
둘 다 작업을 즉시 수락합니다(제출됨/매핑됨 - 시작).완료하는 데 걸리는 시간은 11초(마지막 결과 시간 - 시작)입니다. 하만지,.submit()
의 스레드가 있으면 즉시 결과를 제공합니다.ThreadPoolExecutor
maxThreads=2
완료됨(순서 없음!).하는 동안에.map()
제출된 순서대로 결과를 제공합니다.
import time
import concurrent.futures
def worker(i):
time.sleep(i)
return i,time.time()
e = concurrent.futures.ThreadPoolExecutor(2)
arrIn = range(1,7)[::-1]
print arrIn
f = []
print 'start submit',time.time()
for i in arrIn:
f.append(e.submit(worker,i))
print 'submitted',time.time()
for r in concurrent.futures.as_completed(f):
print r.result(),time.time()
print
f = []
print 'start map',time.time()
f = e.map(worker,arrIn)
print 'mapped',time.time()
for r in f:
print r,time.time()
출력:
[6, 5, 4, 3, 2, 1]
start submit 1543473934.47
submitted 1543473934.47
(5, 1543473939.473743) 1543473939.47
(6, 1543473940.471591) 1543473940.47
(3, 1543473943.473639) 1543473943.47
(4, 1543473943.474192) 1543473943.47
(1, 1543473944.474617) 1543473944.47
(2, 1543473945.477609) 1543473945.48
start map 1543473945.48
mapped 1543473945.48
(6, 1543473951.483908) 1543473951.48
(5, 1543473950.484109) 1543473951.48
(4, 1543473954.48858) 1543473954.49
(3, 1543473954.488384) 1543473954.49
(2, 1543473956.493789) 1543473956.49
(1, 1543473955.493888) 1543473956.49
여기에 있는 답변에 대한 설명 외에도 소스로 바로 이동하는 것이 도움이 될 수 있습니다.이는 다음과 같은 다른 답변에서 나온 성명을 재확인하는 것입니다.
.map()
, 제출된 순서대로 결과를 제공합니다.- 목에 대기반
Future
이 순서가 있는 개체는 이 순서를 보장하지 않습니다. 왜냐하면 이것이 속성이기 때문입니다.as_completed()
.map()
는 기본 클래스, :에 정의됩니다.
class Executor(object):
def submit(self, fn, *args, **kwargs):
raise NotImplementedError()
def map(self, fn, *iterables, timeout=None, chunksize=1):
if timeout is not None:
end_time = timeout + time.monotonic()
fs = [self.submit(fn, *args) for args in zip(*iterables)] # <!!!!!!!!
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse() # <!!!!!!!!
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result() # <!!!!!!!!
else:
yield fs.pop().result(end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
return result_iterator()
처럼 당이언것처럼급한, 또한신도 ..submit()
즉 하위클에정하는것야어되즉의, .ProcessPoolExecutor
및 , 를 반환합니다._base.Future
▁라고 불러야 할 .result()
실제로 무엇이든 하게 할 수 있습니다.
에서 온 중요한 대사들..map()
요약:
fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs.reverse()
while fs:
yield fs.pop().result()
그.reverse()
플러스.pop()
를 얻는 평가 결과는 1차 평가 결과에서 나옵니다.iterables
첫 두 첫 번째로 산출되는 결과, 두 번째로 산출되는 결과 등이 있습니다.는 결과반의요다음같습다니과소가 .Future
s; 그것들은 실제 결과 그 자체입니다.
언급URL : https://stackoverflow.com/questions/20838162/how-does-threadpoolexecutor-map-differ-from-threadpoolexecutor-submit
'programing' 카테고리의 다른 글
Python 3의 문자열 형식 (0) | 2023.05.16 |
---|---|
Swift: '#if DEBUG'와 같은 PRECPURG 플래그를 사용하여 API 키를 구현하는 방법은 무엇입니까? (0) | 2023.05.16 |
WP7 응용 프로그램 모음 아이콘이 시뮬레이터에 표시되지 않음(단, 블렌드에서는 작동함) (0) | 2023.05.16 |
Git에서 손실된 커밋을 복구하려면 어떻게 해야 합니까? (0) | 2023.05.16 |
Git에서 현재 분기만 표시 (0) | 2023.05.16 |