本文整理汇总了Python中MySQLdb.connect函数的典型用法代码示例。如果您正苦于以下问题:Python connect函数的具体用法?Python connect怎么用?Python connect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了connect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self,
driver,
host,
port,
database,
username,
password,
logger):
'''
Initialize database driver: connect the database,
create connection and cusror objects.
'''
self.logger = logger
self.connection = None
self.cursor = None
self.driver = driver
if not database:
database = Database.default_database[self.driver]
# user supplied empty password -> prompt
if password == '':
password = getpass()
# user supplied no password -> query environment or use empty string
if password == None:
try:
password = os.environ['DATABASE_PASSWORD']
except KeyError:
password = ''
if self.driver == SQL.MYSQL:
from MySQLdb import connect
self.connection = connect(host=host,
port=port,
user=username,
passwd=password,
db=database)
elif self.driver == SQL.POSTGRESQL:
from psycopg2 import connect
self.connection = connect(host=host,
port=port,
user=username,
password=password,
database=database)
elif self.driver == SQL.SQLITE3:
from sqlite3 import connect
database = os.path.expanduser(database)
if (not os.path.isfile(database) or
not os.access(database, os.R_OK)):
raise RuntimeError('cannot read from file %s' % database)
self.connection = connect(database)
# the following fixes
# 'sqlite3.OperationalError: Could not decode to UTF-8'
self.connection.text_factory = str
else:
raise ValueError('unknown database driver %s.' % self.driver)
self.cursor = self.connection.cursor()
开发者ID:ZungBang,项目名称:baculafs,代码行数:54,代码来源:Database.py
示例2: _db_login
def _db_login(dbhost=CFG_DATABASE_HOST, relogin=0):
"""Login to the database."""
## Note: we are using "use_unicode=False", because we want to
## receive strings from MySQL as Python UTF-8 binary string
## objects, not as Python Unicode string objects, as of yet.
## Note: "charset='utf8'" is needed for recent MySQLdb versions
## (such as 1.2.1_p2 and above). For older MySQLdb versions such
## as 1.2.0, an explicit "init_command='SET NAMES utf8'" parameter
## would constitute an equivalent. But we are not bothering with
## older MySQLdb versions here, since we are recommending to
## upgrade to more recent versions anyway.
if CFG_MISCUTIL_SQL_USE_SQLALCHEMY:
return connect(
host=dbhost,
port=int(CFG_DATABASE_PORT),
db=CFG_DATABASE_NAME,
user=CFG_DATABASE_USER,
passwd=CFG_DATABASE_PASS,
use_unicode=False,
charset="utf8",
)
else:
thread_ident = (os.getpid(), get_ident())
if relogin:
connection = _DB_CONN[dbhost][thread_ident] = connect(
host=dbhost,
port=int(CFG_DATABASE_PORT),
db=CFG_DATABASE_NAME,
user=CFG_DATABASE_USER,
passwd=CFG_DATABASE_PASS,
use_unicode=False,
charset="utf8",
)
connection.autocommit(True)
return connection
else:
if _DB_CONN[dbhost].has_key(thread_ident):
return _DB_CONN[dbhost][thread_ident]
else:
connection = _DB_CONN[dbhost][thread_ident] = connect(
host=dbhost,
port=int(CFG_DATABASE_PORT),
db=CFG_DATABASE_NAME,
user=CFG_DATABASE_USER,
passwd=CFG_DATABASE_PASS,
use_unicode=False,
charset="utf8",
)
connection.autocommit(True)
return connection
开发者ID:jirikuncar,项目名称:invenio-old,代码行数:53,代码来源:dbquery.py
示例3: cargar_conexion_sql_local
def cargar_conexion_sql_local():
"""
Creates a pyobdc connection object pointing local Sql DB
:return: pyodbc connection object
"""
from pyodbc import connect
return connect('DRIVER={SQL Server};SERVER=LENOVO-PC\SQLEXPRESS;DATABASE=InfoAdex;Trusted_Connection=yes')
开发者ID:Edasn,项目名称:CaptorRadio-v2,代码行数:7,代码来源:varsUtilsCaptorRadioV2.py
示例4: cargar_conexion_as400_infoadex
def cargar_conexion_as400_infoadex():
"""
Creates a pyobdc connection object pointing InfoAdex AS400 DB
:return: pyodbc connection object
"""
from pyodbc import connect
return connect('DSN=AS400;SYSTEM=192.168.1.101;UID=INFOADEX;PWD=INFOADEX')
开发者ID:Edasn,项目名称:CaptorRadio-v2,代码行数:7,代码来源:varsUtilsCaptorRadioV2.py
示例5: data_connect
def data_connect():
# establish connection to database
HOST = 'publicdb.cs.princeton.edu'
PORT = 3306
DATABASE = 'tzha'
USER = 'tzha'
PASSWORD = '[email protected]@n'
host = HOST
if 'DB_SERVER_HOST' in environ:
host = environ['DB_SERVER_HOST']
connection = connect(host = host, port = PORT, user = USER, passwd = PASSWORD,
db = DATABASE)
global cursor
cursor = connection.cursor(cursors.DictCursor)
# find tower_count
cursor.execute('select max(celltower_oid) as celltower_oid from cellspan')
row = cursor.fetchone()
global tower_count
tower_count = row['celltower_oid']
# find person_count
cursor.execute('select max(person_oid) as person_oid from cellspan')
row = cursor.fetchone()
global person_count
person_count = row['person_oid']
开发者ID:tiantianzha,项目名称:Projects,代码行数:29,代码来源:tower.py
示例6: main
def main():
import logging
import yaml
from subspace.billing.server.biller import Biller
from subspace.billing.server.zone import Zone
from MySQLdb import connect, Error
logging.basicConfig(level=logging.DEBUG,
format="<%(threadName)25.25s > %(message)s")
zone_score_id = 1
zone_password = "test"
zone = Zone(('zone.aswz.org', 5000), zone_score_id, zone_password)
conf = yaml.load(open('/etc/chasm/biller.conf'))
db_conn = connect(**conf["db"])
biller = Biller(("0.0.0.0", conf["port"]), conf["name"], db_conn)
player = Player(biller, zone, 0, 'test_user','test_password')
print("Logging in with (usr=%s,pwd=%s)" % (player.name, player.password))
if player.login():
print("success (squad=%s)" % player.squad)
player.logout()
else:
print("failure")
player.password = "wrong_password!!"
print("Logging in with (usr=%s,pwd=%s)" % (player.name, player.password))
if player.login():
print("success (squad=%s)" % player.squad)
player.logout()
else:
print("failure")
db_conn.close()
开发者ID:dmccartney,项目名称:chasm,代码行数:31,代码来源:player.py
示例7: register
def register(args, conf):
# Check that sufficient arguments are available
if "user" not in args or "pass" not in args:
return ServiceResult(
"application/json", jsonsaves({"state": "failure", "message": "Need a username and password"})
)
db = connect(
host=conf["TRAIN_DB_HOST"],
user=conf["GATE_KEEPER_USERNAME"],
passwd=conf["GATE_KEEPER_PASSWD"],
db=conf["USER_DATABASE"],
port=conf["TRAIN_DB_PORT"],
)
username = args.getfirst("user")
passwd = args.getfirst("pass")
cur = db.cursor()
# Check that the user doesn't already exist
cur.execute("SELECT DISTINCT user FROM mysql.user WHERE user=%s", username)
if cur.fetchone() is not None:
cur.close()
db.close()
return ServiceResult("application/json", jsonsaves({"success": False, "message": "User already exists"}))
cur.execute("CREATE USER %[email protected]%s IDENTIFIED BY %s", (username, conf["TRAIN_DB_WEB_HOST"], passwd))
# duplicate backticks for the table name
table_name = username.replace("`", "``") + "_data"
cur.execute("CREATE TABLE `" + table_name + "` (Latitude DOUBLE, Longitude DOUBLE, Time DATETIME);")
cur.execute("GRANT INSERT, DROP ON `" + table_name + "` TO %[email protected]%s", (username, conf["TRAIN_DB_WEB_HOST"]))
cur.execute("GRANT SELECT ON `" + conf["TRAIN_DATABASE"] + "`.* TO %[email protected]%s", (username, conf["TRAIN_DB_WEB_HOST"]))
cur.close()
db.close()
return ServiceResult("application/json", jsonsaves({"success": True, "message": "Created new user"}))
开发者ID:Matt5sean3,项目名称:amtrak-recorder,代码行数:31,代码来源:serve.py
示例8: main
def main():
confPath = path.abspath(argv[1])
confDir = path.dirname(confPath)
confName = path.splitext(path.basename(confPath))[0]
output = '%s/%s' % (confDir, confName)
tarPath = path.abspath(argv[2])
tarDir = path.dirname(tarPath)
tarName = path.splitext(path.basename(tarPath))[0]
source = '%s/%s' % (tarDir, tarName)
f = open(confPath, 'r')
conf = load(f)
f.close()
tar = tarfile.open(tarPath, 'r')
tar.extractall(path=tarDir)
tar.close()
db = connect(database['host'], database['user'], database['password'], database['database'])
fin = open('%s/%s.csv' % (source, tarName), 'r')
fin.readline()
try:
mkdir(output)
except WindowsError, err:
print err
开发者ID:237693991,项目名称:ali2015,代码行数:28,代码来源:genFeature.py
示例9: main
def main ():
# Assume a MySQL fresh install.
user = root
try:
db_conn = connect(user=user, passwd="", db="ichoppedthatvideo")
except:
print "** FATAL ERROR ** Could not connect to database. Aborting..."
raise
cursor = db_conn.cursor()
cursor.execute("SET PASSWORD FOR 'root'@'localhost' = PASSWORD('bleh')")
cursor.execute("SET PASSWORD FOR 'root'@'localhost.localdomain' = PASSWORD('bleh')")
cursor.execute("SET PASSWORD FOR 'root'@'127.0.0.1' = PASSWORD('bleh')")
cursor.execute("CREATE DATABASE ichoppedthatvideo")
cursor.execute("USE ichoppedthatvideo")
cursor.execute("create user 'ichoppedthatvideo_server'@'localhost' identified by 'bleh'")
cursor.execute("grant all privileges on ichoppedthatvideo.* to 'ichoppedthatvideo_server'@'localhost' with grant option")
cursor.execute("""
create table requests (id bigint unsigned not null auto_increment, client_id char(40) not null, server_id varchar(30) not null, child_id smallint unsigned not null, type varchar(20), browser varchar(40), opsys varchar(40), city varchar(127), country varchar(127), cur_timestamp timestamp(8), primary key (id), unique idx_request (client_id, server_id, child_id, cur_timestamp))
""")
cursor.execute("""
create table streams (id bigint(20) unsigned not null auto_increment, request_id bigint(20) unsigned not null, video_id int(10) unsigned not null, seconds int(10) unsigned not null, primary key (id));
""")
开发者ID:mabeledo,项目名称:ichoppedthatvideo,代码行数:26,代码来源:create_db.py
示例10: check_message_format
def check_message_format(rp, msg):
"""检查提交说明格式和JIRA_KEY的有效性
:param rp: 检查提交说明格式的正则表达式
:param msg: 提交说明
"""
m = rp.match(msg)
format_error = False
key_error = False
if m:
task_key = m.groups()[1]
if task_key:
conn = connect(host=jira_db_host, db=jira_db_name, user=jira_db_user, passwd=jira_db_passwd, charset="utf8")
cursor = conn.cursor()
check_sql = (
"select issue.issuenum, issue.SUMMARY from jiraissue issue join project pro where "
"issue.issuenum='%s' and issue.project=pro.id and pro.pkey='%s'"
% (task_key.split("-")[1], task_key.split("-")[0])
)
cursor.execute(check_sql)
row = cursor.fetchone()
if not row or not row[0]:
key_error = True
cursor.close()
conn.close()
else:
format_error = True
return format_error, key_error
开发者ID:AKMFCJ,项目名称:tms,代码行数:30,代码来源:ref-update.py
示例11: mysql_connect
def mysql_connect(db):
conn = connect(host = host,
user = user,
passwd = passwd,
db = db)
c = conn.cursor()
return c, conn
开发者ID:liova99,项目名称:kelesidis.de,代码行数:7,代码来源:config.py
示例12: main
def main(repo_dir_path):
"""将指定目录下的repo仓库路径,添加到数据库中"""
repos = []
for dir_name in os.listdir(repo_dir_path):
if dir_name.endswith('.git') and os.path.isdir(os.path.join(repo_dir_path, dir_name)):
repos.append(os.path.join(repo_dir_path, dir_name))
print repos
con = connect(host='192.168.33.7', db='tms', user='gms', passwd='gms', charset='utf8')
cursor = con.cursor()
exits_sql = "SELECT path FROM repo_repo WHERE platform_id=89"
cursor.execute(exits_sql)
exists_repo = [tmp[0] for tmp in cursor.fetchall()]
print exists_repo
new_repo = []
for repo_path in repos:
if repo_path not in exists_repo:
new_repo.append((repo_path.split('/home/git/repositories/tinno/release/')[1].split('.git')[0].strip(),
repo_path, 89, 3))
print new_repo
insert_repo = "INSERT into repo_repo(name, path, platform_id, category) VALUES (%s, %s, %s, %s)"
cursor.executemany(insert_repo, new_repo)
con.commit()
con.close()
开发者ID:AKMFCJ,项目名称:tms,代码行数:28,代码来源:AddRepoToDB.py
示例13: add_config
def add_config(project_id, config_path):
"""解析ApkVersion将其加入数据库中"""
cp = ConfigParser()
cp.read(config_path)
app_config = []
for app_name in cp.sections():
# print app_name
if app_name == 'ProjectInfo':
continue
if cp.get(app_name, 'support') == 'yes':
support = 'yes'
else:
support = 'no'
if cp.has_option(app_name, 'overrides') and cp.get(app_name, 'overrides') and cp.get(app_name, 'overrides') != 'null':
overrides = cp.get(app_name, 'overrides')
else:
overrides = ''
app_config.append((app_name, support, cp.get(app_name, 'version').strip(), overrides, project_id))
insert_myos_config = "INSERT INTO project_myos_app_config(app_name, support, app_version, overrides, project_id)" \
"VALUES (%s, %s, %s, %s, %s)"
con = connect(host=db_host, db=db_name, user=db_user, passwd=db_passwd, charset='utf8')
cursor = con.cursor()
cursor.executemany(insert_myos_config, app_config)
con.commit()
con.close()
开发者ID:AKMFCJ,项目名称:tms,代码行数:31,代码来源:ImportProjectMyOSConfigs.py
示例14: init
def init(cf_plugin,logging):
# Initializes the plugin
import MySQLdb
from MySQLdb import connect
global logger
logger=logging
logger.info('MYSQL - Configuring plugin...')
configure(cf_plugin)
logger.info('MYSQL - Checking connection to database...')
try:
db = connect(user=user,passwd=password, host=host, port=int(port))
except MySQLdb.Error as err:
logger.error('MYSQL - '+str(err[1])+'')
exit(1)
try:
db.select_db(database)
except MySQLdb.Error as err:
if err[0] == 1049: # no database
logger.info("MYSQL - Creating database...")
create_database(db)
else:
logger.error('MYSQL - '+str(err[1])+'')
exit(1)
logger.info('MYSQL - Plugin initialized!')
开发者ID:svallero,项目名称:cloud-accounting,代码行数:26,代码来源:mysql.py
示例15: query_cellspan
def query_cellspan():
# establish connection to database
HOST = 'publicdb.cs.princeton.edu'
PORT = 3306
DATABASE = 'tzha'
USER = 'tzha'
PASSWORD = '[email protected]@n'
host = HOST
if 'DB_SERVER_HOST' in environ:
host = environ['DB_SERVER_HOST']
connection = connect(host = host, port = PORT, user = USER, passwd = PASSWORD,
db = DATABASE)
global cursor
cursor = connection.cursor(cursors.DictCursor)
for i in range(1, person_count + 1):
# open file
filename = "cellspan{0}-1".format(i)
file = open(filename, 'w+')
# make query
query = ('select * from cellspan where person_oid = ' + str(i)
+ ' and starttime > "' + str(sem_start1)
+ '" and starttime < "' + str(sem_end1) + '"')
cursor.execute(query)
# write to file
line = cursor.fetchone()
while (line):
pickle.dump(line, file)
line = cursor.fetchone()
开发者ID:tiantianzha,项目名称:Projects,代码行数:33,代码来源:query_cellspan.py
示例16: do_sql
def do_sql(query, variables=None, db='nova'):
'''Returns a resultDict for a given sql query.'''
# Must be called and connected every query time, otherwise the MySQL
# server will have 'gone away' from stale connections:
db_mappings = {
'nova': {
'server': '<NOVA MYSQL SERVER HERE>',
'password': '<SOME PASSWORD>'
},
'keystone': {
'server': '<KEYSTONE MYSQL SERVER HERE>',
'password': '<SOME PASSWORD>'
}
}
conn = connect(host=db_mappings[db]['server'],
user=db,
db=db,
passwd=db_mappings[db]['password']
)
cursor = conn.cursor(cursors.DictCursor)
if variables:
cursor.execute(query, variables)
else:
cursor.execute(query)
result_dict = cursor.fetchall()
conn.close()
return result_dict
开发者ID:ThatDevopsGuy,项目名称:openstack,代码行数:34,代码来源:nova_overview.py
示例17: __init__
def __init__():
global __cursor__, commit, rollback
connection = connect(host=DATABASE_HOST, user=DATABASE_USER, passwd=DATABASE_PASSWORD,
db=DATABASE_NAME, charset='utf8')
__cursor__ = connection.cursor()
commit = connection.commit
rollback = connection.rollback
开发者ID:Harikiranvuyyuru,项目名称:PS4M,代码行数:7,代码来源:databaseConnection.py
示例18: directFromMysql
def directFromMysql():
if db.packagePair.count() > 0:
print "packagePair exists"
return
mysqldb = connect(host="westlake.isr.cs.cmu.edu", port = 3306, user = "lsuper", passwd = "luansong", db = "appanalysisraw")
cur1 = mysqldb.cursor()
cur2 = mysqldb.cursor()
cur1.execute("select * from (select packagename, permission, 3rd_party_package, is_external from test_permissionlist group by packagename, permission, 3rd_party_package, is_external) as Z;")
appDict = {}
for i in range(cur1.rowcount):
row = cur1.fetchone()
packagename, permission, third_party_package, is_external = row
print row
if is_external == 0:
purpose = "INTERNAL"
elif third_party_package != "NA":
cur2.execute("select apitype from labeled3rdparty where externalpack=%s", third_party_package)
purpose = cur2.fetchone()[0]
else:
continue
appEntry = appDict.get(packagename, {})
appEntry.update({permission: appEntry.get(permission, set()) | set([purpose])})
appDict[row[0]] = appEntry
cur1.close()
cur2.close()
for key in appDict:
db.packagePair.insert({"packagename": key, "pairs": appDict[key]})
开发者ID:CMUChimpsLab,项目名称:privacyRating,代码行数:33,代码来源:extractApp.py
示例19: get_db
def get_db():
"""Opens a new database connection if there is none yet for the
current application context.
"""
if not hasattr(g, 'db'):
g.db = connect(host='sql.mit.edu', db='lucid+sofar', user='lucid', passwd='KO01BAHjiKGkZ')
return g.db
开发者ID:phun,项目名称:SofarAndroidApp,代码行数:7,代码来源:app.py
示例20: __init__
def __init__(self, hostname='localhost', user='mcdonald', password='stupid',
db='biom', table_basename='tmp_biomtable'):
self.con = connect(hostname, user, password, db)
self.db = db
self._tmp_tables = []
self._table_basename = table_basename
self._num_tmp_tables = self._tmp_table_count()
开发者ID:jansuategui,项目名称:biom-format,代码行数:7,代码来源:biomdb.py
注:本文中的MySQLdb.connect函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论