반응형

어떻게 접속이 끊어졌을 때 자동으로 재접속할 수 있나요?

issue 414를 보세요.


재접속을 위한 공통 패턴

pgrandinetti님 질문

몇 개의 제 프로젝트에서 접속 오류나 재접속 시도가 발생할 수 있는 시나리오를 다루기 위해 다음처럼 진행합니다.

async def listen_forever(self):
        while True:
        # 접속을 실패할 때마다 밖의 루프는 재실행됨
            try:
                async with websockets.connect(self.url) as ws:
                    while True:
                    # listener loop
                        try:
                            reply = await asyncio.wait_for(ws.recv(), timeout=***)
                        except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                            try:
                                pong = await ws.ping()
                                await asyncio.wait_for(pong, timeout=self.ping_timeout)
                                logger.debug('Ping OK, keeping connection alive...')
                                continue
                            except:
                                await asyncio.sleep(self.sleep_time)
                                break  # 안쪽 loop
                        # dreply 객체로 작업을 진행합니다.
            except socket.gaierror:
                # 뭔가 로그를 남깁니다.
                continue
            except ConnectionRefusedError:
                # 뭔가 다른 것을 로그로 남깁니다.
                continue

저는 (1)이것이 괜찮은지 궁금하고요. (2) websockets에 이미 반복적인 행위같은 것을 처리하기 위해 제공되는 지름길 같은 방법이 있는지도 궁금합니다. (맞다고 가정합니다!)

aaugustin님 답변

이를 구현한 웹소켓에서 지름길 같은 방법은 없습니다.

이 사용 사례에 관해 이전에 들었고 저는 유효하다고 갱각합니다.

저는 좋은 API를 제공할 수 있을 지 확실하지는 않습니다. -- 저는 사용자 코드가 접속할 때 혹은 접속이 끊어졌을 때 이 작업(재접속)을 해야될 것으로 예상되지만 어떻게 이를 처리해야할지는 확실하지 않습니다.

heckad님 질문

이 특징을 추가하기 위해 필요한 것은 무멋이 있을까요?

aaugustin님 추가 제안

이 특징을 위해 저는 다음을 제안합니다.

  • 첫째, 이 특징이 정확히 반응하는지 묘사하는 문서가 필요합니다.
  • 우리가 그것에 동의하면 테스트와 함께 구현하겠습니다.

pgrandinetti 님 답변

@heckad 저는 최소한의 재생산 가능한 예시를 여기에 작성하였습니다.
https://gist.github.com/pgrandinetti/964747a9f2464e576b8c6725da12c1eb

반응형
반응형

출처

Python 튜플 리스트 그룹핑

(라벨, 개수) 튜플의 리스트가 다음처럼 있습니다.

[('grape', 100), ('grape', 3), ('apple', 15), ('apple', 10), ('apple', 4), ('banana', 3)]

이로부터 저는 같은 라벨(같은 라벨은 항상 연속됨)로 모든 개수값들을 합계를 내고 같은 라벨 순서로 리스트를 리턴하고 싶습니다.

[('grape', 103), ('apple', 29), ('banana', 3)]

저는 다음처럼 뭔가 해결은 할 수 있었습니다

def group(l):
    result = []
    if l:
        this_label = l[0][0]
        this_count = 0
        for label, count in l:
            if label != this_label:
                result.append((this_label, this_count))
                this_label = label
                this_count = 0
            this_count += count
        result.append((this_label, this_count))
    return result

하지만, 이것보다 더 Pythonic 하고 / 우아하고 / 효율적인 방법이 있을까요?


7개의 답변 중 2개의 답변만 추려냄

itertools.groupby 는 당신이 원하는 것을 할 수 있습니다.

import itertools
import operator

L = [('grape', 100), ('grape', 3), ('apple', 15), ('apple', 10),
     ('apple', 4), ('banana', 3)]

def accumulate(l):
    it = itertools.groupby(l, operator.itemgetter(0))
    for key, subiter in it:
       yield key, sum(item[1] for item in subiter) 

print(list(accumulate(L)))
# [('grape', 103), ('apple', 29), ('banana', 3)]

댓글

  • lambda 대신에 operator.itemgetter를 사용하는 것이 좋습니다. – jathanism Feb 12 '10 at 1:48

  • 이 방법은 첫번째 키로 정렬된 리스트가 필요합니다. 이미 정렬이 되지 않았다면, ghostdog74 님의 접근이 훨씬 좋은 해결책입니다. – Martijn Pieters♦ Oct 10 '16 at 21:05


ghostdog74 님의 답변

import collections
d=collections.defaultdict(int)
a=[]
alist=[('grape', 100), ('banana', 3), ('apple', 10), ('apple', 4), ('grape', 3), ('apple', 15)]
for fruit,number in alist:
    if not fruit in a: a.append(fruit)
    d[fruit]+=number
for f in a:
    print (f,d[f])

결과

$ ./python.py
('grape', 103)
('banana', 3)
('apple', 29)
반응형
반응형

출처 : https://stackoverflow.com/questions/32234156/how-to-unimport-a-python-module-which-is-already-imported

이미 import된 python 모듈을 unimport 하는 방법?

저는 NumPy/SciPy에 매우 신입입니다. 요즘 Matlab을 사용하는 대신에 숫자 계산을 위해 매우 활발하게 그것을 사용하기 시작하였습니다.

간단한 계산을 위해 스크립트를 작성하는 것보다 interactive 모드에서 이를 실행합니다. 이러한 경우 이미 import한 모듈을 unimport하는 방법이 있을까요? unimport는 제가 파이썬 프로그램을 작성할 때는 필요 없겠지만, interactive 모듈에서는 필요합니다.

4개의 답변 중 1개의 답변

당신이 import한 것을 unload 하는 방법은 없습니다. 파이썬은 cache에 모듈의 복사본을 유지하기 때문에 다음에 reload와 다시 초기화하지 않고 그것을 (그대로) import합니다.

만약 당신이 필요한 게 그것으로 접근하지 않도록 하려면 del을 사용할 수 있습니다.

import package
del package

그런 다음 패키지를 다시 import하면 모듈의 캐시된 복사본이 사용됩니다.

다시 가져올 때 코드를 다시 실행할 수 있도록 모듈의 캐시된 복사본을 무효화하려면 @DeepSOIC의 답변에 따라 대신 sys.modules.pop을 사용할 수 있습니다.

당신이 패키지를 변경했고 갱신된 내용을 보고 싶다면, 당신은 그것을 reload 할 수 있습니다. 이는 몇가지 경우 작동하지 않을 수 있는데 import된 패키지가 그것에 의존적인 패키지를 reload할 필요가 있을 때입니다. 이것에 의존적인 것 이전에 관련된 문서를 읽어봐야 합니다.

Python 버전 2.7까지는 build-in 함수인 reload 를 사용합니다.

reload(package)

Python 3.0부터 3.3까지는 당신은 imp.reload 를 사용할 수 있습니다.

import imp
imp.reload(package)

Python 3.4 이상이라면 당신은 importlib.reload 를 사용할 수 있습니다.

import importlib
importlib.reload(package)
반응형
반응형

출처
https://stackoverflow.com/questions/49684495/is-it-possible-to-set-environment-variables-in-googles-colaboratory

Google Colaboratory에서 환경 변수를 설정하는 것이 가능합니까?

저는 Google Colaboratory 플랫폼에서 파이썬 스크립트를 실행하고 있습니다. 이제, 저는 다음처럼 시스템의 환경 변수를 세팅하야 합니다.

!export PATH=drive/app/tf-models-fork/research;drive/app/tf-models-fork/research/object_detection;drive/app/tf-models-fork/research/slim;$PATH

I tried to add the location to the variable PATH. However, I am getting the following errors:
저는 PATH 변수를 추가하는 것을 시도했습니다. 하지만 다음과 같은 오류가 발생하였습니다.

/bin/sh: 1: drive/app/tf-models-fork/research/object_detection: Permission denied
/bin/sh: 1: drive/app/tf-models-fork/research/slim: Permission denied
/bin/sh: 1: drive/app/tf-models-fork/research: Permission denied

이 플랫폼에서 환경 변수를 설정할 수 있는 방법이 있을까요?


2개의 답변 중 1개의 답변만 추려냄

저는 os.environ으로 PATH를 다음처럼 설정합니다.

import os
os.environ['PATH'] += ":/usr/local/go/bin"
반응형
반응형

출처
https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python

python에서 subprocess.PIPE로 non-blocking 읽기

저는 subprocess를 시작하기 위해 을 출력 스트림(표준출력)으로 접속하기 위해 subprocess 모듈을 사용하고 있습니다. 저는 그 출력을 non-blocking으로 읽도록 실행하고 싶습니다. .readline을 non-blocking으로 만들거나 .readline를 실행하기 전에 스트림에 데이터가 있는지 검사하는 방법이 있습니까? 저는 이를 Windows나 Linux에서 최소한의 작업으로 이식하고 싶습니다.
여기는 현재 제가 한 방법입니다. (데이터가 없을 때 .readline는 blocking됩니다.)

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

29개의 답변 중 1개의 답변만 추려냄

fcntl, select, asyncproc는 이 경우 도움이 되지 않을 것입니다.
운영체제에 관계없이 blocking 없이 스트림을 읽는 신뢰성 있는 방법은 Queue.get_nowait()를 사용하는 것입니다.

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # 쓰레드가 프로그램과 함께 죽습니다.
t.start()

# ... 여기서 다른 것을 합니다

# blocking 없이 한 줄을 읽습니다.
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # 한 줄을 얻었습니다.
    # ... 그 한 줄로 뭔가를 합니다.
반응형
반응형

출처 : https://stackoverflow.com/Questions/4374455/how-to-set-sys-stdout-encoding-in-python-3

Python3에서 sys.stdout 인코딩 하는 방법

Python 2에서 기본 출력 인코딩으로 설정하는 것은 잘 알려진 구문입니다.

sys.stdout = codecs.getwriter("utf-8")(sys.stdout)    

이는 UTF-8로 출력을 인코딩하여 codec writer에서 sys.stdout를 포장(wrap)합니다.

하지만, 이 기술은 Python 3 에서는 작동하지 않습니다. 그 이유는 sys.stdout.write()str를 예상하는데 인코딩의 결과는 bytes이고 codecs가 원래 sys.stdout로 인코딩된 바이트배열을 출력하려고 할 때 오류가 발생합니다.

Python 3에서 이를 할 수 있는 올바른 방법은 무엇입니까?


7개의 답변 중 2개의 답변만 추려냄.

Python 3.7부터 당신은 reconfigure()로 표준 스트림의 인코딩을 변경할 수 있습니다.

sys.stdout.reconfigure(encoding='utf-8')

당신은 errors 파라미터를 추가하여 인코딩 오류가 다뤄질 수 있도록 수정할 수 있습니다.


Python 3.1에 io.TextIOBase.detach()가 추가되었습니다. 다음은 sys.stdout에 대한 문서 내용입니다.

표준 스트림은 기본으로 text 모드입니다. 이 스트림에 이진(binary) 데이터를 쓰거나 읽기 위해 기본 바이너리 버퍼를 사용합니다. 예를 들어 stdout에 바이트 배열을 쓰기 위해 sys.stdout.buffer.write(b'abc')를 사용합니다. io.TextIOBase.detach()를 사용함으로써, 스트림은 기본으로 바이너리가 될 수 있습니다. 이 함수는 바이너리로 stdinstdout을 설정합니다.

def make_streams_binary():
    sys.stdin = sys.stdin.detach()
    sys.stdout = sys.stdout.detach()

그리하여, Python 3.1 이상에서 대응되는 구문은 다음과 같습니다.

sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
반응형
반응형

출처 : https://stackoverflow.com/questions/21268470/making-python-2-7-code-run-with-python-2-6/

Python 2.7 코드를 Python 2.6에서도 작동하게 만들기

저는 zip 파일을 풀 수 있는 간단한 Python 함수를 만들었습니다. (플랫폼 독립적으로)

def unzip(source, target):
    with zipfile.ZipFile(source , "r") as z:
        z.extractall(target)
    print "Extracted : " + source +  " to: " + target

이는 Python 2.7에서는 잘 실행되지만 Python 2.6에서는 실패합니다.

AttributeError: ZipFile instance has no attribute '__exit__':

저는 2.6에서 2.7로 업그레이드 할 것을 제안한 글을 찾았습니다.

https://bugs.launchpad.net/horizon/+bug/955994

하지만 위의 코드를 Python 2.6에서 작동할 수 있도록 플랫폼에 관계없이 이식이 가능한가요??


2개의 답변 중 1 개의 답변만 추려냄.

다음 코드는 어떻습니까?

import contextlib

def unzip(source, target):
    with contextlib.closing(zipfile.ZipFile(source , "r")) as z:
        z.extractall(target)
    print "Extracted : " + source +  " to: " + target

contextlib.closingZipFile에서 빠져있는 __exit__메소드가 하려고 했던 것을 분명히 실행합니다. 이름처럼 close 메소드를 호출합니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/19447603/how-to-kill-a-python-child-process-created-with-subprocess-check-output-when-t

부모 프로세스가 죽을 때 subprocess.check_output()로 생성된 Python 자식 프로세스를 죽이는 방법?

저는 다음처럼 subprocess.check_output()을 사용하는 자식 프로세스를 생성하는 python script를 리눅스 머신에서 실행하려고 합니다.

subprocess.check_output(["ls", "-l"], stderr=subprocess.STDOUT)

문제는 부모 프로세스가 죽었을 때 조차 자식 프로세스가 실행중이라는 것입니다. 부모 프로세스가 죽었을 때 자식 프로세스도 함께 죽일 수 있는 방법이 있습니까?


5개의 답변 중 2개의 답변

당신의 문제는 subprocess.check_output을 사용하는 것입니다. - 당신은 맞게 작성 했습니다만 check_ouput 인터페이스로는 자식 프로세스의 PID를 얻을 수 없습니다. 대신에 Popen을 사용하세요.

proc = subprocess.Popen(["ls", "-l"], stdout=PIPE, stderr=PIPE)

# 여기에서 자식 프로세스의 PID를 얻을 수 있습니다.
global child_pid
child_pid = proc.pid

# 여기서 자식 프로세스가 완료될 때까지 기다릴 수 있습니다.
(output, error) = proc.communicate()

if error:
    print "error:", error

print "output:", output

종료할 때 자식 프로세스를 죽이는 것을 분명하게 만드세요.

import os
import signal
def kill_child():
    if child_pid is None:
        pass
    else:
        os.kill(child_pid, signal.SIGTERM)

import atexit
atexit.register(kill_child)

당신은 두 개의 방법으로 이를 할 수 있습니다. 그들은 check_output 대신에 Popen을 사용해야 합니다. 첫 번째는 다음처럼 try..finally 를 사용하는 더 간단한 방법입니다.

from contextlib import contextmanager

@contextmanager
def run_and_terminate_process(*args, **kwargs):
try:
    p = subprocess.Popen(*args, **kwargs)
    yield p        
finally:
    p.terminate() # sigterm을 보내고, ...
    p.kill()      # sigkill을 보냅니다.

def main():
    with run_and_terminate_process(args) as running_proc:
        # running_proc.stdout.readline() 처럼 당신의 코드를 여기에 작성합니다.

이는 sigint(키보드 인터럽트)와 sigterm을 잡아내지만, sigkill(-9로 kill 스트립트 실행)을 잡아내지 못합니다.

다른 방법은 좀 더 복잡한데 ctypes의 rctl PR_SET_PDEATHSIG을 사용하는 것입니다. 시스템은 자식에게 시그널을 보낼것이고 부모는 어떤 이유(심지어 sigkill)로든 종료합니다.

import signal
import ctypes
libc = ctypes.CDLL("libc.so.6")
def set_pdeathsig(sig = signal.SIGTERM):
    def callable():
        return libc.prctl(1, sig)
    return callable
p = subprocess.Popen(args, preexec_fn = set_pdeathsig(signal.SIGTERM))
반응형
반응형

출처 : https://stackoverflow.com/questions/36476841/python-how-to-read-stdout-of-subprocess-in-a-nonblocking-way/36477512

nonblocking 방법으로 subprocess의 출력을 읽는 방법

저는 subprocess를 시작하고 표준 출력을 확인하는 간단한 python script를 작성하려고 합니다.

여기에 코드의 스니펫이 있습니다.

process = subprocess.Popen([path_to_exe, os.path.join(temp_dir,temp_file)], stdout=subprocess.PIPE)
while True:   
    output=process.stdout.readline()
    print "test"

문제는 script가 output=process.stdout.readline()에서 대기하고 subprocess가 끝난 후에만 print "test"를 실행하는 것입니다.

subprocess가 종료되는 것을 기다리지 않고 표준 출력을 읽어 그것을 출력하는 방법이 있을까요?

제가 시작한 subprocess는 제가 소스 코드를 가지고 있지 않은 윈도우즈 바이너리입니다.

비슷한 질문을 몇 개 찾았지만 그 답변은 리눅스에서만 적용할 수 있거나 제가 시작한 subprocess의 소스를 가지고 있을 경우에만 적용할 수 있었습니다.


2개의 답변 중 1 개의 답변만 추려냄.

select 모듈을 확인하세요.

import subprocess
import select
import time

x=subprocess.Popen(['/bin/bash','-c',"while true; do sleep 5; echo yes; done"],stdout=subprocess.PIPE)

y=select.poll()
y.register(x.stdout,select.POLLIN)

while True:
  if y.poll(1):
     print x.stdout.readline()
  else:
     print "nothing here"
     time.sleep(1)

편집:

POSIX가 아닌 시스템을 위한 쓰레드 처리된 해결책입니다.

import subprocess
from threading import Thread 
import time

linebuffer=[]
x=subprocess.Popen(['/bin/bash','-c',"while true; do sleep 5; echo yes; done"],stdout=subprocess.PIPE)

def reader(f,buffer):
   while True:
     line=f.readline()
     if line:
        buffer.append(line)
     else:
        break

t=Thread(target=reader,args=(x.stdout,linebuffer))
t.daemon=True
t.start()

while True:
  if linebuffer:
     print linebuffer.pop(0)
  else:
     print "nothing here"
     time.sleep(1)
반응형
반응형

출처

https://www.metachris.com/2016/04/python-threadpool/

https://github.com/SDRLurker/TIL/blob/master/python/thread_pool.md


Python 쓰레드 풀

쓰레드 풀은 주어진 일을 할 준비가 된 미리 만들어진 한가한 쓰레드 그룹입니다. 이들은 마쳐야 할 긴 작업의 작은 쓰레드 개수보다 짧은 작업의 많은 쓰레드 개수가 있을 때 각 작업에 대한 쓰레드를 인스턴스화 하는 것이 더 선호됩니다.

인터넷에서 문서 1000개를 다운로드하고 싶지만 한 번에 50개를 다운로드 할 수 있는 리소스 만 갖고 있다고 가정합니다. 해결책은 스레드 풀을 사용하여 고정된 수의 스레드를 생성하여 큐에서 모든 URL을 한 번에 50개씩 다운로드합니다.

스레드 풀을 사용하기 위해 Python 3.x에는 ThreadPoolExecutor 클래스가 포함되어 있고 Python 2.x와 3.x에는 multiprocessing.dummy.ThreadPool 이 있습니다. multiprocessing.dummy멀티프로세싱(multiprocessing) API를 복제하지만 threading 모듈 주변의 래퍼(wrapper)일 뿐입니다.

multiprocessing.dummy.ThreadPool 의 단점은 Python 2.x에서 대기열의 모든 작업이 스레드에 의해 완료되기 전에 프로그램을 종료 할 수 없다는 것입니다. 예를 들어 KeyboardInterrupt가 있습니다.

Python 2.x 및 3.x에서 (PDFx에서 사용하기 위해) 인터럽트 가능한 스레드 대기열(큐,queue)을 얻기 위해 stackoverflow.com/a/7257510에서 영감을 얻어 이 코드를 작성했습니다. Python 2.x 및 3.x에서 작동하는 스레드 풀을 구현합니다.

import sys
IS_PY2 = sys.version_info < (3, 0)

if IS_PY2:
    from Queue import Queue
else:
    from queue import Queue

from threading import Thread


class Worker(Thread):
    """ 주어진 작업들에 대한 대기열(큐,queue)로부터 작업을 실행할 쓰레드 """
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # 이 쓰레드에서 발생된 exception
                print(e)
            finally:
                # exception이 발생하던 안 하던 이 작업의 종료를 마크합니다.
                self.tasks.task_done()


class ThreadPool:
    """ 대기열(큐,queue)로부터 작업을 소비하는 쓰레드 풀 """
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """ 대기열(큐,queue)에 작업을 추가 """
        self.tasks.put((func, args, kargs))

    def map(self, func, args_list):
        """ 대기열(큐,queue)에 작업의 리스트를 추가 """
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """ 대기열(큐,queue)에 모든 작업의 완료를 기다림 """
        self.tasks.join()


if __name__ == "__main__":
    from random import randrange
    from time import sleep

    # 쓰레드에서 실행될 함수
    def wait_delay(d):
        print("sleeping for (%d)sec" % d)
        sleep(d)

    # 임의의 지연시간 생성
    delays = [randrange(3, 7) for i in range(50)]

    # 5개의 worker 쓰레드로 쓰레드 풀을 인스턴스화
    pool = ThreadPool(5)

    # 쓰레드 풀로 대량의 작업을 추가. 하나씩 작업을 추가하기 위해 `pool.add_task`
    # 사용 가능. 이 코드는 이 곳에서 막힐(block) 것이지만 
    # 현재 실행하고 있는 worker의 배치작업이 완료되면
    # exception으로 쓰레드 풀을 취소하는 것이 가능하도록 만들 수 있습니다.
    pool.map(wait_delay, delays)
    pool.wait_completion()

큐 크기는 스레드 수와 유사합니다 (self.tasks = Queue(num_threads) 참조). 따라서 pool.map(..)pool.add_task(..)로 작업을 추가하면 Queue의 새 슬롯이 사용가능할 때까지 막힐(block)것 입니다.

Ctrl + C를 눌러 KeyboardInterrupt를 실행하면 현재 Worker 배치가 완료되고 프로그램이 pool.map(..) 단계에서 exception로 종료됩니다.

반응형

+ Recent posts