반응형

출처 : https://stackoverflow.com/questions/30506489/python-multiprocessing-leading-to-many-zombie-processes/32442923

많은 좀비 프로세스를 이끄는 파이썬 멀티프로세싱

저는 worker의 pool을 사용하여 파이썬의 멀티프로세싱 라이브러리를 구현하고 있습니다. 저는 다음 코드를 구현하였습니다.

import main1
t1 = time.time()
p = Pool(cores) 
result = p.map(main1, client_list[client])
if result == []:
    return []
p.close()
p.join()
print "Time taken in performing request:: ", time.time()-t1
return shorted(result)

하지만, 프로세스를 실행한 후에 저의 응용프로그램에 백그라운드 프로세스가 많이 나왔습니다. 저의 응용프로그램에 ps aux를 실행한 후에 스냅샷입니다.

Stackoverflow에서 멀티프로세싱 모듈에 의해 생성된 좀비 프로세스를 죽이는 방법 같은 비슷한 질문을 많이 읽었습니다. 이미 구현한 .join()을 사용해야 하며 여기에서 Python Multiprocessing Kill Processes에서 이러한 모든 프로세스를 종료하는 방법을 배웠습니다. 그러나 제 코드에 무엇이 잘못 될 수 있는지 알고 싶습니다. main1 함수에서 모든 코드를 공유할 수는 없지만 main 코드의 오류로 인해 좀비 프로세스가 발생할 수 있는 경우를 피하기 위해 try catch 블록에 전체 코드 블록을 넣었습니다.

def main1((param1, param2, param3)):
    try:
       resout.append(some_data) # 오류가 없는 경우 resout
    except:
        print traceback.format_exc()
        resout = []  # 오류가 발생한 경우 비어 있는 resout 리턴
    return resout

저는 병렬 프로그래밍 및 디버깅 문제라는 개념에 대해 아직 매우 신입이고 매우 까다롭습니다. 어떤 도움이든 매우 감사하겠습니다.


1개의 답변

보통 가장 보편적인 문제는 pool이 생성되었지만 닫지 않는 것입니다.

제가 알고있는 최고의 방법은 try/finally 구문을 사용하여 pool이 닫히도록 보장하는 것 입니다.

try:
    pool = Pool(ncores)
    pool.map(yourfunction, arguments)
finally:
    pool.close()
    pool.join()

multiprocessing과 싸우고 싶지 않다면, 저는 제 인생(잠재적으로 당신의 인생도)을 더 쉽게 멀티프로세싱을 wrapping 한 parmap이라 불리는 간단한 패키지를 작성하였습니다.

pip install parmap

import parmap
parmap.map(yourfunction, arguments)

여기서부터는 parmap 사용법 입니다.

  • 간단한 병렬처리 예시:
import parmap
y1 = [myfunction(x, argument1, argument2) for x in mylist]
y2 = parmap.map(myfunction, mylist, argument1, argument2)
y1 == y2
  • 튜플의 리스트를 순회하기
# 당신이 원하는 것:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
z = parmap.starmap(myfunction, mylist, argument1, argument2)


# 당신이 원하는 것:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
    listz.append(myfunction(x, y, param1, param2))
# 병렬로 실행:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
반응형
반응형

출처 : https://stackoverflow.com/questions/40286841/what-is-the-best-way-to-raise-request-errors

저는 다음 방법으로 나쁜 요청(requests)에 대해 오류를 발생하는 소스 코드를 읽고 있습니다.

import requests

response = requests.get("www.google.com")   # http:// 가 없기 때문에 작동하지 않습니다.
if response.ok is False or response.json()['status'] != 'success':
    raise Exception("API error: {}".format(response.json()['message']))

저는 마지막 2줄을 다음 구문으로 대체해야겠다고 생각하였습니다.

response.raise_for_status()

오류가 리턴한 내용에서 차이점을 발견하지 못했습니다. 2가지 모두 다음과 같이 나왔습니다.

Traceback (most recent call last):
  File "/home/kurt/Documents/Scratch/requests_test.py", line 3, in <module>
    response = requests.get("www.google.com")   # This won't work because it's missing the http://
  File "/home/kurt/.local/lib/python2.7/site-packages/requests/api.py", line 69, in get
    return request('get', url, params=params, **kwargs)
  File "/home/kurt/.local/lib/python2.7/site-packages/requests/api.py", line 50, in request
    response = session.request(method=method, url=url, **kwargs)
  File "/home/kurt/.local/lib/python2.7/site-packages/requests/sessions.py", line 451, in request
    prep = self.prepare_request(req)
  File "/home/kurt/.local/lib/python2.7/site-packages/requests/sessions.py", line 382, in prepare_request
    hooks=merge_hooks(request.hooks, self.hooks),
  File "/home/kurt/.local/lib/python2.7/site-packages/requests/models.py", line 304, in prepare
    self.prepare_url(url, params)
  File "/home/kurt/.local/lib/python2.7/site-packages/requests/models.py", line 362, in prepare_url
    to_native_string(url, 'utf8')))
requests.exceptions.MissingSchema: Invalid URL 'www.google.com': No schema supplied. Perhaps you meant http://www.google.com?

raise_for_status()는 더 간결하고 아마도 원래 예외에 대한 정보를 잃지 않는 것 같습니다 (참조. "except Exception"대 파이썬에서 "except ... raise" 사용). 이것이 실제로 더 나은 접근법일까요?


1개의 답변

response.raise_for_status()는 응답의 상태 코드가 200이 아닌 응답일 때만 예외(Exception)를 발생합니다. 두 번째 경우인 response.json()['status'] != 'success'가 만족할 경우, 커버하지 않습니다.

하지만 다른 오류가 있습니다. requests.get() 호출에 의해 예외가 발생하므로 if 테스트에 도달하지 않습니다. 스키마를 전달하지 못했습니다 (문자열 앞에 http:// 또는 https:// 없음). requests.get() 표현식에서 예외가 발생하므로 다음 행은 절대로 실행되지 않습니다. 요청도 전송되지 않으므로 응답에 대한 assertion도 만들 수 없습니다.

테스트는 더 많은 문제가 있습니다.

  • requests.ok is False 관용적인 파이썬 표현이 아닙니다. 대신에 not requests.ok를 사용하는 것이 좋습니다.
  • 만약 requests.ok is False를 만족하면, requests.json()은 대부분 실패할 것입니다. 그래서 response.json()['message']를 사용한 다음 줄은 다른 오류를 발생할 것입니다.
반응형
반응형

출처 : https://stackoverflow.com/questions/3428536/python-list-subtraction-operation

파이썬 리스트 차집합 연산

저는 다음과 비슷하게 뭔가 하고 싶습니다.

>>> x = [1,2,3,4,5,6,7,8,9,0]  
>>> x  
[1, 2, 3, 4, 5, 6, 7, 8, 9, 0]  
>>> y = [1,3,5,7,9]  
>>> y  
[1, 3, 5, 7, 9]  
>>> y - x   # ([2,4,6,8,0]를 리턴해야 합니다.)

이 방법은 파이썬 리스트에서 지원되지 않습니다. 이를 할 수 있는 최고의 방법은 무엇입니까?

12개의 답변 중 2개의 답변만 추려냄

리스트 내포(comprehension)을 사용합니다.

[item for item in x if item not in y]

만약 - 중위 연산 문법을 사용하고 싶으면 다음처럼 할 수 있습니다.

class MyList(list):
    def __init__(self, *args):
        super(MyList, self).__init__(args)

    def __sub__(self, other):
        return self.__class__(*[item for item in self if item not in other])

다음처럼 사용할 수 있습니다.

x = MyList(1, 2, 3, 4)
y = MyList(2, 5, 2)
z = x - y

하지만 만약 리스트 속성이 절대적으로 필요하지 않다면(예시, 순서), 다른 답변처럼 set(집합)을 사용하는 것을 추천합니다.


set difference 를 사용합니다.

>>> z = list(set(x) - set(y))
>>> z
[0, 8, 2, 4, 6]

또는 x와 y를 집합으로 설정하여 (리스트로) 변환을 수행할 필요가 없습니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/2719017/how-to-set-timeout-on-pythons-socket-recv-method

파이썬 소켓 수신 메소드에서 timeout을 정하는 방법?

저는 파이썬의 소켓 수신 메소드에서 timeout을 정하고 싶습니다. 어떻게 할 수 있을까요?

11개의 답변 중 1개의 답변만 추려냄

일반적인 접근은 timeout이 발생할 때까지 데이터가 접근 가능할 때까지 select()를 사용하는 것입니다. 데이터가 실제로 가능할 때만 recv()를 호출합니다. 안전을 위해, 우리는 recv가 무한으로 block하지 않는 것을 보장하기 위해 non-blocking 모드로 소켓을 설정할 수 있습니다. select()는 한번에 하나 이상의 소켓을 기다리도록 하는데 사용될 수 있습니다.

import select

mysocket.setblocking(0)

ready = select.select([mysocket], [], [], timeout_in_seconds)
if ready[0]:
    data = mysocket.recv(4096)

만약 많은 열려있는 파일 디스크립터가 있다면, poll()select()보다 더 효율적인 대안입니다.

다른 옵션은 socket.settimeout()을 사용하여 소켓의 모든 연산에 timeout을 설정하는 것입니다. 하지만 당신은 다른 답변에서 해당 솔루션을 명시적으로 거부했던 것을 확인하였습니다.

반응형
반응형

출처 : https://web.archive.org/web/20210404233204/https://ongspxm.gitlab.io/blog/2016/11/assertraises-testing-for-errors-in-unittest/

assertRaises - unittest에서 오류 테스트 하기

이 글에서, 저는 파이썬의 빌트인 unittest 모듈을 사용하였습니다.

갑자기 프레임워크에 대한 (잘못된 데이터가 입력되면 예외가 발생해야 하는) 새로운 요구가 생겼을 때 저는 제 프로젝트에서 몇 가지 테스트 작업을 하고 있었습니다.

저는 몇가지를 검색하였고 제가 필요한 것 - TestCase.assertRaises 같은 함수를 찾았습니다.

다른 모든 assert 함수가 unittest에서 사용되는 것과 동일한 형식으로 시도했습니다. 그래서 저는 다음과 같이 작성하였습니다.

import unittest

def func():
    raise Exception('lets see if this works')

class ExampleTest(unittest.TestCase):
    def test_error(self):
        self.assertRaises(func(), Exception)

if __name__=='__main__':
    unittest.main()

논리적으로 보입니다. 그렇지 않습니까? 글쎄요. 나는 그것을 실행하려고 시도했고 테스트는 오류로 판명되었습니다.

예, 오류가 있다는 것을 알고 있습니다. 그것이 제가 테스트하고 있는 것입니다. 그렇지요? 왜 이런 일이 일어나고 있는지에 대해 혼란스러워서 파이썬 문서와 stackOverflow를 찾아 답을 찾았습니다.

lambda 해결책

다음은 제가 검색하여 찾은 가장 일반적인 해결책입니다.

import unittest

def func():
    raise Exception('lets see if this works')

class ExampleTest(unittest.TestCase):
    def test_error(self):
        self.assertRaises(Exception, lambda:func())

if __name__=='__main__':
    unittest.main()

제가 좋아하는 해결책

하지만 lambda는 그 자체로 전부 새로운 괴물입니다. 함수에 대한 wrapper로 사용하기 위해 올바른 도구는 아닌 듯 합니다.

다음은 assertRaise를 사용하는 매우 더 적절한 파이썬스러운 방법입니다.

import unittest

def func():
    raise Exception('lets see if this works')

class ExampleTest(unittest.TestCase):
    def test_error(self):
        with self.assertRaises(Exception): func()

if __name__=='__main__':
    unittest.main()

왜 작동하지 않았을까요?

분명히 assertRaises가 작동하려면 예외를 잡기 위해 함수 호출이 일종의 wrapper 내에 포함되어야 합니다.

첫 번째 예제처럼 실행하면, 그 함수는 스스로 실행될 것이며 그 예외는 잡히지 않고 test case는 대신에 오류가 발생하게 됩니다.

lambda 메소드를 사용하면, lambda 함수는 함수가 실행될 wrapper로 작동하기 때문에 그 예외는 잡히게 되며 비교가 발생합니다.

"with 구문"도 똑같습니다. "with 구문"으로 함수 호출을 wrapping하면, 그 예외는 구문에서 "assertRaises"로 전달될 것입니다. "assertRaises" 함수는 이제 실행될 예외와 비교할 수 있으며 그래서 유용한 결과를 얻습니다.

성공적인 테스트 실행

더 기술적인 세부사항에 관심 있으시면, unittest 소스 코드를 살펴보세요 (소스 코드는 언제든지 가능하며, 파이썬이 짱인 이유입니다.)

다른 문제가 있으시면 저에게 세부사항에 대한 이메일이나 댓글 남겨주세요.

반응형
반응형

출처 : https://towardsdatascience.com/3-ways-to-load-csv-files-into-colab-7c14fcbdcb92

시작하기: Colab에서 CSV 파일을 불러오는 3가지 방법

데이터 사이언스는 데이터 없이는 아무것도 아닙니다. 예 그것은 분명합니다. 분명하지 않은 것은 데이터를 탐색할 수 있는 형식으로 데이터를 가져오기 위한 단계입니다. (쉼표로 구분된 값을 줄여서) CSV 형식의 데이터 셋을 가질 수 있지만 다음에 수행할 작업은 알 수 없습니다. 이 글은 Colab에서 CSV 파일을 불러옴으로써 데이터 사이언스를 시작하는 데 도움이 될 것입니다.

Colab(Colaboratory를 줄여서)은 Python으로 코딩하도록 구글로부터 온 무료 플랫폼입니다. Colab은 본질적으로 Jupyter Notebook의 구글 Suite 버전입니다. Jupyter 위의 Colab의 몇가지 이점은 문서로 공유하기와 더 쉬운 패키지 설치가 있습니다. 아직, CSV 파일 처럼 파일을 불러올 때 몇가지 추가 코딩이 필요합니다. 저는 Colab에서 CSV 파일을 불러오기 위한 3가지 방법을 보여드리고 이를 Pandas 데이터프레임으로 추가할 것입니다.

(참고 : 공통 데이터 세트를 전달하는 Python 패키지가 있습니다. 이 기사에서 이러한 데이터 세트를 불러오는 것에 대해서는 논의하지 않을 것 입니다.)

시작하기 위해 당신의 구글 계정으로 로그인하여 Google Drive로 갑니다. 왼쪽에 새로 만들기 버튼을 클릭하고 만약 설치되었다면 Colaboratory 를 선택합니다. (더 많은 앱 연결을 클릭하지 않으면 Colaboratory을 검색하여 설치하십시오.) 여기에서 아래와 같이 Pandas를 import 합니다. (Colab에 이미 설치되어 있음).

import pandas as pd

1) Github로 부터(파일 < 25MB)

CSV 파일을 업로드하는 가장 쉬운 방법은 GitHub 저장소를 사용하는 것입니다. 당신의 저장소에서 데이터 세트를 클릭하고 View Raw를 클릭합니다. raw 데이터 세트에 대한 링크를 복사하여 아래 표시된대로 Colab에 url이라는 문자열 변수로 저장합니다 (보다 깔끔한 방법이지만 필요하지는 않습니다). 마지막 단계는 URL을 Pandas read_csv로 불러와 데이터 프레임을 얻는 것입니다.

url = '복사한_raw_GitHub_링크'
df1 = pd.read_csv(url)
# 데이터 세트는 Pandas Dataframe에 이제 저장됩니다.

2) local drive로 부터

당신의 local drive로부터 업로드하여, 다음 코드로 시작합니다.

from google.colab import files
uploaded = files.upload()

선택할 파일이 프롬프트로 나올 것입니다. "파일 선택하기"를 클릭하여 파일을 선택하여 업로드 합니다. 파일이 100% 업로드 되기를 기다립니다. Colab이 파일을 업로드를 했을 때 파일의 이름이 보여야 합니다.

마지막으로, 데이터 프레임으로 그것을 import하기 위해 다음 코드를 타이핑합니다. (파일 이름은 업로드한 파일의 이름과 똑같은지 확인해야 합니다).

import io
df2 = pd.read_csv(io.BytesIO(uploaded['Filename.csv']))
# 데이터 세트는 Pandas Dataframe에 이제 저장됩니다.

3) PyDrive를 통한 Google Drive로 부터

이는 3가지 방법 중에 가장 복잡합니다. 워크 플로 제어를 위해 CSV 파일을 Google 드라이브에 업로드 하는 것을 보여 드리겠습니다. 먼저 다음 코드를 입력하십시오.

# Colaboratory에서 csv 파일을 읽기 위한 코드:
!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
# PyDrive 클라이언트를 생성하고 인증하기
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

프롬프트가 표시되면 링크를 클릭하여 Google이 드라이브에 액세스 할 수 있도록 인증을 받습니다. 상단에 'Google Cloud SDK가 Google 계정에 액세스 하려고 합니다'라는 화면이 표시됩니다. 권한을 허용한 후 제공된 확인 코드를 복사하여 Colab의 상자에 붙여 넣습니다.

확인이 완료되면 Google 드라이브의 CSV 파일로 이동하여 마우스 오른쪽 버튼으로 클릭하고 "공유 가능 링크 가져 오기"를 선택하십시오. 링크가 클립 보드에 복사됩니다. 이 링크를 Colab의 문자열 변수에 붙여 넣습니다.

link = 'https://drive.google.com/open?id=1DPZZQ43w8brRhbEMolgLqOWKbZbE-IQu' # 공유 가능 링크

원하는 것은 등호 뒤의 ID 부분입니다. 해당 부분을 얻으려면 다음 코드를 입력하십시오.

fluff, id = link.split('=')
print (id) # '=' 뒤에 모든 부분이 있는지 확인

마지막으로, 데이터프레임으로 파일을 얻기 위해 다음 코드를 타이핑 합니다.

downloaded = drive.CreateFile({'id':id}) 
downloaded.GetContentFile('Filename.csv')  
df3 = pd.read_csv('Filename.csv')
# 데이터 세트는 Pandas Dataframe에 이제 저장됩니다.

마지막 의견

Colab에서 CSV 파일을 업로드하기 위한 3가지 접근이 있습니다. 각각 파일 크기와 워크 플로 구성 방법에 따라 장점이 있습니다. 데이터가 Pandas 데이터프레임처럼 더 좋은 포멧이라면, 작업할 준비가 된 것입니다.

보너스 방법 - 내 드라이브

당신의 지원에 매우 감사합니다. 이 글은 50,000 View와 25K의 읽기에 도달하는 영광이 있었고, 저는 Colab에서 CSV 파일을 얻기 위한 추가적인 방벙을 제공합니다. 이는 더 간단하고 분명합니다. Google Drive("My Drive")에서, 당신이 선택한 위치에서 data라 불리는 폴더를 만듭니다. 이는 당신의 데이터를 업로드할 곳이 될 것입니다.

Colab 노트북에서, 다음을 타이핑합니다.

from google.colab import drive
drive.mount('/content/drive')

세 번째 방법과 마찬가지로 명령을 사용하면 Google 인증 단계로 이동합니다. Google 드라이브 파일 스트림에서 Google 계정에 액세스하시오 라는 화면이 표시됩니다. 권한을 허용한 후 제공된 확인 코드를 복사하여 Colab의 상자에 붙여 넣습니다.

노트북에서 노트북 왼쪽 상단에 >를 클릭하고 파일을 클릭하십시오. 앞에서 만든 data 폴더를 찾고 데이터를 찾으십시오. 데이터를 마우스 오른쪽 버튼으로 클릭하고 경로 복사를 선택하십시오. 이 복사된 경로를 변수에 저장하면 바로 사용할 수 있습니다.

path = "copied path"
df_bonus = pd.read_csv(path)
# 데이터 세트는 Pandas Dataframe에 이제 저장됩니다.

이 방법의 장점은 세 번째 방법과 관련된 추가 단계없이 자체 Google 드라이브에서 생성한 별도의 데이터 세트 폴더에서 데이터 세트에 액세스 할 수 있다는 것입니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/5537876/get-utc-offset-from-time-zone-name-in-python

Python에서 시간대(timezone) 이름으로 UTC 시차(offset) 구하기

Python에서 시간대(timezone) 이름으로 UTC 시차(offset)을 어떻게 구합니까?

예시: 저는 Asia/Jerusalem을 통해 +0200을 얻고 싶습니다.


미국동부시간으로 한국 시간 구하기 예시

import datetime
import pytz

est = datetime.datetime.now(pytz.timezone('America/New_York'))
diff_min = (est.utcoffset().seconds - 86400) // 60 - 540
# ...중략...
# 02:38:00 28.02.20
svr_time = datetime.datetime.strptime(svr_str, "%H:%M:%S %d.%m.%y") - datetime.timedelta(minutes=diff_min)

3개의 답변 중 2개의 답변만 추려냄

pytz 프로젝트와 utcoffset 메소드 사용을 시도하신 적 있으신가요?

예시

>>> import datetime
>>> import pytz
>>> pacific_now = datetime.datetime.now(pytz.timezone('US/Pacific'))
>>> pacific_now.utcoffset().total_seconds()/60/60
-7.0

DST(일광절약시간, 서머타임) 때문에 결과는 그 해의 시간에 따라 다릅니다.

import datetime, pytz

datetime.datetime.now(pytz.timezone('Asia/Jerusalem')).strftime('%z')

# returns '+0300' ('now' 지금은 DST(일광절약시간, 서머타임)이기 때문입니다)

pytz.timezone('Asia/Jerusalem').localize(datetime.datetime(2011,1,1)).strftime('%z')

# returns '+0200' (1월에는 DST(일광절약시간, 서머타임)이 아니기 때문입니다)
반응형
반응형

출처 : https://stackoverflow.com/questions/5574702/how-to-print-to-stderr-in-python

Python에서 표준에러로 출력하는 방법?

표준 출력으로 쓰는 몇가지 방법이 있습니다.

# Note: 첫번째 문장은 Python 3에서 실행되지 않습니다.
print >> sys.stderr, "spam"

sys.stderr.write("spam\n")

os.write(2, b"spam\n")

from __future__ import print_function
print("spam", file=sys.stderr)

그것은 파이썬의 선 #13†과 모순되는 것처럼 보입니다. 그래서 여기의 차이점은 무엇이며 어떤 방법을 사용하던 장단점이 있습니까? 어떤 방법을 사용해야 합니까?

† 문제를 해결할 하나의 - 바람직하고 유일한 - 명백한 방법이 있을 것이다.

15개의 답변 중 1개의 답변만 추려냄

저는 짧고 유연하고 이식하기 좋고 읽기 좋은 유일한 방법을 찾았습니다.

from __future__ import print_function
import sys

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

함수 eprint는 표준 print 함수와 같은 방법으로 사용될 수 있습니다.

>>> print("Test")
Test
>>> eprint("Test")
Test
>>> eprint("foo", "bar", "baz", sep="---")
foo---bar---baz
반응형
반응형

출처 : https://stackoverflow.com/questions/32234156/how-to-unimport-a-python-module-which-is-already-imported

이미 import된 python 모듈을 unimport 하는 방법?

저는 NumPy/SciPy에 매우 신입입니다. 요즘 Matlab을 사용하는 대신에 숫자 계산을 위해 매우 활발하게 그것을 사용하기 시작하였습니다.

간단한 계산을 위해 스크립트를 작성하는 것보다 interactive 모드에서 이를 실행합니다. 이러한 경우 이미 import한 모듈을 unimport하는 방법이 있을까요? unimport는 제가 파이썬 프로그램을 작성할 때는 필요 없겠지만, interactive 모듈에서는 필요합니다.

4개의 답변 중 1개의 답변

당신이 import한 것을 unload 하는 방법은 없습니다. 파이썬은 cache에 모듈의 복사본을 유지하기 때문에 다음에 reload와 다시 초기화하지 않고 그것을 (그대로) import합니다.

만약 당신이 필요한 게 그것으로 접근하지 않도록 하려면 del을 사용할 수 있습니다.

import package
del package

그런 다음 패키지를 다시 import하면 모듈의 캐시된 복사본이 사용됩니다.

다시 가져올 때 코드를 다시 실행할 수 있도록 모듈의 캐시된 복사본을 무효화하려면 @DeepSOIC의 답변에 따라 대신 sys.modules.pop을 사용할 수 있습니다.

당신이 패키지를 변경했고 갱신된 내용을 보고 싶다면, 당신은 그것을 reload 할 수 있습니다. 이는 몇가지 경우 작동하지 않을 수 있는데 import된 패키지가 그것에 의존적인 패키지를 reload할 필요가 있을 때입니다. 이것에 의존적인 것 이전에 관련된 문서를 읽어봐야 합니다.

Python 버전 2.7까지는 build-in 함수인 reload 를 사용합니다.

reload(package)

Python 3.0부터 3.3까지는 당신은 imp.reload 를 사용할 수 있습니다.

import imp
imp.reload(package)

Python 3.4 이상이라면 당신은 importlib.reload 를 사용할 수 있습니다.

import importlib
importlib.reload(package)
반응형
반응형

출처
https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python

python에서 subprocess.PIPE로 non-blocking 읽기

저는 subprocess를 시작하기 위해 을 출력 스트림(표준출력)으로 접속하기 위해 subprocess 모듈을 사용하고 있습니다. 저는 그 출력을 non-blocking으로 읽도록 실행하고 싶습니다. .readline을 non-blocking으로 만들거나 .readline를 실행하기 전에 스트림에 데이터가 있는지 검사하는 방법이 있습니까? 저는 이를 Windows나 Linux에서 최소한의 작업으로 이식하고 싶습니다.
여기는 현재 제가 한 방법입니다. (데이터가 없을 때 .readline는 blocking됩니다.)

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

29개의 답변 중 1개의 답변만 추려냄

fcntl, select, asyncproc는 이 경우 도움이 되지 않을 것입니다.
운영체제에 관계없이 blocking 없이 스트림을 읽는 신뢰성 있는 방법은 Queue.get_nowait()를 사용하는 것입니다.

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # 쓰레드가 프로그램과 함께 죽습니다.
t.start()

# ... 여기서 다른 것을 합니다

# blocking 없이 한 줄을 읽습니다.
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # 한 줄을 얻었습니다.
    # ... 그 한 줄로 뭔가를 합니다.
반응형

+ Recent posts