from datetime import datetime, timedelta
def get_surrounding_blocks(es, index, location, time_field, time_value, time_format, time_zone, block_size):
"""
获取特定时间点周围的时间块
:param es: Elasticsearch 客户端实例
:param index: Elasticsearch 索引名
:param location: 地理位置点,格式为 [经度, 纬度]
:param time_field: 时间字段名
:param time_value: 时间值,格式为字符串
:param time_format: 时间值的格式
:param time_zone: 时区字符串
:param block_size: 时间块大小,格式为字符串,例如 '5m' 或 '1h'
:return: 时间周围的块列表
"""
# 解析时间值
time_value_parsed = datetime.strptime(time_value, time_format)
if time_zone:
time_value_parsed = time_value_parsed.replace(tzinfo=pytz.timezone(time_zone))
# 计算时间周围的块
before_time = time_value_parsed - timedelta(minutes=5) # 提前5分钟
after_time = time_value_parsed + timedelta(minutes=5) # 延后5分钟
# 转换时间块大小
block_size_dict = {
'5m': {'minutes': 5},
'10m': {'minutes': 10},
'30m': {'minutes': 30},
'1h': {'hours': 1},
'2h': {'hours': 2},
'6h': {'hours': 6},
'12h': {'hours': 12},
'1d': {'days': 1},
}
block_size_timedelta = timedelta(**block_size_dict[block_size])
# 计算边界时间块
before_block_start = before_time - block_size_timedelta
after_block_start = after_time - block_size_timedelta
# 查询边界时间块
before_block_query = {
"query": {
"bool": {
"filter": [
{
"range": {
time_field: {
"gte": before_block_start,
"lt": before_time,
}
}
},
{
"geo_distance": {
"distance": block_size_dict[block_size]['minutes'] * 60 * 1000, # 转换为毫秒
"location_field": location
}
}
]
}
}
}
after_block_query = {
"query": {
"bool": {
"filter": [
{
"range": {
time_field: {
"gt": after_time,
"lte"
评论已关闭