Python 多线程
多线程类似于同时执行多个不同程序,多线程运行有如下优点:
- 使用线程可以把占据长时间的程序中的任务放到后台去处理。
- 用户界面可以更加吸引人,这样比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度
- 程序的运行速度可能加快
- 在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
线程在执行过程中与进程还是有区别的。每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。
- 线程可以被抢占(中断)。
- 在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) -- 这就是线程的退让。 Python中使用线程有两种方式:函数或者用类来包装线程对象。
1. 函数式
函数式:调用thread模块中的Thread()函数来产生新线程。语法如下:
from threading import Thread
thread_obj = Thread([group [, target[, name[, args [,kwargs]]]]])
- group 暂时无用,未来功能的预留参数
- target 执行的目标任务名
- name 线程名,一般不用设置
- args 以元组的方式给执行任务传参
- kwargs 以字典的方式给执行任务传参
#启动线程
thread_obj.start()
2. 继承Thread类
from threading import Thread
class MyThread(Thread):
def run(self):
for i in range(1000):
print("func", i)
if __name__ == '__main__':
t = MyThread()
t.start()
for i in range(1000):
print("main", i)
以上两种是最基本的python创建多线程的方案.
3. 线程模块
Python通过两个标准库thread和threading提供对线程的支持。thread提供了低级别的、原始的线程以及一个简单的锁。
threading 模块提供的其他方法:
threading.currentThread()
: 返回当前的线程变量。threading.enumerate()
: 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount()
: 返回正在运行的线程数量,与len(threading.enumerate())
有相同的结果。
除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
run()
: 用以表示线程活动的方法。getName()
: 返回线程名。setName()
: 设置线程名start()
当前线程准备就绪join()
等待当前线程任务执行完毕后再向下继续执行isAlive()
: 返回线程是否活动的。setDaemon(bool)
守护线程,必须放在start()
之前setDaemon(True)
设置为守护线程,主线程执行完毕后,子线程也自动关闭setDaemon(False)
设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)
4. 线程同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。如下:
多线程的优势在于可以同时运行多个任务(至少感觉起来是这样)。但是当线程需要共享数据时,可能存在数据不同步的问题。
考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。
那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。为了避免这种情况,引入了锁的概念。
锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。
经过这样的处理,打印列表时要么全部输出0,要么全部输出1,不会再出现一半0一半1的尴尬场面。
#!/usr/bin/python3
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, delay):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.delay = delay
def run(self):
print ("开启线程: " + self.name)
# 获取锁,用于线程同步
threadLock.acquire()
print_time(self.name, self.delay, 3)
# 释放锁,开启下一个线程
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
执行以上程序,输出结果为:
开启线程: Thread-1
开启线程: Thread-2
Thread-1: Wed Jan 5 17:36:50 2023
Thread-1: Wed Jan 5 17:36:51 2023
Thread-1: Wed Jan 5 17:36:52 2023
Thread-2: Wed Jan 5 17:36:54 2023
Thread-2: Wed Jan 5 17:36:56 2023
Thread-2: Wed Jan 5 17:36:58 2023
退出主线程
5. 线程优先级队列( Queue)
Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。
这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。
Queue 模块中的常用方法:
Queue.qsize()
返回队列的大小Queue.empty()
如果队列为空,返回True,反之FalseQueue.full()
如果队列满了,返回True,反之FalseQueue.full
与 maxsize 大小对应Queue.get([block[, timeout]])
获取队列,timeout等待时间Queue.get_nowait()
相当Queue.get(False)Queue.put(item)
写入队列,timeout等待时间Queue.put_nowait(item)
相当Queue.put(item, False)Queue.task_done()
在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号Queue.join()
实际上意味着等到队列为空,再执行别的操作
6. 线程安全 & 线程锁 & 死锁
线程安全
一个进程中可以有多个线程,且线程共享所有进程中的资源。
多个线程同时去操作一个"东西",可能会存在数据混乱的情况,例如:
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)
在开发的过程中要注意有些操作默认都是 线程安全的(内部集成了锁的机制),我们在使用的时无需再通过锁再处理,例如:
import threading
data_list = []
lock_object = threading.RLock()
def task():
print("开始")
for i in range(1000000):
data_list.append(i)
print(len(data_list))
for i in range(2):
t = threading.Thread(target=task)
t.start()
Output:
开始
开始
1882647
2000000
线程锁
在程序中如果想要自己手动加锁,一般有两种:Lock
和 RLock
。
Lock
,同步锁。
import threading
num = 0
lock_object = threading.Lock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
Output:
开始
开始
1000000
2000000
RLock
,递归锁。
import threading
num = 0
lock_object = threading.RLock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
Output:
开始
开始
1000000
2000000
RLock
支持多次申请锁和多次释放;Lock
不支持。例如:
import threading
import time
lock_object = threading.RLock()
def task():
print("开始")
lock_object.acquire()
lock_object.acquire()
print(123)
lock_object.release()
lock_object.release()
for i in range(3):
t = threading.Thread(target=task)
t.start()
Output:
开始
123
开始
123
开始
123
import threading
lock = threading.RLock()
# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():
with lock:
pass
# 程序员B开发了一个函数,可以直接调用这个函数。
def run():
print("其他功能")
func() # 调用程序员A写的func函数,内部用到了锁。
print("其他功能")
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():
with lock:
print("其他功能")
func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。
print("其他功能")
死锁
死锁,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象。
import threading
num = 0
lock_object = threading.Lock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
import threading
import time
lock_1 = threading.Lock()
lock_2 = threading.Lock()
def task1():
lock_1.acquire()
time.sleep(1)
lock_2.acquire()
print(11)
lock_2.release()
print(111)
lock_1.release()
print(1111)
def task2():
lock_2.acquire()
time.sleep(1)
lock_1.acquire()
print(22)
lock_1.release()
print(222)
lock_2.release()
print(2222)
t1 = threading.Thread(target=task1)
t1.start()
t2 = threading.Thread(target=task2)
t2.start()
Python 线程池
Executor
在学习线程池之前,先了解一下Python标准库中concurrent.futures
中的Executor
对象
抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。
异步执行可以由 ThreadPoolExecutor
使用线程或由 ProcessPoolExecutor
使用单独的进程来实现。 两者都是实现抽像类 Executor
定义的接口。
Executor
方法:
submit(fn, /, *args, **kwargs)
调度可调用对象fn
,以fn(*args, **kwargs)
方式执行并返回一个代表该可调用对象的执行的Future
对象。map(func, *iterables, timeout=None, chunksize=1)
类似于map(func, *iterables)
函数,但与map(func, *iterables)
函数有不同之处:- iterables 是立即执行而不是延迟执行的;
- func 是异步执行的,对 func 的多个调用可以并发执行。
如果从原始调用到
Executor.map()
经过timeout
秒后,__next__()
已被调用且返回的结果还不可用,那么已返回的迭代器将触发concurrent.futures.TimeoutError
。timeout
可以是整数或浮点数。如果timeout
没有指定或为None
,则没有超时限制。 如果func
调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。 使用ProcessPoolExecutor
时,这个方法会将iterables
分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由chunksize
指定正整数设置。 对很长的迭代器来说,使用大的chunksize
值比默认值 1 能显著地提高性能。chunksize
对ThreadPoolExecutor
没有效果。
shutdown(wait=True, *, cancel_futures=False)
当待执行的future
对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。 在关闭后调用Executor.submit()
和Executor.map()
将会引发RuntimeError
。 如果wait
为True
则此方法只有在所有待执行的future
对象完成执行且释放已分配的资源后才会返回。 如果wait
为False
,方法立即返回,所有待执行的future
对象完成执行后会释放已分配的资源。 不管wait
的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。 如果cancel_futures
为True
,此方法将取消所有执行器还未开始运行的挂起的Future
。 任何已完成或正在运行的Future
将不会被取消,无论cancel_futures
的值是什么? 如果cancel_futures
和wait
均为True
,则执行器已开始运行的所有Future
将在此方法返回之前完成。 其余的Future
会被取消。 如果使用with
语句,你就可以避免显式调用这个方法,它将会停止Executor
(就好像Executor.shutdown()
调用时wait
设为True
一样等待):
ThreadPoolExecutor
ThreadPoolExecutor
是 Executor
的子类,它使用线程池来异步执行调用。
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
max_workers
:Executor
子类使用最多max_workers
个线程的线程池来异步执行调用。thread_name_prefix
: 线程名称initializer
: 在每个工作者进程开始处调用的一个可选可调用对象initargs
: 传递给初始化器的元组参数。
Future 对象及其方法
Future
类将可调用对象封装为异步执行。Future
实例由 Executor.submit()
创建。
class concurrent.futures.Future
将可调用对象封装为异步执行。Future
实例由 Executor.submit()
创建。
cancel()
: 如果调用正在执行或已结束运行不能被取消则该方法将返回False
,否则调用会被取消并且该方法将返回True
。cancelled()
: 如果调用成功取消返回True
。running()
: 如果调用正在执行而且不能被取消那么返回True
。done()
: 如果调用已被取消或正常结束那么返回True
。result(timeout=None)
: 返回调用返回的值。如果调用还没完成那么这个方法将等待timeout
秒。如果在timeout
秒内没有执行完成,concurrent.futures.TimeoutError
将会被触发。timeout
可以是整数或浮点数。如果timeout
没有指定或为None
,那么等待时间就没有限制。exception(timeout=None)
: 返回由调用引发的异常。如果调用还没完成那么这个方法将等待timeout
秒。如果在timeout
秒内没有执行完成,concurrent.futures.TimeoutError
将会被触发。timeout
可以是整数或浮点数。如果timeout
没有指定或为None
,那么等待时间就没有限制。add_done_callback(fn)
: 附加可调用fn
到future
对象。当future
对象被取消或完成运行时,将会调用fn
,而这个future
对象将作为它唯一的参数。
下面这些 Future
方法用于单元测试和 Executor
实现。
set_running_or_notify_cancel()
: 这个方法只可以在执行关联Future
工作之前由Executor
实现调用或由单测试调用。set_result(result)
: 设置将Future
关联工作的结果给 result 。set_exception(exception)
: 设置Future
关联工作的结果给Exception exception
。
Python 多进程
进程是计算机中资源分配的最小单元;一个进程中可以有多个线程,同一个进程中的线程共享资源; 进程与进程之间则是相互隔离。 Python中通过多进程可以利用CPU的多核优势,计算密集型操作适用于多进程。
关于在Python中基于multiprocessiong
模块操作的进程:
根据不同的平台,multiprocessing
支持三种启动进程的方法。这些启动方法 有
fork
: 【“拷贝”几乎所有资源】【支持文件对象/线程锁等传参】【unix】【任意位置开始】【快】spawn
: 【run参数传必备资源】【不支持文件对象/线程锁等传参】【unix、win】【main代码块开始】【慢】forkserver
: 【run参数传必备资源】【不支持文件对象/线程锁等传参】【部分unix】【main代码块开始】
Example1:
import multiprocessing
import time
"""
def task():
print(name)
name.append(123)
if __name__ == '__main__':
multiprocessing.set_start_method("fork") # fork、spawn、forkserver
name = []
p1 = multiprocessing.Process(target=task)
p1.start()
time.sleep(2)
print(name) # []
"""
"""
def task():
print(name) # [123]
if __name__ == '__main__':
multiprocessing.set_start_method("fork") # fork、spawn、forkserver
name = []
name.append(123)
p1 = multiprocessing.Process(target=task)
p1.start()
"""
"""
def task():
print(name) # []
if __name__ == '__main__':
multiprocessing.set_start_method("fork") # fork、spawn、forkserver
name = []
p1 = multiprocessing.Process(target=task)
p1.start()
name.append(123)
"""
Example2:
import multiprocessing
def task():
print(name)
print(file_object)
if __name__ == '__main__':
multiprocessing.set_start_method("fork") # fork、spawn、forkserver
name = []
file_object = open('x1.txt', mode='a+', encoding='utf-8')
p1 = multiprocessing.Process(target=task)
p1.start()
常见功能
进程的常见方法:
p.start()
,当前进程准备就绪,等待被CPU调度(工作单元其实是进程中的线程)。p.join()
,等待当前进程的任务执行完毕后再向下继续执行。
import time
from multiprocessing import Process
def task(arg):
time.sleep(2)
print("执行中...")
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
p = Process(target=task, args=('xxx',))
p.start()
p.join()
print("继续执行...")
p.daemon = 布尔值,守护进程(必须放在start之前
p.daemon =True
,设置为守护进程,主进程执行完毕后,子进程也自动关闭。p.daemon =False
,设置为非守护进程,主进程等待子进程,子进程执行完毕后,主进程才结束。
import time
from multiprocessing import Process
def task(arg):
time.sleep(2)
print("执行中...")
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
p = Process(target=task, args=('xxx',))
p.daemon = True
p.start()
print("继续执行...")
- 进程的名称的设置和获取
import os
import time
import threading
import multiprocessing
def func():
time.sleep(3)
def task(arg):
for i in range(10):
t = threading.Thread(target=func)
t.start()
print(os.getpid(), os.getppid())
print("线程个数", len(threading.enumerate()))
time.sleep(2)
print("当前进程的名称:", multiprocessing.current_process().name)
if __name__ == '__main__':
print(os.getpid())
multiprocessing.set_start_method("spawn")
p = multiprocessing.Process(target=task, args=('xxx',))
p.name = "哈哈哈哈"
p.start()
print("继续执行...")
- 自定义进程类,直接将线程需要做的事写到run方法中。
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print('执行此进程', self._args)
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
p = MyProcess(args=('xxx',))
p.start()
print("继续执行...")
- CPU个数,程序一般创建多少个进程?(利用CPU多核优势)。
import multiprocessing
if __name__ == '__main__':
count = multiprocessing.cpu_count()
for i in range(count - 1):
p = multiprocessing.Process(target=xxxx)
p.start()
进程锁
如果多个进程抢占式去做某些操作时候,为了防止操作出问题,可以通过进程锁来避免。
import time
from multiprocessing import Process, Manager
def f(d, ):
d[1] += 1
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
d[1] = 0
for i in range(20):
p = Process(target=f, args=(d,))
p.start()
time.sleep(3)
print(d)
import time
import multiprocessing
def task():
# 假设文件中保存的内容就是一个值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print("排队抢票了")
time.sleep(1)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
if __name__ == '__main__':
for i in range(20):
p = multiprocessing.Process(target=task)
p.start()
很显然,多进程在操作时就会出问题,此时就需要锁来介入:
import time
import multiprocessing
def task(lock):
print("开始")
lock.acquire()
# 假设文件中保存的内容就是一个值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print("排队抢票了")
time.sleep(0.5)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
lock.release()
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
lock = multiprocessing.RLock() # 进程锁
for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()
# spawn模式,需要特殊处理。
time.sleep(7)
import time
import multiprocessing
import os
def task(lock):
print("开始")
lock.acquire()
# 假设文件中保存的内容就是一个值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print(os.getpid(), "排队抢票了")
time.sleep(0.5)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
lock.release()
if __name__ == '__main__':
multiprocessing.set_start_method("spawn")
lock = multiprocessing.RLock()
process_list = []
for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()
process_list.append(p)
# spawn模式,需要特殊处理。
for item in process_list:
item.join()
import time
import multiprocessing
def task(lock):
print("开始")
lock.acquire()
# 假设文件中保存的内容就是一个值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print("排队抢票了")
time.sleep(1)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
lock.release()
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
lock = multiprocessing.RLock()
for i in range(10):
p = multiprocessing.Process(target=task, args=(lock,))
p.start()
Python 进程池
ProcessPoolExecutor
类是 Executor
的子类,它使用进程池来异步地执行调用。
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
max_workers
:Executor
子类使用最多max_workers
个线程的线程池来异步执行调用。thread_name_prefix
: 线程名称initializer
: 在每个工作者进程开始处调用的一个可选可调用对象initargs
: 传递给初始化器的元组参数。
Example1:
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def task(num):
print("执行", num)
time.sleep(2)
if __name__ == '__main__':
# 修改模式
pool = ProcessPoolExecutor(4)
for i in range(10):
pool.submit(task, i)
print(1)
print(2)
Example2:
import time
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def task(num):
print("执行", num)
time.sleep(2)
return num
def done(res):
print(multiprocessing.current_process())
time.sleep(1)
print(res.result())
time.sleep(1)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4)
for i in range(50):
fur = pool.submit(task, i)
fur.add_done_callback(done) # done的调用由主进程处理(与线程池不同)
print(multiprocessing.current_process())
pool.shutdown(True)
注意:如果在进程池中要使用进程锁,则需要基于Manager中的Lock和RLock来实现。
import time
import multiprocessing
from concurrent.futures.process import ProcessPoolExecutor
def task(lock):
print("开始")
# lock.acquire()
# lock.relase()
with lock:
# 假设文件中保存的内容就是一个值:10
with open('f1.txt', mode='r', encoding='utf-8') as f:
current_num = int(f.read())
print("排队抢票了")
time.sleep(1)
current_num -= 1
with open('f1.txt', mode='w', encoding='utf-8') as f:
f.write(str(current_num))
if __name__ == '__main__':
pool = ProcessPoolExecutor()
# lock_object = multiprocessing.RLock() # 不能使用
manager = multiprocessing.Manager()
lock_object = manager.RLock() # Lock
for i in range(10):
pool.submit(task, lock_object)
案例:计算每天用户访问情况。
import os
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
def task(file_name, count_dict):
ip_set = set()
total_count = 0
ip_count = 0
file_path = os.path.join("files", file_name)
file_object = open(file_path, mode='r', encoding='utf-8')
for line in file_object:
if not line.strip():
continue
user_ip = line.split(" - -", maxsplit=1)[0].split(",")[0]
total_count += 1
if user_ip in ip_set:
continue
ip_count += 1
ip_set.add(user_ip)
count_dict[file_name] = {"total": total_count, 'ip': ip_count}
time.sleep(1)
def run():
# 根据目录读取文件并初始化字典
"""
1.读取目录下所有的文件,每个进程处理一个文件。
"""
pool = ProcessPoolExecutor(4)
with Manager() as manager:
"""
count_dict={
"20210322.log":{"total":10000,'ip':800},
}
"""
count_dict = manager.dict()
for file_name in os.listdir("files"):
pool.submit(task, file_name, count_dict)
pool.shutdown(True)
for k, v in count_dict.items():
print(k, v)
if __name__ == '__main__':
run()