I have written a fairly extensive simulation in Python which, when I run it, gets aborted at some point. The only feedback is a
killed
I never get to see a Python exception.
The program runs very close to the limit of my RAM capacity. Could that be the cause? Is there a way to suppress this? I am grateful for any hints.
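To check whether memory really is the problem, I could log the peak resident set size at a few points during the run. A minimal sketch using only the standard resource module (the helper name is made up; on Linux ru_maxrss is reported in kilobytes, and it only covers the current process, so in the multiprocessing version each worker would have to log its own usage):

import resource

def log_memory(tag):
    # peak resident set size of this process so far (kilobytes on Linux)
    peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print tag, "peak RSS:", peak_kb, "kB"

log_memory("after pool.run()")

If that number climbs towards the machine's total RAM shortly before the process dies, that would support my suspicion that the kernel kills the process (with SIGKILL, which would also explain why there is no Python traceback).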
I don't think the problem lies explicitly in the code, but I am attaching it for completeness. (I have had the same problem with non-multiprocessing scripts as well.)
from pool import ListPool
from pool_preparation import *
from statistics import *
import math

def post_production(all):
    # keep only runs that actually led to an outbreak (more than one infected node)
    outbreaks = [(r,l) for i,r,l,f in all if i>1]
    outbreak_ratio = float(len(outbreaks))/len(all)
    loyalty_ratio = sum([math.copysign(1.0, l) for l in [l for r,l in outbreaks]])/len(outbreaks)
    av_range, dummy, sd_range, dummy, dummy, dummy = simple_statistics([r for r,l in outbreaks])
    av_loyalty, dummy, sd_loyalty, dummy, dummy, dummy = simple_statistics([l for r,l in outbreaks])
    return outbreak_ratio, loyalty_ratio, av_range, sd_range, av_loyalty, sd_loyalty

def write_out(file, *args):
    strs = [str(arg) for arg in args]
    seperator = " "
    line = seperator.join(strs)
    line += "\n"
    file.write(line)

def write_full_node_data(node, liste):
    file = open("results/node_" + str(node) + ".dat", "w")
    for i,r,l,f in liste:
        write_out(file,i,r,l,f)
    file.close()

# trace every node of cluster 132 in parallel and write per-node and per-cluster results
tg = TracingGraph("data/D_edgelist.txt", "data/D_node_cluster.txt")
nodes = nodes_of_cluster(132, tg.mdgraph)
pool = ListPool(trace_single, tg.mdgraph, nodes)
pool.run()

file = open("results/cluster_132.dat", "w")
for k, v in pool.results.iteritems():
    write_full_node_data(k, v)
    ra, lr, ar, sr, al, sl = post_production(v)
    write_out(file, k, ra, lr, ar, sr, al, sl)
import networkx as nx
from numpy import loadtxt
from tracker import SimTracker

class TracingGraph(object):
    "parameters for tracing using multiprocessing pool"

    def __init__(self, edgelist_filename, cluster_filename):
        self.__mdgraph = nx.MultiDiGraph()
        nodelist = loadtxt(cluster_filename, dtype=int)
        nodelist = [(n, {'cluster': c}) for n,c in nodelist]
        self.__mdgraph.add_nodes_from(nodelist)
        edgelist = loadtxt(edgelist_filename, dtype=int, usecols=[0,1,3])
        edgelist = [(u,v, {'day': d}) for u,v,d in edgelist]
        self.__mdgraph.add_edges_from(edgelist)

    def getMdgraph(self):
        return self.__mdgraph
    mdgraph = property(getMdgraph, None, None, None)

def nodes_of_cluster(id, mdgraph):
    nodes = [n for n in mdgraph.nodes_iter() if mdgraph.node[n]['cluster'] == id]
    return nodes

def trace_single(farm_id, mdgraph):
    inf_period = 68
    tracker = SimTracker(farm_id, mdgraph)
    results = []
    for start in range(1, 1052-inf_period):
        results.append(tracker.trace(start, inf_period))
    return farm_id, results

if __name__ == "__main__":
    tg = TracingGraph("data/T_edgelist.txt", "data/T_node_cluster.txt")
    #print tg.mdgraph.nodes(data=True)
from multiprocessing import Pool

class ListPool(object):
    def __init__(self, funktion, parameters, liste):
        #input
        self.parameters = parameters
        self.liste = liste
        self.funktion = funktion
        #output
        self.__results = {}

    def cb(self, result):
        self.results[result[0]] = result[1]

    def run(self):
        pool = Pool()
        for element in iter(self.liste):
            pool.apply_async(self.funktion, (element, self.parameters), callback=self.cb)
        pool.close()
        pool.join()

    def getResults(self):
        return self.__results
    results = property(getResults)

if __name__ == "__main__":
    def funktion(element, parameters):
        return element, True

    listpool = ListPool(funktion, [], range(10))
    listpool.run()
    print listpool.results
import networkx as nx
import math

class SimTracker(object):
    """
    follows a boolean infection from index through the network
    in a first step a digraph is made from edgelist data
    from this all nodes which are in the range of index are recorded
    all other nodes can be ignored
    a dictionary containing cluster_ids of all relevant nodes is made
    a dictionary with a list of all edges for every day is made
    for tracing the latter is used to iterate through network and time
    """

    # CONSTRUCTOR
    def __init__(self, index, mdgraph):
        nodes_in_range = self.__find_children(mdgraph, index)
        self.__day_dict = self.__make_day_dict(mdgraph, nodes_in_range)
        self.__cluster_size = self.__calc_cluster_size(index, mdgraph)
        self.__mdgraph = mdgraph
        self.__index = index
        self.__range = len(nodes_in_range)

    # PUBLIC FUNCTIONS
    def trace(self, start, period):
        """
        actual tracing by going through the edgelist and remembering infected nodes
        saves all infected nodes in a dictionary together with the infection date
        this dict. is daily updated with removal of recovered nodes
        and addition of nodes which are contacted by the edges of the day
        another list contains all nodes which have ever been in the dictionary
        this is the logistic range of the index
        returned is:
        the absolute number of infected
        the ratio of the logistic range to the range in the condensed directed graph
        the ratio of infected nodes within own cluster
        a flag noting if infection was dead by the end of the data set
        """
        if self.range == 1:
            return 1, 1, 0, True
        inf_dict = {self.index: start}
        infected = []
        last_day = max(self.day_dict.keys())
        day = start
        while inf_dict and day <= last_day:
            #daily update of dictionary returning recovered nodes and updated dict
            inf_dict, rec_nodes = self.__update(inf_dict, day, period)
            #inf_list remembers recovered nodes from dict
            infected.extend(rec_nodes)
            day += 1
        #add still infected nodes
        infected.extend(inf_dict.keys())
        #remove duplicates
        infected = list(set(infected))
        if len(infected) == 1:
            return 1, 1, 0, True
        log_range = float(len(infected))/(self.range)
        loyalty = self.__loyalty(infected)
        return len(infected), log_range, loyalty, inf_dict=={}

    # PRIVATE FUNCTIONS
    def __update(self, dct, day, period):
        "the daily update"
        #recovery
        rec_nodes = [k for k,v in dct.iteritems() if v <= day-period]
        for k in iter(rec_nodes):
            del dct[k]
        #iteration
        if day in self.day_dict:
            for u,v in self.day_dict[day]:
                if u in dct and v not in dct:
                    dct[v] = day
        return dct, rec_nodes

    def __loyalty(self, nodes):
        """
        relative loyalty to index cluster
        =1  infection stays completely in index_cluster
        =0  infection is randomly distributed over clusters
        <0  leaves clusters more than expected for random distribution
        =-1 index is the only node in index' cluster
        = 1/sin(1) * sin(2*x^(-ln(2)/ln(c/n))-1)
        where x = relative number of infected in index cluster (0<=x<=1)
                  where the index node itself is not counted
              c = the size of the index cluster
              n = the size of the network (in nodes)
        if only the index node is infected loyalty is defined as 0
        !!!call only for at least index+1 node in nodes!!!
        """
        in_nodes = [n for n in nodes if self.mdgraph.node[n]['cluster'] == self.mdgraph.node[self.index]['cluster']]
        x = float(len(in_nodes)-1)/(len(nodes)-1)
        if x == 0:
            return -1.0
        exp = -math.log(2.0)/math.log(self.cluster_size)
        fak = 1.0/math.sin(1.0)
        arg = 2.0*math.pow(x, exp)-1.0
        result = fak*math.sin(arg)
        return result

    # CONSTRUCTOR METHODS
    def __find_children(self, graph, parent_node):
        children = nx.dfs_predecessor(graph, source=parent_node)
        return children

    def __make_day_dict(self, mdgraph, nodes):
        from collections import defaultdict
        day_dict = defaultdict(list)
        for u,v,d in mdgraph.edges_iter(data=True):
            if u in nodes and v in nodes:
                day_dict[d['day']].append((u,v))
        return day_dict

    def __calc_cluster_size(self, n, mdgraph):
        own_cluster = mdgraph.node[n]['cluster']
        cluster_ids = [d['cluster'] for n,d in mdgraph.nodes(data=True)]
        abs_size = cluster_ids.count(own_cluster)
        return float(abs_size-1)/(len(mdgraph)-1)

    #PROPERTIES
    def getDay_dict(self):
        return self.__day_dict
    day_dict = property(getDay_dict, None, None, None)

    def getCluster_size(self):
        return self.__cluster_size
    cluster_size = property(getCluster_size, None, None, None)

    def getMdgraph(self):
        return self.__mdgraph
    mdgraph = property(getMdgraph, None, None, None)

    def getIndex(self):
        return self.__index
    index = property(getIndex, None, None, None)

    def getRange(self):
        return self.__range
    range = property(getRange, None, None, None)

if __name__ == "__main__":
    from pool_tracker import TracingGraph
    tg = TracingGraph("data/T_edgelist.txt", "data/T_node_cluster.txt")
    st = SimTracker(1, tg.mdgraph)
    print st.trace(1,3)