Aaron Ash
11/25/2021, 1:35 AM

Madhavi
11/25/2021, 3:31 AM

夏文思
11/25/2021, 6:44 AM

Yong Tian
11/25/2021, 8:03 AM

André Petersen
11/25/2021, 10:08 AM

Daniil Ponizov
11/25/2021, 10:39 AM

haf
11/25/2021, 1:28 PM
KeyError: 'data' — have you seen this error message before?

haf
11/25/2021, 1:30 PM

Vince Bob
11/25/2021, 2:17 PMvalidation_task(
context_root_dir=root_dir,
checkpoint_name=expectation_checkpoint_name
)
When I run the command on GE (great_expectations --V3-api checkpoint run my_checkpoint), it works, but on prefect task, I have an exception:
With GE V3 api:
.....
for batch in ge_checkpoint["batches"]:
TypeError: 'Checkpoint' object is not subscriptable
The same with GE V2 api
...
for batch in ge_checkpoint["batches"]:
TypeError: 'LegacyCheckpoint' object is not subscriptable
Great_expectations=0.13.43 (also tried with 0.12.10 version)
prefect=0.15.9
Anyone experienced this pb?Elijah Roussos
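The TypeError above comes from subscripting a Checkpoint object as if it were a dict. A minimal sketch of an alternative, assuming the V3 API: run the checkpoint through the DataContext instead of iterating its "batches" key. The function name `run_checkpoint_task` and the `context` injection parameter are inventions for illustration and testing, not part of either library.

```python
# Hypothetical sketch: run a Great Expectations V3 checkpoint via the
# DataContext API instead of subscripting the Checkpoint object
# (ge_checkpoint["batches"] raises TypeError on V3 Checkpoint objects).
def run_checkpoint_task(context_root_dir, checkpoint_name, context=None):
    if context is None:
        import great_expectations as ge  # assumed available at runtime
        context = ge.data_context.DataContext(context_root_dir=context_root_dir)
    # run_checkpoint executes every validation configured on the checkpoint,
    # so there is no need to loop over batches manually
    result = context.run_checkpoint(checkpoint_name=checkpoint_name)
    return result
```

The wrapped function could then be called from inside a Prefect task, letting Great Expectations drive its own batch iteration.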
11/25/2021, 3:38 PM
~/.prefect/config.toml locally. From the docs it seems like you can only set strings in the TOML, but I need JSON. I've tried setting it as a JSON string and also in TOML syntax, to no avail. Is there any way to set a local JSON secret?

Adam Everington
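One workaround sketch, under the assumption that Prefect 1.x local secrets in config.toml are read back as plain strings: store the JSON as a single-quoted string and decode it after reading. The secret name `MY_JSON_SECRET` and the helper `load_json_secret` are hypothetical.

```python
# Sketch (assumption): keep the secret as a quoted JSON string in
# ~/.prefect/config.toml, e.g.
#
#   [context.secrets]
#   MY_JSON_SECRET = '{"user": "svc", "scopes": ["read", "write"]}'
#
# then decode it after reading it back as a string.
import json

def load_json_secret(raw_value):
    # raw_value would come from something like Secret("MY_JSON_SECRET").get()
    return json.loads(raw_value)

secret = load_json_secret('{"user": "svc", "scopes": ["read", "write"]}')
print(secret["scopes"])  # ['read', 'write']
```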
11/25/2021, 3:48 PM

Anh Nguyen
11/26/2021, 2:36 AM

Bruno Murino
11/26/2021, 11:17 AM

John Shearer
11/26/2021, 12:42 PM
date in prefect context. The docs say "an actual datetime object representing the current time". The datetime value appears to be the same across all tasks within a flow, so I assume this is actually the start time of the flow? This behaviour is what I want, but I want to confirm my assumption.

Giovanni Giacco
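The behaviour described is consistent with a timestamp captured once per flow run and shared with every task. A conceptual sketch of that model (this is not Prefect's internal implementation, just an illustration of the assumption being confirmed):

```python
# Conceptual sketch: a datetime captured once when the flow run starts and
# exposed to every task, so all tasks observe the same value.
from datetime import datetime, timezone

class FlowRunContext:
    def __init__(self):
        # captured exactly once, at run start
        self.date = datetime.now(timezone.utc)

ctx = FlowRunContext()

def task_a(context):
    return context.date

def task_b(context):
    return context.date

assert task_a(ctx) == task_b(ctx)  # identical value in every task
```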
11/26/2021, 1:41 PM

Aleksandr Liadov
11/26/2021, 2:44 PM

Prasanth Kothuri
11/26/2021, 3:41 PM

Jinho Chung
11/26/2021, 4:46 PM

Erick House
11/26/2021, 11:48 PM

itay livni
11/27/2021, 3:48 AM

haf
11/27/2021, 12:05 PM
KilledWorker error which seemingly fails the whole flow. Despite this, the workers are alive and fine (more in thread)

Jake Watson
11/27/2021, 3:52 PM

Erick House
11/27/2021, 4:29 PM
from prefect import flow

@flow
def my_favorite_function():
    print("This function doesn't do much")
    return 42

print(my_favorite_function())

sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow
[SQL: INSERT INTO flow (id, created, updated, name, tags) VALUES (?, ?, ?, ?, ?) ON CONFLICT (name) DO NOTHING]
[parameters: ('f4971b70-0675-41c8-af7b-efcf8e3c2254', '2021-11-27 16:24:21.653684', '2021-11-27 16:24:21.653699', 'my-favorite-function', '[]')]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
❯ prefect version
2.0a5
❯ sqlite3 version
SQLite version 3.36.0 2021-06-18 18:58:49

Lana Dann
11/27/2021, 4:34 PM
ECSRun to take the most recent (or the only revision) of a task_definition_arn? Otherwise we'd have to update and deploy the flow every time we update a task definition, which is not ideal.

itay livni
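One possible sketch for the question above, relying on the ECS API behaviour that describe_task_definition called with a bare family name (no ":revision" suffix) returns the latest ACTIVE revision. The helper name `latest_task_definition_arn` and the `ecs_client` parameter are inventions for illustration and testing.

```python
# Hypothetical helper: resolve the latest ACTIVE revision of a task
# definition family at flow-run time, so the flow never pins a revision.
def latest_task_definition_arn(family, ecs_client=None):
    if ecs_client is None:
        import boto3  # assumed available in the flow's execution environment
        ecs_client = boto3.client("ecs")
    # With a bare family name, ECS returns the latest ACTIVE revision
    resp = ecs_client.describe_task_definition(taskDefinition=family)
    return resp["taskDefinition"]["taskDefinitionArn"]
```

The resulting ARN could then be passed to ECSRun(task_definition_arn=...) when the flow run is configured.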
11/27/2021, 9:37 PM

Luis Jaramillo
11/28/2021, 3:31 PM

Sridhar
11/29/2021, 12:38 AM
The get_data_asynchronous() function below creates 10 threads and calls the API concurrently. I am using this function in run_factset_api(). As standalone code locally this works fine, but when I schedule a run on Prefect, the run_factset_api() function exits before execution and returns a coroutine object (although locally it returns the desired value). Is there something I should do to facilitate parallel runs on Prefect?

async def get_data_asynchronous():
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            loop = asyncio.get_event_loop()
            tasks = [
                loop.run_in_executor(
                    executor,
                    company.get_company_records,
                    *(session, [companies], {**company_info, **formulas})
                    # Allows us to pass in multiple arguments to `fetch`
                )
                for companies in companies_to_fetch
            ]
            for response in await asyncio.gather(*tasks):
                master = master.append(response, ignore_index=True)
            return master

@task
def run_factset_api():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    master = loop.run_until_complete(future)
    return master

@task
def save_data_to_s3(emmi_reduction):
    s3_resource = boto3.resource('s3')
    s3_resource.Object(bucket, 'factset_output_data.csv').put(Body=csv_buffer.getvalue())

with Flow('api-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    response = run_factset_api()
    if response:
        save_data_to_db(response)

flow.register('pipeline')
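A sketch of one common fix for the coroutine symptom above, assuming the cause is event-loop handling in the execution environment: drive the coroutine with asyncio.run(), which creates and closes a fresh loop, so the synchronous task returns gathered results rather than an un-awaited coroutine. `fetch_one` is a stand-in for company.get_company_records, and the function names are invented for illustration.

```python
# Sketch: a synchronous entry point that runs thread-pooled work on a
# fresh event loop via asyncio.run(), avoiding reuse of whatever loop
# (if any) exists in the worker process.
import asyncio
from concurrent.futures import ThreadPoolExecutor

def fetch_one(item):
    # placeholder for the real blocking API call
    return item * 2

async def gather_records(items):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=10) as executor:
        tasks = [loop.run_in_executor(executor, fetch_one, i) for i in items]
        # gather preserves input order
        return await asyncio.gather(*tasks)

def run_api_task(items):
    # asyncio.run creates a new loop, runs the coroutine to completion,
    # and closes the loop, so this always returns a concrete result
    return asyncio.run(gather_records(items))

print(run_api_task([1, 2, 3]))  # [2, 4, 6]
```

The Prefect @task decorator would wrap run_api_task, keeping all event-loop management inside the task body.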
Priyab Dash
11/29/2021, 9:08 AM
@task(log_stdout=True, state_handlers=[notify_run_failure])
def submit_job_run_to_tmc(job_run):
but this is being called twice when we run a flow

Gabriel Milan
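A sketch of one likely explanation, stated as an assumption: Prefect 1.x invokes a state handler on every state transition (e.g. Pending -> Running, then Running -> a finished state), so an unguarded handler fires more than once per task run. Filtering on the new state limits the side effect to a single transition. The `alerts` list stands in for the real notification call, and the stub `_State` class below only simulates the is_failed() check.

```python
# Sketch: guard the handler so it only acts when the task enters a
# failed state, not on every transition.
alerts = []

def notify_run_failure(task, old_state, new_state):
    if new_state.is_failed():  # act only on the transition into failure
        alerts.append(f"{task} failed")
    return new_state

# Minimal stand-in for a Prefect state, for demonstration only.
class _State:
    def __init__(self, failed):
        self._failed = failed
    def is_failed(self):
        return self._failed

# Simulate the two transitions of one task run:
notify_run_failure("submit_job_run_to_tmc", _State(False), _State(False))  # Pending -> Running
notify_run_failure("submit_job_run_to_tmc", _State(False), _State(True))   # Running -> Failed
print(alerts)  # ['submit_job_run_to_tmc failed']
```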
11/29/2021, 11:45 AM
The agent section of the values.yaml file looks like this:

agent:
  enabled: true
  prefectLabels:
    - mylabel
  ...
  job:
    ...
    envFrom:
      - secretRef:
          name: gcp-credentials

The secret gcp-credentials exists and is correct. Unfortunately, this doesn't seem to work.

Zohaa Qamar
11/29/2021, 2:07 PM