#python hello.py runserver --host 0.0.0.0
from flask import Flask,request,redirect,abort,render_template,session,url_for,flash
from flask_script import Manager,Shell
from flask_bootstrap import Bootstrap
from flask_moment import Moment
from flask_wtf import Form
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate, MigrateCommand
#from flask_mail import Mail,Message
import smtplib
from import MIMEText
from wtforms import StringField, SubmitField
app开发实例
from wtforms.validators import Required
from datetime import datetime
from threading import Thread
import os,socket,hashlib,time
app = Flask(__name__)
basedir = os.path.abspath(os.path.dirname(__file__))
#=========================表单设置=========================#
#跨站请求伪造(Cross-Site Request Forgery,CSRF)
#实现 CSRF 保护,Flask-WTF 使用这个密钥生成加密令牌,再用令牌验证请求中表单数据的真伪
#=========================数据库设置=========================#
#=========================邮件设置=========================#
#=========================socket设置=========================#
manager = Manager(app)
bootstrap = Bootstrap(app)
moment = Moment(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
#mail=Mail(app)
#=========================函数=========================#
#让 Flask-Script 的 shell 命令自动导入特定的对象
#make_shell_context() 函数注册了程序、数据库实例以及模型,因此这些对象能直接导入shell
def make_shell_context():
return dict(app=app, db=db, User=User, Role=Role)
#=========================md5加密=========================
def md5_encryption(text):
md5=hashlib.md5()
md5.de('utf-8'))
data_to_send=md5.hexdigest()
return dat
a_to_send
#=========================异步邮件=========================
def send_async_email(app,s,msg,sender,to):
try:
s.sendmail(sender, to, msg.as_string())
except smtplib.SMTPDataError as e:#554 DT:SPM 垃圾邮件
pass
def send_email(to, subject, template, **kwargs):
fig['MAIL_SERVER']
fig['MAIL_PORT']
fig['FLASKY_MAIL_SENDER']
fig['MAIL_PASSWORD']
with app.app_context():#激活程序上下文
html = render_template(template + '.html', **kwargs)#正文
msg = MIMEText(html, 'html')
msg['subject'] = fig['FLASKY_MAIL_SUBJECT_PREFIX']+subject#主题
msg['from'] = fig['FLASKY_MAIL_SENDER']
msg['to'] = 'li542131220@163'#to
try: 
s = smtplib.SMTP(host, port) 
s.login(sender, pwd)
thr=Thread(target=send_async_email,args=[app,s,msg,sender,to])
thr.start()
return thr
except smtplib.SMTPException: 
pass
#=========================socket服务器=========================
def server():
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.bind((fig['server_host'],fig['socket_port']))
sock.listen(5)
while True:
print('Listening at:'+sockname()))
data=b''
clit,addr=sock.accept()
while True:
v(1024)
data+=more
if len(more)<1024:
break
data=data.decode()
with open('','w') as f:
f.write(data)
if data.split(':',1)[0]=='send_mesg':
if data.split(':',1)[1] == md5_encryption('get nothing from form.data.data'):
clit.sendall('not done'.encode('utf-8'))
else:
message=Mesg(mesg=data.split(':',1)[1])
db.session.add(message)
db.sessionmit()
clit.sendall('all done'.encode('utf-8'))
elif data.split(':',1)[0]=='recv_message':
data_to_send=''
msg_list=Mesg.query.all()
for i in msg_list:
data_to_send+=('\n'+str(i)+'\n')
#print(data_to_send)
clit.sendall(data_de('utf-8'))
clit.close()
#=========================socket客户端=========================
#发送客户端,只可有一个,程序通过本客户端把消息发送到服务器,再由服务器分配到其他客户端
def client_send(data_to_send):
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
if not data_to_send:
data_to_send=md5_encryption('get nothing from form.data.data')
sock.sendall(('send_mesg:'+data_to_send).encode('utf-8'))
data_v(1024).decode()
if data_recv=='not done':
data_recv=No
ne
try:       
return data_recv.decode()
except AttributeError:
return data_recv
#接收客户端,接收从服务器发送来的消息,可同时有多个
def client_recv():
sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.sendall('recv_message:from {}'.sockname()).encode('utf-8'))
#print('recv_message:from {}'.sockname()))
data_v(1024).decode()
return data_recv
#=========================表单模型=========================#
#NameForm 表单中有一个名为 name 的文本字段和一个名为 submit 的提交按钮
#StringField类表示属性为 type="text" 的 <input> 元素
#SubmitField 类表示属性为 type="submit" 的<input> 元素
class NameForm(Form):
#<input name="name" type='text' ...>,<input name="submit" type='submit' ...>
name = StringField('What is your name?', validators=[Required()])#验证函数 Required() 确保提交的字段不为空
data = StringField('Input something here')
submit = SubmitField('Submit')
#=========================数据库ORM模型=========================#
#定义 Role 和 User (ORM)模型
#__repr__方法 返回一个具有可读性的字符串表示模型,调用类或方法时会引用
class Role(db.Model):
__tablename__ = 'roles'# __tablename__ 定义在数据库中使用的表名
id = db.Column(db.Integer, primary_key=True)#整数型,主键
name = db.Column(db.String(64), unique=True)
#backref 参数(添加反向引用)向 User 模型中添加一个 role 属性,这一属性可替代 role_id 访问 Role 模型
#lazy参数,禁止自动执行查询,可添加过滤器,否则执行如Role.users的命令时自动执行all()
users = db.relationship('User', backref='role', lazy='dynamic')
def __repr__(self):
return '<Role %r>' % self.name
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True, index=True)#字符型,不可重复,创建索引
role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))#外键,这列的值是 roles 表中行的 id 值
def __repr__(self):
return '<User %r>' % self.username
class Mesg(db.Model):
__tablename__ = 'mesg'
id = db.Column(db.Integer, primary_key=True)
mesg=db.Column(db.String(64))
def __repr__(self):
return '<Mesg %r>' % sg
#=========================路由=========================#
#刷新页面时浏览器会重新发送之前已经发送过的最后一个请求,
#如果这个请求是一个包含表单数据的 POST 请求,
#刷新页面后会再次提交表单,
#因此最好别让 Web 程序把 POST 请求作为浏览器发送的最后一个请求
@ute('/',methods=['GET', 'POST'])
def index():
form = Na
meForm()
digit_role=Role.query.all()[1]
alpha_role=Role.query.all()[0]
users=User.query.all()
user_digit=User.query.filter_by(role=digit_role).all()
user_alpha=User.query.filter_by(role=alpha_role).all()
if form.validate_on_submit():#validate_on_submit,提交通过验证
old_name = ('name')
#过滤出 等于 用户填写字符 的 第一个用户名
user = User.query.filter_by(username=form.name.data).first()
if user is None:#数据库中不存在输入的用户名称,即该用户为新用户
if form.name.data.isdigit():
user = User(username = form.name.data,role=digit_role)
elif form.name.data.isalpha():
user = User(username = form.name.data,role=alpha_role)
else:
user = User(username = form.name.data)
#如填入名称不存在,则使用该名称创建新用户
db.session.add(user)#把新用户注册到数据库中
db.sessionmit()
session['known'] = False#不明用户
fig['FLASKY_ADMIN']:#每当表单接收新名字时,程序都会给管理员发送一封
send_fig['FLASKY_ADMIN'],#收件人地址
'New User',#主题
'mail_new_user',#渲染邮件正文的模板
user=user)#关键字参数列表
alert=client_send(('data'))#通过客户端发送给服务器
if alert != md5_encryption('get nothing from form.data.data') and alert is not None:
session['alert']=alert
else:
session['alert']=None
else:
session['known'] = True
session['alert']=None
if old_name is not None and old_name != form.name.data:
flash('Looks like you have changed your name!')
session['name'] = form.name.data#更新cookies中存放name选项,值为取出表单模型name中的数据
form.name.data = ''
return redirect(url_for('index'))#生成 HTTP 重定向响应,参数是重定向的 URL,方法是GET
('alert')
session['alert']=None
return render_template('index.html',
current_time=datetime.utcnow(),
('name'),
form=form,
known = ('known', False),
users=users,
user_digit=user_digit,
user_alpha=user_alpha,
alert=alert)
'''
@ute('/post', methods=['POST'])
def post():
session['alert']=None
return redirect(url_for('index'))
'''
@ute('/user/<name>')
def user(name):
return render_template('user.html',name=name)
@handler(404)
def page_not_found(e):
return render_template('404.html'), 404
@handler(500)
def internal_server_error(e):
return render_template('500.html'), 500
#=========================运行=========================#
if __name__ == '__main__':
print('输入 {}:{} 即可访问'.fig['client_host'],5000))
manager.add_command("shell", Shell(make_context=make_shell_context))#命令自动导入特定的对象
manager.add_command('db', MigrateCommand)#导出数据库迁移命令
#manager.run
#传递设置选项的理想方式是使用命令行参数,
#Flask-Script 自带了一组常用选项,而且还支持自定义命令
program_to_run=[server,manager.run]
thread_list=[]
for i in program_to_run:
thread_list.append(Thread(target=i))
for i in thread_list:
i.start()

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。