반응형

출처 : https://stackoverflow.com/Questions/4374455/how-to-set-sys-stdout-encoding-in-python-3

Python3에서 sys.stdout 인코딩 하는 방법

Python 2에서 기본 출력 인코딩으로 설정하는 것은 잘 알려진 구문입니다.

sys.stdout = codecs.getwriter("utf-8")(sys.stdout)    

이는 UTF-8로 출력을 인코딩하여 codec writer에서 sys.stdout를 포장(wrap)합니다.

하지만, 이 기술은 Python 3 에서는 작동하지 않습니다. 그 이유는 sys.stdout.write()str를 예상하는데 인코딩의 결과는 bytes이고 codecs가 원래 sys.stdout로 인코딩된 바이트배열을 출력하려고 할 때 오류가 발생합니다.

Python 3에서 이를 할 수 있는 올바른 방법은 무엇입니까?


7개의 답변 중 2개의 답변만 추려냄.

Python 3.7부터 당신은 reconfigure()로 표준 스트림의 인코딩을 변경할 수 있습니다.

sys.stdout.reconfigure(encoding='utf-8')

당신은 errors 파라미터를 추가하여 인코딩 오류가 다뤄질 수 있도록 수정할 수 있습니다.


Python 3.1에 io.TextIOBase.detach()가 추가되었습니다. 다음은 sys.stdout에 대한 문서 내용입니다.

표준 스트림은 기본으로 text 모드입니다. 이 스트림에 이진(binary) 데이터를 쓰거나 읽기 위해 기본 바이너리 버퍼를 사용합니다. 예를 들어 stdout에 바이트 배열을 쓰기 위해 sys.stdout.buffer.write(b'abc')를 사용합니다. io.TextIOBase.detach()를 사용함으로써, 스트림은 기본으로 바이너리가 될 수 있습니다. 이 함수는 바이너리로 stdinstdout을 설정합니다.

def make_streams_binary():
    sys.stdin = sys.stdin.detach()
    sys.stdout = sys.stdout.detach()

그리하여, Python 3.1 이상에서 대응되는 구문은 다음과 같습니다.

sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
반응형
반응형

출처 : https://stackoverflow.com/questions/21268470/making-python-2-7-code-run-with-python-2-6/

Python 2.7 코드를 Python 2.6에서도 작동하게 만들기

저는 zip 파일을 풀 수 있는 간단한 Python 함수를 만들었습니다. (플랫폼 독립적으로)

def unzip(source, target):
    with zipfile.ZipFile(source , "r") as z:
        z.extractall(target)
    print "Extracted : " + source +  " to: " + target

이는 Python 2.7에서는 잘 실행되지만 Python 2.6에서는 실패합니다.

AttributeError: ZipFile instance has no attribute '__exit__':

저는 2.6에서 2.7로 업그레이드 할 것을 제안한 글을 찾았습니다.

https://bugs.launchpad.net/horizon/+bug/955994

하지만 위의 코드를 Python 2.6에서 작동할 수 있도록 플랫폼에 관계없이 이식이 가능한가요??


2개의 답변 중 1 개의 답변만 추려냄.

다음 코드는 어떻습니까?

import contextlib

def unzip(source, target):
    with contextlib.closing(zipfile.ZipFile(source , "r")) as z:
        z.extractall(target)
    print "Extracted : " + source +  " to: " + target

contextlib.closingZipFile에서 빠져있는 __exit__메소드가 하려고 했던 것을 분명히 실행합니다. 이름처럼 close 메소드를 호출합니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/19447603/how-to-kill-a-python-child-process-created-with-subprocess-check-output-when-t

부모 프로세스가 죽을 때 subprocess.check_output()로 생성된 Python 자식 프로세스를 죽이는 방법?

저는 다음처럼 subprocess.check_output()을 사용하는 자식 프로세스를 생성하는 python script를 리눅스 머신에서 실행하려고 합니다.

subprocess.check_output(["ls", "-l"], stderr=subprocess.STDOUT)

문제는 부모 프로세스가 죽었을 때 조차 자식 프로세스가 실행중이라는 것입니다. 부모 프로세스가 죽었을 때 자식 프로세스도 함께 죽일 수 있는 방법이 있습니까?


5개의 답변 중 2개의 답변

당신의 문제는 subprocess.check_output을 사용하는 것입니다. - 당신은 맞게 작성 했습니다만 check_ouput 인터페이스로는 자식 프로세스의 PID를 얻을 수 없습니다. 대신에 Popen을 사용하세요.

proc = subprocess.Popen(["ls", "-l"], stdout=PIPE, stderr=PIPE)

# 여기에서 자식 프로세스의 PID를 얻을 수 있습니다.
global child_pid
child_pid = proc.pid

# 여기서 자식 프로세스가 완료될 때까지 기다릴 수 있습니다.
(output, error) = proc.communicate()

if error:
    print "error:", error

print "output:", output

종료할 때 자식 프로세스를 죽이는 것을 분명하게 만드세요.

import os
import signal
def kill_child():
    if child_pid is None:
        pass
    else:
        os.kill(child_pid, signal.SIGTERM)

import atexit
atexit.register(kill_child)

당신은 두 개의 방법으로 이를 할 수 있습니다. 그들은 check_output 대신에 Popen을 사용해야 합니다. 첫 번째는 다음처럼 try..finally 를 사용하는 더 간단한 방법입니다.

from contextlib import contextmanager

@contextmanager
def run_and_terminate_process(*args, **kwargs):
try:
    p = subprocess.Popen(*args, **kwargs)
    yield p        
finally:
    p.terminate() # sigterm을 보내고, ...
    p.kill()      # sigkill을 보냅니다.

def main():
    with run_and_terminate_process(args) as running_proc:
        # running_proc.stdout.readline() 처럼 당신의 코드를 여기에 작성합니다.

이는 sigint(키보드 인터럽트)와 sigterm을 잡아내지만, sigkill(-9로 kill 스트립트 실행)을 잡아내지 못합니다.

다른 방법은 좀 더 복잡한데 ctypes의 rctl PR_SET_PDEATHSIG을 사용하는 것입니다. 시스템은 자식에게 시그널을 보낼것이고 부모는 어떤 이유(심지어 sigkill)로든 종료합니다.

import signal
import ctypes
libc = ctypes.CDLL("libc.so.6")
def set_pdeathsig(sig = signal.SIGTERM):
    def callable():
        return libc.prctl(1, sig)
    return callable
p = subprocess.Popen(args, preexec_fn = set_pdeathsig(signal.SIGTERM))
반응형
반응형

출처 : https://stackoverflow.com/questions/36476841/python-how-to-read-stdout-of-subprocess-in-a-nonblocking-way/36477512

nonblocking 방법으로 subprocess의 출력을 읽는 방법

저는 subprocess를 시작하고 표준 출력을 확인하는 간단한 python script를 작성하려고 합니다.

여기에 코드의 스니펫이 있습니다.

process = subprocess.Popen([path_to_exe, os.path.join(temp_dir,temp_file)], stdout=subprocess.PIPE)
while True:   
    output=process.stdout.readline()
    print "test"

문제는 script가 output=process.stdout.readline()에서 대기하고 subprocess가 끝난 후에만 print "test"를 실행하는 것입니다.

subprocess가 종료되는 것을 기다리지 않고 표준 출력을 읽어 그것을 출력하는 방법이 있을까요?

제가 시작한 subprocess는 제가 소스 코드를 가지고 있지 않은 윈도우즈 바이너리입니다.

비슷한 질문을 몇 개 찾았지만 그 답변은 리눅스에서만 적용할 수 있거나 제가 시작한 subprocess의 소스를 가지고 있을 경우에만 적용할 수 있었습니다.


2개의 답변 중 1 개의 답변만 추려냄.

select 모듈을 확인하세요.

import subprocess
import select
import time

x=subprocess.Popen(['/bin/bash','-c',"while true; do sleep 5; echo yes; done"],stdout=subprocess.PIPE)

y=select.poll()
y.register(x.stdout,select.POLLIN)

while True:
  if y.poll(1):
     print x.stdout.readline()
  else:
     print "nothing here"
     time.sleep(1)

편집:

POSIX가 아닌 시스템을 위한 쓰레드 처리된 해결책입니다.

import subprocess
from threading import Thread 
import time

linebuffer=[]
x=subprocess.Popen(['/bin/bash','-c',"while true; do sleep 5; echo yes; done"],stdout=subprocess.PIPE)

def reader(f,buffer):
   while True:
     line=f.readline()
     if line:
        buffer.append(line)
     else:
        break

t=Thread(target=reader,args=(x.stdout,linebuffer))
t.daemon=True
t.start()

while True:
  if linebuffer:
     print linebuffer.pop(0)
  else:
     print "nothing here"
     time.sleep(1)
반응형
반응형

출처

https://www.metachris.com/2016/04/python-threadpool/

https://github.com/SDRLurker/TIL/blob/master/python/thread_pool.md


Python 쓰레드 풀

쓰레드 풀은 주어진 일을 할 준비가 된 미리 만들어진 한가한 쓰레드 그룹입니다. 이들은 마쳐야 할 긴 작업의 작은 쓰레드 개수보다 짧은 작업의 많은 쓰레드 개수가 있을 때 각 작업에 대한 쓰레드를 인스턴스화 하는 것이 더 선호됩니다.

인터넷에서 문서 1000개를 다운로드하고 싶지만 한 번에 50개를 다운로드 할 수 있는 리소스 만 갖고 있다고 가정합니다. 해결책은 스레드 풀을 사용하여 고정된 수의 스레드를 생성하여 큐에서 모든 URL을 한 번에 50개씩 다운로드합니다.

스레드 풀을 사용하기 위해 Python 3.x에는 ThreadPoolExecutor 클래스가 포함되어 있고 Python 2.x와 3.x에는 multiprocessing.dummy.ThreadPool 이 있습니다. multiprocessing.dummy멀티프로세싱(multiprocessing) API를 복제하지만 threading 모듈 주변의 래퍼(wrapper)일 뿐입니다.

multiprocessing.dummy.ThreadPool 의 단점은 Python 2.x에서 대기열의 모든 작업이 스레드에 의해 완료되기 전에 프로그램을 종료 할 수 없다는 것입니다. 예를 들어 KeyboardInterrupt가 있습니다.

Python 2.x 및 3.x에서 (PDFx에서 사용하기 위해) 인터럽트 가능한 스레드 대기열(큐,queue)을 얻기 위해 stackoverflow.com/a/7257510에서 영감을 얻어 이 코드를 작성했습니다. Python 2.x 및 3.x에서 작동하는 스레드 풀을 구현합니다.

import sys
IS_PY2 = sys.version_info < (3, 0)

if IS_PY2:
    from Queue import Queue
else:
    from queue import Queue

from threading import Thread


class Worker(Thread):
    """ 주어진 작업들에 대한 대기열(큐,queue)로부터 작업을 실행할 쓰레드 """
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # 이 쓰레드에서 발생된 exception
                print(e)
            finally:
                # exception이 발생하던 안 하던 이 작업의 종료를 마크합니다.
                self.tasks.task_done()


class ThreadPool:
    """ 대기열(큐,queue)로부터 작업을 소비하는 쓰레드 풀 """
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """ 대기열(큐,queue)에 작업을 추가 """
        self.tasks.put((func, args, kargs))

    def map(self, func, args_list):
        """ 대기열(큐,queue)에 작업의 리스트를 추가 """
        for args in args_list:
            self.add_task(func, args)

    def wait_completion(self):
        """ 대기열(큐,queue)에 모든 작업의 완료를 기다림 """
        self.tasks.join()


if __name__ == "__main__":
    from random import randrange
    from time import sleep

    # 쓰레드에서 실행될 함수
    def wait_delay(d):
        print("sleeping for (%d)sec" % d)
        sleep(d)

    # 임의의 지연시간 생성
    delays = [randrange(3, 7) for i in range(50)]

    # 5개의 worker 쓰레드로 쓰레드 풀을 인스턴스화
    pool = ThreadPool(5)

    # 쓰레드 풀로 대량의 작업을 추가. 하나씩 작업을 추가하기 위해 `pool.add_task`
    # 사용 가능. 이 코드는 이 곳에서 막힐(block) 것이지만 
    # 현재 실행하고 있는 worker의 배치작업이 완료되면
    # exception으로 쓰레드 풀을 취소하는 것이 가능하도록 만들 수 있습니다.
    pool.map(wait_delay, delays)
    pool.wait_completion()

큐 크기는 스레드 수와 유사합니다 (self.tasks = Queue(num_threads) 참조). 따라서 pool.map(..)pool.add_task(..)로 작업을 추가하면 Queue의 새 슬롯이 사용가능할 때까지 막힐(block)것 입니다.

Ctrl + C를 눌러 KeyboardInterrupt를 실행하면 현재 Worker 배치가 완료되고 프로그램이 pool.map(..) 단계에서 exception로 종료됩니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/23786674/python-mysqldb-how-to-get-columns-name-without-executing-select-in-a-big-tab

python: MySQLdb. 의 큰 테이블에서 select * 없이 컬럼명을 얻는 방법

저는 테이블에서 컬럼명을 얻고 싶습니다만 100만 개 이상의 데이터가 그 안에 있습니다. 그래서 다음 쿼리를 사용할 수 없습니다.

cursor.execute("SELECT * FROM table_name")
print cursor.description

sqlite3에서 저는 이 방법을 사용하였습니다.

crs.execute("PRAGMA table_info(%s)" %(tablename[0]))
for info in crs:
    print info

이는 python mysqldb에서는 작동하지 않습니다. 방법을 아는 분 계신가요?


5개 답변중 1개의 답변만 추려냄.

당신은 SHOW columns을 사용할 수 있습니다.

cursor.execute("SHOW columns FROM table_name")
print [column[0] for column in cursor.fetchall()]

참고 바람니다. 이는 본질적으로 desc를 사용하는 것과 같습니다.

cursor.execute("desc table_name")
print [column[0] for column in cursor.fetchall()]
반응형
반응형

출처 : https://stackoverflow.com/questions/11892729/how-to-log-in-to-a-website-using-pythons-requests-module/

Python의 Requests 모듈을 사용하여 웹사이트에 "로그인"하는 방법

저는 Python으로 Requests 모듈을 사용하여 웹사이트로 로그인하는 요청을 post 방식으로 처리하려 합니다만 잘 작동하지 않습니다. 저는 이걸 처음 해 봅니다... 그래서 제 사용자명과 비밀번호 쿠키나 HTTP 인증 같은 형태를 만들어야 하는지 알 수 없습니다.

from pyquery import PyQuery
import requests

url = 'http://www.locationary.com/home/index2.jsp'

지금부터 저는 "post" 방식과 쿠키를 사용합니다.

ck = {'inUserName': 'USERNAME/EMAIL', 'inUserPass': 'PASSWORD'}

r = requests.post(url, cookies=ck)

content = r.text

q = PyQuery(content)

title = q("title").text()

print title

저는 쿠키에서 뭔가 잘못하고 있는 거 같지만... 모르겠습니다.

제가 정확히 로그인하지 못 했다면, 홈페이지 제목은 "Locationary.com"이 나올 것이고 로그인을 했다면 "Home Page"가 되어야 합니다.

만약 requests와 쿠키에 관한 몇가지를 저에게 설명해 주시고 이에 관해 저를 도와주신다면 정말 감사하겠습니다.

감사합니다.

... 아직 잘 작동하지는 않지만 로그인 전에 홈페이지의 HTML은 다음처럼 나옵니다.

저는 잘하고 있다고 생각하지만, 출력은 아직 "Locationary.com"입니다.

2번째 편집:

저는 그 도메인의 페이지를 요청할 때마다 오래동안 로그인을 유지할 수 있기를 원합니다. 제가 로그인을 했다면 나타날 그 내용이 나타나기를 원합니다.


6개의 답변 중 1 개의 답변만 추려냄.

저는 당신이 다른 해결책을 찾았다는 것을 알고 있지만 저처럼 이 질문의 해결책 혹은 같은 질문을 찾기 원하는 사람들을 위해 다음처럼 requests를 사용하여 할 수 있습니다.

첫째, 마커스(Marcus)가 한 것처럼 로그인 form(양식)에 post할 URL, 사용자이름과 비밀번호 필드의 name 속성을 소스에서 확인합니다. 그의 예시에서는 inUserName과 inUserPass 입니다.

일단, payload로 로그인 세부 정보를 post 방식으로 요청하기 위해 request.Session() 인스턴스를 사용할 수 있습니다. Session 인스턴스로부터 요청하는 것은 본질적으로 일반적인 requests를 사용하는 것과 같습니다. 이는 간단하게 쿠키들을 사용하고 저장하도록 하는 지속성이 추가된 것입니다.

당신의 로그인 시도가 성공적이었다 가정하면 그 사이트에서 이후 요청을 하기 위해 session 인스턴스를 간단하게 사용할 수 있습니다. 당신을 식별하기 위한 쿠키가 요청을 승인하는 데 사용됩니다.

예시

import requests

# 로그인 form에 post 방식으로 전송될 세부 내용을 작성합니다.
payload = {
    'inUserName': 'username',
    'inUserPass': 'password'
}

# 'with'는 사용한 뒤에 session이 닫히도록 보장합니다.
with requests.Session() as s:
    p = s.post('LOGIN_URL', data=payload)
    # 성공적으로 로그인 했는지 보기 위해 더 현명하게 어떤게 리턴되었는지 html을 출력합니다.
    print p.text

    # 승인된 요청
    r = s.get('A protected web page url')
    print r.text
        # 기타...
반응형
반응형

출처 : https://stackoverflow.com/questions/4260280/if-else-in-pythons-list-comprehension

파이썬 list comprehension에서 if/else 구문 사용법?

파이썬에서 어떻게 다음 구문을 할 수 있습니까?

row = [unicode(x.strip()) for x in row if x is not None else '']

특별히:

  1. 빈 문자열을 None으로 대체하고

  2. 함수를 실행하고 싶습니다.


3 개의 답변 중 1 개의 답변만 추려냄.

이렇게 하실 수 있고 순서의 문제입니다.

[ unicode(x.strip()) if x is not None else '' for x in row ]

일반적으로,

[f(x) if condition else g(x) for x in sequence]

if 조건만으로 for list comprehensions을 사용한다면

[f(x) for x in sequence if condition]

이는 실제로 다른 언어 구문인 조건부 표현식을 사용하는 데, 그 자체는 list comprehension 문법의 일부가 아니며 if 뒤에 for…in 이 list comprehension이며 원래 데이터(source)를 순회(iterable)하면서 각 요소를 필터링하는데 (조건부 표현식이) 사용됩니다.

조건식은 2가지 값 중 선택하려는 모든 종류의 상황에서 사용할 수 있습니다. 이 조건식은 다른 언어에도 존재하는 ?: 삼항 연산자 와 같습니다. 예를 들면

value = 123
print(value, 'is', 'even' if value % 2 == 0 else 'odd')

이렇게 사용하실 수 있습니다.

반응형
반응형

출처 : https://stackoverflow.com/questions/4770297/python-convert-utc-datetime-string-to-local-datetime

파이썬 - UTC 날짜시간 문자열을 local 날짜시간으로 변환하기

저는 시간과 utc간에 변환 할 필요가 없었습니다. 최근에 제 앱이 시간대를 인식하도록 요청이 있었으며 스스로 알아보고 있었습니다. 현지 시간을 utc로 변환하는 방법에 대한 많은 정보를 얻었는데 초보자 (어쩌면 그렇게 잘못하고있는 것 같습니다)라 최종 사용자 시간대로 UTC 시간을 쉽게 변환하는 방법에 대한 정보는 찾을 수 없습니다.

간단히 말해서 android app은 나를 (appengine app) 데이터로 보내고 그 데이터는 타임 스탬프입니다. 해당 타임 스탬프를 utc 시간으로 저장하기 위해 다음을 사용하였습니다.

datetime.utcfromtimestamp(timestamp)

이는 효과가 있는 것 같습니다. 내 앱이 데이터를 저장하면 5시간 이전 시간으로 저장됩니다. (저는 EST-5 시간대에 있습니다.)

데이터가 appengine의 BigTable에 저장되고 검색되면 다음과 같은 문자열로 나타납니다.

"2011-01-21 02:37:21"

이 문자열을 사용자의 올바른 시간대에있는 DateTime으로 변환하려면 어떻게 해야 합니까?

또한, 사용자 시간대 정보가 주로 저장됩니까? tz(시간대) 정보를 일반적으로 어떻게 저장합니까? (예시: "-5:00" 또는 "EST" 등등?) 첫 번째 질문에 대한 대답에는 두 번째 대답에 포함될 수 있습니다.


7 개의 답변 중 1 개의 답변만 추려냄.

당신이 tzinfo 객체를 제공하기 원하지 않으면 python-dateutil 라이브러리를 확인하시면 됩니다. 시간대에 대한 표준명으로 시간대 규칙을 참조할 수 있도록 zoneinfo (Olson) 데이터베이스 위에 tzinfo 구현을 제공합니다.

from datetime import datetime
from dateutil import tz

# 방법 1: 시간대 하드코딩:
from_zone = tz.gettz('UTC')
to_zone = tz.gettz('America/New_York')

# 방법 2: 시간대 자동감지:
from_zone = tz.tzutc()
to_zone = tz.tzlocal()

# utc = datetime.utcnow()
utc = datetime.strptime('2011-01-21 02:37:21', '%Y-%m-%d %H:%M:%S')

# 날짜시간 객체가 기본으로 '순수'하기 때문에  
# UTC 시간대에 있다고 날짜시간 객체에게 알려줍니다.
utc = utc.replace(tzinfo=from_zone)

# 시간대를 변환합니다.
central = utc.astimezone(to_zone)

편집 strptime 사용법을 보여주기 위해 예제를 확장함

편집2 더 좋은 entry point 메소드를 보여주기 위해 API 사용법을 수정

편집3 시간대 (Yarin)를 자동 감지하기 위한 메소드 포함

반응형
반응형

출처 

http://chrisalbon.com/python/pandas_create_column_with_loop.html

For 루프로 Pandas 열 만들기


사전준비

import pandas as pd
import numpy as np

데이터프레임 예시 만들기

raw_data = {'student_name': ['Miller', 'Jacobson', 'Ali', 'Milner', 'Cooze', 'Jacon', 'Ryaner', 'Sone', 'Sloan', 'Piger', 'Riani', 'Ali'], 
        'test_score': [76, 88, 84, 67, 53, 96, 64, 91, 77, 73, 52, np.NaN]}
df = pd.DataFrame(raw_data, columns = ['student_name', 'test_score'])


학점을 할당하기 위한 함수 만들기

# 데이터를 저장할 list(리스트)를 만듭니다. grades = [] # 열에 추가할 각 행을 For로 순회합니다, for row in df['test_score']: # 이 값보다 크면, if row > 95: # 'A' 학점으로 list에 추가합니다. grades.append('A') # 아니고, 이 값보다 크면, elif row > 90: # 'A-' 학점으로 list에 추가합니다. grades.append('A-') # 아니고, 이 값보다 크면, elif row > 85: # 'B' 학점으로 list에 추가합니다. grades.append('B') # 아니고, 이 값보다 크면, elif row > 80: # 'B-' 학점으로 list에 추가합니다. grades.append('B-') # 아니고, 이 값보다 크면, elif row > 75: # 'C' 학점으로 list에 추가합니다. grades.append('C') # 아니고, 이 값보다 크면, elif row > 70: # 'C-' 학점으로 list에 추가합니다. grades.append('C-') # 아니고, 이 값보다 크면, elif row > 65: # 'D' 학점으로 list에 추가합니다. grades.append('D') # 아니고, 이 값보다 크면, elif row > 60: # 'D-' 학점으로 list에 추가합니다. grades.append('D-') # 아니면, else: # 'F' 학점을 추가합니다. grades.append('Failed') # list(리스트)로부터 'grades'열을 추가합니다. df['grades'] = grades

# 새로운 dataframe(데이터프레임)을 봅니다. df

student_nametest_scoregrades
0Miller76C
1Jacobson88B
2Ali84B-
3Milner67D
4Cooze53Failed
5Jacon96A
6Ryaner64D-
7Sone91A-
8Sloan77C
9Piger73C-
10Riani52Failed
11AliNaNFailed


반응형

+ Recent posts