本文整理汇总了Python中searx.logger.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了debug函数的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: process_callback
def process_callback(response, **kwargs):
# check if redirect comparing to the True value,
# because resp can be a Mock object, and any attribut name returns something.
if response.is_redirect is True:
logger.debug('{0} redirect on: {1}'.format(engine_name, response))
return
response.search_params = params
timeout_overhead = 0.2 # seconds
search_duration = time() - params['started']
timeout_limit = engines[engine_name].timeout + timeout_overhead
if search_duration > timeout_limit:
engines[engine_name].stats['page_load_time'] += timeout_limit
engines[engine_name].stats['errors'] += 1
return
# callback
search_results = callback(response)
# add results
for result in search_results:
result['engine'] = engine_name
results_queue.put_nowait((engine_name, search_results))
# update stats with current page-load-time
engines[engine_name].stats['page_load_time'] += search_duration
开发者ID:erdoukki,项目名称:searx,代码行数:28,代码来源:search.py
示例2: get_time
def get_time(claims, propertyName, locale, defaultValue=None):
propValue = claims.get(propertyName, {})
if len(propValue) == 0:
return defaultValue
result = []
for e in propValue:
mainsnak = e.get('mainsnak', {})
datavalue = mainsnak.get('datavalue', {})
if datavalue is not None:
value = datavalue.get('value', '')
result.append(value.get('time', ''))
if len(result) == 0:
date_string = defaultValue
else:
date_string = ', '.join(result)
try:
parsed_date = datetime.strptime(date_string, "+%Y-%m-%dT%H:%M:%SZ")
except:
if date_string.startswith('-'):
return date_string.split('T')[0]
try:
parsed_date = dateutil_parse(date_string, fuzzy=False, default=False)
except:
logger.debug('could not parse date %s', date_string)
return date_string.split('T')[0]
return format_date_by_locale(parsed_date, locale)
开发者ID:Cqoicebordel,项目名称:searx,代码行数:31,代码来源:wikidata.py
示例3: run
def run():
logger.debug('starting webserver on %s:%s', settings['server']['port'], settings['server']['bind_address'])
app.run(
debug=searx_debug,
use_debugger=searx_debug,
port=settings['server']['port'],
host=settings['server']['bind_address'],
threaded=True
)
开发者ID:asciimoo,项目名称:searx,代码行数:9,代码来源:webapp.py
示例4: initialize_engines
def initialize_engines(engine_list):
load_engines(engine_list)
for engine in engines.items():
if hasattr(engine, 'init'):
init_fn = getattr(engine, engine_attr)
def engine_init():
init_fn()
logger.debug('%s engine initialized', engine_data['name'])
logger.debug('Starting background initialization of %s engine', engine_data['name'])
threading.Thread(target=engine_init).start()
开发者ID:MrLpk,项目名称:searx,代码行数:11,代码来源:__init__.py
示例5: image_proxy
def image_proxy():
url = request.args.get('url').encode('utf-8')
if not url:
return '', 400
h = hashlib.sha256(url + settings['server']['secret_key'].encode('utf-8')).hexdigest()
if h != request.args.get('h'):
return '', 400
headers = dict_subset(request.headers, {'If-Modified-Since', 'If-None-Match'})
headers['User-Agent'] = gen_useragent()
resp = requests.get(url,
stream=True,
timeout=settings['outgoing']['request_timeout'],
headers=headers,
proxies=outgoing_proxies)
if resp.status_code == 304:
return '', resp.status_code
if resp.status_code != 200:
logger.debug('image-proxy: wrong response code: {0}'.format(resp.status_code))
if resp.status_code >= 400:
return '', resp.status_code
return '', 400
if not resp.headers.get('content-type', '').startswith('image/'):
logger.debug('image-proxy: wrong content-type: {0}'.format(resp.headers.get('content-type')))
return '', 400
img = ''
chunk_counter = 0
for chunk in resp.iter_content(1024 * 1024):
chunk_counter += 1
if chunk_counter > 5:
return '', 502 # Bad gateway - file is too big (>5M)
img += chunk
headers = dict_subset(resp.headers, {'Content-Length', 'Length', 'Date', 'Last-Modified', 'Expires', 'Etag'})
return Response(img, mimetype=resp.headers['content-type'], headers=headers)
开发者ID:GreenLunar,项目名称:searx,代码行数:45,代码来源:webapp.py
示例6: code_highlighter
def code_highlighter(codelines, language=None):
if not language:
language = 'text'
try:
# find lexer by programing language
lexer = get_lexer_by_name(language, stripall=True)
except:
# if lexer is not found, using default one
logger.debug('highlighter cannot find lexer for {0}'.format(language))
lexer = get_lexer_by_name('text', stripall=True)
html_code = ''
tmp_code = ''
last_line = None
# parse lines
for line, code in codelines:
if not last_line:
line_code_start = line
# new codeblock is detected
if last_line is not None and\
last_line + 1 != line:
# highlight last codepart
formatter = HtmlFormatter(linenos='inline',
linenostart=line_code_start)
html_code = html_code + highlight(tmp_code, lexer, formatter)
# reset conditions for next codepart
tmp_code = ''
line_code_start = line
# add codepart
tmp_code += code + '\n'
# update line
last_line = line
# highlight last codepart
formatter = HtmlFormatter(linenos='inline', linenostart=line_code_start)
html_code = html_code + highlight(tmp_code, lexer, formatter)
return html_code
开发者ID:GreenLunar,项目名称:searx,代码行数:45,代码来源:webapp.py
示例7: image_proxy
def image_proxy():
url = request.args.get("url").encode("utf-8")
if not url:
return "", 400
h = hashlib.sha256(url + settings["server"]["secret_key"].encode("utf-8")).hexdigest()
if h != request.args.get("h"):
return "", 400
headers = dict_subset(request.headers, {"If-Modified-Since", "If-None-Match"})
headers["User-Agent"] = gen_useragent()
resp = requests.get(
url, stream=True, timeout=settings["outgoing"]["request_timeout"], headers=headers, proxies=outgoing_proxies
)
if resp.status_code == 304:
return "", resp.status_code
if resp.status_code != 200:
logger.debug("image-proxy: wrong response code: {0}".format(resp.status_code))
if resp.status_code >= 400:
return "", resp.status_code
return "", 400
if not resp.headers.get("content-type", "").startswith("image/"):
logger.debug("image-proxy: wrong content-type: {0}".format(resp.headers.get("content-type")))
return "", 400
img = ""
chunk_counter = 0
for chunk in resp.iter_content(1024 * 1024):
chunk_counter += 1
if chunk_counter > 5:
return "", 502 # Bad gateway - file is too big (>5M)
img += chunk
headers = dict_subset(resp.headers, {"Content-Length", "Length", "Date", "Last-Modified", "Expires", "Etag"})
return Response(img, mimetype=resp.headers["content-type"], headers=headers)
开发者ID:jibe-b,项目名称:searx,代码行数:43,代码来源:webapp.py
示例8: response
def response(resp):
'''post-response callback
resp: requests response object
'''
results = []
dom = html.fromstring(resp.text)
try:
number_of_results_string = re.sub('[^0-9]', '', dom.xpath(
'//a[@class="active" and contains(@href,"/suchen/dudenonline")]/span/text()')[0]
)
results.append({'number_of_results': int(number_of_results_string)})
except:
logger.debug("Couldn't read number of results.")
pass
for result in dom.xpath('//section[@class="wide" and not(contains(@style,"overflow:hidden"))]'):
try:
logger.debug("running for %s" % str(result))
link = result.xpath('.//h2/a')[0]
url = link.attrib.get('href')
title = result.xpath('string(.//h2/a)')
content = extract_text(result.xpath('.//p'))
# append result
results.append({'url': url,
'title': title,
'content': content})
except:
logger.debug('result parse error in:\n%s', etree.tostring(result, pretty_print=True))
continue
return results
开发者ID:asciimoo,项目名称:searx,代码行数:35,代码来源:duden.py
示例9: get_resources_directory
from io import StringIO
if sys.version_info[0] == 3:
unicode = str
PY3 = True
else:
PY3 = False
# serve pages with HTTP/1.1
from werkzeug.serving import WSGIRequestHandler
WSGIRequestHandler.protocol_version = "HTTP/{}".format(settings['server'].get('http_protocol_version', '1.0'))
# about static
static_path = get_resources_directory(searx_dir, 'static', settings['ui']['static_path'])
logger.debug('static directory is %s', static_path)
static_files = get_static_files(static_path)
# about templates
default_theme = settings['ui']['default_theme']
templates_path = get_resources_directory(searx_dir, 'templates', settings['ui']['templates_path'])
logger.debug('templates directory is %s', templates_path)
themes = get_themes(templates_path)
result_templates = get_result_templates(templates_path)
global_favicons = []
for indice, theme in enumerate(themes):
global_favicons.append([])
theme_img_path = os.path.join(static_path, 'themes', theme, 'img', 'icons')
for (dirpath, dirnames, filenames) in os.walk(theme_img_path):
global_favicons[indice].extend(filenames)
开发者ID:asciimoo,项目名称:searx,代码行数:30,代码来源:webapp.py
示例10: response
def response(resp):
results = []
# detect google sorry
resp_url = urlparse(resp.url)
if resp_url.netloc == 'sorry.google.com' or resp_url.path == '/sorry/IndexRedirect':
raise RuntimeWarning('sorry.google.com')
if resp_url.path.startswith('/sorry'):
raise RuntimeWarning(gettext('CAPTCHA required'))
# which hostname ?
google_hostname = resp.search_params.get('google_hostname')
google_url = "https://" + google_hostname
# convert the text to dom
dom = html.fromstring(resp.text)
instant_answer = dom.xpath('//div[@id="_vBb"]//text()')
if instant_answer:
results.append({'answer': u' '.join(instant_answer)})
try:
results_num = int(dom.xpath('//div[@id="resultStats"]//text()')[0]
.split()[1].replace(',', ''))
results.append({'number_of_results': results_num})
except:
pass
# parse results
for result in dom.xpath(results_xpath):
try:
title = extract_text(result.xpath(title_xpath)[0])
url = parse_url(extract_url(result.xpath(url_xpath), google_url), google_hostname)
parsed_url = urlparse(url, google_hostname)
# map result
if parsed_url.netloc == google_hostname:
# TODO fix inside links
continue
# if parsed_url.path.startswith(maps_path) or parsed_url.netloc.startswith(map_hostname_start):
# print "yooooo"*30
# x = result.xpath(map_near)
# if len(x) > 0:
# # map : near the location
# results = results + parse_map_near(parsed_url, x, google_hostname)
# else:
# # map : detail about a location
# results = results + parse_map_detail(parsed_url, result, google_hostname)
# # google news
# elif parsed_url.path == search_path:
# # skipping news results
# pass
# # images result
# elif parsed_url.path == images_path:
# # only thumbnail image provided,
# # so skipping image results
# # results = results + parse_images(result, google_hostname)
# pass
else:
# normal result
content = extract_text_from_dom(result, content_xpath)
if content is None:
continue
content_misc = extract_text_from_dom(result, content_misc_xpath)
if content_misc is not None:
content = content_misc + "<br />" + content
# append result
results.append({'url': url,
'title': title,
'content': content
})
except:
logger.debug('result parse error in:\n%s', etree.tostring(result, pretty_print=True))
continue
# parse suggestion
for suggestion in dom.xpath(suggestion_xpath):
# append suggestion
results.append({'suggestion': extract_text(suggestion)})
for correction in dom.xpath(spelling_suggestion_xpath):
results.append({'correction': extract_text(correction)})
# return results
return results
开发者ID:asciimoo,项目名称:searx,代码行数:87,代码来源:google.py
示例11: search
def search(self):
global number_of_searches
# init vars
requests = []
# increase number of searches
number_of_searches += 1
# set default useragent
# user_agent = request.headers.get('User-Agent', '')
user_agent = gen_useragent()
search_query = self.search_query
# start search-reqest for all selected engines
for selected_engine in search_query.engines:
if selected_engine['name'] not in engines:
continue
engine = engines[selected_engine['name']]
# skip suspended engines
if engine.suspend_end_time >= time():
logger.debug('Engine currently suspended: %s', selected_engine['name'])
continue
# if paging is not supported, skip
if search_query.pageno > 1 and not engine.paging:
continue
# if search-language is set and engine does not
# provide language-support, skip
if search_query.lang != 'all' and not engine.language_support:
continue
# if time_range is not supported, skip
if search_query.time_range and not engine.time_range_support:
continue
# set default request parameters
request_params = default_request_params()
request_params['headers']['User-Agent'] = user_agent
request_params['category'] = selected_engine['category']
request_params['started'] = time()
request_params['pageno'] = search_query.pageno
if hasattr(engine, 'language') and engine.language:
request_params['language'] = engine.language
else:
request_params['language'] = search_query.lang
# 0 = None, 1 = Moderate, 2 = Strict
request_params['safesearch'] = search_query.safesearch
request_params['time_range'] = search_query.time_range
# update request parameters dependent on
# search-engine (contained in engines folder)
engine.request(search_query.query.encode('utf-8'), request_params)
if request_params['url'] is None:
# TODO add support of offline engines
pass
# create a callback wrapper for the search engine results
callback = make_callback(
selected_engine['name'],
engine.response,
request_params,
self.result_container)
# create dictionary which contain all
# informations about the request
request_args = dict(
headers=request_params['headers'],
hooks=dict(response=callback),
cookies=request_params['cookies'],
timeout=engine.timeout,
verify=request_params['verify']
)
# specific type of request (GET or POST)
if request_params['method'] == 'GET':
req = requests_lib.get
else:
req = requests_lib.post
request_args['data'] = request_params['data']
# ignoring empty urls
if not request_params['url']:
continue
# append request to list
requests.append((req, request_params['url'],
request_args,
selected_engine['name']))
if not requests:
return self.result_container
# send all search-request
#.........这里部分代码省略.........
开发者ID:NotoriousDev,项目名称:searx,代码行数:101,代码来源:search.py
示例12: engine_init
def engine_init():
init_fn()
logger.debug('%s engine initialized', engine_data['name'])
开发者ID:MrLpk,项目名称:searx,代码行数:3,代码来源:__init__.py
示例13: search
def search(self):
global number_of_searches
# start time
start_time = time()
# answeres ?
answerers_results = ask(self.search_query)
if answerers_results:
for results in answerers_results:
self.result_container.extend('answer', results)
return self.result_container
# init vars
requests = []
# increase number of searches
number_of_searches += 1
# set default useragent
# user_agent = request.headers.get('User-Agent', '')
user_agent = gen_useragent()
search_query = self.search_query
# max of all selected engine timeout
timeout_limit = 0
# start search-reqest for all selected engines
for selected_engine in search_query.engines:
if selected_engine['name'] not in engines:
continue
engine = engines[selected_engine['name']]
# skip suspended engines
if engine.suspend_end_time >= time():
logger.debug('Engine currently suspended: %s', selected_engine['name'])
continue
# if paging is not supported, skip
if search_query.pageno > 1 and not engine.paging:
continue
# if time_range is not supported, skip
if search_query.time_range and not engine.time_range_support:
continue
# set default request parameters
request_params = default_request_params()
request_params['headers']['User-Agent'] = user_agent
request_params['category'] = selected_engine['category']
request_params['pageno'] = search_query.pageno
if hasattr(engine, 'language') and engine.language:
request_params['language'] = engine.language
else:
request_params['language'] = search_query.lang
# 0 = None, 1 = Moderate, 2 = Strict
request_params['safesearch'] = search_query.safesearch
request_params['time_range'] = search_query.time_range
# append request to list
requests.append((selected_engine['name'], search_query.query.encode('utf-8'), request_params))
# update timeout_limit
timeout_limit = max(timeout_limit, engine.timeout)
if requests:
# send all search-request
search_multiple_requests(requests, self.result_container, start_time, timeout_limit)
start_new_thread(gc.collect, tuple())
# return results, suggestions, answers and infoboxes
return self.result_container
开发者ID:mmuman,项目名称:searx,代码行数:77,代码来源:search.py
示例14: engine_init
def engine_init():
init_fn()
logger.debug('%s engine initialized', engine_name)
开发者ID:asciimoo,项目名称:searx,代码行数:3,代码来源:__init__.py
注:本文中的searx.logger.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论